diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/BaseParallelMergeCombiningSequenceBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/BaseParallelMergeCombiningSequenceBenchmark.java index 14f240295752..38947f009929 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/BaseParallelMergeCombiningSequenceBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/BaseParallelMergeCombiningSequenceBenchmark.java @@ -135,7 +135,8 @@ Sequence createParallelSequence( parallelism, yieldAfter, batchSize, - targetTaskTimeMillis + targetTaskTimeMillis, + null ); } diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java index 41a82c219a1c..173d721fec86 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -43,6 +44,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BinaryOperator; +import java.util.function.Consumer; /** * Artisanal, locally-sourced, hand-crafted, gluten and GMO free, bespoke, free-range, organic, small-batch parallel @@ -64,7 +66,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase public static final int DEFAULT_TASK_SMALL_BATCH_NUM_ROWS = 4096; private final ForkJoinPool workerPool; - private final List> baseSequences; + private final List> inputSequences; private final Ordering orderingFn; private final BinaryOperator combineFn; private final int queueSize; @@ -75,11 +77,12 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase private final int batchSize; private final int parallelism; private final long targetTimeNanos; + private final Consumer metricsReporter; private final CancellationGizmo cancellationGizmo; public ParallelMergeCombiningSequence( ForkJoinPool workerPool, - List> baseSequences, + List> inputSequences, Ordering orderingFn, BinaryOperator combineFn, boolean hasTimeout, @@ -88,11 +91,12 @@ public ParallelMergeCombiningSequence( int parallelism, int yieldAfter, int batchSize, - int targetTimeMillis + int targetTimeMillis, + Consumer reporter ) { this.workerPool = workerPool; - this.baseSequences = baseSequences; + this.inputSequences = inputSequences; this.orderingFn = orderingFn; this.combineFn = combineFn; this.hasTimeout = hasTimeout; @@ -103,19 +107,21 @@ public ParallelMergeCombiningSequence( this.batchSize = batchSize; this.targetTimeNanos = TimeUnit.NANOSECONDS.convert(targetTimeMillis, TimeUnit.MILLISECONDS); this.queueSize = 4 * (yieldAfter / batchSize); + this.metricsReporter = reporter; this.cancellationGizmo = new CancellationGizmo(); } @Override public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) { - if (baseSequences.isEmpty()) { + if (inputSequences.isEmpty()) { return Sequences.empty().toYielder(initValue, accumulator); } final BlockingQueue> outputQueue = new ArrayBlockingQueue<>(queueSize); - MergeCombinePartitioningAction finalMergeAction = new MergeCombinePartitioningAction<>( - baseSequences, + final MergeCombineMetricsAccumulator metricsAccumulator = new MergeCombineMetricsAccumulator(inputSequences.size()); + MergeCombinePartitioningAction mergeCombineAction = new MergeCombinePartitioningAction<>( + inputSequences, orderingFn, combineFn, outputQueue, @@ -126,10 +132,20 @@ public Yielder toYielder(OutType initValue, YieldingAccumulat targetTimeNanos, hasTimeout, timeoutAtNanos, + metricsAccumulator, cancellationGizmo ); - workerPool.execute(finalMergeAction); - Sequence finalOutSequence = makeOutputSequenceForQueue(outputQueue, hasTimeout, timeoutAtNanos, cancellationGizmo); + workerPool.execute(mergeCombineAction); + Sequence finalOutSequence = makeOutputSequenceForQueue( + outputQueue, + hasTimeout, + timeoutAtNanos, + cancellationGizmo + ).withBaggage(() -> { + if (metricsReporter != null) { + metricsReporter.accept(metricsAccumulator.build()); + } + }); return finalOutSequence.toYielder(initValue, accumulator); } @@ -247,6 +263,7 @@ private static class MergeCombinePartitioningAction extends RecursiveAction private final long targetTimeNanos; private final boolean hasTimeout; private final long timeoutAt; + private final MergeCombineMetricsAccumulator metricsAccumulator; private final CancellationGizmo cancellationGizmo; private MergeCombinePartitioningAction( @@ -261,6 +278,7 @@ private MergeCombinePartitioningAction( long targetTimeNanos, boolean hasTimeout, long timeoutAt, + MergeCombineMetricsAccumulator metricsAccumulator, CancellationGizmo cancellationGizmo ) { @@ -275,6 +293,7 @@ private MergeCombinePartitioningAction( this.targetTimeNanos = targetTimeNanos; this.hasTimeout = hasTimeout; this.timeoutAt = timeoutAt; + this.metricsAccumulator = metricsAccumulator; this.cancellationGizmo = cancellationGizmo; } @@ -300,6 +319,9 @@ protected void compute() for (Sequence s : sequences) { sequenceCursors.add(new YielderBatchedResultsCursor<>(new SequenceBatcher<>(s, batchSize), orderingFn)); } + MergeCombineActionMetricsAccumulator soloAccumulator = new MergeCombineActionMetricsAccumulator(); + metricsAccumulator.setPartitions(Collections.emptyList()); + metricsAccumulator.setMergeMetrics(soloAccumulator); PrepareMergeCombineInputsAction blockForInputsAction = new PrepareMergeCombineInputsAction<>( sequenceCursors, resultsPusher, @@ -308,6 +330,7 @@ protected void compute() yieldAfter, batchSize, targetTimeNanos, + soloAccumulator, cancellationGizmo ); getPool().execute(blockForInputsAction); @@ -325,7 +348,9 @@ protected void compute() private void spawnParallelTasks(int parallelMergeTasks) { - List tasks = new ArrayList<>(); + List tasks = new ArrayList<>(parallelMergeTasks); + List taskMetrics = new ArrayList<>(parallelMergeTasks); + List>> intermediaryOutputs = new ArrayList<>(parallelMergeTasks); List>> partitions = @@ -340,6 +365,7 @@ private void spawnParallelTasks(int parallelMergeTasks) for (Sequence s : partition) { partitionCursors.add(new YielderBatchedResultsCursor<>(new SequenceBatcher<>(s, batchSize), orderingFn)); } + MergeCombineActionMetricsAccumulator partitionAccumulator = new MergeCombineActionMetricsAccumulator(); PrepareMergeCombineInputsAction blockForInputsAction = new PrepareMergeCombineInputsAction<>( partitionCursors, pusher, @@ -348,11 +374,15 @@ private void spawnParallelTasks(int parallelMergeTasks) yieldAfter, batchSize, targetTimeNanos, + partitionAccumulator, cancellationGizmo ); tasks.add(blockForInputsAction); + taskMetrics.add(partitionAccumulator); } + metricsAccumulator.setPartitions(taskMetrics); + for (RecursiveAction task : tasks) { getPool().execute(task); } @@ -364,6 +394,9 @@ private void spawnParallelTasks(int parallelMergeTasks) new BlockingQueueuBatchedResultsCursor<>(queue, orderingFn, hasTimeout, timeoutAt) ); } + MergeCombineActionMetricsAccumulator finalMergeMetrics = new MergeCombineActionMetricsAccumulator(); + + metricsAccumulator.setMergeMetrics(finalMergeMetrics); PrepareMergeCombineInputsAction finalMergeAction = new PrepareMergeCombineInputsAction<>( intermediaryOutputsCursors, outputPusher, @@ -372,6 +405,7 @@ private void spawnParallelTasks(int parallelMergeTasks) yieldAfter, batchSize, targetTimeNanos, + finalMergeMetrics, cancellationGizmo ); @@ -444,9 +478,9 @@ private int computeNumTasks() * this task completes and executes a new task to continue where it left off. This value is initially set by the * {@link MergeCombinePartitioningAction} to a default value, but after that this process is timed to try and compute * an 'optimal' number of rows to yield to achieve a task runtime of ~10ms, on the assumption that the time to process - * n results will be approximately the same. {@link #recursionDepth} is used to track how many times a task has - * continued executing, and utilized to compute a cumulative moving average of task run time per amount yielded in - * order to 'smooth' out the continual adjustment. + * n results will be approximately the same. {@link MergeCombineActionMetricsAccumulator#taskCount} is used to track + * how many times a task has continued executing, and utilized to compute a cumulative moving average of task run time + * per amount yielded in order to 'smooth' out the continual adjustment. */ private static class MergeCombineAction extends RecursiveAction { @@ -458,7 +492,7 @@ private static class MergeCombineAction extends RecursiveAction private final int yieldAfter; private final int batchSize; private final long targetTimeNanos; - private final int recursionDepth; + private final MergeCombineActionMetricsAccumulator metricsAccumulator; private final CancellationGizmo cancellationGizmo; private MergeCombineAction( @@ -470,7 +504,7 @@ private MergeCombineAction( int yieldAfter, int batchSize, long targetTimeNanos, - int recursionDepth, + MergeCombineActionMetricsAccumulator metricsAccumulator, CancellationGizmo cancellationGizmo ) { @@ -482,7 +516,7 @@ private MergeCombineAction( this.yieldAfter = yieldAfter; this.batchSize = batchSize; this.targetTimeNanos = targetTimeNanos; - this.recursionDepth = recursionDepth; + this.metricsAccumulator = metricsAccumulator; this.cancellationGizmo = cancellationGizmo; } @@ -498,7 +532,7 @@ protected void compute() ResultBatch outputBatch = new ResultBatch<>(batchSize); T currentCombinedValue = initialValue; - while (counter++ < yieldAfter && !pQueue.isEmpty()) { + while (counter < yieldAfter && !pQueue.isEmpty()) { BatchedResultsCursor cursor = pQueue.poll(); // push the queue along @@ -512,6 +546,7 @@ protected void compute() cursor.close(); } + counter++; // if current value is null, combine null with next value if (currentCombinedValue == null) { currentCombinedValue = combineFn.apply(null, nextValueToAccumulate); @@ -530,6 +565,7 @@ protected void compute() if (batchCounter >= batchSize) { outputQueue.offer(outputBatch); outputBatch = new ResultBatch<>(batchSize); + metricsAccumulator.incrementOutputRows(batchCounter); batchCounter = 0; } @@ -540,19 +576,25 @@ protected void compute() } } + final long elapsedCpuNanos = JvmUtils.safeGetThreadCpuTime() - startCpuNanos; + metricsAccumulator.incrementInputRows(counter); + metricsAccumulator.incrementCpuTimeNanos(elapsedCpuNanos); + metricsAccumulator.incrementTaskCount(); + if (!pQueue.isEmpty() && !cancellationGizmo.isCancelled()) { // if there is still work to be done, execute a new task with the current accumulated value to continue // combining where we left off if (!outputBatch.isDrained()) { outputQueue.offer(outputBatch); + metricsAccumulator.incrementOutputRows(batchCounter); } // measure the time it took to process 'yieldAfter' elements in order to project a next 'yieldAfter' value // which we want to target a 10ms task run time. smooth this value with a cumulative moving average in order // to prevent normal jitter in processing time from skewing the next yield value too far in any direction final long elapsedNanos = System.nanoTime() - start; - final long elapsedCpuNanos = JvmUtils.safeGetThreadCpuTime() - startCpuNanos; final double nextYieldAfter = Math.max((double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos), 1.0); + final long recursionDepth = metricsAccumulator.getTaskCount(); final double cumulativeMovingAverage = (nextYieldAfter + (recursionDepth * yieldAfter)) / (recursionDepth + 1); final int adjustedNextYieldAfter = (int) Math.ceil(cumulativeMovingAverage); @@ -575,21 +617,22 @@ protected void compute() adjustedNextYieldAfter, batchSize, targetTimeNanos, - recursionDepth + 1, + metricsAccumulator, cancellationGizmo )); } else if (cancellationGizmo.isCancelled()) { // if we got the cancellation signal, go ahead and write terminal value into output queue to help gracefully // allow downstream stuff to stop - LOG.debug("cancelled after %s tasks", recursionDepth); + LOG.debug("cancelled after %s tasks", metricsAccumulator.getTaskCount()); outputQueue.offer(ResultBatch.TERMINAL); } else { // if priority queue is empty, push the final accumulated value into the output batch and push it out outputBatch.add(currentCombinedValue); + metricsAccumulator.incrementOutputRows(batchCounter + 1L); outputQueue.offer(outputBatch); // ... and the terminal value to indicate the blocking queue holding the values is complete outputQueue.offer(ResultBatch.TERMINAL); - LOG.debug("merge combine complete after %s tasks", recursionDepth); + LOG.debug("merge combine complete after %s tasks", metricsAccumulator.getTaskCount()); } } catch (Exception ex) { @@ -623,6 +666,7 @@ private static class PrepareMergeCombineInputsAction extends RecursiveAction private final int yieldAfter; private final int batchSize; private final long targetTimeNanos; + private final MergeCombineActionMetricsAccumulator metricsAccumulator; private final CancellationGizmo cancellationGizmo; private PrepareMergeCombineInputsAction( @@ -633,6 +677,7 @@ private PrepareMergeCombineInputsAction( int yieldAfter, int batchSize, long targetTimeNanos, + MergeCombineActionMetricsAccumulator metricsAccumulator, CancellationGizmo cancellationGizmo ) { @@ -643,6 +688,7 @@ private PrepareMergeCombineInputsAction( this.yieldAfter = yieldAfter; this.batchSize = batchSize; this.targetTimeNanos = targetTimeNanos; + this.metricsAccumulator = metricsAccumulator; this.cancellationGizmo = cancellationGizmo; } @@ -669,7 +715,7 @@ protected void compute() yieldAfter, batchSize, targetTimeNanos, - 1, + metricsAccumulator, cancellationGizmo )); } else { @@ -1110,4 +1156,198 @@ RuntimeException getRuntimeException() return new RE(ex); } } + + /** + * Metrics for the execution of a {@link ParallelMergeCombiningSequence} on the {@link ForkJoinPool} + */ + public static class MergeCombineMetrics + { + private final int parallelism; + private final int inputSequences; + private final long inputRows; + private final long outputRows; + private final long taskCount; + private final long totalCpuTime; + + MergeCombineMetrics( + int parallelism, + int inputSequences, + long inputRows, + long outputRows, + long taskCount, + long totalCpuTime + ) + { + this.parallelism = parallelism; + this.inputSequences = inputSequences; + this.inputRows = inputRows; + this.outputRows = outputRows; + this.taskCount = taskCount; + this.totalCpuTime = totalCpuTime; + } + + /** + * Total number of layer 1 parallel tasks (+ 1 for total number of concurrent tasks for this query) + */ + public int getParallelism() + { + return parallelism; + } + + /** + * Total number of input {@link Sequence} processed by {@link ParallelMergeCombiningSequence} + */ + public long getInputSequences() + { + return inputSequences; + } + + /** + * Total number of input 'rows' processed by the {@link ParallelMergeCombiningSequence} + */ + public long getInputRows() + { + return inputRows; + } + + /** + * Total number of output 'rows' produced by merging and combining the set of input {@link Sequence}s + */ + public long getOutputRows() + { + return outputRows; + } + + /** + * Total number of {@link ForkJoinPool} tasks involved in executing the {@link ParallelMergeCombiningSequence}, + * including {@link MergeCombinePartitioningAction}, {@link PrepareMergeCombineInputsAction}, and + * {@link MergeCombineAction}. + */ + public long getTaskCount() + { + return taskCount; + } + + /** + * Total CPU time in nanoseconds during the 'hot loop' of doing actual merging and combining + * in {@link MergeCombineAction} + */ + public long getTotalCpuTime() + { + return totalCpuTime; + } + } + + /** + * Holder to accumlate metrics for all work done {@link ParallelMergeCombiningSequence}, containing layer 1 task + * metrics in {@link #partitionMetrics} and final merge task metrics in {@link #mergeMetrics}, in order to compute + * {@link MergeCombineMetrics} after the {@link ParallelMergeCombiningSequence} is completely consumed. + */ + static class MergeCombineMetricsAccumulator + { + List partitionMetrics; + MergeCombineActionMetricsAccumulator mergeMetrics; + private final int inputSequences; + + MergeCombineMetricsAccumulator(int inputSequences) + { + this.inputSequences = inputSequences; + } + + void setMergeMetrics(MergeCombineActionMetricsAccumulator mergeMetrics) + { + this.mergeMetrics = mergeMetrics; + } + + void setPartitions(List partitionMetrics) + { + this.partitionMetrics = partitionMetrics; + } + + MergeCombineMetrics build() + { + long numInputRows = 0; + long cpuTimeNanos = 0; + // 1 partition task, 1 layer two prepare merge inputs task, 1 layer one prepare merge inputs task for each + // partition + long totalPoolTasks = 1 + 1 + partitionMetrics.size(); + + // accumulate input row count, cpu time, and total number of tasks from each partition + for (MergeCombineActionMetricsAccumulator partition : partitionMetrics) { + numInputRows += partition.getInputRows(); + cpuTimeNanos += partition.getTotalCpuTimeNanos(); + totalPoolTasks += partition.getTaskCount(); + } + // if serial merge done, only mergeMetrics is populated, get input rows from there instead. otherwise, ignore the + // value as it is only the number of intermediary input rows to the layer 2 task + if (partitionMetrics.isEmpty()) { + numInputRows = mergeMetrics.getInputRows(); + } + // number of fjp tasks and cpu time is interesting though + totalPoolTasks += mergeMetrics.getTaskCount(); + cpuTimeNanos += mergeMetrics.getTotalCpuTimeNanos(); + + final long numOutputRows = mergeMetrics.getOutputRows(); + + return new MergeCombineMetrics( + Math.max(partitionMetrics.size(), 1), + inputSequences, + numInputRows, + numOutputRows, + totalPoolTasks, + cpuTimeNanos + ); + } + } + + /** + * Accumulate metrics about a single chain of{@link MergeCombineAction} + */ + static class MergeCombineActionMetricsAccumulator + { + private long taskCount = 1; + private long inputRows = 0; + private long outputRows = 0; + private long totalCpuTimeNanos = 0; + + void incrementTaskCount() + { + taskCount++; + } + + void incrementInputRows(long numInputRows) + { + inputRows += numInputRows; + } + + void incrementOutputRows(long numOutputRows) + { + outputRows += numOutputRows; + } + + void incrementCpuTimeNanos(long nanos) + { + totalCpuTimeNanos += nanos; + } + + long getTaskCount() + { + return taskCount; + } + + long getInputRows() + { + return inputRows; + } + + long getOutputRows() + { + return outputRows; + } + + long getTotalCpuTimeNanos() + { + return totalCpuTimeNanos; + } + } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java b/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java index e2d637398c84..bb80fb237287 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java @@ -62,6 +62,7 @@ public Yielder get() catch (Exception e) { t.addSuppressed(e); } + Throwables.propagateIfPossible(t); throw new RuntimeException(t); } } @@ -89,6 +90,7 @@ public void close() throws IOException t.addSuppressed(e); } Throwables.propagateIfInstanceOf(t, IOException.class); + Throwables.propagateIfPossible(t); throw new RuntimeException(t); } // "Normal" close @@ -97,6 +99,7 @@ public void close() throws IOException } catch (Exception e) { Throwables.propagateIfInstanceOf(e, IOException.class); + Throwables.propagateIfPossible(e); throw new RuntimeException(e); } } diff --git a/core/src/main/java/org/apache/druid/math/expr/ApplyFunction.java b/core/src/main/java/org/apache/druid/math/expr/ApplyFunction.java index 82fc6cb6100c..4bf2fa5e934b 100644 --- a/core/src/main/java/org/apache/druid/math/expr/ApplyFunction.java +++ b/core/src/main/java/org/apache/druid/math/expr/ApplyFunction.java @@ -289,7 +289,7 @@ public boolean hasArrayOutput(LambdaExpr lambdaExpr) /** * Accumulate a value for a single array input with a 2 argument {@link LambdaExpr}. The 'array' input expression is - * the first argument, the initial value for the accumlator expression is the 2nd argument. + * the first argument, the initial value for the accumulator expression is the 2nd argument. */ class FoldFunction extends BaseFoldFunction { @@ -314,10 +314,10 @@ public ExprEval apply(LambdaExpr lambdaExpr, List argsExpr, Expr.ObjectBin if (array == null) { return ExprEval.of(null); } - Object accumlator = accEval.value(); + Object accumulator = accEval.value(); - FoldLambdaBinding lambdaBinding = new FoldLambdaBinding(array, accumlator, lambdaExpr, bindings); - return applyFold(lambdaExpr, accumlator, lambdaBinding); + FoldLambdaBinding lambdaBinding = new FoldLambdaBinding(array, accumulator, lambdaExpr, bindings); + return applyFold(lambdaExpr, accumulator, lambdaBinding); } @Override @@ -340,8 +340,8 @@ public void validateArguments(LambdaExpr lambdaExpr, List args) /** * Accumulate a value for the cartesian product of 'n' array inputs arguments with an 'n + 1' argument - * {@link LambdaExpr}. The 'array' input expressions are the first 'n' arguments, the initial value for the accumlator - * expression is the final argument. + * {@link LambdaExpr}. The 'array' input expressions are the first 'n' arguments, the initial value for the + * accumulator expression is the final argument. */ class CartesianFoldFunction extends BaseFoldFunction { @@ -385,11 +385,11 @@ public ExprEval apply(LambdaExpr lambdaExpr, List argsExpr, Expr.ObjectBin ExprEval accEval = accExpr.eval(bindings); - Object accumlator = accEval.value(); + Object accumulator = accEval.value(); CartesianFoldLambdaBinding lambdaBindings = - new CartesianFoldLambdaBinding(product, accumlator, lambdaExpr, bindings); - return applyFold(lambdaExpr, accumlator, lambdaBindings); + new CartesianFoldLambdaBinding(product, accumulator, lambdaExpr, bindings); + return applyFold(lambdaExpr, accumulator, lambdaBindings); } @Override diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java index 156447505c4b..e96e1e3a0aa1 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java @@ -41,9 +41,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BinaryOperator; +import java.util.function.Consumer; public class ParallelMergeCombiningSequenceTest { + private static final int TEST_POOL_SIZE = 4; private static final Logger LOG = new Logger(ParallelMergeCombiningSequenceTest.class); public static final Ordering INT_PAIR_ORDERING = Ordering.natural().onResultOf(p -> p.lhs); @@ -65,7 +67,7 @@ public class ParallelMergeCombiningSequenceTest public void setup() { pool = new ForkJoinPool( - (int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.75), + TEST_POOL_SIZE, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t), true @@ -255,7 +257,7 @@ public void testEmpties() throws Exception input.add(Sequences.empty()); assertResult(input); - // above min sequence count threshold, so will merge in parallel (if enough cores) + // above min sequence count threshold, so will merge in parallel input.add(Sequences.empty()); input.add(Sequences.empty()); input.add(Sequences.empty()); @@ -273,7 +275,7 @@ public void testEmptiesAndNonEmpty() throws Exception input.clear(); - // above min sequence count threshold, so will merge in parallel (if enough cores) + // above min sequence count threshold, so will merge in parallel input.add(Sequences.empty()); input.add(Sequences.empty()); input.add(Sequences.empty()); @@ -290,18 +292,32 @@ public void testAllInSingleBatch() throws Exception List> input = new ArrayList<>(); input.add(nonBlockingSequence(5)); input.add(nonBlockingSequence(6)); - assertResult(input, 10, 20); + assertResult(input, 10, 20, reportMetrics -> { + Assert.assertEquals(1, reportMetrics.getParallelism()); + Assert.assertEquals(2, reportMetrics.getInputSequences()); + Assert.assertEquals(11, reportMetrics.getInputRows()); + // deltas because it depends how much result combining is happening, which is random + Assert.assertEquals(6, reportMetrics.getOutputRows(), 5); + Assert.assertEquals(4, reportMetrics.getTaskCount()); + }); input.clear(); - // above min sequence count threshold, so will merge in parallel (if enough cores) + // above min sequence count threshold, so will merge in parallel input.add(nonBlockingSequence(5)); input.add(nonBlockingSequence(6)); input.add(nonBlockingSequence(5)); input.add(nonBlockingSequence(8)); input.add(nonBlockingSequence(4)); input.add(nonBlockingSequence(6)); - assertResult(input, 10, 20); + assertResult(input, 10, 20, reportMetrics -> { + Assert.assertEquals(2, reportMetrics.getParallelism()); + Assert.assertEquals(6, reportMetrics.getInputSequences()); + Assert.assertEquals(34, reportMetrics.getInputRows()); + // deltas because it depends how much result combining is happening, which is random + Assert.assertEquals(16, reportMetrics.getOutputRows(), 15); + Assert.assertEquals(10, reportMetrics.getTaskCount(), 2); + }); } @Test @@ -311,18 +327,32 @@ public void testAllInSingleYield() throws Exception List> input = new ArrayList<>(); input.add(nonBlockingSequence(5)); input.add(nonBlockingSequence(6)); - assertResult(input, 4, 20); + assertResult(input, 4, 20, reportMetrics -> { + Assert.assertEquals(1, reportMetrics.getParallelism()); + Assert.assertEquals(2, reportMetrics.getInputSequences()); + Assert.assertEquals(11, reportMetrics.getInputRows()); + // deltas because it depends how much result combining is happening, which is random + Assert.assertEquals(6, reportMetrics.getOutputRows(), 5); + Assert.assertEquals(4, reportMetrics.getTaskCount()); + }); input.clear(); - // above min sequence count threshold, so will merge in parallel (if enough cores) + // above min sequence count threshold, so will merge in parallel input.add(nonBlockingSequence(5)); input.add(nonBlockingSequence(6)); input.add(nonBlockingSequence(5)); input.add(nonBlockingSequence(8)); input.add(nonBlockingSequence(4)); input.add(nonBlockingSequence(6)); - assertResult(input, 4, 20); + assertResult(input, 4, 20, reportMetrics -> { + Assert.assertEquals(2, reportMetrics.getParallelism()); + Assert.assertEquals(6, reportMetrics.getInputSequences()); + Assert.assertEquals(34, reportMetrics.getInputRows()); + // deltas because it depends how much result combining is happening, which is random + Assert.assertEquals(16, reportMetrics.getOutputRows(), 15); + Assert.assertEquals(10, reportMetrics.getTaskCount(), 2); + }); } @@ -334,7 +364,14 @@ public void testMultiBatchMultiYield() throws Exception input.add(nonBlockingSequence(15)); input.add(nonBlockingSequence(26)); - assertResult(input, 5, 10); + assertResult(input, 5, 10, reportMetrics -> { + Assert.assertEquals(1, reportMetrics.getParallelism()); + Assert.assertEquals(2, reportMetrics.getInputSequences()); + Assert.assertEquals(41, reportMetrics.getInputRows()); + // deltas because it depends how much result combining is happening, which is random + Assert.assertEquals(21, reportMetrics.getOutputRows(), 20); + Assert.assertEquals(4, reportMetrics.getTaskCount(), 2); + }); // above min sequence count threshold, so will merge in parallel (if enough cores) input.add(nonBlockingSequence(15)); @@ -342,7 +379,14 @@ public void testMultiBatchMultiYield() throws Exception input.add(nonBlockingSequence(17)); input.add(nonBlockingSequence(14)); - assertResult(input, 5, 10); + assertResult(input, 5, 10, reportMetrics -> { + Assert.assertEquals(2, reportMetrics.getParallelism()); + Assert.assertEquals(6, reportMetrics.getInputSequences()); + Assert.assertEquals(120, reportMetrics.getInputRows()); + // deltas because it depends how much result combining is happening, which is random + Assert.assertEquals(60, reportMetrics.getOutputRows(), 59); + Assert.assertEquals(10, reportMetrics.getTaskCount(), 5); + }); } @Test @@ -372,14 +416,22 @@ public void testLongerSequencesJustForFun() throws Exception input.add(nonBlockingSequence(10_000)); input.add(nonBlockingSequence(9_001)); - assertResult(input, 128, 1024); + assertResult(input, 128, 1024, reportMetrics -> { + Assert.assertEquals(1, reportMetrics.getParallelism()); + Assert.assertEquals(2, reportMetrics.getInputSequences()); + Assert.assertEquals(19_001, reportMetrics.getInputRows()); + }); input.add(nonBlockingSequence(7_777)); input.add(nonBlockingSequence(8_500)); input.add(nonBlockingSequence(5_000)); input.add(nonBlockingSequence(8_888)); - assertResult(input, 128, 1024); + assertResult(input, 128, 1024, reportMetrics -> { + Assert.assertEquals(2, reportMetrics.getParallelism()); + Assert.assertEquals(6, reportMetrics.getInputSequences()); + Assert.assertEquals(49166, reportMetrics.getInputRows()); + }); } @Test @@ -482,12 +534,29 @@ private void assertResult(List> sequences) throws InterruptedE assertResult( sequences, ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS, - ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS + ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS, + null ); } private void assertResult(List> sequences, int batchSize, int yieldAfter) throws InterruptedException, IOException + { + assertResult( + sequences, + batchSize, + yieldAfter, + null + ); + } + + private void assertResult( + List> sequences, + int batchSize, + int yieldAfter, + Consumer reporter + ) + throws InterruptedException, IOException { final CombiningSequence combiningSequence = CombiningSequence.create( new MergeSequence<>(INT_PAIR_ORDERING, Sequences.simple(sequences)), @@ -503,10 +572,11 @@ private void assertResult(List> sequences, int batchSize, int true, 5000, 0, - (int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.5), + TEST_POOL_SIZE, yieldAfter, batchSize, - ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS + ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS, + reporter ); Yielder combiningYielder = Yielders.each(combiningSequence); @@ -561,10 +631,11 @@ private void assertException( true, timeout, 0, - (int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.5), + TEST_POOL_SIZE, yieldAfter, batchSize, - ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS + ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS, + null ); Yielder parallelMergeCombineYielder = Yielders.each(parallelMergeCombineSequence); diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java index db13df7386b5..7effff35c8a2 100644 --- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java @@ -186,6 +186,12 @@ public void vectorized(final boolean vectorized) // Emit nothing by default. } + @Override + public void parallelMergeParallelism(final int parallelism) + { + // Emit nothing by default. + } + @Override public BitmapResultFactory makeBitmapResultFactory(BitmapFactory factory) { @@ -254,40 +260,70 @@ public QueryMetrics reportNodeTime(long timeNs) return reportMillisTimeMetric("query/node/time", timeNs); } - private QueryMetrics reportMillisTimeMetric(String metricName, long timeNs) + @Override + public QueryMetrics reportNodeBytes(long byteCount) { - return reportMetric(metricName, TimeUnit.NANOSECONDS.toMillis(timeNs)); + return reportMetric("query/node/bytes", byteCount); } - protected QueryMetrics reportMetric(String metricName, Number value) + @Override + public QueryMetrics reportBitmapConstructionTime(long timeNs) { - checkModifiedFromOwnerThread(); - metrics.put(metricName, value); + // Don't emit by default. return this; } @Override - public QueryMetrics reportNodeBytes(long byteCount) + public QueryMetrics reportSegmentRows(long numRows) { - return reportMetric("query/node/bytes", byteCount); + // Don't emit by default. + return this; } @Override - public QueryMetrics reportBitmapConstructionTime(long timeNs) + public QueryMetrics reportPreFilteredRows(long numRows) { // Don't emit by default. return this; } @Override - public QueryMetrics reportSegmentRows(long numRows) + public QueryMetrics reportParallelMergeParallelism(int parallelism) { // Don't emit by default. return this; } @Override - public QueryMetrics reportPreFilteredRows(long numRows) + public QueryMetrics reportParallelMergeInputSequences(long numSequences) + { + // Don't emit by default. + return this; + } + + @Override + public QueryMetrics reportParallelMergeInputRows(long numRows) + { + // Don't emit by default. + return this; + } + + @Override + public QueryMetrics reportParallelMergeOutputRows(long numRows) + { + // Don't emit by default. + return this; + } + + @Override + public QueryMetrics reportParallelMergeTaskCount(long numTasks) + { + // Don't emit by default. + return this; + } + + @Override + public QueryMetrics reportParallelMergeTotalCpuTime(long timeNs) { // Don't emit by default. return this; @@ -302,4 +338,16 @@ public void emit(ServiceEmitter emitter) } metrics.clear(); } + + protected QueryMetrics reportMetric(String metricName, Number value) + { + checkModifiedFromOwnerThread(); + metrics.put(metricName, value); + return this; + } + + private QueryMetrics reportMillisTimeMetric(String metricName, long timeNs) + { + return reportMetric(metricName, TimeUnit.NANOSECONDS.toMillis(timeNs)); + } } diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java index 8e827ff510fa..ce76461f43aa 100644 --- a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java +++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java @@ -148,22 +148,39 @@ public String getTmpDir() } @Config(value = "${base_path}.merge.useParallelMergePool") - public boolean useParallelMergePool() + public boolean useParallelMergePoolConfigured() { return true; } + public boolean useParallelMergePool() + { + final boolean useParallelMergePoolConfigured = useParallelMergePoolConfigured(); + final int parallelism = getMergePoolParallelism(); + // need at least 3 to do 2 layer merge + if (parallelism > 2) { + return useParallelMergePoolConfigured; + } + if (useParallelMergePoolConfigured) { + log.debug( + "Parallel merge pool is enabled, but there are not enough cores to enable parallel merges: %s", + parallelism + ); + } + return false; + } + @Config(value = "${base_path}.merge.pool.parallelism") - public int getNumThreadsMergePoolConfigured() + public int getMergePoolParallelismConfigured() { return DEFAULT_NUM_THREADS; } public int getMergePoolParallelism() { - int numThreadsConfigured = getNumThreadsMergePoolConfigured(); - if (numThreadsConfigured != DEFAULT_NUM_THREADS) { - return numThreadsConfigured; + int poolParallelismConfigured = getMergePoolParallelismConfigured(); + if (poolParallelismConfigured != DEFAULT_NUM_THREADS) { + return poolParallelismConfigured; } else { // assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores * 1.5 return (int) Math.ceil(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.75); diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java index d770e7377adf..ec3c82495501 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java @@ -30,8 +30,8 @@ import java.util.List; /** - * Abstraction wrapping {@link org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder} and allowing to control what - * metrics are actually emitted, what dimensions do they have, etc. + * Abstraction wrapping {@link org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder} and allowing to + * control what metrics are actually emitted, what dimensions do they have, etc. * * * Goals of QueryMetrics @@ -113,9 +113,9 @@ * * Making subinterfaces of QueryMetrics for emitting custom dimensions and/or metrics for specific query types * ----------------------------------------------------------------------------------------------------------- - * If a query type (e. g. {@link org.apache.druid.query.metadata.metadata.SegmentMetadataQuery} (it's runners) needs to emit - * custom dimensions and/or metrics which doesn't make sense for all other query types, the following steps should be - * executed: + * If a query type (e. g. {@link org.apache.druid.query.metadata.metadata.SegmentMetadataQuery} (it's runners) needs to + * emit custom dimensions and/or metrics which doesn't make sense for all other query types, the following steps should + * be executed: * * 1. Create `interface SegmentMetadataQueryMetrics extends QueryMetrics` (here and below "SegmentMetadata" is the * query type) with additional methods (see "Adding new methods" section above). @@ -148,11 +148,11 @@ * This complex procedure is needed to ensure custom {@link GenericQueryMetricsFactory} specified by users still works * for the query type when query type decides to create their custom QueryMetrics subclass. * - * {@link org.apache.druid.query.topn.TopNQueryMetrics}, {@link org.apache.druid.query.groupby.GroupByQueryMetrics}, and {@link - * org.apache.druid.query.timeseries.TimeseriesQueryMetrics} are implemented differently, because they are introduced at the - * same time as the whole QueryMetrics abstraction and their default implementations have to actually emit more - * dimensions than the default generic QueryMetrics. So those subinterfaces shouldn't be taken as direct examples for - * following the plan specified above. + * {@link org.apache.druid.query.topn.TopNQueryMetrics}, {@link org.apache.druid.query.groupby.GroupByQueryMetrics}, and + * {@link org.apache.druid.query.timeseries.TimeseriesQueryMetrics} are implemented differently, because they are + * introduced at the same time as the whole QueryMetrics abstraction and their default implementations have to actually + * emit more dimensions than the default generic QueryMetrics. So those subinterfaces shouldn't be taken as direct + * examples for following the plan specified above. * * Refer {@link SearchQueryMetricsFactory} as an implementation example of this procedure. * @@ -241,6 +241,13 @@ public interface QueryMetrics> */ void vectorized(boolean vectorized); + /** + * Sets broker merge parallelism, if parallel merges are enabled. This will only appear in broker level metrics. This + * value is identical to the {@link #reportParallelMergeParallelism} metric value, but optionally also available as a + * dimension. + */ + void parallelMergeParallelism(int parallelism); + /** * Creates a {@link BitmapResultFactory} which may record some information along bitmap construction from {@link * #preFilters(List)}. The returned BitmapResultFactory may add some dimensions to this QueryMetrics from it's {@link @@ -321,6 +328,38 @@ public interface QueryMetrics> */ QueryMetrics reportPreFilteredRows(long numRows); + /** + * Reports number of parallel tasks the broker used to process the query during parallel merge. This value is + * identical to the {@link #parallelMergeParallelism} dimension value, but optionally also available as a metric. + */ + QueryMetrics reportParallelMergeParallelism(int parallelism); + + /** + * Reports total number of input sequences processed by the broker during parallel merge. + */ + QueryMetrics reportParallelMergeInputSequences(long numSequences); + + /** + * Reports total number of input rows processed by the broker during parallel merge. + */ + QueryMetrics reportParallelMergeInputRows(long numRows); + + /** + * Reports broker total number of output rows after merging and combining input sequences (should be less than or + * equal to the value supplied to {@link #reportParallelMergeInputRows}. + */ + QueryMetrics reportParallelMergeOutputRows(long numRows); + + /** + * Reports broker total number of fork join pool tasks required to complete query + */ + QueryMetrics reportParallelMergeTaskCount(long numTasks); + + /** + * Reports broker total CPU time in nanoseconds where fork join merge combine tasks were doing work + */ + QueryMetrics reportParallelMergeTotalCpuTime(long timeNs); + /** * Emits all metrics, registered since the last {@code emit()} call on this QueryMetrics object. */ diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java index ca1fe18c5834..b10ab934f784 100644 --- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java @@ -166,6 +166,12 @@ public void vectorized(final boolean vectorized) delegateQueryMetrics.vectorized(vectorized); } + @Override + public void parallelMergeParallelism(int parallelism) + { + delegateQueryMetrics.parallelMergeParallelism(parallelism); + } + @Override public BitmapResultFactory makeBitmapResultFactory(BitmapFactory factory) { @@ -256,6 +262,42 @@ public QueryMetrics reportPreFilteredRows(long numRows) return delegateQueryMetrics.reportPreFilteredRows(numRows); } + @Override + public QueryMetrics reportParallelMergeParallelism(int parallelism) + { + return delegateQueryMetrics.reportParallelMergeParallelism(parallelism); + } + + @Override + public QueryMetrics reportParallelMergeInputSequences(long numSequences) + { + return delegateQueryMetrics.reportParallelMergeInputSequences(numSequences); + } + + @Override + public QueryMetrics reportParallelMergeInputRows(long numRows) + { + return delegateQueryMetrics.reportParallelMergeInputRows(numRows); + } + + @Override + public QueryMetrics reportParallelMergeOutputRows(long numRows) + { + return delegateQueryMetrics.reportParallelMergeOutputRows(numRows); + } + + @Override + public QueryMetrics reportParallelMergeTaskCount(long numTasks) + { + return delegateQueryMetrics.reportParallelMergeTaskCount(numTasks); + } + + @Override + public QueryMetrics reportParallelMergeTotalCpuTime(long timeNs) + { + return delegateQueryMetrics.reportParallelMergeTotalCpuTime(timeNs); + } + @Override public void emit(ServiceEmitter emitter) { diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index b91731a54cef..3df55242bc41 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -55,6 +55,7 @@ import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QuerySegmentWalker; @@ -316,7 +317,19 @@ private Sequence merge(List> sequencesByInterval) QueryContexts.getParallelMergeParallelism(query, processingConfig.getMergePoolDefaultMaxQueryParallelism()), QueryContexts.getParallelMergeInitialYieldRows(query, processingConfig.getMergePoolTaskInitialYieldRows()), QueryContexts.getParallelMergeSmallBatchRows(query, processingConfig.getMergePoolSmallBatchRows()), - processingConfig.getMergePoolTargetTaskRunTimeMillis() + processingConfig.getMergePoolTargetTaskRunTimeMillis(), + reportMetrics -> { + QueryMetrics queryMetrics = queryPlus.getQueryMetrics(); + if (queryMetrics != null) { + queryMetrics.parallelMergeParallelism(reportMetrics.getParallelism()); + queryMetrics.reportParallelMergeParallelism(reportMetrics.getParallelism()); + queryMetrics.reportParallelMergeInputSequences(reportMetrics.getInputSequences()); + queryMetrics.reportParallelMergeInputRows(reportMetrics.getInputRows()); + queryMetrics.reportParallelMergeOutputRows(reportMetrics.getOutputRows()); + queryMetrics.reportParallelMergeTaskCount(reportMetrics.getTaskCount()); + queryMetrics.reportParallelMergeTotalCpuTime(reportMetrics.getTotalCpuTime()); + } + } ); } else { return Sequences diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 56cb4864d2b3..a75ad64e080c 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -319,6 +319,13 @@ public String getFormatString() { return null; } + + @Override + public int getMergePoolParallelism() + { + // fixed so same behavior across all test environments + return 4; + } }, ForkJoinPool.commonPool() ); diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index d875df329904..cd9e6d071fd6 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -2500,6 +2500,13 @@ public String getFormatString() { return null; } + + @Override + public int getMergePoolParallelism() + { + // fixed so same behavior across all test environments + return 4; + } }, ForkJoinPool.commonPool() );