Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.arrow.flight.grpc.ServerInterceptorAdapter.KeyFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.util.VisibleForTesting;

import io.grpc.Server;
import io.grpc.ServerInterceptors;
Expand All @@ -58,14 +59,19 @@ public class FlightServer implements AutoCloseable {

private final Location location;
private final Server server;
// The executor used by the gRPC server. We don't use it here, but we do need to clean it up with the server.
// May be null, if a user-supplied executor was provided (as we do not want to clean that up)
@VisibleForTesting
final ExecutorService grpcExecutor;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could avoid exposing this if you added this to awaitTermination as well (but that is a little tricky to do correctly). I'm OK if you don't want to do it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, how would that work in the test? We'd shut down the executor in awaitTermination and wait?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have thought we shutdown the executorservice in shutdown

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Er, right. I can do that, but I still don't see how that'd let us avoid exposing it in test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it tests the behavior implicitly vs explicitly as the way you have it now. Like I said I'm fine with either approach.


/** The maximum size of an individual gRPC message. This effectively disables the limit. */
static final int MAX_GRPC_MESSAGE_SIZE = Integer.MAX_VALUE;

/** Create a new instance from a gRPC server. For internal use only. */
private FlightServer(Location location, Server server) {
private FlightServer(Location location, Server server, ExecutorService grpcExecutor) {
this.location = location;
this.server = server;
this.grpcExecutor = grpcExecutor;
}

/** Start the server. */
Expand Down Expand Up @@ -103,6 +109,9 @@ public void awaitTermination() throws InterruptedException {
/** Request that the server shut down. */
public void shutdown() {
server.shutdown();
if (grpcExecutor != null) {
grpcExecutor.shutdown();
}
}

/**
Expand Down Expand Up @@ -227,13 +236,24 @@ public FlightServer build() {
}

// Share one executor between the gRPC service, DoPut, and Handshake
final ExecutorService exec = executor != null ? executor : Executors.newCachedThreadPool();
final ExecutorService exec;
// We only want to have FlightServer close the gRPC executor if we created it here. We should not close
// user-supplied executors.
final ExecutorService grpcExecutor;
if (executor != null) {
exec = executor;
grpcExecutor = null;
} else {
exec = Executors.newCachedThreadPool();
grpcExecutor = exec;
}
final FlightBindingService flightService = new FlightBindingService(allocator, producer, authHandler, exec);
builder
.executor(exec)
.maxInboundMessageSize(maxInboundMessageSize)
.addService(
ServerInterceptors.intercept(
new FlightBindingService(allocator, producer, authHandler, exec),
flightService,
new ServerAuthInterceptor(authHandler)));

// Allow hooking into the gRPC builder. This is not guaranteed to be available on all Arrow versions or
Expand All @@ -259,7 +279,7 @@ public FlightServer build() {
});

builder.intercept(new ServerInterceptorAdapter(interceptors));
return new FlightServer(location, builder.build());
return new FlightServer(location, builder.build(), grpcExecutor);
}

/**
Expand Down Expand Up @@ -294,6 +314,9 @@ public Builder useTls(final InputStream certChain, final InputStream key) {

/**
* Set the executor used by the server.
*
* <p>Flight will NOT take ownership of the executor. The application must clean it up if one is provided. (If not
* provided, Flight will use a default executor which it will clean up.)
*/
public Builder executor(ExecutorService executor) {
this.executor = executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.arrow.flight;

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Expand Down Expand Up @@ -54,6 +56,48 @@ public void builderConsumer() throws Exception {
}
}

/**
* Make sure that if Flight supplies a default executor to gRPC, then it is closed along with the server.
*/
@Test
public void defaultExecutorClosed() throws Exception {
final ExecutorService executor;
try (
BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
FlightServer server =
FlightTestUtil.getStartedServer(
(location) -> FlightServer.builder(a, location, new NoOpFlightProducer())
.build()
)) {
Assert.assertNotNull(server.grpcExecutor);
executor = server.grpcExecutor;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't you try shutting down here and verify it does shutdown?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there's no accessible reference to the actual executor to confirm it afterwards.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could plumb one through, but it'd be adding references specifically for testing.

(Sorry for the delay again, October was quite the busy month.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about making resources less generic and take the reference to ExecutorService in the class instead of list of AutoCloseable? Then make awaitTermination rely on both the server and the executor (if its not null). That way you don't need anything specifically visible for testing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. I've updated the PR.

Assert.assertTrue(executor.isShutdown());
}

/**
* Make sure that if the user provides an executor to gRPC, then Flight does not close it.
*/
@Test
public void suppliedExecutorNotClosed() throws Exception {
final ExecutorService executor = Executors.newSingleThreadExecutor();
try {
try (
BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
FlightServer server =
FlightTestUtil.getStartedServer(
(location) -> FlightServer.builder(a, location, new NoOpFlightProducer())
.executor(executor)
.build()
)) {
Assert.assertNull(server.grpcExecutor);
}
Assert.assertFalse(executor.isShutdown());
} finally {
executor.shutdown();
}
}

@Test
public void domainSocket() throws Exception {
Assume.assumeTrue("We have a native transport available", FlightTestUtil.isNativeTransportAvailable());
Expand Down