From 53a201ddc782c28940f2d6993aa8accc87d6409f Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 12 Aug 2025 11:25:25 +0200 Subject: [PATCH 1/3] Regression test --- .../sqllogictest/test_files/issue_17138.slt | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 datafusion/sqllogictest/test_files/issue_17138.slt diff --git a/datafusion/sqllogictest/test_files/issue_17138.slt b/datafusion/sqllogictest/test_files/issue_17138.slt new file mode 100644 index 0000000000000..8072a84d10758 --- /dev/null +++ b/datafusion/sqllogictest/test_files/issue_17138.slt @@ -0,0 +1,38 @@ +statement ok +CREATE TABLE tab1(col0 INTEGER, col1 INTEGER, col2 INTEGER) + +statement ok +INSERT INTO tab1 VALUES(51,14,96) + +# TODO this query used to (accidentally) pass before https://github.com/apache/datafusion/pull/17088 +query error DataFusion error: Execution error: avg\(DISTINCT\) aggregations are not available +SELECT NULL * AVG(DISTINCT 4) + SUM(col1) AS col0 FROM tab1 + +query TT +EXPLAIN SELECT NULL * AVG(DISTINCT 4) + SUM(col1) AS col0 FROM tab1 +---- +logical_plan +01)Projection: Float64(NULL) AS col0 +02)--Aggregate: groupBy=[[]], aggr=[[avg(DISTINCT Float64(4)) AS avg(DISTINCT Int64(4))]] +03)----TableScan: tab1 projection=[] +physical_plan +01)ProjectionExec: expr=[NULL as col0] +02)--AggregateExec: mode=Single, gby=[], aggr=[avg(DISTINCT Int64(4))] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +# TODO this query used to (accidentally) pass before https://github.com/apache/datafusion/pull/17088 +# Similar, with a few more arithmetic operations +query error DataFusion error: Execution error: avg\(DISTINCT\) aggregations are not available +SELECT + CAST ( NULL AS INTEGER ) * + + AVG ( DISTINCT 4 ) + - SUM ( ALL + col1 ) AS col0 FROM tab1 + +query TT +EXPLAIN SELECT + CAST ( NULL AS INTEGER ) * + + AVG ( DISTINCT 4 ) + - SUM ( ALL + col1 ) AS col0 FROM tab1 +---- +logical_plan +01)Projection: Float64(NULL) AS col0 +02)--Aggregate: groupBy=[[]], aggr=[[avg(DISTINCT Float64(4)) AS avg(DISTINCT Int64(4))]] +03)----TableScan: tab1 projection=[] +physical_plan +01)ProjectionExec: expr=[NULL as col0] +02)--AggregateExec: mode=Single, gby=[], aggr=[avg(DISTINCT Int64(4))] +03)----DataSourceExec: partitions=1, partition_sizes=[1] From 9e02646c5de14ce3f9f79f94929b7e6bcd9c1790 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 12 Aug 2025 11:26:01 +0200 Subject: [PATCH 2/3] Eliminate all redundant aggregations Before the change, it was disallowed to have an aggregation without GROUP BY and without any aggregate functions. This prevented the optimizer from removing each redundant aggregation if all were redundant. The first one would always be retained. This commit removes the limitation, allowing for queries to be further optimized. --- datafusion/core/tests/dataframe/mod.rs | 64 +++++++++---------- datafusion/expr/src/logical_plan/plan.rs | 5 -- .../optimizer/src/optimize_projections/mod.rs | 24 +------ .../sqllogictest/test_files/explain.slt | 8 +-- .../sqllogictest/test_files/explain_tree.slt | 7 +- .../test_files/expr/date_part.slt | 2 +- .../sqllogictest/test_files/issue_17138.slt | 20 +++--- .../test_files/spark/bitwise/getbit.slt | 1 - .../sqllogictest/test_files/subquery.slt | 5 +- 9 files changed, 51 insertions(+), 85 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 36a1161541756..d50cf15956ea1 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2744,23 +2744,21 @@ async fn test_count_wildcard_on_where_exist() -> Result<()> { assert_snapshot!( pretty_format_batches(&sql_results).unwrap(), - @r###" - +---------------+---------------------------------------------------------+ - | plan_type | plan | - +---------------+---------------------------------------------------------+ - | logical_plan | LeftSemi Join: | - | | TableScan: t1 projection=[a, b] | - | | SubqueryAlias: __correlated_sq_1 | - | | Projection: | - | | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] | - | | TableScan: t2 projection=[] | - | physical_plan | NestedLoopJoinExec: join_type=RightSemi | - | | ProjectionExec: expr=[] | - | | PlaceholderRowExec | - | | DataSourceExec: partitions=1, partition_sizes=[1] | - | | | - +---------------+---------------------------------------------------------+ - "### + @r" + +---------------+-----------------------------------------------------+ + | plan_type | plan | + +---------------+-----------------------------------------------------+ + | logical_plan | LeftSemi Join: | + | | TableScan: t1 projection=[a, b] | + | | SubqueryAlias: __correlated_sq_1 | + | | Aggregate: groupBy=[[]], aggr=[[]] | + | | TableScan: t2 projection=[] | + | physical_plan | NestedLoopJoinExec: join_type=RightSemi | + | | PlaceholderRowExec | + | | DataSourceExec: partitions=1, partition_sizes=[1] | + | | | + +---------------+-----------------------------------------------------+ + " ); let df_results = ctx @@ -2783,23 +2781,21 @@ async fn test_count_wildcard_on_where_exist() -> Result<()> { assert_snapshot!( pretty_format_batches(&df_results).unwrap(), - @r###" - +---------------+---------------------------------------------------------------------+ - | plan_type | plan | - +---------------+---------------------------------------------------------------------+ - | logical_plan | LeftSemi Join: | - | | TableScan: t1 projection=[a, b] | - | | SubqueryAlias: __correlated_sq_1 | - | | Projection: | - | | Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] | - | | TableScan: t2 projection=[] | - | physical_plan | NestedLoopJoinExec: join_type=RightSemi | - | | ProjectionExec: expr=[] | - | | PlaceholderRowExec | - | | DataSourceExec: partitions=1, partition_sizes=[1] | - | | | - +---------------+---------------------------------------------------------------------+ - "### + @r" + +---------------+-----------------------------------------------------+ + | plan_type | plan | + +---------------+-----------------------------------------------------+ + | logical_plan | LeftSemi Join: | + | | TableScan: t1 projection=[a, b] | + | | SubqueryAlias: __correlated_sq_1 | + | | Aggregate: groupBy=[[]], aggr=[[]] | + | | TableScan: t2 projection=[] | + | physical_plan | NestedLoopJoinExec: join_type=RightSemi | + | | PlaceholderRowExec | + | | DataSourceExec: partitions=1, partition_sizes=[1] | + | | | + +---------------+-----------------------------------------------------+ + " ); Ok(()) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index d68e6cd812725..503afce577183 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -3520,11 +3520,6 @@ impl Aggregate { aggr_expr: Vec, schema: DFSchemaRef, ) -> Result { - if group_expr.is_empty() && aggr_expr.is_empty() { - return plan_err!( - "Aggregate requires at least one grouping or aggregate expression" - ); - } let group_expr_count = grouping_set_expr_count(&group_expr)?; if schema.fields().len() != group_expr_count + aggr_expr.len() { return plan_err!( diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 7b7be82b70ca0..a07d2d3f53526 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -153,24 +153,7 @@ fn optimize_projections( // Only use the absolutely necessary aggregate expressions required // by the parent: - let mut new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr); - - // Aggregations always need at least one aggregate expression. - // With a nested count, we don't require any column as input, but - // still need to create a correct aggregate, which may be optimized - // out later. As an example, consider the following query: - // - // SELECT count(*) FROM (SELECT count(*) FROM [...]) - // - // which always returns 1. - if new_aggr_expr.is_empty() - && new_group_bys.is_empty() - && !aggregate.aggr_expr.is_empty() - { - // take the old, first aggregate expression - new_aggr_expr = aggregate.aggr_expr; - new_aggr_expr.resize_with(1, || unreachable!()); - } + let new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr); let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter()); let schema = aggregate.input.schema(); @@ -1146,9 +1129,8 @@ mod tests { plan, @r" Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]] - Projection: - Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]] - TableScan: ?table? projection=[] + Aggregate: groupBy=[[]], aggr=[[]] + TableScan: ?table? projection=[] " ) } diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 4d61b254f5077..c446088e6c447 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -429,14 +429,12 @@ logical_plan 01)LeftSemi Join: 02)--TableScan: t1 projection=[a] 03)--SubqueryAlias: __correlated_sq_1 -04)----Projection: -05)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] -06)--------TableScan: t2 projection=[] +04)----Aggregate: groupBy=[[]], aggr=[[]] +05)------TableScan: t2 projection=[] physical_plan 01)NestedLoopJoinExec: join_type=LeftSemi 02)--DataSourceExec: partitions=1, partition_sizes=[0] -03)--ProjectionExec: expr=[] -04)----PlaceholderRowExec +03)--PlaceholderRowExec statement ok drop table t1; diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index f57c505068939..21de2d49fba2d 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -1263,14 +1263,11 @@ physical_plan 04)│ join_type: LeftSemi │ │ 05)└─────────────┬─────────────┘ │ 06)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ -07)│ DataSourceExec ││ ProjectionExec │ +07)│ DataSourceExec ││ PlaceholderRowExec │ 08)│ -------------------- ││ │ 09)│ files: 1 ││ │ 10)│ format: csv ││ │ -11)└───────────────────────────┘└─────────────┬─────────────┘ -12)-----------------------------┌─────────────┴─────────────┐ -13)-----------------------------│ PlaceholderRowExec │ -14)-----------------------------└───────────────────────────┘ +11)└───────────────────────────┘└───────────────────────────┘ # Query with cross join. query TT diff --git a/datafusion/sqllogictest/test_files/expr/date_part.slt b/datafusion/sqllogictest/test_files/expr/date_part.slt index df17a8bca968d..64f16f72421a0 100644 --- a/datafusion/sqllogictest/test_files/expr/date_part.slt +++ b/datafusion/sqllogictest/test_files/expr/date_part.slt @@ -1089,4 +1089,4 @@ SELECT EXTRACT("isodow" FROM to_timestamp('2020-09-08T12:00:00+00:00')) query I SELECT EXTRACT('isodow' FROM to_timestamp('2020-09-08T12:00:00+00:00')) ---- -1 \ No newline at end of file +1 diff --git a/datafusion/sqllogictest/test_files/issue_17138.slt b/datafusion/sqllogictest/test_files/issue_17138.slt index 8072a84d10758..efc3fd9d486fc 100644 --- a/datafusion/sqllogictest/test_files/issue_17138.slt +++ b/datafusion/sqllogictest/test_files/issue_17138.slt @@ -4,35 +4,35 @@ CREATE TABLE tab1(col0 INTEGER, col1 INTEGER, col2 INTEGER) statement ok INSERT INTO tab1 VALUES(51,14,96) -# TODO this query used to (accidentally) pass before https://github.com/apache/datafusion/pull/17088 -query error DataFusion error: Execution error: avg\(DISTINCT\) aggregations are not available +query R SELECT NULL * AVG(DISTINCT 4) + SUM(col1) AS col0 FROM tab1 +---- +NULL query TT EXPLAIN SELECT NULL * AVG(DISTINCT 4) + SUM(col1) AS col0 FROM tab1 ---- logical_plan 01)Projection: Float64(NULL) AS col0 -02)--Aggregate: groupBy=[[]], aggr=[[avg(DISTINCT Float64(4)) AS avg(DISTINCT Int64(4))]] +02)--Aggregate: groupBy=[[]], aggr=[[]] 03)----TableScan: tab1 projection=[] physical_plan 01)ProjectionExec: expr=[NULL as col0] -02)--AggregateExec: mode=Single, gby=[], aggr=[avg(DISTINCT Int64(4))] -03)----DataSourceExec: partitions=1, partition_sizes=[1] +02)--PlaceholderRowExec -# TODO this query used to (accidentally) pass before https://github.com/apache/datafusion/pull/17088 # Similar, with a few more arithmetic operations -query error DataFusion error: Execution error: avg\(DISTINCT\) aggregations are not available +query R SELECT + CAST ( NULL AS INTEGER ) * + + AVG ( DISTINCT 4 ) + - SUM ( ALL + col1 ) AS col0 FROM tab1 +---- +NULL query TT EXPLAIN SELECT + CAST ( NULL AS INTEGER ) * + + AVG ( DISTINCT 4 ) + - SUM ( ALL + col1 ) AS col0 FROM tab1 ---- logical_plan 01)Projection: Float64(NULL) AS col0 -02)--Aggregate: groupBy=[[]], aggr=[[avg(DISTINCT Float64(4)) AS avg(DISTINCT Int64(4))]] +02)--Aggregate: groupBy=[[]], aggr=[[]] 03)----TableScan: tab1 projection=[] physical_plan 01)ProjectionExec: expr=[NULL as col0] -02)--AggregateExec: mode=Single, gby=[], aggr=[avg(DISTINCT Int64(4))] -03)----DataSourceExec: partitions=1, partition_sizes=[1] +02)--PlaceholderRowExec diff --git a/datafusion/sqllogictest/test_files/spark/bitwise/getbit.slt b/datafusion/sqllogictest/test_files/spark/bitwise/getbit.slt index ca1b2724a8ce8..7cfdfe8257277 100644 --- a/datafusion/sqllogictest/test_files/spark/bitwise/getbit.slt +++ b/datafusion/sqllogictest/test_files/spark/bitwise/getbit.slt @@ -73,4 +73,3 @@ query I SELECT getbit(11, NULL); ---- NULL - diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index ed73eecda03e2..5858bbf74e28d 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1453,9 +1453,8 @@ logical_plan 01)LeftSemi Join: 02)--TableScan: t1 projection=[a] 03)--SubqueryAlias: __correlated_sq_1 -04)----Projection: -05)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] -06)--------TableScan: t2 projection=[] +04)----Aggregate: groupBy=[[]], aggr=[[]] +05)------TableScan: t2 projection=[] statement count 0 drop table t1; From c79cd3e941b306bb4c6808c943f41595a28e9936 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 12 Aug 2025 15:38:58 +0200 Subject: [PATCH 3/3] fixup! Eliminate all redundant aggregations --- datafusion/core/tests/dataframe/mod.rs | 6 ++---- datafusion/expr/src/logical_plan/plan.rs | 8 ++++++++ .../optimizer/src/optimize_projections/mod.rs | 19 ++++++++++++++----- .../sqllogictest/test_files/explain.slt | 3 +-- .../sqllogictest/test_files/issue_17138.slt | 6 ++---- .../sqllogictest/test_files/subquery.slt | 3 +-- 6 files changed, 28 insertions(+), 17 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index d50cf15956ea1..a38d6ef1ac200 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2751,8 +2751,7 @@ async fn test_count_wildcard_on_where_exist() -> Result<()> { | logical_plan | LeftSemi Join: | | | TableScan: t1 projection=[a, b] | | | SubqueryAlias: __correlated_sq_1 | - | | Aggregate: groupBy=[[]], aggr=[[]] | - | | TableScan: t2 projection=[] | + | | EmptyRelation | | physical_plan | NestedLoopJoinExec: join_type=RightSemi | | | PlaceholderRowExec | | | DataSourceExec: partitions=1, partition_sizes=[1] | @@ -2788,8 +2787,7 @@ async fn test_count_wildcard_on_where_exist() -> Result<()> { | logical_plan | LeftSemi Join: | | | TableScan: t1 projection=[a, b] | | | SubqueryAlias: __correlated_sq_1 | - | | Aggregate: groupBy=[[]], aggr=[[]] | - | | TableScan: t2 projection=[] | + | | EmptyRelation | | physical_plan | NestedLoopJoinExec: join_type=RightSemi | | | PlaceholderRowExec | | | DataSourceExec: partitions=1, partition_sizes=[1] | diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 503afce577183..ea21da29849ee 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -3520,6 +3520,14 @@ impl Aggregate { aggr_expr: Vec, schema: DFSchemaRef, ) -> Result { + if group_expr.is_empty() && aggr_expr.is_empty() { + return plan_err!( + "Aggregate requires at least one grouping or aggregate expression. \ + Aggregate without grouping expressions nor aggregate expressions is \ + logically equivalent to, but less efficient than, VALUES producing \ + single row. Please use VALUES instead." + ); + } let group_expr_count = grouping_set_expr_count(&group_expr)?; if schema.fields().len() != group_expr_count + aggr_expr.len() { return plan_err!( diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index a07d2d3f53526..08909f5f86675 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -26,12 +26,12 @@ use std::sync::Arc; use datafusion_common::{ get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, Column, - HashMap, JoinType, Result, + DFSchema, HashMap, JoinType, Result, }; use datafusion_expr::expr::Alias; use datafusion_expr::{ - logical_plan::LogicalPlan, Aggregate, Distinct, Expr, Projection, TableScan, Unnest, - Window, + logical_plan::LogicalPlan, Aggregate, Distinct, EmptyRelation, Expr, Projection, + TableScan, Unnest, Window, }; use crate::optimize_projections::required_indices::RequiredIndices; @@ -155,6 +155,16 @@ fn optimize_projections( // by the parent: let new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr); + if new_group_bys.is_empty() && new_aggr_expr.is_empty() { + // Global aggregation with no aggregate functions always produces 1 row and no columns. + return Ok(Transformed::yes(LogicalPlan::EmptyRelation( + EmptyRelation { + produce_one_row: true, + schema: Arc::new(DFSchema::empty()), + }, + ))); + } + let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter()); let schema = aggregate.input.schema(); let necessary_indices = @@ -1129,8 +1139,7 @@ mod tests { plan, @r" Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]] - Aggregate: groupBy=[[]], aggr=[[]] - TableScan: ?table? projection=[] + EmptyRelation " ) } diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index c446088e6c447..c32a5e6b33d62 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -429,8 +429,7 @@ logical_plan 01)LeftSemi Join: 02)--TableScan: t1 projection=[a] 03)--SubqueryAlias: __correlated_sq_1 -04)----Aggregate: groupBy=[[]], aggr=[[]] -05)------TableScan: t2 projection=[] +04)----EmptyRelation physical_plan 01)NestedLoopJoinExec: join_type=LeftSemi 02)--DataSourceExec: partitions=1, partition_sizes=[0] diff --git a/datafusion/sqllogictest/test_files/issue_17138.slt b/datafusion/sqllogictest/test_files/issue_17138.slt index efc3fd9d486fc..d7dcf8d4dbdc2 100644 --- a/datafusion/sqllogictest/test_files/issue_17138.slt +++ b/datafusion/sqllogictest/test_files/issue_17138.slt @@ -14,8 +14,7 @@ EXPLAIN SELECT NULL * AVG(DISTINCT 4) + SUM(col1) AS col0 FROM tab1 ---- logical_plan 01)Projection: Float64(NULL) AS col0 -02)--Aggregate: groupBy=[[]], aggr=[[]] -03)----TableScan: tab1 projection=[] +02)--EmptyRelation physical_plan 01)ProjectionExec: expr=[NULL as col0] 02)--PlaceholderRowExec @@ -31,8 +30,7 @@ EXPLAIN SELECT + CAST ( NULL AS INTEGER ) * + + AVG ( DISTINCT 4 ) + - SUM ( ALL ---- logical_plan 01)Projection: Float64(NULL) AS col0 -02)--Aggregate: groupBy=[[]], aggr=[[]] -03)----TableScan: tab1 projection=[] +02)--EmptyRelation physical_plan 01)ProjectionExec: expr=[NULL as col0] 02)--PlaceholderRowExec diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 5858bbf74e28d..e33271cf6de99 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1453,8 +1453,7 @@ logical_plan 01)LeftSemi Join: 02)--TableScan: t1 projection=[a] 03)--SubqueryAlias: __correlated_sq_1 -04)----Aggregate: groupBy=[[]], aggr=[[]] -05)------TableScan: t2 projection=[] +04)----EmptyRelation statement count 0 drop table t1;