From 35a44216f4f6812d1509e04b85aa86dd1104f9e4 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 30 Nov 2022 11:02:10 +0530 Subject: [PATCH 1/2] Fix an issue with WorkerSketchFetcher not terminating on shutdown --- .../org/apache/druid/msq/exec/ControllerImpl.java | 5 +++-- .../apache/druid/msq/exec/WorkerSketchFetcher.java | 12 +++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index cafc0f389258..318c33a759c7 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -524,6 +524,8 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer) context.registerController(this, closer); this.netClient = new ExceptionWrappingWorkerClient(context.taskClientFor(this)); + closer.register(netClient::close); + ClusterStatisticsMergeMode clusterStatisticsMergeMode = MultiStageQueryContext.getClusterStatisticsMergeMode(task.getQuerySpec().getQuery().context()); @@ -532,8 +534,7 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer) int statisticsMaxRetainedBytes = WorkerMemoryParameters.createProductionInstanceForController(context.injector()) .getPartitionStatisticsMaxRetainedBytes(); this.workerSketchFetcher = new WorkerSketchFetcher(netClient, clusterStatisticsMergeMode, statisticsMaxRetainedBytes); - - closer.register(netClient::close); + closer.register(workerSketchFetcher::close); final boolean isDurableStorageEnabled = MultiStageQueryContext.isDurableStorageEnabled(task.getQuerySpec().getQuery().context()); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java index 3482b50daaff..e233ca198330 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java @@ -25,6 +25,7 @@ import org.apache.druid.frame.key.ClusterByPartitions; import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.statistics.ClusterByStatisticsCollector; @@ -39,13 +40,12 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.IntStream; /** * Queues up fetching sketches from workers and progressively generates partitions boundaries. */ -public class WorkerSketchFetcher +public class WorkerSketchFetcher implements AutoCloseable { private static final Logger log = new Logger(WorkerSketchFetcher.class); private static final int DEFAULT_THREAD_COUNT = 4; @@ -63,7 +63,7 @@ public WorkerSketchFetcher(WorkerClient workerClient, ClusterStatisticsMergeMode { this.workerClient = workerClient; this.clusterStatisticsMergeMode = clusterStatisticsMergeMode; - this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT); + this.executorService = Execs.multiThreaded(DEFAULT_THREAD_COUNT, "SketchFetcherThreadPool"); this.statisticsMaxRetainedBytes = statisticsMaxRetainedBytes; } @@ -337,4 +337,10 @@ private static long getPartitionCountFromEither(Either Date: Wed, 30 Nov 2022 12:37:18 +0530 Subject: [PATCH 2/2] Change threadpool name --- .../java/org/apache/druid/msq/exec/WorkerSketchFetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java index e233ca198330..c4118a9d38e0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java @@ -63,7 +63,7 @@ public WorkerSketchFetcher(WorkerClient workerClient, ClusterStatisticsMergeMode { this.workerClient = workerClient; this.clusterStatisticsMergeMode = clusterStatisticsMergeMode; - this.executorService = Execs.multiThreaded(DEFAULT_THREAD_COUNT, "SketchFetcherThreadPool"); + this.executorService = Execs.multiThreaded(DEFAULT_THREAD_COUNT, "SketchFetcherThreadPool-%d"); this.statisticsMaxRetainedBytes = statisticsMaxRetainedBytes; }