From 60a42018e3b19a980c84abe3ff7cd4daf75520c2 Mon Sep 17 00:00:00 2001
From: David Li
Date: Sat, 12 Oct 2019 11:08:06 -0400
Subject: [PATCH] ARROW-6867: [FlightRPC][Java] clean up default executor
---
.../org/apache/arrow/flight/FlightServer.java | 31 +++++++++++--
.../arrow/flight/TestServerOptions.java | 44 +++++++++++++++++++
2 files changed, 71 insertions(+), 4 deletions(-)
diff --git a/java/flight/src/main/java/org/apache/arrow/flight/FlightServer.java b/java/flight/src/main/java/org/apache/arrow/flight/FlightServer.java
index 80963527334..74cb472e6db 100644
--- a/java/flight/src/main/java/org/apache/arrow/flight/FlightServer.java
+++ b/java/flight/src/main/java/org/apache/arrow/flight/FlightServer.java
@@ -40,6 +40,7 @@
import org.apache.arrow.flight.grpc.ServerInterceptorAdapter.KeyFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.util.VisibleForTesting;
import io.grpc.Server;
import io.grpc.ServerInterceptors;
@@ -58,14 +59,19 @@ public class FlightServer implements AutoCloseable {
private final Location location;
private final Server server;
+ // The executor used by the gRPC server. We don't use it here, but we do need to clean it up with the server.
+ // May be null, if a user-supplied executor was provided (as we do not want to clean that up)
+ @VisibleForTesting
+ final ExecutorService grpcExecutor;
/** The maximum size of an individual gRPC message. This effectively disables the limit. */
static final int MAX_GRPC_MESSAGE_SIZE = Integer.MAX_VALUE;
/** Create a new instance from a gRPC server. For internal use only. */
- private FlightServer(Location location, Server server) {
+ private FlightServer(Location location, Server server, ExecutorService grpcExecutor) {
this.location = location;
this.server = server;
+ this.grpcExecutor = grpcExecutor;
}
/** Start the server. */
@@ -103,6 +109,9 @@ public void awaitTermination() throws InterruptedException {
/** Request that the server shut down. */
public void shutdown() {
server.shutdown();
+ if (grpcExecutor != null) {
+ grpcExecutor.shutdown();
+ }
}
/**
@@ -227,13 +236,24 @@ public FlightServer build() {
}
// Share one executor between the gRPC service, DoPut, and Handshake
- final ExecutorService exec = executor != null ? executor : Executors.newCachedThreadPool();
+ final ExecutorService exec;
+ // We only want to have FlightServer close the gRPC executor if we created it here. We should not close
+ // user-supplied executors.
+ final ExecutorService grpcExecutor;
+ if (executor != null) {
+ exec = executor;
+ grpcExecutor = null;
+ } else {
+ exec = Executors.newCachedThreadPool();
+ grpcExecutor = exec;
+ }
+ final FlightBindingService flightService = new FlightBindingService(allocator, producer, authHandler, exec);
builder
.executor(exec)
.maxInboundMessageSize(maxInboundMessageSize)
.addService(
ServerInterceptors.intercept(
- new FlightBindingService(allocator, producer, authHandler, exec),
+ flightService,
new ServerAuthInterceptor(authHandler)));
// Allow hooking into the gRPC builder. This is not guaranteed to be available on all Arrow versions or
@@ -259,7 +279,7 @@ public FlightServer build() {
});
builder.intercept(new ServerInterceptorAdapter(interceptors));
- return new FlightServer(location, builder.build());
+ return new FlightServer(location, builder.build(), grpcExecutor);
}
/**
@@ -294,6 +314,9 @@ public Builder useTls(final InputStream certChain, final InputStream key) {
/**
* Set the executor used by the server.
+ *
+ * Flight will NOT take ownership of the executor. The application must clean it up if one is provided. (If not
+ * provided, Flight will use a default executor which it will clean up.)
*/
public Builder executor(ExecutorService executor) {
this.executor = executor;
diff --git a/java/flight/src/test/java/org/apache/arrow/flight/TestServerOptions.java b/java/flight/src/test/java/org/apache/arrow/flight/TestServerOptions.java
index 496e1be40d4..0c32b276a72 100644
--- a/java/flight/src/test/java/org/apache/arrow/flight/TestServerOptions.java
+++ b/java/flight/src/test/java/org/apache/arrow/flight/TestServerOptions.java
@@ -18,6 +18,8 @@
package org.apache.arrow.flight;
import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@@ -54,6 +56,48 @@ public void builderConsumer() throws Exception {
}
}
+ /**
+ * Make sure that if Flight supplies a default executor to gRPC, then it is closed along with the server.
+ */
+ @Test
+ public void defaultExecutorClosed() throws Exception {
+ final ExecutorService executor;
+ try (
+ BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
+ FlightServer server =
+ FlightTestUtil.getStartedServer(
+ (location) -> FlightServer.builder(a, location, new NoOpFlightProducer())
+ .build()
+ )) {
+ Assert.assertNotNull(server.grpcExecutor);
+ executor = server.grpcExecutor;
+ }
+ Assert.assertTrue(executor.isShutdown());
+ }
+
+ /**
+ * Make sure that if the user provides an executor to gRPC, then Flight does not close it.
+ */
+ @Test
+ public void suppliedExecutorNotClosed() throws Exception {
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ try (
+ BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
+ FlightServer server =
+ FlightTestUtil.getStartedServer(
+ (location) -> FlightServer.builder(a, location, new NoOpFlightProducer())
+ .executor(executor)
+ .build()
+ )) {
+ Assert.assertNull(server.grpcExecutor);
+ }
+ Assert.assertFalse(executor.isShutdown());
+ } finally {
+ executor.shutdown();
+ }
+ }
+
@Test
public void domainSocket() throws Exception {
Assume.assumeTrue("We have a native transport available", FlightTestUtil.isNativeTransportAvailable());