Atomic merge buffer acquisition for groupBys #3939
Conversation
```java
{
  final int requiredMergeBufferNum;
  if (strategySelector.useStrategyV2(query)) {
    final int groupByLayerNum = countGroupByLayers(query, 1);
```
I think this check could be simplified to just checking whether the top-level query has a table datasource or an inner query datasource, without needing to count the number of layers, since the buffer requirement is always 2 if layers > 1.
Since brokers require merge buffers only for processing the groupBy layers beyond the inner-most one, a nested groupBy (groupBy -> groupBy -> table) requires only a single merge buffer.
It can still be simplified by early-exiting the count recursion (or loop) once the number of found groupBy layers becomes 2, but I wonder how worthwhile that is because most groupBys have very short depth.
@jihoonson even then we don't need to count beyond 3 layers
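The early-exit counting discussed in this thread can be sketched roughly as follows. The `Query` model and names below are hypothetical simplifications (the real `countGroupByLayers` walks Druid datasources); the point is only that the recursion can stop at a cap, since the buffer requirement never grows once the cap is reached:

```java
// Hypothetical minimal model of the idea above: a query either wraps an
// inner query or reads a plain table datasource. Counting stops early once
// the count reaches the cap, because deeper nesting cannot change the
// merge buffer requirement any further.
class LayerCounter
{
  static final class Query
  {
    final Query innerQuery; // null when the datasource is a table

    Query(Query innerQuery)
    {
      this.innerQuery = innerQuery;
    }
  }

  static int countGroupByLayersCapped(Query query, int cap)
  {
    int layers = 1;
    while (query.innerQuery != null && layers < cap) {
      layers++;
      query = query.innerQuery;
    }
    return layers;
  }
}
```

With a cap of 3 the count distinguishes exactly the cases that matter (0, 1, or 2 required merge buffers) without walking arbitrarily deep nestings.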
It looks like this will solve the first case reported in #3819 too; however, the description seems to be very explicit about case two only. No?
```java
{
  if (!objects.offer(theObject)) {
    log.error("WTF?! Queue offer failed, uh oh...");
    offer(theObject);
```
nit: not sure why this simple logic is separated into a method
To avoid forgetting to handle the case when offer() fails. That case means there is a bug in returning unused resources. We currently just log an error message, but I think it should be improved.
```java
Supplier<GroupByQueryConfig> configSupplier,
GroupByStrategySelector strategySelector,
@Global StupidPool<ByteBuffer> bufferPool,
GroupByQueryBrokerResourceInitializer brokerResourceInitializer,
```
Instead of adding this class, can't we just add a prepare(..) method to GroupByQueryStrategy and have that return the resource?
Thanks. It sounds good. I'll change.
```java
final int requiredMergeBufferNum;
if (strategySelector.useStrategyV2(query)) {
  final int groupByLayerNum = countGroupByLayers(query, 1);
  requiredMergeBufferNum = Math.min(2, groupByLayerNum - 1);
```
Can you add a note or otherwise make it more clear that this GroupByQueryBrokerResource isn't used when running a non-nested query? It threw me off for a bit when I was looking at what happens when requiredMergeBufferNum is 0
I'll add a note and improve the javadoc.
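For illustration, the mapping from nesting depth to broker-side merge buffers implied by `Math.min(2, groupByLayerNum - 1)` can be written as a tiny helper (the class name is hypothetical):

```java
// Hypothetical helper mirroring the rule quoted above: the broker merges
// only the layers beyond the inner-most one, and never needs more than two
// buffers at once. A non-nested query (one layer) therefore needs zero
// merge buffers, which is why the broker resource goes unused for it.
class MergeBufferMath
{
  static int requiredMergeBufferNum(int groupByLayerNum)
  {
    return Math.min(2, groupByLayerNum - 1);
  }
}
```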
```java
final ResourceHolder<List<ByteBuffer>> mergeBufferHolders = mergeBufferPool.drain(requiredMergeBufferNum);
if (mergeBufferHolders.get().size() < requiredMergeBufferNum) {
  mergeBufferHolders.close();
  throw new ResourceLimitExceededException("Cannot acquire enough merge buffers");
```
Failing the query here seems too aggressive to me, e.g.:
- Suppose there are two merge buffers, with a nested query A and a single-level query B.
- Query B runs first and grabs one of the buffers.
- Query A then runs; with this change, query A would fail right there even though it could be successfully executed if it waited for query B to finish.

I think the ResourceLimitExceededException should only be thrown if the number of buffers required by a single query exceeds the total number of buffers available, not for the situation where a buffer is only temporarily unavailable.

I think a query that needs more buffers than are currently available should wait, up to the timeout set in the query, before failing. For the nested queries that require > 2, this may need an atomic checkSizeAndDrain method that grabs the ArrayBlockingQueue's lock and checks the size before either grabbing the resources or waiting for the timeout.
Thanks for your suggestion.
I think the case you mentioned is the problem of query scheduling. With query scheduling, when a query is submitted, it first waits for its turn until the resources are ready and other queries of higher priority complete.
However, a timeout seems necessary anyway. I'll add it soon.
@jon-wei, I simply added a timeout parameter to the drain() method. I think this is better for now because adding a checkSizeAndDrain method causes the problems below.

- This method requires `BlockingPool` to maintain a lock itself, and thus the type of `objects` should be changed from BlockingQueue to something else to avoid unnecessary locking.
- Even with a `checkSizeAndDrain` method, the starvation problem still exists, so additional handling for that problem is required.
I think it would be better to redesign BlockingPool to address both problems, but it's not covered by this issue. So, how about opening a new issue for this?
@jihoonson @jon-wei With the timeout, it looks good to me... I don't think checkSizeAndDrain is necessary.
Can we reduce the timeout when sending the query to historicals, since we have already used some of the time that the user allowed?
Hm, without checking the size and draining atomically, two nested groupBys needing 2 buffers each could still in theory block each other:

- Suppose there are 2 buffers, and two nested queries are issued simultaneously; the window for both acquiring one buffer and blocking each other should be much smaller now (from the duration of subquery execution down to the much shorter drainTo processing time).
- Suppose there are 2 buffers, with two nested queries, but 1 buffer is currently in use by a non-nested query. Nested query A runs and drains one buffer, but waits for the second one. Nested query B also runs and sees no buffers, so it waits. Now suppose the non-nested query finishes and returns its buffer, but nested query B gets to run before nested query A, and takes the second buffer, leaving both nested queries blocked on each other.
I'm okay with using this drain + timeout for now if that's the consensus, and opening a follow on issue about implementing truly atomic buffer acquisition coupled with something to address starvation issues for queries that need > 1 buffers
In the first case, you mean two queries needing 2 buffers can block each other even when there are 2 available buffers in the pool?
Anyway, yes, drain + timeout is not enough. I'll open a follow-up issue if others agree.
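The truly atomic acquisition discussed above could look roughly like the following sketch. All names here are hypothetical and this is not the PR's BlockingPool: a single lock guards both the size check and the drain, so a query either takes all of its buffers at once or takes none, closing the partial-acquisition deadlock window described in the second scenario.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical all-or-nothing pool sketch: the lock covers both the size
// check and the drain, so a taker never ends up holding a partial set.
class AllOrNothingPool<T>
{
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition enough = lock.newCondition();
  private final ArrayDeque<T> objects = new ArrayDeque<>();

  void offer(T object)
  {
    lock.lock();
    try {
      objects.push(object);
      enough.signalAll();
    }
    finally {
      lock.unlock();
    }
  }

  // Drains exactly elementNum objects atomically, or returns null on timeout.
  List<T> checkSizeAndDrain(int elementNum, long timeoutMs)
  {
    lock.lock();
    try {
      long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
      while (objects.size() < elementNum) {
        if (nanos <= 0L) {
          return null; // timed out before enough objects became available
        }
        nanos = enough.awaitNanos(nanos);
      }
      final List<T> drained = new ArrayList<>(elementNum);
      for (int i = 0; i < elementNum; i++) {
        drained.add(objects.pop());
      }
      return drained;
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    finally {
      lock.unlock();
    }
  }
}
```

Note this sketch does not address starvation: a query needing 2 buffers can wait forever while single-buffer queries keep winning, which is the other problem mentioned above.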
I'm curious why we don't simply use well-known libraries like netty. @gianm, would you share any reasons?
Sorry, scratch what I said about the first case. While the BlockingQueue drainTo method contract doesn't specify what happens with concurrent modifications while draining occurs, the ArrayBlockingQueue implementation does hold a lock internally.
@jon-wei, @himanshug thanks for your review. I addressed your comments. @himanshug, yes, this PR covers case 1 as well. I updated the PR description.
```diff
 final long timeout = timeoutAt - System.currentTimeMillis();
 if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
-  throw new QueryInterruptedException(new TimeoutException());
+  throw new TimeoutException();
```
Why this change? I think this is used to propagate issues properly from historicals to brokers.
BlockingPool.take() now throws RuntimeException instead of InterruptedException, and the catch block below catches all kinds of exceptions and rethrows them wrapped in QueryInterruptedException (https://github.com/druid-io/druid/pull/3939/files/4f36f619bdfe5f7084913e77c583cf2f423d304a#diff-852ac93b1541cb9178ad922dc30be4baR176). This line caused QueryInterruptedException to be wrapped twice unnecessarily, so I changed it.
```java
 */
public class GroupByQueryBrokerResource implements Closeable
{
  private static final EmittingLogger log = new EmittingLogger(GroupByQueryBrokerResource.class);
```
nit: don't really need the EmittingLogger
```java
public ResourceHolder<ByteBuffer> getMergeBuffer()
{
  Preconditions.checkState(mergeBuffers != null);
  Preconditions.checkState(mergeBuffers.size() > 0);
```
Preconditions.checkState(mergeBuffers != null && mergeBuffers.size() > 0)?
A null mergeBuffers means this resource was initialized with 0 merge buffers, while a mergeBuffers of size 0 means no available merge buffers remain. I would like to keep these cases distinguishable.
```java
  return config.withOverrides(query).getDefaultStrategy();
}

public boolean useStrategyV2(GroupByQuery query)
```
Not sure why this is introduced? Who is using this?
I forgot to remove. Thanks.
```diff
 public GroupByStrategy strategize(GroupByQuery query)
 {
-  final String strategyString = config.withOverrides(query).getDefaultStrategy();
+  final String strategyString = getStrategy(query);
```
again, not sure if there is any advantage in separating this into a method
```java
checkInitialized();
final T theObject;
try {
  theObject = timeout >= 0 ? objects.poll(timeout, TimeUnit.MILLISECONDS) : objects.take();
```
0 time means no wait.
Why take() rather than poll()?
```java
    Queues.drain(objects, batch, maxElements, timeout, TimeUnit.MILLISECONDS) :
    objects.drainTo(batch, maxElements);
if (n < maxElements) {
  if (log.isDebugEnabled()) {
```
log.isDebugEnabled() is baked into Logger already, so suggested:

```java
log.debug("Requested %d elements, but drained %d elements", maxElements, n);
```

```java
public CloseableGrouperIterator<RowBasedKey, Row> make()
{
  final List<Closeable> closeOnFailure = Lists.newArrayList();
  final List<Closeable> closeOnExit = Lists.newArrayList();
```
Why not use Closer for this?
It seems that the order of closing matters. I don't want to change it in this PR.
Please leave a comment explaining this in the code.
```java
{
  if (mergeBuffersHolder != null) {
    if (mergeBuffers.size() != mergeBuffersHolder.get().size()) {
      log.warn((mergeBuffersHolder.get().size() - mergeBuffers.size()) + " resources are not returned yet");
```
```java
private final ResourceHolder<List<ByteBuffer>> mergeBuffersHolder;
private final List<ByteBuffer> mergeBuffers;

public GroupByQueryBrokerResource()
```
What's the point of this constructor?
This default constructor is used when no merge buffers are required for groupBy execution.
I renamed it to GroupByQueryResource, which is a more general name, because Druid's convention generally doesn't distinguish broker-side things from others.
GroupByQueryResource can be used by queryable nodes, i.e., brokers, historicals, and realtimes. However, it is currently used only by brokers, to get merge buffers atomically when necessary. And even in brokers, merge buffers are not used when groupBy strategy v1 is used.
```java
public ResourceHolder<ByteBuffer> getMergeBuffer()
{
  Preconditions.checkState(mergeBuffers != null, "Resource is initialized with empty merge buffers");
  Preconditions.checkState(mergeBuffers.size() > 0, "No available merge buffers");
```
If using ArrayDeque, these two lines could be replaced with buffer = mergeBuffers.pop()
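A hedged sketch of that suggestion (class and field names hypothetical): `ArrayDeque.pop()` already throws `NoSuchElementException` on an empty deque, so the explicit checks become redundant, at the cost of the descriptive error messages.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical sketch of the ArrayDeque suggestion above: pop() throws
// NoSuchElementException on an empty deque, so the two Preconditions
// checks can collapse into the pop() call itself.
class DequeBackedResource
{
  private final ArrayDeque<ByteBuffer> mergeBuffers = new ArrayDeque<>();

  DequeBackedResource(int bufferNum, int bufferSize)
  {
    for (int i = 0; i < bufferNum; i++) {
      mergeBuffers.push(ByteBuffer.allocate(bufferSize));
    }
  }

  ByteBuffer getMergeBuffer()
  {
    // Throws java.util.NoSuchElementException when no buffers remain.
    return mergeBuffers.pop();
  }
}
```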
```java
try {
  mergeBufferHolders = mergeBufferPool.drain(requiredMergeBufferNum, timeout.longValue());
  if (mergeBufferHolders.get().size() < requiredMergeBufferNum) {
    mergeBufferHolders.close();
```
Why TimeoutException? It's not a timeout. Maybe IllegalStateException
Changed to InsufficientResourcesException
```java
@Override
public GroupByQueryBrokerResource prepareResource(GroupByQuery query, boolean willMergeRunners)
{
  if (!willMergeRunners) {
```
Please move this comment to countRequiredMergeBufferNum()
```java
    return new GroupByQueryBrokerResource(mergeBufferHolders);
  }
}
catch (Exception e) {
```
Why QueryInterruptedException?
QueryInterruptedException is used for all kinds of failed queries.
- Added InsufficientResourcesException
- Renamed GroupByQueryBrokerResource to GroupByQueryResource
@leventov thanks for your review. Addressed some of your comments. I don't know why I can't add an inline reply for your comment. That part is not what I changed. I just changed that
```java
/**
 * This exception is thrown when the requested operation cannot be completed due to a lack of available resources.
 */
public class InsufficientResourcesException extends Exception
```
Please make new exceptions to extend RuntimeException. Checked exceptions only force us to write more boilerplate, try-catch-throwables-propagate.
```java
public static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp";
public static final String CTX_KEY_OUTERMOST = "groupByOutermost";

private static final int MAX_MERGE_BUFFER_NUM = 2;
```
Worth adding at least "see countRequiredMergeBufferNum() for explanation" comment, or move part of countRequiredMergeBufferNum()'s comment to this constant
@jihoonson and about

@leventov ah, I got your point. That's also not what I changed, but I clarified it.

@jon-wei I simply added a new Sorry for going back and forth. Would you and other reviewers @himanshug @leventov mind reviewing again please?

Hmm, travis has been done for a while but the PR hasn't figured that out yet. Going to bounce it to see if that helps.

👍

@himanshug thank you for the review and merge.
This patch fixes #3819.
After this patch, when a groupBy query is submitted, all needed merge buffers are first acquired atomically before query execution.
This change is