diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 84453eaf98bd..61939d823731 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -1772,6 +1772,7 @@ private ResultAndChannels gatherResultKeyStatistics(final OutputChannels chan @Override public void onSuccess(final ClusterByStatisticsCollector result) { + result.logSketches(); kernelManipulationQueue.add( holder -> holder.getStageKernelMap().get(stageDefinition.getId()) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java index 8be2108a57a4..e38fc778bb8a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java @@ -100,6 +100,7 @@ public Either generatePartitionsForGlobalSort( } else if (maxPartitions > maxNumPartitions) { return Either.error((long) maxPartitions); } else { + collector.logSketches(); final ClusterByPartitions generatedPartitions = collector.generatePartitionsWithMaxCount(maxPartitions); if (generatedPartitions.size() <= maxNumPartitions) { return Either.value(generatedPartitions); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java index 61ae457d626a..7e03a0664a85 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java @@ -99,6 +99,7 @@ public Either generatePartitionsForGlobalSort( if (expectedPartitions > maxNumPartitions) { return Either.error(expectedPartitions); } else { + collector.logSketches(); final ClusterByPartitions generatedPartitions = collector.generatePartitionsWithTargetWeight(targetSize); if (generatedPartitions.size() <= maxNumPartitions) { return Either.value(generatedPartitions); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollector.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollector.java index fb8c8232fb8d..971bf7573212 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollector.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollector.java @@ -90,6 +90,11 @@ public interface ClusterByStatisticsCollector */ ClusterByPartitions generatePartitionsWithMaxCount(int maxNumPartitions); + /** + * Logs some information regarding the collector. This is useful in seeing which sketches were downsampled the most. + */ + void logSketches(); + /** * Returns an immutable, JSON-serializable snapshot of this collector. */ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java index aad5d3d54834..d89a4e809521 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java @@ -243,8 +243,6 @@ public ClusterByStatisticsCollector clear() @Override public ClusterByPartitions generatePartitionsWithTargetWeight(final long targetWeight) { - logSketches(); - if (targetWeight < 1) { throw new IAE("Target weight must be positive"); } @@ -288,8 +286,6 @@ public ClusterByPartitions generatePartitionsWithTargetWeight(final long targetW @Override public ClusterByPartitions generatePartitionsWithMaxCount(final int maxNumPartitions) { - logSketches(); - if (maxNumPartitions < 1) { throw new IAE("Must have at least one partition"); } else if (buckets.isEmpty()) { @@ -331,7 +327,8 @@ public ClusterByPartitions generatePartitionsWithMaxCount(final int maxNumPartit return ranges; } - private void logSketches() + @Override + public void logSketches() { if (log.isDebugEnabled()) { // Log all sketches