diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index 597507a044a20..ded608941e20b 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -488,6 +488,19 @@ impl DFSchema {
         })
     }
 
+    pub fn equivalent_names_and_types_v2(&self, fields: &Vec<DFField>) -> bool {
+        if self.fields().len() != fields.len() {
+            return false;
+        }
+        let self_fields = self.fields().iter();
+        let other_fields = fields.iter();
+        self_fields.zip(other_fields).all(|(f1, f2)| {
+            f1.qualifier() == f2.qualifier()
+                && f1.name() == f2.name()
+                && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
+        })
+    }
+
     /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
     /// than datatype_is_semantically_equal in that a Dictionary<K,V> type is logically
     /// equal to a plain V type, but not semantically equal. Dictionary is also
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index da7d6579bfe68..237d557bd6fd2 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -35,6 +35,7 @@ pub mod file_options;
 pub mod format;
 pub mod hash_utils;
 pub mod instant;
+pub mod optimize_node;
 pub mod parsers;
 pub mod rounding;
 pub mod scalar;
diff --git a/datafusion/common/src/optimize_node.rs b/datafusion/common/src/optimize_node.rs
new file mode 100644
index 0000000000000..1cf6cd7464437
--- /dev/null
+++ b/datafusion/common/src/optimize_node.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::DataFusionError;
+
+#[derive(Debug, PartialEq, Eq)]
+pub enum OptimizedState {
+    Yes,
+    No,
+    Fail,
+}
+
+#[derive(Debug)]
+pub struct Optimized<T, E = DataFusionError> {
+    pub optimized_data: Option<T>,
+    // The original data is kept here even when optimization succeeds
+    pub original_data: T,
+    pub optimized_state: OptimizedState,
+    // Used to store the error if optimization failed, so we can return early but preserve the original data
+    pub error: Option<E>,
+}
+
+impl<T, E> Optimized<T, E> {
+    pub fn yes(optimized_data: T, original_data: T) -> Self {
+        Self {
+            optimized_data: Some(optimized_data),
+            original_data,
+            optimized_state: OptimizedState::Yes,
+            error: None,
+        }
+    }
+
+    pub fn no(original_data: T) -> Self {
+        Self {
+            optimized_data: None,
+            original_data,
+            optimized_state: OptimizedState::No,
+            error: None,
+        }
+    }
+
+    pub fn fail(original_data: T, e: E) -> Self {
+        Self {
+            optimized_data: None,
+            original_data,
+            optimized_state: OptimizedState::Fail,
+            error: Some(e),
+        }
+    }
+}
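The new `Optimized` wrapper exists so a rule can hand back its result together with the untouched input: on success both values are available, and on failure the caller keeps the original data plus the error instead of losing it. The sketch below is a self-contained illustration of that contract; the generic parameters shown here and the toy `halve_if_even` rule are assumptions made for the example, not part of the patch.

```rust
// Standalone sketch mirroring `Optimized` from optimize_node.rs.
// `T` is the data being rewritten and `E` the error type; both are
// assumptions about the generic parameters, which the patch pairs
// with `DataFusionError` in the real file.
#[derive(Debug, PartialEq, Eq)]
enum OptimizedState {
    Yes,
    No,
    Fail,
}

#[derive(Debug)]
struct Optimized<T, E> {
    optimized_data: Option<T>,
    original_data: T,
    optimized_state: OptimizedState,
    error: Option<E>,
}

impl<T, E> Optimized<T, E> {
    fn yes(optimized_data: T, original_data: T) -> Self {
        Self { optimized_data: Some(optimized_data), original_data, optimized_state: OptimizedState::Yes, error: None }
    }
    fn no(original_data: T) -> Self {
        Self { optimized_data: None, original_data, optimized_state: OptimizedState::No, error: None }
    }
    fn fail(original_data: T, e: E) -> Self {
        Self { optimized_data: None, original_data, optimized_state: OptimizedState::Fail, error: Some(e) }
    }
}

// A toy "rule": halves even numbers, leaves odd numbers alone, fails on zero.
fn halve_if_even(n: u32) -> Optimized<u32, String> {
    match n {
        0 => Optimized::fail(n, "cannot halve zero".to_string()),
        n if n % 2 == 0 => Optimized::yes(n / 2, n),
        _ => Optimized::no(n),
    }
}

fn main() {
    let ok = halve_if_even(10);
    assert_eq!(ok.optimized_state, OptimizedState::Yes);
    assert_eq!(ok.optimized_data, Some(5));
    assert_eq!(ok.original_data, 10);

    // On failure the caller still has the original value and the error.
    let failed = halve_if_even(0);
    assert_eq!(failed.optimized_state, OptimizedState::Fail);
    assert_eq!(failed.original_data, 0);
    assert!(failed.error.is_some());
}
```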
diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs
index 2d653a27c47ba..18c88c8cf68cd 100644
--- a/datafusion/common/src/tree_node.rs
+++ b/datafusion/common/src/tree_node.rs
@@ -469,6 +469,10 @@ impl<T> Transformed<T> {
         }
     }
 
+    pub fn is_transformed(&self) -> bool {
+        self.transformed
+    }
+
     /// Wrapper for transformed data with [`TreeNodeRecursion::Continue`] statement.
     pub fn yes(data: T) -> Self {
         Self::new(data, true, TreeNodeRecursion::Continue)
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 116e45c8c1302..e2e090242d898 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1875,16 +1875,29 @@ impl SessionState {
         stringified_plans
             .push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan));
 
-        // optimize the child plan, capturing the output of each optimizer
-        let optimized_plan = self.optimizer.optimize(
-            &analyzed_plan,
-            self,
-            |optimized_plan, optimizer| {
-                let optimizer_name = optimizer.name().to_string();
-                let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
-                stringified_plans.push(optimized_plan.to_stringified(plan_type));
-            },
-        );
+        let optimized_plan = if self.options().optimizer.skip_failed_rules {
+            self.optimizer.optimize(
+                &analyzed_plan,
+                self,
+                |optimized_plan, optimizer| {
+                    let optimizer_name = optimizer.name().to_string();
+                    let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
+                    stringified_plans.push(optimized_plan.to_stringified(plan_type));
+                },
+            )
+        } else {
+            // optimize the child plan, capturing the output of each optimizer
+            self.optimizer.optimize_owned(
+                analyzed_plan,
+                self,
+                |optimized_plan, optimizer| {
+                    let optimizer_name = optimizer.name().to_string();
+                    let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
+                    stringified_plans.push(optimized_plan.to_stringified(plan_type));
+                },
+            )
+        };
+
         let (plan, logical_optimization_succeeded) = match optimized_plan {
             Ok(plan) => (Arc::new(plan), true),
             Err(DataFusionError::Context(optimizer_name, err)) => {
@@ -1904,10 +1917,18 @@ impl SessionState {
                 logical_optimization_succeeded,
             }))
         } else {
-            let analyzed_plan =
-                self.analyzer
-                    .execute_and_check(plan, self.options(), |_, _| {})?;
-            self.optimizer.optimize(&analyzed_plan, self, |_, _| {})
+            if self.options().optimizer.skip_failed_rules {
+                let analyzed_plan =
+                    self.analyzer
+                        .execute_and_check(plan, self.options(), |_, _| {})?;
+                self.optimizer.optimize(&analyzed_plan, self, |_, _| {})
+            } else {
+                let analyzed_plan =
+                    self.analyzer
+                        .execute_and_check(plan, self.options(), |_, _| {})?;
+                self.optimizer
+                    .optimize_owned(analyzed_plan, self, |_, _| {})
+            }
         }
     }
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index fe63766fc265d..31c5085de7eb7 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -48,7 +48,8 @@ use crate::utils::log_plan;
 use datafusion_common::alias::AliasGenerator;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::instant::Instant;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::tree_node::Transformed;
+use datafusion_common::{DFSchemaRef, DataFusionError, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
 
 use chrono::{DateTime, Utc};
@@ -85,6 +86,25 @@ pub trait OptimizerRule {
     fn apply_order(&self) -> Option<ApplyOrder> {
         None
     }
+
+    /// Does this rule support rewriting owned plans?
+    fn supports_owned(&self) -> bool {
+        false
+    }
+
+    /// If `supports_owned` returns true, `try_optimize_owned` is called; the default falls back to `try_optimize`
+    fn try_optimize_owned(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        let res = self.try_optimize(&plan, config);
+        match res {
+            Ok(Some(new_plan)) => Ok(Transformed::yes(new_plan)),
+            Ok(None) => Ok(Transformed::no(plan)),
+            Err(e) => Err(e),
+        }
+    }
 }
 
 /// Options to control the DataFusion Optimizer.
@@ -357,6 +377,105 @@ impl Optimizer {
         Ok(new_plan)
     }
 
+    /// Optimizes the logical plan by applying optimizer rules, and
+    /// invoking observer function after each call
+    pub fn optimize_owned<F>(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+        mut observer: F,
+    ) -> Result<LogicalPlan>
+    where
+        F: FnMut(&LogicalPlan, &dyn OptimizerRule),
+    {
+        let options = config.options();
+
+        let mut cur_plan = plan;
+
+        let start_time = Instant::now();
+
+        let mut historical_plans = HashSet::with_capacity(16);
+        historical_plans.insert(LogicalPlanSignature::new(&cur_plan));
+
+        let mut i = 0;
+        while i < options.optimizer.max_passes {
+            log_plan(&format!("Optimizer input (pass {i})"), &cur_plan);
+
+            for rule in &self.rules {
+                // Copy the plan to preserve the original plan in case the rule fails
+                let copied_plan: Option<LogicalPlan> =
+                    if options.optimizer.skip_failed_rules {
+                        Some(cur_plan.clone())
+                    } else {
+                        None
+                    };
+
+                let orig_schema = cur_plan.schema().clone();
+
+                let optimized = self.optimize_owned_recursively(rule, cur_plan, config);
+                match optimized {
+                    Ok(Transformed {
+                        data: plan,
+                        transformed,
+                        tnr: _,
+                    }) => {
+                        if transformed {
+                            // TODO: need to fix this
+                            let schema = plan.schema();
+                            assert_schema_is_the_same_v2(
+                                rule.name(),
+                                orig_schema,
+                                schema,
+                            )?;
+                            observer(&plan, rule.as_ref());
+                            log_plan(rule.name(), &plan);
+                        } else {
+                            observer(&plan, rule.as_ref());
+                            debug!(
+                                "Plan unchanged by optimizer rule '{}' (pass {})",
+                                rule.name(),
+                                i
+                            );
+                        }
+                        cur_plan = plan;
+                    }
+                    Err(e) => {
+                        if options.optimizer.skip_failed_rules {
+                            // Note to future readers: if you see this warning it signals a
+                            // bug in the DataFusion optimizer. Please consider filing a ticket at
+                            // https://github.com/apache/arrow-datafusion
+                            warn!(
+                                "Skipping optimizer rule '{}' due to unexpected error: {}",
+                                rule.name(),
+                                e
+                            );
+                            cur_plan = copied_plan.unwrap();
+                        } else {
+                            return Err(DataFusionError::Context(
+                                format!("Optimizer rule '{}' failed", rule.name()),
+                                Box::new(e),
+                            ));
+                        }
+                    }
+                }
+            }
+            log_plan(&format!("Optimized plan (pass {i})"), &cur_plan);
+
+            // HashSet::insert returns whether the value was newly inserted.
+            let plan_is_fresh =
+                historical_plans.insert(LogicalPlanSignature::new(&cur_plan));
+            if !plan_is_fresh {
+                // plan did not change, so no need to continue trying to optimize
+                debug!("optimizer pass {} did not make changes", i);
+                break;
+            }
+            i += 1;
+        }
+        log_plan("Final optimized plan", &cur_plan);
+        debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
+        Ok(cur_plan)
+    }
+
     fn optimize_node(
         &self,
         rule: &Arc<dyn OptimizerRule + Send + Sync>,
@@ -367,6 +486,16 @@ impl Optimizer {
         rule.try_optimize(plan, config)
     }
 
+    fn optimize_owned_node(
+        &self,
+        rule: &Arc<dyn OptimizerRule + Send + Sync>,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        // TODO: future feature: We can do Batch optimize
+        rule.try_optimize_owned(plan, config)
+    }
+
     fn optimize_inputs(
         &self,
         rule: &Arc<dyn OptimizerRule + Send + Sync>,
@@ -395,6 +524,36 @@ impl Optimizer {
         plan.with_new_exprs(exprs, new_inputs).map(Some)
     }
 
+    fn optimize_owned_inputs(
+        &self,
+        rule: &Arc<dyn OptimizerRule + Send + Sync>,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        let inputs = plan.inputs();
+        let result = inputs
+            .iter()
+            .map(|sub_plan| self.optimize_recursively(rule, sub_plan, config))
+            .collect::<Result<Vec<_>>>()?;
+
+        if result.is_empty() || result.iter().all(|o| o.is_none()) {
+            return Ok(Transformed::no(plan));
+        }
+
+        let new_inputs = result
+            .into_iter()
+            .zip(inputs)
+            .map(|(new_plan, old_plan)| match new_plan {
+                Some(plan) => plan,
+                None => old_plan.clone(),
+            })
+            .collect();
+
+        // TODO: build the new plan based on the expressions deconstructed from the old plan
+        let exprs = plan.expressions();
+        plan.with_new_exprs(exprs, new_inputs).map(Transformed::yes)
+    }
+
     /// Use a rule to optimize the whole plan.
     /// If the rule with `ApplyOrder`, we don't need to recursively handle children in rule.
     pub fn optimize_recursively(
@@ -429,6 +588,71 @@ impl Optimizer {
             _ => rule.try_optimize(plan, config),
         }
     }
+
+    /// Use a rule to optimize the whole plan.
+    /// If the rule has an `ApplyOrder`, we don't need to recursively handle children in the rule.
+    pub fn optimize_owned_recursively(
+        &self,
+        rule: &Arc<dyn OptimizerRule + Send + Sync>,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        match rule.apply_order() {
+            Some(order) => match order {
+                ApplyOrder::TopDown => {
+                    let mut is_transformed = false;
+                    let Transformed {
+                        data: node,
+                        transformed,
+                        tnr: _,
+                    } = self.optimize_owned_node(rule, plan, config)?;
+                    if transformed {
+                        is_transformed = true;
+                    }
+                    let Transformed {
+                        data: inputs,
+                        transformed,
+                        tnr: _,
+                    } = self.optimize_owned_inputs(rule, node, config)?;
+                    if transformed {
+                        is_transformed = true;
+                    }
+
+                    if is_transformed {
+                        Ok(Transformed::yes(inputs))
+                    } else {
+                        Ok(Transformed::no(inputs))
+                    }
+                }
+                ApplyOrder::BottomUp => {
+                    let mut is_transformed = false;
+                    let Transformed {
+                        data: inputs,
+                        transformed,
+                        tnr: _,
+                    } = self.optimize_owned_inputs(rule, plan, config)?;
+                    if transformed {
+                        is_transformed = true;
+                    }
+                    let Transformed {
+                        data: node,
+                        transformed,
+                        tnr: _,
+                    } = self.optimize_owned_node(rule, inputs, config)?;
+                    if transformed {
+                        is_transformed = true;
+                    }
+
+                    if is_transformed {
+                        Ok(Transformed::yes(node))
+                    } else {
+                        Ok(Transformed::no(node))
+                    }
+                }
+            },
+            _ => rule.try_optimize_owned(plan, config),
+        }
+    }
 }
 
 /// Returns an error if plans have different schemas.
@@ -458,6 +682,28 @@ pub(crate) fn assert_schema_is_the_same(
     }
 }
 
+pub(crate) fn assert_schema_is_the_same_v2(
+    rule_name: &str,
+    orig_schema: DFSchemaRef,
+    new_schema: &DFSchemaRef,
+) -> Result<()> {
+    let equivalent = orig_schema.equivalent_names_and_types(new_schema);
+
+    if !equivalent {
+        let e = DataFusionError::Internal(format!(
+            "Failed due to a difference in schemas, original fields: {:?}, new fields: {:?}",
+            orig_schema.fields(),
+            new_schema.fields()
+        ));
+        Err(DataFusionError::Context(
+            String::from(rule_name),
+            Box::new(e),
+        ))
+    } else {
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::{Arc, Mutex};
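The `supports_owned` / `try_optimize_owned` additions let a rule consume the plan by value and report via `Transformed` whether anything changed, while the default implementation bridges back to the borrowed `try_optimize` so existing rules keep working unchanged. Below is a minimal stand-alone sketch of that pattern; `Plan`, `Rule`, `Transformed`, and the `Uppercase` rule are simplified stand-ins for illustration, not the real DataFusion types.

```rust
// Self-contained sketch of the owned-rewrite pattern the trait additions enable.

#[derive(Debug, Clone, PartialEq)]
struct Plan(String);

#[derive(Debug)]
struct Transformed<T> {
    data: T,
    transformed: bool,
}

trait Rule {
    /// Borrowed rewrite: returns Some(new_plan) only if something changed.
    fn try_optimize(&self, plan: &Plan) -> Option<Plan>;

    /// Does this rule support rewriting owned plans?
    fn supports_owned(&self) -> bool {
        false
    }

    /// Owned rewrite. The default bridges to `try_optimize`, so rules that
    /// have not opted in keep working; opting in avoids rebuilding the plan
    /// from a borrowed reference.
    fn try_optimize_owned(&self, plan: Plan) -> Transformed<Plan> {
        match self.try_optimize(&plan) {
            Some(new_plan) => Transformed { data: new_plan, transformed: true },
            None => Transformed { data: plan, transformed: false },
        }
    }
}

struct Uppercase;

impl Rule for Uppercase {
    fn try_optimize(&self, plan: &Plan) -> Option<Plan> {
        // The borrowed path has to build a fresh Plan from the reference.
        Some(Plan(plan.0.to_uppercase()))
    }

    fn supports_owned(&self) -> bool {
        true
    }

    fn try_optimize_owned(&self, plan: Plan) -> Transformed<Plan> {
        // The owned path can take the plan apart and rewrite it directly.
        Transformed { data: Plan(plan.0.to_uppercase()), transformed: true }
    }
}

fn main() {
    let rule = Uppercase;
    assert!(rule.supports_owned());

    let plan = Plan("select 1".to_string());
    let result = rule.try_optimize_owned(plan);
    assert!(result.transformed);
    assert_eq!(result.data, Plan("SELECT 1".to_string()));
}
```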
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 70b163acc2082..2c1be6fac04f1 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -19,11 +19,12 @@ use std::sync::Arc;
 
-use datafusion_common::{DFSchema, DFSchemaRef, Result};
+use datafusion_common::{tree_node::Transformed, DFSchema, DFSchemaRef, Result};
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::logical_plan::LogicalPlan;
 use datafusion_expr::simplify::SimplifyContext;
 use datafusion_expr::utils::merge_schema;
+use datafusion_expr::{Aggregate, Expr, Projection};
 
 use crate::{OptimizerConfig, OptimizerRule};
@@ -59,6 +60,21 @@ impl OptimizerRule for SimplifyExpressions {
         execution_props.query_execution_start_time = config.query_execution_start_time();
         Ok(Some(Self::optimize_internal(plan, &execution_props)?))
     }
+
+    fn supports_owned(&self) -> bool {
+        true
+    }
+
+    fn try_optimize_owned(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        let mut execution_props = ExecutionProps::new();
+        execution_props.query_execution_start_time = config.query_execution_start_time();
+
+        Self::optimize_internal_owned(plan, &execution_props)
+    }
 }
 
 impl SimplifyExpressions {
@@ -122,6 +138,135 @@ impl SimplifyExpressions {
         plan.with_new_exprs(exprs, new_inputs)
     }
+
+    fn optimize_internal_owned(
+        plan: LogicalPlan,
+        execution_props: &ExecutionProps,
+    ) -> Result<Transformed<LogicalPlan>> {
+        let schema = if !plan.inputs().is_empty() {
+            DFSchemaRef::new(merge_schema(plan.inputs()))
+        } else if let LogicalPlan::TableScan(scan) = &plan {
+            // When predicates are pushed into a table scan, there is no input
+            // schema to resolve predicates against, so it must be handled specially
+            //
+            // Note that this is not `plan.schema()` which is the *output*
+            // schema, and reflects any pushed down projection. The output schema
+            // will not contain columns that *only* appear in pushed down predicates
+            // (and nowhere else) in the plan.
+            //
+            // Thus, use the full schema of the inner provider without any
+            // projection applied for simplification
+
+            let schema = DFSchema::try_from_qualified_schema(
+                &scan.table_name,
+                &scan.source.schema(),
+            )?;
+
+            Arc::new(schema)
+        } else {
+            Arc::new(DFSchema::empty())
+        };
+
+        let info = SimplifyContext::new(execution_props).with_schema(schema);
+
+        let simplifier = ExprSimplifier::new(info);
+
+        // The left and right expressions in a Join on clause are not
+        // commutative, for reasons that are not entirely clear. Thus, do not
+        // reorder expressions in Join while simplifying.
+        //
+        // This is likely related to the fact that order of the columns must
+        // match the order of the children. see
+        // https://github.com/apache/arrow-datafusion/pull/8780 for more details
+        let simplifier = if matches!(plan, LogicalPlan::Join(_)) {
+            simplifier.with_canonicalize(false)
+        } else {
+            simplifier
+        };
+
+        fn simplify_expr(
+            simplifier: ExprSimplifier<SimplifyContext<'_>>,
+            exprs: Vec<Expr>,
+        ) -> Result<Vec<Expr>> {
+            exprs
+                .into_iter()
+                .map(|e| {
+                    // TODO: unify with `rewrite_preserving_name`
+                    let original_name = e.name_for_alias()?;
+                    let new_e = simplifier.simplify(e)?;
+                    new_e.alias_if_changed(original_name)
+                })
+                .collect::<Result<Vec<_>>>()
+        }
+
+        match plan {
+            LogicalPlan::Projection(Projection { expr, input, .. }) => {
+                // `Arc::into_inner` returns `None` if more than one strong reference exists.
+                // The 'skip failed rules' path clones the plan before optimization, so this
+                // owned path cannot be used there because `into_inner` would fail.
+                let input = Arc::into_inner(input).unwrap();
+                let Transformed {
+                    data,
+                    transformed: _,
+                    tnr: _,
+                } = Self::optimize_internal_owned(input, execution_props)?;
+
+                let new_input = Arc::new(data);
+                let new_expr = simplify_expr(simplifier, expr)?;
+                Projection::try_new(new_expr, new_input)
+                    .map(LogicalPlan::Projection)
+                    .map(Transformed::yes)
+            }
+            LogicalPlan::Aggregate(Aggregate {
+                input,
+                group_expr,
+                aggr_expr,
+                ..
+            }) => {
+                let input = Arc::into_inner(input).unwrap();
+                let Transformed {
+                    data,
+                    transformed: _,
+                    tnr: _,
+                } = Self::optimize_internal_owned(input, execution_props)?;
+
+                let new_input = Arc::new(data);
+                let group_expr_len = group_expr.len();
+                let expr = group_expr
+                    .into_iter()
+                    .chain(aggr_expr.into_iter())
+                    .collect();
+                let new_expr = simplify_expr(simplifier, expr)?;
+
+                // group exprs are the first expressions
+                let mut group_expr = new_expr;
+                let agg_expr = group_expr.split_off(group_expr_len);
+
+                Aggregate::try_new(new_input, group_expr, agg_expr)
+                    .map(LogicalPlan::Aggregate)
+                    .map(Transformed::yes)
+            }
+            _ => {
+                let new_inputs = plan
+                    .inputs()
+                    .iter()
+                    .map(|input| Self::optimize_internal(input, execution_props))
+                    .collect::<Result<Vec<_>>>()?;
+
+                let exprs = plan
+                    .expressions()
+                    .into_iter()
+                    .map(|e| {
+                        // TODO: unify with `rewrite_preserving_name`
+                        let original_name = e.name_for_alias()?;
+                        let new_e = simplifier.simplify(e)?;
+                        new_e.alias_if_changed(original_name)
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                plan.with_new_exprs(exprs, new_inputs).map(Transformed::yes)
+            }
+        }
+    }
 }
 
 impl SimplifyExpressions {
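The owned `SimplifyExpressions` path unwraps its child with `Arc::into_inner(input).unwrap()`, which only succeeds while the plan's `Arc` has a single owner; the `skip_failed_rules` path clones the plan up front, which is why `SessionState` routes it to the borrowed optimizer instead. A small std-only sketch of that `Arc` behaviour follows; the `Plan` type and the clone fallback at the end are illustrative assumptions, not code from the patch.

```rust
use std::sync::Arc;

// Demonstrates the `Arc::into_inner` behaviour the owned path relies on:
// it yields the inner value only when the Arc is the sole owner.
#[derive(Debug, Clone, PartialEq)]
struct Plan(String);

fn main() {
    // Single owner: into_inner succeeds and no clone is needed.
    let sole = Arc::new(Plan("projection".to_string()));
    assert_eq!(Arc::into_inner(sole), Some(Plan("projection".to_string())));

    // Two owners (e.g. the plan was cloned up front, as the skip_failed_rules
    // path does before running a rule): into_inner returns None, so an
    // `unwrap()` at this point would panic.
    let shared = Arc::new(Plan("projection".to_string()));
    let _backup = Arc::clone(&shared);
    assert_eq!(Arc::into_inner(shared), None);

    // A defensive alternative: take ownership when possible, otherwise clone.
    let shared = Arc::new(Plan("projection".to_string()));
    let _backup = Arc::clone(&shared);
    let owned = Arc::try_unwrap(shared).unwrap_or_else(|arc| (*arc).clone());
    assert_eq!(owned, Plan("projection".to_string()));
}
```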