From 97f8e5ffae68bd996e0671eaec6cd53df15ee7c3 Mon Sep 17 00:00:00 2001 From: jackwener Date: Fri, 13 May 2022 20:05:41 +0800 Subject: [PATCH 1/2] optimizer: eliminate max/min in agg --- datafusion-cli/Cargo.lock | 14 +- datafusion/core/src/execution/context.rs | 7 +- .../optimizer/eliminate_distinct_in_agg.rs | 157 ++++++++++++++++++ datafusion/core/src/optimizer/mod.rs | 1 + .../optimizer/single_distinct_to_groupby.rs | 21 ++- 5 files changed, 186 insertions(+), 14 deletions(-) create mode 100644 datafusion/core/src/optimizer/eliminate_distinct_in_agg.rs diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 4309a4216b1c2..c1fd60a89eb05 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -337,7 +337,7 @@ dependencies = [ [[package]] name = "datafusion" -version = "7.0.0" +version = "8.0.0" dependencies = [ "ahash", "arrow", @@ -369,7 +369,7 @@ dependencies = [ [[package]] name = "datafusion-cli" -version = "7.0.0" +version = "8.0.0" dependencies = [ "arrow", "clap", @@ -383,7 +383,7 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "7.0.0" +version = "8.0.0" dependencies = [ "arrow", "ordered-float 3.0.0", @@ -393,7 +393,7 @@ dependencies = [ [[package]] name = "datafusion-data-access" -version = "7.0.0" +version = "8.0.0" dependencies = [ "async-trait", "chrono", @@ -406,7 +406,7 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "7.0.0" +version = "8.0.0" dependencies = [ "ahash", "arrow", @@ -416,7 +416,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "7.0.0" +version = "8.0.0" dependencies = [ "ahash", "arrow", @@ -439,7 +439,7 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "7.0.0" +version = "8.0.0" dependencies = [ "arrow", "datafusion-common", diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 7ca9a1e4d04f1..8c9eef3aafb04 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -33,8 +33,10 @@ use crate::{ MemTable, ViewTable, }, logical_plan::{PlanType, ToStringifiedPlan}, - optimizer::eliminate_filter::EliminateFilter, - optimizer::eliminate_limit::EliminateLimit, + optimizer::{ + eliminate_distinct_in_agg::EliminateDistinctInAgg, + eliminate_filter::EliminateFilter, eliminate_limit::EliminateLimit, + }, physical_optimizer::{ aggregate_statistics::AggregateStatistics, hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule, @@ -1239,6 +1241,7 @@ impl SessionState { Arc::new(ProjectionPushDown::new()), Arc::new(FilterPushDown::new()), Arc::new(LimitPushDown::new()), + Arc::new(EliminateDistinctInAgg::new()), Arc::new(SingleDistinctToGroupBy::new()), ], physical_optimizers: vec![ diff --git a/datafusion/core/src/optimizer/eliminate_distinct_in_agg.rs b/datafusion/core/src/optimizer/eliminate_distinct_in_agg.rs new file mode 100644 index 0000000000000..6b8de7c6efc04 --- /dev/null +++ b/datafusion/core/src/optimizer/eliminate_distinct_in_agg.rs @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Optimizer rule to replace `where false` on a plan with an empty relation. +//! This saves time in planning and executing the query. +//! Note that this rule should be applied after simplify expressions optimizer rule. +use crate::error::Result; +use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::{Aggregate, Projection}; +use crate::logical_plan::ExprSchemable; +use crate::logical_plan::{columnize_expr, DFSchema, Expr, LogicalPlan}; +use crate::optimizer::optimizer::OptimizerRule; +use crate::optimizer::utils; +use datafusion_expr::AggregateFunction; +use std::sync::Arc; + +/// Optimization rule that elimanate the distinct in aggregation. +#[derive(Default)] +pub struct EliminateDistinctInAgg; + +impl EliminateDistinctInAgg { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +// Before optimize: +// Aggregate: groupBy=[[]], aggr=[[MAX(DISTINCT #test.c1)]] +// After optimize: +// Projection: #MAX(#test.c1) AS MAX(DISTINCT test.c1) +// Aggregate: groupBy=[[]], aggr=[[MAX(#test.c1)]] +impl OptimizerRule for EliminateDistinctInAgg { + fn optimize( + &self, + plan: &LogicalPlan, + execution_props: &ExecutionProps, + ) -> Result { + if is_max_or_min(plan) { + match plan { + LogicalPlan::Aggregate(Aggregate { + input, + aggr_expr, + schema, + group_expr, + }) => { + println!("{}", format!("{:?}", schema)); + println!("{}", format!("{:?}", aggr_expr)); + + println!("{}", format!("{:?}", input.schema())); + + println!("----------------"); + let mut all_args = group_expr.clone(); + // remove distinct + let new_aggr_expr = aggr_expr + .iter() + .map(|agg_expr| match agg_expr { + Expr::AggregateFunction { fun, args, .. } => { + let expr = Expr::AggregateFunction { + fun: fun.clone(), + args: args.clone(), + distinct: false, + }; + all_args.push(expr.clone()); + expr + } + _ => agg_expr.clone(), + }) + .collect::>(); + + let all_field = all_args + .iter() + .map(|expr| expr.to_field(input.schema()).unwrap()) + .collect::>(); + + println!("all_field: {:?}", all_field); + + let agg_schema = DFSchema::new_with_metadata( + all_field, + input.schema().metadata().clone(), + ) + .unwrap(); + + println!("{}", format!("{:?}", agg_schema)); + println!("{}", format!("{:?}", new_aggr_expr)); + + let agg = LogicalPlan::Aggregate(Aggregate { + input: input.clone(), + aggr_expr: new_aggr_expr.clone(), + schema: Arc::new(agg_schema.clone()), + group_expr: group_expr.clone(), + }); + + let mut alias_expr: Vec = Vec::new(); + agg.expressions().iter().enumerate().for_each(|(i, field)| { + alias_expr.push(columnize_expr( + field.clone().alias(schema.clone().fields()[i].name()), + &schema, + )); + }); + + println!("Projection schema {}", format!("{:?}", schema)); + println!("Projection expr {}", format!("{:?}", alias_expr)); + // projection + return Ok(LogicalPlan::Projection(Projection { + expr: alias_expr.clone(), + input: Arc::new(agg.clone()), + schema: schema.clone(), + alias: Option::None, + })); + } + _ => {} + } + }; + // Apply the optimization to all inputs of the plan + let inputs = plan.inputs(); + let new_inputs = inputs + .iter() + .map(|plan| self.optimize(plan, execution_props)) + .collect::>>()?; + + utils::from_plan(plan, &plan.expressions(), &new_inputs) + } + + fn name(&self) -> &str { + "eliminate_distinct_in_agg" + } +} + +fn is_max_or_min(plan: &LogicalPlan) -> bool { + match plan { + LogicalPlan::Aggregate(Aggregate { aggr_expr, .. }) => { + aggr_expr.iter().all(|expr| match expr { + Expr::AggregateFunction { fun, .. } => match fun { + AggregateFunction::Max | AggregateFunction::Min => true, + _ => false, + }, + _ => false, + }) + } + _ => false, + } +} diff --git a/datafusion/core/src/optimizer/mod.rs b/datafusion/core/src/optimizer/mod.rs index b274ab645f54c..71864ac07a022 100644 --- a/datafusion/core/src/optimizer/mod.rs +++ b/datafusion/core/src/optimizer/mod.rs @@ -20,6 +20,7 @@ #![allow(clippy::module_inception)] pub mod common_subexpr_eliminate; +pub mod eliminate_distinct_in_agg; pub mod eliminate_filter; pub mod eliminate_limit; pub mod filter_push_down; diff --git a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs index dfbefa63acd8f..6be895bc461ad 100644 --- a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs +++ b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs @@ -24,6 +24,7 @@ use crate::logical_plan::ExprSchemable; use crate::logical_plan::{col, columnize_expr, DFSchema, Expr, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; +use datafusion_expr::AggregateFunction; use hashbrown::HashSet; use std::sync::Arc; @@ -61,7 +62,7 @@ fn optimize(plan: &LogicalPlan) -> Result { schema, group_expr, }) => { - if is_single_distinct_agg(plan) { + if is_single_distinct_agg_not_max_min(plan) { let mut group_fields_set = HashSet::new(); let mut all_group_args = group_expr.clone(); // remove distinct and collection args @@ -69,7 +70,7 @@ fn optimize(plan: &LogicalPlan) -> Result { .iter() .map(|agg_expr| match agg_expr { Expr::AggregateFunction { fun, args, .. } => { - // is_single_distinct_agg ensure args.len=1 + // is_single_distinct_agg_not_max_min ensure args.len=1 if group_fields_set .insert(args[0].name(input.schema()).unwrap()) { @@ -158,7 +159,7 @@ fn optimize_children(plan: &LogicalPlan) -> Result { utils::from_plan(plan, &expr, &new_inputs) } -fn is_single_distinct_agg(plan: &LogicalPlan) -> bool { +fn is_single_distinct_agg_not_max_min(plan: &LogicalPlan) -> bool { match plan { LogicalPlan::Aggregate(Aggregate { input, aggr_expr, .. @@ -168,8 +169,18 @@ fn is_single_distinct_agg(plan: &LogicalPlan) -> bool { .iter() .filter(|expr| { let mut is_distinct = false; - if let Expr::AggregateFunction { distinct, args, .. } = expr { - is_distinct = *distinct; + if let Expr::AggregateFunction { + distinct, + args, + fun, + } = expr + { + let is_max_min = match fun { + AggregateFunction::Max | AggregateFunction::Min => true, + _ => false, + }; + + is_distinct = *distinct && is_max_min; args.iter().for_each(|expr| { fields_set.insert(expr.name(input.schema()).unwrap()); }) From 1e55fcb9121fa34016f18e258e0cb59dd9b52df7 Mon Sep 17 00:00:00 2001 From: jackwener Date: Fri, 13 May 2022 21:23:56 +0800 Subject: [PATCH 2/2] *: fix CI and test --- .../optimizer/eliminate_distinct_in_agg.rs | 140 +++++++++--------- .../optimizer/single_distinct_to_groupby.rs | 32 ++-- datafusion/core/tests/sql/aggregates.rs | 10 +- 3 files changed, 90 insertions(+), 92 deletions(-) diff --git a/datafusion/core/src/optimizer/eliminate_distinct_in_agg.rs b/datafusion/core/src/optimizer/eliminate_distinct_in_agg.rs index 6b8de7c6efc04..a3a541280bcdc 100644 --- a/datafusion/core/src/optimizer/eliminate_distinct_in_agg.rs +++ b/datafusion/core/src/optimizer/eliminate_distinct_in_agg.rs @@ -51,25 +51,25 @@ impl OptimizerRule for EliminateDistinctInAgg { execution_props: &ExecutionProps, ) -> Result { if is_max_or_min(plan) { - match plan { - LogicalPlan::Aggregate(Aggregate { - input, - aggr_expr, - schema, - group_expr, - }) => { - println!("{}", format!("{:?}", schema)); - println!("{}", format!("{:?}", aggr_expr)); - - println!("{}", format!("{:?}", input.schema())); - - println!("----------------"); - let mut all_args = group_expr.clone(); - // remove distinct - let new_aggr_expr = aggr_expr - .iter() - .map(|agg_expr| match agg_expr { - Expr::AggregateFunction { fun, args, .. } => { + if let LogicalPlan::Aggregate(Aggregate { + input, + aggr_expr, + schema, + group_expr, + }) = plan + { + let len = group_expr.len(); + let mut all_args = group_expr.clone(); + // remove distinct + let new_aggr_expr = aggr_expr + .iter() + .map(|agg_expr| match agg_expr { + Expr::AggregateFunction { + fun, + args, + distinct, + } => { + if *distinct { let expr = Expr::AggregateFunction { fun: fun.clone(), args: args.clone(), @@ -77,55 +77,62 @@ impl OptimizerRule for EliminateDistinctInAgg { }; all_args.push(expr.clone()); expr + } else { + agg_expr.clone() } - _ => agg_expr.clone(), - }) - .collect::>(); - - let all_field = all_args + } + _ => agg_expr.clone(), + }) + .collect::>(); + + if all_args.len() == len { + let inputs = plan.inputs(); + let new_inputs = inputs .iter() - .map(|expr| expr.to_field(input.schema()).unwrap()) - .collect::>(); - - println!("all_field: {:?}", all_field); - - let agg_schema = DFSchema::new_with_metadata( - all_field, - input.schema().metadata().clone(), - ) - .unwrap(); - - println!("{}", format!("{:?}", agg_schema)); - println!("{}", format!("{:?}", new_aggr_expr)); - - let agg = LogicalPlan::Aggregate(Aggregate { - input: input.clone(), - aggr_expr: new_aggr_expr.clone(), - schema: Arc::new(agg_schema.clone()), - group_expr: group_expr.clone(), - }); - - let mut alias_expr: Vec = Vec::new(); - agg.expressions().iter().enumerate().for_each(|(i, field)| { - alias_expr.push(columnize_expr( - field.clone().alias(schema.clone().fields()[i].name()), - &schema, - )); - }); - - println!("Projection schema {}", format!("{:?}", schema)); - println!("Projection expr {}", format!("{:?}", alias_expr)); - // projection - return Ok(LogicalPlan::Projection(Projection { - expr: alias_expr.clone(), - input: Arc::new(agg.clone()), - schema: schema.clone(), - alias: Option::None, - })); + .map(|plan| self.optimize(plan, execution_props)) + .collect::>>()?; + + return utils::from_plan(plan, &plan.expressions(), &new_inputs); } - _ => {} + + let all_field = all_args + .iter() + .map(|expr| expr.to_field(input.schema()).unwrap()) + .collect::>(); + + println!("all_field: {:?}", all_field); + + let agg_schema = DFSchema::new_with_metadata( + all_field, + input.schema().metadata().clone(), + ) + .unwrap(); + + let agg = LogicalPlan::Aggregate(Aggregate { + input: input.clone(), + aggr_expr: new_aggr_expr, + schema: Arc::new(agg_schema), + group_expr: group_expr.clone(), + }); + + let mut alias_expr: Vec = Vec::new(); + agg.expressions().iter().enumerate().for_each(|(i, field)| { + alias_expr.push(columnize_expr( + field.clone().alias(schema.clone().fields()[i].name()), + schema, + )); + }); + + // projection + return Ok(LogicalPlan::Projection(Projection { + expr: alias_expr, + input: Arc::new(agg), + schema: schema.clone(), + alias: Option::None, + })); } }; + // Apply the optimization to all inputs of the plan let inputs = plan.inputs(); let new_inputs = inputs @@ -145,10 +152,9 @@ fn is_max_or_min(plan: &LogicalPlan) -> bool { match plan { LogicalPlan::Aggregate(Aggregate { aggr_expr, .. }) => { aggr_expr.iter().all(|expr| match expr { - Expr::AggregateFunction { fun, .. } => match fun { - AggregateFunction::Max | AggregateFunction::Min => true, - _ => false, - }, + Expr::AggregateFunction { fun, .. } => { + matches!(fun, AggregateFunction::Max | AggregateFunction::Min) + } _ => false, }) } diff --git a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs index 6be895bc461ad..4fa338f389b7f 100644 --- a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs +++ b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs @@ -175,10 +175,10 @@ fn is_single_distinct_agg_not_max_min(plan: &LogicalPlan) -> bool { fun, } = expr { - let is_max_min = match fun { - AggregateFunction::Max | AggregateFunction::Min => true, - _ => false, - }; + let is_max_min = matches!( + fun, + AggregateFunction::Max | AggregateFunction::Min + ); is_distinct = *distinct && is_max_min; args.iter().for_each(|expr| { @@ -249,10 +249,8 @@ mod tests { .build()?; // Should work - let expected = "Projection: #COUNT(alias1) AS COUNT(DISTINCT test.b) [COUNT(DISTINCT test.b):UInt64;N]\ - \n Aggregate: groupBy=[[]], aggr=[[COUNT(#alias1)]] [COUNT(alias1):UInt64;N]\ - \n Aggregate: groupBy=[[#test.b AS alias1]], aggr=[[]] [alias1:UInt32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(DISTINCT #test.b)]] [COUNT(DISTINCT test.b):UInt64;N]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(()) @@ -266,10 +264,8 @@ mod tests { .aggregate(Vec::::new(), vec![count_distinct(lit(2) * col("b"))])? .build()?; - let expected = "Projection: #COUNT(alias1) AS COUNT(DISTINCT Int32(2) * test.b) [COUNT(DISTINCT Int32(2) * test.b):UInt64;N]\ - \n Aggregate: groupBy=[[]], aggr=[[COUNT(#alias1)]] [COUNT(alias1):UInt64;N]\ - \n Aggregate: groupBy=[[Int32(2) * #test.b AS alias1]], aggr=[[]] [alias1:Int32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + let expected ="Aggregate: groupBy=[[]], aggr=[[COUNT(DISTINCT Int32(2) * #test.b)]] [COUNT(DISTINCT Int32(2) * test.b):UInt64;N]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(()) @@ -284,10 +280,8 @@ mod tests { .build()?; // Should work - let expected = "Projection: #test.a AS a, #COUNT(alias1) AS COUNT(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):UInt64;N]\ - \n Aggregate: groupBy=[[#test.a]], aggr=[[COUNT(#alias1)]] [a:UInt32, COUNT(alias1):UInt64;N]\ - \n Aggregate: groupBy=[[#test.a, #test.b AS alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + let expected = "Aggregate: groupBy=[[#test.a]], aggr=[[COUNT(DISTINCT #test.b)]] [a:UInt32, COUNT(DISTINCT test.b):UInt64;N]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(()) @@ -330,10 +324,8 @@ mod tests { )? .build()?; // Should work - let expected = "Projection: #test.a AS a, #COUNT(alias1) AS COUNT(DISTINCT test.b), #MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):UInt64;N, MAX(DISTINCT test.b):UInt32;N]\ - \n Aggregate: groupBy=[[#test.a]], aggr=[[COUNT(#alias1), MAX(#alias1)]] [a:UInt32, COUNT(alias1):UInt64;N, MAX(alias1):UInt32;N]\ - \n Aggregate: groupBy=[[#test.a, #test.b AS alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + let expected = "Aggregate: groupBy=[[#test.a]], aggr=[[COUNT(DISTINCT #test.b), MAX(DISTINCT #test.b)]] [a:UInt32, COUNT(DISTINCT test.b):UInt64;N, MAX(DISTINCT test.b):UInt32;N]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(()) diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 42999a743bbd2..3dd4aa3aaccf1 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -328,11 +328,11 @@ async fn csv_query_count_distinct_expr() -> Result<()> { let sql = "SELECT count(distinct c2 % 2) FROM aggregate_test_100"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ - "+--------------------------------------------------+", - "| COUNT(DISTINCT aggregate_test_100.c2 % Int64(2)) |", - "+--------------------------------------------------+", - "| 2 |", - "+--------------------------------------------------+", + "+-------------------------------------------------------+", + "| COUNT(DISTINCT aggregate_test_100.c2 Modulo Int64(2)) |", + "+-------------------------------------------------------+", + "| 2 |", + "+-------------------------------------------------------+", ]; assert_batches_eq!(expected, &actual); Ok(())