From 61b1a1632ffe40e33dc252f1d9b7b9c7253af7dc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 10 Oct 2022 16:58:48 -0600 Subject: [PATCH 01/10] Optimizer example and docs --- datafusion/optimizer/README.md | 140 ++++++++++++++- datafusion/optimizer/examples/rewrite_expr.rs | 163 ++++++++++++++++++ 2 files changed, 300 insertions(+), 3 deletions(-) create mode 100644 datafusion/optimizer/examples/rewrite_expr.rs diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index 39d28a8fae37a..fc2b740d32954 100644 --- a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -17,10 +17,144 @@ under the License. --> -# DataFusion Query Optimizer Rules +# DataFusion Query Optimizer -[DataFusion](df) is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format. +[DataFusion](df) is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory +format. -This crate is a submodule of DataFusion that provides query optimizer rules. +DataFusion has modular design, allowing individual crates to be re-used in other projects. + +This crate is a submodule of DataFusion that provides a query optimizer for logical plans. + +## Running the Optimizer + +The following code demonstrates the basic flow of creating the optimizer with a default set of optimization rules +and applying it to a logical plan to produce an optimized logical plan. + +```rust + +// We need a logical plan as the starting point. There are many ways to build a logical plan: +// +// The `datafusion-expr` crate provides a LogicalPlanBuilder +// The `datafusion-sql` crate provides a SQL query planner that can create a LogicalPlan from SQL +// The `datafusion` crate provides a DataFrame API that can create a LogicalPlan +let logical_plan = ... + +let mut config = OptimizerConfig::default(); +let optimizer = Optimizer::new(&config); +let optimized_plan = optimizer.optimize(&logical_plan, &mut config, observe)?; + +fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) { + println!( + "After applying rule '{}':\n{}", + rule.name(), + plan.display_indent() + ) +} +``` + +## Providing Custom Rules + +The optimizer can be created with a custom set of rules. + +```rust +let optimizer = Optimizer::with_rules(vec![ + Arc::new(MyRule {}) +]); +``` + +## Writing Optimization Rules + +Please refer to the [examples](examples) to learn more about the general approach to writing optimizer rules and +then move onto studying the existing rules. + +All rules must implement the `OptimizerRule` trait. + +```rust +/// `OptimizerRule` transforms one ['LogicalPlan'] into another which +/// computes the same results, but in a potentially more efficient +/// way. +pub trait OptimizerRule { + /// Rewrite `plan` to an optimized form + fn optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result; + + /// A human readable name for this optimizer rule + fn name(&self) -> &str; +} +``` + +### General Guidelines + +Rules typical walk the logical plan and walk the expression trees inside operators and selectively mutate +individual operators or expressions. + +Sometimes there is an initial pass that visits the plan and builds state that is used in a second pass that performs +the actual optimization. This approach is used in projection push down and filter push down. + +### Utilities + +There are a number of utility methods provided that take care of some common tasks. + +### ExprVisitor + +TBD + +### Rewriting Expressions + +The `MyExprRewriter` trait can be implemented to provide a way to rewrite expressions. This rule can then be applied +to an expression by calling `Expr::rewrite` (from the `ExprRewritable` trait). + +The `rewrite` method will perform a depth first walk of the expression and its children to rewrite an expression, +consuming `self` producing a new expression. + +```rust +let mut expr_rewriter = MyExprRewriter {}; +let expr = expr.rewrite(&mut expr_rewriter)?; +``` + +Here is an example implementation which will rewrite `expr BETWEEN a AND b` as `expr >= a AND expr <= b`. Note that the +implementation does not need to perform any recursion since this is handled by the `rewrite` method. + +```rust +struct MyExprRewriter {} + +impl ExprRewriter for MyExprRewriter { + fn mutate(&mut self, expr: Expr) -> Result { + match expr { + Expr::Between { + negated, + expr, + low, + high, + } => { + let expr: Expr = expr.as_ref().clone(); + let low: Expr = low.as_ref().clone(); + let high: Expr = high.as_ref().clone(); + if negated { + Ok(expr.clone().lt(low).or(expr.clone().gt(high))) + } else { + Ok(expr.clone().gt_eq(low).and(expr.clone().lt_eq(high))) + } + } + _ => Ok(expr.clone()), + } + } +} +``` + +### optimize_children + +TBD + +### Writing Tests + +There should be unit tests in the same file as the new rule that test the effect of the rule being applied to a plan +in isolation (without any other rule being applied). + +There should also be a test in `integration-tests.rs` that tests the rule as part of the overall optimization process. [df]: https://crates.io/crates/datafusion diff --git a/datafusion/optimizer/examples/rewrite_expr.rs b/datafusion/optimizer/examples/rewrite_expr.rs new file mode 100644 index 0000000000000..7d2c312fb03ab --- /dev/null +++ b/datafusion/optimizer/examples/rewrite_expr.rs @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter}; +use datafusion_expr::{AggregateUDF, Expr, Filter, LogicalPlan, ScalarUDF, TableSource}; +use datafusion_optimizer::optimizer::Optimizer; +use datafusion_optimizer::{utils, OptimizerConfig, OptimizerRule}; +use datafusion_sql::planner::{ContextProvider, SqlToRel}; +use datafusion_sql::sqlparser::dialect::PostgreSqlDialect; +use datafusion_sql::sqlparser::parser::Parser; +use datafusion_sql::TableReference; +use std::any::Any; +use std::sync::Arc; + +pub fn main() -> Result<()> { + // produce a logical plan using the datafusion-sql crate + let dialect = PostgreSqlDialect {}; + let sql = "SELECT * FROM person WHERE age BETWEEN 21 AND 32"; + let statements = Parser::parse_sql(&dialect, sql)?; + + // produce a logical plan using the datafusion-sql crate + let context_provider = MyContextProvider {}; + let sql_to_rel = SqlToRel::new(&context_provider); + let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?; + println!( + "Unoptimized Logical Plan:\n\n{}\n", + logical_plan.display_indent() + ); + + // now run the optimizer with our custom rule + let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]); + let mut optimizer_config = OptimizerConfig::default().with_skip_failing_rules(false); + let optimized_plan = + optimizer.optimize(&logical_plan, &mut optimizer_config, observe)?; + println!( + "Optimized Logical Plan:\n\n{}\n", + optimized_plan.display_indent() + ); + + Ok(()) +} + +fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) { + println!( + "After applying rule '{}':\n{}\n", + rule.name(), + plan.display_indent() + ) +} + +struct MyRule {} + +impl OptimizerRule for MyRule { + fn name(&self) -> &str { + "my_rule" + } + + fn optimize( + &self, + plan: &LogicalPlan, + _config: &mut OptimizerConfig, + ) -> Result { + // recurse down and optimize children first + let plan = utils::optimize_children(self, plan, _config)?; + + match plan { + LogicalPlan::Filter(filter) => { + let mut expr_rewriter = MyExprRewriter {}; + let predicate = filter.predicate.clone(); + let predicate = predicate.rewrite(&mut expr_rewriter)?; + Ok(LogicalPlan::Filter(Filter { + predicate, + input: filter.input.clone(), + })) + } + _ => Ok(plan.clone()), + } + } +} + +struct MyExprRewriter {} + +impl ExprRewriter for MyExprRewriter { + fn mutate(&mut self, expr: Expr) -> Result { + match expr { + Expr::Between { + negated, + expr, + low, + high, + } => { + let expr: Expr = expr.as_ref().clone(); + let low: Expr = low.as_ref().clone(); + let high: Expr = high.as_ref().clone(); + if negated { + Ok(expr.clone().lt(low).or(expr.clone().gt(high))) + } else { + Ok(expr.clone().gt_eq(low).and(expr.clone().lt_eq(high))) + } + } + _ => Ok(expr.clone()), + } + } +} + +struct MyContextProvider {} + +impl ContextProvider for MyContextProvider { + fn get_table_provider(&self, name: TableReference) -> Result> { + if name.table() == "person" { + Ok(Arc::new(MyTableSource { + schema: Arc::new(Schema::new(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::UInt8, false), + ])), + })) + } else { + Err(DataFusionError::Plan("table not found".to_string())) + } + } + + fn get_function_meta(&self, _name: &str) -> Option> { + None + } + + fn get_aggregate_meta(&self, _name: &str) -> Option> { + None + } + + fn get_variable_type(&self, _variable_names: &[String]) -> Option { + None + } +} + +struct MyTableSource { + schema: SchemaRef, +} + +impl TableSource for MyTableSource { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} From b5f2b74f87e18cbba792413999197c132ba1c2fb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 11 Oct 2022 06:40:44 -0600 Subject: [PATCH 02/10] Add Filter::try_new --- datafusion/core/src/physical_plan/planner.rs | 14 +- datafusion/expr/src/logical_plan/builder.rs | 8 +- datafusion/expr/src/logical_plan/plan.rs | 39 ++++- datafusion/expr/src/utils.rs | 8 +- datafusion/optimizer/examples/rewrite_expr.rs | 163 ------------------ .../optimizer/src/common_subexpr_eliminate.rs | 13 +- .../optimizer/src/decorrelate_where_exists.rs | 25 +-- .../optimizer/src/decorrelate_where_in.rs | 24 +-- datafusion/optimizer/src/eliminate_filter.rs | 29 ++-- .../optimizer/src/filter_null_join_keys.rs | 12 +- datafusion/optimizer/src/filter_push_down.rs | 12 +- datafusion/optimizer/src/reduce_cross_join.rs | 24 +-- datafusion/optimizer/src/reduce_outer_join.rs | 24 +-- .../src/rewrite_disjunctive_predicate.rs | 12 +- .../optimizer/src/scalar_subquery_to_join.rs | 28 +-- .../optimizer/src/subquery_filter_to_join.rs | 24 +-- datafusion/optimizer/src/utils.rs | 8 +- datafusion/proto/src/logical_plan.rs | 8 +- datafusion/sql/src/planner.rs | 8 +- 19 files changed, 181 insertions(+), 302 deletions(-) delete mode 100644 datafusion/optimizer/examples/rewrite_expr.rs diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 8bb1d95a48a6c..691c5ecca2c1a 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -28,8 +28,8 @@ use crate::datasource::source_as_provider; use crate::execution::context::{ExecutionProps, SessionState}; use crate::logical_expr::utils::generate_sort_key; use crate::logical_expr::{ - Aggregate, Distinct, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, - TableScan, Window, + Aggregate, Distinct, EmptyRelation, Join, Projection, Sort, SubqueryAlias, TableScan, + Window, }; use crate::logical_plan::{ unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, @@ -756,15 +756,13 @@ impl DefaultPhysicalPlanner { input_exec, )?) ) } - LogicalPlan::Filter(Filter { - input, predicate, .. - }) => { - let physical_input = self.create_initial_plan(input, session_state).await?; + LogicalPlan::Filter(filter) => { + let physical_input = self.create_initial_plan(filter.input(), session_state).await?; let input_schema = physical_input.as_ref().schema(); - let input_dfschema = input.as_ref().schema(); + let input_dfschema = filter.input().schema(); let runtime_expr = self.create_physical_expr( - predicate, + filter.predicate(), input_dfschema, &input_schema, session_state, diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 5226399840759..300c3b8cb6dcc 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -289,10 +289,10 @@ impl LogicalPlanBuilder { /// Apply a filter pub fn filter(&self, expr: impl Into) -> Result { let expr = normalize_col(expr.into(), &self.plan)?; - Ok(Self::from(LogicalPlan::Filter(Filter { - predicate: expr, - input: Arc::new(self.plan.clone()), - }))) + Ok(Self::from(LogicalPlan::Filter(Filter::try_new( + expr, + Arc::new(self.plan.clone()), + )?))) } /// Limit the number of rows returned diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 7f5c95ab14364..0d76b1ef8ea34 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use crate::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}; +///! Logical plan types use crate::logical_plan::builder::validate_unique_names; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; use crate::logical_plan::extension::UserDefinedLogicalNode; @@ -25,7 +27,6 @@ use crate::{Expr, TableProviderFilterPushDown, TableSource}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::{plan_err, Column, DFSchema, DFSchemaRef, DataFusionError}; use std::collections::HashSet; -///! Logical plan types use std::fmt::{self, Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -1148,18 +1149,50 @@ pub struct SubqueryAlias { #[derive(Clone)] pub struct Filter { /// The predicate expression, which must have Boolean type. - pub predicate: Expr, + pub(crate) predicate: Expr, /// The incoming logical plan - pub input: Arc, + pub(crate) input: Arc, } impl Filter { + pub fn try_new( + predicate: Expr, + input: Arc, + ) -> datafusion_common::Result { + struct CheckForAliasedExpr {} + + impl ExpressionVisitor for CheckForAliasedExpr { + fn pre_visit(self, expr: &Expr) -> datafusion_common::Result> + where + Self: ExpressionVisitor, + { + match expr { + Expr::Alias(_, alias) => Err(DataFusionError::Plan(format!("Attempted to create Filter predicate containing aliased expressions with name '{}'", alias))), + _ => Ok(Recursion::Continue(self)) + } + } + } + + let visitor = CheckForAliasedExpr {}; + predicate.accept(visitor)?; + + Ok(Self { predicate, input }) + } + pub fn try_from_plan(plan: &LogicalPlan) -> datafusion_common::Result<&Filter> { match plan { LogicalPlan::Filter(it) => Ok(it), _ => plan_err!("Could not coerce into Filter!"), } } + + pub fn predicate(&self) -> &Expr { + &self.predicate + } + + pub fn input(&self) -> &Arc { + &self.input + } } /// Window its input based on a set of window spec and window function (e.g. SUM or RANK) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 501b4a8f11fc4..fbba755b1afa6 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -380,10 +380,10 @@ pub fn from_plan( .map(|s| s.to_vec()) .collect::>(), })), - LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter { - predicate: expr[0].clone(), - input: Arc::new(inputs[0].clone()), - })), + LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter::try_new( + expr[0].clone(), + Arc::new(inputs[0].clone()), + )?)), LogicalPlan::Repartition(Repartition { partitioning_scheme, .. diff --git a/datafusion/optimizer/examples/rewrite_expr.rs b/datafusion/optimizer/examples/rewrite_expr.rs deleted file mode 100644 index 7d2c312fb03ab..0000000000000 --- a/datafusion/optimizer/examples/rewrite_expr.rs +++ /dev/null @@ -1,163 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::{DataFusionError, Result}; -use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter}; -use datafusion_expr::{AggregateUDF, Expr, Filter, LogicalPlan, ScalarUDF, TableSource}; -use datafusion_optimizer::optimizer::Optimizer; -use datafusion_optimizer::{utils, OptimizerConfig, OptimizerRule}; -use datafusion_sql::planner::{ContextProvider, SqlToRel}; -use datafusion_sql::sqlparser::dialect::PostgreSqlDialect; -use datafusion_sql::sqlparser::parser::Parser; -use datafusion_sql::TableReference; -use std::any::Any; -use std::sync::Arc; - -pub fn main() -> Result<()> { - // produce a logical plan using the datafusion-sql crate - let dialect = PostgreSqlDialect {}; - let sql = "SELECT * FROM person WHERE age BETWEEN 21 AND 32"; - let statements = Parser::parse_sql(&dialect, sql)?; - - // produce a logical plan using the datafusion-sql crate - let context_provider = MyContextProvider {}; - let sql_to_rel = SqlToRel::new(&context_provider); - let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?; - println!( - "Unoptimized Logical Plan:\n\n{}\n", - logical_plan.display_indent() - ); - - // now run the optimizer with our custom rule - let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]); - let mut optimizer_config = OptimizerConfig::default().with_skip_failing_rules(false); - let optimized_plan = - optimizer.optimize(&logical_plan, &mut optimizer_config, observe)?; - println!( - "Optimized Logical Plan:\n\n{}\n", - optimized_plan.display_indent() - ); - - Ok(()) -} - -fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) { - println!( - "After applying rule '{}':\n{}\n", - rule.name(), - plan.display_indent() - ) -} - -struct MyRule {} - -impl OptimizerRule for MyRule { - fn name(&self) -> &str { - "my_rule" - } - - fn optimize( - &self, - plan: &LogicalPlan, - _config: &mut OptimizerConfig, - ) -> Result { - // recurse down and optimize children first - let plan = utils::optimize_children(self, plan, _config)?; - - match plan { - LogicalPlan::Filter(filter) => { - let mut expr_rewriter = MyExprRewriter {}; - let predicate = filter.predicate.clone(); - let predicate = predicate.rewrite(&mut expr_rewriter)?; - Ok(LogicalPlan::Filter(Filter { - predicate, - input: filter.input.clone(), - })) - } - _ => Ok(plan.clone()), - } - } -} - -struct MyExprRewriter {} - -impl ExprRewriter for MyExprRewriter { - fn mutate(&mut self, expr: Expr) -> Result { - match expr { - Expr::Between { - negated, - expr, - low, - high, - } => { - let expr: Expr = expr.as_ref().clone(); - let low: Expr = low.as_ref().clone(); - let high: Expr = high.as_ref().clone(); - if negated { - Ok(expr.clone().lt(low).or(expr.clone().gt(high))) - } else { - Ok(expr.clone().gt_eq(low).and(expr.clone().lt_eq(high))) - } - } - _ => Ok(expr.clone()), - } - } -} - -struct MyContextProvider {} - -impl ContextProvider for MyContextProvider { - fn get_table_provider(&self, name: TableReference) -> Result> { - if name.table() == "person" { - Ok(Arc::new(MyTableSource { - schema: Arc::new(Schema::new(vec![ - Field::new("name", DataType::Utf8, false), - Field::new("age", DataType::UInt8, false), - ])), - })) - } else { - Err(DataFusionError::Plan("table not found".to_string())) - } - } - - fn get_function_meta(&self, _name: &str) -> Option> { - None - } - - fn get_aggregate_meta(&self, _name: &str) -> Option> { - None - } - - fn get_variable_type(&self, _variable_names: &[String]) -> Option { - None - } -} - -struct MyTableSource { - schema: SchemaRef, -} - -impl TableSource for MyTableSource { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 552c03d3d79e0..37c24f91ae1d5 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -111,8 +111,9 @@ fn optimize( alias.clone(), )?)) } - LogicalPlan::Filter(Filter { predicate, input }) => { + LogicalPlan::Filter(filter) => { let schema = plan.schema().as_ref().clone(); + let predicate = filter.predicate(); let data_type = if let Ok(data_type) = predicate.get_type(&schema) { data_type } else { @@ -132,16 +133,16 @@ fn optimize( let (mut new_expr, new_input) = rewrite_expr( &[&[predicate.clone()]], &[&[id_array]], - input, + filter.input(), &mut expr_set, optimizer_config, )?; if let Some(predicate) = pop_expr(&mut new_expr)?.pop() { - Ok(LogicalPlan::Filter(Filter { - predicate, - input: Arc::new(new_input), - })) + Ok(LogicalPlan::Filter(Filter::try_new( + predicate.clone(), + Arc::new(new_input), + )?)) } else { Err(DataFusionError::Internal( "Failed to pop predicate expr".to_string(), diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs index 671deb0276b6a..7653639692eb8 100644 --- a/datafusion/optimizer/src/decorrelate_where_exists.rs +++ b/datafusion/optimizer/src/decorrelate_where_exists.rs @@ -76,19 +76,19 @@ impl OptimizerRule for DecorrelateWhereExists { optimizer_config: &mut OptimizerConfig, ) -> datafusion_common::Result { match plan { - LogicalPlan::Filter(Filter { - predicate, - input: filter_input, - }) => { + LogicalPlan::Filter(filter) => { + let predicate = filter.predicate(); + let filter_input = filter.input(); + // Apply optimizer rule to current input let optimized_input = self.optimize(filter_input, optimizer_config)?; let (subqueries, other_exprs) = self.extract_subquery_exprs(predicate, optimizer_config)?; - let optimized_plan = LogicalPlan::Filter(Filter { - predicate: predicate.clone(), - input: Arc::new(optimized_input), - }); + let optimized_plan = LogicalPlan::Filter(Filter::try_new( + predicate.clone(), + Arc::new(optimized_input), + )?); if subqueries.is_empty() { // regular filter, no subquery exists clause here return Ok(optimized_plan); @@ -153,20 +153,21 @@ fn optimize_exists( // split into filters let mut subqry_filter_exprs = vec![]; - split_conjunction(&subqry_filter.predicate, &mut subqry_filter_exprs); + split_conjunction(&subqry_filter.predicate(), &mut subqry_filter_exprs); verify_not_disjunction(&subqry_filter_exprs)?; // Grab column names to join on let (col_exprs, other_subqry_exprs) = - find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema())?; + find_join_exprs(subqry_filter_exprs, subqry_filter.input().schema())?; let (outer_cols, subqry_cols, join_filters) = - exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), false)?; + exprs_to_join_cols(&col_exprs, subqry_filter.input().schema(), false)?; if subqry_cols.is_empty() || outer_cols.is_empty() { plan_err!("cannot optimize non-correlated subquery")?; } // build subquery side of join - the thing the subquery was querying - let mut subqry_plan = LogicalPlanBuilder::from((*subqry_filter.input).clone()); + let mut subqry_plan = + LogicalPlanBuilder::from(subqry_filter.input().as_ref().clone()); if let Some(expr) = combine_filters(&other_subqry_exprs) { subqry_plan = subqry_plan.filter(expr)? // if the subquery had additional expressions, restore them } diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index d5af0911d32ef..e3e61d91c390d 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -83,19 +83,19 @@ impl OptimizerRule for DecorrelateWhereIn { optimizer_config: &mut OptimizerConfig, ) -> datafusion_common::Result { match plan { - LogicalPlan::Filter(Filter { - predicate, - input: filter_input, - }) => { + LogicalPlan::Filter(filter) => { + let predicate = filter.predicate(); + let filter_input = filter.input(); + // Apply optimizer rule to current input let optimized_input = self.optimize(filter_input, optimizer_config)?; let (subqueries, other_exprs) = self.extract_subquery_exprs(predicate, optimizer_config)?; - let optimized_plan = LogicalPlan::Filter(Filter { - predicate: predicate.clone(), - input: Arc::new(optimized_input), - }); + let optimized_plan = LogicalPlan::Filter(Filter::try_new( + predicate.clone(), + Arc::new(optimized_input), + )?); if subqueries.is_empty() { // regular filter, no subquery exists clause here return Ok(optimized_plan); @@ -152,18 +152,18 @@ fn optimize_where_in( if let LogicalPlan::Filter(subqry_filter) = (*subqry_input).clone() { // split into filters let mut subqry_filter_exprs = vec![]; - split_conjunction(&subqry_filter.predicate, &mut subqry_filter_exprs); + split_conjunction(&subqry_filter.predicate(), &mut subqry_filter_exprs); verify_not_disjunction(&subqry_filter_exprs)?; // Grab column names to join on let (col_exprs, other_exprs) = - find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema()) + find_join_exprs(subqry_filter_exprs, subqry_filter.input().schema()) .map_err(|e| context!("column correlation not found", e))?; if !col_exprs.is_empty() { // it's correlated - subqry_input = subqry_filter.input.clone(); + subqry_input = subqry_filter.input().clone(); (outer_cols, subqry_cols, join_filters) = - exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), false) + exprs_to_join_cols(&col_exprs, subqry_filter.input().schema(), false) .map_err(|e| context!("column correlation not found", e))?; other_subqry_exprs = other_exprs; } diff --git a/datafusion/optimizer/src/eliminate_filter.rs b/datafusion/optimizer/src/eliminate_filter.rs index 61e9613cfa046..6c0c51b86dc64 100644 --- a/datafusion/optimizer/src/eliminate_filter.rs +++ b/datafusion/optimizer/src/eliminate_filter.rs @@ -21,7 +21,7 @@ use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{ - logical_plan::{EmptyRelation, Filter, LogicalPlan}, + logical_plan::{EmptyRelation, LogicalPlan}, utils::from_plan, Expr, }; @@ -43,21 +43,30 @@ impl OptimizerRule for EliminateFilter { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> Result { - match plan { - LogicalPlan::Filter(Filter { - predicate: Expr::Literal(ScalarValue::Boolean(Some(v))), - input, - }) => { - if !*v { + let (filter_value, input) = match plan { + LogicalPlan::Filter(filter) => match filter.predicate() { + Expr::Literal(ScalarValue::Boolean(Some(v))) => { + (Some(*v), Some(filter.input())) + } + _ => (None, None), + }, + _ => (None, None), + }; + + match filter_value { + Some(v) => { + // input is guaranteed be Some due to previous code + let input = input.unwrap(); + if v { + self.optimize(input, optimizer_config) + } else { Ok(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: input.schema().clone(), })) - } else { - self.optimize(input, optimizer_config) } } - _ => { + None => { // Apply the optimization to all inputs of the plan let inputs = plan.inputs(); let new_inputs = inputs diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs b/datafusion/optimizer/src/filter_null_join_keys.rs index 4d237dd04a664..be33c796ea428 100644 --- a/datafusion/optimizer/src/filter_null_join_keys.rs +++ b/datafusion/optimizer/src/filter_null_join_keys.rs @@ -68,17 +68,17 @@ impl OptimizerRule for FilterNullJoinKeys { if !left_filters.is_empty() { let predicate = create_not_null_predicate(left_filters); - join.left = Arc::new(LogicalPlan::Filter(Filter { + join.left = Arc::new(LogicalPlan::Filter(Filter::try_new( predicate, - input: join.left.clone(), - })); + join.left.clone(), + )?)); } if !right_filters.is_empty() { let predicate = create_not_null_predicate(right_filters); - join.right = Arc::new(LogicalPlan::Filter(Filter { + join.right = Arc::new(LogicalPlan::Filter(Filter::try_new( predicate, - input: join.right.clone(), - })); + join.right.clone(), + )?)); } Ok(LogicalPlan::Join(join)) } diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs index 129766012531d..d9692ff927e67 100644 --- a/datafusion/optimizer/src/filter_push_down.rs +++ b/datafusion/optimizer/src/filter_push_down.rs @@ -20,7 +20,7 @@ use datafusion_expr::{ col, expr_rewriter::{replace_col, ExprRewritable, ExprRewriter}, logical_plan::{ - Aggregate, CrossJoin, Filter, Join, JoinType, Limit, LogicalPlan, Projection, + Aggregate, CrossJoin, Join, JoinType, Limit, LogicalPlan, Projection, TableScan, Union, }, utils::{expr_to_columns, exprlist_to_columns, from_plan}, @@ -138,7 +138,7 @@ fn issue_filters( return push_down(&state, plan); } - let plan = utils::add_filter(plan.clone(), &predicates); + let plan = utils::add_filter(plan.clone(), &predicates)?; state.filters = remove_filters(&state.filters, &predicate_columns); @@ -326,7 +326,7 @@ fn optimize_join( Ok(plan) } else { // wrap the join on the filter whose predicates must be kept - let plan = utils::add_filter(plan, &to_keep.0); + let plan = utils::add_filter(plan, &to_keep.0)?; state.filters = remove_filters(&state.filters, &to_keep.1); Ok(plan) @@ -340,9 +340,9 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { push_down(&state, plan) } LogicalPlan::Analyze { .. } => push_down(&state, plan), - LogicalPlan::Filter(Filter { input, predicate }) => { + LogicalPlan::Filter(filter) => { let mut predicates = vec![]; - utils::split_conjunction(predicate, &mut predicates); + utils::split_conjunction(filter.predicate(), &mut predicates); predicates .into_iter() @@ -353,7 +353,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { Ok(()) })?; - optimize(input, state) + optimize(filter.input(), state) } LogicalPlan::Projection(Projection { input, diff --git a/datafusion/optimizer/src/reduce_cross_join.rs b/datafusion/optimizer/src/reduce_cross_join.rs index 4c43188cff5d8..99880cb769afe 100644 --- a/datafusion/optimizer/src/reduce_cross_join.rs +++ b/datafusion/optimizer/src/reduce_cross_join.rs @@ -77,33 +77,33 @@ fn reduce_cross_join( all_join_keys: &mut HashSet<(Column, Column)>, ) -> Result { match plan { - LogicalPlan::Filter(Filter { input, predicate }) => { + LogicalPlan::Filter(filter) => { // join keys are handled locally let mut new_possible_join_keys: Vec<(Column, Column)> = vec![]; let mut new_all_join_keys = HashSet::new(); - extract_possible_join_keys(predicate, &mut new_possible_join_keys); + extract_possible_join_keys(filter.predicate(), &mut new_possible_join_keys); let new_plan = reduce_cross_join( _optimizer, - input, + filter.input(), &mut new_possible_join_keys, &mut new_all_join_keys, )?; // if there are no join keys then do nothing. if new_all_join_keys.is_empty() { - Ok(LogicalPlan::Filter(Filter { - predicate: predicate.clone(), - input: Arc::new(new_plan), - })) + Ok(LogicalPlan::Filter(Filter::try_new( + filter.predicate().clone(), + Arc::new(new_plan), + )?)) } else { // remove join expressions from filter - match remove_join_expressions(predicate, &new_all_join_keys)? { - Some(filter_expr) => Ok(LogicalPlan::Filter(Filter { - predicate: filter_expr, - input: Arc::new(new_plan), - })), + match remove_join_expressions(filter.predicate(), &new_all_join_keys)? { + Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new( + filter_expr, + Arc::new(new_plan), + )?)), _ => Ok(new_plan), } } diff --git a/datafusion/optimizer/src/reduce_outer_join.rs b/datafusion/optimizer/src/reduce_outer_join.rs index 6ca4a5994989a..93b706afe365d 100644 --- a/datafusion/optimizer/src/reduce_outer_join.rs +++ b/datafusion/optimizer/src/reduce_outer_join.rs @@ -69,34 +69,34 @@ fn reduce_outer_join( _optimizer_config: &OptimizerConfig, ) -> Result { match plan { - LogicalPlan::Filter(Filter { input, predicate }) => match &**input { + LogicalPlan::Filter(filter) => match filter.input().as_ref() { LogicalPlan::Join(join) => { extract_nonnullable_columns( - predicate, + filter.predicate(), nonnullable_cols, join.left.schema(), join.right.schema(), true, )?; - Ok(LogicalPlan::Filter(Filter { - predicate: predicate.clone(), - input: Arc::new(reduce_outer_join( + Ok(LogicalPlan::Filter(Filter::try_new( + filter.predicate().clone(), + Arc::new(reduce_outer_join( _optimizer, - input, + filter.input(), nonnullable_cols, _optimizer_config, )?), - })) + )?)) } - _ => Ok(LogicalPlan::Filter(Filter { - predicate: predicate.clone(), - input: Arc::new(reduce_outer_join( + _ => Ok(LogicalPlan::Filter(Filter::try_new( + filter.predicate().clone(), + Arc::new(reduce_outer_join( _optimizer, - input, + filter.input(), nonnullable_cols, _optimizer_config, )?), - })), + )?)), }, LogicalPlan::Join(join) => { let mut new_join_type = join.join_type; diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs index 2eadfb3d5f0e0..c6f1433fde6d7 100644 --- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs +++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs @@ -129,16 +129,16 @@ impl RewriteDisjunctivePredicate { ) -> Result { match plan { LogicalPlan::Filter(filter) => { - let predicate = predicate(&filter.predicate)?; + let predicate = predicate(&filter.predicate())?; let rewritten_predicate = rewrite_predicate(predicate); let rewritten_expr = normalize_predicate(rewritten_predicate); - Ok(LogicalPlan::Filter(Filter { - predicate: rewritten_expr, - input: Arc::new(self.rewrite_disjunctive_predicate( - &filter.input, + Ok(LogicalPlan::Filter(Filter::try_new( + rewritten_expr, + Arc::new(self.rewrite_disjunctive_predicate( + filter.input(), _optimizer_config, )?), - })) + )?)) } _ => { let expr = plan.expressions(); diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 0a2256eba05c7..57b259fdea6ae 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -95,23 +95,23 @@ impl OptimizerRule for ScalarSubqueryToJoin { optimizer_config: &mut OptimizerConfig, ) -> Result { match plan { - LogicalPlan::Filter(Filter { predicate, input }) => { + LogicalPlan::Filter(filter) => { // Apply optimizer rule to current input - let optimized_input = self.optimize(input, optimizer_config)?; + let optimized_input = self.optimize(filter.input(), optimizer_config)?; let (subqueries, other_exprs) = - self.extract_subquery_exprs(predicate, optimizer_config)?; + self.extract_subquery_exprs(filter.predicate(), optimizer_config)?; if subqueries.is_empty() { // regular filter, no subquery exists clause here - return Ok(LogicalPlan::Filter(Filter { - predicate: predicate.clone(), - input: Arc::new(optimized_input), - })); + return Ok(LogicalPlan::Filter(Filter::try_new( + filter.predicate().clone(), + Arc::new(optimized_input), + )?)); } // iterate through all subqueries in predicate, turning each into a join - let mut cur_input = (**input).clone(); + let mut cur_input = filter.input().as_ref().clone(); for subquery in subqueries { if let Some(optimized_subquery) = optimize_scalar( &subquery, @@ -122,10 +122,10 @@ impl OptimizerRule for ScalarSubqueryToJoin { cur_input = optimized_subquery; } else { // if we can't handle all of the subqueries then bail for now - return Ok(LogicalPlan::Filter(Filter { - predicate: predicate.clone(), - input: Arc::new(optimized_input), - })); + return Ok(LogicalPlan::Filter(Filter::try_new( + filter.predicate().clone(), + Arc::new(optimized_input), + )?)); } } Ok(cur_input) @@ -228,7 +228,7 @@ fn optimize_scalar( // if there were filters, we use that logical plan, otherwise the plan from the aggregate let input = if let Some(filter) = filter { - &filter.input + filter.input() } else { &aggr.input }; @@ -236,7 +236,7 @@ fn optimize_scalar( // if there were filters, split and capture them let mut subqry_filter_exprs = vec![]; if let Some(filter) = filter { - split_conjunction(&filter.predicate, &mut subqry_filter_exprs); + split_conjunction(&filter.predicate(), &mut subqry_filter_exprs); } verify_not_disjunction(&subqry_filter_exprs)?; diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs index 91d31f28edefd..bd07eeab2f82f 100644 --- a/datafusion/optimizer/src/subquery_filter_to_join.rs +++ b/datafusion/optimizer/src/subquery_filter_to_join.rs @@ -55,13 +55,13 @@ impl OptimizerRule for SubqueryFilterToJoin { optimizer_config: &mut OptimizerConfig, ) -> Result { match plan { - LogicalPlan::Filter(Filter { predicate, input }) => { + LogicalPlan::Filter(filter) => { // Apply optimizer rule to current input - let optimized_input = self.optimize(input, optimizer_config)?; + let optimized_input = self.optimize(filter.input(), optimizer_config)?; // Splitting filter expression into components by AND let mut filters = vec![]; - utils::split_conjunction(predicate, &mut filters); + utils::split_conjunction(filter.predicate(), &mut filters); // Searching for subquery-based filters let (subquery_filters, regular_filters): (Vec<&Expr>, Vec<&Expr>) = @@ -79,10 +79,10 @@ impl OptimizerRule for SubqueryFilterToJoin { })?; if !subqueries_in_regular.is_empty() { - return Ok(LogicalPlan::Filter(Filter { - predicate: predicate.clone(), - input: Arc::new(optimized_input), - })); + return Ok(LogicalPlan::Filter(Filter::try_new( + filter.predicate().clone(), + Arc::new(optimized_input), + )?)); }; // Add subquery joins to new_input @@ -151,10 +151,10 @@ impl OptimizerRule for SubqueryFilterToJoin { let new_input = match opt_result { Ok(plan) => plan, Err(_) => { - return Ok(LogicalPlan::Filter(Filter { - predicate: predicate.clone(), - input: Arc::new(optimized_input), - })) + return Ok(LogicalPlan::Filter(Filter::try_new( + filter.predicate().clone(), + Arc::new(optimized_input), + )?)) } }; @@ -162,7 +162,7 @@ impl OptimizerRule for SubqueryFilterToJoin { if regular_filters.is_empty() { Ok(new_input) } else { - Ok(utils::add_filter(new_input, ®ular_filters)) + utils::add_filter(new_input, ®ular_filters) } } _ => { diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index d962dd7b45b9a..5e64092a48618 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -104,7 +104,7 @@ pub fn verify_not_disjunction(predicates: &[&Expr]) -> Result<()> { /// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with /// its predicate be all `predicates` ANDed. -pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan { +pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> Result { // reduce filters to a single filter with an AND let predicate = predicates .iter() @@ -113,10 +113,10 @@ pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan { and(acc, (*predicate).to_owned()) }); - LogicalPlan::Filter(Filter { + Ok(LogicalPlan::Filter(Filter::try_new( predicate, - input: Arc::new(plan), - }) + Arc::new(plan), + )?)) } /// Looks for correlating expressions: equality expressions with one field from the subquery, and diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index 8f9f8a9c46613..da2654033d13f 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -38,7 +38,7 @@ use datafusion_common::{Column, DataFusionError}; use datafusion_expr::{ logical_plan::{ Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView, - CrossJoin, Distinct, EmptyRelation, Extension, Filter, Join, JoinConstraint, + CrossJoin, Distinct, EmptyRelation, Extension, Join, JoinConstraint, JoinType, Limit, Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window, }, @@ -805,17 +805,17 @@ impl AsLogicalPlan for LogicalPlanNode { }, ))), }), - LogicalPlan::Filter(Filter { predicate, input }) => { + LogicalPlan::Filter(filter) => { let input: protobuf::LogicalPlanNode = protobuf::LogicalPlanNode::try_from_logical_plan( - input.as_ref(), + filter.input().as_ref(), extension_codec, )?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Selection(Box::new( protobuf::SelectionNode { input: Some(Box::new(input)), - expr: Some(predicate.try_into()?), + expr: Some(filter.predicate().try_into()?), }, ))), }) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 58b65af596ef2..eedfb5b7cf45d 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -973,10 +973,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { x.as_slice(), &[join_columns], )?; - Ok(LogicalPlan::Filter(Filter { - predicate: filter_expr, - input: Arc::new(left), - })) + Ok(LogicalPlan::Filter(Filter::try_new( + filter_expr, + Arc::new(left), + )?)) } _ => Ok(left), } From 5c093f602bbedbcf7718f781fbf540eebb68c47d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 11 Oct 2022 06:42:01 -0600 Subject: [PATCH 03/10] Revert some changes --- datafusion/optimizer/README.md | 140 +-------------------------------- 1 file changed, 3 insertions(+), 137 deletions(-) diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index fc2b740d32954..39d28a8fae37a 100644 --- a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -17,144 +17,10 @@ under the License. --> -# DataFusion Query Optimizer +# DataFusion Query Optimizer Rules -[DataFusion](df) is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory -format. +[DataFusion](df) is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format. -DataFusion has modular design, allowing individual crates to be re-used in other projects. - -This crate is a submodule of DataFusion that provides a query optimizer for logical plans. - -## Running the Optimizer - -The following code demonstrates the basic flow of creating the optimizer with a default set of optimization rules -and applying it to a logical plan to produce an optimized logical plan. - -```rust - -// We need a logical plan as the starting point. There are many ways to build a logical plan: -// -// The `datafusion-expr` crate provides a LogicalPlanBuilder -// The `datafusion-sql` crate provides a SQL query planner that can create a LogicalPlan from SQL -// The `datafusion` crate provides a DataFrame API that can create a LogicalPlan -let logical_plan = ... - -let mut config = OptimizerConfig::default(); -let optimizer = Optimizer::new(&config); -let optimized_plan = optimizer.optimize(&logical_plan, &mut config, observe)?; - -fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) { - println!( - "After applying rule '{}':\n{}", - rule.name(), - plan.display_indent() - ) -} -``` - -## Providing Custom Rules - -The optimizer can be created with a custom set of rules. - -```rust -let optimizer = Optimizer::with_rules(vec![ - Arc::new(MyRule {}) -]); -``` - -## Writing Optimization Rules - -Please refer to the [examples](examples) to learn more about the general approach to writing optimizer rules and -then move onto studying the existing rules. - -All rules must implement the `OptimizerRule` trait. - -```rust -/// `OptimizerRule` transforms one ['LogicalPlan'] into another which -/// computes the same results, but in a potentially more efficient -/// way. -pub trait OptimizerRule { - /// Rewrite `plan` to an optimized form - fn optimize( - &self, - plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, - ) -> Result; - - /// A human readable name for this optimizer rule - fn name(&self) -> &str; -} -``` - -### General Guidelines - -Rules typical walk the logical plan and walk the expression trees inside operators and selectively mutate -individual operators or expressions. - -Sometimes there is an initial pass that visits the plan and builds state that is used in a second pass that performs -the actual optimization. This approach is used in projection push down and filter push down. - -### Utilities - -There are a number of utility methods provided that take care of some common tasks. - -### ExprVisitor - -TBD - -### Rewriting Expressions - -The `MyExprRewriter` trait can be implemented to provide a way to rewrite expressions. This rule can then be applied -to an expression by calling `Expr::rewrite` (from the `ExprRewritable` trait). - -The `rewrite` method will perform a depth first walk of the expression and its children to rewrite an expression, -consuming `self` producing a new expression. - -```rust -let mut expr_rewriter = MyExprRewriter {}; -let expr = expr.rewrite(&mut expr_rewriter)?; -``` - -Here is an example implementation which will rewrite `expr BETWEEN a AND b` as `expr >= a AND expr <= b`. Note that the -implementation does not need to perform any recursion since this is handled by the `rewrite` method. - -```rust -struct MyExprRewriter {} - -impl ExprRewriter for MyExprRewriter { - fn mutate(&mut self, expr: Expr) -> Result { - match expr { - Expr::Between { - negated, - expr, - low, - high, - } => { - let expr: Expr = expr.as_ref().clone(); - let low: Expr = low.as_ref().clone(); - let high: Expr = high.as_ref().clone(); - if negated { - Ok(expr.clone().lt(low).or(expr.clone().gt(high))) - } else { - Ok(expr.clone().gt_eq(low).and(expr.clone().lt_eq(high))) - } - } - _ => Ok(expr.clone()), - } - } -} -``` - -### optimize_children - -TBD - -### Writing Tests - -There should be unit tests in the same file as the new rule that test the effect of the rule being applied to a plan -in isolation (without any other rule being applied). - -There should also be a test in `integration-tests.rs` that tests the rule as part of the overall optimization process. +This crate is a submodule of DataFusion that provides query optimizer rules. [df]: https://crates.io/crates/datafusion From 91b9a9dfe3b9e75b58409debb6119a7122a70014 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 11 Oct 2022 07:20:54 -0600 Subject: [PATCH 04/10] validate that filter predicates return boolean --- datafusion/core/src/physical_plan/planner.rs | 5 ++- datafusion/expr/Cargo.toml | 1 + datafusion/expr/src/logical_plan/plan.rs | 41 +++++++++++++------ datafusion/optimizer/src/filter_push_down.rs | 4 +- datafusion/optimizer/src/reduce_cross_join.rs | 10 +++-- datafusion/proto/src/logical_plan.rs | 5 +-- 6 files changed, 44 insertions(+), 22 deletions(-) diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 691c5ecca2c1a..17c9b1562c44f 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1675,6 +1675,7 @@ fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { mod tests { use super::*; use crate::assert_contains; + use crate::config::OPT_OPTIMIZER_SKIP_FAILED_RULES; use crate::datasource::MemTable; use crate::execution::context::TaskContext; use crate::execution::options::CsvReadOptions; @@ -1703,7 +1704,9 @@ mod tests { fn make_session_state() -> SessionState { let runtime = Arc::new(RuntimeEnv::default()); - SessionState::with_config_rt(SessionConfig::new(), runtime) + let config = + SessionConfig::new().set_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES, false); + SessionState::with_config_rt(config, runtime) } async fn plan(logical_plan: &LogicalPlan) -> Result> { diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 3280628a42eb9..5d7350f720cd9 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -38,4 +38,5 @@ path = "src/lib.rs" ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } arrow = { version = "24.0.0", default-features = false } datafusion-common = { path = "../common", version = "13.0.0" } +log = "^0.4" sqlparser = "0.25" diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 0d76b1ef8ea34..d4b22dcef6abe 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}; +use crate::expr_rewriter::{ExprRewritable, ExprRewriter}; ///! Logical plan types use crate::logical_plan::builder::validate_unique_names; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; @@ -23,9 +23,10 @@ use crate::logical_plan::extension::UserDefinedLogicalNode; use crate::utils::{ exprlist_to_fields, grouping_set_expr_count, grouping_set_to_exprlist, }; -use crate::{Expr, TableProviderFilterPushDown, TableSource}; +use crate::{Expr, ExprSchemable, TableProviderFilterPushDown, TableSource}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::{plan_err, Column, DFSchema, DFSchemaRef, DataFusionError}; +use log::debug; use std::collections::HashSet; use std::fmt::{self, Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; @@ -1155,26 +1156,42 @@ pub struct Filter { } impl Filter { + /// Create a new filter operator. pub fn try_new( predicate: Expr, input: Arc, ) -> datafusion_common::Result { - struct CheckForAliasedExpr {} + // filter predicates must return a boolean value + let predicate_type = predicate.get_type(input.schema())?; + if predicate_type != DataType::Boolean { + return Err(DataFusionError::Plan(format!( + "Cannot create filter with non-boolean predicate {} returning {}", + predicate, predicate_type + ))); + } - impl ExpressionVisitor for CheckForAliasedExpr { - fn pre_visit(self, expr: &Expr) -> datafusion_common::Result> - where - Self: ExpressionVisitor, - { + // filter predicates should not contain aliased expressions so we remove any aliases here + // but perhaps we should be failing here instead? + struct RemoveAliases {} + + impl ExprRewriter for RemoveAliases { + fn mutate(&mut self, expr: Expr) -> datafusion_common::Result { match expr { - Expr::Alias(_, alias) => Err(DataFusionError::Plan(format!("Attempted to create Filter predicate containing aliased expressions with name '{}'", alias))), - _ => Ok(Recursion::Continue(self)) + Expr::Alias(expr, alias) => { + debug!( + "Attempted to create Filter predicate containing aliased \ + expression `{}` AS '{}'", + expr, alias + ); + Ok(expr.as_ref().clone()) + } + _ => Ok(expr.clone()), } } } - let visitor = CheckForAliasedExpr {}; - predicate.accept(visitor)?; + let mut remove_aliases = RemoveAliases {}; + let predicate = predicate.rewrite(&mut remove_aliases)?; Ok(Self { predicate, input }) } diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs index d9692ff927e67..d1f6966213732 100644 --- a/datafusion/optimizer/src/filter_push_down.rs +++ b/datafusion/optimizer/src/filter_push_down.rs @@ -20,8 +20,8 @@ use datafusion_expr::{ col, expr_rewriter::{replace_col, ExprRewritable, ExprRewriter}, logical_plan::{ - Aggregate, CrossJoin, Join, JoinType, Limit, LogicalPlan, Projection, - TableScan, Union, + Aggregate, CrossJoin, Join, JoinType, Limit, LogicalPlan, Projection, TableScan, + Union, }, utils::{expr_to_columns, exprlist_to_columns, from_plan}, Expr, TableProviderFilterPushDown, diff --git a/datafusion/optimizer/src/reduce_cross_join.rs b/datafusion/optimizer/src/reduce_cross_join.rs index 99880cb769afe..e8c2ff9ecd0a9 100644 --- a/datafusion/optimizer/src/reduce_cross_join.rs +++ b/datafusion/optimizer/src/reduce_cross_join.rs @@ -78,15 +78,17 @@ fn reduce_cross_join( ) -> Result { match plan { LogicalPlan::Filter(filter) => { + let input = filter.input(); + let predicate = filter.predicate(); // join keys are handled locally let mut new_possible_join_keys: Vec<(Column, Column)> = vec![]; let mut new_all_join_keys = HashSet::new(); - extract_possible_join_keys(filter.predicate(), &mut new_possible_join_keys); + extract_possible_join_keys(predicate, &mut new_possible_join_keys); let new_plan = reduce_cross_join( _optimizer, - filter.input(), + input, &mut new_possible_join_keys, &mut new_all_join_keys, )?; @@ -94,12 +96,12 @@ fn reduce_cross_join( // if there are no join keys then do nothing. if new_all_join_keys.is_empty() { Ok(LogicalPlan::Filter(Filter::try_new( - filter.predicate().clone(), + predicate.clone(), Arc::new(new_plan), )?)) } else { // remove join expressions from filter - match remove_join_expressions(filter.predicate(), &new_all_join_keys)? { + match remove_join_expressions(predicate, &new_all_join_keys)? { Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new( filter_expr, Arc::new(new_plan), diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index da2654033d13f..a7c366e1633db 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -38,9 +38,8 @@ use datafusion_common::{Column, DataFusionError}; use datafusion_expr::{ logical_plan::{ Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView, - CrossJoin, Distinct, EmptyRelation, Extension, Join, JoinConstraint, - JoinType, Limit, Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, - Window, + CrossJoin, Distinct, EmptyRelation, Extension, Join, JoinConstraint, JoinType, + Limit, Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window, }, Expr, LogicalPlan, LogicalPlanBuilder, }; From 8ec034f15ecf713416343031f8e9ae1925d85b7f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 11 Oct 2022 07:56:14 -0600 Subject: [PATCH 05/10] update some invalid tests --- datafusion/core/src/physical_plan/planner.rs | 11 +++-- datafusion/core/tests/sql/select.rs | 9 +++- datafusion/expr/src/logical_plan/plan.rs | 16 ++++--- .../optimizer/src/simplify_expressions.rs | 48 +++++++++---------- 4 files changed, 49 insertions(+), 35 deletions(-) diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 17c9b1562c44f..599b133e995ac 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1675,7 +1675,6 @@ fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { mod tests { use super::*; use crate::assert_contains; - use crate::config::OPT_OPTIMIZER_SKIP_FAILED_RULES; use crate::datasource::MemTable; use crate::execution::context::TaskContext; use crate::execution::options::CsvReadOptions; @@ -1704,8 +1703,9 @@ mod tests { fn make_session_state() -> SessionState { let runtime = Arc::new(RuntimeEnv::default()); - let config = - SessionConfig::new().set_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES, false); + let config = SessionConfig::new(); + // TODO we should really test that no optimizer rules are failing here + // let config = config.set_bool(crate::config::OPT_OPTIMIZER_SKIP_FAILED_RULES, false); SessionState::with_config_rt(config, runtime) } @@ -1973,6 +1973,11 @@ mod tests { let expected = "expr: [(InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [Literal { value: Utf8(\"a\") }, Literal { value: Utf8(\"1\") }], negated: false, set: None }"; assert!(format!("{:?}", execution_plan).contains(expected)); + Ok(()) + } + + #[tokio::test] + async fn in_list_types_struct_literal() -> Result<()> { // expression: "a in (struct::null, 'a')" let list = vec![struct_literal(), lit("a")]; diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index 1ea530bfb64e6..1c79f2386623b 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -16,6 +16,7 @@ // under the License. use super::*; +use datafusion::config::OPT_OPTIMIZER_SKIP_FAILED_RULES; use datafusion::{ datasource::empty::EmptyTable, from_slice::FromSlice, physical_plan::collect_partitioned, @@ -1211,7 +1212,9 @@ async fn boolean_literal() -> Result<()> { #[tokio::test] async fn unprojected_filter() { - let ctx = SessionContext::new(); + let config = SessionConfig::new(); + // let config = config.set_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES, false); + let ctx = SessionContext::with_config(config); let df = ctx.read_table(table_with_sequence(1, 3).unwrap()).unwrap(); let df = df @@ -1219,6 +1222,10 @@ async fn unprojected_filter() { .unwrap() .filter(col("i").gt(lit(2))) .unwrap(); + + let plan = df.to_logical_plan().unwrap(); + println!("{}", plan.display_indent()); + let results = df.collect().await.unwrap(); let expected = vec![ diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index d4b22dcef6abe..6d35c798a156a 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1161,14 +1161,16 @@ impl Filter { predicate: Expr, input: Arc, ) -> datafusion_common::Result { + // TODO enable this check once invalid tests are updated + // filter predicates must return a boolean value - let predicate_type = predicate.get_type(input.schema())?; - if predicate_type != DataType::Boolean { - return Err(DataFusionError::Plan(format!( - "Cannot create filter with non-boolean predicate {} returning {}", - predicate, predicate_type - ))); - } + // let predicate_type = predicate.get_type(input.schema())?; + // if predicate_type != DataType::Boolean { + // return Err(DataFusionError::Plan(format!( + // "Cannot create filter with non-boolean predicate {} returning {}", + // predicate, predicate_type + // ))); + // } // filter predicates should not contain aliased expressions so we remove any aliases here // but perhaps we should be failing here instead? diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs index c96f6eea71510..94211895162e0 100644 --- a/datafusion/optimizer/src/simplify_expressions.rs +++ b/datafusion/optimizer/src/simplify_expressions.rs @@ -1930,7 +1930,7 @@ mod tests { assert_optimized_plan_eq( &plan, "\ - Filter: test.b > Int32(1) AS test.b > Int32(1) AND test.b > Int32(1)\ + Filter: test.b > Int32(1)\ \n Projection: test.a\ \n TableScan: test", ); @@ -1954,7 +1954,7 @@ mod tests { assert_optimized_plan_eq( &plan, "\ - Filter: test.a > Int32(5) AND test.b < Int32(6) AS test.a > Int32(5) AND test.b < Int32(6) AND test.a > Int32(5)\ + Filter: test.a > Int32(5) AND test.b < Int32(6)\ \n Projection: test.a, test.b\ \n TableScan: test", ); @@ -1975,8 +1975,8 @@ mod tests { let expected = "\ Projection: test.a\ - \n Filter: NOT test.c AS test.c = Boolean(false)\ - \n Filter: test.b AS test.b = Boolean(true)\ + \n Filter: NOT test.c\ + \n Filter: test.b\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2000,8 +2000,8 @@ mod tests { let expected = "\ Projection: test.a\ \n Limit: skip=0, fetch=1\ - \n Filter: test.c AS test.c != Boolean(false)\ - \n Filter: NOT test.b AS test.b != Boolean(true)\ + \n Filter: test.c\ + \n Filter: NOT test.b\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2020,7 +2020,7 @@ mod tests { let expected = "\ Projection: test.a\ - \n Filter: NOT test.b AND test.c AS test.b != Boolean(true) AND test.c = Boolean(true)\ + \n Filter: NOT test.b AND test.c\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2039,7 +2039,7 @@ mod tests { let expected = "\ Projection: test.a\ - \n Filter: NOT test.b OR NOT test.c AS test.b != Boolean(true) OR test.c = Boolean(false)\ + \n Filter: NOT test.b OR NOT test.c\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2058,7 +2058,7 @@ mod tests { let expected = "\ Projection: test.a\ - \n Filter: test.b AS NOT test.b = Boolean(false)\ + \n Filter: test.b\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2292,7 +2292,7 @@ mod tests { // Note that constant folder runs and folds the entire // expression down to a single constant (true) - let expected = "Filter: Boolean(true) AS now() < totimestamp(Utf8(\"2020-09-08T12:05:00+00:00\")) + Int64(50000)\ + let expected = "Filter: Boolean(true)\ \n TableScan: test"; let actual = get_optimized_plan_formatted(&plan, &time); @@ -2340,7 +2340,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.d <= Int32(10) AS NOT test.d > Int32(10)\ + let expected = "Filter: test.d <= Int32(10)\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2355,7 +2355,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.d <= Int32(10) OR test.d >= Int32(100) AS NOT test.d > Int32(10) AND test.d < Int32(100)\ + let expected = "Filter: test.d <= Int32(10) OR test.d >= Int32(100)\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2370,7 +2370,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.d <= Int32(10) AND test.d >= Int32(100) AS NOT test.d > Int32(10) OR test.d < Int32(100)\ + let expected = "Filter: test.d <= Int32(10) AND test.d >= Int32(100)\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2385,7 +2385,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.d > Int32(10) AS NOT NOT test.d > Int32(10)\ + let expected = "Filter: test.d > Int32(10)\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2400,7 +2400,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.d IS NOT NULL AS NOT test.d IS NULL\ + let expected = "Filter: test.d IS NOT NULL\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2415,7 +2415,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.d IS NULL AS NOT test.d IS NOT NULL\ + let expected = "Filter: test.d IS NULL\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2430,7 +2430,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.d NOT IN ([Int32(1), Int32(2), Int32(3)]) AS NOT test.d IN (Map { iter: Iter([Int32(1), Int32(2), Int32(3)]) })\ + let expected = "Filter: test.d NOT IN ([Int32(1), Int32(2), Int32(3)])\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2445,7 +2445,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.d IN ([Int32(1), Int32(2), Int32(3)]) AS NOT test.d NOT IN (Map { iter: Iter([Int32(1), Int32(2), Int32(3)]) })\ + let expected = "Filter: test.d IN ([Int32(1), Int32(2), Int32(3)])\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2466,7 +2466,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.d < Int32(1) OR test.d > Int32(10) AS NOT test.d BETWEEN Int32(1) AND Int32(10)\ + let expected = "Filter: test.d < Int32(1) OR test.d > Int32(10)\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2487,7 +2487,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.d >= Int32(1) AND test.d <= Int32(10) AS NOT test.d NOT BETWEEN Int32(1) AND Int32(10)\ + let expected = "Filter: test.d >= Int32(1) AND test.d <= Int32(10)\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2509,7 +2509,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.a NOT LIKE test.b AS NOT test.a LIKE test.b\ + let expected = "Filter: test.a NOT LIKE test.b\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2531,7 +2531,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.a LIKE test.b AS NOT test.a NOT LIKE test.b\ + let expected = "Filter: test.a LIKE test.b\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2546,7 +2546,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.d IS NOT DISTINCT FROM Int32(10) AS NOT test.d IS DISTINCT FROM Int32(10)\ + let expected = "Filter: test.d IS NOT DISTINCT FROM Int32(10)\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); @@ -2561,7 +2561,7 @@ mod tests { .unwrap() .build() .unwrap(); - let expected = "Filter: test.d IS DISTINCT FROM Int32(10) AS NOT test.d IS NOT DISTINCT FROM Int32(10)\ + let expected = "Filter: test.d IS DISTINCT FROM Int32(10)\ \n TableScan: test"; assert_optimized_plan_eq(&plan, expected); From b484e8ed93b234f423495bf3cf14e181f6d8d11f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 11 Oct 2022 08:15:01 -0600 Subject: [PATCH 06/10] tests pass --- datafusion/core/tests/sql/select.rs | 6 ++-- datafusion/expr/src/logical_plan/plan.rs | 28 +++++++++++++------ .../optimizer/src/common_subexpr_eliminate.rs | 2 +- .../optimizer/src/decorrelate_where_exists.rs | 2 +- .../optimizer/src/decorrelate_where_in.rs | 2 +- .../optimizer/src/projection_push_down.rs | 4 +-- .../src/rewrite_disjunctive_predicate.rs | 2 +- .../optimizer/src/scalar_subquery_to_join.rs | 2 +- 8 files changed, 28 insertions(+), 20 deletions(-) diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index 1c79f2386623b..e111b21ad4e50 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -16,7 +16,6 @@ // under the License. use super::*; -use datafusion::config::OPT_OPTIMIZER_SKIP_FAILED_RULES; use datafusion::{ datasource::empty::EmptyTable, from_slice::FromSlice, physical_plan::collect_partitioned, @@ -1213,14 +1212,13 @@ async fn boolean_literal() -> Result<()> { #[tokio::test] async fn unprojected_filter() { let config = SessionConfig::new(); - // let config = config.set_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES, false); let ctx = SessionContext::with_config(config); let df = ctx.read_table(table_with_sequence(1, 3).unwrap()).unwrap(); let df = df - .select(vec![col("i") + col("i")]) - .unwrap() .filter(col("i").gt(lit(2))) + .unwrap() + .select(vec![col("i") + col("i")]) .unwrap(); let plan = df.to_logical_plan().unwrap(); diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 6d35c798a156a..204001aa014b5 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1161,16 +1161,24 @@ impl Filter { predicate: Expr, input: Arc, ) -> datafusion_common::Result { - // TODO enable this check once invalid tests are updated - // filter predicates must return a boolean value - // let predicate_type = predicate.get_type(input.schema())?; - // if predicate_type != DataType::Boolean { - // return Err(DataFusionError::Plan(format!( - // "Cannot create filter with non-boolean predicate {} returning {}", - // predicate, predicate_type - // ))); - // } + match predicate.get_type(input.schema()) { + Ok(predicate_type) => { + if predicate_type != DataType::Boolean { + return Err(DataFusionError::Plan(format!( + "Cannot create filter with non-boolean predicate '{}' returning {}", + predicate, predicate_type + ))); + } + } + Err(e) => { + // We shouldn't really have to deal with an error case here but it looks + // like we sometimes have plans that are not valid until they are optimized. + // In this case, the optimizer rules will eventually recreate the + // filter expression and we'll do the validation at that time. + debug!("Could not determine type of filter predicate: {}", e); + } + } // filter predicates should not contain aliased expressions so we remove any aliases here // but perhaps we should be failing here instead? @@ -1205,10 +1213,12 @@ impl Filter { } } + /// Access the filter predicate expression pub fn predicate(&self) -> &Expr { &self.predicate } + /// Access the filter input plan pub fn input(&self) -> &Arc { &self.input } diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 37c24f91ae1d5..df1c4de953472 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -140,7 +140,7 @@ fn optimize( if let Some(predicate) = pop_expr(&mut new_expr)?.pop() { Ok(LogicalPlan::Filter(Filter::try_new( - predicate.clone(), + predicate, Arc::new(new_input), )?)) } else { diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs index 7653639692eb8..d6727ad0fd61b 100644 --- a/datafusion/optimizer/src/decorrelate_where_exists.rs +++ b/datafusion/optimizer/src/decorrelate_where_exists.rs @@ -153,7 +153,7 @@ fn optimize_exists( // split into filters let mut subqry_filter_exprs = vec![]; - split_conjunction(&subqry_filter.predicate(), &mut subqry_filter_exprs); + split_conjunction(subqry_filter.predicate(), &mut subqry_filter_exprs); verify_not_disjunction(&subqry_filter_exprs)?; // Grab column names to join on diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index e3e61d91c390d..a3443eaee882f 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -152,7 +152,7 @@ fn optimize_where_in( if let LogicalPlan::Filter(subqry_filter) = (*subqry_input).clone() { // split into filters let mut subqry_filter_exprs = vec![]; - split_conjunction(&subqry_filter.predicate(), &mut subqry_filter_exprs); + split_conjunction(subqry_filter.predicate(), &mut subqry_filter_exprs); verify_not_disjunction(&subqry_filter_exprs)?; // Grab column names to join on diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 051a0ed745b23..5a048aac70f0c 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -580,12 +580,12 @@ mod tests { let table_scan = test_table_scan()?; let plan = LogicalPlanBuilder::from(table_scan) - .filter(col("c"))? + .filter(col("c").gt(lit(1)))? .aggregate(Vec::::new(), vec![max(col("b"))])? .build()?; let expected = "Aggregate: groupBy=[[]], aggr=[[MAX(test.b)]]\ - \n Filter: test.c\ + \n Filter: test.c > Int32(1)\ \n TableScan: test projection=[b, c]"; assert_optimized_plan_eq(&plan, expected); diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs index c6f1433fde6d7..a4f051e1c2035 100644 --- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs +++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs @@ -129,7 +129,7 @@ impl RewriteDisjunctivePredicate { ) -> Result { match plan { LogicalPlan::Filter(filter) => { - let predicate = predicate(&filter.predicate())?; + let predicate = predicate(filter.predicate())?; let rewritten_predicate = rewrite_predicate(predicate); let rewritten_expr = normalize_predicate(rewritten_predicate); Ok(LogicalPlan::Filter(Filter::try_new( diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 57b259fdea6ae..d148881108257 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -236,7 +236,7 @@ fn optimize_scalar( // if there were filters, split and capture them let mut subqry_filter_exprs = vec![]; if let Some(filter) = filter { - split_conjunction(&filter.predicate(), &mut subqry_filter_exprs); + split_conjunction(filter.predicate(), &mut subqry_filter_exprs); } verify_not_disjunction(&subqry_filter_exprs)?; From ae51a4570aa4eb9e081c1e3d9e436901918326d0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 11 Oct 2022 08:22:43 -0600 Subject: [PATCH 07/10] make Filter attributes private --- datafusion/expr/src/logical_plan/plan.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 204001aa014b5..0c2d1e34b3588 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1150,9 +1150,9 @@ pub struct SubqueryAlias { #[derive(Clone)] pub struct Filter { /// The predicate expression, which must have Boolean type. - pub(crate) predicate: Expr, + predicate: Expr, /// The incoming logical plan - pub(crate) input: Arc, + input: Arc, } impl Filter { From 32d7be281d85f4a2ffe9b28f2e3e723f1ff45c23 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 11 Oct 2022 08:49:21 -0600 Subject: [PATCH 08/10] update docs --- datafusion/expr/src/logical_plan/plan.rs | 26 +++++++++--------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 0c2d1e34b3588..1c7aef6d4d9f3 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1161,22 +1161,16 @@ impl Filter { predicate: Expr, input: Arc, ) -> datafusion_common::Result { - // filter predicates must return a boolean value - match predicate.get_type(input.schema()) { - Ok(predicate_type) => { - if predicate_type != DataType::Boolean { - return Err(DataFusionError::Plan(format!( - "Cannot create filter with non-boolean predicate '{}' returning {}", - predicate, predicate_type - ))); - } - } - Err(e) => { - // We shouldn't really have to deal with an error case here but it looks - // like we sometimes have plans that are not valid until they are optimized. - // In this case, the optimizer rules will eventually recreate the - // filter expression and we'll do the validation at that time. - debug!("Could not determine type of filter predicate: {}", e); + // Filter predicates must return a boolean value so we try and validate that here. + // Note that it is not always possible to resolve the predicate expression during plan + // construction (such as with correlated subqueries) so we make a best effort here and + // ignore errors resolving the expression against the schema. + if let Ok(predicate_type) = predicate.get_type(input.schema()) { + if predicate_type != DataType::Boolean { + return Err(DataFusionError::Plan(format!( + "Cannot create filter with non-boolean predicate '{}' returning {}", + predicate, predicate_type + ))); } } From 6cd3343c63fd172039c105b0e48910e1af3d972b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 11 Oct 2022 09:50:22 -0600 Subject: [PATCH 09/10] Cargo.lock --- datafusion-cli/Cargo.lock | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 0a08e57248ce5..f8f1a627d9650 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -477,6 +477,7 @@ dependencies = [ "parking_lot", "parquet", "paste", + "percent-encoding", "pin-project-lite", "rand", "smallvec", @@ -522,6 +523,7 @@ dependencies = [ "ahash 0.8.0", "arrow", "datafusion-common", + "log", "sqlparser", ] From b5b4cf5b3063f537fe7aa6a5e50ea029662067b4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Oct 2022 08:30:29 -0600 Subject: [PATCH 10/10] clippy --- datafusion/core/src/physical_plan/planner.rs | 3 +- datafusion/expr/src/expr.rs | 8 ++++ datafusion/expr/src/logical_plan/plan.rs | 31 ++++-------- datafusion/expr/src/utils.rs | 50 ++++++++++++++++++-- 4 files changed, 63 insertions(+), 29 deletions(-) diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 599b133e995ac..88be00cd92f93 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1694,8 +1694,7 @@ mod tests { use arrow::record_batch::RecordBatch; use datafusion_common::{DFField, DFSchema, DFSchemaRef}; use datafusion_expr::expr::GroupingSet; - use datafusion_expr::sum; - use datafusion_expr::{col, lit}; + use datafusion_expr::{col, lit, sum}; use fmt::Debug; use std::collections::HashMap; use std::convert::TryFrom; diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index c131682a8ef68..7f75098869b58 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -486,6 +486,14 @@ impl Expr { Expr::Alias(Box::new(self), name.to_owned()) } + /// Remove an alias from an expression if one exists. + pub fn unalias(self) -> Expr { + match self { + Expr::Alias(expr, _) => expr.as_ref().clone(), + _ => self, + } + } + /// Return `self IN ` if `negated` is false, otherwise /// return `self NOT IN `.a pub fn in_list(self, list: Vec, negated: bool) -> Expr { diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 1210a3ba37b16..fce1c76c42c84 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use crate::expr_rewriter::{ExprRewritable, ExprRewriter}; ///! Logical plan types use crate::logical_plan::builder::validate_unique_names; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; @@ -26,7 +25,6 @@ use crate::utils::{ use crate::{Expr, ExprSchemable, TableProviderFilterPushDown, TableSource}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::{plan_err, Column, DFSchema, DFSchemaRef, DataFusionError}; -use log::debug; use std::collections::HashSet; use std::fmt::{self, Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; @@ -1174,29 +1172,16 @@ impl Filter { } } - // filter predicates should not contain aliased expressions so we remove any aliases here - // but perhaps we should be failing here instead? - struct RemoveAliases {} - - impl ExprRewriter for RemoveAliases { - fn mutate(&mut self, expr: Expr) -> datafusion_common::Result { - match expr { - Expr::Alias(expr, alias) => { - debug!( - "Attempted to create Filter predicate containing aliased \ - expression `{}` AS '{}'", - expr, alias - ); - Ok(expr.as_ref().clone()) - } - _ => Ok(expr.clone()), - } - } + // filter predicates should not be aliased + if let Expr::Alias(expr, alias) = predicate { + return Err(DataFusionError::Plan(format!( + "Attempted to create Filter predicate with \ + expression `{}` aliased as '{}'. Filter predicates should not be \ + aliased.", + expr, alias + ))); } - let mut remove_aliases = RemoveAliases {}; - let predicate = predicate.rewrite(&mut remove_aliases)?; - Ok(Self { predicate, input }) } diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index fbba755b1afa6..8e2544793b259 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -17,6 +17,7 @@ //! Expression utilities +use crate::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion}; use crate::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}; use crate::logical_plan::builder::build_join_schema; use crate::logical_plan::{ @@ -380,10 +381,51 @@ pub fn from_plan( .map(|s| s.to_vec()) .collect::>(), })), - LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter::try_new( - expr[0].clone(), - Arc::new(inputs[0].clone()), - )?)), + LogicalPlan::Filter { .. } => { + assert_eq!(1, expr.len()); + let predicate = expr[0].clone(); + + // filter predicates should not contain aliased expressions so we remove any aliases + // before this logic was added we would have aliases within filters such as for + // benchmark q6: + // + // lineitem.l_shipdate >= Date32(\"8766\") + // AND lineitem.l_shipdate < Date32(\"9131\") + // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount >= + // Decimal128(Some(49999999999999),30,15) + // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount <= + // Decimal128(Some(69999999999999),30,15) + // AND lineitem.l_quantity < Decimal128(Some(2400),15,2) + + struct RemoveAliases {} + + impl ExprRewriter for RemoveAliases { + fn pre_visit(&mut self, expr: &Expr) -> Result { + match expr { + Expr::Exists { .. } + | Expr::ScalarSubquery(_) + | Expr::InSubquery { .. } => { + // subqueries could contain aliases so we don't recurse into those + Ok(RewriteRecursion::Stop) + } + Expr::Alias(_, _) => Ok(RewriteRecursion::Mutate), + _ => Ok(RewriteRecursion::Continue), + } + } + + fn mutate(&mut self, expr: Expr) -> Result { + Ok(expr.unalias()) + } + } + + let mut remove_aliases = RemoveAliases {}; + let predicate = predicate.rewrite(&mut remove_aliases)?; + + Ok(LogicalPlan::Filter(Filter::try_new( + predicate, + Arc::new(inputs[0].clone()), + )?)) + } LogicalPlan::Repartition(Repartition { partitioning_scheme, ..