From fcb6f9e4c63a613b5906c558e28315ba084e9474 Mon Sep 17 00:00:00 2001 From: hhj Date: Tue, 31 Oct 2023 17:31:02 +0800 Subject: [PATCH 1/3] fix: single_distinct_aggretation_to_group_by faile --- .../src/single_distinct_to_groupby.rs | 23 +++++++++--- .../test_files/single_distinct.slt | 35 +++++++++++++++++++ 2 files changed, 53 insertions(+), 5 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/single_distinct.slt diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 8e0f93cb57819..5f55ea55d54b9 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -28,7 +28,7 @@ use datafusion_expr::{ expr::AggregateFunction, logical_plan::{Aggregate, LogicalPlan, Projection}, utils::columnize_expr, - Expr, ExprSchemable, + Cast, Expr, ExprSchemable, }; use hashbrown::HashSet; @@ -165,13 +165,26 @@ impl OptimizerRule for SingleDistinctToGroupBy { }) => { // is_single_distinct_agg ensure args.len=1 if group_fields_set.insert(args[0].display_name()?) { - inner_group_exprs.push( - args[0].clone().alias(SINGLE_DISTINCT_ALIAS), - ); + inner_group_exprs.push(match &args[0] { + Expr::Cast(cast_expr) => cast_expr + .expr + .clone() + .alias(SINGLE_DISTINCT_ALIAS), + _ => args[0].clone().alias(SINGLE_DISTINCT_ALIAS), + }); } + + let mut expr = col(SINGLE_DISTINCT_ALIAS); + if let Expr::Cast(cast_expr) = &args[0] { + expr = Expr::Cast(Cast::new( + Box::new(expr), + cast_expr.data_type.clone(), + )); + } + Ok(Expr::AggregateFunction(AggregateFunction::new( fun.clone(), - vec![col(SINGLE_DISTINCT_ALIAS)], + vec![expr], false, // intentional to remove distinct here filter.clone(), order_by.clone(), diff --git a/datafusion/sqllogictest/test_files/single_distinct.slt b/datafusion/sqllogictest/test_files/single_distinct.slt new file mode 100644 index 0000000000000..64bc381de4076 --- /dev/null +++ b/datafusion/sqllogictest/test_files/single_distinct.slt @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +############# +## Tests for single distinct to group by optimization rule +############# + +statement ok +CREATE TABLE t(x int) AS VALUES (1), (2), (1); + +query II +SELECT SUM(DISTINCT x), MAX(DISTINCT x) from t GROUP BY x ORDER BY x; +---- +1 1 +2 2 + +query II +SELECT MAX(DISTINCT x), SUM(DISTINCT x) from t GROUP BY x ORDER BY x; +---- +1 1 +2 2 From b612f097610704b2374330a23a587338965fe30c Mon Sep 17 00:00:00 2001 From: hhj Date: Wed, 1 Nov 2023 10:53:23 +0800 Subject: [PATCH 2/3] fix --- .../src/single_distinct_to_groupby.rs | 25 +++-------- .../test_files/single_distinct.slt | 42 +++++++++++++++++++ 2 files changed, 48 insertions(+), 19 deletions(-) diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 5f55ea55d54b9..be76c069f0b73 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -28,7 +28,7 @@ use datafusion_expr::{ expr::AggregateFunction, logical_plan::{Aggregate, LogicalPlan, Projection}, utils::columnize_expr, - Cast, Expr, ExprSchemable, + Expr, ExprSchemable, }; use hashbrown::HashSet; @@ -74,7 +74,7 @@ fn is_single_distinct_agg(plan: &LogicalPlan) -> Result { distinct_count += 1; } for e in args { - fields_set.insert(e.display_name()?); + fields_set.insert(e.canonical_name()); } } } @@ -165,26 +165,13 @@ impl OptimizerRule for SingleDistinctToGroupBy { }) => { // is_single_distinct_agg ensure args.len=1 if group_fields_set.insert(args[0].display_name()?) { - inner_group_exprs.push(match &args[0] { - Expr::Cast(cast_expr) => cast_expr - .expr - .clone() - .alias(SINGLE_DISTINCT_ALIAS), - _ => args[0].clone().alias(SINGLE_DISTINCT_ALIAS), - }); + inner_group_exprs.push( + args[0].clone().alias(SINGLE_DISTINCT_ALIAS), + ); } - - let mut expr = col(SINGLE_DISTINCT_ALIAS); - if let Expr::Cast(cast_expr) = &args[0] { - expr = Expr::Cast(Cast::new( - Box::new(expr), - cast_expr.data_type.clone(), - )); - } - Ok(Expr::AggregateFunction(AggregateFunction::new( fun.clone(), - vec![expr], + vec![col(SINGLE_DISTINCT_ALIAS)], false, // intentional to remove distinct here filter.clone(), order_by.clone(), diff --git a/datafusion/sqllogictest/test_files/single_distinct.slt b/datafusion/sqllogictest/test_files/single_distinct.slt index 64bc381de4076..c9dd9bcb732ac 100644 --- a/datafusion/sqllogictest/test_files/single_distinct.slt +++ b/datafusion/sqllogictest/test_files/single_distinct.slt @@ -22,6 +22,9 @@ statement ok CREATE TABLE t(x int) AS VALUES (1), (2), (1); +statement ok +create table t1(x bigint,y int) as values (9223372036854775807,2), (9223372036854775806,2); + query II SELECT SUM(DISTINCT x), MAX(DISTINCT x) from t GROUP BY x ORDER BY x; ---- @@ -33,3 +36,42 @@ SELECT MAX(DISTINCT x), SUM(DISTINCT x) from t GROUP BY x ORDER BY x; ---- 1 1 2 2 + +query TT +EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT x) FROM t1 GROUP BY y; +---- +logical_plan +Projection: SUM(DISTINCT t1.x), MAX(DISTINCT t1.x) +--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(DISTINCT CAST(t1.x AS Float64)), MAX(DISTINCT t1.x)]] +----TableScan: t1 projection=[x, y] +physical_plan +ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)@2 as MAX(DISTINCT t1.x)] +--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)] +----CoalesceBatchesExec: target_batch_size=8192 +------RepartitionExec: partitioning=Hash([y@0], 4), input_partitions=4 +--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----------AggregateExec: mode=Partial, gby=[y@1 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)] +------------MemoryExec: partitions=1, partition_sizes=[1] + +query TT +EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS DOUBLE)) FROM t1 GROUP BY y; +---- +logical_plan +Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT t1.x) +--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]] +----Aggregate: groupBy=[[t1.y, CAST(t1.x AS Float64)t1.x AS t1.x AS alias1]], aggr=[[]] +------Projection: CAST(t1.x AS Float64) AS CAST(t1.x AS Float64)t1.x, t1.y +--------TableScan: t1 projection=[x, y] +physical_plan +ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX(DISTINCT t1.x)] +--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)] +----CoalesceBatchesExec: target_batch_size=8192 +------RepartitionExec: partitioning=Hash([y@0], 4), input_partitions=4 +--------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)] +----------AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, alias1@1 as alias1], aggr=[] +------------CoalesceBatchesExec: target_batch_size=8192 +--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 4), input_partitions=4 +----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------------AggregateExec: mode=Partial, gby=[y@1 as y, CAST(t1.x AS Float64)t1.x@0 as alias1], aggr=[] +--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS Float64)t1.x, y@1 as y] +----------------------MemoryExec: partitions=1, partition_sizes=[1] From b8abcf02ac373ffabad1fecc1e0b60f7dfccdcb6 Mon Sep 17 00:00:00 2001 From: hhj Date: Thu, 2 Nov 2023 10:18:49 +0800 Subject: [PATCH 3/3] move test to groupby.slt --- .../sqllogictest/test_files/groupby.slt | 58 ++++++++++++++ .../test_files/single_distinct.slt | 77 ------------------- 2 files changed, 58 insertions(+), 77 deletions(-) delete mode 100644 datafusion/sqllogictest/test_files/single_distinct.slt diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index 5cb3ac2f81353..ef6fee69abbfb 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -3733,3 +3733,61 @@ AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[SUM(multip --------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallyOrdered ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 ------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true + +# Tests for single distinct to group by optimization rule +statement ok +CREATE TABLE t(x int) AS VALUES (1), (2), (1); + +statement ok +create table t1(x bigint,y int) as values (9223372036854775807,2), (9223372036854775806,2); + +query II +SELECT SUM(DISTINCT x), MAX(DISTINCT x) from t GROUP BY x ORDER BY x; +---- +1 1 +2 2 + +query II +SELECT MAX(DISTINCT x), SUM(DISTINCT x) from t GROUP BY x ORDER BY x; +---- +1 1 +2 2 + +query TT +EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT x) FROM t1 GROUP BY y; +---- +logical_plan +Projection: SUM(DISTINCT t1.x), MAX(DISTINCT t1.x) +--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(DISTINCT CAST(t1.x AS Float64)), MAX(DISTINCT t1.x)]] +----TableScan: t1 projection=[x, y] +physical_plan +ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)@2 as MAX(DISTINCT t1.x)] +--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)] +----CoalesceBatchesExec: target_batch_size=2 +------RepartitionExec: partitioning=Hash([y@0], 8), input_partitions=8 +--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------AggregateExec: mode=Partial, gby=[y@1 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)] +------------MemoryExec: partitions=1, partition_sizes=[1] + +query TT +EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS DOUBLE)) FROM t1 GROUP BY y; +---- +logical_plan +Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT t1.x) +--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]] +----Aggregate: groupBy=[[t1.y, CAST(t1.x AS Float64)t1.x AS t1.x AS alias1]], aggr=[[]] +------Projection: CAST(t1.x AS Float64) AS CAST(t1.x AS Float64)t1.x, t1.y +--------TableScan: t1 projection=[x, y] +physical_plan +ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX(DISTINCT t1.x)] +--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)] +----CoalesceBatchesExec: target_batch_size=2 +------RepartitionExec: partitioning=Hash([y@0], 8), input_partitions=8 +--------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)] +----------AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, alias1@1 as alias1], aggr=[] +------------CoalesceBatchesExec: target_batch_size=2 +--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), input_partitions=8 +----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +------------------AggregateExec: mode=Partial, gby=[y@1 as y, CAST(t1.x AS Float64)t1.x@0 as alias1], aggr=[] +--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS Float64)t1.x, y@1 as y] +----------------------MemoryExec: partitions=1, partition_sizes=[1] diff --git a/datafusion/sqllogictest/test_files/single_distinct.slt b/datafusion/sqllogictest/test_files/single_distinct.slt deleted file mode 100644 index c9dd9bcb732ac..0000000000000 --- a/datafusion/sqllogictest/test_files/single_distinct.slt +++ /dev/null @@ -1,77 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -############# -## Tests for single distinct to group by optimization rule -############# - -statement ok -CREATE TABLE t(x int) AS VALUES (1), (2), (1); - -statement ok -create table t1(x bigint,y int) as values (9223372036854775807,2), (9223372036854775806,2); - -query II -SELECT SUM(DISTINCT x), MAX(DISTINCT x) from t GROUP BY x ORDER BY x; ----- -1 1 -2 2 - -query II -SELECT MAX(DISTINCT x), SUM(DISTINCT x) from t GROUP BY x ORDER BY x; ----- -1 1 -2 2 - -query TT -EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT x) FROM t1 GROUP BY y; ----- -logical_plan -Projection: SUM(DISTINCT t1.x), MAX(DISTINCT t1.x) ---Aggregate: groupBy=[[t1.y]], aggr=[[SUM(DISTINCT CAST(t1.x AS Float64)), MAX(DISTINCT t1.x)]] -----TableScan: t1 projection=[x, y] -physical_plan -ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)@2 as MAX(DISTINCT t1.x)] ---AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)] -----CoalesceBatchesExec: target_batch_size=8192 -------RepartitionExec: partitioning=Hash([y@0], 4), input_partitions=4 ---------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -----------AggregateExec: mode=Partial, gby=[y@1 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)] -------------MemoryExec: partitions=1, partition_sizes=[1] - -query TT -EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS DOUBLE)) FROM t1 GROUP BY y; ----- -logical_plan -Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT t1.x) ---Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]] -----Aggregate: groupBy=[[t1.y, CAST(t1.x AS Float64)t1.x AS t1.x AS alias1]], aggr=[[]] -------Projection: CAST(t1.x AS Float64) AS CAST(t1.x AS Float64)t1.x, t1.y ---------TableScan: t1 projection=[x, y] -physical_plan -ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX(DISTINCT t1.x)] ---AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)] -----CoalesceBatchesExec: target_batch_size=8192 -------RepartitionExec: partitioning=Hash([y@0], 4), input_partitions=4 ---------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)] -----------AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, alias1@1 as alias1], aggr=[] -------------CoalesceBatchesExec: target_batch_size=8192 ---------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 4), input_partitions=4 -----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -------------------AggregateExec: mode=Partial, gby=[y@1 as y, CAST(t1.x AS Float64)t1.x@0 as alias1], aggr=[] ---------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS Float64)t1.x, y@1 as y] -----------------------MemoryExec: partitions=1, partition_sizes=[1]