From c5276c8c62e4059fa4641ba355d95267f1969203 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Sun, 7 Jul 2024 14:06:18 +0530 Subject: [PATCH 1/2] Add logging for sketches on workers --- .../src/main/java/org/apache/druid/msq/exec/WorkerImpl.java | 1 + .../druid/msq/statistics/ClusterByStatisticsCollector.java | 5 +++++ .../msq/statistics/ClusterByStatisticsCollectorImpl.java | 3 ++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 84453eaf98bd..61939d823731 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -1772,6 +1772,7 @@ private ResultAndChannels gatherResultKeyStatistics(final OutputChannels chan @Override public void onSuccess(final ClusterByStatisticsCollector result) { + result.logSketches(); kernelManipulationQueue.add( holder -> holder.getStageKernelMap().get(stageDefinition.getId()) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollector.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollector.java index fb8c8232fb8d..971bf7573212 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollector.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollector.java @@ -90,6 +90,11 @@ public interface ClusterByStatisticsCollector */ ClusterByPartitions generatePartitionsWithMaxCount(int maxNumPartitions); + /** + * Logs some information regarding the collector. This is useful in seeing which sketches were downsampled the most. + */ + void logSketches(); + /** * Returns an immutable, JSON-serializable snapshot of this collector. */ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java index aad5d3d54834..cc0139dda8e9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java @@ -331,7 +331,8 @@ public ClusterByPartitions generatePartitionsWithMaxCount(final int maxNumPartit return ranges; } - private void logSketches() + @Override + public void logSketches() { if (log.isDebugEnabled()) { // Log all sketches From 31409e10ae297fb2c2e8ed9bcc5d90c3cc3016e9 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 9 Jul 2024 11:06:51 +0530 Subject: [PATCH 2/2] Refactor --- .../druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java | 1 + .../druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java | 1 + .../msq/statistics/ClusterByStatisticsCollectorImpl.java | 4 ---- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java index 8be2108a57a4..e38fc778bb8a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java @@ -100,6 +100,7 @@ public Either generatePartitionsForGlobalSort( } else if (maxPartitions > maxNumPartitions) { return Either.error((long) maxPartitions); } else { + collector.logSketches(); final ClusterByPartitions generatedPartitions = collector.generatePartitionsWithMaxCount(maxPartitions); if (generatedPartitions.size() <= maxNumPartitions) { return Either.value(generatedPartitions); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java index 61ae457d626a..7e03a0664a85 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java @@ -99,6 +99,7 @@ public Either generatePartitionsForGlobalSort( if (expectedPartitions > maxNumPartitions) { return Either.error(expectedPartitions); } else { + collector.logSketches(); final ClusterByPartitions generatedPartitions = collector.generatePartitionsWithTargetWeight(targetSize); if (generatedPartitions.size() <= maxNumPartitions) { return Either.value(generatedPartitions); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java index cc0139dda8e9..d89a4e809521 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java @@ -243,8 +243,6 @@ public ClusterByStatisticsCollector clear() @Override public ClusterByPartitions generatePartitionsWithTargetWeight(final long targetWeight) { - logSketches(); - if (targetWeight < 1) { throw new IAE("Target weight must be positive"); } @@ -288,8 +286,6 @@ public ClusterByPartitions generatePartitionsWithTargetWeight(final long targetW @Override public ClusterByPartitions generatePartitionsWithMaxCount(final int maxNumPartitions) { - logSketches(); - if (maxNumPartitions < 1) { throw new IAE("Must have at least one partition"); } else if (buckets.isEmpty()) {