From 1e9175b91c54a23e04cf3a45f95d1d419055ac60 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 10 Oct 2024 11:09:03 -0700 Subject: [PATCH 1/2] MSQ: Include stageId, workerNumber in processing thread names. Helps identify which query was running in a thread dump. --- .../main/java/org/apache/druid/msq/exec/WorkerImpl.java | 3 ++- .../druid/frame/processor/FrameProcessorExecutor.java | 9 +++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 46e7e4a14491..2928d75f9111 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -969,10 +969,11 @@ private StageOutputHolder getOrCreateStageOutputHolder(final StageId stageId, fi /** * Returns cancellation ID for a particular stage, to be used in {@link FrameProcessorExecutor#cancel(String)}. + * In addition to being a token for cancellation, this also appears in thread dumps, so make it a little descriptive. */ private static String cancellationIdFor(final StageId stageId, final int workerNumber) { - return StringUtils.format("%s_%s", stageId, workerNumber); + return StringUtils.format("dart-worker[%s_%s]", stageId, workerNumber); } /** diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java index f255fbe13a6b..49573b39b4ec 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java @@ -222,6 +222,7 @@ private Optional> runProcessorNow() } } + final String threadName = Thread.currentThread().getName(); boolean canceled = false; Either> retVal; @@ -230,6 +231,11 @@ private Optional> runProcessorNow() throw new InterruptedException(); } + if (cancellationId != null) { + // Set the thread name to something involving the cancellationId, to make thread dumps more useful. + Thread.currentThread().setName(threadName + "-" + cancellationId); + } + retVal = Either.value(processor.runIncrementally(readableInputs)); } catch (Throwable e) { @@ -253,6 +259,9 @@ private Optional> runProcessorNow() canceled = true; } } + + // Restore original thread name. + Thread.currentThread().setName(threadName); } } From 7166a7466e0321821a26e43f20c7b35ff0f6b680 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 10 Oct 2024 22:33:02 -0700 Subject: [PATCH 2/2] s/dart/msq/ --- .../src/main/java/org/apache/druid/msq/exec/WorkerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 2928d75f9111..6c9f1d899413 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -973,7 +973,7 @@ private StageOutputHolder getOrCreateStageOutputHolder(final StageId stageId, fi */ private static String cancellationIdFor(final StageId stageId, final int workerNumber) { - return StringUtils.format("dart-worker[%s_%s]", stageId, workerNumber); + return StringUtils.format("msq-worker[%s_%s]", stageId, workerNumber); } /**