From 40eab1176d583f896a93159ac3552473e40a5b7d Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 24 Jul 2019 10:59:43 -0700 Subject: [PATCH] fix issue with CuratorLoadQueuePeon shutting down executors it does not own (#8140) * fix issue with CuratorLoadQueuePeon shutting down executors it does not own * use lifecycled executors * maybe this --- .../coordinator/CuratorLoadQueuePeon.java | 2 -- .../CuratorDruidCoordinatorTest.java | 22 +++++++++++++++---- .../org/apache/druid/cli/CliCoordinator.java | 12 +++++++--- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java index a4d5d948cf07..6a526e8ca7cc 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java @@ -334,8 +334,6 @@ public void stop() queuedSize.set(0L); failedAssignCount.set(0); - processingExecutor.shutdown(); - callBackExecutor.shutdown(); } private void entryRemoved(SegmentHolder segmentHolder, String path) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index 9cd02069e01e..1cea98e1bf3d 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -73,7 +73,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -117,6 +119,9 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase private final ObjectMapper jsonMapper; private final ZkPathsConfig zkPathsConfig; + private ScheduledExecutorService peonExec = Execs.scheduledSingleThreaded("Master-PeonExec--%d"); + private ExecutorService callbackExec = Execs.multiThreaded(4, "LoadQueuePeon-callbackexec--%d"); + public CuratorDruidCoordinatorTest() { jsonMapper = TestHelper.makeJsonMapper(); @@ -186,16 +191,16 @@ public void setUp() throws Exception curator, SOURCE_LOAD_PATH, objectMapper, - Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_src_scheduled-%d"), - Execs.singleThreaded("coordinator_test_load_queue_peon_src-%d"), + peonExec, + callbackExec, druidCoordinatorConfig ); destinationLoadQueuePeon = new CuratorLoadQueuePeon( curator, DESTINATION_LOAD_PATH, objectMapper, - Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_dest_scheduled-%d"), - Execs.singleThreaded("coordinator_test_load_queue_peon_dest-%d"), + peonExec, + callbackExec, druidCoordinatorConfig ); druidNode = new DruidNode("hey", "what", false, 1234, null, true, false); @@ -260,6 +265,15 @@ public void tearDown() throws Exception @Rule public final TestRule timeout = new DeadlockDetectingTimeout(60, TimeUnit.SECONDS); + @Test + public void testStopDoesntKillPoolItDoesntOwn() throws Exception + { + setupView(); + sourceLoadQueuePeon.stop(); + Assert.assertFalse(peonExec.isShutdown()); + Assert.assertFalse(callbackExec.isShutdown()); + } + @Test public void testMoveSegment() throws Exception { diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 98bb958d9c50..834b41723edd 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -47,7 +47,9 @@ import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.http.JettyHttpClientModule; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ExecutorServices; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.metadata.MetadataRuleManager; @@ -248,7 +250,8 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster( ScheduledExecutorFactory factory, DruidCoordinatorConfig config, @EscalatedGlobal HttpClient httpClient, - ZkPathsConfig zkPaths + ZkPathsConfig zkPaths, + Lifecycle lifecycle ) { boolean useHttpLoadQueuePeon = "http".equalsIgnoreCase(config.getLoadQueuePeonType()); @@ -256,9 +259,12 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster( if (useHttpLoadQueuePeon) { callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d"); } else { - callBackExec = Execs.multiThreaded(config.getNumCuratorCallBackThreads(), "LoadQueuePeon" - + "-callbackexec--%d"); + callBackExec = Execs.multiThreaded( + config.getNumCuratorCallBackThreads(), + "LoadQueuePeon-callbackexec--%d" + ); } + ExecutorServices.manageLifecycle(lifecycle, callBackExec); return new LoadQueueTaskMaster( curator, jsonMapper,