Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,12 @@ Flight.ActionType toProtocol() {
.setDescription(description)
.build();
}

@Override
public String toString() {
return "ActionType{" +
"type='" + type + '\'' +
", description='" + description + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.arrow.flight.grpc.StatusUtils;

/**
* A handler for server-sent application metadata messages during a Flight DoPut operation.
*
Expand Down Expand Up @@ -53,7 +55,7 @@ public void onNext(PutResult val) {

@Override
public final void onError(Throwable t) {
completed.completeExceptionally(t);
completed.completeExceptionally(StatusUtils.fromThrowable(t));
}

@Override
Expand Down
95 changes: 95 additions & 0 deletions java/flight/src/main/java/org/apache/arrow/flight/CallStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight;

import java.util.Objects;

/**
* The result of a Flight RPC.
*/
public class CallStatus {

private final FlightStatusCode code;
private final Throwable cause;
private final String description;

public static final CallStatus OK = FlightStatusCode.OK.toStatus();
public static final CallStatus UNKNOWN = FlightStatusCode.UNKNOWN.toStatus();
public static final CallStatus INTERNAL = FlightStatusCode.INTERNAL.toStatus();
public static final CallStatus INVALID_ARGUMENT = FlightStatusCode.INVALID_ARGUMENT.toStatus();
public static final CallStatus TIMED_OUT = FlightStatusCode.TIMED_OUT.toStatus();
public static final CallStatus CANCELLED = FlightStatusCode.CANCELLED.toStatus();
public static final CallStatus UNAUTHENTICATED = FlightStatusCode.UNAUTHENTICATED.toStatus();
public static final CallStatus UNAUTHORIZED = FlightStatusCode.UNAUTHORIZED.toStatus();
public static final CallStatus UNIMPLEMENTED = FlightStatusCode.UNIMPLEMENTED.toStatus();
public static final CallStatus UNAVAILABLE = FlightStatusCode.UNAVAILABLE.toStatus();

/**
* Create a new status.
* @param code The status code.
* @param cause An exception that resulted in this status (or null).
* @param description A description of the status (or null).
*/
public CallStatus(FlightStatusCode code, Throwable cause, String description) {
this.code = Objects.requireNonNull(code);
this.cause = cause;
this.description = description == null ? "" : description;
}

/**
* The status code describing the result of the RPC.
*/
public FlightStatusCode code() {
return code;
}

/**
* The exception that led to this result. May be null.
*/
public Throwable cause() {
return cause;
}

/**
* A description of the result.
*/
public String description() {
return description;
}

/**
* Return a copy of this status with an error message.
*/
public CallStatus withDescription(String message) {
return new CallStatus(code, cause, message);
}

/**
* Return a copy of this status with the given exception as the cause. This will not be sent over the wire.
*/
public CallStatus withCause(Throwable t) {
return new CallStatus(code, t, description);
}

/**
* Convert the status to an equivalent exception.
*/
public FlightRuntimeException toRuntimeException() {
return new FlightRuntimeException(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.arrow.flight;

import java.util.Set;
import java.util.concurrent.ExecutorService;

import org.apache.arrow.flight.auth.ServerAuthHandler;
import org.apache.arrow.flight.impl.Flight;
Expand Down Expand Up @@ -48,9 +49,9 @@ class FlightBindingService implements BindableService {
private final BufferAllocator allocator;

public FlightBindingService(BufferAllocator allocator, FlightProducer producer,
ServerAuthHandler authHandler) {
ServerAuthHandler authHandler, ExecutorService executor) {
this.allocator = allocator;
this.delegate = new FlightService(allocator, producer, authHandler);
this.delegate = new FlightService(allocator, producer, authHandler, executor);
}

public static MethodDescriptor<Flight.Ticket, ArrowMessage> getDoGetDescriptor(BufferAllocator allocator) {
Expand Down
90 changes: 53 additions & 37 deletions java/flight/src/main/java/org/apache/arrow/flight/FlightClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.net.ssl.SSLException;

Expand All @@ -30,6 +29,7 @@
import org.apache.arrow.flight.auth.ClientAuthHandler;
import org.apache.arrow.flight.auth.ClientAuthInterceptor;
import org.apache.arrow.flight.auth.ClientAuthWrapper;
import org.apache.arrow.flight.grpc.StatusUtils;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.Flight.Empty;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
Expand All @@ -43,12 +43,11 @@
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;

import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.ClientCallStreamObserver;
Expand Down Expand Up @@ -96,18 +95,22 @@ private FlightClient(BufferAllocator incomingAllocator, ManagedChannel channel)
* @return FlightInfo Iterable
*/
public Iterable<FlightInfo> listFlights(Criteria criteria, CallOption... options) {
return ImmutableList.copyOf(CallOptions.wrapStub(blockingStub, options).listFlights(criteria.asCriteria()))
.stream()
.map(t -> {
try {
return new FlightInfo(t);
} catch (URISyntaxException e) {
// We don't expect this will happen for conforming Flight implementations. For instance, a Java server
// itself wouldn't be able to construct an invalid Location.
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
final Iterator<Flight.FlightInfo> flights;
try {
flights = CallOptions.wrapStub(blockingStub, options)
.listFlights(criteria.asCriteria());
} catch (StatusRuntimeException sre) {
throw StatusUtils.fromGrpcRuntimeException(sre);
}
return () -> StatusUtils.wrapIterator(flights, t -> {
try {
return new FlightInfo(t);
} catch (URISyntaxException e) {
// We don't expect this will happen for conforming Flight implementations. For instance, a Java server
// itself wouldn't be able to construct an invalid Location.
throw new RuntimeException(e);
}
});
}

/**
Expand All @@ -116,11 +119,14 @@ public Iterable<FlightInfo> listFlights(Criteria criteria, CallOption... options
* @param options RPC-layer hints for the call.
*/
public Iterable<ActionType> listActions(CallOption... options) {
return ImmutableList.copyOf(CallOptions.wrapStub(blockingStub, options)
.listActions(Empty.getDefaultInstance()))
.stream()
.map(ActionType::new)
.collect(Collectors.toList());
final Iterator<Flight.ActionType> actions;
try {
actions = CallOptions.wrapStub(blockingStub, options)
.listActions(Empty.getDefaultInstance());
} catch (StatusRuntimeException sre) {
throw StatusUtils.fromGrpcRuntimeException(sre);
}
return () -> StatusUtils.wrapIterator(actions, ActionType::new);
}

/**
Expand All @@ -131,8 +137,8 @@ public Iterable<ActionType> listActions(CallOption... options) {
* @return An iterator of results.
*/
public Iterator<Result> doAction(Action action, CallOption... options) {
return Iterators
.transform(CallOptions.wrapStub(blockingStub, options).doAction(action.toProtocol()), Result::new);
return StatusUtils
.wrapIterator(CallOptions.wrapStub(blockingStub, options).doAction(action.toProtocol()), Result::new);
}

/**
Expand Down Expand Up @@ -183,16 +189,20 @@ public ClientStreamListener startPut(FlightDescriptor descriptor, VectorSchemaRo
Preconditions.checkNotNull(descriptor);
Preconditions.checkNotNull(root);

SetStreamObserver resultObserver = new SetStreamObserver(allocator, metadataListener);
final io.grpc.CallOptions callOptions = CallOptions.wrapStub(asyncStub, options).getCallOptions();
ClientCallStreamObserver<ArrowMessage> observer = (ClientCallStreamObserver<ArrowMessage>)
ClientCalls.asyncBidiStreamingCall(
authInterceptor.interceptCall(doPutDescriptor, callOptions, channel), resultObserver);
// send the schema to start.
DictionaryUtils.generateSchemaMessages(root.getSchema(), descriptor, provider, observer::onNext);
return new PutObserver(new VectorUnloader(
root, true /* include # of nulls in vectors */, true /* must align buffers to be C++-compatible */),
observer, metadataListener);
try {
SetStreamObserver resultObserver = new SetStreamObserver(allocator, metadataListener);
final io.grpc.CallOptions callOptions = CallOptions.wrapStub(asyncStub, options).getCallOptions();
ClientCallStreamObserver<ArrowMessage> observer = (ClientCallStreamObserver<ArrowMessage>)
ClientCalls.asyncBidiStreamingCall(
authInterceptor.interceptCall(doPutDescriptor, callOptions, channel), resultObserver);
// send the schema to start.
DictionaryUtils.generateSchemaMessages(root.getSchema(), descriptor, provider, observer::onNext);
return new PutObserver(new VectorUnloader(
root, true /* include # of nulls in vectors */, true /* must align buffers to be C++-compatible */),
observer, metadataListener);
} catch (StatusRuntimeException sre) {
throw StatusUtils.fromGrpcRuntimeException(sre);
}
}

/**
Expand All @@ -207,6 +217,8 @@ public FlightInfo getInfo(FlightDescriptor descriptor, CallOption... options) {
// We don't expect this will happen for conforming Flight implementations. For instance, a Java server
// itself wouldn't be able to construct an invalid Location.
throw new RuntimeException(e);
} catch (StatusRuntimeException sre) {
throw StatusUtils.fromGrpcRuntimeException(sre);
}
}

Expand Down Expand Up @@ -241,7 +253,7 @@ public void onNext(ArrowMessage value) {

@Override
public void onError(Throwable t) {
delegate.onError(t);
delegate.onError(StatusUtils.toGrpcException(t));
}

@Override
Expand Down Expand Up @@ -274,7 +286,7 @@ public void onNext(Flight.PutResult value) {

@Override
public void onError(Throwable t) {
listener.onError(t);
listener.onError(StatusUtils.fromThrowable(t));
}

@Override
Expand Down Expand Up @@ -307,13 +319,17 @@ public void putNext(ArrowBuf appMetadata) {
while (!observer.isReady()) {
/* busy wait */
}
// Takes ownership of appMetadata
observer.onNext(new ArrowMessage(batch, appMetadata));
try {
// Takes ownership of appMetadata
observer.onNext(new ArrowMessage(batch, appMetadata));
} catch (StatusRuntimeException sre) {
throw StatusUtils.fromGrpcRuntimeException(sre);
}
}

@Override
public void error(Throwable ex) {
observer.onError(ex);
observer.onError(StatusUtils.toGrpcException(ex));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight;

/**
* An exception raised from a Flight RPC.
*
* <p>In service implementations, raising an instance of this exception will provide clients with a more detailed
* message and error code.
*/
public class FlightRuntimeException extends RuntimeException {
private final CallStatus status;

/**
* Create a new exception from the given status.
*/
FlightRuntimeException(CallStatus status) {
super(status.description(), status.cause());
this.status = status;
}

public CallStatus status() {
return status;
}
}
Loading