-
Notifications
You must be signed in to change notification settings - Fork 1.9k
fix: repartition for grouping set #16983
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: repartition for grouping set #16983
Conversation
| || has_grouping_id) | ||
| && session_state.config().repartition_aggregations(); | ||
|
|
||
| let next_partition_mode = if can_repartition { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we should put code here if can_repartition || has_grouping_id {
|
@thinkharderdev / @avantgardnerio -- do you have some time to help review this PR? |
| 01)ProjectionExec: expr=[id@0 as id] | ||
| 02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as __grouping_id], aggr=[], ordering_mode=PartiallySorted([0]) | ||
| 03)----CoalesceBatchesExec: target_batch_size=8192 | ||
| 04)------RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 1), input_partitions=2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I'm not understanding something but how does "repartitioning" to a single partition change anything?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it has single partition, but multiple record batches. aggregation assumes that records in same group are adjacent, but it's not true for this case. repartition solves this problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, been busy for past few days so just getting back to this. I think I understand the underlying issue now since id is a const we infer it as a singleton which is why we get the issue.
Still I'm concerned that we are solving this with a pretty blunt instrument. Adding a repartition to ever aggregation with a grouping set can have a non-trivial cost, especially in a distributed query.
Looking into it a bit more, it seems like in this case we infer SortProperties::Singleton for the id expr in the final aggregation which I think is incorrect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the underlying issue is that in aggregation with group_id, the partial aggregation and final aggregation have different group columns. if partition num is greater than 1, it always do repartition, so this problem is covered up.
|
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days. |
Which issue does this PR close?
Rationale for this change
For aggregation with grouping set,
the group by expressions differ between the partial and final stages,
the implementation of final stage aggregation depends on the order of input.
What changes are included in this PR?
repartition if group by contains grouping set
Are these changes tested?
UT
Are there any user-facing changes?
No