diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index f663990244eb..1a15a5314f88 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -89,9 +89,12 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| ### Historical @@ -113,9 +116,12 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| ### Real-time @@ -140,9 +146,12 @@ to represent the task ID are deprecated and will be removed in a future release. |`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| +|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| +|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| +|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| ### Jetty diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java index a5ce31cb5f98..51f564005555 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java @@ -67,9 +67,12 @@ public static class AggregateStats { private long mergeBufferQueries = 0; private long mergeBufferAcquisitionTimeNs = 0; + private long maxMergeBufferAcquisitionTimeNs = 0; private long spilledQueries = 0; private long spilledBytes = 0; + private long maxSpilledBytes = 0; private long mergeDictionarySize = 0; + private long maxMergeDictionarySize = 0; public AggregateStats() { @@ -78,16 +81,22 @@ public AggregateStats() public AggregateStats( long mergeBufferQueries, long mergeBufferAcquisitionTimeNs, + long maxMergeBufferAcquisitionTimeNs, long spilledQueries, long spilledBytes, - long mergeDictionarySize + long maxSpilledBytes, + long mergeDictionarySize, + long maxMergeDictionarySize ) { this.mergeBufferQueries = mergeBufferQueries; this.mergeBufferAcquisitionTimeNs = mergeBufferAcquisitionTimeNs; + this.maxMergeBufferAcquisitionTimeNs = maxMergeBufferAcquisitionTimeNs; this.spilledQueries = spilledQueries; this.spilledBytes = spilledBytes; + this.maxSpilledBytes = maxSpilledBytes; this.mergeDictionarySize = mergeDictionarySize; + this.maxMergeDictionarySize = maxMergeDictionarySize; } public long getMergeBufferQueries() @@ -100,6 +109,11 @@ public long getMergeBufferAcquisitionTimeNs() return mergeBufferAcquisitionTimeNs; } + public long getMaxMergeBufferAcquisitionTimeNs() + { + return maxMergeBufferAcquisitionTimeNs; + } + public long getSpilledQueries() { return spilledQueries; @@ -110,24 +124,40 @@ public long getSpilledBytes() return spilledBytes; } + public long getMaxSpilledBytes() + { + return maxSpilledBytes; + } + public long getMergeDictionarySize() { return mergeDictionarySize; } + public long getMaxMergeDictionarySize() + { + return maxMergeDictionarySize; + } + public void addQueryStats(PerQueryStats perQueryStats) { if (perQueryStats.getMergeBufferAcquisitionTimeNs() > 0) { mergeBufferQueries++; mergeBufferAcquisitionTimeNs += perQueryStats.getMergeBufferAcquisitionTimeNs(); + maxMergeBufferAcquisitionTimeNs = Math.max( + maxMergeBufferAcquisitionTimeNs, + perQueryStats.getMergeBufferAcquisitionTimeNs() + ); } if (perQueryStats.getSpilledBytes() > 0) { spilledQueries++; spilledBytes += perQueryStats.getSpilledBytes(); + maxSpilledBytes = Math.max(maxSpilledBytes, perQueryStats.getSpilledBytes()); } mergeDictionarySize += perQueryStats.getMergeDictionarySize(); + maxMergeDictionarySize = Math.max(maxMergeDictionarySize, perQueryStats.getMergeDictionarySize()); } public AggregateStats reset() @@ -136,16 +166,22 @@ public AggregateStats reset() new AggregateStats( mergeBufferQueries, mergeBufferAcquisitionTimeNs, + maxMergeBufferAcquisitionTimeNs, spilledQueries, spilledBytes, - mergeDictionarySize + maxSpilledBytes, + mergeDictionarySize, + maxMergeDictionarySize ); this.mergeBufferQueries = 0; this.mergeBufferAcquisitionTimeNs = 0; + this.maxMergeBufferAcquisitionTimeNs = 0; this.spilledQueries = 0; this.spilledBytes = 0; + this.maxSpilledBytes = 0; this.mergeDictionarySize = 0; + this.maxMergeDictionarySize = 0; return aggregateStats; } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java index 565a5ab97bc3..4a6cfb83d102 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java @@ -53,9 +53,12 @@ public void testMetricCollection() GroupByStatsProvider.AggregateStats aggregateStats = statsProvider.getStatsSince(); Assert.assertEquals(0L, aggregateStats.getMergeBufferQueries()); Assert.assertEquals(0L, aggregateStats.getMergeBufferAcquisitionTimeNs()); + Assert.assertEquals(0L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs()); Assert.assertEquals(0L, aggregateStats.getSpilledQueries()); Assert.assertEquals(0L, aggregateStats.getSpilledBytes()); + Assert.assertEquals(0L, aggregateStats.getMaxSpilledBytes()); Assert.assertEquals(0L, aggregateStats.getMergeDictionarySize()); + Assert.assertEquals(0L, aggregateStats.getMaxMergeDictionarySize()); statsProvider.closeQuery(id1); statsProvider.closeQuery(id2); @@ -63,8 +66,59 @@ public void testMetricCollection() aggregateStats = statsProvider.getStatsSince(); Assert.assertEquals(2, aggregateStats.getMergeBufferQueries()); Assert.assertEquals(1800L, aggregateStats.getMergeBufferAcquisitionTimeNs()); + Assert.assertEquals(1100L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs()); Assert.assertEquals(2L, aggregateStats.getSpilledQueries()); Assert.assertEquals(1600L, aggregateStats.getSpilledBytes()); + Assert.assertEquals(1000L, aggregateStats.getMaxSpilledBytes()); Assert.assertEquals(1000L, aggregateStats.getMergeDictionarySize()); + Assert.assertEquals(700L, aggregateStats.getMaxMergeDictionarySize()); + } + + + @Test + public void testMetricsWithMultipleQueries() + { + GroupByStatsProvider statsProvider = new GroupByStatsProvider(); + + QueryResourceId r1 = new QueryResourceId("r1"); + GroupByStatsProvider.PerQueryStats stats1 = statsProvider.getPerQueryStatsContainer(r1); + stats1.mergeBufferAcquisitionTime(2000); + stats1.spilledBytes(100); + stats1.dictionarySize(200); + + QueryResourceId r2 = new QueryResourceId("r2"); + GroupByStatsProvider.PerQueryStats stats2 = statsProvider.getPerQueryStatsContainer(r2); + stats2.mergeBufferAcquisitionTime(100); + stats2.spilledBytes(150); + stats2.dictionarySize(250); + + QueryResourceId r3 = new QueryResourceId("r3"); + GroupByStatsProvider.PerQueryStats stats3 = statsProvider.getPerQueryStatsContainer(r3); + stats3.mergeBufferAcquisitionTime(200); + stats3.spilledBytes(3000); + stats3.dictionarySize(300); + + QueryResourceId r4 = new QueryResourceId("r4"); + GroupByStatsProvider.PerQueryStats stats4 = statsProvider.getPerQueryStatsContainer(r4); + stats4.mergeBufferAcquisitionTime(300); + stats4.spilledBytes(200); + stats4.dictionarySize(1500); + + statsProvider.closeQuery(r1); + statsProvider.closeQuery(r2); + statsProvider.closeQuery(r3); + statsProvider.closeQuery(r4); + + GroupByStatsProvider.AggregateStats aggregateStats = statsProvider.getStatsSince(); + + Assert.assertEquals(2000L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs()); + Assert.assertEquals(3000L, aggregateStats.getMaxSpilledBytes()); + Assert.assertEquals(1500L, aggregateStats.getMaxMergeDictionarySize()); + + Assert.assertEquals(4L, aggregateStats.getMergeBufferQueries()); + Assert.assertEquals(2600L, aggregateStats.getMergeBufferAcquisitionTimeNs()); + Assert.assertEquals(4L, aggregateStats.getSpilledQueries()); + Assert.assertEquals(3450L, aggregateStats.getSpilledBytes()); + Assert.assertEquals(2250L, aggregateStats.getMergeDictionarySize()); } } diff --git a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java index 10985b4b4d3a..ecb702cc70da 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java @@ -69,15 +69,21 @@ public boolean doMonitor(ServiceEmitter emitter) "mergeBuffer/acquisitionTimeNs", statsContainer.getMergeBufferAcquisitionTimeNs() )); + emitter.emit(builder.setMetric( + "mergeBuffer/maxAcquisitionTimeNs", + statsContainer.getMaxMergeBufferAcquisitionTimeNs() + )); } if (statsContainer.getSpilledQueries() > 0) { emitter.emit(builder.setMetric("groupBy/spilledQueries", statsContainer.getSpilledQueries())); emitter.emit(builder.setMetric("groupBy/spilledBytes", statsContainer.getSpilledBytes())); + emitter.emit(builder.setMetric("groupBy/maxSpilledBytes", statsContainer.getMaxSpilledBytes())); } if (statsContainer.getMergeDictionarySize() > 0) { emitter.emit(builder.setMetric("groupBy/mergeDictionarySize", statsContainer.getMergeDictionarySize())); + emitter.emit(builder.setMetric("groupBy/maxMergeDictionarySize", statsContainer.getMaxMergeDictionarySize())); } return true; diff --git a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java index 94da5889d39c..8c9d4dc4c814 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DruidMetrics; +import org.apache.druid.query.QueryResourceId; import org.apache.druid.query.groupby.GroupByStatsProvider; import org.junit.After; import org.junit.Assert; @@ -59,8 +60,11 @@ public synchronized AggregateStats getStatsSince() return new AggregateStats( 1L, 100L, + 100L, 2L, 200L, + 200L, + 300L, 300L ); } @@ -88,14 +92,17 @@ public void testMonitor() // Trigger metric emission monitor.doMonitor(emitter); - Assert.assertEquals(7, emitter.getNumEmittedEvents()); + Assert.assertEquals(10, emitter.getNumEmittedEvents()); emitter.verifyValue("mergeBuffer/pendingRequests", 0L); emitter.verifyValue("mergeBuffer/used", 0L); emitter.verifyValue("mergeBuffer/queries", 1L); emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 100L); + emitter.verifyValue("mergeBuffer/maxAcquisitionTimeNs", 100L); emitter.verifyValue("groupBy/spilledQueries", 2L); emitter.verifyValue("groupBy/spilledBytes", 200L); + emitter.verifyValue("groupBy/maxSpilledBytes", 200L); emitter.verifyValue("groupBy/mergeDictionarySize", 300L); + emitter.verifyValue("groupBy/maxMergeDictionarySize", 300L); } @Test @@ -128,9 +135,12 @@ public void testMonitorWithServiceDimensions() verifyMetricValue(emitter, "mergeBuffer/used", dimFilters, 0L); verifyMetricValue(emitter, "mergeBuffer/queries", dimFilters, 1L); verifyMetricValue(emitter, "mergeBuffer/acquisitionTimeNs", dimFilters, 100L); + verifyMetricValue(emitter, "mergeBuffer/maxAcquisitionTimeNs", dimFilters, 100L); verifyMetricValue(emitter, "groupBy/spilledQueries", dimFilters, 2L); verifyMetricValue(emitter, "groupBy/spilledBytes", dimFilters, 200L); + verifyMetricValue(emitter, "groupBy/maxSpilledBytes", dimFilters, 200L); verifyMetricValue(emitter, "groupBy/mergeDictionarySize", dimFilters, 300L); + verifyMetricValue(emitter, "groupBy/maxMergeDictionarySize", dimFilters, 300L); } @Test @@ -185,6 +195,50 @@ public void testMonitoringMergeBuffer_pendingRequests() } } + @Test + public void testMonitoringWithMultipleResources() + { + GroupByStatsProvider statsProvider = new GroupByStatsProvider(); + + QueryResourceId r1 = new QueryResourceId("r1"); + GroupByStatsProvider.PerQueryStats stats1 = statsProvider.getPerQueryStatsContainer(r1); + stats1.mergeBufferAcquisitionTime(100); + stats1.spilledBytes(200); + stats1.dictionarySize(100); + + QueryResourceId r2 = new QueryResourceId("r2"); + GroupByStatsProvider.PerQueryStats stats2 = statsProvider.getPerQueryStatsContainer(r2); + stats2.mergeBufferAcquisitionTime(500); + stats2.spilledBytes(100); + stats2.dictionarySize(300); + + QueryResourceId r3 = new QueryResourceId("r3"); + GroupByStatsProvider.PerQueryStats stats3 = statsProvider.getPerQueryStatsContainer(r3); + stats3.mergeBufferAcquisitionTime(200); + stats3.spilledBytes(800); + stats3.dictionarySize(200); + + // Close all queries to aggregate stats (mimics GroupByMergingQueryRunner behavior) + statsProvider.closeQuery(r1); + statsProvider.closeQuery(r2); + statsProvider.closeQuery(r3); + + final GroupByStatsMonitor monitor = new GroupByStatsMonitor(statsProvider, mergeBufferPool); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + emitter.start(); + monitor.doMonitor(emitter); + + emitter.verifyValue("mergeBuffer/queries", 3L); + emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 800L); + emitter.verifyValue("groupBy/spilledQueries", 3L); + emitter.verifyValue("groupBy/spilledBytes", 1100L); + emitter.verifyValue("groupBy/mergeDictionarySize", 600L); + + emitter.verifyValue("mergeBuffer/maxAcquisitionTimeNs", 500L); + emitter.verifyValue("groupBy/maxSpilledBytes", 800L); + emitter.verifyValue("groupBy/maxMergeDictionarySize", 300L); + } + private void verifyMetricValue(StubServiceEmitter emitter, String metricName, Map dimFilters, Number expectedValue) { final List observedMetricEvents = emitter.getMetricEvents(metricName);