From d65ac7b16787f7fc24fa0219f6e50185488a80fd Mon Sep 17 00:00:00 2001
From: jackwener
Date: Wed, 26 Oct 2022 23:39:51 +0800
Subject: [PATCH] draft: refactor optimizer so that rules do not have to
 recurse into their children

---
 datafusion/expr/src/logical_plan/plan.rs | 61 +++++++++++++++++++++++-
 datafusion/optimizer/src/merge_filter.rs | 53 ++++++++++++++++++++
 datafusion/optimizer/src/optimizer.rs    | 50 ++++++++++++++++++-
 3 files changed, 161 insertions(+), 3 deletions(-)
 create mode 100644 datafusion/optimizer/src/merge_filter.rs

diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index cb3f7f97df547..f1d659f4a9857 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -342,6 +342,53 @@ impl LogicalPlan {
         self.accept(&mut visitor)?;
         Ok(visitor.using_columns)
     }
+
+    /// Return a new plan of the same kind as `self`, but with `inputs` as
+    /// its children. Used by the optimizer to rebuild a node after its
+    /// inputs have been rewritten.
+    pub fn clone_with_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
+        match self {
+            LogicalPlan::Projection(project) => {
+                let input = inputs.first().ok_or_else(|| {
+                    DataFusionError::Plan("expected at least one input".to_string())
+                })?;
+                Ok(LogicalPlan::Projection(Projection {
+                    expr: project.expr.clone(),
+                    input: Arc::new(input.clone()),
+                    schema: project.schema.clone(),
+                    alias: project.alias.clone(),
+                }))
+            }
+            LogicalPlan::Filter(filter) => {
+                let input = inputs.first().ok_or_else(|| {
+                    DataFusionError::Plan("expected at least one input".to_string())
+                })?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: filter.predicate.clone(),
+                    input: Arc::new(input.clone()),
+                }))
+            }
+            // TODO: implement the remaining plan variants.
+            LogicalPlan::Window(_)
+            | LogicalPlan::Aggregate(_)
+            | LogicalPlan::Sort(_)
+            | LogicalPlan::Join(_)
+            | LogicalPlan::CrossJoin(_)
+            | LogicalPlan::Repartition(_)
+            | LogicalPlan::Union(_)
+            | LogicalPlan::TableScan(_)
+            | LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::Subquery(_)
+            | LogicalPlan::SubqueryAlias(_)
+            | LogicalPlan::Limit(_)
+            | LogicalPlan::CreateExternalTable(_)
+            | LogicalPlan::CreateMemoryTable(_)
+            | LogicalPlan::CreateView(_)
+            | LogicalPlan::CreateCatalogSchema(_)
+            | LogicalPlan::CreateCatalog(_)
+            | LogicalPlan::DropTable(_)
+            | LogicalPlan::DropView(_)
+            | LogicalPlan::Values(_)
+            | LogicalPlan::Explain(_)
+            | LogicalPlan::Analyze(_)
+            | LogicalPlan::Extension(_)
+            | LogicalPlan::Distinct(_) => Err(DataFusionError::Plan(
+                "clone_with_inputs is not yet implemented for this plan variant".to_string(),
+            )),
+        }
+    }
 }
 
 /// Trait that implements the [Visitor
@@ -1470,6 +1517,18 @@ pub struct Join {
     pub null_equals_null: bool,
 }
 
+// TODO: use macro_rules! to implement impl_down_cast_fn
+impl LogicalPlan {
+    /// Downcast helper: return the inner [`Projection`] if this plan is a
+    /// projection node.
+    pub fn as_projection(&self) -> Option<&Projection> {
+        match self {
+            LogicalPlan::Projection(projection) => Some(projection),
+            _ => None,
+        }
+    }
+}
+
 /// Subquery
 #[derive(Clone)]
 pub struct Subquery {
@@ -1688,7 +1747,7 @@ mod tests {
             plan.display_graphviz()
         );
         assert!(graphviz.contains(r#"[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]"#),
-                "\n{}", plan.display_graphviz());
+            "\n{}", plan.display_graphviz());
         assert!(
             graphviz.contains(r#"// End DataFusion GraphViz Plan"#),
             "\n{}",
diff --git a/datafusion/optimizer/src/merge_filter.rs b/datafusion/optimizer/src/merge_filter.rs
new file mode 100644
index 0000000000000..c6b401711ccdf
--- /dev/null
+++ b/datafusion/optimizer/src/merge_filter.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::LogicalPlan;
+use datafusion_expr::Projection;
+
+/// Optimizer rule that merges two adjacent [`Projection`] nodes into one.
+#[derive(Default)]
+pub struct MergeProject;
+
+impl MergeProject {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for MergeProject {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &mut OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        // The rule only fires on a projection whose input is also a
+        // projection; any other node is returned unchanged, and the
+        // optimizer driver keeps recursing on our behalf.
+        let outer_project = match plan.as_projection() {
+            Some(projection) => projection,
+            None => return Ok(plan.clone()),
+        };
+        let inner_project = match outer_project.input.as_projection() {
+            Some(projection) => projection,
+            None => return Ok(plan.clone()),
+        };
+
+        Ok(LogicalPlan::Projection(Projection {
+            // TODO: merge_projection_expr (to be implemented) rewrites the
+            // outer expressions in terms of the inner projection's input.
+            expr: merge_projection_expr(&outer_project.expr, &inner_project.expr),
+            input: inner_project.input.clone(),
+            schema: outer_project.schema.clone(),
+            alias: outer_project.alias.clone(),
+        }))
+    }
+
+    fn name(&self) -> &str {
+        "merge_project"
+    }
+}
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 7c37284e6fe87..5112a3bd6557e 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -145,6 +145,11 @@ pub struct Optimizer {
     pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
+    /// The order in which each rule is applied to the plan tree.
+    pub apply_order: ApplyOrder,
 }
 
+/// The order in which a rule is applied to the nodes of a plan.
+pub enum ApplyOrder {
+    /// Rewrite a node before its inputs are optimized.
+    TopDown,
+    /// Optimize a node's inputs before rewriting the node itself.
+    BottomUp,
+}
+
 impl Optimizer {
     /// Create a new optimizer using the recommended list of rules
     pub fn new(config: &OptimizerConfig) -> Self {
@@ -209,10 +214,12 @@ impl Optimizer {
             log_plan(&format!("Optimizer input (pass {})", i), &new_plan);
 
             for rule in &self.rules {
-                let result = rule.optimize(&new_plan, optimizer_config);
+                // Let the optimizer drive the recursion instead of each rule.
+                let result = self.optimize_recursively(rule, &new_plan, optimizer_config);
                 match result {
                     Ok(plan) => {
                         new_plan = plan;
                         observer(&new_plan, rule.as_ref());
                         log_plan(rule.name(), &new_plan);
                     }
@@ -254,6 +261,45 @@ impl Optimizer {
         debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
         Ok(new_plan)
     }
+
+    /// Apply `rule` to a single plan node, without recursing into its inputs.
+    fn optimize_node(
+        &self,
+        rule: &Arc<dyn OptimizerRule + Send + Sync>,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        // TODO: batch-apply every rule at this node in a single walk:
+        // for rule in &self.rules {
+        //     plan = rule.optimize(&plan, optimizer_config)?;
+        //     self.stats.count_rule(rule);
+        // }
+        rule.optimize(plan, optimizer_config)
+    }
+
+    /// Rebuild `plan` on top of its recursively optimized inputs.
+    fn optimize_inputs(
+        &self,
+        rule: &Arc<dyn OptimizerRule + Send + Sync>,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let inputs = plan
+            .inputs()
+            .into_iter()
+            .map(|sub_plan| self.optimize_recursively(rule, sub_plan, optimizer_config))
+            .collect::<Result<Vec<_>>>()?;
+        plan.clone_with_inputs(inputs)
+    }
+
+    /// Apply `rule` to `plan` and all of its inputs, in the configured
+    /// [`ApplyOrder`].
+    pub fn optimize_recursively(
+        &self,
+        rule: &Arc<dyn OptimizerRule + Send + Sync>,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        match self.apply_order {
+            ApplyOrder::TopDown => {
+                // Optimize the current node first, then its inputs.
+                let new_plan = self.optimize_node(rule, plan, optimizer_config)?;
+                self.optimize_inputs(rule, &new_plan, optimizer_config)
+            }
+            ApplyOrder::BottomUp => {
+                // Optimize the inputs first, then the current node.
+                let new_plan = self.optimize_inputs(rule, plan, optimizer_config)?;
+                self.optimize_node(rule, &new_plan, optimizer_config)
+            }
+        }
+    }
 }
 
 /// Log the plan in debug/tracing mode after some part of the optimizer runs
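
---

Example of what a rule looks like after this refactor: the rule only rewrites
the single node it is handed, because `optimize_recursively` drives the
traversal in the configured `ApplyOrder`. A minimal sketch, assuming the rule
lives in the optimizer crate; the rule `RemoveTrueFilter` and its behavior are
hypothetical, used only for illustration and not part of this patch:

    use crate::{OptimizerConfig, OptimizerRule};
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::{logical_plan::LogicalPlan, Expr};

    /// Hypothetical rule: drop `Filter` nodes whose predicate is the
    /// literal `true`, since such a filter keeps every row.
    struct RemoveTrueFilter;

    impl OptimizerRule for RemoveTrueFilter {
        fn optimize(
            &self,
            plan: &LogicalPlan,
            _optimizer_config: &mut OptimizerConfig,
        ) -> Result<LogicalPlan> {
            if let LogicalPlan::Filter(filter) = plan {
                if filter.predicate == Expr::Literal(ScalarValue::Boolean(Some(true))) {
                    // `WHERE true` filters nothing: replace the node with its input.
                    return Ok(filter.input.as_ref().clone());
                }
            }
            // No match at this node: return it unchanged. Note there is no
            // manual recursion here; `optimize_recursively` visits the
            // inputs on the rule's behalf.
            Ok(plan.clone())
        }

        fn name(&self) -> &str {
            "remove_true_filter"
        }
    }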