Support limit push down for GroupBy #3873
Conversation
@jon-wei any benchmark results?
Force-pushed 30bbc5a to c63e147
Basic query benchmark results: master vs. patch

TPC-H 'lineitem' benchmark: using the tpch lineitem dataset from https://github.com/implydata/redshift-benchmark, running a cluster with a single r3.2xlarge historical and an r3.2xlarge broker, with the following configs:

With the following query, grouping on a long column with ~2 million cardinality, with a single long sum agg, I got the following total query running times using the "time" cmd:

master:
patch (with push down enabled):

I also ran the query above against a similar datasource where
Force-pushed c63e147 to f565ce5
getRowOrderingPushDown(..) is only used when limitSpec does not have an aggregator in it, so shouldn't this case of aggIndex >= 0 actually be an error?
removed the agg checks here since they weren't necessary, thanks
haven't gone through the whole code yet... but, speaking as a user, it would be nice for Druid to automatically push down limits where the results would be exactly the same,
and for other cases, push down limits only if the user asks explicitly.
The approximate results seem to mean that the group-by results may not be valid if non-grouping keys are used for sorting. This is because of the behavior of BufferGrouper: it first aggregates input rows until its hash table fills. When the hash table fills, it copies the top-n entries found so far to another hash table and continues aggregation. Here, the copied entries may not belong to the real group-by result, and this can make the result invalid.
In this case, we cannot guarantee the approximation ratio because it will depend on the input order. In the worst case, users can get totally unexpected results, and it's difficult for them to figure that out without running the same query again without limit push down.
So, IMHO, it would be better to disable push down when non-grouping keys are used for sorting. What do you think?
I updated the patch so that limit push down is enabled only when the sorting order uses grouping key fields, and noted that in the docs
re: push down when sorting on non-grouping fields, I think it's good to keep it disabled by default, but I also think it's useful to allow it if the user really wants to do it (maybe some users have data/queries that happen to work well even with the approximation), so I've kept the push down context flag, with a note in the docs that there are no guarantees on the accuracy of the results in that case.
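To illustrate the hazard discussed above, here is a small standalone simulation (hypothetical code, not Druid's actual grouper): a grouper that keeps at most `limit` groups and evicts the group with the smallest running sum when full. A row for an evicted key that arrives later restarts that key's sum, so sorting on an aggregate (non-grouping) field can produce wrong totals.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of aggregating with a size-limited table that
// evicts the group with the smallest running sum when it fills. If rows
// for an evicted key arrive later, that key's sum restarts, so results can
// be wrong when sorting on a non-grouping (aggregate) field.
public class LimitedGrouperSketch {
  static Map<String, Long> groupWithLimit(List<Map.Entry<String, Long>> rows, int limit) {
    Map<String, Long> sums = new HashMap<>();
    for (Map.Entry<String, Long> row : rows) {
      sums.merge(row.getKey(), row.getValue(), Long::sum);
      if (sums.size() > limit) {
        // evict the group with the smallest running sum
        String smallest = Collections.min(sums.entrySet(), Map.Entry.comparingByValue()).getKey();
        sums.remove(smallest);
      }
    }
    return sums;
  }

  static Map<String, Long> demo() {
    return groupWithLimit(
        List.of(Map.entry("a", 10L), Map.entry("b", 1L), Map.entry("c", 5L),
                Map.entry("b", 100L)),  // "b" arrives again after being evicted
        2
    );
  }

  public static void main(String[] args) {
    // exact sums are {a=10, b=101, c=5}; the limited grouper reports b=100
    // because b's first row was evicted before its second row arrived
    System.out.println(demo());
  }
}
```

This only models the eviction effect; the actual BufferGrouper copies the top-n entries on table fill rather than evicting one entry at a time, but the input-order dependence is the same.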
Please remove the unnecessary semicolon.
Iterator.next() should throw NoSuchElementException.
changed to throw NSEE
should throw NoSuchElementException if curr >= size.
should throw NoSuchElementException if curr >= size.
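For reference, the contract being requested looks like this (a minimal hypothetical sketch, not the actual Druid iterator):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Minimal sketch of an offset iterator honoring the Iterator contract:
// next() throws NoSuchElementException once curr >= size, instead of
// returning stale data or relying on callers to check hasNext() first.
public class OffsetIteratorSketch implements Iterator<Integer> {
  private final int[] offsets;
  private final int size;
  private int curr = 0;

  public OffsetIteratorSketch(int[] offsets, int size) {
    this.offsets = offsets;
    this.size = size;
  }

  @Override
  public boolean hasNext() {
    return curr < size;
  }

  @Override
  public Integer next() {
    if (curr >= size) {
      throw new NoSuchElementException();
    }
    return offsets[curr++];
  }
}
```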
Please remove the debug code.
removed debug print fn
Might be useful if it can be generalized for other types as well.
Hm, I could see that. I'll leave this specialized for the current use case in this PR, though; if another use comes up later we can do a PR then
IMO, it would be better to make a new Grouper supporting limit push down, because its behavior is quite different from the original one in many parts, to achieve a different goal. It will be helpful for readability and maybe a slight performance gain as well.
split the limit handling stuff into a new LimitedBufferGrouper
Force-pushed ed3ffb5 to 52cfebe
Thanks for the comments so far, updated the PR
marking this WIP, fixing some test failures, will reopen later
Force-pushed 67dba7b to ec4ee4a
Updated this with a few more changes:
    protected int maxSize;

    // current number of available/used buckets in the table
    protected int buckets;
can we call this maxPossibleBuckets ?
maybe also document, in the comments for the above two variables, that these numbers change on table resize.
maxBuckets is probably better.
Renamed to maxBuckets and added comments re: table resize
    protected int size;

    // Maximum number of elements in the table before it must be resized
    protected int maxSize;
can we call this regrowthThreshold ?
renamed to regrowthThreshold
    protected ByteBuffer buffer;
    protected int bucketSizeWithHash;
    protected int tableArenaSize;
    protected int keySize;
the last 4 variables can be final, I think
    import java.nio.ByteBuffer;

    public class ByteBufferHashTable
@gianm is the concept of regrowth there only because at initialization time we need to set 0 in the first byte of all the buckets, and it would be too expensive to do that upfront for the whole buffer? If yes, then it would be interesting to see how Unsafe.setMemory(..) performs over a 1 GB buffer; if that is fast enough, the regrowth business could possibly be removed.
Yes, that's the reason. When I benchmarked "simple" groupBys with small result sets, zeroing out the first byte of each bucket was a big performance hog.
That seems worth looking into, though I think that would take some time/effort to evaluate and could be handled in a follow-on PR.
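To make the tradeoff concrete, here is a hypothetical sketch of the layout under discussion (the bucket size and method names are made up): each bucket's first byte is a used flag, and those flags must be zeroed before a region of the table can be used, which is the per-bucket cost that regrowth amortizes.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: each bucket begins with a one-byte "used" flag.
// Zeroing every flag upfront over a large buffer is expensive, so a
// regrowth scheme only clears the flags of the region being brought online.
public class BucketClearSketch {
  static final int BUCKET_SIZE = 16;  // made-up bucket size in bytes

  // clear the used flag of buckets in [startBucket, endBucket)
  static void clearUsedFlags(ByteBuffer buf, int startBucket, int endBucket) {
    for (int i = startBucket; i < endBucket; i++) {
      buf.put(i * BUCKET_SIZE, (byte) 0);
    }
  }

  static boolean isUsed(ByteBuffer buf, int bucket) {
    return buf.get(bucket * BUCKET_SIZE) != 0;
  }

  static boolean demoStaleFlagCleared() {
    ByteBuffer buf = ByteBuffer.allocate(8 * BUCKET_SIZE);
    buf.put(2 * BUCKET_SIZE, (byte) 1);  // simulate a stale flag
    clearUsedFlags(buf, 0, 8);           // bring all 8 buckets online
    return isUsed(buf, 2);
  }
}
```

The idea floated above is that one bulk zeroing pass over the whole buffer (e.g. via Unsafe.setMemory) might be fast enough to drop the regrowth scheme entirely; since the flags are interleaved with bucket data, the bulk approach would have to zero the full arena rather than just the flag bytes.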
Force-pushed 7b71a1d to 7250a13
    {
      private ByteBuffer[] subHashTableBuffers;
      private ByteBufferHashTable[] subHashTables;
      private ByteBufferHashTable activeHashTable;
not sure why we are creating ByteBufferHashTable structures on two halves? can't we just keep two ByteBuffers for the two halves and a reference to an activeByteBuffer?
also, maybe add some comments saying there are only two of these (them being an array gave me the impression that there were many of those)... alternated on each regrow, where elements beyond the limit are discarded.
Changed this to use two ByteBuffers directly without the sub HashTables, added a comment on there being two buffers and a description of the swapping
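As a rough model of the swapping described here (heavily simplified to maps instead of ByteBuffers; the names are hypothetical, not Druid's actual LimitedBufferGrouper):

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Simplified model of the two-halves scheme: when the active half fills,
// only the top `limit` entries (by descending sum) are copied into the
// other half, which then becomes the active one.
public class TwoHalvesSketch {
  private final int halfCapacity;
  private final int limit;
  private Map<String, Long> active = new HashMap<>();
  private Map<String, Long> spare = new HashMap<>();

  TwoHalvesSketch(int halfCapacity, int limit) {
    this.halfCapacity = halfCapacity;
    this.limit = limit;
  }

  void add(String key, long value) {
    if (!active.containsKey(key) && active.size() == halfCapacity) {
      swapAndCopyTopN();
    }
    active.merge(key, value, Long::sum);
  }

  // copy the top `limit` entries into the spare half, then swap halves
  private void swapAndCopyTopN() {
    spare.clear();
    active.entrySet().stream()
        .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
        .limit(limit)
        .forEach(e -> spare.put(e.getKey(), e.getValue()));
    Map<String, Long> tmp = active;
    active = spare;
    spare = tmp;
  }

  Map<String, Long> results() {
    return active;
  }
}
```

Alternating between two fixed halves avoids growing the table: each swap discards everything beyond the limit, so the freed buckets in the old half can be reused on the next swap.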
    // Limit to apply to results.
    // If limit > 0, track hash table entries in a binary heap with size of limit.
    // If -1, no limit is applied; hash table entry offsets are tracked with an unordered list with no limit.
    private int limit;
what is the case where limit <= 0 is valid?
This was an outdated comment from an older revision, limit will never be <= 0 now, I deleted the comment
      return forcePushDown;
    }

    public boolean determineApplyLimitPushDown()
i think this can be implemented in one line: return validateAndGetForceLimitPushDown() || DefaultLimitSpec.sortingOrderHasNonGroupingFields(..);
you might have to add the defaultLimitSpec.getLimit() == Integer.MAX_VALUE check inside DefaultLimitSpec.sortingOrderHasNonGroupingFields(..)
Hm, I decided to change this area in a different way: I removed an unnecessary null and type check from sortingOrderHasNonGroupingFields(), but kept determineApplyLimitPushDown() the same.
The sortingOrder method is used elsewhere, where the MAX_VALUE check is no longer relevant (it would already have been checked earlier when the query object was created), so I didn't think it really belongs there.
I also felt that the conditions for when limit push down will be applied are clearer if they're all in the determineApplyLimitPushDown() method.
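The resulting decision, as described in the discussion above, can be sketched as follows (hypothetical signature and class name, not the actual Druid method):

```java
// Hypothetical sketch of the push-down decision described above: apply it
// when the user forces it, or when a real limit exists and the ordering
// only references grouping-key fields (so the results remain exact).
public class LimitPushDownDecision {
  static boolean determineApplyLimitPushDown(
      boolean forcePushDown,
      int limit,
      boolean sortingOrderHasNonGroupingFields
  ) {
    if (forcePushDown) {
      return true;
    }
    // "no explicit limit" is modeled here as Integer.MAX_VALUE
    return limit != Integer.MAX_VALUE && !sortingOrderHasNonGroupingFields;
  }
}
```

Keeping all three conditions in one method, as the author argues, makes it easy to see every case in which push down can fire.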
@jon-wei looks good overall besides some comments. it makes sense overall, but it's hard to identify bugs; hoping that you have tested it on a cluster with a few different nested/non-nested groupBy queries.
I've addressed your comments, thanks for the review! Re: testing, the patch has been tested with non-nested groupBys on a large TPC-H dataset as part of the benchmarking. I am currently doing more stress testing with more query variations like nested groupBys; I'll comment here once that testing is done.
    // clear the used bits of both buffers
    for (int i = 0; i < maxBuckets; i++) {
      subHashTableBuffers[0].put(i * bucketSizeWithHash, (byte) 0);
      subHashTableBuffers[1].put(i * bucketSizeWithHash, (byte) 0);
this one is not needed and will automatically be zeroed out in adjustTableWhenFull(..) when needed.
Got rid of the second buffer reset there
I tested the patch with some nested groupBys against the TPC-H dataset that I benchmarked against earlier; the results are correct. I've deployed the patch to Imply's internal test cluster as well. Also merged master just now.
thanks @jon-wei 👍
👍
    private boolean forcePushDownLimit = false;

    @JsonProperty
    private Class<? extends GroupByQueryMetricsFactory> queryMetricsFactory;
@jon-wei could you please remove this field, getter and setter? They were removed here: https://github.com/druid-io/druid/pull/4336/files#diff-5e30ea112240e82f233099071bb3389e but resurrected in this PR.
    private final List<AggregatorFactory> aggregatorSpecs;
    private final List<PostAggregator> postAggregatorSpecs;

    private final Function<Sequence<Row>, Sequence<Row>> limitFn;
This patch adds a context flag to the GroupBy query that enables result limiting/sorting at the merge buffer level, allowing the brokers to process less data.
This is accomplished by using a min-max heap with a size limit in the BufferGrouper instead of a list of offsets.
The table buffer in the BufferGrouper is used differently in push down mode: instead of growing as buckets are added, the hash table buffer is split in half. When a half fills, the active table buffer swaps to the other half, and the top N buckets are copied over to the new active buffer.
Note that when the sorting order uses fields that are not in the grouping key, limit push down can result in approximate results.
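The heap-based tracking can be modeled like this (a simplified sketch using a plain PriorityQueue over values rather than the actual min-max heap over bucket offsets):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Simplified model of limit tracking: keep at most `limit` entries; the
// heap root is always the current worst entry, so a new entry either
// replaces it or is discarded. In the real grouper the heap holds bucket
// offsets, letting evicted buckets be reclaimed.
public class OffsetHeapSketch {
  // return the `limit` smallest values in ascending order
  static List<Integer> topN(int[] values, int limit) {
    PriorityQueue<Integer> heap = new PriorityQueue<>(Comparator.reverseOrder());
    for (int v : values) {
      if (heap.size() < limit) {
        heap.offer(v);
      } else if (v < heap.peek()) {
        heap.poll();   // evict the current worst entry
        heap.offer(v);
      }
    }
    List<Integer> out = new ArrayList<>(heap);
    out.sort(null);
    return out;
  }
}
```

Capping the heap at the limit is what lets the historicals send at most `limit` rows per segment to the broker instead of every group.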