From e04eabe1cf5df9c5b061c45690884b774f70804e Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 26 Sep 2024 22:14:23 +0530 Subject: [PATCH 01/12] Emit aggregate segment processing metrics per sink instead of firehydrant --- .../emitter/service/ServiceMetricEvent.java | 3 +- .../druid/query/DefaultQueryMetrics.java | 10 +- .../appenderator/SinkQuerySegmentWalker.java | 158 +++++++++++++++++- .../appenderator/StreamAppenderatorTest.java | 127 +++++++++++--- .../StreamAppenderatorTester.java | 45 +++-- 5 files changed, 286 insertions(+), 57 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java index 3bab9a3ad04d..435e42ed725e 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java @@ -31,6 +31,7 @@ import org.joda.time.DateTime; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import java.util.TreeMap; @@ -197,7 +198,7 @@ public ServiceMetricEvent build(ImmutableMap serviceDimensions) return new ServiceMetricEvent( createdTime, serviceDimensions, - userDims, + new HashMap<>(userDims), feed, metric, value diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java index db7346517976..8480265d67c9 100644 --- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java @@ -40,6 +40,10 @@ */ public class DefaultQueryMetrics> implements QueryMetrics { + public static String QUERY_WAIT_TIME = "query/wait/time"; + public static String QUERY_SEGMENT_TIME = "query/segment/time"; + public static String QUERY_SEGMENT_AND_CACHE_TIME = "query/segmentAndCache/time"; + protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); protected final Map metrics = new HashMap<>(); @@ -229,19 +233,19 @@ public QueryMetrics reportQueryBytes(long byteCount) @Override public QueryMetrics reportWaitTime(long timeNs) { - return reportMillisTimeMetric("query/wait/time", timeNs); + return reportMillisTimeMetric(QUERY_WAIT_TIME, timeNs); } @Override public QueryMetrics reportSegmentTime(long timeNs) { - return reportMillisTimeMetric("query/segment/time", timeNs); + return reportMillisTimeMetric(QUERY_SEGMENT_TIME, timeNs); } @Override public QueryMetrics reportSegmentAndCacheTime(long timeNs) { - return reportMillisTimeMetric("query/segmentAndCache/time", timeNs); + return reportMillisTimeMetric(QUERY_SEGMENT_AND_CACHE_TIME, timeNs); } @Override diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 557198f14244..00c3e7c8c65e 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -30,6 +30,11 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.FunctionalIterable; +import org.apache.druid.java.util.common.guava.LazySequence; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.SequenceWrapper; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.BySegmentQueryRunner; @@ -37,11 +42,11 @@ import org.apache.druid.query.DataSource; import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.FinalizeResultsQueryRunner; -import org.apache.druid.query.MetricsEmittingQueryRunner; import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; @@ -52,6 +57,7 @@ import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.SinkQueryRunners; +import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; @@ -67,19 +73,28 @@ import org.apache.druid.utils.CloseableUtils; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.function.ObjLongConsumer; import java.util.stream.Collectors; +import static org.apache.druid.query.DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME; +import static org.apache.druid.query.DefaultQueryMetrics.QUERY_SEGMENT_TIME; +import static org.apache.druid.query.DefaultQueryMetrics.QUERY_WAIT_TIME; + /** * Query handler for indexing tasks. */ @@ -194,6 +209,12 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final final List allSegmentReferences = new ArrayList<>(); final Map segmentIdMap = new HashMap<>(); final LinkedHashMap>> allRunners = new LinkedHashMap<>(); + final ConcurrentHashMap> segmentMetrics = new ConcurrentHashMap<>(); + + final Map>> metricsToReport = new HashMap<>(); + metricsToReport.put(QUERY_SEGMENT_TIME, QueryMetrics::reportSegmentTime); + metricsToReport.put(QUERY_SEGMENT_AND_CACHE_TIME, QueryMetrics::reportSegmentAndCacheTime); + metricsToReport.put(QUERY_WAIT_TIME, QueryMetrics::reportWaitTime); try { for (final SegmentDescriptor newDescriptor : specs) { @@ -233,12 +254,15 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final descriptor, sinkSegmentReferences.stream().map( segmentReference -> { - QueryRunner runner = new MetricsEmittingQueryRunner<>( + QueryRunner runner = new SinkMetricsEmittingQueryRunner<>( emitter, factory.getToolchest(), factory.createRunner(segmentReference.getSegment()), - QueryMetrics::reportSegmentTime, - queryMetrics -> queryMetrics.segment(sinkSegmentId.toString()) + metricsToReport, + segmentMetrics, + Collections.singleton(QUERY_SEGMENT_TIME), + sinkSegmentId.toString(), + false ); // 1) Only use caching if data is immutable @@ -275,13 +299,16 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final // Regardless of whether caching is enabled, do reportSegmentAndCacheTime outside the // *possible* caching. - runner = new MetricsEmittingQueryRunner<>( + runner = new SinkMetricsEmittingQueryRunner<>( emitter, factory.getToolchest(), runner, - QueryMetrics::reportSegmentAndCacheTime, - queryMetrics -> queryMetrics.segment(sinkSegmentId.toString()) - ).withWaitMeasuredFromNow(); + metricsToReport, + segmentMetrics, + new HashSet<>(Arrays.asList(QUERY_WAIT_TIME, QUERY_SEGMENT_AND_CACHE_TIME)), + sinkSegmentId.toString(), + false + ); // Emit CPU time metrics. runner = CPUTimeMetricQueryRunner.safeBuild( @@ -346,7 +373,19 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final return new ResourceIdPopulatingQueryRunner<>( QueryRunnerHelper.makeClosingQueryRunner( CPUTimeMetricQueryRunner.safeBuild( - new FinalizeResultsQueryRunner<>(toolChest.mergeResults(mergedRunner, true), toolChest), + new SinkMetricsEmittingQueryRunner<>( + emitter, + toolChest, + new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(mergedRunner, true), + toolChest + ), + metricsToReport, + segmentMetrics, + Collections.emptySet(), + null, + true + ), toolChest, emitter, cpuTimeAccumulator, @@ -395,4 +434,105 @@ public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final // with subsegments (hydrants). return segmentId + "_H" + hydrantNumber; } + + public static class SinkMetricsEmittingQueryRunner implements QueryRunner { + + private static final Logger log = new Logger(SinkMetricsEmittingQueryRunner.class); + + private final ServiceEmitter emitter; + private final QueryToolChest> queryToolChest; + private final QueryRunner queryRunner; + private final Map>> metricsToReport; + private final ConcurrentHashMap> segmentMetrics; + private final Set metricsToCompute; + @Nullable + private final String segmentId; + private final boolean report; + private final long creationTimeNs; + + private SinkMetricsEmittingQueryRunner( + ServiceEmitter emitter, + QueryToolChest> queryToolChest, + QueryRunner queryRunner, + Map>> metricsToReport, + ConcurrentHashMap> segmentMetrics, + Set metricsToCompute, + @Nullable String segmentId, + boolean report + ) + { + this.emitter = emitter; + this.queryToolChest = queryToolChest; + this.queryRunner = queryRunner; + this.metricsToReport = metricsToReport; + this.segmentMetrics = segmentMetrics; + this.metricsToCompute = metricsToCompute; + this.segmentId = segmentId; + this.report = report; + this.creationTimeNs = System.nanoTime(); + } + + @Override + public Sequence run(final QueryPlus queryPlus, final ResponseContext responseContext) + { + QueryPlus queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest); + final QueryMetrics queryMetrics = queryWithMetrics.getQueryMetrics(); + + return Sequences.wrap( + // Use LazySequence because want to account execution time of queryRunner.run() (it prepares the underlying + // Sequence) as part of the reported query time, i.e. we want to execute queryRunner.run() after + // `startTimeNs = System.nanoTime();` + new LazySequence<>(() -> queryRunner.run(queryWithMetrics, responseContext)), + new SequenceWrapper() + { + private long startTimeNs; + + @Override + public void before() + { + startTimeNs = System.nanoTime(); + } + + @Override + public void after(boolean isDone, Throwable thrown) + { + if (!report) { + for (String metric : metricsToCompute) { + if (QUERY_WAIT_TIME.equals(metric)) { + long waitTimeNs = startTimeNs - creationTimeNs; + // segment wait time is the time taken to start processing the first hydrant for the segment + segmentMetrics.computeIfAbsent(segmentId, metrics -> new HashMap<>()) + .putIfAbsent(metric, new AtomicLong(waitTimeNs)); + } else { + long timeTakenNs = System.nanoTime() - startTimeNs; + segmentMetrics.computeIfAbsent(segmentId, metrics -> new HashMap<>()) + .computeIfAbsent(metric, value -> new AtomicLong(0)) + .addAndGet(timeTakenNs); + } + } + } else { + for (Map.Entry> segmentAndMetrics : segmentMetrics.entrySet()) { + queryMetrics.segment(segmentAndMetrics.getKey()); + + for (Map.Entry>> reportMetric : metricsToReport.entrySet()) { + String metricName = reportMetric.getKey(); + if (segmentAndMetrics.getValue().containsKey(metricName)) { + reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().get(metricName).get()); + } + } + + try { + queryMetrics.emit(emitter); + } + catch (Exception e) { + // Query should not fail, because of emitter failure. Swallowing the exception. + log.error("Failure while trying to emit [%s] with stacktrace [%s]", emitter.toString(), e); + } + } + } + } + } + ); + } + } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index 46580a5eeeed..bce2a8f06d45 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -34,6 +34,8 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.Druids; import org.apache.druid.query.Order; import org.apache.druid.query.QueryPlus; @@ -69,8 +71,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -1136,9 +1140,11 @@ ScheduledFuture getLastScheduledFuture() public void testQueryByIntervals() throws Exception { try ( + final StubServiceEmitter serviceEmitter = new StubServiceEmitter(); final StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder().maxRowsInMemory(2) .basePersistDirectory(temporaryFolder.newFolder()) + .withServiceEmitter(serviceEmitter) .build()) { final Appenderator appenderator = tester.getAppenderator(); @@ -1177,36 +1183,18 @@ public void testQueryByIntervals() throws Exception results1 ); - // Query2: 2000/2002 - final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() - .dataSource(StreamAppenderatorTester.DATASOURCE) - .intervals(ImmutableList.of(Intervals.of("2000/2002"))) - .aggregators( - Arrays.asList( - new LongSumAggregatorFactory("count", "count"), - new LongSumAggregatorFactory("met", "met") - ) - ) - .granularity(Granularities.DAY) - .build(); - - final List> results2 = - QueryPlus.wrap(query2).run(appenderator, ResponseContext.createEmpty()).toList(); - Assert.assertEquals( - "query2", - ImmutableList.of( - new Result<>( - DateTimes.of("2000"), - new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L)) - ), - new Result<>( - DateTimes.of("2001"), - new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L)) + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Arrays.asList( + IDENTIFIERS.get(0).asSegmentId().toString(), + IDENTIFIERS.get(1).asSegmentId().toString() ) - ), - results2 + ) ); + serviceEmitter.flush(); + // Query3: 2000/2001T01 final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder() .dataSource(StreamAppenderatorTester.DATASOURCE) @@ -1236,6 +1224,19 @@ public void testQueryByIntervals() throws Exception results3 ); + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Arrays.asList( + IDENTIFIERS.get(0).asSegmentId().toString(), + IDENTIFIERS.get(1).asSegmentId().toString(), + IDENTIFIERS.get(2).asSegmentId().toString() + ) + ) + ); + + serviceEmitter.flush(); + // Query4: 2000/2001T01, 2001T03/2001T04 final TimeseriesQuery query4 = Druids.newTimeseriesQueryBuilder() .dataSource(StreamAppenderatorTester.DATASOURCE) @@ -1269,6 +1270,16 @@ public void testQueryByIntervals() throws Exception ), results4 ); + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Arrays.asList( + IDENTIFIERS.get(0).asSegmentId().toString(), + IDENTIFIERS.get(1).asSegmentId().toString(), + IDENTIFIERS.get(2).asSegmentId().toString() + ) + ) + ); } } @@ -1276,9 +1287,11 @@ public void testQueryByIntervals() throws Exception public void testQueryBySegments() throws Exception { try ( + StubServiceEmitter serviceEmitter = new StubServiceEmitter(); final StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder().maxRowsInMemory(2) .basePersistDirectory(temporaryFolder.newFolder()) + .withServiceEmitter(serviceEmitter) .build()) { final Appenderator appenderator = tester.getAppenderator(); @@ -1327,6 +1340,17 @@ public void testQueryBySegments() throws Exception results1 ); + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Collections.singletonList( + IDENTIFIERS.get(2).asSegmentId().toString() + ) + ) + ); + + serviceEmitter.flush(); + // Query2: segment #2, partial final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() .dataSource(StreamAppenderatorTester.DATASOURCE) @@ -1363,6 +1387,17 @@ public void testQueryBySegments() throws Exception results2 ); + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Collections.singletonList( + IDENTIFIERS.get(2).asSegmentId().toString() + ) + ) + ); + + serviceEmitter.flush(); + // Query3: segment #2, two disjoint intervals final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder() .dataSource(StreamAppenderatorTester.DATASOURCE) @@ -1404,6 +1439,17 @@ public void testQueryBySegments() throws Exception results3 ); + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Collections.singletonList( + IDENTIFIERS.get(2).asSegmentId().toString() + ) + ) + ); + + serviceEmitter.flush(); + final ScanQuery query4 = Druids.newScanQueryBuilder() .dataSource(StreamAppenderatorTester.DATASOURCE) .intervals( @@ -1439,6 +1485,33 @@ public void testQueryBySegments() throws Exception new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L}, ((List) ((List) results4.get(1).getEvents()).get(0)).toArray() ); + + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Collections.singletonList( + IDENTIFIERS.get(2).asSegmentId().toString() + ) + ) + ); + + serviceEmitter.flush(); + } + } + + private void verifySinkMetrics(StubServiceEmitter emitter, Set segmentIds) + { + Map> events = emitter.getMetricEvents(); + int segments = segmentIds.size(); + Assert.assertEquals(4, events.size()); + Assert.assertTrue(events.containsKey("query/cpu/time")); + Assert.assertEquals(segments, events.get("query/segment/time").size()); + Assert.assertEquals(segments, events.get("query/segmentAndCache/time").size()); + Assert.assertEquals(segments, events.get("query/wait/time").size()); + for (String id : segmentIds) { + Assert.assertTrue(events.get("query/segment/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); + Assert.assertTrue(events.get("query/segmentAndCache/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); + Assert.assertTrue(events.get("query/wait/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java index 29d758aaed02..1fdc7bde25d4 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java @@ -109,7 +109,8 @@ public StreamAppenderatorTester( final RowIngestionMeters rowIngestionMeters, final boolean skipBytesInMemoryOverheadCheck, final DataSegmentAnnouncer announcer, - final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig + final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig, + final ServiceEmitter serviceEmitter ) { objectMapper = new DefaultObjectMapper(); @@ -145,18 +146,18 @@ public StreamAppenderatorTester( .withObjectMapper(objectMapper) .build(); tuningConfig = new TestAppenderatorConfig( - TuningConfig.DEFAULT_APPENDABLE_INDEX, - maxRowsInMemory, - maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes, - skipBytesInMemoryOverheadCheck, - IndexSpec.DEFAULT, - 0, - false, - 0L, - OffHeapMemorySegmentWriteOutMediumFactory.instance(), - IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE, - basePersistDirectory - ); + TuningConfig.DEFAULT_APPENDABLE_INDEX, + maxRowsInMemory, + maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes, + skipBytesInMemoryOverheadCheck, + IndexSpec.DEFAULT, + 0, + false, + 0L, + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE, + basePersistDirectory + ); metrics = new SegmentGenerationMetrics(); queryExecutor = Execs.singleThreaded("queryExecutor(%d)"); @@ -174,11 +175,12 @@ public StreamAppenderatorTester( OffHeapMemorySegmentWriteOutMediumFactory.instance() ); - emitter = new ServiceEmitter( + emitter = serviceEmitter == null ? new ServiceEmitter( "test", "test", new NoopEmitter() - ); + ) : serviceEmitter; + emitter.start(); EmittingLogger.registerEmitter(emitter); dataSegmentPusher = new DataSegmentPusher() @@ -354,6 +356,7 @@ public static class Builder private RowIngestionMeters rowIngestionMeters; private boolean skipBytesInMemoryOverheadCheck; private int delayInMilli = 0; + private ServiceEmitter serviceEmitter; public Builder maxRowsInMemory(final int maxRowsInMemory) { @@ -397,6 +400,12 @@ public Builder withSegmentDropDelayInMilli(int delayInMilli) return this; } + public Builder withServiceEmitter(ServiceEmitter serviceEmitter) + { + this.serviceEmitter = serviceEmitter; + return this; + } + public StreamAppenderatorTester build() { return new StreamAppenderatorTester( @@ -408,7 +417,8 @@ public StreamAppenderatorTester build() rowIngestionMeters == null ? new SimpleRowIngestionMeters() : rowIngestionMeters, skipBytesInMemoryOverheadCheck, new NoopDataSegmentAnnouncer(), - CentralizedDatasourceSchemaConfig.create() + CentralizedDatasourceSchemaConfig.create(), + serviceEmitter ); } @@ -426,7 +436,8 @@ public StreamAppenderatorTester build( rowIngestionMeters == null ? new SimpleRowIngestionMeters() : rowIngestionMeters, skipBytesInMemoryOverheadCheck, dataSegmentAnnouncer, - config + config, + serviceEmitter ); } } From 2aa45b6096046d45869211ac8b65030d9c655001 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 26 Sep 2024 22:21:15 +0530 Subject: [PATCH 02/12] add docs --- .../realtime/appenderator/SinkQuerySegmentWalker.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 00c3e7c8c65e..31df53b1e23f 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -435,9 +435,12 @@ public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final return segmentId + "_H" + hydrantNumber; } - public static class SinkMetricsEmittingQueryRunner implements QueryRunner { - - private static final Logger log = new Logger(SinkMetricsEmittingQueryRunner.class); + /** + * Emit query/segment/time, query/wait/time and query/segmentAndCache/Time for a sink. + * It accumulates query/segment/time and query/segmentAndCache/time for each hydrant at the level of sink. + * query/wait/time is the time take to process the first hydrant for the sink. + */ + private static class SinkMetricsEmittingQueryRunner implements QueryRunner { private final ServiceEmitter emitter; private final QueryToolChest> queryToolChest; From 5179b6081a1402fa7d94609d8367f89ff302f9c2 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 26 Sep 2024 22:50:33 +0530 Subject: [PATCH 03/12] minor change --- .../realtime/appenderator/SinkQuerySegmentWalker.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 31df53b1e23f..1652b0445bec 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -436,9 +436,9 @@ public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final } /** - * Emit query/segment/time, query/wait/time and query/segmentAndCache/Time for a sink. - * It accumulates query/segment/time and query/segmentAndCache/time for each hydrant at the level of sink. - * query/wait/time is the time take to process the first hydrant for the sink. + * Emit query/segment/time, query/wait/time and query/segmentAndCache/Time metrics for a Sink. + * It accumulates query/segment/time and query/segmentAndCache/time metric for each FireHydrant at the level of Sink. + * query/wait/time metric is the time taken to process the first FireHydrant for the Sink. */ private static class SinkMetricsEmittingQueryRunner implements QueryRunner { From 7edd60cd132c0d98e5b080c3bbf469097b5ed216 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Fri, 27 Sep 2024 11:14:05 +0530 Subject: [PATCH 04/12] checkstyle --- .../appenderator/SinkQuerySegmentWalker.java | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 1652b0445bec..cec74eab8ea3 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -34,12 +34,12 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.SequenceWrapper; import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.CPUTimeMetricQueryRunner; import org.apache.druid.query.DataSource; +import org.apache.druid.query.DefaultQueryMetrics; import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.NoopQueryRunner; @@ -91,10 +91,6 @@ import java.util.function.ObjLongConsumer; import java.util.stream.Collectors; -import static org.apache.druid.query.DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME; -import static org.apache.druid.query.DefaultQueryMetrics.QUERY_SEGMENT_TIME; -import static org.apache.druid.query.DefaultQueryMetrics.QUERY_WAIT_TIME; - /** * Query handler for indexing tasks. */ @@ -212,9 +208,9 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final final ConcurrentHashMap> segmentMetrics = new ConcurrentHashMap<>(); final Map>> metricsToReport = new HashMap<>(); - metricsToReport.put(QUERY_SEGMENT_TIME, QueryMetrics::reportSegmentTime); - metricsToReport.put(QUERY_SEGMENT_AND_CACHE_TIME, QueryMetrics::reportSegmentAndCacheTime); - metricsToReport.put(QUERY_WAIT_TIME, QueryMetrics::reportWaitTime); + metricsToReport.put(DefaultQueryMetrics.QUERY_SEGMENT_TIME, QueryMetrics::reportSegmentTime); + metricsToReport.put(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME, QueryMetrics::reportSegmentAndCacheTime); + metricsToReport.put(DefaultQueryMetrics.QUERY_WAIT_TIME, QueryMetrics::reportWaitTime); try { for (final SegmentDescriptor newDescriptor : specs) { @@ -260,7 +256,7 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final factory.createRunner(segmentReference.getSegment()), metricsToReport, segmentMetrics, - Collections.singleton(QUERY_SEGMENT_TIME), + Collections.singleton(DefaultQueryMetrics.QUERY_SEGMENT_TIME), sinkSegmentId.toString(), false ); @@ -305,7 +301,12 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final runner, metricsToReport, segmentMetrics, - new HashSet<>(Arrays.asList(QUERY_WAIT_TIME, QUERY_SEGMENT_AND_CACHE_TIME)), + new HashSet<>( + Arrays.asList( + DefaultQueryMetrics.QUERY_WAIT_TIME, + DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME + ) + ), sinkSegmentId.toString(), false ); @@ -440,8 +441,8 @@ public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final * It accumulates query/segment/time and query/segmentAndCache/time metric for each FireHydrant at the level of Sink. * query/wait/time metric is the time taken to process the first FireHydrant for the Sink. */ - private static class SinkMetricsEmittingQueryRunner implements QueryRunner { - + private static class SinkMetricsEmittingQueryRunner implements QueryRunner + { private final ServiceEmitter emitter; private final QueryToolChest> queryToolChest; private final QueryRunner queryRunner; @@ -501,7 +502,7 @@ public void after(boolean isDone, Throwable thrown) { if (!report) { for (String metric : metricsToCompute) { - if (QUERY_WAIT_TIME.equals(metric)) { + if (DefaultQueryMetrics.QUERY_WAIT_TIME.equals(metric)) { long waitTimeNs = startTimeNs - creationTimeNs; // segment wait time is the time taken to start processing the first hydrant for the segment segmentMetrics.computeIfAbsent(segmentId, metrics -> new HashMap<>()) From c3d9685dc43aa586593ef1a1dfcd45ce40846c49 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Fri, 27 Sep 2024 11:16:09 +0530 Subject: [PATCH 05/12] Fix DefaultQueryMetricsTest --- .../java/org/apache/druid/query/DefaultQueryMetricsTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java index f2fbdc1eb984..78188d8fedc5 100644 --- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java @@ -66,11 +66,11 @@ public void testDefaultQueryMetricsQuery() .context(ImmutableMap.of("testKey", "testValue")) .build(); queryMetrics.query(query); + queryMetrics.sqlQueryId("dummy"); + queryMetrics.queryId("dummy"); queryMetrics.reportQueryTime(0).emit(serviceEmitter); // No way to verify this right now since DefaultQueryMetrics implements a no-op for sqlQueryId(String) and queryId(String) // This change is done to keep the code coverage tool happy by exercising the implementation - queryMetrics.sqlQueryId("dummy"); - queryMetrics.queryId("dummy"); Map actualEvent = serviceEmitter.getEvents().get(0).toMap(); Assert.assertEquals(13, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); From cb3a318f6ee5017b5f8c212002cb1a37a52df8e6 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Fri, 27 Sep 2024 15:49:04 +0530 Subject: [PATCH 06/12] Minor changes in SinkMetricsEmittingQueryRunner --- .../appenderator/SinkQuerySegmentWalker.java | 48 +++++++++---------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index cec74eab8ea3..05fc9a50be5b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -205,7 +205,7 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final final List allSegmentReferences = new ArrayList<>(); final Map segmentIdMap = new HashMap<>(); final LinkedHashMap>> allRunners = new LinkedHashMap<>(); - final ConcurrentHashMap> segmentMetrics = new ConcurrentHashMap<>(); + final ConcurrentHashMap> segmentMetricsAccumulator = new ConcurrentHashMap<>(); final Map>> metricsToReport = new HashMap<>(); metricsToReport.put(DefaultQueryMetrics.QUERY_SEGMENT_TIME, QueryMetrics::reportSegmentTime); @@ -255,10 +255,9 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final factory.getToolchest(), factory.createRunner(segmentReference.getSegment()), metricsToReport, - segmentMetrics, + segmentMetricsAccumulator, Collections.singleton(DefaultQueryMetrics.QUERY_SEGMENT_TIME), - sinkSegmentId.toString(), - false + sinkSegmentId.toString() ); // 1) Only use caching if data is immutable @@ -300,15 +299,14 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final factory.getToolchest(), runner, metricsToReport, - segmentMetrics, + segmentMetricsAccumulator, new HashSet<>( Arrays.asList( DefaultQueryMetrics.QUERY_WAIT_TIME, DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME ) ), - sinkSegmentId.toString(), - false + sinkSegmentId.toString() ); // Emit CPU time metrics. @@ -382,10 +380,9 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final toolChest ), metricsToReport, - segmentMetrics, + segmentMetricsAccumulator, Collections.emptySet(), - null, - true + null ), toolChest, emitter, @@ -447,11 +444,10 @@ private static class SinkMetricsEmittingQueryRunner implements QueryRunner private final QueryToolChest> queryToolChest; private final QueryRunner queryRunner; private final Map>> metricsToReport; - private final ConcurrentHashMap> segmentMetrics; + private final ConcurrentHashMap> segmentMetricsAccumulator; private final Set metricsToCompute; @Nullable private final String segmentId; - private final boolean report; private final long creationTimeNs; private SinkMetricsEmittingQueryRunner( @@ -459,20 +455,18 @@ private SinkMetricsEmittingQueryRunner( QueryToolChest> queryToolChest, QueryRunner queryRunner, Map>> metricsToReport, - ConcurrentHashMap> segmentMetrics, + ConcurrentHashMap> segmentMetricsAccumulator, Set metricsToCompute, - @Nullable String segmentId, - boolean report + @Nullable String segmentId ) { this.emitter = emitter; this.queryToolChest = queryToolChest; this.queryRunner = queryRunner; this.metricsToReport = metricsToReport; - this.segmentMetrics = segmentMetrics; + this.segmentMetricsAccumulator = segmentMetricsAccumulator; this.metricsToCompute = metricsToCompute; this.segmentId = segmentId; - this.report = report; this.creationTimeNs = System.nanoTime(); } @@ -483,7 +477,7 @@ public Sequence run(final QueryPlus queryPlus, final ResponseContext respo final QueryMetrics queryMetrics = queryWithMetrics.getQueryMetrics(); return Sequences.wrap( - // Use LazySequence because want to account execution time of queryRunner.run() (it prepares the underlying + // Use LazySequence because we want to account execution time of queryRunner.run() (it prepares the underlying // Sequence) as part of the reported query time, i.e. we want to execute queryRunner.run() after // `startTimeNs = System.nanoTime();` new LazySequence<>(() -> queryRunner.run(queryWithMetrics, responseContext)), @@ -500,22 +494,24 @@ public void before() @Override public void after(boolean isDone, Throwable thrown) { - if (!report) { + if (segmentId != null) { + // accumulate metrics for (String metric : metricsToCompute) { if (DefaultQueryMetrics.QUERY_WAIT_TIME.equals(metric)) { long waitTimeNs = startTimeNs - creationTimeNs; - // segment wait time is the time taken to start processing the first hydrant for the segment - segmentMetrics.computeIfAbsent(segmentId, metrics -> new HashMap<>()) - .putIfAbsent(metric, new AtomicLong(waitTimeNs)); + // segment wait time is the time taken to start processing the first FireHydrant for the Sink + segmentMetricsAccumulator.computeIfAbsent(segmentId, metrics -> new ConcurrentHashMap<>()) + .putIfAbsent(metric, new AtomicLong(waitTimeNs)); } else { long timeTakenNs = System.nanoTime() - startTimeNs; - segmentMetrics.computeIfAbsent(segmentId, metrics -> new HashMap<>()) - .computeIfAbsent(metric, value -> new AtomicLong(0)) - .addAndGet(timeTakenNs); + segmentMetricsAccumulator.computeIfAbsent(segmentId, metrics -> new ConcurrentHashMap<>()) + .computeIfAbsent(metric, value -> new AtomicLong(0)) + .addAndGet(timeTakenNs); } } } else { - for (Map.Entry> segmentAndMetrics : segmentMetrics.entrySet()) { + // report accumulated metrics + for (Map.Entry> segmentAndMetrics : segmentMetricsAccumulator.entrySet()) { queryMetrics.segment(segmentAndMetrics.getKey()); for (Map.Entry>> reportMetric : metricsToReport.entrySet()) { From 6c372c5467b987a5d09567d3849882334d51a366 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Fri, 27 Sep 2024 15:49:24 +0530 Subject: [PATCH 07/12] spotbugs --- .../java/org/apache/druid/query/DefaultQueryMetrics.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java index 8480265d67c9..bbf748d2a8ed 100644 --- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java @@ -40,9 +40,9 @@ */ public class DefaultQueryMetrics> implements QueryMetrics { - public static String QUERY_WAIT_TIME = "query/wait/time"; - public static String QUERY_SEGMENT_TIME = "query/segment/time"; - public static String QUERY_SEGMENT_AND_CACHE_TIME = "query/segmentAndCache/time"; + public static final String QUERY_WAIT_TIME = "query/wait/time"; + public static final String QUERY_SEGMENT_TIME = "query/segment/time"; + public static final String QUERY_SEGMENT_AND_CACHE_TIME = "query/segmentAndCache/time"; protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); protected final Map metrics = new HashMap<>(); From 5cf1028f69adb574cc6f6238ea1fd9c01de39025 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Tue, 10 Dec 2024 13:03:32 +0530 Subject: [PATCH 08/12] Address review comments --- .../appenderator/SinkQuerySegmentWalker.java | 51 ++++++++++--------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index b0108b6b19bd..ef9a06b1b516 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -100,6 +100,23 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class); private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment"; + private static final Set SEGMENT_QUERY_METRIC = Collections.singleton(DefaultQueryMetrics.QUERY_SEGMENT_TIME); + private static final Set SEGMENT_CACHE_AND_WAIT_METRICS = Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + DefaultQueryMetrics.QUERY_WAIT_TIME, + DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME + ) + ) + ); + + private static final Map>> METRICS_TO_REPORT = + Map.of( + DefaultQueryMetrics.QUERY_SEGMENT_TIME, QueryMetrics::reportSegmentTime, + DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME, QueryMetrics::reportSegmentAndCacheTime, + DefaultQueryMetrics.QUERY_WAIT_TIME, QueryMetrics::reportWaitTime + ); + private final String dataSource; // Maintain a timeline of ids and Sinks for all the segments including the base and upgraded versions @@ -207,11 +224,6 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final final LinkedHashMap>> allRunners = new LinkedHashMap<>(); final ConcurrentHashMap> segmentMetricsAccumulator = new ConcurrentHashMap<>(); - final Map>> metricsToReport = new HashMap<>(); - metricsToReport.put(DefaultQueryMetrics.QUERY_SEGMENT_TIME, QueryMetrics::reportSegmentTime); - metricsToReport.put(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME, QueryMetrics::reportSegmentAndCacheTime); - metricsToReport.put(DefaultQueryMetrics.QUERY_WAIT_TIME, QueryMetrics::reportWaitTime); - try { for (final SegmentDescriptor descriptor : specs) { final PartitionChunk chunk = upgradedSegmentsTimeline.findChunk( @@ -253,9 +265,8 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final emitter, factory.getToolchest(), factory.createRunner(segmentReference.getSegment()), - metricsToReport, segmentMetricsAccumulator, - Collections.singleton(DefaultQueryMetrics.QUERY_SEGMENT_TIME), + SEGMENT_QUERY_METRIC, sinkSegmentId.toString() ); @@ -297,14 +308,8 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final emitter, factory.getToolchest(), runner, - metricsToReport, segmentMetricsAccumulator, - new HashSet<>( - Arrays.asList( - DefaultQueryMetrics.QUERY_WAIT_TIME, - DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME - ) - ), + SEGMENT_CACHE_AND_WAIT_METRICS, sinkSegmentId.toString() ); @@ -378,7 +383,6 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final toolChest.mergeResults(mergedRunner, true), toolChest ), - metricsToReport, segmentMetricsAccumulator, Collections.emptySet(), null @@ -455,16 +459,20 @@ public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final } /** - * Emit query/segment/time, query/wait/time and query/segmentAndCache/Time metrics for a Sink. + * This class is responsible for emitting query/segment/time, query/wait/time and query/segmentAndCache/Time metrics for a Sink. * It accumulates query/segment/time and query/segmentAndCache/time metric for each FireHydrant at the level of Sink. * query/wait/time metric is the time taken to process the first FireHydrant for the Sink. + *

+ * This class operates in two distinct modes based on whether {@link SinkMetricsEmittingQueryRunner#segmentId} is null or non-null. + * When segmentId is non-null, it accumulates the metrics. When segmentId is null, it emits the accumulated metrics. + *

+ * This class is derived from {@link org.apache.druid.query.MetricsEmittingQueryRunner}. */ private static class SinkMetricsEmittingQueryRunner implements QueryRunner { private final ServiceEmitter emitter; private final QueryToolChest> queryToolChest; private final QueryRunner queryRunner; - private final Map>> metricsToReport; private final ConcurrentHashMap> segmentMetricsAccumulator; private final Set metricsToCompute; @Nullable @@ -475,7 +483,6 @@ private SinkMetricsEmittingQueryRunner( ServiceEmitter emitter, QueryToolChest> queryToolChest, QueryRunner queryRunner, - Map>> metricsToReport, ConcurrentHashMap> segmentMetricsAccumulator, Set metricsToCompute, @Nullable String segmentId @@ -484,7 +491,6 @@ private SinkMetricsEmittingQueryRunner( this.emitter = emitter; this.queryToolChest = queryToolChest; this.queryRunner = queryRunner; - this.metricsToReport = metricsToReport; this.segmentMetricsAccumulator = segmentMetricsAccumulator; this.metricsToCompute = metricsToCompute; this.segmentId = segmentId; @@ -495,8 +501,6 @@ private SinkMetricsEmittingQueryRunner( public Sequence run(final QueryPlus queryPlus, final ResponseContext responseContext) { QueryPlus queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest); - final QueryMetrics queryMetrics = queryWithMetrics.getQueryMetrics(); - return Sequences.wrap( // Use LazySequence because we want to account execution time of queryRunner.run() (it prepares the underlying // Sequence) as part of the reported query time, i.e. we want to execute queryRunner.run() after @@ -531,11 +535,12 @@ public void after(boolean isDone, Throwable thrown) } } } else { + final QueryMetrics queryMetrics = queryWithMetrics.getQueryMetrics(); // report accumulated metrics for (Map.Entry> segmentAndMetrics : segmentMetricsAccumulator.entrySet()) { queryMetrics.segment(segmentAndMetrics.getKey()); - for (Map.Entry>> reportMetric : metricsToReport.entrySet()) { + for (Map.Entry>> reportMetric : METRICS_TO_REPORT.entrySet()) { String metricName = reportMetric.getKey(); if (segmentAndMetrics.getValue().containsKey(metricName)) { reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().get(metricName).get()); @@ -547,7 +552,7 @@ public void after(boolean isDone, Throwable thrown) } catch (Exception e) { // Query should not fail, because of emitter failure. Swallowing the exception. - log.error("Failure while trying to emit [%s] with stacktrace [%s]", emitter.toString(), e); + log.error(e, "Failed to emit metrics for segment[%s]", segmentAndMetrics.getKey()); } } } From 703876f6a087a26477f55064ebde0f1130e55fad Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Tue, 10 Dec 2024 13:18:13 +0530 Subject: [PATCH 09/12] Use ImmutableSet and ImmutableMap --- .../appenderator/SinkQuerySegmentWalker.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index ef9a06b1b516..40fb6078fe4f 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.client.CachingQueryRunner; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; @@ -77,10 +79,8 @@ import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -100,18 +100,14 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class); private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment"; - private static final Set SEGMENT_QUERY_METRIC = Collections.singleton(DefaultQueryMetrics.QUERY_SEGMENT_TIME); - private static final Set SEGMENT_CACHE_AND_WAIT_METRICS = Collections.unmodifiableSet( - new HashSet<>( - Arrays.asList( - DefaultQueryMetrics.QUERY_WAIT_TIME, - DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME - ) - ) + private static final Set SEGMENT_QUERY_METRIC = ImmutableSet.of(DefaultQueryMetrics.QUERY_SEGMENT_TIME); + private static final Set SEGMENT_CACHE_AND_WAIT_METRICS = ImmutableSet.of( + DefaultQueryMetrics.QUERY_WAIT_TIME, + DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME ); private static final Map>> METRICS_TO_REPORT = - Map.of( + ImmutableMap.of( DefaultQueryMetrics.QUERY_SEGMENT_TIME, QueryMetrics::reportSegmentTime, DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME, QueryMetrics::reportSegmentAndCacheTime, DefaultQueryMetrics.QUERY_WAIT_TIME, QueryMetrics::reportWaitTime From 9d03a6a212d179bbaaa2af97381b549b47ef75ef Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 11 Dec 2024 09:42:48 +0530 Subject: [PATCH 10/12] Create a helper class for saving state of StubServiceEmitter --- .../emitter/service/ServiceMetricEvent.java | 3 +- .../java/util/metrics/StubServiceEmitter.java | 40 ++++++++++++++++--- .../druid/query/DefaultQueryMetricsTest.java | 4 +- .../appenderator/StreamAppenderatorTest.java | 3 +- .../server/audit/SQLAuditManagerTest.java | 12 +++--- 5 files changed, 44 insertions(+), 18 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java index 435e42ed725e..3bab9a3ad04d 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java @@ -31,7 +31,6 @@ import org.joda.time.DateTime; import java.util.Arrays; -import java.util.HashMap; import java.util.Map; import java.util.TreeMap; @@ -198,7 +197,7 @@ public ServiceMetricEvent build(ImmutableMap serviceDimensions) return new ServiceMetricEvent( createdTime, serviceDimensions, - new HashMap<>(userDims), + userDims, feed, metric, value diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index e4a8b9403dd0..323d8cd308c9 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -38,7 +38,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie { private final List events = new ArrayList<>(); private final List alertEvents = new ArrayList<>(); - private final ConcurrentHashMap> metricEvents = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> metricEvents = new ConcurrentHashMap<>(); public StubServiceEmitter() { @@ -56,7 +56,7 @@ public void emit(Event event) if (event instanceof ServiceMetricEvent) { ServiceMetricEvent metricEvent = (ServiceMetricEvent) event; metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ArrayList<>()) - .add(metricEvent); + .add(new ServiceMetricEventSnapshot(metricEvent)); } else if (event instanceof AlertEvent) { alertEvents.add((AlertEvent) event); } @@ -76,7 +76,7 @@ public List getEvents() * * @return Map from metric name to list of events emitted for that metric. */ - public Map> getMetricEvents() + public Map> getMetricEvents() { return metricEvents; } @@ -96,18 +96,18 @@ public List getMetricValues( ) { final List values = new ArrayList<>(); - final List events = + final List events = metricEvents.getOrDefault(metricName, Collections.emptyList()); final Map filters = dimensionFilters == null ? Collections.emptyMap() : dimensionFilters; - for (ServiceMetricEvent event : events) { + for (ServiceMetricEventSnapshot event : events) { final Map userDims = event.getUserDims(); boolean match = filters.keySet().stream() .map(d -> filters.get(d).equals(userDims.get(d))) .reduce((a, b) -> a && b) .orElse(true); if (match) { - values.add(event.getValue()); + values.add(event.getMetricEvent().getValue()); } } @@ -131,4 +131,32 @@ public void flush() public void close() { } + + /** + * Helper class to encapsulate a ServiceMetricEvent and its user dimensions. + * Since {@link StubServiceEmitter} doesn't actually emit metrics and saves the emitted metrics in-memory, + * this helper class saves a copy of {@link ServiceMetricEvent#userDims} of emitted metrics + * via {@link ServiceMetricEvent#getUserDims()} as it can get mutated. + */ + public static class ServiceMetricEventSnapshot + { + private final ServiceMetricEvent metricEvent; + private final Map userDims; + + public ServiceMetricEventSnapshot(ServiceMetricEvent metricEvent) + { + this.metricEvent = metricEvent; + this.userDims = metricEvent.getUserDims(); + } + + public ServiceMetricEvent getMetricEvent() + { + return metricEvent; + } + + public Map getUserDims() + { + return userDims; + } + } } diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java index 78188d8fedc5..f2fbdc1eb984 100644 --- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java @@ -66,11 +66,11 @@ public void testDefaultQueryMetricsQuery() .context(ImmutableMap.of("testKey", "testValue")) .build(); queryMetrics.query(query); - queryMetrics.sqlQueryId("dummy"); - queryMetrics.queryId("dummy"); queryMetrics.reportQueryTime(0).emit(serviceEmitter); // No way to verify this right now since DefaultQueryMetrics implements a no-op for sqlQueryId(String) and queryId(String) // This change is done to keep the code coverage tool happy by exercising the implementation + queryMetrics.sqlQueryId("dummy"); + queryMetrics.queryId("dummy"); Map actualEvent = serviceEmitter.getEvents().get(0).toMap(); Assert.assertEquals(13, actualEvent.size()); Assert.assertTrue(actualEvent.containsKey("feed")); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index 5ec0d600542f..b8190e0c1f9d 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -34,7 +34,6 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.Druids; @@ -2226,7 +2225,7 @@ public void testQueryBySegments() throws Exception private void verifySinkMetrics(StubServiceEmitter emitter, Set segmentIds) { - Map> events = emitter.getMetricEvents(); + Map> events = emitter.getMetricEvents(); int segments = segmentIds.size(); Assert.assertEquals(4, events.size()); Assert.assertTrue(events.containsKey("query/cpu/time")); diff --git a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java index a12722088159..f10248bf1e92 100644 --- a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java @@ -91,14 +91,14 @@ public void testAuditMetricEventWithPayload() throws IOException final AuditEntry entry = createAuditEntry("testKey", "testType", DateTimes.nowUtc()); auditManager.doAudit(entry); - Map> metricEvents = serviceEmitter.getMetricEvents(); + Map> metricEvents = serviceEmitter.getMetricEvents(); Assert.assertEquals(1, metricEvents.size()); - List auditMetricEvents = metricEvents.get("config/audit"); + List auditMetricEvents = metricEvents.get("config/audit"); Assert.assertNotNull(auditMetricEvents); Assert.assertEquals(1, auditMetricEvents.size()); - ServiceMetricEvent metric = auditMetricEvents.get(0); + ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent(); final AuditEntry dbEntry = lookupAuditEntryForKey("testKey"); Assert.assertNotNull(dbEntry); @@ -120,14 +120,14 @@ public void testCreateAuditEntry() throws IOException Assert.assertEquals(entry, dbEntry); // Verify emitted metrics - Map> metricEvents = serviceEmitter.getMetricEvents(); + Map> metricEvents = serviceEmitter.getMetricEvents(); Assert.assertEquals(1, metricEvents.size()); - List auditMetricEvents = metricEvents.get("config/audit"); + List auditMetricEvents = metricEvents.get("config/audit"); Assert.assertNotNull(auditMetricEvents); Assert.assertEquals(1, auditMetricEvents.size()); - ServiceMetricEvent metric = auditMetricEvents.get(0); + ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent(); Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key")); Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type")); Assert.assertNull(metric.getUserDims().get("payload")); From 4301734c0a81358b13b6812ea492a87cd87e57a7 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 11 Dec 2024 11:32:47 +0530 Subject: [PATCH 11/12] Add SinkQuerySegmentWalkerBenchmark --- .../SinkQuerySegmentWalkerBenchmark.java | 156 ++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java new file mode 100644 index 000000000000..8342dd565ca1 --- /dev/null +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.benchmark; + +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.core.LoggingEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.Result; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.realtime.appenderator.Appenderator; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorTester; +import org.apache.druid.segment.realtime.sink.Committers; +import org.apache.druid.timeline.partition.LinearShardSpec; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class SinkQuerySegmentWalkerBenchmark +{ + static { + NullHandling.initializeForTests(); + } + + @Param({"10", "50", "100", "200"}) + private int numFireHydrants; + + private final LoggingEmitter loggingEmitter = new LoggingEmitter(new Logger(LoggingEmitter.class), LoggingEmitter.Level.INFO, new DefaultObjectMapper()); + private final ServiceEmitter serviceEmitter = new ServiceEmitter("test", "test", loggingEmitter); + private File cacheDir; + + private Appenderator appenderator; + + @Setup(Level.Trial) + public void setup() throws Exception + { + final String userConfiguredCacheDir = System.getProperty("druid.benchmark.cacheDir", System.getenv("DRUID_BENCHMARK_CACHE_DIR")); + cacheDir = new File(userConfiguredCacheDir); + final StreamAppenderatorTester tester = + new StreamAppenderatorTester.Builder().maxRowsInMemory(1) + .basePersistDirectory(cacheDir) + .withServiceEmitter(serviceEmitter) + .build(); + + appenderator = tester.getAppenderator(); + appenderator.startJob(); + + final SegmentIdWithShardSpec segmentIdWithShardSpec = new SegmentIdWithShardSpec( + StreamAppenderatorTester.DATASOURCE, + Intervals.of("2000/2001"), + "A", + new LinearShardSpec(0) + ); + + for (int i = 0; i < numFireHydrants; i++) { + final MapBasedInputRow inputRow = new MapBasedInputRow( + DateTimes.of("2000").getMillis(), + ImmutableList.of("dim"), + ImmutableMap.of( + "dim", + "bar_" + i, + "met", + 1 + ) + ); + appenderator.add(segmentIdWithShardSpec, inputRow, Suppliers.ofInstance(Committers.nil())); + } + } + + @TearDown(Level.Trial) + public void tearDown() throws Exception + { + appenderator.close(); + FileUtils.deleteDirectory(cacheDir); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void emitSinkMetrics(Blackhole blackhole) throws Exception + { + { + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .intervals(ImmutableList.of(Intervals.of("2000/2001"))) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .build(); + + final List> results = + QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList(); + blackhole.consume(results); + + serviceEmitter.flush(); + } + } +} From 9f732f3257fc2beb9bc3a684009e31ade08d2c3d Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Wed, 18 Dec 2024 01:36:58 +0530 Subject: [PATCH 12/12] Create SegmentMetrics class for tracking segment metrics --- .../appenderator/SinkQuerySegmentWalker.java | 80 ++++++++++++++----- 1 file changed, 61 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 40fb6078fe4f..c256e82c6d2d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -218,7 +218,7 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final final List allSegmentReferences = new ArrayList<>(); final Map segmentIdMap = new HashMap<>(); final LinkedHashMap>> allRunners = new LinkedHashMap<>(); - final ConcurrentHashMap> segmentMetricsAccumulator = new ConcurrentHashMap<>(); + final ConcurrentHashMap segmentMetricsAccumulator = new ConcurrentHashMap<>(); try { for (final SegmentDescriptor descriptor : specs) { @@ -469,7 +469,7 @@ private static class SinkMetricsEmittingQueryRunner implements QueryRunner private final ServiceEmitter emitter; private final QueryToolChest> queryToolChest; private final QueryRunner queryRunner; - private final ConcurrentHashMap> segmentMetricsAccumulator; + private final ConcurrentHashMap segmentMetricsAccumulator; private final Set metricsToCompute; @Nullable private final String segmentId; @@ -479,7 +479,7 @@ private SinkMetricsEmittingQueryRunner( ServiceEmitter emitter, QueryToolChest> queryToolChest, QueryRunner queryRunner, - ConcurrentHashMap> segmentMetricsAccumulator, + ConcurrentHashMap segmentMetricsAccumulator, Set metricsToCompute, @Nullable String segmentId ) @@ -517,29 +517,31 @@ public void after(boolean isDone, Throwable thrown) { if (segmentId != null) { // accumulate metrics - for (String metric : metricsToCompute) { - if (DefaultQueryMetrics.QUERY_WAIT_TIME.equals(metric)) { - long waitTimeNs = startTimeNs - creationTimeNs; - // segment wait time is the time taken to start processing the first FireHydrant for the Sink - segmentMetricsAccumulator.computeIfAbsent(segmentId, metrics -> new ConcurrentHashMap<>()) - .putIfAbsent(metric, new AtomicLong(waitTimeNs)); - } else { - long timeTakenNs = System.nanoTime() - startTimeNs; - segmentMetricsAccumulator.computeIfAbsent(segmentId, metrics -> new ConcurrentHashMap<>()) - .computeIfAbsent(metric, value -> new AtomicLong(0)) - .addAndGet(timeTakenNs); - } + final SegmentMetrics metrics = segmentMetricsAccumulator.computeIfAbsent(segmentId, id -> new SegmentMetrics()); + if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_WAIT_TIME)) { + metrics.setWaitTime(startTimeNs - creationTimeNs); + } + if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_SEGMENT_TIME)) { + metrics.addSegmentTime(System.nanoTime() - startTimeNs); + } + if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME)) { + metrics.addSegmentAndCacheTime(System.nanoTime() - startTimeNs); } } else { final QueryMetrics queryMetrics = queryWithMetrics.getQueryMetrics(); // report accumulated metrics - for (Map.Entry> segmentAndMetrics : segmentMetricsAccumulator.entrySet()) { + for (Map.Entry segmentAndMetrics : segmentMetricsAccumulator.entrySet()) { queryMetrics.segment(segmentAndMetrics.getKey()); for (Map.Entry>> reportMetric : METRICS_TO_REPORT.entrySet()) { - String metricName = reportMetric.getKey(); - if (segmentAndMetrics.getValue().containsKey(metricName)) { - reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().get(metricName).get()); + final String metricName = reportMetric.getKey(); + switch (metricName) { + case DefaultQueryMetrics.QUERY_SEGMENT_TIME: + reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentTime()); + case DefaultQueryMetrics.QUERY_WAIT_TIME: + reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getWaitTime()); + case DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME: + reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentAndCacheTime()); } } @@ -556,6 +558,46 @@ public void after(boolean isDone, Throwable thrown) } ); } + + /** + * Class to track segment related metrics during query execution. + */ + private static class SegmentMetrics + { + private final AtomicLong querySegmentTime = new AtomicLong(0); + private final AtomicLong queryWaitTime = new AtomicLong(0); + private final AtomicLong querySegmentAndCacheTime = new AtomicLong(0); + + private void addSegmentTime(long time) + { + querySegmentTime.addAndGet(time); + } + + private void setWaitTime(long time) + { + queryWaitTime.set(time); + } + + private void addSegmentAndCacheTime(long time) + { + querySegmentAndCacheTime.addAndGet(time); + } + + private long getSegmentTime() + { + return querySegmentTime.get(); + } + + private long getWaitTime() + { + return queryWaitTime.get(); + } + + private long getSegmentAndCacheTime() + { + return querySegmentAndCacheTime.get(); + } + } } private static class SinkHolder implements Overshadowable