Use mergeBuffer instead of processingBuffer in parallelCombiner #5634
gianm merged 10 commits into apache:master from …for-parallel-combiner
@jihoonson sorry, could you please resolve conflicts? I think it conflicted with my patch #5630. Also, does this patch replace that one? If so, I should cancel the backport PR #5633.
Ok, I'll leave #5633 open then, and additionally review this one.
      checkInitialized();
      try {
    -   return wrapObjects(takeObjects(elementNum));
    +   return pollObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList());
takeObjects has become pollObjects - semantics have changed.
Good catch! Fixed.
    }

    /**
     * Returns the resource. {@link #increment()} should be called carefully before using the returned resource.
What did you mean by this?
{@link #increment()} should be called carefully before using the returned resource.
It seems to suggest that callers must use increment before using the resource. However, it isn't always necessary. The refcount starts out as 1, so increment is only needed if you need to track more than one reference.
The idea with ReferenceCountingResourceHolder is that:
- Refcount starts at 1, and that first reference is released by calling close().
- You can increment the refcount by calling increment, and that reference is released using the Releaser.
- When all references are released (i.e. close has been called, plus all releasers have been released, if any exist) then the resource holder invokes its closer.
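The three rules above can be sketched in a few lines of Java. This is an illustrative sketch only, not Druid's ReferenceCountingResourceHolder: the names RefCountedHolder and RefCountDemo, and the use of a Runnable as the closer, are assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the reference-counting rules; not Druid's actual class.
final class RefCountedHolder<T> implements AutoCloseable
{
  // A handle for one extra reference. close() releases that reference.
  interface Releaser extends AutoCloseable
  {
    @Override
    void close();
  }

  private final T resource;
  private final Runnable closer;
  // Rule 1: the count starts at 1, and close() releases that first reference.
  private final AtomicInteger refCount = new AtomicInteger(1);
  private final AtomicBoolean closed = new AtomicBoolean(false);

  RefCountedHolder(T resource, Runnable closer)
  {
    this.resource = resource;
    this.closer = closer;
  }

  T get()
  {
    if (refCount.get() <= 0) {
      throw new IllegalStateException("already released");
    }
    return resource;
  }

  // Rule 2: each increment() adds one reference, released through its own Releaser.
  Releaser increment()
  {
    while (true) {
      final int count = refCount.get();
      if (count <= 0) {
        throw new IllegalStateException("already released");
      }
      if (refCount.compareAndSet(count, count + 1)) {
        break;
      }
    }
    final AtomicBoolean released = new AtomicBoolean(false);
    return () -> {
      // Guard so that closing the same Releaser twice decrements only once.
      if (released.compareAndSet(false, true)) {
        decrement();
      }
    };
  }

  @Override
  public void close()
  {
    if (closed.compareAndSet(false, true)) {
      decrement();
    }
  }

  // Rule 3: the closer runs when the last reference goes away, whoever releases it.
  private void decrement()
  {
    if (refCount.decrementAndGet() == 0) {
      closer.run();
    }
  }
}

final class RefCountDemo
{
  public static void main(String[] args)
  {
    final AtomicInteger closerCalls = new AtomicInteger();
    final RefCountedHolder<String> holder = new RefCountedHolder<>("buffer", closerCalls::incrementAndGet);
    final RefCountedHolder.Releaser releaser = holder.increment();
    holder.close();   // first reference released, one Releaser still outstanding
    System.out.println("closer calls after holder.close(): " + closerCalls.get());
    releaser.close(); // last reference released, closer runs now
    System.out.println("closer calls after releaser.close(): " + closerCalls.get());
  }
}
```

In this sketch, closing the holder before its Releasers is allowed, which matches the behavior described in the list: the closer is deferred until the last Releaser is closed.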
Thanks. Fixed javadoc.
    }

    /**
     * Closes this holder and decrements the reference count by 1. This method should be called after all
This isn't true. You can call close before the Releasers are closed, and it all works fine. If you do this, then when the last Releaser is closed, then the holder will invoke its closer.
That's true. But it's quite confusing to me that the resource can remain valid even after the holder is closed. Also, I think this is quite error-prone. For example, if someone forgets to close a Releaser or the ReferenceCountingResourceHolder, the closer in the holder will never be called. Adding some restrictions would probably help reduce such mistakes. For example, if we require the holder to be closed only after all Releasers are closed, we can add a sanity check to the close() method. What do you think? This might not be an issue for this PR. If you agree, I'll raise another PR for this.
    {
      try (
          // These variables are used to close releasers automatically.
          @SuppressWarnings("unused")
IMO, //noinspection unused is easier on the eyes. But this is personal preference.
IntelliJ still shows a red line even after adding //noinspection unused. Is there an option for this?
I think it has to be above the line in question (not to the side).
If it doesn't work then stick with the annotation I suppose.
I think //noinspection unused doesn't work here. The code IntelliJ automatically generates is the same.
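For readers unfamiliar with the pattern under discussion, here is a small self-contained sketch of resources declared in a try-with-resources header purely so that they are closed automatically. LoggingReleaser is a hypothetical stand-in for a Releaser, and the class names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class TryWithUnusedResources
{
  // Hypothetical stand-in for a Releaser: records when it is closed.
  static final class LoggingReleaser implements AutoCloseable
  {
    private final String name;
    private final List<String> log;

    LoggingReleaser(String name, List<String> log)
    {
      this.name = name;
      this.log = log;
    }

    @Override
    public void close()
    {
      log.add("released " + name);
    }
  }

  static List<String> run()
  {
    final List<String> log = new ArrayList<>();
    try (
        // Never read in the body; declared only so that close() runs automatically.
        @SuppressWarnings("unused")
        LoggingReleaser first = new LoggingReleaser("first", log);
        @SuppressWarnings("unused")
        LoggingReleaser second = new LoggingReleaser("second", log)
    ) {
      log.add("working");
    }
    // Resources are closed in reverse declaration order once the body finishes.
    return log;
  }

  public static void main(String[] args)
  {
    System.out.println(run());
  }
}
```

The annotation sits on the resource declaration itself, which is why it can be placed inside the try header where a comment-based suppression may not be picked up.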
      throw new QueryInterruptedException(e);
    }
    // If parallelCombine is enabled, we need two merge buffers for parallel aggregating and parallel combining
    final int numMergeBuffers = querySpecificConfig.getNumParallelCombineThreads() > 1 ? 2 : 1;
What if 2 merge buffers are not available? For example if numMergeBuffers is set to 1.
Also, the config docs should be updated to reflect that when parallel combining is used, the number of merge buffers needed will double.
getMergeBuffer() takes required merge buffers atomically. If required buffers are not available, it would throw an exception. I added a check for the size of mergeBufferPool and changed to throw proper exceptions.
Also updated doc.
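A minimal sketch of the sizing rule and the up-front pool-size check described in this exchange. The class and method names are hypothetical, and IllegalStateException is a stand-in for whatever exception the real code throws; only the `> 1 ? 2 : 1` rule and the druid.processing.numMergeBuffers property name come from this thread.

```java
// Hypothetical helper; not part of Druid.
final class MergeBufferRequirement
{
  // Two buffers when parallel combining is enabled (one for aggregating,
  // one for combining), otherwise one. Mirrors the quoted diff line.
  static int requiredMergeBuffers(int numParallelCombineThreads)
  {
    return numParallelCombineThreads > 1 ? 2 : 1;
  }

  // Fail fast when the pool can never satisfy the request, instead of
  // waiting for buffers that will not become available.
  static void checkPoolCanSatisfy(int required, int poolMaxSize)
  {
    if (required > poolMaxSize) {
      throw new IllegalStateException(
          String.format(
              "Query needs %d merge buffers, but only %d merge buffers were configured. "
              + "Try raising druid.processing.numMergeBuffers.",
              required,
              poolMaxSize
          )
      );
    }
  }

  public static void main(String[] args)
  {
    final int required = requiredMergeBuffers(4);
    checkPoolCanSatisfy(required, 2); // passes: the pool holds two buffers
    System.out.println("required merge buffers: " + required);
  }
}
```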
@gianm I've fixed unit test failures. Would you review again?
      checkInitialized();
      try {
    -   return wrapObjects(takeObjects(elementNum));
    +   return takeObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList());
I think this has a buffer leak (and it looks like the old code had the leak too, and so does pollObjects). If either pollObjects or takeObjects is interrupted while it's waiting for more objects to become available, then the objects popped from objects are not returned to the pool - they are lost.
Would you elaborate on how a resource leak occurs on interruption?
The implementation of takeBatch is

    private List<T> takeObjects(int elementNum) throws InterruptedException
    {
      final List<T> list = new ArrayList<>(elementNum);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
        while (objects.size() < elementNum) {
          notEnough.await();
        }
        for (int i = 0; i < elementNum; i++) {
          list.add(objects.pop());
        }
        return list;
      }
      finally {
        lock.unlock();
      }
    }

and InterruptedException can be thrown at lock.lockInterruptibly() and notEnough.await(). list.add(objects.pop()) is called only when enough objects are available.
wrapObject() also doesn't check the interruption state, so objects should be wrapped once takeObjects() returns them.
Ah, you're right, as long as nothing involved checks interrupts: list.add, objects.pop, wrapObject, etc. It looks like that is the case so there is no leak. Nevermind.
    }

    /**
     * Returns the resource. If multiple threads are supposed to call this method for the same holder,
It's useful for multiple threads but is not only for use by multiple threads. Reference counting is potentially useful even within a single thread (although I guess we don't use it for this today). I'd suggest wording like,
Returns the resource with an initial reference count of 1. More references can be added by
calling {@link #increment()}.
Good point. Fixed.
    /**
     * Increments the reference count by 1 and returns a {@link Releaser}. The returned {@link Releaser} is used to
     * decrement the reference count when the caller no longer needs the resource.
This should include wording like:
Releasers are not thread-safe. If multiple threads need references to the same holder, they should
each acquire their own Releaser.
However, you might care about the performance of some really heavy groupBy queries. Usually, the performance bottleneck of heavy groupBy queries is merging sorted aggregates. In such cases, you can use processing threads for it as well. This is called _parallel combine_. To enable parallel combine, see `numParallelCombineThreads` in [Advanced groupBy v2 configurations](#groupby-v2-configurations). Note that parallel combine can be enabled only when data is actually spilled (see [Memory tuning and resource limits](#memory-tuning-and-resource-limits)).

Once parallel combine is enabled, the groupBy v2 engine can create a combining tree for merging sorted aggregates. Each intermediate node of the tree is a thread merging aggregates from the child nodes. The leaf node threads read and merge aggregates from hash tables including spilled ones. Usually, leaf nodes are slower than intermediate nodes because they need to read data from disk. As a result, less threads are used for intermediate nodes by default. You can change the degree of intermeidate nodes. See `intermediateCombineDegree` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
Once a historical finishes aggregation using the hash table, it sorts aggregates and merge them before sending to the
"sorts aggregates and merges them." (spelling)
Although, when I read this sentence I had to do some backtracking since I thought "aggregates" was a verb at first. So to avoid that consider wording like: "sorts the aggregated results and merges them"
intermediate node of the tree is a thread merging aggregates from the child nodes. The leaf node threads read and merge
aggregates from hash tables including spilled ones. Usually, leaf nodes are slower than intermediate nodes because they
need to read data from disk. As a result, less threads are used for intermediate nodes by default. You can change the
degree of intermeidate nodes. See `intermediateCombineDegree` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
    {
      try {
        if (numBuffers > mergeBufferPool.maxSize()) {
          throw new ResourceLimitExceededException(
This error should include something like "Try raising druid.processing.numMergeBuffers."
          throw new TimeoutException();
        }
        if ((mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers, timeout)).isEmpty()) {
          throw new InsufficientResourcesException("Cannot acquire enough merge buffers");
Maybe this should be a TimeoutException? (One of the 4 types that gives callers a special error code; see QueryInterruptedException's getErrorCodeFromThrowable method)
The current wording and type does not make it clear that this is really a timeout error.
Good point. Changed exception.
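For illustration, a self-contained sketch of a batch poll whose failure is reported as a timeout. BatchPool, pollBatch, giveBack, and acquireOrTimeout are hypothetical names, not Druid's pool API. The sketch assumes, as in the takeObjects code quoted earlier in this thread, that objects are popped only once enough are available, so a caller that times out or is interrupted never holds a partial batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical pool used only to illustrate the timeout handling.
final class BatchPool<T>
{
  private final ArrayDeque<T> objects = new ArrayDeque<>();
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notEnough = lock.newCondition();

  BatchPool(List<T> initial)
  {
    objects.addAll(initial);
  }

  // Returns elementNum objects, or an empty list if they do not all become
  // available within timeoutMs. Nothing is popped until the whole batch is available.
  List<T> pollBatch(int elementNum, long timeoutMs) throws InterruptedException
  {
    if (elementNum <= 0) {
      throw new IllegalArgumentException("elementNum must be positive");
    }
    long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    lock.lockInterruptibly();
    try {
      while (objects.size() < elementNum) {
        if (nanos <= 0) {
          return Collections.emptyList();
        }
        nanos = notEnough.awaitNanos(nanos);
      }
      final List<T> batch = new ArrayList<>(elementNum);
      for (int i = 0; i < elementNum; i++) {
        batch.add(objects.pop());
      }
      return batch;
    }
    finally {
      lock.unlock();
    }
  }

  // Returns objects to the pool and wakes up any waiting callers.
  void giveBack(List<T> batch)
  {
    lock.lock();
    try {
      objects.addAll(batch);
      notEnough.signalAll();
    }
    finally {
      lock.unlock();
    }
  }

  // An empty result means the wait ran out of time, so surface it as a timeout.
  static <T> List<T> acquireOrTimeout(BatchPool<T> pool, int n, long timeoutMs)
      throws InterruptedException, TimeoutException
  {
    final List<T> batch = pool.pollBatch(n, timeoutMs);
    if (batch.isEmpty()) {
      throw new TimeoutException("Cannot acquire " + n + " merge buffers within " + timeoutMs + " ms");
    }
    return batch;
  }
}

final class BatchPoolDemo
{
  public static void main(String[] args) throws Exception
  {
    final BatchPool<String> pool = new BatchPool<>(Arrays.asList("buffer-1"));
    try {
      BatchPool.acquireOrTimeout(pool, 2, 50);
    }
    catch (TimeoutException e) {
      System.out.println("timed out: " + e.getMessage());
    }
  }
}
```

Using a timeout-typed exception here keeps the failure distinguishable from a misconfiguration, which is the distinction the review comment asks for.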
…he#5634)
* Use mergeBuffer instead of processingBuffer in parallelCombiner
* Fix test
* address comments
* fix test
* Fix test
* Update comment
* address comments
* fix build
* Fix test failure
Fix #5629.