9 changes: 9 additions & 0 deletions docs/operations/metrics.md
@@ -89,9 +89,12 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds spent acquiring merge buffers for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds spent acquiring a merge buffer by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that spilled to disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled to disk by groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
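
None of the metrics above are emitted unless the `GroupByStatsMonitor` is registered on the service. A minimal `runtime.properties` sketch of enabling it — the fully qualified class name and the `PT1M` period are assumptions here; verify both against the metrics-monitors documentation for your Druid version:

```properties
# Register the monitor so the groupBy/mergeBuffer metrics above are emitted
# (class name assumed; check your Druid version's docs).
druid.monitoring.monitors=["org.apache.druid.server.metrics.GroupByStatsMonitor"]
# Emission period; the per-period totals and maxima reset on each emission.
druid.monitoring.emissionPeriod=PT1M
```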

### Historical

@@ -113,9 +116,12 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds spent acquiring merge buffers for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds spent acquiring a merge buffer by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that spilled to disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled to disk by groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|

### Real-time

@@ -140,9 +146,12 @@ to represent the task ID are deprecated and will be removed in a future release.
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds spent acquiring merge buffers for groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds spent acquiring a merge buffer by any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that spilled to disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled to disk by groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|

### Jetty

@@ -67,9 +67,12 @@ public static class AggregateStats
{
private long mergeBufferQueries = 0;
private long mergeBufferAcquisitionTimeNs = 0;
private long maxMergeBufferAcquisitionTimeNs = 0;
private long spilledQueries = 0;
private long spilledBytes = 0;
private long maxSpilledBytes = 0;
private long mergeDictionarySize = 0;
private long maxMergeDictionarySize = 0;

public AggregateStats()
{
@@ -78,16 +81,22 @@ public AggregateStats()
public AggregateStats(
long mergeBufferQueries,
long mergeBufferAcquisitionTimeNs,
long maxMergeBufferAcquisitionTimeNs,
long spilledQueries,
long spilledBytes,
long maxSpilledBytes,
long mergeDictionarySize,
long maxMergeDictionarySize
)
{
this.mergeBufferQueries = mergeBufferQueries;
this.mergeBufferAcquisitionTimeNs = mergeBufferAcquisitionTimeNs;
this.maxMergeBufferAcquisitionTimeNs = maxMergeBufferAcquisitionTimeNs;
this.spilledQueries = spilledQueries;
this.spilledBytes = spilledBytes;
this.maxSpilledBytes = maxSpilledBytes;
this.mergeDictionarySize = mergeDictionarySize;
this.maxMergeDictionarySize = maxMergeDictionarySize;
}

public long getMergeBufferQueries()
@@ -100,6 +109,11 @@ public long getMergeBufferAcquisitionTimeNs()
return mergeBufferAcquisitionTimeNs;
}

public long getMaxMergeBufferAcquisitionTimeNs()
{
return maxMergeBufferAcquisitionTimeNs;
}

public long getSpilledQueries()
{
return spilledQueries;
@@ -110,24 +124,40 @@ public long getSpilledBytes()
return spilledBytes;
}

public long getMaxSpilledBytes()
{
return maxSpilledBytes;
}

public long getMergeDictionarySize()
{
return mergeDictionarySize;
}

public long getMaxMergeDictionarySize()
{
return maxMergeDictionarySize;
}

public void addQueryStats(PerQueryStats perQueryStats)
{
if (perQueryStats.getMergeBufferAcquisitionTimeNs() > 0) {
mergeBufferQueries++;
mergeBufferAcquisitionTimeNs += perQueryStats.getMergeBufferAcquisitionTimeNs();
maxMergeBufferAcquisitionTimeNs = Math.max(
maxMergeBufferAcquisitionTimeNs,
perQueryStats.getMergeBufferAcquisitionTimeNs()
);
}

if (perQueryStats.getSpilledBytes() > 0) {
spilledQueries++;
spilledBytes += perQueryStats.getSpilledBytes();
maxSpilledBytes = Math.max(maxSpilledBytes, perQueryStats.getSpilledBytes());
}

mergeDictionarySize += perQueryStats.getMergeDictionarySize();
maxMergeDictionarySize = Math.max(maxMergeDictionarySize, perQueryStats.getMergeDictionarySize());
}
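
The pattern in `addQueryStats` is a running total plus a running maximum, both reset each emission period. A self-contained sketch of that fold — `PerQuery` and `Aggregate` are hypothetical stand-ins, not the actual Druid classes:

```java
// Minimal sketch of the "running total + running max" fold used by
// addQueryStats above (hypothetical names, not the Druid classes).
class PerQuery {
    final long acquisitionNs;
    final long spilledBytes;

    PerQuery(long acquisitionNs, long spilledBytes) {
        this.acquisitionNs = acquisitionNs;
        this.spilledBytes = spilledBytes;
    }
}

class Aggregate {
    long totalAcquisitionNs = 0;
    long maxAcquisitionNs = 0;   // worst single query this emission period
    long totalSpilledBytes = 0;
    long maxSpilledBytes = 0;

    void add(PerQuery q) {
        // Totals answer "how much work happened overall"; the maxima expose
        // a single pathological query that a healthy-looking total can hide.
        totalAcquisitionNs += q.acquisitionNs;
        maxAcquisitionNs = Math.max(maxAcquisitionNs, q.acquisitionNs);
        totalSpilledBytes += q.spilledBytes;
        maxSpilledBytes = Math.max(maxSpilledBytes, q.spilledBytes);
    }
}
```

This is why the max metrics are worth emitting alongside the totals: a large `groupBy/spilledBytes` could be many small spills or one runaway query, and `groupBy/maxSpilledBytes` distinguishes the two.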

public AggregateStats reset()
@@ -136,16 +166,22 @@ public AggregateStats reset()
new AggregateStats(
mergeBufferQueries,
mergeBufferAcquisitionTimeNs,
maxMergeBufferAcquisitionTimeNs,
spilledQueries,
spilledBytes,
maxSpilledBytes,
mergeDictionarySize,
maxMergeDictionarySize
);

this.mergeBufferQueries = 0;
this.mergeBufferAcquisitionTimeNs = 0;
this.maxMergeBufferAcquisitionTimeNs = 0;
this.spilledQueries = 0;
this.spilledBytes = 0;
this.maxSpilledBytes = 0;
this.mergeDictionarySize = 0;
this.maxMergeDictionarySize = 0;

return aggregateStats;
}
@@ -53,18 +53,72 @@ public void testMetricCollection()
GroupByStatsProvider.AggregateStats aggregateStats = statsProvider.getStatsSince();
Assert.assertEquals(0L, aggregateStats.getMergeBufferQueries());
Assert.assertEquals(0L, aggregateStats.getMergeBufferAcquisitionTimeNs());
Assert.assertEquals(0L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs());
Assert.assertEquals(0L, aggregateStats.getSpilledQueries());
Assert.assertEquals(0L, aggregateStats.getSpilledBytes());
Assert.assertEquals(0L, aggregateStats.getMaxSpilledBytes());
Assert.assertEquals(0L, aggregateStats.getMergeDictionarySize());
Assert.assertEquals(0L, aggregateStats.getMaxMergeDictionarySize());

statsProvider.closeQuery(id1);
statsProvider.closeQuery(id2);

aggregateStats = statsProvider.getStatsSince();
Assert.assertEquals(2, aggregateStats.getMergeBufferQueries());
Assert.assertEquals(1800L, aggregateStats.getMergeBufferAcquisitionTimeNs());
Assert.assertEquals(1100L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs());
Assert.assertEquals(2L, aggregateStats.getSpilledQueries());
Assert.assertEquals(1600L, aggregateStats.getSpilledBytes());
Assert.assertEquals(1000L, aggregateStats.getMaxSpilledBytes());
Assert.assertEquals(1000L, aggregateStats.getMergeDictionarySize());
Assert.assertEquals(700L, aggregateStats.getMaxMergeDictionarySize());
}


@Test
public void testMetricsWithMultipleQueries()
{
GroupByStatsProvider statsProvider = new GroupByStatsProvider();

QueryResourceId r1 = new QueryResourceId("r1");
GroupByStatsProvider.PerQueryStats stats1 = statsProvider.getPerQueryStatsContainer(r1);
stats1.mergeBufferAcquisitionTime(2000);
stats1.spilledBytes(100);
stats1.dictionarySize(200);

QueryResourceId r2 = new QueryResourceId("r2");
GroupByStatsProvider.PerQueryStats stats2 = statsProvider.getPerQueryStatsContainer(r2);
stats2.mergeBufferAcquisitionTime(100);
stats2.spilledBytes(150);
stats2.dictionarySize(250);

QueryResourceId r3 = new QueryResourceId("r3");
GroupByStatsProvider.PerQueryStats stats3 = statsProvider.getPerQueryStatsContainer(r3);
stats3.mergeBufferAcquisitionTime(200);
stats3.spilledBytes(3000);
stats3.dictionarySize(300);

QueryResourceId r4 = new QueryResourceId("r4");
GroupByStatsProvider.PerQueryStats stats4 = statsProvider.getPerQueryStatsContainer(r4);
stats4.mergeBufferAcquisitionTime(300);
stats4.spilledBytes(200);
stats4.dictionarySize(1500);

statsProvider.closeQuery(r1);
statsProvider.closeQuery(r2);
statsProvider.closeQuery(r3);
statsProvider.closeQuery(r4);

GroupByStatsProvider.AggregateStats aggregateStats = statsProvider.getStatsSince();

Assert.assertEquals(2000L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs());
Assert.assertEquals(3000L, aggregateStats.getMaxSpilledBytes());
Assert.assertEquals(1500L, aggregateStats.getMaxMergeDictionarySize());

Assert.assertEquals(4L, aggregateStats.getMergeBufferQueries());
Assert.assertEquals(2600L, aggregateStats.getMergeBufferAcquisitionTimeNs());
Assert.assertEquals(4L, aggregateStats.getSpilledQueries());
Assert.assertEquals(3450L, aggregateStats.getSpilledBytes());
Assert.assertEquals(2250L, aggregateStats.getMergeDictionarySize());
}
}
@@ -69,15 +69,21 @@ public boolean doMonitor(ServiceEmitter emitter)
"mergeBuffer/acquisitionTimeNs",
statsContainer.getMergeBufferAcquisitionTimeNs()
));
emitter.emit(builder.setMetric(
"mergeBuffer/maxAcquisitionTimeNs",
statsContainer.getMaxMergeBufferAcquisitionTimeNs()
));
}

if (statsContainer.getSpilledQueries() > 0) {
emitter.emit(builder.setMetric("groupBy/spilledQueries", statsContainer.getSpilledQueries()));
emitter.emit(builder.setMetric("groupBy/spilledBytes", statsContainer.getSpilledBytes()));
emitter.emit(builder.setMetric("groupBy/maxSpilledBytes", statsContainer.getMaxSpilledBytes()));
}

if (statsContainer.getMergeDictionarySize() > 0) {
emitter.emit(builder.setMetric("groupBy/mergeDictionarySize", statsContainer.getMergeDictionarySize()));
emitter.emit(builder.setMetric("groupBy/maxMergeDictionarySize", statsContainer.getMaxMergeDictionarySize()));
}

return true;
@@ -25,6 +25,7 @@
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QueryResourceId;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.junit.After;
import org.junit.Assert;
@@ -59,8 +60,11 @@ public synchronized AggregateStats getStatsSince()
return new AggregateStats(
1L,
100L,
100L,
2L,
200L,
200L,
300L,
300L
);
}
@@ -88,14 +92,17 @@ public void testMonitor()
// Trigger metric emission
monitor.doMonitor(emitter);

Assert.assertEquals(10, emitter.getNumEmittedEvents());
emitter.verifyValue("mergeBuffer/pendingRequests", 0L);
emitter.verifyValue("mergeBuffer/used", 0L);
emitter.verifyValue("mergeBuffer/queries", 1L);
emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 100L);
emitter.verifyValue("mergeBuffer/maxAcquisitionTimeNs", 100L);
emitter.verifyValue("groupBy/spilledQueries", 2L);
emitter.verifyValue("groupBy/spilledBytes", 200L);
emitter.verifyValue("groupBy/maxSpilledBytes", 200L);
emitter.verifyValue("groupBy/mergeDictionarySize", 300L);
emitter.verifyValue("groupBy/maxMergeDictionarySize", 300L);
}

@Test
@@ -128,9 +135,12 @@ public void testMonitorWithServiceDimensions()
verifyMetricValue(emitter, "mergeBuffer/used", dimFilters, 0L);
verifyMetricValue(emitter, "mergeBuffer/queries", dimFilters, 1L);
verifyMetricValue(emitter, "mergeBuffer/acquisitionTimeNs", dimFilters, 100L);
verifyMetricValue(emitter, "mergeBuffer/maxAcquisitionTimeNs", dimFilters, 100L);
verifyMetricValue(emitter, "groupBy/spilledQueries", dimFilters, 2L);
verifyMetricValue(emitter, "groupBy/spilledBytes", dimFilters, 200L);
verifyMetricValue(emitter, "groupBy/maxSpilledBytes", dimFilters, 200L);
verifyMetricValue(emitter, "groupBy/mergeDictionarySize", dimFilters, 300L);
verifyMetricValue(emitter, "groupBy/maxMergeDictionarySize", dimFilters, 300L);

> **Contributor:** Should we perhaps also add testMonitoringWithMultipleResources() and testMetricsWithMultipleQueries() in this change itself with the appropriate max metric validations?
>
> **Contributor (Author):** Added the tests!

}

@Test
@@ -185,6 +195,50 @@ public void testMonitoringMergeBuffer_pendingRequests()
}
}

@Test
public void testMonitoringWithMultipleResources()
{
GroupByStatsProvider statsProvider = new GroupByStatsProvider();

QueryResourceId r1 = new QueryResourceId("r1");
GroupByStatsProvider.PerQueryStats stats1 = statsProvider.getPerQueryStatsContainer(r1);
stats1.mergeBufferAcquisitionTime(100);
stats1.spilledBytes(200);
stats1.dictionarySize(100);

QueryResourceId r2 = new QueryResourceId("r2");
GroupByStatsProvider.PerQueryStats stats2 = statsProvider.getPerQueryStatsContainer(r2);
stats2.mergeBufferAcquisitionTime(500);
stats2.spilledBytes(100);
stats2.dictionarySize(300);

QueryResourceId r3 = new QueryResourceId("r3");
GroupByStatsProvider.PerQueryStats stats3 = statsProvider.getPerQueryStatsContainer(r3);
stats3.mergeBufferAcquisitionTime(200);
stats3.spilledBytes(800);
stats3.dictionarySize(200);

// Close all queries to aggregate stats (mimics GroupByMergingQueryRunner behavior)
statsProvider.closeQuery(r1);
statsProvider.closeQuery(r2);
statsProvider.closeQuery(r3);

final GroupByStatsMonitor monitor = new GroupByStatsMonitor(statsProvider, mergeBufferPool);
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
emitter.start();
monitor.doMonitor(emitter);

emitter.verifyValue("mergeBuffer/queries", 3L);
emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 800L);
emitter.verifyValue("groupBy/spilledQueries", 3L);
emitter.verifyValue("groupBy/spilledBytes", 1100L);
emitter.verifyValue("groupBy/mergeDictionarySize", 600L);

emitter.verifyValue("mergeBuffer/maxAcquisitionTimeNs", 500L);
emitter.verifyValue("groupBy/maxSpilledBytes", 800L);
emitter.verifyValue("groupBy/maxMergeDictionarySize", 300L);
}

private void verifyMetricValue(StubServiceEmitter emitter, String metricName, Map<String, Object> dimFilters, Number expectedValue)
{
final List<ServiceMetricEvent> observedMetricEvents = emitter.getMetricEvents(metricName);