[Groupby Query Metrics] Add merge buffer tracking#18731
[Groupby Query Metrics] Add merge buffer tracking#18731GWphua merged 35 commits intoapache:masterfrom
Conversation
| if (perQueryStats.getMergeBufferAcquisitionTimeNs() > 0) { | ||
| mergeBufferQueries++; | ||
| mergeBufferAcquisitionTimeNs += perQueryStats.getMergeBufferAcquisitionTimeNs(); | ||
| mergeBufferTotalUsage += perQueryStats.getMergeBufferTotalUsage(); |
There was a problem hiding this comment.
@GWphua Instead of summing here, what do you think about taking the max? Then the metric emitted would be the max merge buffer usage of a single query in that emission period. This would be a good signal for operators on whether they need to tweak the mergeBuffer size.
There was a problem hiding this comment.
I was thinking about this.
There are other metrics where taking MAX will also make sense --
spilledBytes --> How much storage would be good to configure?
dictionarySize --> How large can the merge dictionary size get?
I am considering adding another metric (maxSpilledBytes, maxDictionarySize, maxSpilledBytes). What do you think?
There was a problem hiding this comment.
Yeah agreed. I do think it makes sense to have it for those 3 metrics
Even for mergeBuffer/acquisitionTimeNs I think there's value in having the max, as it gives operators a signal on whether to increase numMergeBuffers
|
Thanks for adding those max metrics @GWphua! What do you think about adding |
|
Hi @aho135 Thanks for the review! I also find that it will be very helpful to emit metrics for each query, so we know which query will take up alot of resources. In our version of Druid, we simply appended each of the Alternatively, we can look into migrating the groupBy query metrics in We can do more of this in a seperate PR. |
Sounds good @GWphua I was thinking on very similar lines to emit these from I have a first draft on this: aho135@9f82091 |
|
Hi @aho135, since the scope of adding
I have a draft for |
|
Hi @gianm, would appreciate it if I receive a review/feedback on this PR. Thanks! |
abhishekrb19
left a comment
There was a problem hiding this comment.
@GWphua, thanks for the improved observability and @aho135 for the helpful max metric suggestions! Overall, the changes look good to me - I will take a closer look at the grouper changes soon.
Checkpointing my review on the GroupByStatsProvider, docs and some test suggestions. Please let me know what you think.
|
@abhishekrb19, thanks for the review! I have made changes according to your suggestions. |
There was a problem hiding this comment.
Thanks @GWphua! I tried to take a closer look at the grouper changes - please see my latest comments. To track actual merge buffer usage the way #17902
proposes, we may need some additional thought: using buffer.capacity() in the various places is going to reflect the configured buffer size, not the actual bytes used.
My suggestion would be to split all the new max metrics into a separate PR: mergeBuffer/maxAcquisitionTimeNs, groupBy/maxSpilledBytes, groupBy/maxMergeDictionarySize. Those look more straightforward and would still be useful for operators to track.
Then we can keep this PR and the linked issue open to track mergeBuffer/bytesUsed and mergeBuffer/maxBytesUsed. We'll also want to think about and add some test coverage for the grouper changes once the approach is finalized.
| throw new ISE("Grouper is closed"); | ||
| } | ||
|
|
||
| groupers.forEach(Grouper::reset); |
There was a problem hiding this comment.
Is this change required? Given that the ConcurrentGrouper operates on SpillingGrouper instance, I suppose this is technically correct. But calling Grouper::reset as it was earlier should already ensure that the specific reset/close methods from SpillingGrouper are invoked? If so, could we revert this change here and below?
There was a problem hiding this comment.
You are correct, only SpillingGrouper's method is called here.
I changed this to make my life easier during development:
- IntelliJ can jump to the SpillingGrouper method, instead of going to the Grouper interface.
- Future readers will be able to tell that the groupers object held by
ConcurrentGrouperwill beSpillingGroupers.
If the purpose is to keep the changes limited to this PR only connected to the Groupby metrics, I will be opening a new PR. Let me know if you would prefer that.
There was a problem hiding this comment.
Got it. It’s generally preferable not to mix refactoring and functional changes together, but this one is relatively straightforward, so it’s okay with me.
| ); | ||
|
|
||
| private final Grouper<KeyType> grouper; | ||
| private final AbstractBufferHashGrouper<KeyType> grouper; |
There was a problem hiding this comment.
Same comment as above. I think it would be better to define these methods in the Grouper interface and leave this change as it was earlier. Mixing Grouper with impls otherwise seems confusing
There was a problem hiding this comment.
For this, I will need to maintain the AbstractBufferHashGrouper. The reason will also address why getMergeBufferUsedBytes is not placed in the Groupers interface.
This is because the groupby metrics are only collated when Groupers#close is called. This means that we will not be interfacing with Groupers#getMergeBufferUsedBytes at all, but letting Groupers#close retrieve getMergeBufferUsedBytes. This is why SpillingGrouper#getMergeBufferUsedBytes is private, following the example set by SpillingGrouper#getDictionarySizeEstimate.
The only time we need to interface with getMergeBufferUsedBytes is in AbstractBufferHashGrouper, which are the underlying groupers used by the SpillingGrouper.
| } | ||
| buffer.putInt(numElements * Integer.BYTES, val); | ||
| numElements++; | ||
| maxMergeBufferUsageBytes = Math.max(maxMergeBufferUsageBytes, numElements * Integer.BYTES); |
There was a problem hiding this comment.
Actually I think this variable and state tracking isn't needed in add() since we're tracking numElements already. We can just do it inline numElements * Integer.BYTES from getMaxMergeBufferUsageBytes()
There was a problem hiding this comment.
I was thinking to get the maximum usage, because we request a getMergeBufferUsage when SpillingGrouper#close is called. I do not really know whether we are guaranteed to see the maximum usage when the grouper is closing this class...
I am worried about the case where maybe we configure 1GiB to the merge buffer, and the usage in the middle goes to, say 900MiB, but when the Grouper is closed, the usage shows ~300MiB. The user is then encouraged to lower the merge buffer allocation to 500MiB, which will be problematic.
This comment would also hopefully address your query about why reset does not change maxMergeBufferUsageBytes.
|
|
||
| protected void updateMaxTableBufferUsage() | ||
| { | ||
| maxTableBufferUsage = Math.max(maxTableBufferUsage, tableBuffer.capacity()); |
There was a problem hiding this comment.
Unless I'm missing something, the issue #17902 was created to actually get some visibility into actual merge buffer usage, but this metric would just tell us how much was actually configured instead?
tableBuffer.capacity() would just indicate the capacity of the buffers, so more or less what was configured via druid.processing.buffer.sizeBytes.
There was a problem hiding this comment.
You are right about the current metric reporting the allocation.
I relooked the implementation, and found that maybe we can better estimate the ByteBufferHashTable. Since the ByteBufferHashTable is an open-addressing hash table, we can use the number of elements in table (size) * space taken up per bucket (bucketSizeWithHash) to estimate the usage in bytes.
There was a problem hiding this comment.
Thanks for the update. Please see https://github.com/apache/druid/pull/18731/changes#r2706872449
There was a problem hiding this comment.
Made the AlternatingByteBufferHashTable inherit the max metrics reporting from the superclass. Should now accurately report the usage :)
| long hashTableUsage = hashTable.getMaxTableBufferUsage(); | ||
| long offSetListUsage = offsetList.getMaxMergeBufferUsageBytes(); | ||
| return hashTableUsage + offSetListUsage; |
There was a problem hiding this comment.
Same comment, I think this would just more or less tell us configured size rather than actual buffer usage. (more or less because of offset list tracking)
…atsMonitorTest.java Co-authored-by: Abhishek Radhakrishnan <abhishek.rb19@gmail.com>
|
Hi @abhishekrb19, I have revisited your comments, and have made the relevant changes/replies. Please take a look at the new approach to calculating the usage. Thanks! |
| long currentBufferUsedBytes = 0; | ||
| for (ByteBuffer buffer : subHashTableBuffers) { | ||
| currentBufferUsedBytes += buffer.capacity(); | ||
| } |
There was a problem hiding this comment.
I think this effectively would just be tableArenaSize which would reflect the allocated configured size rather than actual used size?
There was a problem hiding this comment.
Please update LimitedBufferHashGrouperTest, BufferHashGrouperTest and related grouper tests to validate the correctness of these implementations.
I just pulled in the latest patch locally and ran some group by queries and noticed that the bytesUsed and maxBytesUsed were more or less what was configured druid.processing.buffer.sizeBytes 🤔
I’ll try to dig into this more, but in the meantime, I’d still recommend splitting the PR into two parts: 1. max metrics 2. the bytesUsed and maxBytesUsed metrics
2 seems a bit more involved.
There was a problem hiding this comment.
Hello, I have added the tests for the groupers.
I did not get the same results as you, maybe because I used queries for a smaller dataset.
How I did in my tests is to query with spill to disk enabled:
- Set druid.processing.buffer.sizeBytes = 1GB
- Query on a dataset. (Let's say the results for this is 100MB)
- Set druid.processing.buffer.sizeBytes to a much smaller value ~5MB
- Query on the same dataset, and watch the usage metrics cap at 5MB, with spillage to disk ~95MB.
Here's an example of how my max metrics look like:

I do have to admit, some of the values are kinda "blocky", like it will report ~28MB repeatedly for, say 3 consecutive metrics, then report some other value. Maybe this is because similar queries are being sent during a short period of time, and perhaps the allocated space is the same for these similar queries. Hopefully, this will be fixed by your catch -- reporting the usage instead of the capacity. 😄
There was a problem hiding this comment.
Yeah, I was playing around with it and I noticed more accurate reporting with the latest change than with the previous iterations.
I do have to admit, some of the values are kinda "blocky", like it will report ~28MB repeatedly for, say 3 consecutive metrics, then report some other value.
I wonder if the "stickiness" in the reporting is coming from reset() that was observed in the test: #18731 (comment)
|
|
||
| protected void updateMaxTableBufferUsage() | ||
| { | ||
| maxTableBufferUsage = Math.max(maxTableBufferUsage, tableBuffer.capacity()); |
There was a problem hiding this comment.
Thanks for the update. Please see https://github.com/apache/druid/pull/18731/changes#r2706872449
As suggested, moved 3 max metrics to #18934, leaving |
Tests for buffer hash grouper
|
The conflicts have been resolved. You can look again when you're free. Thanks! |
Ack, thanks @GWphua! I will take a look at the latest changes later this week/early next week |
abhishekrb19
left a comment
There was a problem hiding this comment.
Sorry for the delay @GWphua. I had some questions/suggestions and thanks for adding the tests!
| Assert.assertEquals(5L * expectedBucketSize, grouper.getMergeBufferUsedBytes()); | ||
|
|
There was a problem hiding this comment.
Was curious about the behavior here. Any idea why after a reset(), just adding these isn't working as expected? The 6 entries below do work (just anything under 5 aggregates() fail):
grouper.aggregate(new IntKey(1));
grouper.aggregate(new IntKey(2));
Assert.assertEquals(2L * expectedBucketSize, grouper.getMergeBufferUsedBytes());
For anything less than what was aggregate() from before reset(), it seems to keep track of the max bytes from before, so it's essentially overreporting.
java.lang.AssertionError:
Expected :58
Actual :145
(145 = 5 * expectedBucketSize, it keeps memory of stuff from before reset() so it's saving the value from before)
There was a problem hiding this comment.
This is because grouper.getMergeBufferUsedBytes tracks the max bytes used for a single query.
During a reset(), the history of the max bytes used is not reset. So after reset(), the value of grouper.getMergeBufferUsedBytes is unchanged (stays at 5). The test below it shows how much bytes are used by the grouper if we try to add a bunch of new items.
The new value of grouper.getMergeBufferUsedBytes will be Math.max(used bytes before reset, used bytes now)
| tableBuffer.putInt(Groupers.getUsedFlag(keyHash)); | ||
| tableBuffer.put(keyBuffer); | ||
| size++; | ||
| updateMaxMergeBufferUsedBytes(); |
There was a problem hiding this comment.
Should there be a similar call to updateMaxMergeBufferUsedBytes() from adjustTableWhenFull() as well?
The LimitedBufferHashGrouper does that too
There was a problem hiding this comment.
Since updateMaxMergeBufferUsedBytes is calculated by size * bucketSizeWithHash, I added this function wherever size or bucketSizeWithHash is changed.
ByteBufferHashTable does not change both values in adjustTableWhenFull, so I did not add that. Granted, from a code reader's perspective, the places where the max merge buffer usage is updated seems quite random...
I have added a Javadoc to the related function.
| throw new ISE("Grouper is closed"); | ||
| } | ||
|
|
||
| groupers.forEach(Grouper::reset); |
There was a problem hiding this comment.
Got it. It’s generally preferable not to mix refactoring and functional changes together, but this one is relatively straightforward, so it’s okay with me.
Fixes #17902
Huge thanks to @gianm for the implementation tip in the issue!
Description
Tracking merge buffer usage
AbstractBufferHashGrouperand its implementations.ByteBufferHashTablealong with an offset tracker.ByteBufferHashTable, and maximum offset size calculated throughout the query's lifecycle.Incorporated a helpful suggestion by @aho135 : since the size of the hash tables are ever-changing, it makes sense to conduct calculations by taking the maximum values across queries -- so operators can have a better understanding of how the size of merge buffers can be configured.Edit: max metrics provided in #18934
Here's an example of the current SUM implementations, vs the MAX implementation The latter helps to tell us that we should probably configure merge buffer sizes to 2G for this case:

Release note
GroupByStatsMonitornow provides metrics "mergeBuffer/bytesUsed", and max metrics for merge buffer acquisition time, bytes used, spilled bytes, and merge dictionary size.Key changed/added classes in this PR
GroupByStatsProviderThis PR has:
Possible further enhancements
While building this PR, I have come across some problems which we can further enhance in the future:
Nested Group-bys
The current metric is great, but will not report accurately for nested group-by's. (Do correct me on this if I'm mistaken though!)
As far as I know, nested groupby limits the merge buffers usage count to 2, meaning the merge buffer will be re-used. IIUC, every ConcurrentGrouper (if concurrency is enabled) / SpillingGrouper (if concurrency disabled) is created and closed multiple times, and hence a per-query metric will likely over-report the merge buffer usage.
Simplify Memory Management
Right now we need to configure the following for each queryable service:
It will be great if we can simplify the calculations down to simply configuring direct memory, and we can manage a memory pool instead. This allows for more flexibility (unused memory allocated for merge buffers may be used by processing threads instead).