From babfb99ad644d7c0b487b2b48b6f025c19a0da29 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 15 Dec 2022 11:39:01 +0000 Subject: [PATCH 1/3] Make OptimizerConfig a trait (#4631) (#4638) --- datafusion-examples/examples/rewrite_expr.rs | 11 +- datafusion/core/src/execution/context.rs | 58 +++--- datafusion/core/tests/user_defined_plan.rs | 6 +- datafusion/optimizer/README.md | 6 +- .../optimizer/src/common_subexpr_eliminate.rs | 60 +++---- .../optimizer/src/decorrelate_where_exists.rs | 16 +- .../optimizer/src/decorrelate_where_in.rs | 32 ++-- .../optimizer/src/eliminate_cross_join.rs | 33 ++-- datafusion/optimizer/src/eliminate_filter.rs | 22 +-- datafusion/optimizer/src/eliminate_limit.rs | 32 ++-- .../optimizer/src/eliminate_outer_join.rs | 23 +-- .../optimizer/src/filter_null_join_keys.rs | 36 ++-- datafusion/optimizer/src/inline_table_scan.rs | 16 +- datafusion/optimizer/src/lib.rs | 2 +- datafusion/optimizer/src/optimizer.rs | 165 +++++++++++------- .../optimizer/src/propagate_empty_relation.rs | 12 +- datafusion/optimizer/src/push_down_filter.rs | 33 ++-- datafusion/optimizer/src/push_down_limit.rs | 29 +-- .../optimizer/src/push_down_projection.rs | 36 ++-- .../src/rewrite_disjunctive_predicate.rs | 10 +- .../optimizer/src/scalar_subquery_to_join.rs | 29 ++- .../simplify_expressions/simplify_exprs.rs | 18 +- .../src/single_distinct_to_groupby.rs | 19 +- .../optimizer/src/subquery_filter_to_join.rs | 15 +- datafusion/optimizer/src/test/mod.rs | 8 +- datafusion/optimizer/src/type_coercion.rs | 24 ++- .../src/unwrap_cast_in_comparison.rs | 4 +- datafusion/optimizer/src/utils.rs | 4 +- .../optimizer/tests/integration-test.rs | 8 +- 29 files changed, 358 insertions(+), 409 deletions(-) diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs index 216a6932c8d16..a0be8c196e34d 100644 --- a/datafusion-examples/examples/rewrite_expr.rs +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -22,7 +22,7 @@ use datafusion_expr::{ AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource, }; use datafusion_optimizer::optimizer::Optimizer; -use datafusion_optimizer::{utils, OptimizerConfig, OptimizerRule}; +use datafusion_optimizer::{utils, OptimizerConfig, OptimizerContext, OptimizerRule}; use datafusion_sql::planner::{ContextProvider, SqlToRel}; use datafusion_sql::sqlparser::dialect::PostgreSqlDialect; use datafusion_sql::sqlparser::parser::Parser; @@ -47,9 +47,8 @@ pub fn main() -> Result<()> { // now run the optimizer with our custom rule let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]); - let mut optimizer_config = OptimizerConfig::default().with_skip_failing_rules(false); - let optimized_plan = - optimizer.optimize(&logical_plan, &mut optimizer_config, observe)?; + let config = OptimizerContext::default().with_skip_failing_rules(false); + let optimized_plan = optimizer.optimize(&logical_plan, &config, observe)?; println!( "Optimized Logical Plan:\n\n{}\n", optimized_plan.display_indent() @@ -76,10 +75,10 @@ impl OptimizerRule for MyRule { fn try_optimize( &self, plan: &LogicalPlan, - _config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { // recurse down and optimize children first - let plan = utils::optimize_children(self, plan, _config)?; + let plan = utils::optimize_children(self, plan, config)?; match plan { LogicalPlan::Filter(filter) => { diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index f98e21dd1dd54..16f2a29f36bf7 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -68,7 +68,7 @@ use crate::logical_expr::{ CreateView, DropTable, DropView, Explain, LogicalPlan, LogicalPlanBuilder, SetVariable, TableSource, TableType, UNNAMED_TABLE, }; -use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; +use crate::optimizer::{OptimizerContext, OptimizerRule}; use datafusion_sql::{ResolvedTableReference, TableReference}; use crate::physical_optimizer::coalesce_batches::CoalesceBatches; @@ -1557,14 +1557,6 @@ impl SessionState { .register_catalog(config.default_catalog.clone(), default_catalog); } - let optimizer_config = OptimizerConfig::new().filter_null_keys( - config - .config_options - .read() - .get_bool(OPT_FILTER_NULL_JOIN_KEYS) - .unwrap_or_default(), - ); - let mut physical_optimizers: Vec> = vec![ Arc::new(AggregateStatistics::new()), Arc::new(JoinSelection::new()), @@ -1593,7 +1585,7 @@ impl SessionState { SessionState { session_id, - optimizer: Optimizer::new(&optimizer_config), + optimizer: Optimizer::new(), physical_optimizers, query_planner: Arc::new(DefaultQueryPlanner {}), catalog_list, @@ -1741,24 +1733,29 @@ impl SessionState { /// Optimizes the logical plan by applying optimizer rules. pub fn optimize(&self, plan: &LogicalPlan) -> Result { - let mut optimizer_config = OptimizerConfig::new() - .with_skip_failing_rules( - self.config - .config_options - .read() - .get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES) - .unwrap_or_default(), - ) - .with_max_passes( - self.config - .config_options - .read() - .get_u64(OPT_OPTIMIZER_MAX_PASSES) - .unwrap_or_default() as u8, - ) - .with_query_execution_start_time( - self.execution_props.query_execution_start_time, - ); + // TODO: Implement OptimizerContext directly on DataFrame (#4631) (#4626) + let config = { + let config_options = self.config.config_options.read(); + OptimizerContext::new() + .with_skip_failing_rules( + config_options + .get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES) + .unwrap_or_default(), + ) + .with_max_passes( + config_options + .get_u64(OPT_OPTIMIZER_MAX_PASSES) + .unwrap_or_default() as u8, + ) + .with_query_execution_start_time( + self.execution_props.query_execution_start_time, + ) + .filter_null_keys( + config_options + .get_bool(OPT_FILTER_NULL_JOIN_KEYS) + .unwrap_or_default(), + ) + }; if let LogicalPlan::Explain(e) = plan { let mut stringified_plans = e.stringified_plans.clone(); @@ -1766,7 +1763,7 @@ impl SessionState { // optimize the child plan, capturing the output of each optimizer let plan = self.optimizer.optimize( e.plan.as_ref(), - &mut optimizer_config, + &config, |optimized_plan, optimizer| { let optimizer_name = optimizer.name().to_string(); let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; @@ -1781,8 +1778,7 @@ impl SessionState { schema: e.schema.clone(), })) } else { - self.optimizer - .optimize(plan, &mut optimizer_config, |_, _| {}) + self.optimizer.optimize(plan, &config, |_, _| {}) } } diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs index 4b5a7e66f18a3..89cfd48e64a5c 100644 --- a/datafusion/core/tests/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined_plan.rs @@ -285,7 +285,7 @@ impl OptimizerRule for TopKOptimizerRule { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { // Note: this code simply looks for the pattern of a Limit followed by a // Sort and replaces it by a TopK node. It does not handle many @@ -308,7 +308,7 @@ impl OptimizerRule for TopKOptimizerRule { node: Arc::new(TopKPlanNode { k: *fetch, input: self - .try_optimize(input.as_ref(), optimizer_config)? + .try_optimize(input.as_ref(), config)? .unwrap_or_else(|| input.as_ref().clone()), expr: expr[0].clone(), }), @@ -319,7 +319,7 @@ impl OptimizerRule for TopKOptimizerRule { // If we didn't find the Limit/Sort combination, recurse as // normal and build the result. - Ok(Some(optimize_children(self, plan, optimizer_config)?)) + Ok(Some(optimize_children(self, plan, config)?)) } fn name(&self) -> &str { diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index 51bc8e3eeb308..01c6f6dd26ce1 100644 --- a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -42,9 +42,9 @@ and applying it to a logical plan to produce an optimized logical plan. // The `datafusion` crate provides a DataFrame API that can create a LogicalPlan let logical_plan = ... -let mut config = OptimizerConfig::default(); +let mut config = OptimizerContext::default(); let optimizer = Optimizer::new(&config); -let optimized_plan = optimizer.optimize(&logical_plan, &mut config, observe)?; +let optimized_plan = optimizer.optimize(&logical_plan, &config, observe)?; fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) { println!( @@ -82,7 +82,7 @@ pub trait OptimizerRule { fn optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result; /// A human readable name for this optimizer rule diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 6885a0bfbb68d..c244685eb4d04 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -17,8 +17,11 @@ //! Eliminate common sub-expression. -use crate::{utils, OptimizerConfig, OptimizerRule}; +use std::collections::{BTreeSet, HashMap}; +use std::sync::Arc; + use arrow::datatypes::DataType; + use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, Result}; use datafusion_expr::{ col, @@ -27,8 +30,8 @@ use datafusion_expr::{ logical_plan::{Aggregate, Filter, LogicalPlan, Projection, Sort, Window}, Expr, ExprSchemable, }; -use std::collections::{BTreeSet, HashMap}; -use std::sync::Arc; + +use crate::{utils, OptimizerConfig, OptimizerRule}; /// A map from expression's identifier to tuple including /// - the expression itself (cloned) @@ -60,7 +63,7 @@ impl CommonSubexprEliminate { arrays_list: &[&[Vec<(usize, String)>]], input: &LogicalPlan, expr_set: &mut ExprSet, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result<(Vec>, LogicalPlan)> { let mut affected_id = BTreeSet::::new(); @@ -80,7 +83,7 @@ impl CommonSubexprEliminate { .collect::>>()?; let mut new_input = self - .try_optimize(input, optimizer_config)? + .try_optimize(input, config)? .unwrap_or_else(|| input.clone()); if !affected_id.is_empty() { new_input = build_project_plan(new_input, affected_id, expr_set)?; @@ -94,7 +97,7 @@ impl OptimizerRule for CommonSubexprEliminate { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { let mut expr_set = ExprSet::new(); @@ -107,13 +110,8 @@ impl OptimizerRule for CommonSubexprEliminate { let input_schema = Arc::clone(input.schema()); let arrays = to_arrays(expr, input_schema, &mut expr_set)?; - let (mut new_expr, new_input) = self.rewrite_expr( - &[expr], - &[&arrays], - input, - &mut expr_set, - optimizer_config, - )?; + let (mut new_expr, new_input) = + self.rewrite_expr(&[expr], &[&arrays], input, &mut expr_set, config)?; Ok(Some(LogicalPlan::Projection( Projection::try_new_with_schema( @@ -140,7 +138,7 @@ impl OptimizerRule for CommonSubexprEliminate { &[&[id_array]], filter.input(), &mut expr_set, - optimizer_config, + config, )?; if let Some(predicate) = pop_expr(&mut new_expr)?.pop() { @@ -167,7 +165,7 @@ impl OptimizerRule for CommonSubexprEliminate { &[&arrays], input, &mut expr_set, - optimizer_config, + config, )?; Ok(Some(LogicalPlan::Window(Window { @@ -192,7 +190,7 @@ impl OptimizerRule for CommonSubexprEliminate { &[&group_arrays, &aggr_arrays], input, &mut expr_set, - optimizer_config, + config, )?; // note the reversed pop order. let new_aggr_expr = pop_expr(&mut new_expr)?; @@ -211,13 +209,8 @@ impl OptimizerRule for CommonSubexprEliminate { let input_schema = Arc::clone(input.schema()); let arrays = to_arrays(expr, input_schema, &mut expr_set)?; - let (mut new_expr, new_input) = self.rewrite_expr( - &[expr], - &[&arrays], - input, - &mut expr_set, - optimizer_config, - )?; + let (mut new_expr, new_input) = + self.rewrite_expr(&[expr], &[&arrays], input, &mut expr_set, config)?; Ok(Some(LogicalPlan::Sort(Sort { expr: pop_expr(&mut new_expr)?, @@ -249,11 +242,7 @@ impl OptimizerRule for CommonSubexprEliminate { | LogicalPlan::Extension(_) | LogicalPlan::Prepare(_) => { // apply the optimization to all inputs of the plan - Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) + Ok(Some(utils::optimize_children(self, plan, config)?)) } } } @@ -572,20 +561,25 @@ fn replace_common_expr( #[cfg(test)] mod test { - use super::*; - use crate::test::*; + use std::iter; + use arrow::datatypes::{Field, Schema}; + use datafusion_expr::logical_plan::{table_scan, JoinType}; use datafusion_expr::{ avg, binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, sum, Operator, }; - use std::iter; + + use crate::optimizer::OptimizerContext; + use crate::test::*; + + use super::*; fn assert_optimized_plan_eq(expected: &str, plan: &LogicalPlan) { let optimizer = CommonSubexprEliminate {}; let optimized_plan = optimizer - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); @@ -831,7 +825,7 @@ mod test { .unwrap(); let rule = CommonSubexprEliminate {}; let optimized_plan = rule - .try_optimize(&plan, &mut OptimizerConfig::new()) + .try_optimize(&plan, &OptimizerContext::new()) .unwrap() .unwrap(); diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs index 54e346a08a685..d7e6017193641 100644 --- a/datafusion/optimizer/src/decorrelate_where_exists.rs +++ b/datafusion/optimizer/src/decorrelate_where_exists.rs @@ -48,7 +48,7 @@ impl DecorrelateWhereExists { fn extract_subquery_exprs( &self, predicate: &Expr, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result<(Vec, Vec)> { let filters = split_conjunction(predicate); @@ -58,7 +58,7 @@ impl DecorrelateWhereExists { match it { Expr::Exists { subquery, negated } => { let subquery = self - .try_optimize(&subquery.subquery, optimizer_config)? + .try_optimize(&subquery.subquery, config)? .map(Arc::new) .unwrap_or_else(|| subquery.subquery.clone()); let subquery = Subquery { subquery }; @@ -77,7 +77,7 @@ impl OptimizerRule for DecorrelateWhereExists { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { match plan { LogicalPlan::Filter(filter) => { @@ -86,11 +86,11 @@ impl OptimizerRule for DecorrelateWhereExists { // Apply optimizer rule to current input let optimized_input = self - .try_optimize(filter_input, optimizer_config)? + .try_optimize(filter_input, config)? .unwrap_or_else(|| filter_input.clone()); let (subqueries, other_exprs) = - self.extract_subquery_exprs(predicate, optimizer_config)?; + self.extract_subquery_exprs(predicate, config)?; let optimized_plan = LogicalPlan::Filter(Filter::try_new( predicate.clone(), Arc::new(optimized_input), @@ -114,11 +114,7 @@ impl OptimizerRule for DecorrelateWhereExists { } _ => { // Apply the optimization to all inputs of the plan - Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) + Ok(Some(utils::optimize_children(self, plan, config)?)) } } } diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index dac5ff03b29cd..1f5008757d495 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -20,7 +20,7 @@ use crate::utils::{ only_or_err, split_conjunction, swap_table, verify_not_disjunction, }; use crate::{utils, OptimizerConfig, OptimizerRule}; -use datafusion_common::context; +use datafusion_common::{context, Result}; use datafusion_expr::logical_plan::{Filter, JoinType, Projection, Subquery}; use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder}; use log::debug; @@ -46,7 +46,7 @@ impl DecorrelateWhereIn { fn extract_subquery_exprs( &self, predicate: &Expr, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> datafusion_common::Result<(Vec, Vec)> { let filters = split_conjunction(predicate); // TODO: disjunctions @@ -60,7 +60,7 @@ impl DecorrelateWhereIn { negated, } => { let subquery = self - .try_optimize(&subquery.subquery, optimizer_config)? + .try_optimize(&subquery.subquery, config)? .map(Arc::new) .unwrap_or_else(|| subquery.subquery.clone()); let subquery = Subquery { subquery }; @@ -81,8 +81,8 @@ impl OptimizerRule for DecorrelateWhereIn { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, - ) -> datafusion_common::Result> { + config: &dyn OptimizerConfig, + ) -> Result> { match plan { LogicalPlan::Filter(filter) => { let predicate = filter.predicate(); @@ -90,11 +90,11 @@ impl OptimizerRule for DecorrelateWhereIn { // Apply optimizer rule to current input let optimized_input = self - .try_optimize(filter_input, optimizer_config)? + .try_optimize(filter_input, config)? .unwrap_or_else(|| filter_input.clone()); let (subqueries, other_exprs) = - self.extract_subquery_exprs(predicate, optimizer_config)?; + self.extract_subquery_exprs(predicate, config)?; let optimized_plan = LogicalPlan::Filter(Filter::try_new( predicate.clone(), Arc::new(optimized_input), @@ -107,22 +107,14 @@ impl OptimizerRule for DecorrelateWhereIn { // iterate through all exists clauses in predicate, turning each into a join let mut cur_input = filter_input.clone(); for subquery in subqueries { - cur_input = optimize_where_in( - &subquery, - &cur_input, - &other_exprs, - optimizer_config, - )?; + cur_input = + optimize_where_in(&subquery, &cur_input, &other_exprs, config)?; } Ok(Some(cur_input)) } _ => { // Apply the optimization to all inputs of the plan - Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) + Ok(Some(utils::optimize_children(self, plan, config)?)) } } } @@ -136,7 +128,7 @@ fn optimize_where_in( query_info: &SubqueryInfo, outer_input: &LogicalPlan, outer_other_exprs: &[Expr], - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> datafusion_common::Result { let proj = Projection::try_from_plan(&query_info.query.subquery) .map_err(|e| context!("a projection is required", e))?; @@ -179,7 +171,7 @@ fn optimize_where_in( merge_cols((&[subquery_col], &subqry_cols), (&[outer_col], &outer_cols)); // build subquery side of join - the thing the subquery was querying - let subqry_alias = format!("__sq_{}", optimizer_config.next_id()); + let subqry_alias = format!("__sq_{}", config.next_id()); let mut subqry_plan = LogicalPlanBuilder::from((*subqry_input).clone()); if let Some(expr) = conjunction(other_subqry_exprs) { // if the subquery had additional expressions, restore them diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index a00362d70e38a..f2fb35d3e1a87 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -16,7 +16,9 @@ // under the License. //! Optimizer rule to eliminate cross join to inner join if join predicates are available in filters. -use crate::{utils, OptimizerConfig, OptimizerRule}; +use std::collections::HashSet; +use std::sync::Arc; + use datafusion_common::{DataFusionError, Result}; use datafusion_expr::expr::{BinaryExpr, Expr}; use datafusion_expr::logical_plan::{ @@ -27,8 +29,8 @@ use datafusion_expr::{ and, build_join_schema, or, wrap_projection_for_join_if_necessary, ExprSchemable, Operator, }; -use std::collections::HashSet; -use std::sync::Arc; + +use crate::{utils, OptimizerConfig, OptimizerRule}; #[derive(Default)] pub struct EliminateCrossJoin; @@ -54,7 +56,7 @@ impl OptimizerRule for EliminateCrossJoin { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { match plan { LogicalPlan::Filter(filter) => { @@ -78,11 +80,7 @@ impl OptimizerRule for EliminateCrossJoin { )?; } _ => { - return Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)); + return Ok(Some(utils::optimize_children(self, plan, config)?)); } } @@ -102,7 +100,7 @@ impl OptimizerRule for EliminateCrossJoin { )?; } - left = utils::optimize_children(self, &left, optimizer_config)?; + left = utils::optimize_children(self, &left, config)?; if plan.schema() != left.schema() { left = LogicalPlan::Projection(Projection::new_from_schema( @@ -128,11 +126,7 @@ impl OptimizerRule for EliminateCrossJoin { } } - _ => Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)), + _ => Ok(Some(utils::optimize_children(self, plan, config)?)), } } @@ -378,18 +372,21 @@ fn remove_join_expressions( #[cfg(test)] mod tests { - use super::*; - use crate::test::*; use datafusion_expr::{ binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Operator::{And, Or}, }; + use crate::optimizer::OptimizerContext; + use crate::test::*; + + use super::*; + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: Vec<&str>) { let rule = EliminateCrossJoin::new(); let optimized_plan = rule - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let formatted = optimized_plan.display_indent_schema().to_string(); diff --git a/datafusion/optimizer/src/eliminate_filter.rs b/datafusion/optimizer/src/eliminate_filter.rs index 046d7b11f01d9..7636a6a9fcc7b 100644 --- a/datafusion/optimizer/src/eliminate_filter.rs +++ b/datafusion/optimizer/src/eliminate_filter.rs @@ -18,13 +18,14 @@ //! Optimizer rule to replace `where false` on a plan with an empty relation. //! This saves time in planning and executing the query. //! Note that this rule should be applied after simplify expressions optimizer rule. -use crate::{utils, OptimizerConfig, OptimizerRule}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{ logical_plan::{EmptyRelation, LogicalPlan}, Expr, }; +use crate::{utils, OptimizerConfig, OptimizerRule}; + /// Optimization rule that elimanate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation] #[derive(Default)] pub struct EliminateFilter; @@ -40,7 +41,7 @@ impl OptimizerRule for EliminateFilter { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { let predicate_and_input = match plan { LogicalPlan::Filter(filter) => match filter.predicate() { @@ -53,18 +54,14 @@ impl OptimizerRule for EliminateFilter { }; match predicate_and_input { - Some((true, input)) => self.try_optimize(input, optimizer_config), + Some((true, input)) => self.try_optimize(input, config), Some((false, input)) => Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: input.schema().clone(), }))), None => { // Apply the optimization to all inputs of the plan - Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) + Ok(Some(utils::optimize_children(self, plan, config)?)) } } } @@ -76,14 +73,17 @@ impl OptimizerRule for EliminateFilter { #[cfg(test)] mod tests { - use super::*; - use crate::test::*; use datafusion_expr::{col, lit, logical_plan::builder::LogicalPlanBuilder, sum}; + use crate::optimizer::OptimizerContext; + use crate::test::*; + + use super::*; + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = EliminateFilter::new(); let optimized_plan = rule - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs index 45844e120426d..eb9d8bf3140c3 100644 --- a/datafusion/optimizer/src/eliminate_limit.rs +++ b/datafusion/optimizer/src/eliminate_limit.rs @@ -20,10 +20,11 @@ //! on a plan with an empty relation. //! This rule also removes OFFSET 0 from the [LogicalPlan] //! This saves time in planning and executing the query. -use crate::{utils, OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan}; +use crate::{utils, OptimizerConfig, OptimizerRule}; + /// Optimization rule that eliminate LIMIT 0 or useless LIMIT(skip:0, fetch:None). /// It can cooperate with `propagate_empty_relation` and `limit_push_down`. #[derive(Default)] @@ -40,7 +41,7 @@ impl OptimizerRule for EliminateLimit { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { if let LogicalPlan::Limit(limit) = plan { match limit.fetch { @@ -55,20 +56,12 @@ impl OptimizerRule for EliminateLimit { None => { if limit.skip == 0 { let input = &*limit.input; - return Ok(Some(utils::optimize_children( - self, - input, - optimizer_config, - )?)); + return Ok(Some(utils::optimize_children(self, input, config)?)); } } } } - Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) + Ok(Some(utils::optimize_children(self, plan, config)?)) } fn name(&self) -> &str { @@ -78,9 +71,6 @@ impl OptimizerRule for EliminateLimit { #[cfg(test)] mod tests { - use super::*; - use crate::push_down_limit::PushDownLimit; - use crate::test::*; use datafusion_common::Column; use datafusion_expr::{ col, @@ -88,9 +78,15 @@ mod tests { sum, }; + use crate::optimizer::OptimizerContext; + use crate::push_down_limit::PushDownLimit; + use crate::test::*; + + use super::*; + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { let optimized_plan = EliminateLimit::new() - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); @@ -104,11 +100,11 @@ mod tests { expected: &str, ) -> Result<()> { let optimized_plan = PushDownLimit::new() - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let optimized_plan = EliminateLimit::new() - .try_optimize(&optimized_plan, &mut OptimizerConfig::new()) + .try_optimize(&optimized_plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index a78d61dace929..128cf4729b92c 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -65,7 +65,7 @@ impl OptimizerRule for EliminateOuterJoin { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { match plan { LogicalPlan::Filter(filter) => match filter.input().as_ref() { @@ -110,23 +110,11 @@ impl OptimizerRule for EliminateOuterJoin { null_equals_null: join.null_equals_null, }); let new_plan = from_plan(plan, &plan.expressions(), &[new_join])?; - Ok(Some(utils::optimize_children( - self, - &new_plan, - optimizer_config, - )?)) + Ok(Some(utils::optimize_children(self, &new_plan, config)?)) } - _ => Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)), + _ => Ok(Some(utils::optimize_children(self, plan, config)?)), }, - _ => Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)), + _ => Ok(Some(utils::optimize_children(self, plan, config)?)), } } @@ -308,6 +296,7 @@ fn extract_non_nullable_columns( #[cfg(test)] mod tests { use super::*; + use crate::optimizer::OptimizerContext; use crate::test::*; use arrow::datatypes::DataType; use datafusion_expr::{ @@ -320,7 +309,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { let rule = EliminateOuterJoin::new(); let optimized_plan = rule - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs b/datafusion/optimizer/src/filter_null_join_keys.rs index 8563c13a875a8..940f966b60f6a 100644 --- a/datafusion/optimizer/src/filter_null_join_keys.rs +++ b/datafusion/optimizer/src/filter_null_join_keys.rs @@ -20,12 +20,14 @@ //! and then insert an `IsNotNull` filter on the nullable side since null values //! can never match. -use crate::{utils, OptimizerConfig, OptimizerRule}; -use datafusion_common::{Column, DFField, DFSchemaRef}; +use std::sync::Arc; + +use datafusion_common::{Column, DFField, DFSchemaRef, Result}; use datafusion_expr::{ and, logical_plan::Filter, logical_plan::JoinType, Expr, LogicalPlan, }; -use std::sync::Arc; + +use crate::{utils, OptimizerConfig, OptimizerRule}; /// The FilterNullJoinKeys rule will identify inner joins with equi-join conditions /// where the join key is nullable on one side and non-nullable on the other side @@ -34,22 +36,26 @@ use std::sync::Arc; #[derive(Default)] pub struct FilterNullJoinKeys {} +impl FilterNullJoinKeys { + pub const NAME: &'static str = "filter_null_join_keys"; +} + impl OptimizerRule for FilterNullJoinKeys { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, - ) -> datafusion_common::Result> { + config: &dyn OptimizerConfig, + ) -> Result> { match plan { LogicalPlan::Join(join) if join.join_type == JoinType::Inner => { // recurse down first and optimize inputs let mut join = join.clone(); join.left = Arc::new( - self.try_optimize(&join.left, optimizer_config)? + self.try_optimize(&join.left, config)? .unwrap_or_else(|| join.left.as_ref().clone()), ); join.right = Arc::new( - self.try_optimize(&join.right, optimizer_config)? + self.try_optimize(&join.right, config)? .unwrap_or_else(|| join.right.as_ref().clone()), ); @@ -90,17 +96,13 @@ impl OptimizerRule for FilterNullJoinKeys { } _ => { // Apply the optimization to all inputs of the plan - Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) + Ok(Some(utils::optimize_children(self, plan, config)?)) } } } fn name(&self) -> &str { - "filter_null_join_keys" + Self::NAME } } @@ -147,15 +149,19 @@ fn resolve_fields( #[cfg(test)] mod tests { - use super::*; use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::{Column, Result}; use datafusion_expr::logical_plan::table_scan; use datafusion_expr::{logical_plan::JoinType, LogicalPlanBuilder}; + use crate::optimizer::OptimizerContext; + + use super::*; + fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan { let rule = FilterNullJoinKeys::default(); - rule.try_optimize(plan, &mut OptimizerConfig::new()) + rule.try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan") } diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index d8283bf7148d3..fe24e675de0c0 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -38,7 +38,7 @@ impl OptimizerRule for InlineTableScan { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { match plan { // Match only on scans without filter / projection / fetch @@ -52,8 +52,7 @@ impl OptimizerRule for InlineTableScan { }) if filters.is_empty() => { if let Some(sub_plan) = source.get_logical_plan() { // Recursively apply optimization - let plan = - utils::optimize_children(self, sub_plan, optimizer_config)?; + let plan = utils::optimize_children(self, sub_plan, config)?; let plan = LogicalPlanBuilder::from(plan) .project(vec![Expr::Wildcard])? .alias(table_name)?; @@ -67,11 +66,7 @@ impl OptimizerRule for InlineTableScan { // Rest: Recurse _ => { // apply the optimization to all inputs of the plan - Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) + Ok(Some(utils::optimize_children(self, plan, config)?)) } } } @@ -88,7 +83,8 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource}; - use crate::{inline_table_scan::InlineTableScan, OptimizerConfig, OptimizerRule}; + use crate::optimizer::OptimizerContext; + use crate::{inline_table_scan::InlineTableScan, OptimizerRule}; pub struct RawTableSource {} @@ -158,7 +154,7 @@ mod tests { let plan = scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap(); let optimized_plan = rule - .try_optimize(&plan, &mut OptimizerConfig::new()) + .try_optimize(&plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 1bf7ad58bb957..a4804ca5b0da0 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -41,5 +41,5 @@ pub mod rewrite_disjunctive_predicate; pub mod test; pub mod unwrap_cast_in_comparison; -pub use optimizer::{OptimizerConfig, OptimizerRule}; +pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule}; pub use utils::optimize_children; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 4e9eadc47e5b6..43fb34b19bd4d 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -41,6 +41,7 @@ use chrono::{DateTime, Utc}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::logical_plan::LogicalPlan; use log::{debug, trace, warn}; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Instant; @@ -54,7 +55,7 @@ pub trait OptimizerRule { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result>; /// A human readable name for this optimizer rule @@ -62,15 +63,35 @@ pub trait OptimizerRule { } /// Options to control the DataFusion Optimizer. +pub trait OptimizerConfig { + /// Return the time at which the query execution started. This + /// time is used as the value for now() + fn query_execution_start_time(&self) -> DateTime; + + /// Returns false if the given rule should be skipped + fn rule_enabled(&self, name: &str) -> bool; + + /// The optimizer will skip failing rules if this returns true + fn skip_failing_rules(&self) -> bool; + + /// How many times to attempt to optimize the plan + fn max_passes(&self) -> u8; + + /// Return a unique ID + /// + /// This is useful for assigning unique names to aliases + fn next_id(&self) -> usize; +} + +/// A standalone [`OptimizerConfig`] that can be used independently +/// of DataFusion's config management #[derive(Debug)] -pub struct OptimizerConfig { +pub struct OptimizerContext { /// Query execution start time that can be used to rewrite /// expressions such as `now()` to use a literal value instead query_execution_start_time: DateTime, /// id generator for optimizer passes - // TODO this should not be on the config, - // it should be its own 'OptimizerState' or something) - next_id: usize, + next_id: AtomicUsize, /// Option to skip rules that produce errors skip_failing_rules: bool, /// Specify whether to enable the filter_null_keys rule @@ -79,12 +100,12 @@ pub struct OptimizerConfig { max_passes: u8, } -impl OptimizerConfig { +impl OptimizerContext { /// Create optimizer config pub fn new() -> Self { Self { query_execution_start_time: Utc::now(), - next_id: 0, // useful for generating things like unique subquery aliases + next_id: AtomicUsize::new(1), skip_failing_rules: true, filter_null_keys: true, max_passes: 3, @@ -119,24 +140,36 @@ impl OptimizerConfig { self.max_passes = v; self } +} - /// Generate the next ID needed - pub fn next_id(&mut self) -> usize { - self.next_id += 1; - self.next_id +impl Default for OptimizerContext { + /// Create optimizer config + fn default() -> Self { + Self::new() } +} - /// Return the time at which the query execution started. This - /// time is used as the value for now() - pub fn query_execution_start_time(&self) -> DateTime { +impl OptimizerConfig for OptimizerContext { + fn query_execution_start_time(&self) -> DateTime { self.query_execution_start_time } -} -impl Default for OptimizerConfig { - /// Create optimizer config - fn default() -> Self { - Self::new() + fn rule_enabled(&self, name: &str) -> bool { + self.filter_null_keys || name != FilterNullJoinKeys::NAME + } + + fn skip_failing_rules(&self) -> bool { + self.skip_failing_rules + } + + fn max_passes(&self) -> u8 { + self.max_passes + } + + fn next_id(&self) -> usize { + use std::sync::atomic::Ordering; + // Can use relaxed ordering as not used for synchronisation + self.next_id.fetch_add(1, Ordering::Relaxed) } } @@ -147,10 +180,16 @@ pub struct Optimizer { pub rules: Vec>, } +impl Default for Optimizer { + fn default() -> Self { + Self::new() + } +} + impl Optimizer { /// Create a new optimizer using the recommended list of rules - pub fn new(config: &OptimizerConfig) -> Self { - let mut rules: Vec> = vec![ + pub fn new() -> Self { + let rules: Vec> = vec![ Arc::new(InlineTableScan::new()), Arc::new(TypeCoercion::new()), Arc::new(SimplifyExpressions::new()), @@ -169,22 +208,19 @@ impl Optimizer { Arc::new(EliminateLimit::new()), Arc::new(PropagateEmptyRelation::new()), Arc::new(RewriteDisjunctivePredicate::new()), + Arc::new(FilterNullJoinKeys::default()), + Arc::new(EliminateOuterJoin::new()), + // Filters can't be pushed down past Limits, we should do PushDownFilter after LimitPushDown + Arc::new(PushDownLimit::new()), + Arc::new(PushDownFilter::new()), + Arc::new(SingleDistinctToGroupBy::new()), + // The previous optimizations added expressions and projections, + // that might benefit from the following rules + Arc::new(SimplifyExpressions::new()), + Arc::new(UnwrapCastInComparison::new()), + Arc::new(CommonSubexprEliminate::new()), + Arc::new(PushDownProjection::new()), ]; - if config.filter_null_keys { - rules.push(Arc::new(FilterNullJoinKeys::default())); - } - rules.push(Arc::new(EliminateOuterJoin::new())); - // Filters can't be pushed down past Limits, we should do PushDownFilter after LimitPushDown - rules.push(Arc::new(PushDownLimit::new())); - rules.push(Arc::new(PushDownFilter::new())); - rules.push(Arc::new(SingleDistinctToGroupBy::new())); - - // The previous optimizations added expressions and projections, - // that might benefit from the following rules - rules.push(Arc::new(SimplifyExpressions::new())); - rules.push(Arc::new(UnwrapCastInComparison::new())); - rules.push(Arc::new(CommonSubexprEliminate::new())); - rules.push(Arc::new(PushDownProjection::new())); Self::with_rules(rules) } @@ -199,7 +235,7 @@ impl Optimizer { pub fn optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, mut observer: F, ) -> Result where @@ -209,11 +245,15 @@ impl Optimizer { let mut plan_str = format!("{}", plan.display_indent()); let mut new_plan = plan.clone(); let mut i = 0; - while i < optimizer_config.max_passes { + while i < config.max_passes() { log_plan(&format!("Optimizer input (pass {})", i), &new_plan); for rule in &self.rules { - let result = rule.try_optimize(&new_plan, optimizer_config); + if !config.rule_enabled(rule.name()) { + continue; + } + + let result = rule.try_optimize(&new_plan, config); match result { Ok(Some(plan)) => { if !plan.schema().equivalent_names_and_types(new_plan.schema()) { @@ -237,7 +277,7 @@ impl Optimizer { ); } Err(ref e) => { - if optimizer_config.skip_failing_rules { + if config.skip_failing_rules() { // Note to future readers: if you see this warning it signals a // bug in the DataFusion optimizer. Please consider filing a ticket // https://github.com/apache/arrow-datafusion @@ -286,51 +326,49 @@ fn log_plan(description: &str, plan: &LogicalPlan) { mod tests { use crate::optimizer::Optimizer; use crate::test::test_table_scan; - use crate::{OptimizerConfig, OptimizerRule}; - use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError}; + use crate::{OptimizerConfig, OptimizerContext, OptimizerRule}; + use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, Result}; use datafusion_expr::logical_plan::EmptyRelation; use datafusion_expr::{col, LogicalPlan, LogicalPlanBuilder, Projection}; use std::sync::Arc; #[test] - fn skip_failing_rule() -> Result<(), DataFusionError> { + fn skip_failing_rule() { let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]); - let mut config = OptimizerConfig::new().with_skip_failing_rules(true); + let config = OptimizerContext::new().with_skip_failing_rules(true); let plan = LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: Arc::new(DFSchema::empty()), }); - opt.optimize(&plan, &mut config, &observe)?; - Ok(()) + opt.optimize(&plan, &config, &observe).unwrap(); } #[test] - fn no_skip_failing_rule() -> Result<(), DataFusionError> { + fn no_skip_failing_rule() { let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]); - let mut config = OptimizerConfig::new().with_skip_failing_rules(false); + let config = OptimizerContext::new().with_skip_failing_rules(false); let plan = LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: Arc::new(DFSchema::empty()), }); - let result = opt.optimize(&plan, &mut config, &observe); + let err = opt.optimize(&plan, &config, &observe).unwrap_err(); assert_eq!( "Internal error: Optimizer rule 'bad rule' failed due to unexpected error: \ Error during planning: rule failed. This was likely caused by a bug in \ DataFusion's code and we would welcome that you file an bug report in our issue tracker", - format!("{}", result.err().unwrap()) + err.to_string() ); - Ok(()) } #[test] - fn generate_different_schema() -> Result<(), DataFusionError> { + fn generate_different_schema() { let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]); - let mut config = OptimizerConfig::new().with_skip_failing_rules(false); + let config = OptimizerContext::new().with_skip_failing_rules(false); let plan = LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: Arc::new(DFSchema::empty()), }); - let result = opt.optimize(&plan, &mut config, &observe); + let err = opt.optimize(&plan, &config, &observe).unwrap_err(); assert_eq!( "Internal error: Optimizer rule 'get table_scan rule' failed, due to generate a different schema, \ original schema: DFSchema { fields: [], metadata: {} }, \ @@ -341,9 +379,8 @@ mod tests { metadata: {} }. \ This was likely caused by a bug in DataFusion's code \ and we would welcome that you file an bug report in our issue tracker", - format!("{}", result.err().unwrap()) + err.to_string() ); - Ok(()) } #[test] @@ -351,7 +388,7 @@ mod tests { // if the plan creates more metadata than previously (because // some wrapping functions are removed, etc) do not error let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]); - let mut config = OptimizerConfig::new().with_skip_failing_rules(false); + let config = OptimizerContext::new().with_skip_failing_rules(false); let input = Arc::new(test_table_scan().unwrap()); let input_schema = input.schema().clone(); @@ -364,7 +401,7 @@ mod tests { // optimizing should be ok, but the schema will have changed (no metadata) assert_ne!(plan.schema().as_ref(), input_schema.as_ref()); - let optimized_plan = opt.optimize(&plan, &mut config, &observe).unwrap(); + let optimized_plan = opt.optimize(&plan, &config, &observe).unwrap(); // metadata was removed assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref()); } @@ -399,9 +436,9 @@ mod tests { impl OptimizerRule for BadRule { fn try_optimize( &self, - _plan: &LogicalPlan, - _optimizer_config: &mut OptimizerConfig, - ) -> datafusion_common::Result> { + _: &LogicalPlan, + _: &dyn OptimizerConfig, + ) -> Result> { Err(DataFusionError::Plan("rule failed".to_string())) } @@ -416,9 +453,9 @@ mod tests { impl OptimizerRule for GetTableScanRule { fn try_optimize( &self, - _plan: &LogicalPlan, - _optimizer_config: &mut OptimizerConfig, - ) -> datafusion_common::Result> { + _: &LogicalPlan, + _: &dyn OptimizerConfig, + ) -> Result> { let table_scan = test_table_scan()?; Ok(Some(LogicalPlanBuilder::from(table_scan).build()?)) } diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index 9b5869366d5ff..952b97fcd9b88 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -37,11 +37,10 @@ impl OptimizerRule for PropagateEmptyRelation { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { // optimize child plans first - let optimized_children_plan = - utils::optimize_children(self, plan, optimizer_config)?; + let optimized_children_plan = utils::optimize_children(self, plan, config)?; match &optimized_children_plan { LogicalPlan::EmptyRelation(_) => Ok(Some(optimized_children_plan)), LogicalPlan::Projection(_) @@ -204,6 +203,7 @@ fn empty_child(plan: &LogicalPlan) -> Result> { mod tests { use crate::eliminate_filter::EliminateFilter; use crate::test::{test_table_scan, test_table_scan_with_name}; + use crate::OptimizerContext; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{Column, ScalarValue}; use datafusion_expr::logical_plan::table_scan; @@ -217,7 +217,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = PropagateEmptyRelation::new(); let optimized_plan = rule - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); @@ -227,11 +227,11 @@ mod tests { fn assert_together_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let optimize_one = EliminateFilter::new() - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let optimize_two = PropagateEmptyRelation::new() - .try_optimize(&optimize_one, &mut OptimizerConfig::new()) + .try_optimize(&optimize_one, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimize_two); diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index fc2a2d84ad361..279f316c5783d 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -506,7 +506,7 @@ impl OptimizerRule for PushDownFilter { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { let filter = match plan { LogicalPlan::Filter(filter) => filter, @@ -517,22 +517,12 @@ impl OptimizerRule for PushDownFilter { Some(optimized_plan) => Ok(Some(utils::optimize_children( self, &optimized_plan, - optimizer_config, - )?)), - None => Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, + config, )?)), + None => Ok(Some(utils::optimize_children(self, plan, config)?)), }; } - _ => { - return Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) - } + _ => return Ok(Some(utils::optimize_children(self, plan, config)?)), }; let child_plan = &**filter.input(); @@ -544,7 +534,7 @@ impl OptimizerRule for PushDownFilter { new_predicate, child_filter.input().clone(), )?); - return self.try_optimize(&new_plan, optimizer_config); + return self.try_optimize(&new_plan, config); } LogicalPlan::Repartition(_) | LogicalPlan::Distinct(_) @@ -745,11 +735,7 @@ impl OptimizerRule for PushDownFilter { _ => plan.clone(), }; - Ok(Some(utils::optimize_children( - self, - &new_plan, - optimizer_config, - )?)) + Ok(Some(utils::optimize_children(self, &new_plan, config)?)) } } @@ -786,6 +772,7 @@ fn replace_cols_by_name(e: Expr, replace_map: &HashMap) -> Result< mod tests { use super::*; use crate::test::*; + use crate::OptimizerContext; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use async_trait::async_trait; use datafusion_common::DFSchema; @@ -798,7 +785,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { let optimized_plan = PushDownFilter::new() - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); @@ -1946,7 +1933,7 @@ mod tests { table_scan_with_pushdown_provider(TableProviderFilterPushDown::Inexact)?; let optimised_plan = PushDownFilter::new() - .try_optimize(&plan, &mut OptimizerConfig::new()) + .try_optimize(&plan, &OptimizerContext::new()) .expect("failed to optimize plan") .unwrap(); @@ -2298,7 +2285,7 @@ mod tests { // Originally global state which can help to avoid duplicate Filters been generated and pushed down. // Now the global state is removed. Need to double confirm that avoid duplicate Filters. let optimized_plan = PushDownFilter::new() - .try_optimize(&plan, &mut OptimizerConfig::new()) + .try_optimize(&plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); assert_optimized_plan_eq(&optimized_plan, expected) diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 22e837a6c439f..86d8af9accaa3 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -78,17 +78,11 @@ impl OptimizerRule for PushDownLimit { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { let limit = match plan { LogicalPlan::Limit(limit) => limit, - _ => { - return Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) - } + _ => return Ok(Some(utils::optimize_children(self, plan, config)?)), }; if let LogicalPlan::Limit(child_limit) = &*limit.input { @@ -118,18 +112,12 @@ impl OptimizerRule for PushDownLimit { fetch: new_fetch, input: Arc::new((*child_limit.input).clone()), }); - return self.try_optimize(&plan, optimizer_config); + return self.try_optimize(&plan, config); } let fetch = match limit.fetch { Some(fetch) => fetch, - None => { - return Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) - } + None => return Ok(Some(utils::optimize_children(self, plan, config)?)), }; let skip = limit.skip; @@ -237,11 +225,7 @@ impl OptimizerRule for PushDownLimit { _ => plan.clone(), }; - Ok(Some(utils::optimize_children( - self, - &plan, - optimizer_config, - )?)) + Ok(Some(utils::optimize_children(self, &plan, config)?)) } fn name(&self) -> &str { @@ -263,6 +247,7 @@ mod test { use super::*; use crate::test::*; + use crate::OptimizerContext; use datafusion_expr::{ col, exists, logical_plan::{builder::LogicalPlanBuilder, JoinType, LogicalPlan}, @@ -271,7 +256,7 @@ mod test { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { let optimized_plan = PushDownLimit::new() - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs index 642aa188e1671..e50f515297d77 100644 --- a/datafusion/optimizer/src/push_down_projection.rs +++ b/datafusion/optimizer/src/push_down_projection.rs @@ -49,7 +49,7 @@ impl OptimizerRule for PushDownProjection { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { // set of all columns referred by the plan (and thus considered required by the root) let required_columns = plan @@ -63,7 +63,7 @@ impl OptimizerRule for PushDownProjection { plan, &required_columns, false, - optimizer_config, + config, )?)) } @@ -85,7 +85,7 @@ fn optimize_plan( plan: &LogicalPlan, required_columns: &HashSet, // set of columns required up to this step has_projection: bool, - _optimizer_config: &OptimizerConfig, + _config: &dyn OptimizerConfig, ) -> Result { let mut new_required_columns = required_columns.clone(); let new_plan = match plan { @@ -117,13 +117,8 @@ fn optimize_plan( expr_to_columns(e, &mut new_required_columns)? } - let new_input = optimize_plan( - _optimizer, - input, - &new_required_columns, - true, - _optimizer_config, - )?; + let new_input = + optimize_plan(_optimizer, input, &new_required_columns, true, _config)?; let new_required_columns_optimized = new_input .schema() @@ -174,7 +169,7 @@ fn optimize_plan( left, &new_required_columns, true, - _optimizer_config, + _config, )?); let optimized_right = Arc::new(optimize_plan( @@ -182,7 +177,7 @@ fn optimize_plan( right, &new_required_columns, true, - _optimizer_config, + _config, )?); let schema = build_join_schema( @@ -229,7 +224,7 @@ fn optimize_plan( input, required_columns, true, - _optimizer_config, + _config, )?) .build(); }; @@ -245,7 +240,7 @@ fn optimize_plan( input, &new_required_columns, true, - _optimizer_config, + _config, )?) .window(new_window_expr)? .build() @@ -286,7 +281,7 @@ fn optimize_plan( input, &new_required_columns, true, - _optimizer_config, + _config, )?), group_expr.clone(), new_aggr_expr, @@ -316,7 +311,7 @@ fn optimize_plan( &a.input, &required_columns, false, - _optimizer_config, + _config, )?), verbose: a.verbose, schema: a.schema.clone(), @@ -348,7 +343,7 @@ fn optimize_plan( input_plan, &new_required_columns, has_projection, - _optimizer_config, + _config, ) }) .collect::>>()?; @@ -374,7 +369,7 @@ fn optimize_plan( input, &new_required_columns, has_projection, - _optimizer_config, + _config, )?; from_plan(plan, &plan.expressions(), &[child]) } @@ -413,7 +408,7 @@ fn optimize_plan( input_plan, &new_required_columns, has_projection, - _optimizer_config, + _config, ) }) .collect::>>()?; @@ -527,6 +522,7 @@ fn push_down_scan( mod tests { use super::*; use crate::test::*; + use crate::OptimizerContext; use arrow::datatypes::{DataType, Schema}; use datafusion_expr::expr::Cast; use datafusion_expr::{ @@ -1020,7 +1016,7 @@ mod tests { fn optimize(plan: &LogicalPlan) -> Result { let rule = PushDownProjection::new(); Ok(rule - .try_optimize(plan, &mut OptimizerConfig::new())? + .try_optimize(plan, &OptimizerContext::new())? .unwrap()) } } diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs index 079046273f0fd..0f9ba3d371e12 100644 --- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs +++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs @@ -127,7 +127,7 @@ impl OptimizerRule for RewriteDisjunctivePredicate { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { match plan { LogicalPlan::Filter(filter) => { @@ -136,16 +136,12 @@ impl OptimizerRule for RewriteDisjunctivePredicate { let rewritten_expr = normalize_predicate(rewritten_predicate); Ok(Some(LogicalPlan::Filter(Filter::try_new( rewritten_expr, - self.try_optimize(filter.input(), optimizer_config)? + self.try_optimize(filter.input(), config)? .map(Arc::new) .unwrap_or_else(|| filter.input().clone()), )?))) } - _ => Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)), + _ => Ok(Some(utils::optimize_children(self, plan, config)?)), } } diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 3ee46c8d6c47f..51e1387dffded 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -47,7 +47,7 @@ impl ScalarSubqueryToJoin { fn extract_subquery_exprs( &self, predicate: &Expr, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result<(Vec, Vec)> { let filters = split_conjunction(predicate); // TODO: disjunctions @@ -69,7 +69,7 @@ impl ScalarSubqueryToJoin { _ => return Ok(()), }; let subquery = self - .try_optimize(&subquery.subquery, optimizer_config)? + .try_optimize(&subquery.subquery, config)? .map(Arc::new) .unwrap_or_else(|| subquery.subquery.clone()); let subquery = Subquery { subquery }; @@ -93,17 +93,17 @@ impl OptimizerRule for ScalarSubqueryToJoin { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { match plan { LogicalPlan::Filter(filter) => { // Apply optimizer rule to current input let optimized_input = self - .try_optimize(filter.input(), optimizer_config)? + .try_optimize(filter.input(), config)? .unwrap_or_else(|| filter.input().as_ref().clone()); let (subqueries, other_exprs) = - self.extract_subquery_exprs(filter.predicate(), optimizer_config)?; + self.extract_subquery_exprs(filter.predicate(), config)?; if subqueries.is_empty() { // regular filter, no subquery exists clause here @@ -116,12 +116,9 @@ impl OptimizerRule for ScalarSubqueryToJoin { // iterate through all subqueries in predicate, turning each into a join let mut cur_input = filter.input().as_ref().clone(); for subquery in subqueries { - if let Some(optimized_subquery) = optimize_scalar( - &subquery, - &cur_input, - &other_exprs, - optimizer_config, - )? { + if let Some(optimized_subquery) = + optimize_scalar(&subquery, &cur_input, &other_exprs, config)? + { cur_input = optimized_subquery; } else { // if we can't handle all of the subqueries then bail for now @@ -135,11 +132,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { } _ => { // Apply the optimization to all inputs of the plan - Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) + Ok(Some(utils::optimize_children(self, plan, config)?)) } } } @@ -189,7 +182,7 @@ fn optimize_scalar( query_info: &SubqueryInfo, filter_input: &LogicalPlan, outer_others: &[Expr], - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { let subquery = query_info.query.subquery.as_ref(); debug!( @@ -258,7 +251,7 @@ fn optimize_scalar( } // Only operate if one column is present and the other closed upon from outside scope - let subqry_alias = format!("__sq_{}", optimizer_config.next_id()); + let subqry_alias = format!("__sq_{}", config.next_id()); let group_by: Vec<_> = subqry_cols .iter() .map(|it| Expr::Column(it.clone())) diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index f56776b2720f9..dd89e9095bd16 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -46,11 +46,10 @@ impl OptimizerRule for SimplifyExpressions { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { let mut execution_props = ExecutionProps::new(); - execution_props.query_execution_start_time = - optimizer_config.query_execution_start_time(); + execution_props.query_execution_start_time = config.query_execution_start_time(); Ok(Some(Self::optimize_internal(plan, &execution_props)?)) } } @@ -128,6 +127,7 @@ mod tests { use datafusion_common::ScalarValue; use datafusion_expr::{or, Between, BinaryExpr, Cast, Operator}; + use crate::OptimizerContext; use datafusion_expr::logical_plan::table_scan; use datafusion_expr::{ and, binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Expr, @@ -172,7 +172,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { let rule = SimplifyExpressions::new(); let optimized_plan = rule - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); @@ -380,12 +380,11 @@ mod tests { // expect optimizing will result in an error, returning the error string fn get_optimized_plan_err(plan: &LogicalPlan, date_time: &DateTime) -> String { - let mut config = - OptimizerConfig::new().with_query_execution_start_time(*date_time); + let config = OptimizerContext::new().with_query_execution_start_time(*date_time); let rule = SimplifyExpressions::new(); let err = rule - .try_optimize(plan, &mut config) + .try_optimize(plan, &config) .expect_err("expected optimization to fail"); err.to_string() @@ -395,12 +394,11 @@ mod tests { plan: &LogicalPlan, date_time: &DateTime, ) -> String { - let mut config = - OptimizerConfig::new().with_query_execution_start_time(*date_time); + let config = OptimizerContext::new().with_query_execution_start_time(*date_time); let rule = SimplifyExpressions::new(); let optimized_plan = rule - .try_optimize(plan, &mut config) + .try_optimize(plan, &config) .unwrap() .expect("failed to optimize plan"); format!("{:?}", optimized_plan) diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index e4878bd16441a..d8ebc376bc087 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -86,7 +86,7 @@ impl OptimizerRule for SingleDistinctToGroupBy { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { match plan { LogicalPlan::Aggregate(Aggregate { @@ -156,7 +156,7 @@ impl OptimizerRule for SingleDistinctToGroupBy { Vec::new(), )?); let inner_agg = - utils::optimize_children(self, &grouped_aggr, optimizer_config)?; + utils::optimize_children(self, &grouped_aggr, config)?; let outer_aggr_schema = Arc::new(DFSchema::new_with_metadata( outer_group_exprs @@ -200,18 +200,10 @@ impl OptimizerRule for SingleDistinctToGroupBy { )?, ))) } else { - Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) + Ok(Some(utils::optimize_children(self, plan, config)?)) } } - _ => Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)), + _ => Ok(Some(utils::optimize_children(self, plan, config)?)), } } fn name(&self) -> &str { @@ -223,6 +215,7 @@ impl OptimizerRule for SingleDistinctToGroupBy { mod tests { use super::*; use crate::test::*; + use crate::OptimizerContext; use datafusion_expr::expr::GroupingSet; use datafusion_expr::{ col, count, count_distinct, lit, logical_plan::builder::LogicalPlanBuilder, max, @@ -232,7 +225,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = SingleDistinctToGroupBy::new(); let optimized_plan = rule - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs index 36c18ea749634..436d478b9ec0c 100644 --- a/datafusion/optimizer/src/subquery_filter_to_join.rs +++ b/datafusion/optimizer/src/subquery_filter_to_join.rs @@ -52,13 +52,13 @@ impl OptimizerRule for SubqueryFilterToJoin { fn try_optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { match plan { LogicalPlan::Filter(filter) => { // Apply optimizer rule to current input let optimized_input = self - .try_optimize(filter.input(), optimizer_config)? + .try_optimize(filter.input(), config)? .unwrap_or_else(|| filter.input().as_ref().clone()); // Splitting filter expression into components by AND @@ -98,7 +98,7 @@ impl OptimizerRule for SubqueryFilterToJoin { } => { let right_input = self.try_optimize( &subquery.subquery, - optimizer_config + config )?.unwrap_or_else(||subquery.subquery.as_ref().clone()); let right_schema = right_input.schema(); if right_schema.fields().len() != 1 { @@ -168,11 +168,7 @@ impl OptimizerRule for SubqueryFilterToJoin { } _ => { // Apply the optimization to all inputs of the plan - Ok(Some(utils::optimize_children( - self, - plan, - optimizer_config, - )?)) + Ok(Some(utils::optimize_children(self, plan, config)?)) } } } @@ -204,6 +200,7 @@ fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec) -> Res mod tests { use super::*; use crate::test::*; + use crate::OptimizerContext; use datafusion_expr::{ and, binary_expr, col, in_subquery, lit, logical_plan::LogicalPlanBuilder, not_in_subquery, or, Operator, @@ -212,7 +209,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = SubqueryFilterToJoin::new(); let optimized_plan = rule - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let formatted_plan = format!("{}", optimized_plan.display_indent_schema()); diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index 1f51b38f68106..462b94dd0d050 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::{OptimizerConfig, OptimizerRule}; +use crate::{OptimizerContext, OptimizerRule}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::Result; use datafusion_expr::{col, logical_plan::table_scan, LogicalPlan, LogicalPlanBuilder}; @@ -107,7 +107,7 @@ pub fn assert_optimized_plan_eq( expected: &str, ) { let optimized_plan = rule - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); let formatted_plan = format!("{}", optimized_plan.display_indent_schema()); @@ -119,7 +119,7 @@ pub fn assert_optimizer_err( plan: &LogicalPlan, expected: &str, ) { - let res = rule.try_optimize(plan, &mut OptimizerConfig::new()); + let res = rule.try_optimize(plan, &OptimizerContext::new()); match res { Ok(plan) => assert_eq!(format!("{}", plan.unwrap().display_indent()), "An error"), Err(ref e) => { @@ -133,7 +133,7 @@ pub fn assert_optimizer_err( pub fn assert_optimization_skipped(rule: &dyn OptimizerRule, plan: &LogicalPlan) { let new_plan = rule - .try_optimize(plan, &mut OptimizerConfig::new()) + .try_optimize(plan, &OptimizerContext::new()) .unwrap() .unwrap(); assert_eq!( diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs index a7b68cc18d44d..74b30db1e01af 100644 --- a/datafusion/optimizer/src/type_coercion.rs +++ b/datafusion/optimizer/src/type_coercion.rs @@ -17,9 +17,10 @@ //! Optimizer rule for type validation and coercion -use crate::utils::rewrite_preserving_name; -use crate::{OptimizerConfig, OptimizerRule}; +use std::sync::Arc; + use arrow::datatypes::{DataType, IntervalUnit}; + use datafusion_common::{ parse_interval, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, }; @@ -39,7 +40,9 @@ use datafusion_expr::{ WindowFrame, WindowFrameBound, WindowFrameUnits, }; use datafusion_expr::{ExprSchemable, Signature}; -use std::sync::Arc; + +use crate::utils::rewrite_preserving_name; +use crate::{OptimizerConfig, OptimizerRule}; #[derive(Default)] pub struct TypeCoercion {} @@ -58,7 +61,7 @@ impl OptimizerRule for TypeCoercion { fn try_optimize( &self, plan: &LogicalPlan, - _optimizer_config: &mut OptimizerConfig, + _: &dyn OptimizerConfig, ) -> Result> { Ok(Some(optimize_internal(&DFSchema::empty(), plan)?)) } @@ -584,9 +587,10 @@ fn coerce_agg_exprs_for_signature( #[cfg(test)] mod test { - use crate::type_coercion::{TypeCoercion, TypeCoercionRewriter}; - use crate::{OptimizerConfig, OptimizerRule}; + use std::sync::Arc; + use arrow::datatypes::DataType; + use datafusion_common::{DFField, DFSchema, Result, ScalarValue}; use datafusion_expr::expr::Like; use datafusion_expr::expr_rewriter::ExprRewritable; @@ -602,12 +606,14 @@ mod test { Signature, Volatility, }; use datafusion_physical_expr::expressions::AvgAccumulator; - use std::sync::Arc; + + use crate::type_coercion::{TypeCoercion, TypeCoercionRewriter}; + use crate::{OptimizerContext, OptimizerRule}; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { let rule = TypeCoercion::new(); - let mut config = OptimizerConfig::default(); - let plan = rule.try_optimize(plan, &mut config)?.unwrap(); + let config = OptimizerContext::default(); + let plan = rule.try_optimize(plan, &config)?.unwrap(); assert_eq!(expected, &format!("{:?}", plan)); Ok(()) } diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index 687e15b965c9f..b022653e467cc 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -82,13 +82,13 @@ impl OptimizerRule for UnwrapCastInComparison { fn try_optimize( &self, plan: &LogicalPlan, - _optimizer_config: &mut OptimizerConfig, + _config: &dyn OptimizerConfig, ) -> Result> { let new_inputs = plan .inputs() .into_iter() .map(|input| { - self.try_optimize(input, _optimizer_config) + self.try_optimize(input, _config) .map(|o| o.unwrap_or_else(|| input.clone())) }) .collect::>>()?; diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index e2d326c16cc35..324f49e44f32f 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -40,12 +40,12 @@ use std::sync::Arc; pub fn optimize_children( optimizer: &impl OptimizerRule, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result { let new_exprs = plan.expressions(); let mut new_inputs = Vec::with_capacity(plan.inputs().len()); for input in plan.inputs() { - let new_input = optimizer.try_optimize(input, optimizer_config)?; + let new_input = optimizer.try_optimize(input, config)?; new_inputs.push(new_input.unwrap_or_else(|| input.clone())) } from_plan(plan, &new_exprs, &new_inputs) diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs index 701d1a84c604b..62a8f1ef2cd98 100644 --- a/datafusion/optimizer/tests/integration-test.rs +++ b/datafusion/optimizer/tests/integration-test.rs @@ -20,7 +20,7 @@ use chrono::{DateTime, NaiveDateTime, Utc}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource}; use datafusion_optimizer::optimizer::Optimizer; -use datafusion_optimizer::{OptimizerConfig, OptimizerRule}; +use datafusion_optimizer::{OptimizerContext, OptimizerRule}; use datafusion_sql::planner::{ContextProvider, SqlToRel}; use datafusion_sql::sqlparser::ast::Statement; use datafusion_sql::sqlparser::dialect::GenericDialect; @@ -330,12 +330,12 @@ fn test_sql(sql: &str) -> Result { // hard code the return value of now() let ts = NaiveDateTime::from_timestamp_opt(1666615693, 0).unwrap(); let now_time = DateTime::::from_utc(ts, Utc); - let mut config = OptimizerConfig::new() + let config = OptimizerContext::new() .with_skip_failing_rules(false) .with_query_execution_start_time(now_time); - let optimizer = Optimizer::new(&config); + let optimizer = Optimizer::new(); // optimize the logical plan - optimizer.optimize(&plan, &mut config, &observe) + optimizer.optimize(&plan, &config, &observe) } struct MySchemaProvider {} From 367c21ec60377182a889a99c682952c1ad1e717d Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 15 Dec 2022 12:59:16 +0000 Subject: [PATCH 2/3] Format --- datafusion/optimizer/src/push_down_projection.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs index e50f515297d77..757ba07f23c5b 100644 --- a/datafusion/optimizer/src/push_down_projection.rs +++ b/datafusion/optimizer/src/push_down_projection.rs @@ -1015,8 +1015,6 @@ mod tests { fn optimize(plan: &LogicalPlan) -> Result { let rule = PushDownProjection::new(); - Ok(rule - .try_optimize(plan, &OptimizerContext::new())? - .unwrap()) + Ok(rule.try_optimize(plan, &OptimizerContext::new())?.unwrap()) } } From bd9b3c94695fddfea1ce69f4752ab526151518a4 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 16 Dec 2022 15:22:17 +0000 Subject: [PATCH 3/3] Review feedback --- datafusion/optimizer/src/optimizer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 43fb34b19bd4d..0dc651da2a5bc 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -250,6 +250,7 @@ impl Optimizer { for rule in &self.rules { if !config.rule_enabled(rule.name()) { + debug!("Skipping rule {} due to optimizer config", rule.name()); continue; }