From e5596598cb33934e1fdb6b07cb7e4445a39489fc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 08:16:03 -0600 Subject: [PATCH 01/16] initial refactor --- datafusion/core/src/execution/context.rs | 10 ++-- .../src/optimizer/common_subexpr_eliminate.rs | 29 ++++++----- .../core/src/optimizer/eliminate_filter.rs | 8 +-- .../core/src/optimizer/eliminate_limit.rs | 8 +-- .../core/src/optimizer/filter_push_down.rs | 10 ++-- .../core/src/optimizer/limit_push_down.rs | 36 ++++++------- datafusion/core/src/optimizer/optimizer.rs | 25 +++++++-- .../src/optimizer/projection_push_down.rs | 30 +++++------ .../src/optimizer/simplify_expressions.rs | 51 ++++++++++--------- .../optimizer/single_distinct_to_groupby.rs | 6 +-- .../src/optimizer/subquery_filter_to_join.rs | 12 ++--- datafusion/core/src/optimizer/utils.rs | 6 +-- datafusion/core/tests/user_defined_plan.rs | 9 ++-- 13 files changed, 130 insertions(+), 110 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 4d579776e6660..5acab1babeb9f 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -68,7 +68,7 @@ use crate::logical_plan::{ use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate; use crate::optimizer::filter_push_down::FilterPushDown; use crate::optimizer::limit_push_down::LimitPushDown; -use crate::optimizer::optimizer::OptimizerRule; +use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; use crate::optimizer::projection_push_down::ProjectionPushDown; use crate::optimizer::simplify_expressions::SimplifyExpressions; use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy; @@ -1271,7 +1271,7 @@ impl SessionState { optimizer: Optimizer::new(vec![ // Simplify expressions first to maximize the chance // of applying other optimizations - Arc::new(SimplifyExpressions::new()), + Arc::new(SimplifyExpressions::new(ExecutionProps::new())), Arc::new(SubqueryFilterToJoin::new()), Arc::new(EliminateFilter::new()), Arc::new(CommonSubexprEliminate::new()), @@ -1376,7 +1376,7 @@ impl SessionState { /// Optimizes the logical plan by applying optimizer rules. pub fn optimize(&self, plan: &LogicalPlan) -> Result { - let execution_props = &mut self.execution_props.clone(); + let optimizer_config = OptimizerConfig::default(); if let LogicalPlan::Explain(e) = plan { let mut stringified_plans = e.stringified_plans.clone(); @@ -1384,7 +1384,7 @@ impl SessionState { // optimize the child plan, capturing the output of each optimizer let plan = self.optimizer.optimize( e.plan.as_ref(), - execution_props, + &optimizer_config, |optimized_plan, optimizer| { let optimizer_name = optimizer.name().to_string(); let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; @@ -1399,7 +1399,7 @@ impl SessionState { schema: e.schema.clone(), })) } else { - self.optimizer.optimize(plan, execution_props, |_, _| {}) + self.optimizer.optimize(plan, &optimizer_config, |_, _| {}) } } diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs index 81183f56d97c0..fa99db2e95132 100644 --- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs @@ -18,7 +18,6 @@ //! Eliminate common sub-expression. use crate::error::Result; -use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{Filter, Projection, Window}; use crate::logical_plan::{ col, @@ -26,6 +25,7 @@ use crate::logical_plan::{ DFField, DFSchema, Expr, ExprRewritable, ExprRewriter, ExprSchemable, ExprVisitable, ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion, }; +use crate::optimizer::optimizer::OptimizerConfig; use crate::optimizer::optimizer::OptimizerRule; use arrow::datatypes::DataType; use datafusion_expr::expr::GroupingSet; @@ -60,9 +60,9 @@ impl OptimizerRule for CommonSubexprEliminate { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { - optimize(plan, execution_props) + optimize(plan, optimizer_config) } fn name(&self) -> &str { @@ -83,7 +83,10 @@ impl CommonSubexprEliminate { } } -fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { +fn optimize( + plan: &LogicalPlan, + optimizer_config: &OptimizerConfig, +) -> Result { let mut expr_set = ExprSet::new(); match plan { @@ -101,7 +104,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result Result Result Result Result Result>>()?; from_plan(plan, &expr, &new_inputs) @@ -302,7 +305,7 @@ fn rewrite_expr( input: &LogicalPlan, expr_set: &mut ExprSet, schema: &DFSchema, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result<(Vec>, LogicalPlan)> { let mut affected_id = HashSet::::new(); @@ -327,7 +330,7 @@ fn rewrite_expr( }) .collect::>>()?; - let mut new_input = optimize(input, execution_props)?; + let mut new_input = optimize(input, optimizer_config)?; if !affected_id.is_empty() { new_input = build_project_plan(new_input, affected_id, expr_set)?; } @@ -702,7 +705,7 @@ mod test { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let optimizer = CommonSubexprEliminate {}; let optimized_plan = optimizer - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); diff --git a/datafusion/core/src/optimizer/eliminate_filter.rs b/datafusion/core/src/optimizer/eliminate_filter.rs index fb99a97988d5a..a3c3e03412f86 100644 --- a/datafusion/core/src/optimizer/eliminate_filter.rs +++ b/datafusion/core/src/optimizer/eliminate_filter.rs @@ -27,7 +27,7 @@ use crate::logical_plan::plan::Filter; use crate::logical_plan::{EmptyRelation, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; -use crate::execution::context::ExecutionProps; +use crate::optimizer::optimizer::OptimizerConfig; /// Optimization rule that elimanate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation] #[derive(Default)] @@ -44,7 +44,7 @@ impl OptimizerRule for EliminateFilter { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { match plan { LogicalPlan::Filter(Filter { @@ -65,7 +65,7 @@ impl OptimizerRule for EliminateFilter { let inputs = plan.inputs(); let new_inputs = inputs .iter() - .map(|plan| self.optimize(plan, execution_props)) + .map(|plan| self.optimize(plan, optimizer_config)) .collect::>>()?; from_plan(plan, &plan.expressions(), &new_inputs) @@ -88,7 +88,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = EliminateFilter::new(); let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); diff --git a/datafusion/core/src/optimizer/eliminate_limit.rs b/datafusion/core/src/optimizer/eliminate_limit.rs index a7acf7ca6b561..b07f6f224a94e 100644 --- a/datafusion/core/src/optimizer/eliminate_limit.rs +++ b/datafusion/core/src/optimizer/eliminate_limit.rs @@ -22,7 +22,7 @@ use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; use datafusion_expr::utils::from_plan; -use crate::execution::context::ExecutionProps; +use crate::optimizer::optimizer::OptimizerConfig; /// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation] #[derive(Default)] @@ -39,7 +39,7 @@ impl OptimizerRule for EliminateLimit { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { match plan { LogicalPlan::Limit(Limit { n, input }) if *n == 0 => { @@ -56,7 +56,7 @@ impl OptimizerRule for EliminateLimit { let inputs = plan.inputs(); let new_inputs = inputs .iter() - .map(|plan| self.optimize(plan, execution_props)) + .map(|plan| self.optimize(plan, optimizer_config)) .collect::>>()?; from_plan(plan, &expr, &new_inputs) @@ -79,7 +79,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = EliminateLimit::new(); let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs index abfb15dd1ba2c..a07155c2f0bb2 100644 --- a/datafusion/core/src/optimizer/filter_push_down.rs +++ b/datafusion/core/src/optimizer/filter_push_down.rs @@ -14,10 +14,8 @@ //! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan -use crate::{ - execution::context::ExecutionProps, - optimizer::{optimizer::OptimizerRule, utils}, -}; +use crate::optimizer::optimizer::OptimizerConfig; +use crate::optimizer::{optimizer::OptimizerRule, utils}; use datafusion_common::{Column, DFSchema, Result}; use datafusion_expr::{ col, @@ -530,7 +528,7 @@ impl OptimizerRule for FilterPushDown { "filter_push_down" } - fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) -> Result { + fn optimize(&self, plan: &LogicalPlan, _: &OptimizerConfig) -> Result { optimize(plan, State::default()) } } @@ -578,7 +576,7 @@ mod tests { fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan { let rule = FilterPushDown::new(); - rule.optimize(plan, &ExecutionProps::new()) + rule.optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan") } diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index 725e00a004d84..19578b2bc5d77 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -18,10 +18,10 @@ //! Optimizer rule to push down LIMIT in the query plan //! It will push down through projection, limits (taking the smaller limit) use crate::error::Result; -use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::Projection; use crate::logical_plan::{Limit, TableScan}; use crate::logical_plan::{LogicalPlan, Union}; +use crate::optimizer::optimizer::OptimizerConfig; use crate::optimizer::optimizer::OptimizerRule; use datafusion_common::DataFusionError; use datafusion_expr::logical_plan::{Join, JoinType, Offset}; @@ -44,7 +44,7 @@ fn limit_push_down( _optimizer: &LimitPushDown, upper_limit: Option, plan: &LogicalPlan, - _execution_props: &ExecutionProps, + _optimizer_config: &OptimizerConfig, is_offset: bool, ) -> Result { match (plan, upper_limit) { @@ -61,7 +61,7 @@ fn limit_push_down( _optimizer, Some(new_limit), input.as_ref(), - _execution_props, + _optimizer_config, false, )?), })) @@ -102,7 +102,7 @@ fn limit_push_down( _optimizer, upper_limit, input.as_ref(), - _execution_props, + _optimizer_config, false, )?), schema: schema.clone(), @@ -127,7 +127,7 @@ fn limit_push_down( _optimizer, Some(upper_limit), x, - _execution_props, + _optimizer_config, false, )?), })) @@ -153,7 +153,7 @@ fn limit_push_down( _optimizer, Some(new_limit), input.as_ref(), - _execution_props, + _optimizer_config, true, )?), })) @@ -163,7 +163,7 @@ fn limit_push_down( //if LeftOuter join push limit to left generate_push_down_join( _optimizer, - _execution_props, + _optimizer_config, plan, upper_limit, None, @@ -174,23 +174,23 @@ fn limit_push_down( { generate_push_down_join( _optimizer, - _execution_props, + _optimizer_config, plan, None, upper_limit, ) } - _ => generate_push_down_join(_optimizer, _execution_props, plan, None, None), + _ => generate_push_down_join(_optimizer, _optimizer_config, plan, None, None), }, // For other nodes we can't push down the limit // But try to recurse and find other limit nodes to push down - _ => push_down_children_limit(_optimizer, _execution_props, plan), + _ => push_down_children_limit(_optimizer, _optimizer_config, plan), } } fn generate_push_down_join( _optimizer: &LimitPushDown, - _execution_props: &ExecutionProps, + _optimizer_config: &OptimizerConfig, join: &LogicalPlan, left_limit: Option, right_limit: Option, @@ -211,14 +211,14 @@ fn generate_push_down_join( _optimizer, left_limit, left.as_ref(), - _execution_props, + _optimizer_config, true, )?), right: Arc::new(limit_push_down( _optimizer, right_limit, right.as_ref(), - _execution_props, + _optimizer_config, true, )?), on: on.clone(), @@ -238,7 +238,7 @@ fn generate_push_down_join( fn push_down_children_limit( _optimizer: &LimitPushDown, - _execution_props: &ExecutionProps, + _optimizer_config: &OptimizerConfig, plan: &LogicalPlan, ) -> Result { let expr = plan.expressions(); @@ -247,7 +247,7 @@ fn push_down_children_limit( let inputs = plan.inputs(); let new_inputs = inputs .iter() - .map(|plan| limit_push_down(_optimizer, None, plan, _execution_props, false)) + .map(|plan| limit_push_down(_optimizer, None, plan, _optimizer_config, false)) .collect::>>()?; from_plan(plan, &expr, &new_inputs) @@ -257,9 +257,9 @@ impl OptimizerRule for LimitPushDown { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { - limit_push_down(self, None, plan, execution_props, false) + limit_push_down(self, None, plan, optimizer_config, false) } fn name(&self) -> &str { @@ -280,7 +280,7 @@ mod test { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = LimitPushDown::new(); let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); diff --git a/datafusion/core/src/optimizer/optimizer.rs b/datafusion/core/src/optimizer/optimizer.rs index cc00eb8f830c8..92ed5ca68469c 100644 --- a/datafusion/core/src/optimizer/optimizer.rs +++ b/datafusion/core/src/optimizer/optimizer.rs @@ -22,7 +22,6 @@ use std::sync::Arc; use log::{debug, trace}; use crate::error::Result; -use crate::execution::context::ExecutionProps; use crate::logical_plan::LogicalPlan; /// `OptimizerRule` transforms one ['LogicalPlan'] into another which @@ -33,13 +32,28 @@ pub trait OptimizerRule { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result; /// A human readable name for this optimizer rule fn name(&self) -> &str; } +/// Placeholder for optimizer configuration options +#[derive(Debug)] +pub struct OptimizerConfig {} + +impl OptimizerConfig { + /// Create optimizer config + pub fn new() -> Self { + Self {} + } + /// Create optimizer config + pub fn default() -> Self { + Self {} + } +} + /// A rule-based optimizer. #[derive(Clone)] pub struct Optimizer { @@ -58,19 +72,20 @@ impl Optimizer { pub fn optimize( &self, plan: &LogicalPlan, - execution_props: &mut ExecutionProps, + optimizer_config: &OptimizerConfig, mut observer: F, ) -> Result where F: FnMut(&LogicalPlan, &dyn OptimizerRule), { - let execution_props = execution_props.start_execution(); + //TODO fix this regression + //let optimizer_config = optimizer_config.start_execution(); let mut new_plan = plan.clone(); debug!("Input logical plan:\n{}\n", plan.display_indent()); trace!("Full input logical plan:\n{:?}", plan); for rule in &self.rules { - new_plan = rule.optimize(&new_plan, execution_props)?; + new_plan = rule.optimize(&new_plan, optimizer_config)?; observer(&new_plan, rule.as_ref()); } debug!("Optimized logical plan:\n{}\n", new_plan.display_indent()); diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 4feef4f99057f..977a34b534880 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -19,7 +19,6 @@ //! loaded into memory use crate::error::{DataFusionError, Result}; -use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{ Aggregate, Analyze, Join, Projection, SubqueryAlias, TableScan, Window, }; @@ -27,6 +26,7 @@ use crate::logical_plan::{ build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, LogicalPlanBuilder, ToDFSchema, Union, }; +use crate::optimizer::optimizer::OptimizerConfig; use crate::optimizer::optimizer::OptimizerRule; use arrow::datatypes::{Field, Schema}; use arrow::error::Result as ArrowResult; @@ -48,7 +48,7 @@ impl OptimizerRule for ProjectionPushDown { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { // set of all columns refered by the plan (and thus considered required by the root) let required_columns = plan @@ -57,7 +57,7 @@ impl OptimizerRule for ProjectionPushDown { .iter() .map(|f| f.qualified_column()) .collect::>(); - optimize_plan(self, plan, &required_columns, false, execution_props) + optimize_plan(self, plan, &required_columns, false, optimizer_config) } fn name(&self) -> &str { @@ -132,7 +132,7 @@ fn optimize_plan( plan: &LogicalPlan, required_columns: &HashSet, // set of columns required up to this step has_projection: bool, - _execution_props: &ExecutionProps, + _optimizer_config: &OptimizerConfig, ) -> Result { let mut new_required_columns = required_columns.clone(); match plan { @@ -171,7 +171,7 @@ fn optimize_plan( input, &new_required_columns, true, - _execution_props, + _optimizer_config, )?; let new_required_columns_optimized = new_input @@ -226,7 +226,7 @@ fn optimize_plan( left, &new_required_columns, true, - _execution_props, + _optimizer_config, )?); let optimized_right = Arc::new(optimize_plan( @@ -234,7 +234,7 @@ fn optimize_plan( right, &new_required_columns, true, - _execution_props, + _optimizer_config, )?); let schema = build_join_schema( @@ -284,7 +284,7 @@ fn optimize_plan( input, required_columns, true, - _execution_props, + _optimizer_config, )?) .build(); }; @@ -300,7 +300,7 @@ fn optimize_plan( input, &new_required_columns, true, - _execution_props, + _optimizer_config, )?) .window(new_window_expr)? .build() @@ -352,7 +352,7 @@ fn optimize_plan( input, &new_required_columns, true, - _execution_props, + _optimizer_config, )?), schema: DFSchemaRef::new(new_schema), })) @@ -401,7 +401,7 @@ fn optimize_plan( &a.input, &required_columns, false, - _execution_props, + _optimizer_config, )?), verbose: a.verbose, schema: a.schema.clone(), @@ -437,7 +437,7 @@ fn optimize_plan( input_plan, &new_required_columns, has_projection, - _execution_props, + _optimizer_config, ) }) .collect::>>()?; @@ -474,7 +474,7 @@ fn optimize_plan( input, &new_required_columns, has_projection, - _execution_props, + _optimizer_config, )?]; let expr = vec![]; from_plan(plan, &expr, &new_inputs) @@ -516,7 +516,7 @@ fn optimize_plan( input_plan, &new_required_columns, has_projection, - _execution_props, + _optimizer_config, ) }) .collect::>>()?; @@ -1005,6 +1005,6 @@ mod tests { fn optimize(plan: &LogicalPlan) -> Result { let rule = ProjectionPushDown::new(); - rule.optimize(plan, &ExecutionProps::new()) + rule.optimize(plan, &OptimizerConfig::new()) } } diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs index 8c628906ac99c..a7a8ae78bb2a0 100644 --- a/datafusion/core/src/optimizer/simplify_expressions.rs +++ b/datafusion/core/src/optimizer/simplify_expressions.rs @@ -24,6 +24,7 @@ use crate::logical_plan::{ lit, DFSchema, DFSchemaRef, Expr, ExprRewritable, ExprRewriter, ExprSimplifiable, LogicalPlan, RewriteRecursion, SimplifyInfo, }; +use crate::optimizer::optimizer::OptimizerConfig; use crate::optimizer::optimizer::OptimizerRule; use crate::physical_plan::planner::create_physical_expr; use crate::scalar::ScalarValue; @@ -95,7 +96,9 @@ impl<'a, 'b> SimplifyInfo for SimplifyContext<'a, 'b> { /// `Filter: b > 2` /// #[derive(Default)] -pub(crate) struct SimplifyExpressions {} +pub(crate) struct SimplifyExpressions { + props: ExecutionProps, +} /// returns true if `needle` is found in a chain of search_op /// expressions. Such as: (A AND B) AND C @@ -194,19 +197,19 @@ impl OptimizerRule for SimplifyExpressions { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { // We need to pass down the all schemas within the plan tree to `optimize_expr` in order to // to evaluate expression types. For example, a projection plan's schema will only include // projected columns. With just the projected schema, it's not possible to infer types for // expressions that references non-projected columns within the same project plan or its // children plans. - let info = SimplifyContext::new(plan.all_schemas(), execution_props); + let info = SimplifyContext::new(plan.all_schemas(), &self.props); let new_inputs = plan .inputs() .iter() - .map(|input| self.optimize(input, execution_props)) + .map(|input| self.optimize(input, optimizer_config)) .collect::>>()?; let expr = plan @@ -240,8 +243,8 @@ impl OptimizerRule for SimplifyExpressions { impl SimplifyExpressions { #[allow(missing_docs)] - pub fn new() -> Self { - Self {} + pub fn new(props: ExecutionProps) -> Self { + Self { props } } } @@ -257,8 +260,8 @@ impl SimplifyExpressions { /// # use datafusion::optimizer::simplify_expressions::ConstEvaluator; /// # use datafusion::execution::context::ExecutionProps; /// -/// let execution_props = ExecutionProps::new(); -/// let mut const_evaluator = ConstEvaluator::new(&execution_props); +/// let optimizer_config = ExecutionProps::new(); +/// let mut const_evaluator = ConstEvaluator::new(&optimizer_config); /// /// // (1 + 2) + a /// let expr = (lit(1) + lit(2)) + col("a"); @@ -282,7 +285,7 @@ pub struct ConstEvaluator<'a> { /// descendants) so this Expr can be evaluated can_evaluate: Vec, - execution_props: &'a ExecutionProps, + optimizer_config: &'a ExecutionProps, input_schema: DFSchema, input_batch: RecordBatch, } @@ -328,8 +331,8 @@ impl<'a> ExprRewriter for ConstEvaluator<'a> { impl<'a> ConstEvaluator<'a> { /// Create a new `ConstantEvaluator`. Session constants (such as /// the time for `now()` are taken from the passed - /// `execution_props`. - pub fn new(execution_props: &'a ExecutionProps) -> Self { + /// `optimizer_config`. + pub fn new(optimizer_config: &'a ExecutionProps) -> Self { let input_schema = DFSchema::empty(); // The dummy column name is unused and doesn't matter as only @@ -344,7 +347,7 @@ impl<'a> ConstEvaluator<'a> { Self { can_evaluate: vec![], - execution_props, + optimizer_config, input_schema, input_batch, } @@ -410,7 +413,7 @@ impl<'a> ConstEvaluator<'a> { &expr, &self.input_schema, &self.input_batch.schema(), - self.execution_props, + self.optimizer_config, )?; let col_val = phys_expr.evaluate(&self.input_batch)?; match col_val { @@ -1179,12 +1182,12 @@ mod tests { expected_expr: Expr, date_time: &DateTime, ) { - let execution_props = ExecutionProps { + let optimizer_config = ExecutionProps { query_execution_start_time: *date_time, var_providers: None, }; - let mut const_evaluator = ConstEvaluator::new(&execution_props); + let mut const_evaluator = ConstEvaluator::new(&optimizer_config); let evaluated_expr = input_expr .clone() .rewrite(&mut const_evaluator) @@ -1207,8 +1210,8 @@ mod tests { fn simplify(expr: Expr) -> Expr { let schema = expr_test_schema(); - let execution_props = ExecutionProps::new(); - let info = SimplifyContext::new(vec![&schema], &execution_props); + let optimizer_config = ExecutionProps::new(); + let info = SimplifyContext::new(vec![&schema], &optimizer_config); expr.simplify(&info).unwrap() } @@ -1516,9 +1519,9 @@ mod tests { } fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { - let rule = SimplifyExpressions::new(); + let rule = SimplifyExpressions::new(ExecutionProps::default()); let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); @@ -1736,14 +1739,14 @@ mod tests { // expect optimizing will result in an error, returning the error string fn get_optimized_plan_err(plan: &LogicalPlan, date_time: &DateTime) -> String { - let rule = SimplifyExpressions::new(); - let execution_props = ExecutionProps { + let props = ExecutionProps { query_execution_start_time: *date_time, var_providers: None, }; + let rule = SimplifyExpressions::new(props); let err = rule - .optimize(plan, &execution_props) + .optimize(plan, &OptimizerConfig::default()) .expect_err("expected optimization to fail"); err.to_string() @@ -1753,14 +1756,14 @@ mod tests { plan: &LogicalPlan, date_time: &DateTime, ) -> String { - let rule = SimplifyExpressions::new(); let execution_props = ExecutionProps { query_execution_start_time: *date_time, var_providers: None, }; + let rule = SimplifyExpressions::new(execution_props); let optimized_plan = rule - .optimize(plan, &execution_props) + .optimize(plan, &OptimizerConfig::default()) .expect("failed to optimize plan"); return format!("{:?}", optimized_plan); } diff --git a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs index 1748f9af624f9..65458c4dba887 100644 --- a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs +++ b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs @@ -18,10 +18,10 @@ //! single distinct to group by optimizer rule use crate::error::Result; -use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{Aggregate, Projection}; use crate::logical_plan::ExprSchemable; use crate::logical_plan::{col, DFSchema, Expr, LogicalPlan}; +use crate::optimizer::optimizer::OptimizerConfig; use crate::optimizer::optimizer::OptimizerRule; use datafusion_expr::utils::{columnize_expr, from_plan}; use hashbrown::HashSet; @@ -188,7 +188,7 @@ impl OptimizerRule for SingleDistinctToGroupBy { fn optimize( &self, plan: &LogicalPlan, - _execution_props: &ExecutionProps, + _optimizer_config: &OptimizerConfig, ) -> Result { optimize(plan) } @@ -207,7 +207,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = SingleDistinctToGroupBy::new(); let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{}", optimized_plan.display_indent_schema()); assert_eq!(formatted_plan, expected); diff --git a/datafusion/core/src/optimizer/subquery_filter_to_join.rs b/datafusion/core/src/optimizer/subquery_filter_to_join.rs index 6454425191946..add03c72d5880 100644 --- a/datafusion/core/src/optimizer/subquery_filter_to_join.rs +++ b/datafusion/core/src/optimizer/subquery_filter_to_join.rs @@ -29,11 +29,11 @@ use std::sync::Arc; use crate::error::{DataFusionError, Result}; -use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{Filter, Join}; use crate::logical_plan::{ build_join_schema, Expr, JoinConstraint, JoinType, LogicalPlan, }; +use crate::optimizer::optimizer::OptimizerConfig; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; @@ -52,12 +52,12 @@ impl OptimizerRule for SubqueryFilterToJoin { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { match plan { LogicalPlan::Filter(Filter { predicate, input }) => { // Apply optimizer rule to current input - let optimized_input = self.optimize(input, execution_props)?; + let optimized_input = self.optimize(input, optimizer_config)?; // Splitting filter expression into components by AND let mut filters = vec![]; @@ -97,7 +97,7 @@ impl OptimizerRule for SubqueryFilterToJoin { } => { let right_input = self.optimize( &*subquery.subquery, - execution_props + optimizer_config )?; let right_schema = right_input.schema(); if right_schema.fields().len() != 1 { @@ -167,7 +167,7 @@ impl OptimizerRule for SubqueryFilterToJoin { } _ => { // Apply the optimization to all inputs of the plan - utils::optimize_children(self, plan, execution_props) + utils::optimize_children(self, plan, optimizer_config) } } } @@ -201,7 +201,7 @@ mod tests { fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = SubqueryFilterToJoin::new(); let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) + .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); let formatted_plan = format!("{}", optimized_plan.display_indent_schema()); assert_eq!(formatted_plan, expected); diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index 81d4b76a26eba..006613328674b 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -18,7 +18,7 @@ //! Collection of utility functions that are leveraged by the query optimizer rules use super::optimizer::OptimizerRule; -use crate::execution::context::ExecutionProps; +use crate::optimizer::optimizer::OptimizerConfig; use datafusion_expr::logical_plan::Filter; use crate::error::{DataFusionError, Result}; @@ -42,13 +42,13 @@ const WINDOW_SORT_MARKER: &str = "__DATAFUSION_WINDOW_SORT__"; pub fn optimize_children( optimizer: &impl OptimizerRule, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { let new_exprs = plan.expressions(); let new_inputs = plan .inputs() .into_iter() - .map(|plan| optimizer.optimize(plan, execution_props)) + .map(|plan| optimizer.optimize(plan, optimizer_config)) .collect::>>()?; from_plan(plan, &new_exprs, &new_inputs) diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs index d062cf3a37583..a1bbb2419fc46 100644 --- a/datafusion/core/tests/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined_plan.rs @@ -86,10 +86,11 @@ use std::task::{Context, Poll}; use std::{any::Any, collections::BTreeMap, fmt, sync::Arc}; use async_trait::async_trait; -use datafusion::execution::context::{ExecutionProps, TaskContext}; +use datafusion::execution::context::TaskContext; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::logical_plan::plan::{Extension, Sort}; use datafusion::logical_plan::{DFSchemaRef, Limit}; +use datafusion::optimizer::optimizer::OptimizerConfig; /// Execute the specified sql and return the resulting record batches /// pretty printed as a String. @@ -284,7 +285,7 @@ impl OptimizerRule for TopKOptimizerRule { fn optimize( &self, plan: &LogicalPlan, - execution_props: &ExecutionProps, + optimizer_config: &OptimizerConfig, ) -> Result { // Note: this code simply looks for the pattern of a Limit followed by a // Sort and replaces it by a TopK node. It does not handle many @@ -300,7 +301,7 @@ impl OptimizerRule for TopKOptimizerRule { return Ok(LogicalPlan::Extension(Extension { node: Arc::new(TopKPlanNode { k: *n, - input: self.optimize(input.as_ref(), execution_props)?, + input: self.optimize(input.as_ref(), optimizer_config)?, expr: expr[0].clone(), }), })); @@ -310,7 +311,7 @@ impl OptimizerRule for TopKOptimizerRule { // If we didn't find the Limit/Sort combination, recurse as // normal and build the result. - optimize_children(self, plan, execution_props) + optimize_children(self, plan, optimizer_config) } fn name(&self) -> &str { From 48f8091408efe8743dceaf40423684e097ddf209 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 08:28:23 -0600 Subject: [PATCH 02/16] fix regression --- datafusion/core/src/optimizer/optimizer.rs | 16 ++++++++++------ .../core/src/optimizer/simplify_expressions.rs | 15 ++++++++++++++- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/optimizer/optimizer.rs b/datafusion/core/src/optimizer/optimizer.rs index 92ed5ca68469c..19ed79536da54 100644 --- a/datafusion/core/src/optimizer/optimizer.rs +++ b/datafusion/core/src/optimizer/optimizer.rs @@ -17,6 +17,7 @@ //! Query optimizer traits +use chrono::{DateTime, Utc}; use std::sync::Arc; use log::{debug, trace}; @@ -41,16 +42,22 @@ pub trait OptimizerRule { /// Placeholder for optimizer configuration options #[derive(Debug)] -pub struct OptimizerConfig {} +pub struct OptimizerConfig { + /// Query execution start time that can be used to rewrite expressions such as `now()` + /// to use a literal value instead + pub query_execution_start_time: DateTime, +} impl OptimizerConfig { /// Create optimizer config pub fn new() -> Self { - Self {} + Self { + query_execution_start_time: chrono::Utc::now(), + } } /// Create optimizer config pub fn default() -> Self { - Self {} + Self::new() } } @@ -78,9 +85,6 @@ impl Optimizer { where F: FnMut(&LogicalPlan, &dyn OptimizerRule), { - //TODO fix this regression - //let optimizer_config = optimizer_config.start_execution(); - let mut new_plan = plan.clone(); debug!("Input logical plan:\n{}\n", plan.display_indent()); trace!("Full input logical plan:\n{:?}", plan); diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs index a7a8ae78bb2a0..5823e8b68a76a 100644 --- a/datafusion/core/src/optimizer/simplify_expressions.rs +++ b/datafusion/core/src/optimizer/simplify_expressions.rs @@ -198,6 +198,19 @@ impl OptimizerRule for SimplifyExpressions { &self, plan: &LogicalPlan, optimizer_config: &OptimizerConfig, + ) -> Result { + let mut execution_props = ExecutionProps::new(); + execution_props.query_execution_start_time = + optimizer_config.query_execution_start_time; + self.optimize_internal(plan, &execution_props) + } +} + +impl SimplifyExpressions { + fn optimize_internal( + &self, + plan: &LogicalPlan, + execution_props: &ExecutionProps, ) -> Result { // We need to pass down the all schemas within the plan tree to `optimize_expr` in order to // to evaluate expression types. For example, a projection plan's schema will only include @@ -209,7 +222,7 @@ impl OptimizerRule for SimplifyExpressions { let new_inputs = plan .inputs() .iter() - .map(|input| self.optimize(input, optimizer_config)) + .map(|input| self.optimize_internal(input, execution_props)) .collect::>>()?; let expr = plan From 3fba480ead1c9491aa94d967c4a939f7205605ff Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 08:38:46 -0600 Subject: [PATCH 03/16] improve --- datafusion/core/src/execution/context.rs | 6 ++-- .../src/optimizer/simplify_expressions.rs | 32 ++++++++----------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 5acab1babeb9f..e42dd4296fa98 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1271,7 +1271,7 @@ impl SessionState { optimizer: Optimizer::new(vec![ // Simplify expressions first to maximize the chance // of applying other optimizations - Arc::new(SimplifyExpressions::new(ExecutionProps::new())), + Arc::new(SimplifyExpressions::new()), Arc::new(SubqueryFilterToJoin::new()), Arc::new(EliminateFilter::new()), Arc::new(CommonSubexprEliminate::new()), @@ -1376,7 +1376,9 @@ impl SessionState { /// Optimizes the logical plan by applying optimizer rules. pub fn optimize(&self, plan: &LogicalPlan) -> Result { - let optimizer_config = OptimizerConfig::default(); + let mut optimizer_config = OptimizerConfig::default(); + optimizer_config.query_execution_start_time = + self.execution_props.query_execution_start_time; if let LogicalPlan::Explain(e) = plan { let mut stringified_plans = e.stringified_plans.clone(); diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs index 5823e8b68a76a..f60c6ea046901 100644 --- a/datafusion/core/src/optimizer/simplify_expressions.rs +++ b/datafusion/core/src/optimizer/simplify_expressions.rs @@ -96,9 +96,7 @@ impl<'a, 'b> SimplifyInfo for SimplifyContext<'a, 'b> { /// `Filter: b > 2` /// #[derive(Default)] -pub(crate) struct SimplifyExpressions { - props: ExecutionProps, -} +pub(crate) struct SimplifyExpressions {} /// returns true if `needle` is found in a chain of search_op /// expressions. Such as: (A AND B) AND C @@ -217,7 +215,7 @@ impl SimplifyExpressions { // projected columns. With just the projected schema, it's not possible to infer types for // expressions that references non-projected columns within the same project plan or its // children plans. - let info = SimplifyContext::new(plan.all_schemas(), &self.props); + let info = SimplifyContext::new(plan.all_schemas(), execution_props); let new_inputs = plan .inputs() @@ -256,8 +254,8 @@ impl SimplifyExpressions { impl SimplifyExpressions { #[allow(missing_docs)] - pub fn new(props: ExecutionProps) -> Self { - Self { props } + pub fn new() -> Self { + Self {} } } @@ -1532,7 +1530,7 @@ mod tests { } fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { - let rule = SimplifyExpressions::new(ExecutionProps::default()); + let rule = SimplifyExpressions::new(); let optimized_plan = rule .optimize(plan, &OptimizerConfig::new()) .expect("failed to optimize plan"); @@ -1752,14 +1750,12 @@ mod tests { // expect optimizing will result in an error, returning the error string fn get_optimized_plan_err(plan: &LogicalPlan, date_time: &DateTime) -> String { - let props = ExecutionProps { - query_execution_start_time: *date_time, - var_providers: None, - }; - let rule = SimplifyExpressions::new(props); + let mut config = OptimizerConfig::default(); + config.query_execution_start_time = *date_time; + let rule = SimplifyExpressions::new(); let err = rule - .optimize(plan, &OptimizerConfig::default()) + .optimize(plan, &config) .expect_err("expected optimization to fail"); err.to_string() @@ -1769,14 +1765,12 @@ mod tests { plan: &LogicalPlan, date_time: &DateTime, ) -> String { - let execution_props = ExecutionProps { - query_execution_start_time: *date_time, - var_providers: None, - }; - let rule = SimplifyExpressions::new(execution_props); + let mut config = OptimizerConfig::default(); + config.query_execution_start_time = *date_time; + let rule = SimplifyExpressions::new(); let optimized_plan = rule - .optimize(plan, &OptimizerConfig::default()) + .optimize(plan, &config) .expect("failed to optimize plan"); return format!("{:?}", optimized_plan); } From 0e7f0c8c7814d6734a481fff2eea521fc3ad059f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 08:43:54 -0600 Subject: [PATCH 04/16] rewrite imports for common_subexpr_eliminate.rs --- .../src/optimizer/common_subexpr_eliminate.rs | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs index fa99db2e95132..622164b70f0ac 100644 --- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs @@ -17,19 +17,18 @@ //! Eliminate common sub-expression. -use crate::error::Result; -use crate::logical_plan::plan::{Filter, Projection, Window}; -use crate::logical_plan::{ +use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; +use arrow::datatypes::DataType; +use datafusion_common::{DFField, DFSchema, Result}; +use datafusion_expr::{ col, - plan::{Aggregate, Sort}, - DFField, DFSchema, Expr, ExprRewritable, ExprRewriter, ExprSchemable, ExprVisitable, - ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion, + expr::GroupingSet, + expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion}, + expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}, + logical_plan::{Aggregate, Filter, LogicalPlan, Projection, Sort, Window}, + utils::from_plan, + Expr, ExprSchemable, }; -use crate::optimizer::optimizer::OptimizerConfig; -use crate::optimizer::optimizer::OptimizerRule; -use arrow::datatypes::DataType; -use datafusion_expr::expr::GroupingSet; -use datafusion_expr::utils::from_plan; use std::collections::{HashMap, HashSet}; use std::sync::Arc; From 6f2720b8b2ba59f8936a8ded7ead88a10099cd44 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 08:45:20 -0600 Subject: [PATCH 05/16] rewrite imports for common_subexpr_eliminate.rs --- datafusion/core/src/optimizer/common_subexpr_eliminate.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs index 622164b70f0ac..916e99713d4a6 100644 --- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs @@ -695,10 +695,11 @@ fn replace_common_expr( #[cfg(test)] mod test { use super::*; - use crate::logical_plan::{ - avg, binary_expr, col, lit, sum, LogicalPlanBuilder, Operator, - }; use crate::test::*; + use datafusion_expr::{ + avg, binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, sum, + Operator, + }; use std::iter; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { From 03a6fd917a17605fc5652f220779cb3b76593031 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 08:47:21 -0600 Subject: [PATCH 06/16] rewrite imports for eliminate_filter.rs --- .../core/src/optimizer/eliminate_filter.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/optimizer/eliminate_filter.rs b/datafusion/core/src/optimizer/eliminate_filter.rs index a3c3e03412f86..4bbc2c4019e44 100644 --- a/datafusion/core/src/optimizer/eliminate_filter.rs +++ b/datafusion/core/src/optimizer/eliminate_filter.rs @@ -18,16 +18,14 @@ //! Optimizer rule to replace `where false` on a plan with an empty relation. //! This saves time in planning and executing the query. //! Note that this rule should be applied after simplify expressions optimizer rule. -use datafusion_common::ScalarValue; -use datafusion_expr::utils::from_plan; -use datafusion_expr::Expr; +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::{ + logical_plan::{EmptyRelation, Filter, LogicalPlan}, + utils::from_plan, + Expr, +}; -use crate::error::Result; -use crate::logical_plan::plan::Filter; -use crate::logical_plan::{EmptyRelation, LogicalPlan}; -use crate::optimizer::optimizer::OptimizerRule; - -use crate::optimizer::optimizer::OptimizerConfig; +use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; /// Optimization rule that elimanate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation] #[derive(Default)] @@ -81,9 +79,8 @@ impl OptimizerRule for EliminateFilter { #[cfg(test)] mod tests { use super::*; - use crate::logical_plan::LogicalPlanBuilder; - use crate::logical_plan::{col, sum}; use crate::test::*; + use datafusion_expr::{col, logical_plan::builder::LogicalPlanBuilder, sum}; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = EliminateFilter::new(); From 888d8e15ce6b7e820e3760e7cd03e8f560fcd04a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 08:48:49 -0600 Subject: [PATCH 07/16] rewrite imports for eliminate_limit.rs --- datafusion/core/src/optimizer/eliminate_limit.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/optimizer/eliminate_limit.rs b/datafusion/core/src/optimizer/eliminate_limit.rs index b07f6f224a94e..27e5fab1720d6 100644 --- a/datafusion/core/src/optimizer/eliminate_limit.rs +++ b/datafusion/core/src/optimizer/eliminate_limit.rs @@ -17,12 +17,12 @@ //! Optimizer rule to replace `LIMIT 0` on a plan with an empty relation. //! This saves time in planning and executing the query. -use crate::error::Result; -use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan}; -use crate::optimizer::optimizer::OptimizerRule; -use datafusion_expr::utils::from_plan; - -use crate::optimizer::optimizer::OptimizerConfig; +use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; +use datafusion_common::Result; +use datafusion_expr::{ + logical_plan::{EmptyRelation, Limit, LogicalPlan}, + utils::from_plan, +}; /// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation] #[derive(Default)] @@ -72,9 +72,8 @@ impl OptimizerRule for EliminateLimit { #[cfg(test)] mod tests { use super::*; - use crate::logical_plan::LogicalPlanBuilder; - use crate::logical_plan::{col, sum}; use crate::test::*; + use datafusion_expr::{col, logical_plan::builder::LogicalPlanBuilder, sum}; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = EliminateLimit::new(); From ad2f119bfdf2e054ede2b227ed5fb792154f992d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 08:50:27 -0600 Subject: [PATCH 08/16] rewrite imports for filter_push_down.rs --- datafusion/core/src/optimizer/filter_push_down.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs index a07155c2f0bb2..9641c9ae5bd05 100644 --- a/datafusion/core/src/optimizer/filter_push_down.rs +++ b/datafusion/core/src/optimizer/filter_push_down.rs @@ -14,8 +14,10 @@ //! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan -use crate::optimizer::optimizer::OptimizerConfig; -use crate::optimizer::{optimizer::OptimizerRule, utils}; +use crate::optimizer::{ + optimizer::{OptimizerConfig, OptimizerRule}, + utils, +}; use datafusion_common::{Column, DFSchema, Result}; use datafusion_expr::{ col, @@ -560,19 +562,17 @@ fn rewrite(expr: &Expr, projection: &HashMap) -> Result { #[cfg(test)] mod tests { - use std::sync::Arc; - use super::*; use crate::test::*; + use arrow::datatypes::SchemaRef; + use async_trait::async_trait; use datafusion_common::DFSchema; use datafusion_expr::{ and, col, lit, logical_plan::{builder::union_with_alias, JoinType}, sum, Expr, LogicalPlanBuilder, Operator, TableSource, TableType, }; - - use arrow::datatypes::SchemaRef; - use async_trait::async_trait; + use std::sync::Arc; fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan { let rule = FilterPushDown::new(); From 658a14c2cab12848b9251829f5ddacdc2ec6165c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 08:53:10 -0600 Subject: [PATCH 09/16] rewrite imports for limit_push_down.rs --- .../core/src/optimizer/limit_push_down.rs | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index 19578b2bc5d77..990182c387380 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -17,15 +17,14 @@ //! Optimizer rule to push down LIMIT in the query plan //! It will push down through projection, limits (taking the smaller limit) -use crate::error::Result; -use crate::logical_plan::plan::Projection; -use crate::logical_plan::{Limit, TableScan}; -use crate::logical_plan::{LogicalPlan, Union}; -use crate::optimizer::optimizer::OptimizerConfig; -use crate::optimizer::optimizer::OptimizerRule; -use datafusion_common::DataFusionError; -use datafusion_expr::logical_plan::{Join, JoinType, Offset}; -use datafusion_expr::utils::from_plan; +use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::{ + logical_plan::{ + Join, JoinType, Limit, LogicalPlan, Offset, Projection, TableScan, Union, + }, + utils::from_plan, +}; use std::sync::Arc; /// Optimization rule that tries pushes down LIMIT n @@ -270,12 +269,12 @@ impl OptimizerRule for LimitPushDown { #[cfg(test)] mod test { use super::*; - use crate::{ - logical_plan::{col, max, LogicalPlan, LogicalPlanBuilder}, - test::*, + use crate::test::*; + use datafusion_expr::{ + col, exists, + logical_plan::{builder::LogicalPlanBuilder, JoinType, LogicalPlan}, + max, }; - use datafusion_expr::exists; - use datafusion_expr::logical_plan::JoinType; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = LimitPushDown::new(); From 93a4366fc1c2e5f8f57a4912ff624631c8f0ffd4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 08:53:51 -0600 Subject: [PATCH 10/16] rewrite imports for optimizer.rs --- datafusion/core/src/optimizer/optimizer.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/optimizer/optimizer.rs b/datafusion/core/src/optimizer/optimizer.rs index 19ed79536da54..0024528e6ed7d 100644 --- a/datafusion/core/src/optimizer/optimizer.rs +++ b/datafusion/core/src/optimizer/optimizer.rs @@ -18,12 +18,10 @@ //! Query optimizer traits use chrono::{DateTime, Utc}; -use std::sync::Arc; - +use datafusion_common::Result; +use datafusion_expr::logical_plan::LogicalPlan; use log::{debug, trace}; - -use crate::error::Result; -use crate::logical_plan::LogicalPlan; +use std::sync::Arc; /// `OptimizerRule` transforms one ['LogicalPlan'] into another which /// computes the same results, but in a potentially more efficient From d66f3970bd02d8c0f35a4fd03354e046d7fb69a9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 08:57:24 -0600 Subject: [PATCH 11/16] rewrite imports for projection_push_down.rs --- .../src/optimizer/projection_push_down.rs | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 977a34b534880..b99b81f5259fb 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -18,22 +18,21 @@ //! Projection Push Down optimizer rule ensures that only referenced columns are //! loaded into memory -use crate::error::{DataFusionError, Result}; -use crate::logical_plan::plan::{ - Aggregate, Analyze, Join, Projection, SubqueryAlias, TableScan, Window, -}; -use crate::logical_plan::{ - build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, - LogicalPlanBuilder, ToDFSchema, Union, -}; -use crate::optimizer::optimizer::OptimizerConfig; -use crate::optimizer::optimizer::OptimizerRule; +use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; use arrow::datatypes::{Field, Schema}; use arrow::error::Result as ArrowResult; -use datafusion_expr::utils::{ - expr_to_columns, exprlist_to_columns, find_sort_exprs, from_plan, +use datafusion_common::{ + Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ToDFSchema, +}; +use datafusion_expr::{ + logical_plan::{ + builder::{build_join_schema, LogicalPlanBuilder}, + Aggregate, Analyze, Join, LogicalPlan, Projection, SubqueryAlias, TableScan, + Union, Window, + }, + utils::{expr_to_columns, exprlist_to_columns, find_sort_exprs, from_plan}, + Expr, }; -use datafusion_expr::Expr; use std::{ collections::{BTreeSet, HashSet}, sync::Arc, @@ -529,14 +528,18 @@ fn optimize_plan( #[cfg(test)] mod tests { - use std::collections::HashMap; - use super::*; - use crate::logical_plan::{col, lit, max, min, Expr, JoinType, LogicalPlanBuilder}; use crate::test::*; use crate::test_util::scan_empty; use arrow::datatypes::DataType; - use datafusion_expr::utils::exprlist_to_fields; + use datafusion_expr::{ + col, lit, + logical_plan::{builder::LogicalPlanBuilder, JoinType}, + max, min, + utils::exprlist_to_fields, + Expr, + }; + use std::collections::HashMap; #[test] fn aggregate_no_group_by() -> Result<()> { From 94a3671552c7f67d443934e6d664b2e3ef778c41 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 09:03:34 -0600 Subject: [PATCH 12/16] rewrite imports for simplify_expressions.rs --- .../src/optimizer/simplify_expressions.rs | 45 +++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs index f60c6ea046901..3bdb368b85a8c 100644 --- a/datafusion/core/src/optimizer/simplify_expressions.rs +++ b/datafusion/core/src/optimizer/simplify_expressions.rs @@ -17,23 +17,22 @@ //! Simplify expressions optimizer rule -use crate::error::DataFusionError; use crate::execution::context::ExecutionProps; -use crate::logical_plan::ExprSchemable; -use crate::logical_plan::{ - lit, DFSchema, DFSchemaRef, Expr, ExprRewritable, ExprRewriter, ExprSimplifiable, - LogicalPlan, RewriteRecursion, SimplifyInfo, -}; -use crate::optimizer::optimizer::OptimizerConfig; -use crate::optimizer::optimizer::OptimizerRule; +use crate::logical_plan::{ExprSimplifiable, SimplifyInfo}; +use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; use crate::physical_plan::planner::create_physical_expr; -use crate::scalar::ScalarValue; -use crate::{error::Result, logical_plan::Operator}; use arrow::array::new_null_array; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_expr::utils::from_plan; -use datafusion_expr::Volatility; +use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue}; +use datafusion_expr::{ + expr_rewriter::RewriteRecursion, + expr_rewriter::{ExprRewritable, ExprRewriter}, + lit, + logical_plan::LogicalPlan, + utils::from_plan, + Expr, ExprSchemable, Operator, Volatility, +}; /// Provides simplification information based on schema and properties pub(crate) struct SimplifyContext<'a, 'b> { @@ -748,22 +747,22 @@ impl<'a, S: SimplifyInfo> ExprRewriter for Simplifier<'a, S> { #[cfg(test)] mod tests { - use std::collections::HashMap; - use std::sync::Arc; - - use arrow::array::{ArrayRef, Int32Array}; - use chrono::{DateTime, TimeZone, Utc}; - use datafusion_expr::{BuiltinScalarFunction, ExprSchemable}; - use super::*; use crate::assert_contains; - use crate::logical_plan::{ - and, binary_expr, call_fn, col, create_udf, lit, lit_timestamp_nano, DFField, - Expr, LogicalPlanBuilder, - }; + use crate::logical_plan::{call_fn, create_udf}; use crate::physical_plan::functions::make_scalar_function; use crate::physical_plan::udf::ScalarUDF; use crate::test_util::scan_empty; + use arrow::array::{ArrayRef, Int32Array}; + use chrono::{DateTime, TimeZone, Utc}; + use datafusion_common::DFField; + use datafusion_expr::{ + and, binary_expr, col, lit, lit_timestamp_nano, + logical_plan::builder::LogicalPlanBuilder, BuiltinScalarFunction, Expr, + ExprSchemable, + }; + use std::collections::HashMap; + use std::sync::Arc; #[test] fn test_simplify_or_true() { From 6c5b0289785ee0ec4e11acc49f23eae8af782024 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 09:05:42 -0600 Subject: [PATCH 13/16] rewrite imports for single_distinct_to_groupby.rs --- .../optimizer/single_distinct_to_groupby.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs index 65458c4dba887..d29a2477b125d 100644 --- a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs +++ b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs @@ -17,13 +17,14 @@ //! single distinct to group by optimizer rule -use crate::error::Result; -use crate::logical_plan::plan::{Aggregate, Projection}; -use crate::logical_plan::ExprSchemable; -use crate::logical_plan::{col, DFSchema, Expr, LogicalPlan}; -use crate::optimizer::optimizer::OptimizerConfig; -use crate::optimizer::optimizer::OptimizerRule; -use datafusion_expr::utils::{columnize_expr, from_plan}; +use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; +use datafusion_common::{DFSchema, Result}; +use datafusion_expr::{ + col, + logical_plan::{Aggregate, LogicalPlan, Projection}, + utils::{columnize_expr, from_plan}, + Expr, ExprSchemable, +}; use hashbrown::HashSet; use std::sync::Arc; @@ -200,9 +201,11 @@ impl OptimizerRule for SingleDistinctToGroupBy { #[cfg(test)] mod tests { use super::*; - use crate::logical_plan::{col, count, count_distinct, lit, max, LogicalPlanBuilder}; use crate::physical_plan::aggregates; use crate::test::*; + use datafusion_expr::{ + col, count, count_distinct, lit, logical_plan::builder::LogicalPlanBuilder, max, + }; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = SingleDistinctToGroupBy::new(); From 26b627b149ff1cb21c5a2114a900443b5c484ba2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 09:08:29 -0600 Subject: [PATCH 14/16] rewrite imports for subquery_filter_to_join.rs --- .../src/optimizer/subquery_filter_to_join.rs | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/optimizer/subquery_filter_to_join.rs b/datafusion/core/src/optimizer/subquery_filter_to_join.rs index add03c72d5880..bcbd9ae8a73a7 100644 --- a/datafusion/core/src/optimizer/subquery_filter_to_join.rs +++ b/datafusion/core/src/optimizer/subquery_filter_to_join.rs @@ -26,16 +26,18 @@ //! WHERE t1.f IN (SELECT f FROM t2) OR t2.f = 'x' //! ``` //! won't -use std::sync::Arc; - -use crate::error::{DataFusionError, Result}; -use crate::logical_plan::plan::{Filter, Join}; -use crate::logical_plan::{ - build_join_schema, Expr, JoinConstraint, JoinType, LogicalPlan, +use crate::optimizer::{ + optimizer::{OptimizerConfig, OptimizerRule}, + utils, +}; +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::{ + logical_plan::{ + builder::build_join_schema, Filter, Join, JoinConstraint, JoinType, LogicalPlan, + }, + Expr, }; -use crate::optimizer::optimizer::OptimizerConfig; -use crate::optimizer::optimizer::OptimizerRule; -use crate::optimizer::utils; +use std::sync::Arc; /// Optimizer rule for rewriting subquery filters to joins #[derive(Default)] @@ -192,11 +194,11 @@ fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec) -> Res #[cfg(test)] mod tests { use super::*; - use crate::logical_plan::{ - and, binary_expr, col, in_subquery, lit, not_in_subquery, or, LogicalPlanBuilder, - Operator, - }; use crate::test::*; + use datafusion_expr::{ + and, binary_expr, col, in_subquery, lit, logical_plan::LogicalPlanBuilder, + not_in_subquery, or, Operator, + }; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = SubqueryFilterToJoin::new(); From 507643bf933decf51503ecde0c3b3130197b1512 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 09:11:43 -0600 Subject: [PATCH 15/16] rewrite imports for utils.rs --- datafusion/core/src/optimizer/utils.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index 006613328674b..863536972f735 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -17,16 +17,16 @@ //! Collection of utility functions that are leveraged by the query optimizer rules -use super::optimizer::OptimizerRule; -use crate::optimizer::optimizer::OptimizerConfig; -use datafusion_expr::logical_plan::Filter; - -use crate::error::{DataFusionError, Result}; -use crate::logical_plan::{and, Expr, LogicalPlan, Operator}; -use crate::prelude::lit; -use crate::scalar::ScalarValue; -use datafusion_expr::expr::GroupingSet; -use datafusion_expr::utils::from_plan; +use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_expr::{ + and, + expr::GroupingSet, + lit, + logical_plan::{Filter, LogicalPlan}, + utils::from_plan, + Expr, Operator, +}; use std::sync::Arc; const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__"; @@ -363,10 +363,9 @@ pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan { #[cfg(test)] mod tests { use super::*; - use crate::logical_plan::col; use arrow::datatypes::DataType; use datafusion_common::Column; - use datafusion_expr::utils::expr_to_columns; + use datafusion_expr::{col, utils::expr_to_columns}; use std::collections::HashSet; #[test] From 8270b2fd6746558051b66906f3c1d2908292b4d4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 31 May 2022 09:13:54 -0600 Subject: [PATCH 16/16] clippy --- datafusion/core/src/execution/context.rs | 2 +- datafusion/core/src/optimizer/optimizer.rs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index e42dd4296fa98..b484ea59086ae 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1376,7 +1376,7 @@ impl SessionState { /// Optimizes the logical plan by applying optimizer rules. pub fn optimize(&self, plan: &LogicalPlan) -> Result { - let mut optimizer_config = OptimizerConfig::default(); + let mut optimizer_config = OptimizerConfig::new(); optimizer_config.query_execution_start_time = self.execution_props.query_execution_start_time; diff --git a/datafusion/core/src/optimizer/optimizer.rs b/datafusion/core/src/optimizer/optimizer.rs index 0024528e6ed7d..67f0cf8b6e411 100644 --- a/datafusion/core/src/optimizer/optimizer.rs +++ b/datafusion/core/src/optimizer/optimizer.rs @@ -53,8 +53,11 @@ impl OptimizerConfig { query_execution_start_time: chrono::Utc::now(), } } +} + +impl Default for OptimizerConfig { /// Create optimizer config - pub fn default() -> Self { + fn default() -> Self { Self::new() } }