Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
private final BinaryOperator<T> combineFn;
private final int queueSize;
private final boolean hasTimeout;
private final long startTimeNanos;
private final long timeoutAtNanos;
private final int yieldAfter;
private final int batchSize;
Expand Down Expand Up @@ -105,12 +106,13 @@ public ParallelMergeCombiningSequence(
this.orderingFn = orderingFn;
this.combineFn = combineFn;
this.hasTimeout = hasTimeout;
this.timeoutAtNanos = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutMillis, TimeUnit.MILLISECONDS);
this.startTimeNanos = System.nanoTime();
this.timeoutAtNanos = startTimeNanos + TimeUnit.NANOSECONDS.convert(timeoutMillis, TimeUnit.MILLISECONDS);
this.parallelism = parallelism;
this.yieldAfter = yieldAfter;
this.batchSize = batchSize;
this.targetTimeNanos = TimeUnit.NANOSECONDS.convert(targetTimeMillis, TimeUnit.MILLISECONDS);
this.queueSize = 4 * (yieldAfter / batchSize);
this.queueSize = (1 << 15) / batchSize; // each queue can by default hold ~32k rows
this.metricsReporter = reporter;
this.cancellationGizmo = new CancellationGizmo();
}
Expand All @@ -121,8 +123,9 @@ public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulat
if (inputSequences.isEmpty()) {
return Sequences.<T>empty().toYielder(initValue, accumulator);
}

final BlockingQueue<ResultBatch<T>> outputQueue = new ArrayBlockingQueue<>(queueSize);
// we make final output queue larger than the merging queues so if downstream readers are slower to read there is
// less chance of blocking the merge
final BlockingQueue<ResultBatch<T>> outputQueue = new ArrayBlockingQueue<>(4 * queueSize);
Comment thread
rohangarg marked this conversation as resolved.
final MergeCombineMetricsAccumulator metricsAccumulator = new MergeCombineMetricsAccumulator(inputSequences.size());
MergeCombinePartitioningAction<T> mergeCombineAction = new MergeCombinePartitioningAction<>(
inputSequences,
Expand All @@ -147,6 +150,7 @@ public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulat
cancellationGizmo
).withBaggage(() -> {
if (metricsReporter != null) {
metricsAccumulator.setTotalWallTime(System.nanoTime() - startTimeNanos);
metricsReporter.accept(metricsAccumulator.build());
}
});
Expand Down Expand Up @@ -698,6 +702,8 @@ private static class PrepareMergeCombineInputsAction<T> extends RecursiveAction
private final MergeCombineActionMetricsAccumulator metricsAccumulator;
private final CancellationGizmo cancellationGizmo;

private final long startTime;

private PrepareMergeCombineInputsAction(
List<BatchedResultsCursor<T>> partition,
QueuePusher<ResultBatch<T>> outputQueue,
Expand All @@ -719,6 +725,7 @@ private PrepareMergeCombineInputsAction(
this.targetTimeNanos = targetTimeNanos;
this.metricsAccumulator = metricsAccumulator;
this.cancellationGizmo = cancellationGizmo;
this.startTime = System.nanoTime();
}

@SuppressWarnings("unchecked")
Expand All @@ -736,7 +743,6 @@ protected void compute()
cursor.close();
}
}

