diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 4309a4216b1c2..c1fd60a89eb05 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -337,7 +337,7 @@ dependencies = [ [[package]] name = "datafusion" -version = "7.0.0" +version = "8.0.0" dependencies = [ "ahash", "arrow", @@ -369,7 +369,7 @@ dependencies = [ [[package]] name = "datafusion-cli" -version = "7.0.0" +version = "8.0.0" dependencies = [ "arrow", "clap", @@ -383,7 +383,7 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "7.0.0" +version = "8.0.0" dependencies = [ "arrow", "ordered-float 3.0.0", @@ -393,7 +393,7 @@ dependencies = [ [[package]] name = "datafusion-data-access" -version = "7.0.0" +version = "8.0.0" dependencies = [ "async-trait", "chrono", @@ -406,7 +406,7 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "7.0.0" +version = "8.0.0" dependencies = [ "ahash", "arrow", @@ -416,7 +416,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "7.0.0" +version = "8.0.0" dependencies = [ "ahash", "arrow", @@ -439,7 +439,7 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "7.0.0" +version = "8.0.0" dependencies = [ "arrow", "datafusion-common", diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 7ca9a1e4d04f1..8c9eef3aafb04 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -33,8 +33,10 @@ use crate::{ MemTable, ViewTable, }, logical_plan::{PlanType, ToStringifiedPlan}, - optimizer::eliminate_filter::EliminateFilter, - optimizer::eliminate_limit::EliminateLimit, + optimizer::{ + eliminate_distinct_in_agg::EliminateDistinctInAgg, + eliminate_filter::EliminateFilter, eliminate_limit::EliminateLimit, + }, physical_optimizer::{ aggregate_statistics::AggregateStatistics, hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule, @@ -1239,6 +1241,7 @@ impl SessionState { Arc::new(ProjectionPushDown::new()), Arc::new(FilterPushDown::new()), Arc::new(LimitPushDown::new()), + Arc::new(EliminateDistinctInAgg::new()), Arc::new(SingleDistinctToGroupBy::new()), ], physical_optimizers: vec![ diff --git a/datafusion/core/src/optimizer/eliminate_distinct_in_agg.rs b/datafusion/core/src/optimizer/eliminate_distinct_in_agg.rs new file mode 100644 index 0000000000000..a3a541280bcdc --- /dev/null +++ b/datafusion/core/src/optimizer/eliminate_distinct_in_agg.rs @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Optimizer rule to replace `where false` on a plan with an empty relation. +//! This saves time in planning and executing the query. +//! Note that this rule should be applied after simplify expressions optimizer rule. +use crate::error::Result; +use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::{Aggregate, Projection}; +use crate::logical_plan::ExprSchemable; +use crate::logical_plan::{columnize_expr, DFSchema, Expr, LogicalPlan}; +use crate::optimizer::optimizer::OptimizerRule; +use crate::optimizer::utils; +use datafusion_expr::AggregateFunction; +use std::sync::Arc; + +/// Optimization rule that elimanate the distinct in aggregation. +#[derive(Default)] +pub struct EliminateDistinctInAgg; + +impl EliminateDistinctInAgg { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +// Before optimize: +// Aggregate: groupBy=[[]], aggr=[[MAX(DISTINCT #test.c1)]] +// After optimize: +// Projection: #MAX(#test.c1) AS MAX(DISTINCT test.c1) +// Aggregate: groupBy=[[]], aggr=[[MAX(#test.c1)]] +impl OptimizerRule for EliminateDistinctInAgg { + fn optimize( + &self, + plan: &LogicalPlan, + execution_props: &ExecutionProps, + ) -> Result { + if is_max_or_min(plan) { + if let LogicalPlan::Aggregate(Aggregate { + input, + aggr_expr, + schema, + group_expr, + }) = plan + { + let len = group_expr.len(); + let mut all_args = group_expr.clone(); + // remove distinct + let new_aggr_expr = aggr_expr + .iter() + .map(|agg_expr| match agg_expr { + Expr::AggregateFunction { + fun, + args, + distinct, + } => { + if *distinct { + let expr = Expr::AggregateFunction { + fun: fun.clone(), + args: args.clone(), + distinct: false, + }; + all_args.push(expr.clone()); + expr + } else { + agg_expr.clone() + } + } + _ => agg_expr.clone(), + }) + .collect::>(); + + if all_args.len() == len { + let inputs = plan.inputs(); + let new_inputs = inputs + .iter() + .map(|plan| self.optimize(plan, execution_props)) + .collect::>>()?; + + return utils::from_plan(plan, &plan.expressions(), &new_inputs); + } + + let all_field = all_args + .iter() + .map(|expr| expr.to_field(input.schema()).unwrap()) + .collect::>(); + + println!("all_field: {:?}", all_field); + + let agg_schema = DFSchema::new_with_metadata( + all_field, + input.schema().metadata().clone(), + ) + .unwrap(); + + let agg = LogicalPlan::Aggregate(Aggregate { + input: input.clone(), + aggr_expr: new_aggr_expr, + schema: Arc::new(agg_schema), + group_expr: group_expr.clone(), + }); + + let mut alias_expr: Vec = Vec::new(); + agg.expressions().iter().enumerate().for_each(|(i, field)| { + alias_expr.push(columnize_expr( + field.clone().alias(schema.clone().fields()[i].name()), + schema, + )); + }); + + // projection + return Ok(LogicalPlan::Projection(Projection { + expr: alias_expr, + input: Arc::new(agg), + schema: schema.clone(), + alias: Option::None, + })); + } + }; + + // Apply the optimization to all inputs of the plan + let inputs = plan.inputs(); + let new_inputs = inputs + .iter() + .map(|plan| self.optimize(plan, execution_props)) + .collect::>>()?; + + utils::from_plan(plan, &plan.expressions(), &new_inputs) + } + + fn name(&self) -> &str { + "eliminate_distinct_in_agg" + } +} + +fn is_max_or_min(plan: &LogicalPlan) -> bool { + match plan { + LogicalPlan::Aggregate(Aggregate { aggr_expr, .. }) => { + aggr_expr.iter().all(|expr| match expr { + Expr::AggregateFunction { fun, .. } => { + matches!(fun, AggregateFunction::Max | AggregateFunction::Min) + } + _ => false, + }) + } + _ => false, + } +} diff --git a/datafusion/core/src/optimizer/mod.rs b/datafusion/core/src/optimizer/mod.rs index b274ab645f54c..71864ac07a022 100644 --- a/datafusion/core/src/optimizer/mod.rs +++ b/datafusion/core/src/optimizer/mod.rs @@ -20,6 +20,7 @@ #![allow(clippy::module_inception)] pub mod common_subexpr_eliminate; +pub mod eliminate_distinct_in_agg; pub mod eliminate_filter; pub mod eliminate_limit; pub mod filter_push_down; diff --git a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs index dfbefa63acd8f..4fa338f389b7f 100644 --- a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs +++ b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs @@ -24,6 +24,7 @@ use crate::logical_plan::ExprSchemable; use crate::logical_plan::{col, columnize_expr, DFSchema, Expr, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; +use datafusion_expr::AggregateFunction; use hashbrown::HashSet; use std::sync::Arc; @@ -61,7 +62,7 @@ fn optimize(plan: &LogicalPlan) -> Result { schema, group_expr, }) => { - if is_single_distinct_agg(plan) { + if is_single_distinct_agg_not_max_min(plan) { let mut group_fields_set = HashSet::new(); let mut all_group_args = group_expr.clone(); // remove distinct and collection args @@ -69,7 +70,7 @@ fn optimize(plan: &LogicalPlan) -> Result { .iter() .map(|agg_expr| match agg_expr { Expr::AggregateFunction { fun, args, .. } => { - // is_single_distinct_agg ensure args.len=1 + // is_single_distinct_agg_not_max_min ensure args.len=1 if group_fields_set .insert(args[0].name(input.schema()).unwrap()) { @@ -158,7 +159,7 @@ fn optimize_children(plan: &LogicalPlan) -> Result { utils::from_plan(plan, &expr, &new_inputs) } -fn is_single_distinct_agg(plan: &LogicalPlan) -> bool { +fn is_single_distinct_agg_not_max_min(plan: &LogicalPlan) -> bool { match plan { LogicalPlan::Aggregate(Aggregate { input, aggr_expr, .. @@ -168,8 +169,18 @@ fn is_single_distinct_agg(plan: &LogicalPlan) -> bool { .iter() .filter(|expr| { let mut is_distinct = false; - if let Expr::AggregateFunction { distinct, args, .. } = expr { - is_distinct = *distinct; + if let Expr::AggregateFunction { + distinct, + args, + fun, + } = expr + { + let is_max_min = matches!( + fun, + AggregateFunction::Max | AggregateFunction::Min + ); + + is_distinct = *distinct && is_max_min; args.iter().for_each(|expr| { fields_set.insert(expr.name(input.schema()).unwrap()); }) @@ -238,10 +249,8 @@ mod tests { .build()?; // Should work - let expected = "Projection: #COUNT(alias1) AS COUNT(DISTINCT test.b) [COUNT(DISTINCT test.b):UInt64;N]\ - \n Aggregate: groupBy=[[]], aggr=[[COUNT(#alias1)]] [COUNT(alias1):UInt64;N]\ - \n Aggregate: groupBy=[[#test.b AS alias1]], aggr=[[]] [alias1:UInt32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(DISTINCT #test.b)]] [COUNT(DISTINCT test.b):UInt64;N]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(()) @@ -255,10 +264,8 @@ mod tests { .aggregate(Vec::::new(), vec![count_distinct(lit(2) * col("b"))])? .build()?; - let expected = "Projection: #COUNT(alias1) AS COUNT(DISTINCT Int32(2) * test.b) [COUNT(DISTINCT Int32(2) * test.b):UInt64;N]\ - \n Aggregate: groupBy=[[]], aggr=[[COUNT(#alias1)]] [COUNT(alias1):UInt64;N]\ - \n Aggregate: groupBy=[[Int32(2) * #test.b AS alias1]], aggr=[[]] [alias1:Int32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + let expected ="Aggregate: groupBy=[[]], aggr=[[COUNT(DISTINCT Int32(2) * #test.b)]] [COUNT(DISTINCT Int32(2) * test.b):UInt64;N]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(()) @@ -273,10 +280,8 @@ mod tests { .build()?; // Should work - let expected = "Projection: #test.a AS a, #COUNT(alias1) AS COUNT(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):UInt64;N]\ - \n Aggregate: groupBy=[[#test.a]], aggr=[[COUNT(#alias1)]] [a:UInt32, COUNT(alias1):UInt64;N]\ - \n Aggregate: groupBy=[[#test.a, #test.b AS alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + let expected = "Aggregate: groupBy=[[#test.a]], aggr=[[COUNT(DISTINCT #test.b)]] [a:UInt32, COUNT(DISTINCT test.b):UInt64;N]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(()) @@ -319,10 +324,8 @@ mod tests { )? .build()?; // Should work - let expected = "Projection: #test.a AS a, #COUNT(alias1) AS COUNT(DISTINCT test.b), #MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):UInt64;N, MAX(DISTINCT test.b):UInt32;N]\ - \n Aggregate: groupBy=[[#test.a]], aggr=[[COUNT(#alias1), MAX(#alias1)]] [a:UInt32, COUNT(alias1):UInt64;N, MAX(alias1):UInt32;N]\ - \n Aggregate: groupBy=[[#test.a, #test.b AS alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + let expected = "Aggregate: groupBy=[[#test.a]], aggr=[[COUNT(DISTINCT #test.b), MAX(DISTINCT #test.b)]] [a:UInt32, COUNT(DISTINCT test.b):UInt64;N, MAX(DISTINCT test.b):UInt32;N]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(()) diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 42999a743bbd2..3dd4aa3aaccf1 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -328,11 +328,11 @@ async fn csv_query_count_distinct_expr() -> Result<()> { let sql = "SELECT count(distinct c2 % 2) FROM aggregate_test_100"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ - "+--------------------------------------------------+", - "| COUNT(DISTINCT aggregate_test_100.c2 % Int64(2)) |", - "+--------------------------------------------------+", - "| 2 |", - "+--------------------------------------------------+", + "+-------------------------------------------------------+", + "| COUNT(DISTINCT aggregate_test_100.c2 Modulo Int64(2)) |", + "+-------------------------------------------------------+", + "| 2 |", + "+-------------------------------------------------------+", ]; assert_batches_eq!(expected, &actual); Ok(())