diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index ffdded1397f4..e4609ff292cd 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -107,6 +107,7 @@ import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.SegmentId; @@ -342,7 +343,8 @@ public > QueryToolChest getToolChest processingConfig, forkJoinPool, QueryStackTests.DEFAULT_NOOP_SCHEDULER, - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()) + new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + new NoopServiceEmitter() ); } diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 71cc33025a3f..d4ca7ad01123 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -59,6 +59,7 @@ Available Metrics |`query/failed/count`|number of failed queries|This metric is only available if the QueryCountStatsMonitor module is included.|| |`query/interrupted/count`|number of queries interrupted due to cancellation.|This metric is only available if the QueryCountStatsMonitor module is included.|| |`query/timeout/count`|number of timed out queries.|This metric is only available if the QueryCountStatsMonitor module is included.|| +|`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` Interface for reference regarding enabling this metric. Number of segments that will be touched by the query. In the broker, it makes a plan to distribute the query to realtime tasks and historicals based on a snapshot of segment distribution state. If there are some segments moved after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker. The broker will re-send the query to the new servers that serve those segments after move. In this case, those segments can be counted more than once in this metric.|Varies.| |`sqlQuery/time`|Milliseconds taken to complete a SQL query.|id, nativeQueryIds, dataSource, remoteAddress, success.|< 1s| |`sqlQuery/bytes`|number of bytes returned in SQL query response.|id, nativeQueryIds, dataSource, remoteAddress, success.| | diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index f1173aab03c5..15fdfa768156 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -70,6 +70,7 @@ import org.apache.druid.server.ClientQuerySegmentWalker; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.TimelineLookup; import org.hamcrest.core.IsInstanceOf; @@ -367,7 +368,8 @@ public String getFormatString() }, ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()) + new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + new NoopServiceEmitter() ); ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker( diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java index a351eeb29ed6..095fa17d03a8 100644 --- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java @@ -340,6 +340,13 @@ public QueryMetrics reportParallelMergeTotalCpuTime(long timeNs) return this; } + @Override + public QueryMetrics reportQueriedSegmentCount(long segmentCount) + { + // Don't emit by default. + return this; + } + @Override public void emit(ServiceEmitter emitter) { diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java index 9457fbd75f59..304ed6cd8331 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java @@ -270,6 +270,11 @@ public interface QueryMetrics> */ QueryMetrics reportQueryBytes(long byteCount); + /** + * Registeres "segments queried count" metric. + */ + QueryMetrics reportQueriedSegmentCount(long segmentCount); + /** * Registers "wait time" metric. */ diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java index dd52bf5a7ffb..108518a79486 100644 --- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java @@ -291,6 +291,12 @@ public QueryMetrics reportParallelMergeTotalCpuTime(long timeNs) return delegateQueryMetrics.reportParallelMergeTotalCpuTime(timeNs); } + @Override + public QueryMetrics reportQueriedSegmentCount(long segmentCount) + { + return delegateQueryMetrics.reportQueriedSegmentCount(segmentCount); + } + @Override public void emit(ServiceEmitter emitter) { diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java index e0fe0b4a70da..5091ae08a582 100644 --- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java @@ -148,5 +148,12 @@ public static void testQueryMetricsDefaultMetricNamesAndUnits( actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); Assert.assertEquals("query/node/bytes", actualEvent.get("metric")); Assert.assertEquals(10L, actualEvent.get("value")); + + // Here we are testing that Queried Segment Count does not get emitted by the DefaultQueryMetrics and the last + // metric remains as query/node/bytes + queryMetrics.reportQueriedSegmentCount(25).emit(serviceEmitter); + actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals("query/node/bytes", actualEvent.get("metric")); + Assert.assertEquals(10L, actualEvent.get("value")); } } diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 448184958901..8175e2a3ce07 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -55,6 +55,7 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.BySegmentResultValueClass; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.DruidProcessingConfig; @@ -129,6 +130,7 @@ public class CachingClusteredClient implements QuerySegmentWalker private final ForkJoinPool pool; private final QueryScheduler scheduler; private final JoinableFactoryWrapper joinableFactoryWrapper; + private final ServiceEmitter emitter; @Inject public CachingClusteredClient( @@ -142,7 +144,8 @@ public CachingClusteredClient( DruidProcessingConfig processingConfig, @Merging ForkJoinPool pool, QueryScheduler scheduler, - JoinableFactory joinableFactory + JoinableFactory joinableFactory, + ServiceEmitter emitter ) { this.warehouse = warehouse; @@ -156,6 +159,7 @@ public CachingClusteredClient( this.pool = pool; this.scheduler = scheduler; this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory); + this.emitter = emitter; if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) { log.warn( @@ -369,6 +373,8 @@ ClusterQueryResult run( query = scheduler.prioritizeAndLaneQuery(queryPlus, segmentServers); queryPlus = queryPlus.withQuery(query); + queryPlus = queryPlus.withQueryMetrics(toolChest); + queryPlus.getQueryMetrics().reportQueriedSegmentCount(segmentServers.size()).emit(emitter); final SortedMap> segmentsByServer = groupSegmentsByServer(segmentServers); LazySequence mergedResultSequence = new LazySequence<>(() -> { diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 1d591e2148db..a897e09d6434 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -51,6 +51,7 @@ import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -335,7 +336,8 @@ public int getMergePoolParallelism() }, ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()) + new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + new NoopServiceEmitter() ); } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java index ab1d6cd7b801..218cac521bad 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java @@ -53,6 +53,7 @@ import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.coordination.ServerManagerTest; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.LinearShardSpec; @@ -138,7 +139,8 @@ public void testGetQueryRunnerForSegments_singleIntervalLargeSegments() Mockito.mock(DruidProcessingConfig.class), ForkJoinPool.commonPool(), queryScheduler, - NoopJoinableFactory.INSTANCE + NoopJoinableFactory.INSTANCE, + new NoopServiceEmitter() ); Query fakeQuery = makeFakeQuery(interval); diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index cda0170d5f0a..6fe08c4622ba 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -125,6 +125,7 @@ import org.apache.druid.server.ServerTestHelper; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.DataSegment; @@ -2850,7 +2851,8 @@ public int getMergePoolParallelism() NoQueryLaningStrategy.INSTANCE, new ServerConfig() ), - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()) + new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + new NoopServiceEmitter() ); } diff --git a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java index ae0c269adf6b..94c6c5960906 100644 --- a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java +++ b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java @@ -52,6 +52,7 @@ import org.apache.druid.segment.generator.SegmentGenerator; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.joda.time.Interval; @@ -145,7 +146,8 @@ public void setupTestBase() ), ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()) + new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + new NoopServiceEmitter() ); servers = new ArrayList<>(); }