From d5cb8e1a5369276c2f8cc57e41bee6e5ecc75154 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 23 Jul 2019 12:39:44 -0700 Subject: [PATCH 1/3] fix issue with CuratorLoadQueuePeon shutting down executors it does not own --- .../coordinator/CuratorLoadQueuePeon.java | 2 -- .../CuratorDruidCoordinatorTest.java | 22 +++++++++++++++---- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java index 10113fd0235a..5fd9ac5780d2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java @@ -335,8 +335,6 @@ public void stop() queuedSize.set(0L); failedAssignCount.set(0); - processingExecutor.shutdown(); - callBackExecutor.shutdown(); } private void entryRemoved(SegmentHolder segmentHolder, String path) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index eb3b25b6899f..7e9659e2060d 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -73,7 +73,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -119,6 +121,9 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase private final ObjectMapper jsonMapper; private final ZkPathsConfig zkPathsConfig; + private 
ScheduledExecutorService peonExec = Execs.scheduledSingleThreaded("Master-PeonExec--%d"); + private ExecutorService callbackExec = Execs.multiThreaded(4, "LoadQueuePeon-callbackexec--%d"); + public CuratorDruidCoordinatorTest() { jsonMapper = TestHelper.makeJsonMapper(); @@ -187,16 +192,16 @@ public void setUp() throws Exception curator, SOURCE_LOAD_PATH, objectMapper, - Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_src_scheduled-%d"), - Execs.singleThreaded("coordinator_test_load_queue_peon_src-%d"), + peonExec, + callbackExec, druidCoordinatorConfig ); destinationLoadQueuePeon = new CuratorLoadQueuePeon( curator, DESTINATION_LOAD_PATH, objectMapper, - Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_dest_scheduled-%d"), - Execs.singleThreaded("coordinator_test_load_queue_peon_dest-%d"), + peonExec, + callbackExec, druidCoordinatorConfig ); druidNode = new DruidNode("hey", "what", false, 1234, null, true, false); @@ -261,6 +266,15 @@ public void tearDown() throws Exception @Rule public final TestRule timeout = new DeadlockDetectingTimeout(60, TimeUnit.SECONDS); + @Test + public void testStopDoesntKillPoolItDoesntOwn() throws Exception + { + setupView(); + sourceLoadQueuePeon.stop(); + Assert.assertFalse(peonExec.isShutdown()); + Assert.assertFalse(callbackExec.isShutdown()); + } + @Test public void testMoveSegment() throws Exception { From 440f25016cc36961d90c46ce05bc109a43f387b9 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 23 Jul 2019 13:27:20 -0700 Subject: [PATCH 2/3] use lifecycled executors --- .../main/java/org/apache/druid/cli/CliCoordinator.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index ecafcbfc7cc6..4ca650eb57c8 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ 
b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -46,7 +46,6 @@ import org.apache.druid.guice.annotations.CoordinatorIndexingServiceHelper; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.http.JettyHttpClientModule; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; @@ -254,10 +253,12 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster( boolean useHttpLoadQueuePeon = "http".equalsIgnoreCase(config.getLoadQueuePeonType()); ExecutorService callBackExec; if (useHttpLoadQueuePeon) { - callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d"); + callBackExec = factory.create(1, "LoadQueuePeon-callbackexec--%d"); } else { - callBackExec = Execs.multiThreaded(config.getNumCuratorCallBackThreads(), "LoadQueuePeon" - + "-callbackexec--%d"); + callBackExec = factory.create( + config.getNumCuratorCallBackThreads(), + "LoadQueuePeon-callbackexec--%d" + ); } return new LoadQueueTaskMaster( curator, From 34096a2151ffd26c4c91ced263f74e3eb562a2e7 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 23 Jul 2019 13:34:13 -0700 Subject: [PATCH 3/3] create callback executor with Execs and register it with the Lifecycle --- .../java/org/apache/druid/cli/CliCoordinator.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 4ca650eb57c8..118b911586be 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -46,7 +46,10 @@ import org.apache.druid.guice.annotations.CoordinatorIndexingServiceHelper; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.http.JettyHttpClientModule; +import 
org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ExecutorServices; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.metadata.MetadataRuleManager; @@ -247,19 +250,21 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster( ScheduledExecutorFactory factory, DruidCoordinatorConfig config, @EscalatedGlobal HttpClient httpClient, - ZkPathsConfig zkPaths + ZkPathsConfig zkPaths, + Lifecycle lifecycle ) { boolean useHttpLoadQueuePeon = "http".equalsIgnoreCase(config.getLoadQueuePeonType()); ExecutorService callBackExec; if (useHttpLoadQueuePeon) { - callBackExec = factory.create(1, "LoadQueuePeon-callbackexec--%d"); + callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d"); } else { - callBackExec = factory.create( + callBackExec = Execs.multiThreaded( config.getNumCuratorCallBackThreads(), "LoadQueuePeon-callbackexec--%d" ); } + ExecutorServices.manageLifecycle(lifecycle, callBackExec); return new LoadQueueTaskMaster( curator, jsonMapper,