Push limit into aggregation when nested GROUP BY expressions are equivalent but not identical #8101

@msirek

Is your feature request related to a problem or challenge?

#8038 adds support for pushing a LIMIT into DISTINCT and/or GROUP BY aggregations that have no aggregate expressions.
For example, note lim=[5] on each AggregateExec in this EXPLAIN output, propagated from the LIMIT clause:

EXPLAIN SELECT DISTINCT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5;
----
logical_plan
Limit: skip=0, fetch=5
--Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
------TableScan: aggregate_test_100 projection=[c3]
physical_plan
GlobalLimitExec: skip=0, fetch=5
--AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5]
----CoalescePartitionsExec
------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5]
--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5]
------------CoalescePartitionsExec
--------------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5]
----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], has_header=true

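If there are multiple levels of aggregation with equivalent but not identical GROUP BY expressions, the limit currently cannot be pushed into all of them. In the plan below, the outer aggregation groups by (c3, c2) and the inner one by (c2, c3), so only the outer AggregateExec nodes receive lim=[13] (skip + fetch = 10 + 3):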

EXPLAIN SELECT DISTINCT c3, c2 FROM aggregate_test_100 group by c2, c3 limit 3 offset 10;
logical_plan
Limit: skip=10, fetch=3
--Aggregate: groupBy=[[aggregate_test_100.c3, aggregate_test_100.c2]], aggr=[[]]
----Projection: aggregate_test_100.c3, aggregate_test_100.c2
------Aggregate: groupBy=[[aggregate_test_100.c2, aggregate_test_100.c3]], aggr=[[]]
--------TableScan: aggregate_test_100 projection=[c2, c3]
physical_plan
GlobalLimitExec: skip=10, fetch=3
--AggregateExec: mode=Final, gby=[c3@0 as c3, c2@1 as c2], aggr=[], lim=[13]
----CoalescePartitionsExec
------AggregateExec: mode=Partial, gby=[c3@0 as c3, c2@1 as c2], aggr=[], lim=[13]
--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------ProjectionExec: expr=[c3@1 as c3, c2@0 as c2]
------------AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[] -- Limit could be pushed here, but isn't
--------------CoalescePartitionsExec
----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[]  -- Limit could be pushed here, but isn't
------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true

Describe the solution you'd like

This issue proposes extending the LimitedDistinctAggregation rule to push the limit as deep as possible into nested AggregateExec operators whose PhysicalGroupBy expressions are equivalent but not identical.
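A minimal sketch of the intended descent, using a toy plan tree rather than DataFusion's real types. Everything here (`Plan`, `push_limit`, `same_columns`, `aggregate_limits`, `agg`, `example_plan`) is hypothetical and exists only for illustration. Two simplifying assumptions: each Partial/Final pair is collapsed into one `Aggregate` node, and group-by columns are compared by name, which sidesteps the column-index remapping that a real ProjectionExec would require.

```rust
/// Hypothetical, simplified plan node (not DataFusion's `ExecutionPlan`).
#[derive(Debug)]
enum Plan {
    Scan,
    /// Reorders columns only; column names pass through unchanged.
    Projection { input: Box<Plan> },
    Aggregate {
        group_by: Vec<String>,
        has_aggr_exprs: bool,
        limit: Option<usize>,
        input: Box<Plan>,
    },
}

/// True when both lists contain the same column names, ignoring order.
fn same_columns(a: &[String], b: &[String]) -> bool {
    let (mut a, mut b) = (a.to_vec(), b.to_vec());
    a.sort();
    b.sort();
    a == b
}

/// Sets `limit` on the first aggregate found and on every nested aggregate
/// whose group-by columns match the aggregate directly above it.
/// Stops at the first aggregate that has aggregate expressions or
/// a non-matching group-by.
fn push_limit(plan: &mut Plan, limit: usize, outer_keys: Option<&[String]>) {
    match plan {
        Plan::Aggregate { group_by, has_aggr_exprs, limit: slot, input } => {
            let compatible =
                outer_keys.map_or(true, |keys| same_columns(keys, group_by.as_slice()));
            if *has_aggr_exprs || !compatible {
                return;
            }
            *slot = Some(limit);
            push_limit(input, limit, Some(group_by.as_slice()));
        }
        Plan::Projection { input } => push_limit(input, limit, outer_keys),
        Plan::Scan => {}
    }
}

/// Collects the limit of every aggregate, top-down.
fn aggregate_limits(plan: &Plan) -> Vec<Option<usize>> {
    match plan {
        Plan::Scan => vec![],
        Plan::Projection { input } => aggregate_limits(input),
        Plan::Aggregate { limit, input, .. } => {
            let mut out = vec![*limit];
            out.extend(aggregate_limits(input));
            out
        }
    }
}

/// Builds an aggregate with no aggregate expressions and no limit.
fn agg(keys: &[&str], input: Plan) -> Plan {
    Plan::Aggregate {
        group_by: keys.iter().map(|k| k.to_string()).collect(),
        has_aggr_exprs: false,
        limit: None,
        input: Box::new(input),
    }
}

/// Shape of the `SELECT DISTINCT c3, c2 ... GROUP BY c2, c3` plan above.
fn example_plan() -> Plan {
    let inner = agg(&["c2", "c3"], Plan::Scan);
    agg(&["c3", "c2"], Plan::Projection { input: Box::new(inner) })
}
```

Calling `push_limit(&mut plan, 13, None)` on `example_plan()` sets the limit on both aggregates in this toy model. If the inner aggregate grouped by a different set of columns, the descent would stop at the outer one.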

This may also extend to GROUPING SETS, CUBE, and ROLLUP expressions; whether the rewrite is valid for them needs investigation. For example:

EXPLAIN SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) limit 3;
----
logical_plan
Limit: skip=0, fetch=3
--Aggregate: groupBy=[[ROLLUP (aggregate_test_100.c2, aggregate_test_100.c3)]], aggr=[[]]
----TableScan: aggregate_test_100 projection=[c2, c3]
physical_plan
GlobalLimitExec: skip=0, fetch=3
--AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[], lim=[3]
----CoalescePartitionsExec
------AggregateExec: mode=Partial, gby=[(NULL as c2, NULL as c3), (c2@0 as c2, NULL as c3), (c2@0 as c2, c3@1 as c3)], aggr=[], lim=[3] -- Is it legal to push the limit down to this level?
--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true
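
To make concrete what a limit at the Partial level would be truncating, the sketch below expands ROLLUP(c2, c3) into its grouping sets, which correspond to the three tuples in the gby list of the Partial AggregateExec above. `rollup_grouping_sets` is a hypothetical helper written for this illustration, not a DataFusion function, and it does not settle whether the pushdown is legal.

```rust
/// Expands ROLLUP(cols) into its grouping sets, from "no columns" up to
/// "all columns". `None` marks a column that is replaced by NULL in that set.
fn rollup_grouping_sets(cols: &[&str]) -> Vec<Vec<Option<String>>> {
    (0..=cols.len())
        .map(|kept| {
            cols.iter()
                .enumerate()
                .map(|(i, c)| if i < kept { Some(c.to_string()) } else { None })
                .collect()
        })
        .collect()
}
```

For `["c2", "c3"]` this yields (NULL, NULL), (c2, NULL), (c2, c3), so rows from three different grouping sets flow through the same Partial operator, and a limit applied there cuts across all of them.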

Describe alternatives you've considered

None

Additional context

DataFusion's equivalence classes currently define a "set of [Arc<dyn PhysicalExpr>]s that are known to have the same value for all tuples in a relation", and related utility functions such as physical_exprs_bag_equal may be useful here:

https://github.com/apache/arrow-datafusion/blob/c2e768052c43e4bab6705ee76befc19de383c2cb/datafusion/physical-expr/src/physical_expr.rs#L247-L252
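
As a rough illustration of what a bag-equality check does, here is a simplified generic stand-in. It is not the DataFusion implementation, which operates on Arc<dyn PhysicalExpr> values; `bag_equal` is a hypothetical name, and the quadratic matching is chosen only because it needs nothing beyond `PartialEq`.

```rust
/// Order-insensitive equality that still respects duplicates
/// (multiset equality). Runs in O(n^2) and only needs `PartialEq`.
fn bag_equal<T: PartialEq>(lhs: &[T], rhs: &[T]) -> bool {
    if lhs.len() != rhs.len() {
        return false;
    }
    // used[i] records whether rhs[i] has already been matched.
    let mut used = vec![false; rhs.len()];
    for l in lhs {
        let found = (0..rhs.len()).find(|&i| !used[i] && rhs[i] == *l);
        match found {
            Some(i) => used[i] = true,
            None => return false,
        }
    }
    true
}
```

Under this check, the group-by lists `[c3, c2]` and `[c2, c3]` from the earlier plan compare equal, while `[c2, c2]` and `[c2, c3]` do not.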

A PhysicalGroupBy is similar, but has more than just a slice of PhysicalExprs. It also has corresponding null expressions and a null mask for each group in a grouping set:

https://github.com/apache/arrow-datafusion/blob/6071ee436051c9de0858835dd6ea1d763d4a1c12/datafusion/physical-plan/src/aggregates/mod.rs#L153-L163

So the equivalence class interfaces probably cannot be used as-is: they carry a different notion of equivalence, one derived from equality predicates in the query. Their implementation could still inform a similar technique for a new equivalence check on PhysicalGroupBy.
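
One possible shape for such a check, sketched on a simplified stand-in for PhysicalGroupBy. The assumptions are significant: `GroupBySketch` and `group_by_equivalent` are hypothetical names, strings replace Arc<dyn PhysicalExpr>, the null expressions are omitted, and the expressions are assumed to be distinct. The idea is to compare the two group-bys by the set of non-NULL expressions in each grouping set, so that column order and grouping-set order do not matter.

```rust
use std::collections::BTreeSet;

/// Simplified stand-in for `PhysicalGroupBy` (hypothetical).
#[derive(Debug, Clone)]
struct GroupBySketch {
    /// Distinct grouping expressions, represented here as plain names.
    exprs: Vec<String>,
    /// groups[i][j] == true means expression j is NULL in grouping set i.
    groups: Vec<Vec<bool>>,
}

impl GroupBySketch {
    fn new(exprs: &[&str], groups: Vec<Vec<bool>>) -> Self {
        Self {
            exprs: exprs.iter().map(|e| e.to_string()).collect(),
            groups,
        }
    }
}

/// Canonical form: for each grouping set, the set of expressions that are
/// not NULL in it. Both levels are sets, so ordering is ignored.
fn canonical_groups(g: &GroupBySketch) -> BTreeSet<BTreeSet<&str>> {
    let mut out = BTreeSet::new();
    for mask in &g.groups {
        let mut set = BTreeSet::new();
        for (expr, is_null) in g.exprs.iter().zip(mask) {
            if !*is_null {
                set.insert(expr.as_str());
            }
        }
        out.insert(set);
    }
    out
}

/// Two group-bys are treated as equivalent when they use the same
/// expressions and produce the same grouping sets, up to reordering.
fn group_by_equivalent(a: &GroupBySketch, b: &GroupBySketch) -> bool {
    let a_exprs: BTreeSet<&str> = a.exprs.iter().map(String::as_str).collect();
    let b_exprs: BTreeSet<&str> = b.exprs.iter().map(String::as_str).collect();
    a_exprs == b_exprs && canonical_groups(a) == canonical_groups(b)
}
```

With a single all-false mask this reduces to the order-insensitive comparison needed for `GROUP BY c2, c3` versus `GROUP BY c3, c2`. With ROLLUP-style masks it distinguishes ROLLUP(c2, c3) from ROLLUP(c3, c2), whose grouping sets differ.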

Labels: enhancement (New feature or request)