if (cursors.size() > 0) {
getPool().execute(new MergeCombineAction<T>(
cursors,
Expand All @@ -753,6 +759,7 @@ protected void compute()
} else {
outputQueue.offer(ResultBatch.TERMINAL);
}
metricsAccumulator.setPartitionInitializedTime(System.nanoTime() - startTime);
}
catch (Throwable t) {
closeAllCursors(partition);
Expand Down Expand Up @@ -1195,14 +1202,20 @@ public static class MergeCombineMetrics
private final long outputRows;
private final long taskCount;
private final long totalCpuTime;
private final long totalWallTime;
private final long fastestPartitionInitializedTime;
private final long slowestPartitionInitializedTime;

MergeCombineMetrics(
int parallelism,
int inputSequences,
long inputRows,
long outputRows,
long taskCount,
long totalCpuTime
long totalCpuTime,
long totalWallTime,
long fastestPartitionInitializedTime,
long slowestPartitionInitializedTime
)
{
this.parallelism = parallelism;
Expand All @@ -1211,6 +1224,9 @@ public static class MergeCombineMetrics
this.outputRows = outputRows;
this.taskCount = taskCount;
this.totalCpuTime = totalCpuTime;
this.totalWallTime = totalWallTime;
this.fastestPartitionInitializedTime = fastestPartitionInitializedTime;
this.slowestPartitionInitializedTime = slowestPartitionInitializedTime;
}

/**
Expand Down Expand Up @@ -1263,6 +1279,21 @@ public long getTotalCpuTime()
{
return totalCpuTime;
}

// Total wall-clock time of the parallel merge in nanoseconds, from sequence creation until the
// merged output was fully consumed (set externally; see the baggage hook that computes
// System.nanoTime() - startTimeNanos).
public long getTotalTime()
{
return totalWallTime;
}

// Wall time in nanoseconds taken by the fastest merge partition to become 'initialized'
// (its input cursors prepared so that merging could begin).
public long getFastestPartitionInitializedTime()
{
return fastestPartitionInitializedTime;
}

// Wall time in nanoseconds taken by the slowest merge partition to become 'initialized'
// (its input cursors prepared so that merging could begin).
public long getSlowestPartitionInitializedTime()
{
return slowestPartitionInitializedTime;
}
}

/**
Expand All @@ -1274,6 +1305,9 @@ static class MergeCombineMetricsAccumulator
{
List<MergeCombineActionMetricsAccumulator> partitionMetrics;
MergeCombineActionMetricsAccumulator mergeMetrics;

private long totalWallTime;

private final int inputSequences;

MergeCombineMetricsAccumulator(int inputSequences)
Expand All @@ -1291,6 +1325,11 @@ void setPartitions(List<MergeCombineActionMetricsAccumulator> partitionMetrics)
this.partitionMetrics = partitionMetrics;
}

// Records the total wall-clock duration of the merge in nanoseconds; the caller computes this
// as (System.nanoTime() - startTimeNanos) once the result sequence has been fully consumed.
void setTotalWallTime(long time)
{
this.totalWallTime = time;
}

MergeCombineMetrics build()
{
long numInputRows = 0;
Expand All @@ -1299,11 +1338,20 @@ MergeCombineMetrics build()
// partition
long totalPoolTasks = 1 + 1 + partitionMetrics.size();

long fastestPartInitialized = partitionMetrics.size() > 0 ? Long.MAX_VALUE : mergeMetrics.getPartitionInitializedtime();
long slowestPartInitialied = partitionMetrics.size() > 0 ? Long.MIN_VALUE : mergeMetrics.getPartitionInitializedtime();

// accumulate input row count, cpu time, and total number of tasks from each partition
for (MergeCombineActionMetricsAccumulator partition : partitionMetrics) {
numInputRows += partition.getInputRows();
cpuTimeNanos += partition.getTotalCpuTimeNanos();
totalPoolTasks += partition.getTaskCount();
if (partition.getPartitionInitializedtime() < fastestPartInitialized) {
fastestPartInitialized = partition.getPartitionInitializedtime();
}
if (partition.getPartitionInitializedtime() > slowestPartInitialied) {
slowestPartInitialied = partition.getPartitionInitializedtime();
}
}
// if serial merge done, only mergeMetrics is populated, get input rows from there instead. otherwise, ignore the
// value as it is only the number of intermediary input rows to the layer 2 task
Expand All @@ -1322,7 +1370,10 @@ MergeCombineMetrics build()
numInputRows,
numOutputRows,
totalPoolTasks,
cpuTimeNanos
cpuTimeNanos,
totalWallTime,
fastestPartInitialized,
slowestPartInitialied
);
}
}
Expand All @@ -1337,6 +1388,8 @@ static class MergeCombineActionMetricsAccumulator
private long outputRows = 0;
private long totalCpuTimeNanos = 0;

private long partitionInitializedtime = 0L;

void incrementTaskCount()
{
taskCount++;
Expand All @@ -1357,6 +1410,11 @@ void incrementCpuTimeNanos(long nanos)
totalCpuTimeNanos += nanos;
}

// Records how long (nanoseconds) this partition took to be 'initialized', i.e. for
// PrepareMergeCombineInputsAction to ready its input cursors so merging could begin.
void setPartitionInitializedTime(long nanos)
{
partitionInitializedtime = nanos;
}

long getTaskCount()
{
return taskCount;
Expand All @@ -1376,6 +1434,11 @@ long getTotalCpuTimeNanos()
{
return totalCpuTimeNanos;
}

// Wall time in nanoseconds for this partition to be 'initialized'; 0 if never set.
// NOTE(review): method name has a casing typo ('time' should be 'Time'); renaming would
// break existing callers, so it is flagged rather than changed here.
long getPartitionInitializedtime()
{
return partitionInitializedtime;
}
}

private static <T> void closeAllCursors(final Collection<BatchedResultsCursor<T>> cursors)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,27 @@ public QueryMetrics<QueryType> reportParallelMergeTotalCpuTime(long timeNs)
return this;
}

/**
 * Default no-op implementation: the parallel merge total wall time metric is not emitted
 * unless a subclass overrides this method.
 */
@Override
public QueryMetrics<QueryType> reportParallelMergeTotalTime(long timeNs)
{
// Don't emit by default.
return this;
}

/**
 * Default no-op implementation: the fastest-partition initialization time metric is not
 * emitted unless a subclass overrides this method.
 */
@Override
public QueryMetrics<QueryType> reportParallelMergeFastestPartitionTime(long timeNs)
{
// Don't emit by default.
return this;
}

/**
 * Default no-op implementation: the slowest-partition initialization time metric is not
 * emitted unless a subclass overrides this method.
 */
@Override
public QueryMetrics<QueryType> reportParallelMergeSlowestPartitionTime(long timeNs)
{
// Don't emit by default.
return this;
}

@Override
public QueryMetrics<QueryType> reportQueriedSegmentCount(long segmentCount)
{
Expand Down
24 changes: 24 additions & 0 deletions processing/src/main/java/org/apache/druid/query/QueryMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,30 @@ public interface QueryMetrics<QueryType extends Query<?>>
*/
QueryMetrics<QueryType> reportParallelMergeTotalCpuTime(long timeNs);

/**
* Reports the broker's total "wall" time in nanoseconds, measured from creation of the parallel merge sequence
* until the merged results have been fully consumed.
*/
QueryMetrics<QueryType> reportParallelMergeTotalTime(long timeNs);

/**
* Reports broker "wall" time in nanoseconds for the fastest parallel merge sequence partition to be 'initialized',
* where 'initialized' means the time until the first result batch has been populated from the data servers and
* merging can begin.
*
* Similar to query 'time to first byte' metrics, except that it is a composite of the whole group of data servers
* which are present in the merge partition, all of which must supply an initial result batch before merging can
* actually begin.
*/
QueryMetrics<QueryType> reportParallelMergeFastestPartitionTime(long timeNs);
Comment thread
rohangarg marked this conversation as resolved.

/**
* Reports broker "wall" time in nanoseconds for the slowest parallel merge sequence partition to be 'initialized',
* where 'initialized' means the time until the first result batch has been populated from the data servers and
* merging can begin.
*
* Similar to query 'time to first byte' metrics, except that it is a composite of the whole group of data servers
* which are present in the merge partition, all of which must supply an initial result batch before merging can
* actually begin.
*/
QueryMetrics<QueryType> reportParallelMergeSlowestPartitionTime(long timeNs);

/**
* Emits all metrics, registered since the last {@code emit()} call on this QueryMetrics object.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,24 @@ public QueryMetrics reportParallelMergeTotalCpuTime(long timeNs)
return delegateQueryMetrics.reportParallelMergeTotalCpuTime(timeNs);
}

// Forwards the parallel merge total wall time report to the wrapped QueryMetrics.
@Override
public QueryMetrics reportParallelMergeTotalTime(long timeNs)
{
return delegateQueryMetrics.reportParallelMergeTotalTime(timeNs);
}

// Forwards the fastest-partition initialization time report to the wrapped QueryMetrics.
@Override
public QueryMetrics reportParallelMergeFastestPartitionTime(long timeNs)
{
return delegateQueryMetrics.reportParallelMergeFastestPartitionTime(timeNs);
}

// Forwards the slowest-partition initialization time report to the wrapped QueryMetrics.
@Override
public QueryMetrics reportParallelMergeSlowestPartitionTime(long timeNs)
{
return delegateQueryMetrics.reportParallelMergeSlowestPartitionTime(timeNs);
}

@Override
public QueryMetrics reportQueriedSegmentCount(long segmentCount)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,17 @@ private Sequence<T> merge(List<Sequence<T>> sequencesByInterval)
QueryMetrics<?> queryMetrics = queryPlus.getQueryMetrics();
if (queryMetrics != null) {
queryMetrics.parallelMergeParallelism(reportMetrics.getParallelism());
queryMetrics.reportParallelMergeParallelism(reportMetrics.getParallelism());
queryMetrics.reportParallelMergeInputSequences(reportMetrics.getInputSequences());
queryMetrics.reportParallelMergeInputRows(reportMetrics.getInputRows());
queryMetrics.reportParallelMergeOutputRows(reportMetrics.getOutputRows());
queryMetrics.reportParallelMergeTaskCount(reportMetrics.getTaskCount());
queryMetrics.reportParallelMergeTotalCpuTime(reportMetrics.getTotalCpuTime());
queryMetrics.reportParallelMergeParallelism(reportMetrics.getParallelism()).emit(emitter);
queryMetrics.reportParallelMergeInputSequences(reportMetrics.getInputSequences()).emit(emitter);
queryMetrics.reportParallelMergeInputRows(reportMetrics.getInputRows()).emit(emitter);
queryMetrics.reportParallelMergeOutputRows(reportMetrics.getOutputRows()).emit(emitter);
queryMetrics.reportParallelMergeTaskCount(reportMetrics.getTaskCount()).emit(emitter);
queryMetrics.reportParallelMergeTotalCpuTime(reportMetrics.getTotalCpuTime()).emit(emitter);
queryMetrics.reportParallelMergeTotalTime(reportMetrics.getTotalTime()).emit(emitter);
queryMetrics.reportParallelMergeSlowestPartitionTime(reportMetrics.getSlowestPartitionInitializedTime())
.emit(emitter);
queryMetrics.reportParallelMergeFastestPartitionTime(reportMetrics.getFastestPartitionInitializedTime())
.emit(emitter);
}
}
);
Expand Down Expand Up @@ -884,7 +889,6 @@ private PartitionChunkEntry<String, ServerSelector> toChunkEntry(
return null;
}
return new PartitionChunkEntry<>(spec.getInterval(), spec.getVersion(), chunk);

}
}
}