diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 4d579776e6660..b484ea59086ae 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -68,7 +68,7 @@ use crate::logical_plan::{ use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate; use crate::optimizer::filter_push_down::FilterPushDown; use crate::optimizer::limit_push_down::LimitPushDown; -use crate::optimizer::optimizer::OptimizerRule; +use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; use crate::optimizer::projection_push_down::ProjectionPushDown; use crate::optimizer::simplify_expressions::SimplifyExpressions; use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy; @@ -1376,7 +1376,9 @@ impl SessionState { /// Optimizes the logical plan by applying optimizer rules. pub fn optimize(&self, plan: &LogicalPlan) -> Result { - let execution_props = &mut self.execution_props.clone(); + let mut optimizer_config = OptimizerConfig::new(); + optimizer_config.query_execution_start_time = + self.execution_props.query_execution_start_time; if let LogicalPlan::Explain(e) = plan { let mut stringified_plans = e.stringified_plans.clone(); @@ -1384,7 +1386,7 @@ impl SessionState { // optimize the child plan, capturing the output of each optimizer let plan = self.optimizer.optimize( e.plan.as_ref(), - execution_props, + &optimizer_config, |optimized_plan, optimizer| { let optimizer_name = optimizer.name().to_string(); let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; @@ -1399,7 +1401,7 @@ impl SessionState { schema: e.schema.clone(), })) } else { - self.optimizer.optimize(plan, execution_props, |_, _| {}) + self.optimizer.optimize(plan, &optimizer_config, |_, _| {}) } } diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs index 81183f56d97c0..fa99db2e95132 100644 --- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs @@ -18,7 +18,6 @@ //! Eliminate common sub-expression. use crate::error::Result; -use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{Filter, Projection, Window}; use crate::logical_plan::{ col, @@ -26,6 +25,7 @@ use crate::logical_plan::{ DFField, DFSchema, Expr, ExprRewritable, ExprRewriter, ExprSchemable, ExprVisitable, ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion, }; +use crate::optimizer::optimizer::OptimizerConfig; use crate::optimizer::optimizer::OptimizerRule; use arrow::datatypes::DataType; use datafusion_expr::expr::GroupingSet; @@ -60,9 +60,9 @@ impl OptimizerRule for CommonSubexprEliminate { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { - optimize(plan, execution_props) + optimize(plan, optimizer_config) } fn name(&self) -> &str { @@ -83,7 +83,10 @@ impl CommonSubexprEliminate { } } -fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { +fn optimize( + plan: &LogicalPlan, + optimizer_config: &OptimizerConfig, +) -> Result { let mut expr_set = ExprSet::new(); match plan { @@ -101,7 +104,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result Result Result Result Result Result>>()?; from_plan(plan, &expr, &new_inputs) @@ -302,7 +305,7 @@ fn rewrite_expr( input: &LogicalPlan, expr_set: &mut ExprSet, schema: &DFSchema, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result<(Vec>, LogicalPlan)> { let mut affected_id = HashSet::::new(); @@ -327,7 +330,7 @@ fn rewrite_expr( }) .collect::>>()?; - let mut new_input = optimize(input, execution_props)?; + let mut new_input = optimize(input, optimizer_config)?; if !affected_id.is_empty() { new_input = build_project_plan(new_input, affected_id, expr_set)?; } @@ -702,7 +705,7 @@ mod test { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let optimizer = CommonSubexprEliminate {}; let optimized_plan = optimizer - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); diff --git a/datafusion/core/src/optimizer/eliminate_filter.rs b/datafusion/core/src/optimizer/eliminate_filter.rs index fb99a97988d5a..a3c3e03412f86 100644 --- a/datafusion/core/src/optimizer/eliminate_filter.rs +++ b/datafusion/core/src/optimizer/eliminate_filter.rs @@ -27,7 +27,7 @@ use crate::logical_plan::plan::Filter; use crate::logical_plan::{EmptyRelation, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; -use crate::execution::context::ExecutionProps; +use crate::optimizer::optimizer::OptimizerConfig; /// Optimization rule that elimanate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation] #[derive(Default)] @@ -44,7 +44,7 @@ impl OptimizerRule for EliminateFilter { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { match plan { LogicalPlan::Filter(Filter { @@ -65,7 +65,7 @@ impl OptimizerRule for EliminateFilter { let inputs = plan.inputs(); let new_inputs = inputs .iter() - .map(|plan| self.optimize(plan, execution_props)) + .map(|plan| self.optimize(plan, optimizer_config)) .collect::>>()?; from_plan(plan, &plan.expressions(), &new_inputs) @@ -88,7 +88,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = EliminateFilter::new(); let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); diff --git a/datafusion/core/src/optimizer/eliminate_limit.rs b/datafusion/core/src/optimizer/eliminate_limit.rs index a7acf7ca6b561..b07f6f224a94e 100644 --- a/datafusion/core/src/optimizer/eliminate_limit.rs +++ b/datafusion/core/src/optimizer/eliminate_limit.rs @@ -22,7 +22,7 @@ use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; use datafusion_expr::utils::from_plan; -use crate::execution::context::ExecutionProps; +use crate::optimizer::optimizer::OptimizerConfig; /// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation] #[derive(Default)] @@ -39,7 +39,7 @@ impl OptimizerRule for EliminateLimit { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { match plan { LogicalPlan::Limit(Limit { n, input }) if *n == 0 => { @@ -56,7 +56,7 @@ impl OptimizerRule for EliminateLimit { let inputs = plan.inputs(); let new_inputs = inputs .iter() - .map(|plan| self.optimize(plan, execution_props)) + .map(|plan| self.optimize(plan, optimizer_config)) .collect::>>()?; from_plan(plan, &expr, &new_inputs) @@ -79,7 +79,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = EliminateLimit::new(); let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs index abfb15dd1ba2c..a07155c2f0bb2 100644 --- a/datafusion/core/src/optimizer/filter_push_down.rs +++ b/datafusion/core/src/optimizer/filter_push_down.rs @@ -14,10 +14,8 @@ //! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan -use crate::{ - execution::context::ExecutionProps, - optimizer::{optimizer::OptimizerRule, utils}, -}; +use crate::optimizer::optimizer::OptimizerConfig; +use crate::optimizer::{optimizer::OptimizerRule, utils}; use datafusion_common::{Column, DFSchema, Result}; use datafusion_expr::{ col, @@ -530,7 +528,7 @@ impl OptimizerRule for FilterPushDown { "filter_push_down" } - fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) -> Result { + fn optimize(&self, plan: &LogicalPlan, _: &OptimizerConfig) -> Result { optimize(plan, State::default()) } } @@ -578,7 +576,7 @@ mod tests { fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan { let rule = FilterPushDown::new(); - rule.optimize(plan, &ExecutionProps::new()) + rule.optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan") } diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index 725e00a004d84..19578b2bc5d77 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -18,10 +18,10 @@ //! Optimizer rule to push down LIMIT in the query plan //! It will push down through projection, limits (taking the smaller limit) use crate::error::Result; -use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::Projection; use crate::logical_plan::{Limit, TableScan}; use crate::logical_plan::{LogicalPlan, Union}; +use crate::optimizer::optimizer::OptimizerConfig; use crate::optimizer::optimizer::OptimizerRule; use datafusion_common::DataFusionError; use datafusion_expr::logical_plan::{Join, JoinType, Offset}; @@ -44,7 +44,7 @@ fn limit_push_down( _optimizer: &LimitPushDown, upper_limit: Option, plan: &LogicalPlan, - _execution_props: &ExecutionProps, + _optimizer_config: &OptimizerConfig, is_offset: bool, ) -> Result { match (plan, upper_limit) { @@ -61,7 +61,7 @@ fn limit_push_down( _optimizer, Some(new_limit), input.as_ref(), - _execution_props, + _optimizer_config, false, )?), })) @@ -102,7 +102,7 @@ fn limit_push_down( _optimizer, upper_limit, input.as_ref(), - _execution_props, + _optimizer_config, false, )?), schema: schema.clone(), @@ -127,7 +127,7 @@ fn limit_push_down( _optimizer, Some(upper_limit), x, - _execution_props, + _optimizer_config, false, )?), })) @@ -153,7 +153,7 @@ fn limit_push_down( _optimizer, Some(new_limit), input.as_ref(), - _execution_props, + _optimizer_config, true, )?), })) @@ -163,7 +163,7 @@ fn limit_push_down( //if LeftOuter join push limit to left generate_push_down_join( _optimizer, - _execution_props, + _optimizer_config, plan, upper_limit, None, @@ -174,23 +174,23 @@ fn limit_push_down( { generate_push_down_join( _optimizer, - _execution_props, + _optimizer_config, plan, None, upper_limit, ) } - _ => generate_push_down_join(_optimizer, _execution_props, plan, None, None), + _ => generate_push_down_join(_optimizer, _optimizer_config, plan, None, None), }, // For other nodes we can't push down the limit // But try to recurse and find other limit nodes to push down - _ => push_down_children_limit(_optimizer, _execution_props, plan), + _ => push_down_children_limit(_optimizer, _optimizer_config, plan), } } fn generate_push_down_join( _optimizer: &LimitPushDown, - _execution_props: &ExecutionProps, + _optimizer_config: &OptimizerConfig, join: &LogicalPlan, left_limit: Option, right_limit: Option, @@ -211,14 +211,14 @@ fn generate_push_down_join( _optimizer, left_limit, left.as_ref(), - _execution_props, + _optimizer_config, true, )?), right: Arc::new(limit_push_down( _optimizer, right_limit, right.as_ref(), - _execution_props, + _optimizer_config, true, )?), on: on.clone(), @@ -238,7 +238,7 @@ fn generate_push_down_join( fn push_down_children_limit( _optimizer: &LimitPushDown, - _execution_props: &ExecutionProps, + _optimizer_config: &OptimizerConfig, plan: &LogicalPlan, ) -> Result { let expr = plan.expressions(); @@ -247,7 +247,7 @@ fn push_down_children_limit( let inputs = plan.inputs(); let new_inputs = inputs .iter() - .map(|plan| limit_push_down(_optimizer, None, plan, _execution_props, false)) + .map(|plan| limit_push_down(_optimizer, None, plan, _optimizer_config, false)) .collect::>>()?; from_plan(plan, &expr, &new_inputs) @@ -257,9 +257,9 @@ impl OptimizerRule for LimitPushDown { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { - limit_push_down(self, None, plan, execution_props, false) + limit_push_down(self, None, plan, optimizer_config, false) } fn name(&self) -> &str { @@ -280,7 +280,7 @@ mod test { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = LimitPushDown::new(); let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); diff --git a/datafusion/core/src/optimizer/optimizer.rs b/datafusion/core/src/optimizer/optimizer.rs index cc00eb8f830c8..8b602f4231b4b 100644 --- a/datafusion/core/src/optimizer/optimizer.rs +++ b/datafusion/core/src/optimizer/optimizer.rs @@ -17,12 +17,12 @@ //! Query optimizer traits +use chrono::{DateTime, Utc}; use std::sync::Arc; use log::{debug, trace}; use crate::error::Result; -use crate::execution::context::ExecutionProps; use crate::logical_plan::LogicalPlan; /// `OptimizerRule` transforms one ['LogicalPlan'] into another which @@ -33,13 +33,37 @@ pub trait OptimizerRule { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result; /// A human readable name for this optimizer rule fn name(&self) -> &str; } +/// Placeholder for optimizer configuration options +#[derive(Debug)] +pub struct OptimizerConfig { + /// Query execution start time that can be used to rewrite expressions such as `now()` + /// to use a literal value instead + pub query_execution_start_time: DateTime, +} + +impl OptimizerConfig { + /// Create optimizer config + pub fn new() -> Self { + Self { + query_execution_start_time: chrono::Utc::now(), + } + } +} + +impl Default for OptimizerConfig { + /// Create optimizer config + fn default() -> Self { + Self::new() + } +} + /// A rule-based optimizer. #[derive(Clone)] pub struct Optimizer { @@ -58,19 +82,17 @@ impl Optimizer { pub fn optimize( &self, plan: &LogicalPlan, - execution_props: &mut ExecutionProps, + optimizer_config: &OptimizerConfig, mut observer: F, ) -> Result where F: FnMut(&LogicalPlan, &dyn OptimizerRule), { - let execution_props = execution_props.start_execution(); - let mut new_plan = plan.clone(); debug!("Input logical plan:\n{}\n", plan.display_indent()); trace!("Full input logical plan:\n{:?}", plan); for rule in &self.rules { - new_plan = rule.optimize(&new_plan, execution_props)?; + new_plan = rule.optimize(&new_plan, optimizer_config)?; observer(&new_plan, rule.as_ref()); } debug!("Optimized logical plan:\n{}\n", new_plan.display_indent()); diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 4feef4f99057f..977a34b534880 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -19,7 +19,6 @@ //! loaded into memory use crate::error::{DataFusionError, Result}; -use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{ Aggregate, Analyze, Join, Projection, SubqueryAlias, TableScan, Window, }; @@ -27,6 +26,7 @@ use crate::logical_plan::{ build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, LogicalPlanBuilder, ToDFSchema, Union, }; +use crate::optimizer::optimizer::OptimizerConfig; use crate::optimizer::optimizer::OptimizerRule; use arrow::datatypes::{Field, Schema}; use arrow::error::Result as ArrowResult; @@ -48,7 +48,7 @@ impl OptimizerRule for ProjectionPushDown { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { // set of all columns refered by the plan (and thus considered required by the root) let required_columns = plan @@ -57,7 +57,7 @@ impl OptimizerRule for ProjectionPushDown { .iter() .map(|f| f.qualified_column()) .collect::>(); - optimize_plan(self, plan, &required_columns, false, execution_props) + optimize_plan(self, plan, &required_columns, false, optimizer_config) } fn name(&self) -> &str { @@ -132,7 +132,7 @@ fn optimize_plan( plan: &LogicalPlan, required_columns: &HashSet, // set of columns required up to this step has_projection: bool, - _execution_props: &ExecutionProps, + _optimizer_config: &OptimizerConfig, ) -> Result { let mut new_required_columns = required_columns.clone(); match plan { @@ -171,7 +171,7 @@ fn optimize_plan( input, &new_required_columns, true, - _execution_props, + _optimizer_config, )?; let new_required_columns_optimized = new_input @@ -226,7 +226,7 @@ fn optimize_plan( left, &new_required_columns, true, - _execution_props, + _optimizer_config, )?); let optimized_right = Arc::new(optimize_plan( @@ -234,7 +234,7 @@ fn optimize_plan( right, &new_required_columns, true, - _execution_props, + _optimizer_config, )?); let schema = build_join_schema( @@ -284,7 +284,7 @@ fn optimize_plan( input, required_columns, true, - _execution_props, + _optimizer_config, )?) .build(); }; @@ -300,7 +300,7 @@ fn optimize_plan( input, &new_required_columns, true, - _execution_props, + _optimizer_config, )?) .window(new_window_expr)? .build() @@ -352,7 +352,7 @@ fn optimize_plan( input, &new_required_columns, true, - _execution_props, + _optimizer_config, )?), schema: DFSchemaRef::new(new_schema), })) @@ -401,7 +401,7 @@ fn optimize_plan( &a.input, &required_columns, false, - _execution_props, + _optimizer_config, )?), verbose: a.verbose, schema: a.schema.clone(), @@ -437,7 +437,7 @@ fn optimize_plan( input_plan, &new_required_columns, has_projection, - _execution_props, + _optimizer_config, ) }) .collect::>>()?; @@ -474,7 +474,7 @@ fn optimize_plan( input, &new_required_columns, has_projection, - _execution_props, + _optimizer_config, )?]; let expr = vec![]; from_plan(plan, &expr, &new_inputs) @@ -516,7 +516,7 @@ fn optimize_plan( input_plan, &new_required_columns, has_projection, - _execution_props, + _optimizer_config, ) }) .collect::>>()?; @@ -1005,6 +1005,6 @@ mod tests { fn optimize(plan: &LogicalPlan) -> Result { let rule = ProjectionPushDown::new(); - rule.optimize(plan, &ExecutionProps::new()) + rule.optimize(plan, &OptimizerConfig::new()) } } diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs index 8c628906ac99c..49a5a1295f239 100644 --- a/datafusion/core/src/optimizer/simplify_expressions.rs +++ b/datafusion/core/src/optimizer/simplify_expressions.rs @@ -24,6 +24,7 @@ use crate::logical_plan::{ lit, DFSchema, DFSchemaRef, Expr, ExprRewritable, ExprRewriter, ExprSimplifiable, LogicalPlan, RewriteRecursion, SimplifyInfo, }; +use crate::optimizer::optimizer::OptimizerConfig; use crate::optimizer::optimizer::OptimizerRule; use crate::physical_plan::planner::create_physical_expr; use crate::scalar::ScalarValue; @@ -192,6 +193,19 @@ impl OptimizerRule for SimplifyExpressions { } fn optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &OptimizerConfig, + ) -> Result { + let mut execution_props = ExecutionProps::new(); + execution_props.query_execution_start_time = + optimizer_config.query_execution_start_time; + self.optimize_internal(plan, &execution_props) + } +} + +impl SimplifyExpressions { + fn optimize_internal( &self, plan: &LogicalPlan, execution_props: &ExecutionProps, @@ -206,7 +220,7 @@ impl OptimizerRule for SimplifyExpressions { let new_inputs = plan .inputs() .iter() - .map(|input| self.optimize(input, execution_props)) + .map(|input| self.optimize_internal(input, execution_props)) .collect::>>()?; let expr = plan @@ -257,8 +271,8 @@ impl SimplifyExpressions { /// # use datafusion::optimizer::simplify_expressions::ConstEvaluator; /// # use datafusion::execution::context::ExecutionProps; /// -/// let execution_props = ExecutionProps::new(); -/// let mut const_evaluator = ConstEvaluator::new(&execution_props); +/// let optimizer_config = ExecutionProps::new(); +/// let mut const_evaluator = ConstEvaluator::new(&optimizer_config); /// /// // (1 + 2) + a /// let expr = (lit(1) + lit(2)) + col("a"); @@ -282,7 +296,7 @@ pub struct ConstEvaluator<'a> { /// descendants) so this Expr can be evaluated can_evaluate: Vec, - execution_props: &'a ExecutionProps, + optimizer_config: &'a ExecutionProps, input_schema: DFSchema, input_batch: RecordBatch, } @@ -328,8 +342,8 @@ impl<'a> ExprRewriter for ConstEvaluator<'a> { impl<'a> ConstEvaluator<'a> { /// Create a new `ConstantEvaluator`. Session constants (such as /// the time for `now()` are taken from the passed - /// `execution_props`. - pub fn new(execution_props: &'a ExecutionProps) -> Self { + /// `optimizer_config`. + pub fn new(optimizer_config: &'a ExecutionProps) -> Self { let input_schema = DFSchema::empty(); // The dummy column name is unused and doesn't matter as only @@ -344,7 +358,7 @@ impl<'a> ConstEvaluator<'a> { Self { can_evaluate: vec![], - execution_props, + optimizer_config, input_schema, input_batch, } @@ -410,7 +424,7 @@ impl<'a> ConstEvaluator<'a> { &expr, &self.input_schema, &self.input_batch.schema(), - self.execution_props, + self.optimizer_config, )?; let col_val = phys_expr.evaluate(&self.input_batch)?; match col_val { @@ -1179,12 +1193,12 @@ mod tests { expected_expr: Expr, date_time: &DateTime, ) { - let execution_props = ExecutionProps { + let optimizer_config = ExecutionProps { query_execution_start_time: *date_time, var_providers: None, }; - let mut const_evaluator = ConstEvaluator::new(&execution_props); + let mut const_evaluator = ConstEvaluator::new(&optimizer_config); let evaluated_expr = input_expr .clone() .rewrite(&mut const_evaluator) @@ -1207,8 +1221,8 @@ mod tests { fn simplify(expr: Expr) -> Expr { let schema = expr_test_schema(); - let execution_props = ExecutionProps::new(); - let info = SimplifyContext::new(vec![&schema], &execution_props); + let optimizer_config = ExecutionProps::new(); + let info = SimplifyContext::new(vec![&schema], &optimizer_config); expr.simplify(&info).unwrap() } @@ -1518,7 +1532,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = SimplifyExpressions::new(); let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); @@ -1736,14 +1750,12 @@ mod tests { // expect optimizing will result in an error, returning the error string fn get_optimized_plan_err(plan: &LogicalPlan, date_time: &DateTime) -> String { + let mut config = OptimizerConfig::new(); + config.query_execution_start_time = *date_time; let rule = SimplifyExpressions::new(); - let execution_props = ExecutionProps { - query_execution_start_time: *date_time, - var_providers: None, - }; let err = rule - .optimize(plan, &execution_props) + .optimize(plan, &config) .expect_err("expected optimization to fail"); err.to_string() @@ -1753,14 +1765,12 @@ mod tests { plan: &LogicalPlan, date_time: &DateTime, ) -> String { + let mut config = OptimizerConfig::new(); + config.query_execution_start_time = *date_time; let rule = SimplifyExpressions::new(); - let execution_props = ExecutionProps { - query_execution_start_time: *date_time, - var_providers: None, - }; let optimized_plan = rule - .optimize(plan, &execution_props) + .optimize(plan, &config) .expect("failed to optimize plan"); return format!("{:?}", optimized_plan); } diff --git a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs index 1748f9af624f9..65458c4dba887 100644 --- a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs +++ b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs @@ -18,10 +18,10 @@ //! single distinct to group by optimizer rule use crate::error::Result; -use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{Aggregate, Projection}; use crate::logical_plan::ExprSchemable; use crate::logical_plan::{col, DFSchema, Expr, LogicalPlan}; +use crate::optimizer::optimizer::OptimizerConfig; use crate::optimizer::optimizer::OptimizerRule; use datafusion_expr::utils::{columnize_expr, from_plan}; use hashbrown::HashSet; @@ -188,7 +188,7 @@ impl OptimizerRule for SingleDistinctToGroupBy { fn optimize( &self, plan: &LogicalPlan, - _execution_props: &ExecutionProps, + _optimizer_config: &OptimizerConfig, ) -> Result { optimize(plan) } @@ -207,7 +207,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = SingleDistinctToGroupBy::new(); let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{}", optimized_plan.display_indent_schema()); assert_eq!(formatted_plan, expected); diff --git a/datafusion/core/src/optimizer/subquery_filter_to_join.rs b/datafusion/core/src/optimizer/subquery_filter_to_join.rs index 6454425191946..add03c72d5880 100644 --- a/datafusion/core/src/optimizer/subquery_filter_to_join.rs +++ b/datafusion/core/src/optimizer/subquery_filter_to_join.rs @@ -29,11 +29,11 @@ use std::sync::Arc; use crate::error::{DataFusionError, Result}; -use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{Filter, Join}; use crate::logical_plan::{ build_join_schema, Expr, JoinConstraint, JoinType, LogicalPlan, }; +use crate::optimizer::optimizer::OptimizerConfig; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; @@ -52,12 +52,12 @@ impl OptimizerRule for SubqueryFilterToJoin { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { match plan { LogicalPlan::Filter(Filter { predicate, input }) => { // Apply optimizer rule to current input - let optimized_input = self.optimize(input, execution_props)?; + let optimized_input = self.optimize(input, optimizer_config)?; // Splitting filter expression into components by AND let mut filters = vec![]; @@ -97,7 +97,7 @@ impl OptimizerRule for SubqueryFilterToJoin { } => { let right_input = self.optimize( &*subquery.subquery, - execution_props + optimizer_config )?; let right_schema = right_input.schema(); if right_schema.fields().len() != 1 { @@ -167,7 +167,7 @@ impl OptimizerRule for SubqueryFilterToJoin { } _ => { // Apply the optimization to all inputs of the plan - utils::optimize_children(self, plan, execution_props) + utils::optimize_children(self, plan, optimizer_config) } } } @@ -201,7 +201,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = SubqueryFilterToJoin::new(); let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{}", optimized_plan.display_indent_schema()); assert_eq!(formatted_plan, expected); diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index 81d4b76a26eba..006613328674b 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -18,7 +18,7 @@ //! Collection of utility functions that are leveraged by the query optimizer rules use super::optimizer::OptimizerRule; -use crate::execution::context::ExecutionProps; +use crate::optimizer::optimizer::OptimizerConfig; use datafusion_expr::logical_plan::Filter; use crate::error::{DataFusionError, Result}; @@ -42,13 +42,13 @@ const WINDOW_SORT_MARKER: &str = "__DATAFUSION_WINDOW_SORT__"; pub fn optimize_children( optimizer: &impl OptimizerRule, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { let new_exprs = plan.expressions(); let new_inputs = plan .inputs() .into_iter() - .map(|plan| optimizer.optimize(plan, execution_props)) + .map(|plan| optimizer.optimize(plan, optimizer_config)) .collect::>>()?; from_plan(plan, &new_exprs, &new_inputs) diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs index d062cf3a37583..a1bbb2419fc46 100644 --- a/datafusion/core/tests/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined_plan.rs @@ -86,10 +86,11 @@ use std::task::{Context, Poll}; use std::{any::Any, collections::BTreeMap, fmt, sync::Arc}; use async_trait::async_trait; -use datafusion::execution::context::{ExecutionProps, TaskContext}; +use datafusion::execution::context::TaskContext; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::logical_plan::plan::{Extension, Sort}; use datafusion::logical_plan::{DFSchemaRef, Limit}; +use datafusion::optimizer::optimizer::OptimizerConfig; /// Execute the specified sql and return the resulting record batches /// pretty printed as a String. @@ -284,7 +285,7 @@ impl OptimizerRule for TopKOptimizerRule { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { // Note: this code simply looks for the pattern of a Limit followed by a // Sort and replaces it by a TopK node. It does not handle many @@ -300,7 +301,7 @@ impl OptimizerRule for TopKOptimizerRule { return Ok(LogicalPlan::Extension(Extension { node: Arc::new(TopKPlanNode { k: *n, - input: self.optimize(input.as_ref(), execution_props)?, + input: self.optimize(input.as_ref(), optimizer_config)?, expr: expr[0].clone(), }), })); @@ -310,7 +311,7 @@ impl OptimizerRule for TopKOptimizerRule { // If we didn't find the Limit/Sort combination, recurse as // normal and build the result. - optimize_children(self, plan, execution_props) + optimize_children(self, plan, optimizer_config) } fn name(&self) -> &str {