Emit disk spill and merge buffer utilisation metrics for GroupBy queries#17360
LakshSingla merged 18 commits into apache:master
Conversation
temporaryStorages.add(temporaryStorage);
}

public long getSpilledBytes()
What if the registered "limited temporary storage" is not closed between two invocations of getSpilledBytes()? Would the code double-count the stored bytes? Perhaps that is the intended behavior. If that's the case, we should figure out the intent behind the metric and take a call on whether we want to do that or not.

Regardless of this, I think that there should be a metric (either this one after repurposing, or a new one) which indicates the total size of the spilled data, per query. This will allow the admins to estimate whether the queries need a larger merge buffer and by how much. WDYT?
> Would the code double-count the stored bytes? Perhaps that is the intended behavior.

Yes, that is the intention. It is reporting the number of bytes spilled at the time the metric is emitted.

> Regardless of this, I think that there should be a metric (either this one after repurposing, or a new one) which indicates the total size of the spilled data, per query. This will allow the admins to estimate whether the queries need a larger merge buffer and by how much.

Makes sense, but would this be reported at the end of every query?
Yes, the query can update the monitor at the end
private final ConcurrentLinkedQueue<LimitedTemporaryStorage> temporaryStorages;

@Inject
nit: We shouldn't need this
private final AtomicLong resourceAcquisitionTimeNs = new AtomicLong(0);
private final AtomicLong resourceAcquisitionCount = new AtomicLong(0);
private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0);
private final AtomicLong mergeBufferAcquisitionCount = new AtomicLong(0);
Making them atomic seems redundant given that both need to be updated together, and guaranteeing atomicity over a single variable is not useful. The current code does not rely on the AtomicLong part of the variables since they are already guarded by the provider's lock.
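A minimal sketch of the reviewer's point (class and method names here are illustrative, not Druid's actual code): when two counters must always change together, per-variable atomicity buys nothing, and plain fields guarded by the same lock are both simpler and actually correct for the pair.

```java
// Hypothetical sketch: two counters that must be updated and read together.
// Plain longs under `synchronized` give pairwise consistency, which two
// independent AtomicLongs cannot guarantee.
public class MergeBufferAcquisitionStats
{
  private long acquisitionTimeNs = 0;
  private long acquisitionCount = 0;

  // Both fields change under the same monitor, so a reader can never
  // observe a count without its matching time contribution.
  public synchronized void recordAcquisition(long timeNs)
  {
    acquisitionTimeNs += timeNs;
    acquisitionCount++;
  }

  // Snapshot and reset atomically, again under the same lock.
  public synchronized long[] getAndReset()
  {
    long[] snapshot = new long[]{acquisitionTimeNs, acquisitionCount};
    acquisitionTimeNs = 0;
    acquisitionCount = 0;
    return snapshot;
  }
}
```

With two separate AtomicLongs, a monitor thread could read a count of 2 but a time sum that only reflects 1 acquisition; the single-lock version rules that interleaving out.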
resourceAcquisitionCount.incrementAndGet();
}

public synchronized long getAndResetGroupByResourceAcquisitionStats()
This doesn't seem correct. We are reporting the average as a metric. A better way would be to report the sum, as well as the count. This will allow a better weighted-average than directly reporting the average from the monitor. While we are amortizing a lot of metrics by emitting it from the monitor, I think it's better to report count and sum separately.
I couldn't find a way wherein I could report the sum and count together (so that any metric aggregation platform could automatically compute the averages).
Are you suggesting to report two different metrics groupBy/acquisitionTime & groupBy/acquisitionCount?
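A sketch of the two-metric approach being discussed (the metric names `groupBy/acquisitionTimeNs` and `groupBy/acquisitionCount` follow the question above; the emitter here is a stand-in, not Druid's ServiceEmitter): emitting the sum and the count separately lets any downstream aggregation platform compute sum(time) / sum(count), a correctly weighted average across hosts and emission periods, whereas averaging pre-computed averages is generally wrong.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical emitter stand-in; a real Druid monitor would emit through
// a ServiceEmitter with a ServiceMetricEvent builder instead.
public class AcquisitionMetricsSketch
{
  public static final List<String> emitted = new ArrayList<>();

  static void emit(String metric, long value)
  {
    emitted.add(metric + "=" + value);
  }

  // Emit sum and count as two metrics, so the backend can derive the
  // weighted average itself: sum(acquisitionTimeNs) / sum(acquisitionCount).
  public static void emitAcquisitionStats(long totalTimeNs, long count)
  {
    emit("groupBy/acquisitionTimeNs", totalTimeNs);
    emit("groupBy/acquisitionCount", count);
  }
}
```

For example, one period with average 10ns over 100 acquisitions and another with average 1000ns over 1 acquisition should average near 10ns overall, which only the sum/count form can recover.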
|`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services. Available only on Historical services.|
|`org.apache.druid.server.metrics.SegmentStatsMonitor`|**EXPERIMENTAL** Reports statistics about segments on Historical services. Available only on Historical services. Not to be used when lazy loading is configured.|
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted. It also reports stats for group by queries.|
Suggested change:
- |`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted. It also reports stats for group by queries.|
+ |`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted. It also reports statistics for the group by queries.|
Should it be made a separate monitor? I don't see any downside to merging the QueryCountStatsMonitor with the group by statistics, but at the same time, I don't think there's any benefit to it.
However, merging group-by statistics with "query count stats" feels incongruent to me.
I don't have strong opinions either, the only reason I merged them was an existing mergeBuffer metric in this monitor. I think a separate monitor would be better due to the added flexibility.
/**
 * @return number of used buffers from the pool
 */
long getUsedBufferCount();
BlockingPool is generic. It shouldn't reference buffers.
Suggested change:
- long getUsedBufferCount();
+ long getUsedResourcesCount();
}
finally {
  pendingRequests.incrementAndGet();
  pendingRequests.decrementAndGet();
return average;
}

public void registerTemporaryStorage(LimitedTemporaryStorage temporaryStorage)
How often does this method need to be called per group by query, and what does this rely upon? I feel that there's an inbuilt expense with tracking the spilled bytes this way. I think it is still fine, but I wonder if a more efficient way would be as follows:
- Have a running counter of the size occupied
- LimitedTemporaryStorage implements closeable. Modify wherever the LimitedTemporaryStorage is getting closed to subtract the value of the storage from the counter. There can be a set inclusion check where a hashset of the opened temporary storages being tracked by the running counter are stored, and the code confirms that the removed temporary storage was indeed being tracked by the counter.
Note: My approach seems more convoluted, so if there isn't much performance downside of the current version, I think its fine as is.
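The alternative outlined above can be sketched as follows. This is a hedged illustration, not Druid's actual `LimitedTemporaryStorage` API: the class names, the `write` method, and the registry are hypothetical. The idea is a running byte counter that is incremented on write and decremented on close, with a set-inclusion check so a storage is never subtracted twice.

```java
import java.io.Closeable;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the reviewer's running-counter approach.
public class SpilledBytesTracker
{
  private final AtomicLong runningBytes = new AtomicLong(0);
  private final Set<TrackedStorage> tracked = ConcurrentHashMap.newKeySet();

  public class TrackedStorage implements Closeable
  {
    private final AtomicLong bytes = new AtomicLong(0);

    // Every spill write also bumps the tracker's running counter.
    public void write(long n)
    {
      bytes.addAndGet(n);
      runningBytes.addAndGet(n);
    }

    @Override
    public void close()
    {
      // Set-inclusion check: only subtract if this storage was being
      // tracked, so a double close cannot corrupt the counter.
      if (tracked.remove(this)) {
        runningBytes.addAndGet(-bytes.get());
      }
    }
  }

  public TrackedStorage open()
  {
    TrackedStorage storage = new TrackedStorage();
    tracked.add(storage);
    return storage;
  }

  // O(1) read: no iteration over the tracked storages on each monitor cycle.
  public long getSpilledBytes()
  {
    return runningBytes.get();
  }
}
```

The trade-off matches the discussion below: reads become O(1), but every disk write now also touches the shared counter.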
> there's an inbuilt expense with tracking the spilled bytes this way

What is the expense you are referring to? The temporary storage object is added to a queue, and the cost of adding an element should be O(1) in most cases.
> LimitedTemporaryStorage implements closeable. Modify wherever the LimitedTemporaryStorage is getting closed to subtract the value of the storage from the counter. There can be a set inclusion check where a hashset of the opened temporary storages being tracked by the running counter are stored, and the code confirms that the removed temporary storage was indeed being tracked by the counter.

This approach also requires keeping track of the temporary storages, thus incurring similar overhead to the existing implementation. Additionally, it requires incrementing the "running counter" whenever bytes are written out to the disk.
> What is the expense you are referring to?

Iterating while calculating the stored bytes.

> keeping track of the temporary storages

That is true, but calculating the stored bytes won't require an iteration over the tracked storages.
> Iterating while calculating the stored bytes.

I understand. Given that the monitor will run at a fixed interval (every minute or so), is the overhead significant? I am weighing the implementation complexity against the performance impact.
|`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics about task slot usage per emission period.|
|`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting, the number of successful/failed tasks, and metrics about task slot usage for the reporting worker, per emission period. Only supported by Middle Manager node types.|
|`org.apache.druid.server.metrics.ServiceStatusMonitor`|Reports a heartbeat for the service.|
|`org.apache.druid.server.metrics.GroupByStatsMonitor`|Report statistics for groupBy queries like disk spill, merge buffer usage.|
nit: "disk spill" can be worded better.
Suggested change:
- |`org.apache.druid.server.metrics.GroupByStatsMonitor`|Report statistics for groupBy queries like disk spill, merge buffer usage.|
+ |`org.apache.druid.server.metrics.GroupByStatsMonitor`|Reports metrics for groupBy queries like disk and merge buffer utilized by them.|
|`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be materialized as frames due to other reasons.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given row limit.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given byte limit.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Should be 0.|
Recommended value doesn't seem correct. It is fine if queries are waiting on the merge buffer even during normal operation. We can reword it to say "ideally 0", while noting that a non-zero value is acceptable.
|`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given byte limit.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Should be 0.|
|`mergeBuffer/usedCount`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionCount`|Number of times groupBy queries acquired merge buffers.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
> Number of times groupBy queries acquired merge buffers.

Is it cumulative or per query? A query should acquire merge buffers only once. I think if we reword the metric name or the documentation, it will be clear to the users.
It is cumulative; I have renamed it to `mergeBuffer/queries`.
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Should be 0.|
|`mergeBuffer/usedCount`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionCount`|Number of times groupBy queries acquired merge buffers.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Should be as low as possible.|
Suggested change:
- |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Should be as low as possible.|
+ |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies.|
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Should be 0.|
|`mergeBuffer/usedCount`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Should be as low as possible.|
|`mergeBuffer/acquisitionCount`|Number of times groupBy queries acquired merge buffers.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
I wonder if there's a way to reference all these in a single place instead. It seems wasteful that two copies of the same documentation are maintained. LMK if there are any prior metrics that can do that.
Moved them to a new section.
}
}

private void verifyMetrics(long queries, boolean skipMergeDictionary)
What is skipMergeDictionary and when is it required?
Renamed. This is basically to skip verifying the merge dictionary size metric, since some of the queries aggregate on non-string dimensions.
if (config.equals(V2_SMALL_BUFFER_CONFIG)) {
  verifyMetrics(1, true);
}
Why are the assertions being done on a single config?
This is the most suitable config to verify all the metrics. For example, with a larger buffer size it is not possible to verify the disk spill metrics. I have moved this check into the method and renamed the method.
return aggregateStatsContainer.reset();
}

public static class AggregateStats
Why are some stats AggregateStats, and some PerQuery? Wouldn't the monitor aggregate them all?
PerQueryStats is created per query and is closed at the end of the query; AggregateStats is "reset" after each monitor cycle, and all of the PerQueryStats are accumulated into AggregateStats when the query is closed. The monitor simply reports the aggregate metrics.
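The two-level accumulation described above can be sketched as follows. This is a simplified illustration, not Druid's actual `GroupByStatsProvider`: the field set, the string query id, and the `long[]` snapshot are assumptions made for brevity.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: per-query containers collect stats while a query
// runs; on query close they are folded into an aggregate that the monitor
// reads and resets once per emission cycle.
public class StatsAccumulatorSketch
{
  public static class PerQueryStats
  {
    long spilledBytes;

    void addSpilledBytes(long n)
    {
      spilledBytes += n;
    }
  }

  public static class AggregateStats
  {
    long spilledQueries;
    long spilledBytes;

    synchronized void addQueryStats(PerQueryStats q)
    {
      if (q.spilledBytes > 0) {
        spilledQueries++;
        spilledBytes += q.spilledBytes;
      }
    }

    // Called by the monitor once per emission period.
    synchronized long[] reset()
    {
      long[] out = new long[]{spilledQueries, spilledBytes};
      spilledQueries = 0;
      spilledBytes = 0;
      return out;
    }
  }

  private final Map<String, PerQueryStats> perQuery = new ConcurrentHashMap<>();
  private final AggregateStats aggregate = new AggregateStats();

  public PerQueryStats forQuery(String queryId)
  {
    return perQuery.computeIfAbsent(queryId, id -> new PerQueryStats());
  }

  public void closeQuery(String queryId)
  {
    PerQueryStats q = perQuery.remove(queryId);
    if (q != null) {
      aggregate.addQueryStats(q);
    }
  }

  public long[] getAndResetAggregate()
  {
    return aggregate.reset();
  }
}
```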
/**
 * Metrics collector for groupBy queries like spilled bytes, merge buffer acquisition time, dictionary size.
 */
@LazySingleton
Should it be registered in a Module if this is a LazySingleton? (I am not very familiar, just curious if this is working as expected)
Yes, it works as expected. Guice should automatically detect classes marked with this annotation and bind them in singleton scope.
public synchronized void closeQuery(QueryResourceId resourceId)
{
  if (resourceId == null || !perQueryStats.containsKey(resourceId)) {
    return;
  }
  PerQueryStats container = perQueryStats.remove(resourceId);
  aggregateStatsContainer.addQueryStats(container);
}
This is not intuitive. The callers won't remember to call closeQuery(). Is it possible to mold it as a Closer/Closeable? PerQueryStats can extend Closeable, and the close can do that.
It is still not ideal, since it doesn't seem right to close "Stats" in the first place. Perhaps renaming PerQueryStats to PerQueryStatsContainer would also help.
> This is not intuitive. The callers won't remember calling closeQuery().

Even if it implements Closeable, it would need to be explicitly registered with the query closer. I am doing the same thing in GroupByQueryQueryToolChest:

closer.register(() -> groupByStatsProvider.closeQuery(query.context().getQueryResourceId()));

LMK what you think.
> it would require to be explicitly registered with the query closer

Yes. Implementing the interface makes it easier for the developer to call the closer. I have had experience in the past where an implicit contract was not apparent, which led to a resource leak. Not a blocker.
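The Closeable variant suggested above can be sketched like this. It is a hedged illustration with hypothetical names (not Druid's actual PerQueryStats): the container implements `Closeable`, and `close()` idempotently folds its values into the aggregate, so try-with-resources or a Closer makes the end-of-query contract explicit in the type system.

```java
import java.io.Closeable;
import java.util.function.Consumer;

// Hypothetical per-query stats container that folds itself into an
// aggregate exactly once when closed.
public class ClosablePerQueryStats implements Closeable
{
  private final Consumer<ClosablePerQueryStats> onClose;
  private long spilledBytes;
  private boolean closed;

  public ClosablePerQueryStats(Consumer<ClosablePerQueryStats> onClose)
  {
    this.onClose = onClose;
  }

  public void addSpilledBytes(long n)
  {
    spilledBytes += n;
  }

  public long getSpilledBytes()
  {
    return spilledBytes;
  }

  @Override
  public void close()
  {
    // Idempotent: a second close() is a no-op, so double registration
    // with a Closer cannot double-count the stats.
    if (!closed) {
      closed = true;
      onClose.accept(this);
    }
  }
}
```

A caller would still register it, e.g. `closer.register(stats)`, but the compiler and IDE now surface the "must be closed" expectation that a bare `closeQuery(resourceId)` call does not.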
emitter.emit(builder.setMetric("mergeBuffer/usedCount", mergeBufferPool.getUsedResourcesCount()));

GroupByStatsProvider.AggregateStats statsContainer = groupByStatsProvider.getStatsSince();

if (statsContainer.getMergeBufferAcquisitionCount() > 0) {
  emitter.emit(builder.setMetric("mergeBuffer/acquisitionCount", statsContainer.getMergeBufferAcquisitionCount()));
usedCount and acquisitionCount are difficult to disambiguate. Can we rename them?
Renamed `mergeBuffer/usedCount` -> `mergeBuffer/used`, and `mergeBuffer/acquisitionCount` -> `mergeBuffer/queries`.
LMK if this is easier to understand.
Waiting on a minor change. Will merge this post.

The minor change was to use the query id instead of the resource reservation id in the logic, but that is not a user-facing thing and can be picked up in subsequent patches. I have merged this.
emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", mergeBufferPool.getPendingRequests()));

emitter.emit(builder.setMetric("mergeBuffer/used", mergeBufferPool.getUsedResourcesCount()));
There was a problem hiding this comment.
@LakshSingla @findingrish Is there a reason why this metric isn’t included in the per-query stats? I think it would be helpful to report it for each query individually.
Right now, the emission of this metric depends on the emission interval. For example, if the interval is 1 minute and a query runs in the middle of that cycle, using and then freeing the merge buffers before the minute ends, we won't see that the buffers were ever used. So we could miss important usage information simply because the timing doesn't align with the metric emission.
Please let me know if I’ve misunderstood how this works.
I enabled the metric on a test cluster, and I found that, despite running many GroupBy queries, `mergeBuffer/used` always equals 0.
This metric is meant to be a point-in-time measurement, designed to help diagnose situations where merge buffer usage is consistently very high, and causes the pool to be continuously exhausted.
There is another metric mergeBuffer/queries that is the number of queries made that used merge buffers. This one might be more what you want, if you're wanting to see per query metrics.
@gianm mergeBuffer/queries defines how many queries acquired some merge buffers. But we can't get exact numbers, some queries acquire 1 merge buffer, and others acquire 2 merge buffers.
Yeah, that's a good point. IMO it would make sense to add a new metric like mergeBuffer/count which is the total number of merge buffers allocated. Similar to mergeBuffer/queries, but could increment by more than 1 per query.
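The proposed distinction can be sketched as below. Note that `mergeBuffer/count` is only a suggestion in this thread, not an existing Druid metric; the class and method names here are hypothetical. The point is that `mergeBuffer/queries` increments once per query, while a per-buffer counter increments by the batch size.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical counters illustrating the proposal: a query acquiring a
// batch of 2 merge buffers adds 1 to `queries` but 2 to `buffers`.
public class MergeBufferCounters
{
  private final AtomicLong queries = new AtomicLong(0); // mergeBuffer/queries
  private final AtomicLong buffers = new AtomicLong(0); // proposed mergeBuffer/count

  public void recordAcquisition(int buffersInBatch)
  {
    queries.incrementAndGet();
    buffers.addAndGet(buffersInBatch);
  }

  public long getQueries()
  {
    return queries.get();
  }

  public long getBuffers()
  {
    return buffers.get();
  }
}
```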
This change is to emit the following metrics as part of the `GroupByStatsMonitor` monitor:

- `mergeBuffer/used`: Number of merge buffers used.
- `mergeBuffer/acquisitionTimeNs`: Total time required to acquire merge buffers.
- `mergeBuffer/acquisition`: Number of queries that acquired a batch of merge buffers.
- `groupBy/spilledQueries`: Number of queries that spilled onto the disk.
- `groupBy/spilledBytes`: Number of bytes spilled onto the disk.
- `groupBy/mergeDictionarySize`: Size of the merging dictionary.

Testing

Verified that `GroupByStatsMonitor` is reporting the new metrics. Added tests for `GroupByStatsProvider`.

Release Notes
Going forward
The `GroupByStatsMonitor` monitor should be used for emitting the `mergeBuffer/pendingRequests` metric instead of `QueryCountStatsMonitor`. In a subsequent release, this metric will not be emitted from `QueryCountStatsMonitor`.