From 4acd283203eb0aae50312a3ebbedcecd4c0fd240 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sun, 17 Mar 2024 10:32:11 +0800 Subject: [PATCH 01/12] rewrite project Signed-off-by: jayzhan211 --- datafusion/expr/src/logical_plan/plan.rs | 157 ++++++++++++++++++ datafusion/optimizer/src/optimizer.rs | 17 +- .../simplify_expressions/simplify_exprs.rs | 122 +++++++++++++- 3 files changed, 294 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 05d7ac5394585..dd6c1e5152bb6 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -369,6 +369,97 @@ impl LogicalPlan { } } + pub fn rewrite_expressions(self, mut f: F) -> Result> + where + F: FnMut(Expr) -> Result, + { + match self { + LogicalPlan::Projection(Projection { expr, .. }) => { + let rewrited_expr = + expr.into_iter().map(|e| f(e)).collect::>>()?; + Ok(rewrited_expr) + // projection.expr = rewrited_expr; + // let input = projection.input + // Ok(LogicalPlan::Projection(projection)) + } + _ => unimplemented!(""), + // LogicalPlan::Values(Values { values, .. }) => { + // values.iter().flatten().try_for_each(f) + // } + // LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate), + // LogicalPlan::Repartition(Repartition { + // partitioning_scheme, + // .. + // }) => match partitioning_scheme { + // Partitioning::Hash(expr, _) => expr.iter().try_for_each(f), + // Partitioning::DistributeBy(expr) => expr.iter().try_for_each(f), + // Partitioning::RoundRobinBatch(_) => Ok(()), + // }, + // LogicalPlan::Window(Window { window_expr, .. }) => { + // window_expr.iter().try_for_each(f) + // } + // LogicalPlan::Aggregate(Aggregate { + // group_expr, + // aggr_expr, + // .. + // }) => group_expr.iter().chain(aggr_expr.iter()).try_for_each(f), + // // There are two part of expression for join, equijoin(on) and non-equijoin(filter). + // // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`. + // // 2. the second part is non-equijoin(filter). + // LogicalPlan::Join(Join { on, filter, .. }) => { + // on.iter() + // // it not ideal to create an expr here to analyze them, but could cache it on the Join itself + // .map(|(l, r)| Expr::eq(l.clone(), r.clone())) + // .try_for_each(|e| f(&e))?; + + // if let Some(filter) = filter.as_ref() { + // f(filter) + // } else { + // Ok(()) + // } + // } + // LogicalPlan::Sort(Sort { expr, .. }) => expr.iter().try_for_each(f), + // LogicalPlan::Extension(extension) => { + // // would be nice to avoid this copy -- maybe can + // // update extension to just observer Exprs + // extension.node.expressions().iter().try_for_each(f) + // } + // LogicalPlan::TableScan(TableScan { filters, .. }) => { + // filters.iter().try_for_each(f) + // } + // LogicalPlan::Unnest(Unnest { column, .. }) => { + // f(&Expr::Column(column.clone())) + // } + // LogicalPlan::Distinct(Distinct::On(DistinctOn { + // on_expr, + // select_expr, + // sort_expr, + // .. 
+ // })) => on_expr + // .iter() + // .chain(select_expr.iter()) + // .chain(sort_expr.clone().unwrap_or(vec![]).iter()) + // .try_for_each(f), + // // plans without expressions + // LogicalPlan::EmptyRelation(_) + // | LogicalPlan::RecursiveQuery(_) + // | LogicalPlan::Subquery(_) + // | LogicalPlan::SubqueryAlias(_) + // | LogicalPlan::Limit(_) + // | LogicalPlan::Statement(_) + // | LogicalPlan::CrossJoin(_) + // | LogicalPlan::Analyze(_) + // | LogicalPlan::Explain(_) + // | LogicalPlan::Union(_) + // | LogicalPlan::Distinct(Distinct::All(_)) + // | LogicalPlan::Dml(_) + // | LogicalPlan::Ddl(_) + // | LogicalPlan::Copy(_) + // | LogicalPlan::DescribeTable(_) + // | LogicalPlan::Prepare(_) => Ok(()), + } + } + /// returns all inputs of this `LogicalPlan` node. Does not /// include inputs to inputs, or subqueries. pub fn inputs(&self) -> Vec<&LogicalPlan> { @@ -412,6 +503,72 @@ impl LogicalPlan { } } + /// returns all inputs of this `LogicalPlan` node. Does not + /// include inputs to inputs, or subqueries. + pub fn owned_inputs(self) -> Vec { + match self { + LogicalPlan::Projection(Projection { input, .. }) + | LogicalPlan::Filter(Filter { input, .. }) + | LogicalPlan::Repartition(Repartition { input, .. }) + | LogicalPlan::Window(Window { input, .. }) + | LogicalPlan::Aggregate(Aggregate { input, .. }) + | LogicalPlan::Limit(Limit { input, .. }) + | LogicalPlan::Sort(Sort { input, .. }) + | LogicalPlan::Distinct( + Distinct::All(input) | Distinct::On(DistinctOn { input, .. }), + ) + | LogicalPlan::Unnest(Unnest { input, .. }) + | LogicalPlan::Prepare(Prepare { input, .. }) + | LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => { + vec![Arc::into_inner(input).unwrap()] + } + LogicalPlan::Join(Join { left, right, .. }) + | LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![ + Arc::into_inner(left).unwrap(), + Arc::into_inner(right).unwrap(), + ], + LogicalPlan::Subquery(Subquery { subquery, .. }) => { + vec![Arc::into_inner(subquery).unwrap()] + } + LogicalPlan::Extension(extension) => { + let inputs = extension.node.inputs(); + inputs.into_iter().map(|p| p.clone()).collect() + } + LogicalPlan::Union(Union { inputs, .. }) => inputs + .iter() + .map(|arc| { + let p = Arc::into_inner(arc.to_owned()).unwrap(); + p + }) + .collect(), + LogicalPlan::Explain(explain) => vec![Arc::into_inner(explain.plan).unwrap()], + LogicalPlan::Analyze(analyze) => { + vec![Arc::into_inner(analyze.input).unwrap()] + } + LogicalPlan::Dml(write) => vec![Arc::into_inner(write.input).unwrap()], + LogicalPlan::Copy(copy) => vec![Arc::into_inner(copy.input).unwrap()], + LogicalPlan::Ddl(ddl) => { + let inputs = ddl.inputs(); + inputs.into_iter().map(|p| p.clone()).collect() + } + + LogicalPlan::RecursiveQuery(RecursiveQuery { + static_term, + recursive_term, + .. + }) => vec![ + Arc::into_inner(static_term).unwrap(), + Arc::into_inner(recursive_term).unwrap(), + ], + // plans without inputs + LogicalPlan::TableScan { .. } + | LogicalPlan::Statement { .. } + | LogicalPlan::EmptyRelation { .. } + | LogicalPlan::Values { .. 
} + | LogicalPlan::DescribeTable(_) => vec![], + } + } + /// returns all `Using` join columns in a logical plan pub fn using_columns(&self) -> Result>, DataFusionError> { let mut using_columns: Vec> = vec![]; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index fe63766fc265d..94dc7a14b08d8 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -48,7 +48,8 @@ use crate::utils::log_plan; use datafusion_common::alias::AliasGenerator; use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::tree_node::Transformed; +use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_expr::logical_plan::LogicalPlan; use chrono::{DateTime, Utc}; @@ -85,6 +86,20 @@ pub trait OptimizerRule { fn apply_order(&self) -> Option { None } + + /// does this rule support rewriting owned plans? + fn supports_owned(&self) -> bool { + return false; + } + + /// if supports_owned returns true, calls try_optimize_owned + fn try_optimize_owned( + &self, + _plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result, DataFusionError> { + not_impl_err!("try_optimized_owned is not implemented for this rule") + } } /// Options to control the DataFusion Optimizer. diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index 70b163acc2082..18876d1154617 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -19,11 +19,12 @@ use std::sync::Arc; -use datafusion_common::{DFSchema, DFSchemaRef, Result}; +use datafusion_common::{tree_node::Transformed, DFSchema, DFSchemaRef, Result}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::logical_plan::LogicalPlan; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::utils::merge_schema; +use datafusion_expr::{Expr, Projection}; use crate::{OptimizerConfig, OptimizerRule}; @@ -59,6 +60,18 @@ impl OptimizerRule for SimplifyExpressions { execution_props.query_execution_start_time = config.query_execution_start_time(); Ok(Some(Self::optimize_internal(plan, &execution_props)?)) } + + fn supports_owned(&self) -> bool { + true + } + + fn try_optimize_owned( + &self, + plan: LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Result> { + todo!("todo") + } } impl SimplifyExpressions { @@ -122,6 +135,113 @@ impl SimplifyExpressions { plan.with_new_exprs(exprs, new_inputs) } + + fn optimize_internal_owned( + plan: LogicalPlan, + execution_props: &ExecutionProps, + ) -> Result { + let schema = if !plan.inputs().is_empty() { + DFSchemaRef::new(merge_schema(plan.inputs())) + } else if let LogicalPlan::TableScan(scan) = &plan { + // When predicates are pushed into a table scan, there is no input + // schema to resolve predicates against, so it must be handled specially + // + // Note that this is not `plan.schema()` which is the *output* + // schema, and reflects any pushed down projection. The output schema + // will not contain columns that *only* appear in pushed down predicates + // (and no where else) in the plan. + // + // Thus, use the full schema of the inner provider without any + // projection applied for simplification + Arc::new(DFSchema::try_from_qualified_schema( + &scan.table_name, + &scan.source.schema(), + )?) 
+ } else { + Arc::new(DFSchema::empty()) + }; + let info = SimplifyContext::new(execution_props).with_schema(schema); + + // let new_inputs = plan + // .inputs() + // .iter() + // .map(|input| Self::optimize_internal(input, execution_props)) + // .collect::>>()?; + + let simplifier = ExprSimplifier::new(info); + + // The left and right expressions in a Join on clause are not + // commutative, for reasons that are not entirely clear. Thus, do not + // reorder expressions in Join while simplifying. + // + // This is likely related to the fact that order of the columns must + // match the order of the children. see + // https://github.com/apache/arrow-datafusion/pull/8780 for more details + let simplifier = if matches!(plan, LogicalPlan::Join(_)) { + simplifier.with_canonicalize(false) + } else { + simplifier + }; + + fn simplify_expr( + simplifier: ExprSimplifier>, + exprs: Vec, + ) -> Result> { + exprs + .into_iter() + .map(|e| { + // TODO: unify with `rewrite_preserving_name` + let original_name = e.name_for_alias()?; + let new_e = simplifier.simplify(e)?; + new_e.alias_if_changed(original_name) + }) + .collect::>>() + } + + match plan { + LogicalPlan::Projection(Projection { + expr, + input, + .. + }) => { + // fails if more than one arc is created + let input = Arc::into_inner(input).unwrap(); + let new_input = Self::optimize_internal_owned(input, execution_props)?; + let new_expr = simplify_expr(simplifier, expr)?; + Projection::try_new(new_expr, Arc::new(new_input)) + .map(LogicalPlan::Projection) + } + _ => { + let new_inputs = plan + .inputs() + .iter() + .map(|input| Self::optimize_internal(input, execution_props)) + .collect::>>()?; + let exprs = plan + .expressions() + .into_iter() + .map(|e| { + // TODO: unify with `rewrite_preserving_name` + let original_name = e.name_for_alias()?; + let new_e = simplifier.simplify(e)?; + new_e.alias_if_changed(original_name) + }) + .collect::>>()?; + + plan.with_new_exprs(exprs, new_inputs) + } + } + // let new_inputs = plan.owned_inputs().into_iter().map(|input| { + // Self::optimize_internal_owned(input, execution_props) + // }).collect::>>()?; + + // let exprs = plan.rewrite_expressions(|e| { + // // TODO: unify with `rewrite_preserving_name` + // let original_name = e.name_for_alias()?; + // let new_e = simplifier.simplify(e)?; + // new_e.alias_if_changed(original_name) + // })?; + } } impl SimplifyExpressions { From caee1e9eceb478a318f95579efc39b2585c5a485 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sun, 17 Mar 2024 15:32:21 +0800 Subject: [PATCH 02/12] backup failed Signed-off-by: jayzhan211 --- datafusion/common/src/lib.rs | 1 + datafusion/common/src/optimize_node.rs | 61 ++++++ datafusion/common/src/tree_node.rs | 4 + datafusion/expr/src/logical_plan/plan.rs | 157 --------------- datafusion/optimizer/src/optimizer.rs | 184 ++++++++++++++---- .../simplify_expressions/simplify_exprs.rs | 97 +++++---- 6 files changed, 277 insertions(+), 227 deletions(-) create mode 100644 datafusion/common/src/optimize_node.rs diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index da7d6579bfe68..b906c5921417c 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -41,6 +41,7 @@ pub mod scalar; pub mod stats; pub mod test_util; pub mod tree_node; +pub mod optimize_node; pub mod utils; /// Reexport arrow crate diff --git a/datafusion/common/src/optimize_node.rs b/datafusion/common/src/optimize_node.rs new file mode 100644 index 0000000000000..4056cdbfdefdf --- /dev/null +++ 
b/datafusion/common/src/optimize_node.rs @@ -0,0 +1,61 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::DataFusionError; + +#[derive(Debug, PartialEq, Eq)] +pub enum OptimizedState { + Yes, + No, + Fail, +} + +#[derive(Debug)] +pub struct Optimized { + pub data: T, + pub optimized_state: OptimizedState, + pub error: Option, +} + +impl Optimized { + /// Create a new `Transformed` object with the given information. + pub fn new(data: T, optimized_state: OptimizedState) -> Self { + Self { + data, + optimized_state, + error: None, + } + } + + + pub fn yes(data: T) -> Self { + Self::new(data, OptimizedState::Yes) + } + + pub fn no(data: T) -> Self { + Self::new(data, OptimizedState::No) + } + + pub fn fail(data: T, e: E) -> Self { + Self { + data, + optimized_state: OptimizedState::Fail, + error: Some(e), + } + } +} \ No newline at end of file diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 2d653a27c47ba..18c88c8cf68cd 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -469,6 +469,10 @@ impl Transformed { } } + pub fn is_transformed(&self) -> bool { + self.transformed + } + /// Wrapper for transformed data with [`TreeNodeRecursion::Continue`] statement. pub fn yes(data: T) -> Self { Self::new(data, true, TreeNodeRecursion::Continue) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index dd6c1e5152bb6..05d7ac5394585 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -369,97 +369,6 @@ impl LogicalPlan { } } - pub fn rewrite_expressions(self, mut f: F) -> Result> - where - F: FnMut(Expr) -> Result, - { - match self { - LogicalPlan::Projection(Projection { expr, .. }) => { - let rewrited_expr = - expr.into_iter().map(|e| f(e)).collect::>>()?; - Ok(rewrited_expr) - // projection.expr = rewrited_expr; - // let input = projection.input - // Ok(LogicalPlan::Projection(projection)) - } - _ => unimplemented!(""), - // LogicalPlan::Values(Values { values, .. }) => { - // values.iter().flatten().try_for_each(f) - // } - // LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate), - // LogicalPlan::Repartition(Repartition { - // partitioning_scheme, - // .. - // }) => match partitioning_scheme { - // Partitioning::Hash(expr, _) => expr.iter().try_for_each(f), - // Partitioning::DistributeBy(expr) => expr.iter().try_for_each(f), - // Partitioning::RoundRobinBatch(_) => Ok(()), - // }, - // LogicalPlan::Window(Window { window_expr, .. }) => { - // window_expr.iter().try_for_each(f) - // } - // LogicalPlan::Aggregate(Aggregate { - // group_expr, - // aggr_expr, - // .. 
- // }) => group_expr.iter().chain(aggr_expr.iter()).try_for_each(f), - // // There are two part of expression for join, equijoin(on) and non-equijoin(filter). - // // 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`. - // // 2. the second part is non-equijoin(filter). - // LogicalPlan::Join(Join { on, filter, .. }) => { - // on.iter() - // // it not ideal to create an expr here to analyze them, but could cache it on the Join itself - // .map(|(l, r)| Expr::eq(l.clone(), r.clone())) - // .try_for_each(|e| f(&e))?; - - // if let Some(filter) = filter.as_ref() { - // f(filter) - // } else { - // Ok(()) - // } - // } - // LogicalPlan::Sort(Sort { expr, .. }) => expr.iter().try_for_each(f), - // LogicalPlan::Extension(extension) => { - // // would be nice to avoid this copy -- maybe can - // // update extension to just observer Exprs - // extension.node.expressions().iter().try_for_each(f) - // } - // LogicalPlan::TableScan(TableScan { filters, .. }) => { - // filters.iter().try_for_each(f) - // } - // LogicalPlan::Unnest(Unnest { column, .. }) => { - // f(&Expr::Column(column.clone())) - // } - // LogicalPlan::Distinct(Distinct::On(DistinctOn { - // on_expr, - // select_expr, - // sort_expr, - // .. - // })) => on_expr - // .iter() - // .chain(select_expr.iter()) - // .chain(sort_expr.clone().unwrap_or(vec![]).iter()) - // .try_for_each(f), - // // plans without expressions - // LogicalPlan::EmptyRelation(_) - // | LogicalPlan::RecursiveQuery(_) - // | LogicalPlan::Subquery(_) - // | LogicalPlan::SubqueryAlias(_) - // | LogicalPlan::Limit(_) - // | LogicalPlan::Statement(_) - // | LogicalPlan::CrossJoin(_) - // | LogicalPlan::Analyze(_) - // | LogicalPlan::Explain(_) - // | LogicalPlan::Union(_) - // | LogicalPlan::Distinct(Distinct::All(_)) - // | LogicalPlan::Dml(_) - // | LogicalPlan::Ddl(_) - // | LogicalPlan::Copy(_) - // | LogicalPlan::DescribeTable(_) - // | LogicalPlan::Prepare(_) => Ok(()), - } - } - /// returns all inputs of this `LogicalPlan` node. Does not /// include inputs to inputs, or subqueries. pub fn inputs(&self) -> Vec<&LogicalPlan> { @@ -503,72 +412,6 @@ impl LogicalPlan { } } - /// returns all inputs of this `LogicalPlan` node. Does not - /// include inputs to inputs, or subqueries. - pub fn owned_inputs(self) -> Vec { - match self { - LogicalPlan::Projection(Projection { input, .. }) - | LogicalPlan::Filter(Filter { input, .. }) - | LogicalPlan::Repartition(Repartition { input, .. }) - | LogicalPlan::Window(Window { input, .. }) - | LogicalPlan::Aggregate(Aggregate { input, .. }) - | LogicalPlan::Limit(Limit { input, .. }) - | LogicalPlan::Sort(Sort { input, .. }) - | LogicalPlan::Distinct( - Distinct::All(input) | Distinct::On(DistinctOn { input, .. }), - ) - | LogicalPlan::Unnest(Unnest { input, .. }) - | LogicalPlan::Prepare(Prepare { input, .. }) - | LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => { - vec![Arc::into_inner(input).unwrap()] - } - LogicalPlan::Join(Join { left, right, .. }) - | LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![ - Arc::into_inner(left).unwrap(), - Arc::into_inner(right).unwrap(), - ], - LogicalPlan::Subquery(Subquery { subquery, .. }) => { - vec![Arc::into_inner(subquery).unwrap()] - } - LogicalPlan::Extension(extension) => { - let inputs = extension.node.inputs(); - inputs.into_iter().map(|p| p.clone()).collect() - } - LogicalPlan::Union(Union { inputs, .. 
}) => inputs - .iter() - .map(|arc| { - let p = Arc::into_inner(arc.to_owned()).unwrap(); - p - }) - .collect(), - LogicalPlan::Explain(explain) => vec![Arc::into_inner(explain.plan).unwrap()], - LogicalPlan::Analyze(analyze) => { - vec![Arc::into_inner(analyze.input).unwrap()] - } - LogicalPlan::Dml(write) => vec![Arc::into_inner(write.input).unwrap()], - LogicalPlan::Copy(copy) => vec![Arc::into_inner(copy.input).unwrap()], - LogicalPlan::Ddl(ddl) => { - let inputs = ddl.inputs(); - inputs.into_iter().map(|p| p.clone()).collect() - } - - LogicalPlan::RecursiveQuery(RecursiveQuery { - static_term, - recursive_term, - .. - }) => vec![ - Arc::into_inner(static_term).unwrap(), - Arc::into_inner(recursive_term).unwrap(), - ], - // plans without inputs - LogicalPlan::TableScan { .. } - | LogicalPlan::Statement { .. } - | LogicalPlan::EmptyRelation { .. } - | LogicalPlan::Values { .. } - | LogicalPlan::DescribeTable(_) => vec![], - } - } - /// returns all `Using` join columns in a logical plan pub fn using_columns(&self) -> Result>, DataFusionError> { let mut using_columns: Vec> = vec![]; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 94dc7a14b08d8..70fbc5f4add65 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -48,11 +48,13 @@ use crate::utils::log_plan; use datafusion_common::alias::AliasGenerator; use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; +use datafusion_common::optimize_node::{Optimized, OptimizedState}; use datafusion_common::tree_node::Transformed; -use datafusion_common::{not_impl_err, DataFusionError, Result}; +use datafusion_common::{not_impl_err, DFSchema, DataFusionError, Result}; use datafusion_expr::logical_plan::LogicalPlan; use chrono::{DateTime, Utc}; +use datafusion_expr::{EmptyRelation, Projection}; use log::{debug, warn}; /// `OptimizerRule` transforms one [`LogicalPlan`] into another which @@ -95,10 +97,17 @@ pub trait OptimizerRule { /// if supports_owned returns true, calls try_optimize_owned fn try_optimize_owned( &self, - _plan: LogicalPlan, - _config: &dyn OptimizerConfig, - ) -> Result, DataFusionError> { - not_impl_err!("try_optimized_owned is not implemented for this rule") + plan: LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Optimized { + let res = self.try_optimize(&plan, config); + match res { + Ok(Some(plan)) => Optimized::yes(plan), + Ok(None) => Optimized::no(plan), + Err(e) => { + Optimized::fail(plan, e) + } + } } } @@ -302,41 +311,53 @@ impl Optimizer { F: FnMut(&LogicalPlan, &dyn OptimizerRule), { let options = config.options(); - let mut new_plan = plan.clone(); + let mut cur_plan = plan.clone(); let start_time = Instant::now(); - let mut previous_plans = HashSet::with_capacity(16); - previous_plans.insert(LogicalPlanSignature::new(&new_plan)); + let mut historical_plans = HashSet::with_capacity(16); + historical_plans.insert(LogicalPlanSignature::new(&cur_plan)); let mut i = 0; while i < options.optimizer.max_passes { - log_plan(&format!("Optimizer input (pass {i})"), &new_plan); + // log_plan(&format!("Optimizer input (pass {i})"), &new_plan); for rule in &self.rules { - let result = - self.optimize_recursively(rule, &new_plan, config) - .and_then(|plan| { - if let Some(plan) = &plan { - assert_schema_is_the_same(rule.name(), plan, &new_plan)?; - } - Ok(plan) - }); - match result { - Ok(Some(plan)) => { - new_plan = plan; - observer(&new_plan, rule.as_ref()); - log_plan(rule.name(), &new_plan); + let 
optimized = self.optimize_owned_recursively(rule, cur_plan, config); + match optimized { + Optimized { + data: plan, + optimized_state: OptimizedState::Yes, + error, + } => { + // TODO: need to fix this + // assert_schema_is_the_same(rule.name(), &new_plan, &plan)?; + observer(&plan, rule.as_ref()); + log_plan(rule.name(), &plan); + + cur_plan = plan; } - Ok(None) => { - observer(&new_plan, rule.as_ref()); + Optimized { + data: plan, + optimized_state: OptimizedState::No, + error, + } => { + observer(&plan, rule.as_ref()); debug!( "Plan unchanged by optimizer rule '{}' (pass {})", rule.name(), i ); + + cur_plan = plan; } - Err(e) => { + Optimized { + data: plan, + optimized_state: OptimizedState::Fail, + error, + } => { + let e = error.unwrap(); + if options.optimizer.skip_failed_rules { // Note to future readers: if you see this warning it signals a // bug in the DataFusion optimizer. Please consider filing a ticket @@ -346,6 +367,8 @@ impl Optimizer { rule.name(), e ); + + cur_plan = plan; } else { return Err(DataFusionError::Context( format!("Optimizer rule '{}' failed", rule.name(),), @@ -355,21 +378,21 @@ impl Optimizer { } } } - log_plan(&format!("Optimized plan (pass {i})"), &new_plan); - - // HashSet::insert returns, whether the value was newly inserted. - let plan_is_fresh = - previous_plans.insert(LogicalPlanSignature::new(&new_plan)); - if !plan_is_fresh { - // plan did not change, so no need to continue trying to optimize - debug!("optimizer pass {} did not make changes", i); - break; - } + // log_plan(&format!("Optimized plan (pass {i})"), &cur_plan); + + // // HashSet::insert returns, whether the value was newly inserted. + // let plan_is_fresh = + // historical_plans.insert(LogicalPlanSignature::new(&cur_plan)); + // if !plan_is_fresh { + // // plan did not change, so no need to continue trying to optimize + // debug!("optimizer pass {} did not make changes", i); + // break; + // } i += 1; } - log_plan("Final optimized plan", &new_plan); + // log_plan("Final optimized plan", &new_plan); debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); - Ok(new_plan) + Ok(cur_plan) } fn optimize_node( @@ -382,6 +405,16 @@ impl Optimizer { rule.try_optimize(plan, config) } + fn optimize_owned_node( + &self, + rule: &Arc, + plan: LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Optimized { + // TODO: future feature: We can do Batch optimize + rule.try_optimize_owned(plan, config) + } + fn optimize_inputs( &self, rule: &Arc, @@ -410,6 +443,46 @@ impl Optimizer { plan.with_new_exprs(exprs, new_inputs).map(Some) } + fn optimize_owned_inputs( + &self, + rule: &Arc, + plan: LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Optimized { + let inputs = plan.inputs(); + let result = inputs + .iter() + .map(|sub_plan| self.optimize_recursively(rule, sub_plan, config)) + .collect::>>(); + + if result.is_err() { + return Optimized::fail(plan, result.err().unwrap()); + } + let result = result.unwrap(); + + if result.is_empty() || result.iter().all(|o| o.is_none()) { + return Optimized::no(plan); + } + + let new_inputs = result + .into_iter() + .zip(inputs) + .map(|(new_plan, old_plan)| match new_plan { + Some(plan) => plan, + None => old_plan.clone(), + }) + .collect(); + + let exprs = plan.expressions(); + let new_plan = plan.with_new_exprs(exprs, new_inputs); + if new_plan.is_err() { + return Optimized::fail(plan, new_plan.err().unwrap()); + } + let new_plan = new_plan.unwrap(); + + Optimized::yes(new_plan) + } + /// Use a rule to optimize the whole plan. 
/// If the rule with `ApplyOrder`, we don't need to recursively handle children in rule. pub fn optimize_recursively( @@ -444,6 +517,43 @@ impl Optimizer { _ => rule.try_optimize(plan, config), } } + + /// Use a rule to optimize the whole plan. + /// If the rule with `ApplyOrder`, we don't need to recursively handle children in rule. + pub fn optimize_owned_recursively( + &self, + rule: &Arc, + plan: LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Optimized { + match rule.apply_order() { + Some(order) => match order { + ApplyOrder::TopDown => { + let res = self.optimize_owned_node(rule, plan, config); + match res { + Optimized { data: node, optimized_state: OptimizedState::Yes | OptimizedState::No, error } => { + self.optimize_owned_inputs(rule, node, config) + } + _ => { + res + } + } + } + ApplyOrder::BottomUp => { + let res = self.optimize_owned_inputs(rule, plan, config); + match res { + Optimized { data: node, optimized_state: OptimizedState::Yes | OptimizedState::No, error } => { + self.optimize_owned_node(rule, node, config) + } + _ => { + res + } + } + } + }, + _ => rule.try_optimize_owned(plan, config), + } + } } /// Returns an error if plans have different schemas. diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index 18876d1154617..950b48ec8742c 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -19,6 +19,7 @@ use std::sync::Arc; +use datafusion_common::optimize_node::{Optimized, OptimizedState}; use datafusion_common::{tree_node::Transformed, DFSchema, DFSchemaRef, Result}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::logical_plan::LogicalPlan; @@ -69,8 +70,11 @@ impl OptimizerRule for SimplifyExpressions { &self, plan: LogicalPlan, config: &dyn OptimizerConfig, - ) -> Result> { - todo!("todo") + ) -> Optimized { + let mut execution_props = ExecutionProps::new(); + execution_props.query_execution_start_time = config.query_execution_start_time(); + + Self::optimize_internal_owned(plan, &execution_props) } } @@ -139,7 +143,7 @@ impl SimplifyExpressions { fn optimize_internal_owned( plan: LogicalPlan, execution_props: &ExecutionProps, - ) -> Result { + ) -> Optimized { let schema = if !plan.inputs().is_empty() { DFSchemaRef::new(merge_schema(plan.inputs())) } else if let LogicalPlan::TableScan(scan) = &plan { @@ -153,20 +157,24 @@ impl SimplifyExpressions { // // Thus, use the full schema of the inner provider without any // projection applied for simplification - Arc::new(DFSchema::try_from_qualified_schema( + + let schema = DFSchema::try_from_qualified_schema( &scan.table_name, &scan.source.schema(), - )?) + ); + + if schema.is_err() { + return Optimized::fail(plan, schema.unwrap_err()); + } + + let schema = schema.unwrap(); + + Arc::new(schema) } else { Arc::new(DFSchema::empty()) }; - let info = SimplifyContext::new(execution_props).with_schema(schema); - // let new_inputs = plan - // .inputs() - // .iter() - // .map(|input| Self::optimize_internal(input, execution_props)) - // .collect::>>()?; + let info = SimplifyContext::new(execution_props).with_schema(schema); let simplifier = ExprSimplifier::new(info); @@ -199,24 +207,45 @@ impl SimplifyExpressions { } match plan { - LogicalPlan::Projection(Projection { - expr, - input, - .. 
- }) => { + LogicalPlan::Projection(Projection { expr, input, schema }) => { // fails if more than one arc is created let input = Arc::into_inner(input).unwrap(); - let new_input = Self::optimize_internal_owned(input, execution_props)?; - let new_expr = simplify_expr(simplifier, expr)?; - Projection::try_new(new_expr, Arc::new(new_input)) - .map(LogicalPlan::Projection) + let Optimized { data: new_input, optimized_state, error } = Self::optimize_internal_owned(input, execution_props); + + if optimized_state == OptimizedState::Fail { + return Optimized::fail(plan, error.unwrap()); + } + + let new_expr = simplify_expr(simplifier, expr); + if new_expr.is_err() { + return Optimized::fail(plan, new_expr.unwrap_err()); + } + let new_expr = new_expr.unwrap(); + let res = Projection::try_new(new_expr, Arc::new(new_input)) + .map(LogicalPlan::Projection); + if res.is_err() { + let restored_plan = LogicalPlan::Projection(Projection { + expr, + input: Arc::new(new_input), + schema, + }); + + return Optimized::fail(plan, res.unwrap_err()); + } + + Optimized::yes(res.unwrap()) } _ => { let new_inputs = plan .inputs() .iter() .map(|input| Self::optimize_internal(input, execution_props)) - .collect::>>()?; + .collect::>>(); + + if new_inputs.is_err() { + return Optimized::fail(plan, new_inputs.unwrap_err()); + } + let exprs = plan .expressions() .into_iter() @@ -226,21 +255,23 @@ impl SimplifyExpressions { let new_e = simplifier.simplify(e)?; new_e.alias_if_changed(original_name) }) - .collect::>>()?; + .collect::>>(); + + if exprs.is_err() { + return Optimized::fail(plan, exprs.unwrap_err()); + } + + let new_inputs = new_inputs.unwrap(); + let exprs = exprs.unwrap(); + + let res = plan.with_new_exprs(exprs, new_inputs); + if res.is_err() { + return Optimized::fail(plan, res.unwrap_err()); + } - plan.with_new_exprs(exprs, new_inputs) + Optimized::yes(res.unwrap()) } } - // let new_inputs = plan.owned_inputs().into_iter().map(|input| { - // Self::optimize_internal_owned(input, execution_props) - // }).collect::>>()?; - - // let exprs = plan.rewrite_expressions(|e| { - // // TODO: unify with `rewrite_preserving_name` - // let original_name = e.name_for_alias()?; - // let new_e = simplifier.simplify(e)?; - // new_e.alias_if_changed(original_name) - // })?; } } From bdd7f3ebd464b866fddcea2ef9da16ee0003e230 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sun, 17 Mar 2024 19:50:06 +0800 Subject: [PATCH 03/12] backup Signed-off-by: jayzhan211 --- datafusion/common/src/optimize_node.rs | 32 +++++++++++-------- datafusion/optimizer/src/optimizer.rs | 2 +- .../simplify_expressions/simplify_exprs.rs | 26 +++++++++------ 3 files changed, 36 insertions(+), 24 deletions(-) diff --git a/datafusion/common/src/optimize_node.rs b/datafusion/common/src/optimize_node.rs index 4056cdbfdefdf..e13841d6d935e 100644 --- a/datafusion/common/src/optimize_node.rs +++ b/datafusion/common/src/optimize_node.rs @@ -27,33 +27,37 @@ pub enum OptimizedState { #[derive(Debug)] pub struct Optimized { - pub data: T, + pub optimzied_data: Option, + // Used to store the original data if optimized successfully + pub original_data: T, pub optimized_state: OptimizedState, + // Used to store the error if optimized failed, so we can early return but preserve the original data pub error: Option, } impl Optimized { - /// Create a new `Transformed` object with the given information. 
- pub fn new(data: T, optimized_state: OptimizedState) -> Self { + pub fn yes(optimzied_data: T, original_data: T) -> Self { Self { - data, - optimized_state, + optimzied_data: Some(optimzied_data), + original_data, + optimized_state: OptimizedState::Yes, error: None, } } - - pub fn yes(data: T) -> Self { - Self::new(data, OptimizedState::Yes) - } - - pub fn no(data: T) -> Self { - Self::new(data, OptimizedState::No) + pub fn no(original_data: T) -> Self { + Self { + optimzied_data: None, + original_data, + optimized_state: OptimizedState::No, + error: None, + } } - pub fn fail(data: T, e: E) -> Self { + pub fn fail(original_data: T, e: E) -> Self { Self { - data, + optimzied_data: None, + original_data, optimized_state: OptimizedState::Fail, error: Some(e), } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 70fbc5f4add65..463918200f817 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -102,7 +102,7 @@ pub trait OptimizerRule { ) -> Optimized { let res = self.try_optimize(&plan, config); match res { - Ok(Some(plan)) => Optimized::yes(plan), + Ok(Some(new_plan)) => Optimized::yes(new_plan, plan), Ok(None) => Optimized::no(plan), Err(e) => { Optimized::fail(plan, e) diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index 950b48ec8742c..a22c41cb0a036 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -207,33 +207,41 @@ impl SimplifyExpressions { } match plan { - LogicalPlan::Projection(Projection { expr, input, schema }) => { + LogicalPlan::Projection(Projection { expr, input, .. 
}) => { // fails if more than one arc is created let input = Arc::into_inner(input).unwrap(); - let Optimized { data: new_input, optimized_state, error } = Self::optimize_internal_owned(input, execution_props); + let Optimized { optimzied_data: new_input, original_data, optimized_state, error } = Self::optimize_internal_owned(input, execution_props); if optimized_state == OptimizedState::Fail { return Optimized::fail(plan, error.unwrap()); } + let new_input = if optimized_state == OptimizedState::Yes { + Arc::new(new_input.unwrap()) + } else { + Arc::new(original_data) + }; + let new_expr = simplify_expr(simplifier, expr); if new_expr.is_err() { return Optimized::fail(plan, new_expr.unwrap_err()); } let new_expr = new_expr.unwrap(); - let res = Projection::try_new(new_expr, Arc::new(new_input)) + let res = Projection::try_new(new_expr, new_input) .map(LogicalPlan::Projection); if res.is_err() { - let restored_plan = LogicalPlan::Projection(Projection { - expr, - input: Arc::new(new_input), - schema, - }); + // let restored_plan = LogicalPlan::Projection(Projection { + // expr, + // input: Arc::new(new_input), + // schema, + // }); return Optimized::fail(plan, res.unwrap_err()); } + + let new_plan = res.unwrap(); - Optimized::yes(res.unwrap()) + Optimized::yes(new_plan, plan) } _ => { let new_inputs = plan From d29edeacb5f187de10ef062229de48fd6a25d8a9 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sun, 17 Mar 2024 21:03:40 +0800 Subject: [PATCH 04/12] fix error Signed-off-by: jayzhan211 --- datafusion/common/src/lib.rs | 2 +- datafusion/common/src/optimize_node.rs | 3 +- datafusion/core/src/execution/context/mod.rs | 52 +++- datafusion/optimizer/src/optimizer.rs | 266 ++++++++++++------ .../simplify_expressions/simplify_exprs.rs | 88 ++---- 5 files changed, 252 insertions(+), 159 deletions(-) diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index b906c5921417c..237d557bd6fd2 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -35,13 +35,13 @@ pub mod file_options; pub mod format; pub mod hash_utils; pub mod instant; +pub mod optimize_node; pub mod parsers; pub mod rounding; pub mod scalar; pub mod stats; pub mod test_util; pub mod tree_node; -pub mod optimize_node; pub mod utils; /// Reexport arrow crate diff --git a/datafusion/common/src/optimize_node.rs b/datafusion/common/src/optimize_node.rs index e13841d6d935e..1cf6cd7464437 100644 --- a/datafusion/common/src/optimize_node.rs +++ b/datafusion/common/src/optimize_node.rs @@ -1,4 +1,3 @@ - // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. 
See the NOTICE file // distributed with this work for additional information @@ -62,4 +61,4 @@ impl Optimized { error: Some(e), } } -} \ No newline at end of file +} diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 116e45c8c1302..955982a6e636c 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1875,16 +1875,29 @@ impl SessionState { stringified_plans .push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan)); - // optimize the child plan, capturing the output of each optimizer - let optimized_plan = self.optimizer.optimize( - &analyzed_plan, - self, - |optimized_plan, optimizer| { - let optimizer_name = optimizer.name().to_string(); - let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; - stringified_plans.push(optimized_plan.to_stringified(plan_type)); - }, - ); + let optimized_plan = if self.options().optimizer.skip_failed_rules { + self.optimizer.optimize( + &analyzed_plan, + self, + |optimized_plan, optimizer| { + let optimizer_name = optimizer.name().to_string(); + let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; + stringified_plans.push(optimized_plan.to_stringified(plan_type)); + }, + ) + } else { + // optimize the child plan, capturing the output of each optimizer + self.optimizer.optimize_owned( + analyzed_plan, + self, + |optimized_plan, optimizer| { + let optimizer_name = optimizer.name().to_string(); + let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; + stringified_plans.push(optimized_plan.to_stringified(plan_type)); + }, + ) + }; + let (plan, logical_optimization_succeeded) = match optimized_plan { Ok(plan) => (Arc::new(plan), true), Err(DataFusionError::Context(optimizer_name, err)) => { @@ -1904,10 +1917,21 @@ impl SessionState { logical_optimization_succeeded, })) } else { - let analyzed_plan = - self.analyzer - .execute_and_check(plan, self.options(), |_, _| {})?; - self.optimizer.optimize(&analyzed_plan, self, |_, _| {}) + + if self.options().optimizer.skip_failed_rules { + let analyzed_plan = + self.analyzer + .execute_and_check(plan, self.options(), |_, _| {})?; + self.optimizer.optimize(&analyzed_plan, self, |_, _| {}) + + } else { + + let analyzed_plan = + self.analyzer + .execute_and_check(plan, self.options(), |_, _| {})?; + self.optimizer.optimize_owned(analyzed_plan, self, |_, _| {}) + } + } } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 463918200f817..e49b89671e835 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -99,14 +99,12 @@ pub trait OptimizerRule { &self, plan: LogicalPlan, config: &dyn OptimizerConfig, - ) -> Optimized { + ) -> Result> { let res = self.try_optimize(&plan, config); match res { - Ok(Some(new_plan)) => Optimized::yes(new_plan, plan), - Ok(None) => Optimized::no(plan), - Err(e) => { - Optimized::fail(plan, e) - } + Ok(Some(new_plan)) => Ok(Transformed::yes(new_plan)), + Ok(None) => Ok(Transformed::no(plan)), + Err(e) => Err(e), } } } @@ -311,53 +309,140 @@ impl Optimizer { F: FnMut(&LogicalPlan, &dyn OptimizerRule), { let options = config.options(); - let mut cur_plan = plan.clone(); + let mut new_plan = plan.clone(); let start_time = Instant::now(); - let mut historical_plans = HashSet::with_capacity(16); - historical_plans.insert(LogicalPlanSignature::new(&cur_plan)); + let mut previous_plans = HashSet::with_capacity(16); + 
previous_plans.insert(LogicalPlanSignature::new(&new_plan)); let mut i = 0; while i < options.optimizer.max_passes { - // log_plan(&format!("Optimizer input (pass {i})"), &new_plan); + log_plan(&format!("Optimizer input (pass {i})"), &new_plan); for rule in &self.rules { - let optimized = self.optimize_owned_recursively(rule, cur_plan, config); - match optimized { - Optimized { - data: plan, - optimized_state: OptimizedState::Yes, - error, - } => { - // TODO: need to fix this - // assert_schema_is_the_same(rule.name(), &new_plan, &plan)?; - observer(&plan, rule.as_ref()); - log_plan(rule.name(), &plan); - - cur_plan = plan; + let result = + self.optimize_recursively(rule, &new_plan, config) + .and_then(|plan| { + if let Some(plan) = &plan { + assert_schema_is_the_same(rule.name(), plan, &new_plan)?; + } + Ok(plan) + }); + match result { + Ok(Some(plan)) => { + new_plan = plan; + observer(&new_plan, rule.as_ref()); + log_plan(rule.name(), &new_plan); } - Optimized { - data: plan, - optimized_state: OptimizedState::No, - error, - } => { - observer(&plan, rule.as_ref()); + Ok(None) => { + observer(&new_plan, rule.as_ref()); debug!( "Plan unchanged by optimizer rule '{}' (pass {})", rule.name(), i ); + } + Err(e) => { + if options.optimizer.skip_failed_rules { + // Note to future readers: if you see this warning it signals a + // bug in the DataFusion optimizer. Please consider filing a ticket + // https://github.com/apache/arrow-datafusion + warn!( + "Skipping optimizer rule '{}' due to unexpected error: {}", + rule.name(), + e + ); + } else { + return Err(DataFusionError::Context( + format!("Optimizer rule '{}' failed", rule.name(),), + Box::new(e), + )); + } + } + } + } + log_plan(&format!("Optimized plan (pass {i})"), &new_plan); + + // HashSet::insert returns, whether the value was newly inserted. + let plan_is_fresh = + previous_plans.insert(LogicalPlanSignature::new(&new_plan)); + if !plan_is_fresh { + // plan did not change, so no need to continue trying to optimize + debug!("optimizer pass {} did not make changes", i); + break; + } + i += 1; + } + log_plan("Final optimized plan", &new_plan); + debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); + Ok(new_plan) + } + + /// Optimizes the logical plan by applying optimizer rules, and + /// invoking observer function after each call + pub fn optimize_owned( + &self, + plan: LogicalPlan, + config: &dyn OptimizerConfig, + mut observer: F, + ) -> Result + where + F: FnMut(&LogicalPlan, &dyn OptimizerRule), + { + let options = config.options(); + + // match plan { + // LogicalPlan::Projection(Projection { expr, input, .. 
}) => { + // let strong_cnt = Arc::strong_count(input); + // println!("strong count 1: {}", strong_cnt); + // } + // _ => {} + // } + + let mut cur_plan = plan; + + let start_time = Instant::now(); + + let mut historical_plans = HashSet::with_capacity(16); + historical_plans.insert(LogicalPlanSignature::new(&cur_plan)); + let mut i = 0; + while i < options.optimizer.max_passes { + log_plan(&format!("Optimizer input (pass {i})"), &cur_plan); + + for rule in &self.rules { + // Copy the plan to preserve the original plan in case the rule fails + let copied_plan: Option = + if options.optimizer.skip_failed_rules { + Some(cur_plan.clone()) + } else { + None + }; + + let optimized = self.optimize_owned_recursively(rule, cur_plan, config); + match optimized { + Ok(Transformed { + data: plan, + transformed, + tnr, + }) => { + if transformed { + // TODO: need to fix this + // assert_schema_is_the_same(rule.name(), &new_plan, &plan)?; + observer(&plan, rule.as_ref()); + log_plan(rule.name(), &plan); + } else { + observer(&plan, rule.as_ref()); + debug!( + "Plan unchanged by optimizer rule '{}' (pass {})", + rule.name(), + i + ); + } cur_plan = plan; } - Optimized { - data: plan, - optimized_state: OptimizedState::Fail, - error, - } => { - let e = error.unwrap(); - + Err(e) => { if options.optimizer.skip_failed_rules { // Note to future readers: if you see this warning it signals a // bug in the DataFusion optimizer. Please consider filing a ticket @@ -367,8 +452,7 @@ impl Optimizer { rule.name(), e ); - - cur_plan = plan; + cur_plan = copied_plan.unwrap(); } else { return Err(DataFusionError::Context( format!("Optimizer rule '{}' failed", rule.name(),), @@ -378,19 +462,19 @@ impl Optimizer { } } } - // log_plan(&format!("Optimized plan (pass {i})"), &cur_plan); - - // // HashSet::insert returns, whether the value was newly inserted. - // let plan_is_fresh = - // historical_plans.insert(LogicalPlanSignature::new(&cur_plan)); - // if !plan_is_fresh { - // // plan did not change, so no need to continue trying to optimize - // debug!("optimizer pass {} did not make changes", i); - // break; - // } + log_plan(&format!("Optimized plan (pass {i})"), &cur_plan); + + // HashSet::insert returns, whether the value was newly inserted. 
+ let plan_is_fresh = + historical_plans.insert(LogicalPlanSignature::new(&cur_plan)); + if !plan_is_fresh { + // plan did not change, so no need to continue trying to optimize + debug!("optimizer pass {} did not make changes", i); + break; + } i += 1; } - // log_plan("Final optimized plan", &new_plan); + log_plan("Final optimized plan", &cur_plan); debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); Ok(cur_plan) } @@ -410,7 +494,7 @@ impl Optimizer { rule: &Arc, plan: LogicalPlan, config: &dyn OptimizerConfig, - ) -> Optimized { + ) -> Result> { // TODO: future feature: We can do Batch optimize rule.try_optimize_owned(plan, config) } @@ -448,20 +532,15 @@ impl Optimizer { rule: &Arc, plan: LogicalPlan, config: &dyn OptimizerConfig, - ) -> Optimized { + ) -> Result> { let inputs = plan.inputs(); let result = inputs .iter() .map(|sub_plan| self.optimize_recursively(rule, sub_plan, config)) - .collect::>>(); - - if result.is_err() { - return Optimized::fail(plan, result.err().unwrap()); - } - let result = result.unwrap(); + .collect::>>()?; if result.is_empty() || result.iter().all(|o| o.is_none()) { - return Optimized::no(plan); + return Ok(Transformed::no(plan)); } let new_inputs = result @@ -473,14 +552,9 @@ impl Optimizer { }) .collect(); + // TODO: build the new plan based on the expressions deconstructed from the old plan let exprs = plan.expressions(); - let new_plan = plan.with_new_exprs(exprs, new_inputs); - if new_plan.is_err() { - return Optimized::fail(plan, new_plan.err().unwrap()); - } - let new_plan = new_plan.unwrap(); - - Optimized::yes(new_plan) + plan.with_new_exprs(exprs, new_inputs).map(Transformed::yes) } /// Use a rule to optimize the whole plan. @@ -525,29 +599,57 @@ impl Optimizer { rule: &Arc, plan: LogicalPlan, config: &dyn OptimizerConfig, - ) -> Optimized { + ) -> Result> { match rule.apply_order() { Some(order) => match order { ApplyOrder::TopDown => { - let res = self.optimize_owned_node(rule, plan, config); - match res { - Optimized { data: node, optimized_state: OptimizedState::Yes | OptimizedState::No, error } => { - self.optimize_owned_inputs(rule, node, config) - } - _ => { - res - } + let mut is_transformed = false; + let Transformed { + data: node, + transformed, + tnr, + } = self.optimize_owned_node(rule, plan, config)?; + if transformed { + is_transformed = true; + } + let Transformed { + data: inputs, + transformed, + tnr, + } = self.optimize_owned_inputs(rule, node, config)?; + if transformed { + is_transformed = true; + } + + if is_transformed { + Ok(Transformed::yes(inputs)) + } else { + Ok(Transformed::no(inputs)) } } ApplyOrder::BottomUp => { - let res = self.optimize_owned_inputs(rule, plan, config); - match res { - Optimized { data: node, optimized_state: OptimizedState::Yes | OptimizedState::No, error } => { - self.optimize_owned_node(rule, node, config) - } - _ => { - res - } + let mut is_transformed = false; + let Transformed { + data: inputs, + transformed, + tnr, + } = self.optimize_owned_inputs(rule, plan, config)?; + if transformed { + is_transformed = true; + } + let Transformed { + data: node, + transformed, + tnr, + } = self.optimize_owned_node(rule, inputs, config)?; + if transformed { + is_transformed = true; + } + + if is_transformed { + Ok(Transformed::yes(node)) + } else { + Ok(Transformed::no(node)) } } }, diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index a22c41cb0a036..56925dd89ccfa 100644 --- 
a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -70,7 +70,7 @@ impl OptimizerRule for SimplifyExpressions { &self, plan: LogicalPlan, config: &dyn OptimizerConfig, - ) -> Optimized { + ) -> Result> { let mut execution_props = ExecutionProps::new(); execution_props.query_execution_start_time = config.query_execution_start_time(); @@ -143,7 +143,15 @@ impl SimplifyExpressions { fn optimize_internal_owned( plan: LogicalPlan, execution_props: &ExecutionProps, - ) -> Optimized { + ) -> Result> { + // match plan { + // LogicalPlan::Projection(Projection { ref expr, ref input, .. }) => { + // let strong_cnt = Arc::strong_count(input); + // println!("strong count 2: {}", strong_cnt); + // } + // _ => {} + // } + let schema = if !plan.inputs().is_empty() { DFSchemaRef::new(merge_schema(plan.inputs())) } else if let LogicalPlan::TableScan(scan) = &plan { @@ -161,13 +169,7 @@ impl SimplifyExpressions { let schema = DFSchema::try_from_qualified_schema( &scan.table_name, &scan.source.schema(), - ); - - if schema.is_err() { - return Optimized::fail(plan, schema.unwrap_err()); - } - - let schema = schema.unwrap(); + )?; Arc::new(schema) } else { @@ -208,51 +210,29 @@ impl SimplifyExpressions { match plan { LogicalPlan::Projection(Projection { expr, input, .. }) => { + // println!("strong count: {}", Arc::strong_count(&input)); // fails if more than one arc is created let input = Arc::into_inner(input).unwrap(); - let Optimized { optimzied_data: new_input, original_data, optimized_state, error } = Self::optimize_internal_owned(input, execution_props); - - if optimized_state == OptimizedState::Fail { - return Optimized::fail(plan, error.unwrap()); - } - - let new_input = if optimized_state == OptimizedState::Yes { - Arc::new(new_input.unwrap()) - } else { - Arc::new(original_data) - }; - - let new_expr = simplify_expr(simplifier, expr); - if new_expr.is_err() { - return Optimized::fail(plan, new_expr.unwrap_err()); - } - let new_expr = new_expr.unwrap(); - let res = Projection::try_new(new_expr, new_input) - .map(LogicalPlan::Projection); - if res.is_err() { - // let restored_plan = LogicalPlan::Projection(Projection { - // expr, - // input: Arc::new(new_input), - // schema, - // }); - - return Optimized::fail(plan, res.unwrap_err()); - } - - let new_plan = res.unwrap(); - - Optimized::yes(new_plan, plan) + // let input = input.; + // let input = Arc::try_unwrap(input) + let Transformed { + data, + transformed, + tnr, + } = Self::optimize_internal_owned(input, execution_props)?; + + let new_input = Arc::new(data); + let new_expr = simplify_expr(simplifier, expr)?; + Projection::try_new(new_expr, new_input) + .map(LogicalPlan::Projection) + .map(Transformed::yes) } _ => { let new_inputs = plan .inputs() .iter() .map(|input| Self::optimize_internal(input, execution_props)) - .collect::>>(); - - if new_inputs.is_err() { - return Optimized::fail(plan, new_inputs.unwrap_err()); - } + .collect::>>()?; let exprs = plan .expressions() @@ -263,21 +243,9 @@ impl SimplifyExpressions { let new_e = simplifier.simplify(e)?; new_e.alias_if_changed(original_name) }) - .collect::>>(); - - if exprs.is_err() { - return Optimized::fail(plan, exprs.unwrap_err()); - } - - let new_inputs = new_inputs.unwrap(); - let exprs = exprs.unwrap(); - - let res = plan.with_new_exprs(exprs, new_inputs); - if res.is_err() { - return Optimized::fail(plan, res.unwrap_err()); - } + .collect::>>()?; - Optimized::yes(res.unwrap()) + 
plan.with_new_exprs(exprs, new_inputs).map(Transformed::yes) } } } From f5331dae6828a456aff749a147d552dad3e98a9c Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sun, 17 Mar 2024 21:41:17 +0800 Subject: [PATCH 05/12] add schema check back Signed-off-by: jayzhan211 --- datafusion/core/src/execution/context/mod.rs | 7 +-- datafusion/optimizer/src/optimizer.rs | 45 +++++++++++++++----- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 955982a6e636c..e2e090242d898 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1917,21 +1917,18 @@ impl SessionState { logical_optimization_succeeded, })) } else { - if self.options().optimizer.skip_failed_rules { let analyzed_plan = self.analyzer .execute_and_check(plan, self.options(), |_, _| {})?; self.optimizer.optimize(&analyzed_plan, self, |_, _| {}) - } else { - let analyzed_plan = self.analyzer .execute_and_check(plan, self.options(), |_, _| {})?; - self.optimizer.optimize_owned(analyzed_plan, self, |_, _| {}) + self.optimizer + .optimize_owned(analyzed_plan, self, |_, _| {}) } - } } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index e49b89671e835..ccb3b3238e03c 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -50,7 +50,9 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; use datafusion_common::optimize_node::{Optimized, OptimizedState}; use datafusion_common::tree_node::Transformed; -use datafusion_common::{not_impl_err, DFSchema, DataFusionError, Result}; +use datafusion_common::{ + not_impl_err, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, +}; use datafusion_expr::logical_plan::LogicalPlan; use chrono::{DateTime, Utc}; @@ -392,14 +394,6 @@ impl Optimizer { { let options = config.options(); - // match plan { - // LogicalPlan::Projection(Projection { expr, input, .. 
}) => { - // let strong_cnt = Arc::strong_count(input); - // println!("strong count 1: {}", strong_cnt); - // } - // _ => {} - // } - let mut cur_plan = plan; let start_time = Instant::now(); @@ -420,16 +414,23 @@ impl Optimizer { None }; + let cur_schema = cur_plan.schema().clone(); + let optimized = self.optimize_owned_recursively(rule, cur_plan, config); match optimized { Ok(Transformed { data: plan, transformed, - tnr, + tnr: _, }) => { if transformed { // TODO: need to fix this - // assert_schema_is_the_same(rule.name(), &new_plan, &plan)?; + let schema = plan.schema(); + assert_schema_is_the_same_v2( + rule.name(), + cur_schema.as_ref(), + schema, + )?; observer(&plan, rule.as_ref()); log_plan(rule.name(), &plan); } else { @@ -685,6 +686,28 @@ pub(crate) fn assert_schema_is_the_same( } } +pub(crate) fn assert_schema_is_the_same_v2( + rule_name: &str, + prev_schema: &DFSchema, + new_schema: &DFSchemaRef, +) -> Result<()> { + let equivalent = prev_schema.equivalent_names_and_types(new_schema); + + if !equivalent { + let e = DataFusionError::Internal(format!( + "Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}", + prev_schema, + new_schema + )); + Err(DataFusionError::Context( + String::from(rule_name), + Box::new(e), + )) + } else { + Ok(()) + } +} + #[cfg(test)] mod tests { use std::sync::{Arc, Mutex}; From 5fe77ccf69e85f061f810ae14548101331af0acc Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sun, 17 Mar 2024 21:50:13 +0800 Subject: [PATCH 06/12] fix clippy Signed-off-by: jayzhan211 --- datafusion/optimizer/src/optimizer.rs | 14 +++++--------- .../src/simplify_expressions/simplify_exprs.rs | 18 ++++-------------- 2 files changed, 9 insertions(+), 23 deletions(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index ccb3b3238e03c..0d9fbe010d121 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -48,15 +48,11 @@ use crate::utils::log_plan; use datafusion_common::alias::AliasGenerator; use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; -use datafusion_common::optimize_node::{Optimized, OptimizedState}; use datafusion_common::tree_node::Transformed; -use datafusion_common::{ - not_impl_err, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, -}; +use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result}; use datafusion_expr::logical_plan::LogicalPlan; use chrono::{DateTime, Utc}; -use datafusion_expr::{EmptyRelation, Projection}; use log::{debug, warn}; /// `OptimizerRule` transforms one [`LogicalPlan`] into another which @@ -93,7 +89,7 @@ pub trait OptimizerRule { /// does this rule support rewriting owned plans? 
     fn supports_owned(&self) -> bool {
-        return false;
+        false
     }
 
     /// if supports_owned returns true, calls try_optimize_owned
@@ -608,7 +604,7 @@ impl Optimizer {
                 let Transformed {
                     data: node,
                     transformed,
-                    tnr,
+                    tnr: _,
                 } = self.optimize_owned_node(rule, plan, config)?;
                 if transformed {
                     is_transformed = true;
@@ -616,7 +612,7 @@ impl Optimizer {
                 let Transformed {
                     data: inputs,
                     transformed,
-                    tnr,
+                    tnr: _,
                 } = self.optimize_owned_inputs(rule, node, config)?;
                 if transformed {
                     is_transformed = true;
@@ -633,7 +629,7 @@ impl Optimizer {
                 let Transformed {
                     data: inputs,
                     transformed,
-                    tnr,
+                    tnr: _,
                 } = self.optimize_owned_inputs(rule, plan, config)?;
                 if transformed {
                     is_transformed = true;
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 56925dd89ccfa..318f1b4990bf5 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -144,14 +144,6 @@ impl SimplifyExpressions {
         plan: LogicalPlan,
         execution_props: &ExecutionProps,
     ) -> Result<Transformed<LogicalPlan>> {
-        // match plan {
-        //     LogicalPlan::Projection(Projection { ref expr, ref input, .. }) => {
-        //         let strong_cnt = Arc::strong_count(input);
-        //         println!("strong count 2: {}", strong_cnt);
-        //     }
-        //     _ => {}
-        // }
-
         let schema = if !plan.inputs().is_empty() {
             DFSchemaRef::new(merge_schema(plan.inputs()))
         } else if let LogicalPlan::TableScan(scan) = &plan {
@@ -210,15 +202,13 @@ impl SimplifyExpressions {
 
         match plan {
             LogicalPlan::Projection(Projection { expr, input, .. }) => {
-                // println!("strong count: {}", Arc::strong_count(&input));
-                // fails if more than one arc is created
+                // `into_inner` fails if more than one clone of the input exists
+                // the 'skip failed rule' path fails because it clones the plan before optimization, and that extra clone makes `into_inner` fail
                 let input = Arc::into_inner(input).unwrap();
-                // let input = input.;
-                // let input = Arc::try_unwrap(input)
                 let Transformed {
                     data,
-                    transformed,
-                    tnr,
+                    transformed: _,
+                    tnr: _,
                 } = Self::optimize_internal_owned(input, execution_props)?;
 
                 let new_input = Arc::new(data);
From 1c3ef1898bb4db2d605f92e45f320de1dff440d0 Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Sun, 17 Mar 2024 22:06:24 +0800
Subject: [PATCH 07/12] fmt

Signed-off-by: jayzhan211
---
 datafusion/optimizer/src/optimizer.rs                           | 2 +-
 datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 0d9fbe010d121..8ebd46682c0d7 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -637,7 +637,7 @@ impl Optimizer {
                 let Transformed {
                     data: node,
                     transformed,
-                    tnr,
+                    tnr: _,
                 } = self.optimize_owned_node(rule, inputs, config)?;
                 if transformed {
                     is_transformed = true;
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 318f1b4990bf5..321d01c55d74b 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -19,7 +19,6 @@
 
 use std::sync::Arc;
 
-use datafusion_common::optimize_node::{Optimized, OptimizedState};
 use datafusion_common::{tree_node::Transformed, DFSchema, DFSchemaRef, Result};
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::logical_plan::LogicalPlan;
From 738ab45e1980a0bfff7c4cbc15ebfb426601cdbe Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Sun, 17 Mar 2024 22:17:06 +0800
Subject: [PATCH 08/12] aggregate

Signed-off-by: jayzhan211
---
 .../simplify_expressions/simplify_exprs.rs | 23 ++++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 321d01c55d74b..127fb3e147829 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -24,7 +24,7 @@ use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::logical_plan::LogicalPlan;
 use datafusion_expr::simplify::SimplifyContext;
 use datafusion_expr::utils::merge_schema;
-use datafusion_expr::{Expr, Projection};
+use datafusion_expr::{Aggregate, Expr, Projection};
 
 use crate::{OptimizerConfig, OptimizerRule};
 
@@ -216,6 +216,27 @@ impl SimplifyExpressions {
                     .map(LogicalPlan::Projection)
                     .map(Transformed::yes)
             }
+            LogicalPlan::Aggregate(Aggregate { input, group_expr, aggr_expr, .. }) => {
+                let input = Arc::into_inner(input).unwrap();
+                let Transformed {
+                    data,
+                    transformed: _,
+                    tnr: _,
+                } = Self::optimize_internal_owned(input, execution_props)?;
+
+                let new_input = Arc::new(data);
+                let group_expr_len = group_expr.len();
+                let expr = group_expr.into_iter().chain(aggr_expr.into_iter()).collect();
+                let new_expr = simplify_expr(simplifier, expr)?;
+
+                // group exprs are the first expressions
+                let mut group_expr = new_expr;
+                let agg_expr = group_expr.split_off(group_expr_len);
+
+                Aggregate::try_new(new_input, group_expr, agg_expr)
+                    .map(LogicalPlan::Aggregate)
+                    .map(Transformed::yes)
+            }
             _ => {
                 let new_inputs = plan
                     .inputs()
From d4d9bfb32c8cc82d0e7b64199e55f79f2a6bdf17 Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Sun, 17 Mar 2024 22:35:54 +0800
Subject: [PATCH 09/12] clone fields only

Signed-off-by: jayzhan211
---
 datafusion/common/src/dfschema.rs     | 13 +++++++++++++
 datafusion/optimizer/src/optimizer.rs | 16 ++++++++--------
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index 597507a044a20..ded608941e20b 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -488,6 +488,19 @@ impl DFSchema {
         })
     }
 
+    pub fn equivalent_names_and_types_v2(&self, fields: &Vec<DFField>) -> bool {
+        if self.fields().len() != fields.len() {
+            return false;
+        }
+        let self_fields = self.fields().iter();
+        let other_fields = fields.iter();
+        self_fields.zip(other_fields).all(|(f1, f2)| {
+            f1.qualifier() == f2.qualifier()
+                && f1.name() == f2.name()
+                && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
+        })
+    }
+
     /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
     /// than datatype_is_semantically_equal in that a Dictionary type is logically
     /// equal to a plain V type, but not semantically equal. Dictionary is also
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 8ebd46682c0d7..1bd6a4ba90d37 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -49,7 +49,7 @@ use datafusion_common::alias::AliasGenerator;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::instant::Instant;
 use datafusion_common::tree_node::Transformed;
-use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result};
+use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
 
 use chrono::{DateTime, Utc};
@@ -410,7 +410,7 @@ impl Optimizer {
                 None
             };
 
-            let cur_schema = cur_plan.schema().clone();
+            let cur_fields = cur_plan.schema().fields().clone();
 
             let optimized = self.optimize_owned_recursively(rule, cur_plan, config);
             match optimized {
@@ -424,7 +424,7 @@ impl Optimizer {
                         let schema = plan.schema();
                         assert_schema_is_the_same_v2(
                             rule.name(),
-                            cur_schema.as_ref(),
+                            &cur_fields,
                             schema,
                         )?;
                         observer(&plan, rule.as_ref());
@@ -684,16 +684,16 @@ pub(crate) fn assert_schema_is_the_same(
 
 pub(crate) fn assert_schema_is_the_same_v2(
     rule_name: &str,
-    prev_schema: &DFSchema,
+    prev_fields: &Vec<DFField>,
     new_schema: &DFSchemaRef,
 ) -> Result<()> {
-    let equivalent = prev_schema.equivalent_names_and_types(new_schema);
+    let equivalent = new_schema.equivalent_names_and_types_v2(prev_fields);
 
     if !equivalent {
         let e = DataFusionError::Internal(format!(
-            "Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}",
-            prev_schema,
-            new_schema
+            "Failed due to a difference in schemas, original fields: {:?}, new fields: {:?}",
+            prev_fields,
+            new_schema.fields()
         ));
         Err(DataFusionError::Context(
             String::from(rule_name),
From ab0127513e5777c43326a0e7fabe9aee9d5adb40 Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Mon, 18 Mar 2024 07:50:49 +0800
Subject: [PATCH 10/12] optimize exprlist to fields

Signed-off-by: jayzhan211
---
 datafusion/expr/src/utils.rs | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index c7907d0db16ab..b3d882ae89c5f 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -737,7 +737,7 @@ fn agg_cols(agg: &Aggregate) -> Vec<Column> {
         .collect()
 }
 
-fn exprlist_to_fields_aggregate(exprs: &[Expr], agg: &Aggregate) -> Result<Vec<DFField>> {
+fn exprlist_to_fields_aggregate(exprs: &[&Expr], agg: &Aggregate) -> Result<Vec<DFField>> {
     let agg_cols = agg_cols(agg);
     let mut fields = vec![];
     for expr in exprs {
@@ -754,12 +754,14 @@ fn exprlist_to_fields_aggregate(exprs: &[&Expr], agg: &Aggregate) -> Result
 pub fn exprlist_to_fields<'a>(
     expr: impl IntoIterator<Item = &'a Expr>,
     plan: &LogicalPlan,
 ) -> Result<Vec<DFField>> {
+    let exprs = expr.into_iter().collect::<Vec<_>>();
+
     // when dealing with aggregate plans we cannot simply look in the aggregate output schema
     // because it will contain columns representing complex expressions (such a column named
     // `GROUPING(person.state)` so in order to resolve `person.state` in this case we need to
     // look at the input to the aggregate instead.
     let fields = match plan {
-        LogicalPlan::Aggregate(agg) => Some(exprlist_to_fields_aggregate(exprs, agg)),
+        LogicalPlan::Aggregate(agg) => Some(exprlist_to_fields_aggregate(exprs.as_slice(), agg)),
         _ => None,
     };
     if let Some(fields) = fields {
From 3e063e9c2cfdeaba30ee8dea03a84b5249077489 Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Thu, 21 Mar 2024 08:04:12 +0800
Subject: [PATCH 11/12] fix rebase

Signed-off-by: jayzhan211
---
 datafusion/expr/src/utils.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index b3d882ae89c5f..c7907d0db16ab 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -737,7 +737,7 @@ fn agg_cols(agg: &Aggregate) -> Vec<Column> {
         .collect()
 }
 
-fn exprlist_to_fields_aggregate(exprs: &[&Expr], agg: &Aggregate) -> Result<Vec<DFField>> {
+fn exprlist_to_fields_aggregate(exprs: &[Expr], agg: &Aggregate) -> Result<Vec<DFField>> {
     let agg_cols = agg_cols(agg);
     let mut fields = vec![];
     for expr in exprs {
@@ -754,14 +754,12 @@ fn exprlist_to_fields_aggregate(exprs: &[&Expr], agg: &Aggregate) -> Result
 pub fn exprlist_to_fields<'a>(
     expr: impl IntoIterator<Item = &'a Expr>,
     plan: &LogicalPlan,
 ) -> Result<Vec<DFField>> {
-    let exprs = expr.into_iter().collect::<Vec<_>>();
-
     // when dealing with aggregate plans we cannot simply look in the aggregate output schema
     // because it will contain columns representing complex expressions (such a column named
     // `GROUPING(person.state)` so in order to resolve `person.state` in this case we need to
     // look at the input to the aggregate instead.
     let fields = match plan {
-        LogicalPlan::Aggregate(agg) => Some(exprlist_to_fields_aggregate(exprs.as_slice(), agg)),
+        LogicalPlan::Aggregate(agg) => Some(exprlist_to_fields_aggregate(exprs, agg)),
         _ => None,
     };
     if let Some(fields) = fields {
From cc3a6b464e63a6b52822d7e1bd0f4a51e2a0c617 Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Thu, 21 Mar 2024 08:23:45 +0800
Subject: [PATCH 12/12] use schema instead of fields

Signed-off-by: jayzhan211
---
 datafusion/optimizer/src/optimizer.rs          | 12 ++++++------
 .../src/simplify_expressions/simplify_exprs.rs | 12 ++++++++++--
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 1bd6a4ba90d37..31c5085de7eb7 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -49,7 +49,7 @@ use datafusion_common::alias::AliasGenerator;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::instant::Instant;
 use datafusion_common::tree_node::Transformed;
-use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, Result};
+use datafusion_common::{DFSchemaRef, DataFusionError, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
 
 use chrono::{DateTime, Utc};
@@ -410,7 +410,7 @@ impl Optimizer {
                 None
             };
 
-            let cur_fields = cur_plan.schema().fields().clone();
+            let orig_schema = cur_plan.schema().clone();
 
             let optimized = self.optimize_owned_recursively(rule, cur_plan, config);
             match optimized {
@@ -424,7 +424,7 @@ impl Optimizer {
                         let schema = plan.schema();
                         assert_schema_is_the_same_v2(
                             rule.name(),
-                            &cur_fields,
+                            orig_schema,
                             schema,
                         )?;
                         observer(&plan, rule.as_ref());
@@ -684,15 +684,15 @@ pub(crate) fn assert_schema_is_the_same(
 
 pub(crate) fn assert_schema_is_the_same_v2(
     rule_name: &str,
-    prev_fields: &Vec<DFField>,
+    orig_schema: DFSchemaRef,
     new_schema: &DFSchemaRef,
 ) -> Result<()> {
-    let equivalent = new_schema.equivalent_names_and_types_v2(prev_fields);
+    let equivalent = orig_schema.equivalent_names_and_types(new_schema);
 
     if !equivalent {
         let e = DataFusionError::Internal(format!(
             "Failed due to a difference in schemas, original fields: {:?}, new fields: {:?}",
-            prev_fields,
+            orig_schema.fields(),
             new_schema.fields()
         ));
         Err(DataFusionError::Context(
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 127fb3e147829..2c1be6fac04f1 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -216,7 +216,12 @@ impl SimplifyExpressions {
                     .map(LogicalPlan::Projection)
                     .map(Transformed::yes)
             }
-            LogicalPlan::Aggregate(Aggregate { input, group_expr, aggr_expr, .. }) => {
+            LogicalPlan::Aggregate(Aggregate {
+                input,
+                group_expr,
+                aggr_expr,
+                ..
+            }) => {
                 let input = Arc::into_inner(input).unwrap();
                 let Transformed {
                     data,
@@ -226,7 +231,10 @@ impl SimplifyExpressions {
 
                 let new_input = Arc::new(data);
                 let group_expr_len = group_expr.len();
-                let expr = group_expr.into_iter().chain(aggr_expr.into_iter()).collect();
+                let expr = group_expr
+                    .into_iter()
+                    .chain(aggr_expr.into_iter())
+                    .collect();
                 let new_expr = simplify_expr(simplifier, expr)?;
 
                 // group exprs are the first expressions
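
Note on the `Arc::into_inner(input).unwrap()` calls used throughout this series: they only succeed while the rewritten node holds the sole reference to its input, which is what the in-code comments point at and why the skip-failed-rules path (which clones the plan up front) cannot take this route. A minimal std-only sketch, independent of DataFusion, of the behavior being relied on:

    use std::sync::Arc;

    fn main() {
        // Sole owner: into_inner succeeds and yields the inner value.
        let sole = Arc::new(42);
        assert_eq!(Arc::into_inner(sole), Some(42));

        // A second clone keeps the strong count above one, so into_inner
        // returns None; the unwrap() in the patches would panic in that case.
        let shared = Arc::new(42);
        let _extra = Arc::clone(&shared);
        assert_eq!(Arc::into_inner(shared), None);
    }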