From 61b1a1632ffe40e33dc252f1d9b7b9c7253af7dc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 10 Oct 2022 16:58:48 -0600 Subject: [PATCH 01/14] Optimizer example and docs --- datafusion/optimizer/README.md | 140 ++++++++++++++- datafusion/optimizer/examples/rewrite_expr.rs | 163 ++++++++++++++++++ 2 files changed, 300 insertions(+), 3 deletions(-) create mode 100644 datafusion/optimizer/examples/rewrite_expr.rs diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index 39d28a8fae37a..fc2b740d32954 100644 --- a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -17,10 +17,144 @@ under the License. --> -# DataFusion Query Optimizer Rules +# DataFusion Query Optimizer -[DataFusion](df) is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format. +[DataFusion](df) is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory +format. -This crate is a submodule of DataFusion that provides query optimizer rules. +DataFusion has modular design, allowing individual crates to be re-used in other projects. + +This crate is a submodule of DataFusion that provides a query optimizer for logical plans. + +## Running the Optimizer + +The following code demonstrates the basic flow of creating the optimizer with a default set of optimization rules +and applying it to a logical plan to produce an optimized logical plan. + +```rust + +// We need a logical plan as the starting point. There are many ways to build a logical plan: +// +// The `datafusion-expr` crate provides a LogicalPlanBuilder +// The `datafusion-sql` crate provides a SQL query planner that can create a LogicalPlan from SQL +// The `datafusion` crate provides a DataFrame API that can create a LogicalPlan +let logical_plan = ... 
+ +let mut config = OptimizerConfig::default(); +let optimizer = Optimizer::new(&config); +let optimized_plan = optimizer.optimize(&logical_plan, &mut config, observe)?; + +fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) { + println!( + "After applying rule '{}':\n{}", + rule.name(), + plan.display_indent() + ) +} +``` + +## Providing Custom Rules + +The optimizer can be created with a custom set of rules. + +```rust +let optimizer = Optimizer::with_rules(vec![ + Arc::new(MyRule {}) +]); +``` + +## Writing Optimization Rules + +Please refer to the [examples](examples) to learn more about the general approach to writing optimizer rules and +then move on to studying the existing rules. + +All rules must implement the `OptimizerRule` trait. + +```rust +/// `OptimizerRule` transforms one ['LogicalPlan'] into another which +/// computes the same results, but in a potentially more efficient +/// way. +pub trait OptimizerRule { + /// Rewrite `plan` to an optimized form + fn optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result<LogicalPlan>; + + /// A human readable name for this optimizer rule + fn name(&self) -> &str; +} +``` + +### General Guidelines + +Rules typically walk the logical plan and walk the expression trees inside operators and selectively mutate +individual operators or expressions. + +Sometimes there is an initial pass that visits the plan and builds state that is used in a second pass that performs +the actual optimization. This approach is used in projection push down and filter push down. + +### Utilities + +There are a number of utility methods provided that take care of some common tasks. + +### ExprVisitor + +TBD + +### Rewriting Expressions + +The `ExprRewriter` trait can be implemented to provide a way to rewrite expressions. The rewriter can then be applied +to an expression by calling `Expr::rewrite` (from the `ExprRewritable` trait). 
+ +The `rewrite` method will perform a depth-first walk of the expression and its children to rewrite an expression, +consuming `self` and producing a new expression. + +```rust +let mut expr_rewriter = MyExprRewriter {}; +let expr = expr.rewrite(&mut expr_rewriter)?; +``` + +Here is an example implementation which will rewrite `expr BETWEEN a AND b` as `expr >= a AND expr <= b`. Note that the +implementation does not need to perform any recursion since this is handled by the `rewrite` method. + +```rust +struct MyExprRewriter {} + +impl ExprRewriter for MyExprRewriter { + fn mutate(&mut self, expr: Expr) -> Result<Expr> { + match expr { + Expr::Between { + negated, + expr, + low, + high, + } => { + let expr: Expr = expr.as_ref().clone(); + let low: Expr = low.as_ref().clone(); + let high: Expr = high.as_ref().clone(); + if negated { + Ok(expr.clone().lt(low).or(expr.clone().gt(high))) + } else { + Ok(expr.clone().gt_eq(low).and(expr.clone().lt_eq(high))) + } + } + _ => Ok(expr.clone()), + } + } +} +``` + +### optimize_children + +TBD + +### Writing Tests + +There should be unit tests in the same file as the new rule that test the effect of the rule being applied to a plan +in isolation (without any other rule being applied). + +There should also be a test in `integration-tests.rs` that tests the rule as part of the overall optimization process. [df]: https://crates.io/crates/datafusion diff --git a/datafusion/optimizer/examples/rewrite_expr.rs b/datafusion/optimizer/examples/rewrite_expr.rs new file mode 100644 index 0000000000000..7d2c312fb03ab --- /dev/null +++ b/datafusion/optimizer/examples/rewrite_expr.rs @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter}; +use datafusion_expr::{AggregateUDF, Expr, Filter, LogicalPlan, ScalarUDF, TableSource}; +use datafusion_optimizer::optimizer::Optimizer; +use datafusion_optimizer::{utils, OptimizerConfig, OptimizerRule}; +use datafusion_sql::planner::{ContextProvider, SqlToRel}; +use datafusion_sql::sqlparser::dialect::PostgreSqlDialect; +use datafusion_sql::sqlparser::parser::Parser; +use datafusion_sql::TableReference; +use std::any::Any; +use std::sync::Arc; + +pub fn main() -> Result<()> { + // produce a logical plan using the datafusion-sql crate + let dialect = PostgreSqlDialect {}; + let sql = "SELECT * FROM person WHERE age BETWEEN 21 AND 32"; + let statements = Parser::parse_sql(&dialect, sql)?; + + // produce a logical plan using the datafusion-sql crate + let context_provider = MyContextProvider {}; + let sql_to_rel = SqlToRel::new(&context_provider); + let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?; + println!( + "Unoptimized Logical Plan:\n\n{}\n", + logical_plan.display_indent() + ); + + // now run the optimizer with our custom rule + let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]); + let mut optimizer_config = 
OptimizerConfig::default().with_skip_failing_rules(false); + let optimized_plan = + optimizer.optimize(&logical_plan, &mut optimizer_config, observe)?; + println!( + "Optimized Logical Plan:\n\n{}\n", + optimized_plan.display_indent() + ); + + Ok(()) +} + +fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) { + println!( + "After applying rule '{}':\n{}\n", + rule.name(), + plan.display_indent() + ) +} + +struct MyRule {} + +impl OptimizerRule for MyRule { + fn name(&self) -> &str { + "my_rule" + } + + fn optimize( + &self, + plan: &LogicalPlan, + _config: &mut OptimizerConfig, + ) -> Result { + // recurse down and optimize children first + let plan = utils::optimize_children(self, plan, _config)?; + + match plan { + LogicalPlan::Filter(filter) => { + let mut expr_rewriter = MyExprRewriter {}; + let predicate = filter.predicate.clone(); + let predicate = predicate.rewrite(&mut expr_rewriter)?; + Ok(LogicalPlan::Filter(Filter { + predicate, + input: filter.input.clone(), + })) + } + _ => Ok(plan.clone()), + } + } +} + +struct MyExprRewriter {} + +impl ExprRewriter for MyExprRewriter { + fn mutate(&mut self, expr: Expr) -> Result { + match expr { + Expr::Between { + negated, + expr, + low, + high, + } => { + let expr: Expr = expr.as_ref().clone(); + let low: Expr = low.as_ref().clone(); + let high: Expr = high.as_ref().clone(); + if negated { + Ok(expr.clone().lt(low).or(expr.clone().gt(high))) + } else { + Ok(expr.clone().gt_eq(low).and(expr.clone().lt_eq(high))) + } + } + _ => Ok(expr.clone()), + } + } +} + +struct MyContextProvider {} + +impl ContextProvider for MyContextProvider { + fn get_table_provider(&self, name: TableReference) -> Result> { + if name.table() == "person" { + Ok(Arc::new(MyTableSource { + schema: Arc::new(Schema::new(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::UInt8, false), + ])), + })) + } else { + Err(DataFusionError::Plan("table not found".to_string())) + } + } + + fn get_function_meta(&self, 
_name: &str) -> Option> { + None + } + + fn get_aggregate_meta(&self, _name: &str) -> Option> { + None + } + + fn get_variable_type(&self, _variable_names: &[String]) -> Option { + None + } +} + +struct MyTableSource { + schema: SchemaRef, +} + +impl TableSource for MyTableSource { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} From af42a866b0a37b3fb14ad60869a3eec34a61fe46 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 10 Oct 2022 17:07:43 -0600 Subject: [PATCH 02/14] more docs --- datafusion/optimizer/README.md | 41 ++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index fc2b740d32954..dbf8472ae8061 100644 --- a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -101,7 +101,29 @@ There are a number of utility methods provided that take care of some common tas ### ExprVisitor -TBD +The `ExpressionVisitor` and `ExprVisitable` traits provide a mechanism for applying a visitor pattern to an expression tree. + +Here is an example that demonstrates this. + +```rust +fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec<Expr>) -> Result<()> { + struct InSubqueryVisitor<'a> { + accum: &'a mut Vec<Expr>, + } + + impl ExpressionVisitor for InSubqueryVisitor<'_> { + fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> { + if let Expr::InSubquery { .. } = expr { + self.accum.push(expr.to_owned()); + } + Ok(Recursion::Continue(self)) + } + } + + expression.accept(InSubqueryVisitor { accum: extracted })?; + Ok(()) +} +``` ### Rewriting Expressions @@ -148,7 +170,22 @@ impl ExprRewriter for MyExprRewriter { ### optimize_children -TBD +It is quite typical for a rule to be applied recursively to all operators within a query plan. Rather than duplicate +that logic in each rule, an `optimize_children` method is provided. 
This recursively invokes the `optimize` method on +the plan's children and then returns a node of the same type. + +```rust +fn optimize( + &self, + plan: &LogicalPlan, + _config: &mut OptimizerConfig, +) -> Result { + // recurse down and optimize children first + let plan = utils::optimize_children(self, plan, _config)?; + + ... +} +``` ### Writing Tests From cd7e2b2114888e4bb8b4abea4b350d8b4e36f8bd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 11 Oct 2022 15:48:19 -0600 Subject: [PATCH 03/14] clippy --- datafusion/optimizer/examples/rewrite_expr.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/examples/rewrite_expr.rs b/datafusion/optimizer/examples/rewrite_expr.rs index 7d2c312fb03ab..21310428e4b81 100644 --- a/datafusion/optimizer/examples/rewrite_expr.rs +++ b/datafusion/optimizer/examples/rewrite_expr.rs @@ -86,7 +86,7 @@ impl OptimizerRule for MyRule { let predicate = predicate.rewrite(&mut expr_rewriter)?; Ok(LogicalPlan::Filter(Filter { predicate, - input: filter.input.clone(), + input: filter.input, })) } _ => Ok(plan.clone()), @@ -109,9 +109,9 @@ impl ExprRewriter for MyExprRewriter { let low: Expr = low.as_ref().clone(); let high: Expr = high.as_ref().clone(); if negated { - Ok(expr.clone().lt(low).or(expr.clone().gt(high))) + Ok(expr.clone().lt(low).or(expr.gt(high))) } else { - Ok(expr.clone().gt_eq(low).and(expr.clone().lt_eq(high))) + Ok(expr.clone().gt_eq(low).and(expr.lt_eq(high))) } } _ => Ok(expr.clone()), From 21e7370e7bb3fbb4b01bba18e2c14a9d3ee0f47a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Oct 2022 07:04:10 -0600 Subject: [PATCH 04/14] Add notes on expression naming --- datafusion/optimizer/README.md | 92 ++++++++++++++++++++++++++- datafusion/optimizer/src/optimizer.rs | 3 +- 2 files changed, 93 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index dbf8472ae8061..4d76458068991 100644 --- 
a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -73,7 +73,8 @@ All rules must implement the `OptimizerRule` trait. ```rust /// `OptimizerRule` transforms one ['LogicalPlan'] into another which /// computes the same results, but in a potentially more efficient -/// way. +/// way. If there are no suitable transformations for the input plan, +/// the optimizer can simply return it as is. pub trait OptimizerRule { /// Rewrite `plan` to an optimized form fn optimize( @@ -95,6 +96,95 @@ individual operators or expressions. Sometimes there is an initial pass that visits the plan and builds state that is used in a second pass that performs the actual optimization. This approach is used in projection push down and filter push down. +### Expression Naming + +Every expression in DataFusion has a name, which is used as the column name. For example, in this example the output +contains a single column with the name "COUNT(aggregate_test_100.c9)": + +```text +❯ select count(c9) from aggregate_test_100; ++------------------------------+ +| COUNT(aggregate_test_100.c9) | ++------------------------------+ +| 100 | ++------------------------------+ +``` + +These names are used to refer to the columns in both subqueries as well as internally from one stage of the LogicalPlan +to another. For example: + +```text +❯ select "COUNT(aggregate_test_100.c9)" + 1 from (select count(c9) from aggregate_test_100) as sq; ++--------------------------------------------+ +| sq.COUNT(aggregate_test_100.c9) + Int64(1) | ++--------------------------------------------+ +| 101 | ++--------------------------------------------+ +``` + +### Implication + +DataFusion contains an extensive set of OptimizerRules that may rewrite the plan and/or its expressions so they execute +more quickly. + +Because DataFusion identifies columns using a string name, it means it is critical that the names of expressions are +not changed by the optimizer when it rewrites expressions. 
 This is typically accomplished by renaming a rewritten +expression by adding an alias. + +Here is a simple example of such a rewrite. The expression `1 + 2` can be internally simplified to 3 but must still be +displayed the same as `1 + 2`: + +```text +❯ select 1 + 2; ++---------------------+ +| Int64(1) + Int64(2) | ++---------------------+ +| 3 | ++---------------------+ +``` + +Looking at the `EXPLAIN` output we can see that the optimizer has effectively rewritten `1 + 2` into +`3 as "1 + 2"`: + +```text +❯ explain select 1 + 2; ++---------------+-------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------+ +| logical_plan | Projection: Int64(3) AS Int64(1) + Int64(2) | +| | EmptyRelation | +| physical_plan | ProjectionExec: expr=[3 as Int64(1) + Int64(2)] | +| | EmptyExec: produce_one_row=true | +| | | ++---------------+-------------------------------------------------+ +``` + +If the expression name is not preserved, bugs such as #3704 and #3555 occur where the expected columns can not be found. + +### Problems + +There are at least three places that we have rediscovered the issue of names changing when rewriting expressions such +as #3704, #3555 and https://github.com/apache/arrow-datafusion/blob/master/datafusion/optimizer/src/simplify_expressions.rs#L316 + +### Building Expression Names + +There are currently two ways to create a "name" for an expression in the logical plan. + +```rust +impl Expr { + /// Returns the name of this expression as it should appear in a schema. This name + /// will not include any CAST expressions. + pub fn name(&self) -> Result { + create_name(self) + } + + /// Returns a full and complete string representation of this expression. + pub fn canonical_name(&self) -> String { + format!("{}", self) + } +} +``` + +### Utilities + +There are a number of utility methods provided that take care of some common tasks. 
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 87e4d1ffcd13a..5e61ccc5a4ef7 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -43,7 +43,8 @@ use std::sync::Arc; /// `OptimizerRule` transforms one ['LogicalPlan'] into another which /// computes the same results, but in a potentially more efficient -/// way. +/// way. If there are no suitable transformations for the input plan, +/// the optimizer can simply return it as is. pub trait OptimizerRule { /// Rewrite `plan` to an optimized form fn optimize( From 441875f038940e6dac2120ff5dbe9d1c4355d53b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Oct 2022 07:06:46 -0600 Subject: [PATCH 05/14] debugging --- datafusion/optimizer/README.md | 47 ++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index 4d76458068991..5c88962606d1b 100644 --- a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -284,4 +284,51 @@ in isolation (without any other rule being applied). There should also be a test in `integration-tests.rs` that tests the rule as part of the overall optimization process. +### Debugging + +The `EXPLAIN VERBOSE` command can be used to show the effect of each optimization rule on a query. 
+ +```text +❯ explain verbose select cast(1 + 2.2 as string) as foo; ++------------------------------------------------------------+---------------------------------------------------------------------------+ +| plan_type | plan | ++------------------------------------------------------------+---------------------------------------------------------------------------+ +| initial_logical_plan | Projection: CAST(Int64(1) + Float64(2.2) AS Utf8) AS foo | +| | EmptyRelation | +| logical_plan after type_coercion | Projection: CAST(CAST(Int64(1) AS Float64) + Float64(2.2) AS Utf8) AS foo | +| | EmptyRelation | +| logical_plan after simplify_expressions | Projection: Utf8("3.2") AS foo | +| | EmptyRelation | +| logical_plan after unwrap_cast_in_comparison | SAME TEXT AS ABOVE | +| logical_plan after decorrelate_where_exists | SAME TEXT AS ABOVE | +| logical_plan after decorrelate_where_in | SAME TEXT AS ABOVE | +| logical_plan after scalar_subquery_to_join | SAME TEXT AS ABOVE | +| logical_plan after subquery_filter_to_join | SAME TEXT AS ABOVE | +| logical_plan after simplify_expressions | SAME TEXT AS ABOVE | +| logical_plan after eliminate_filter | SAME TEXT AS ABOVE | +| logical_plan after reduce_cross_join | SAME TEXT AS ABOVE | +| logical_plan after common_sub_expression_eliminate | SAME TEXT AS ABOVE | +| logical_plan after eliminate_limit | SAME TEXT AS ABOVE | +| logical_plan after projection_push_down | SAME TEXT AS ABOVE | +| logical_plan after rewrite_disjunctive_predicate | SAME TEXT AS ABOVE | +| logical_plan after reduce_outer_join | SAME TEXT AS ABOVE | +| logical_plan after filter_push_down | SAME TEXT AS ABOVE | +| logical_plan after limit_push_down | SAME TEXT AS ABOVE | +| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE | +| logical_plan | Projection: Utf8("3.2") AS foo | +| | EmptyRelation | +| initial_physical_plan | ProjectionExec: expr=[3.2 as foo] | +| | EmptyExec: produce_one_row=true | +| | | +| physical_plan 
after aggregate_statistics | SAME TEXT AS ABOVE | +| physical_plan after hash_build_probe_order | SAME TEXT AS ABOVE | +| physical_plan after coalesce_batches | SAME TEXT AS ABOVE | +| physical_plan after repartition | SAME TEXT AS ABOVE | +| physical_plan after add_merge_exec | SAME TEXT AS ABOVE | +| physical_plan | ProjectionExec: expr=[3.2 as foo] | +| | EmptyExec: produce_one_row=true | +| | | ++------------------------------------------------------------+---------------------------------------------------------------------------+ +``` + [df]: https://crates.io/crates/datafusion From 465d30f169d9f2345f3f9df21868a87459c895d7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Oct 2022 07:15:21 -0600 Subject: [PATCH 06/14] add display_name and deprecate name --- datafusion/expr/src/expr.rs | 13 ++++++++++--- datafusion/expr/src/expr_schema.rs | 2 +- datafusion/expr/src/utils.rs | 4 ++-- datafusion/optimizer/README.md | 17 ++++++++--------- .../optimizer/src/common_subexpr_eliminate.rs | 2 +- datafusion/optimizer/src/filter_push_down.rs | 2 +- .../optimizer/src/projection_push_down.rs | 4 ++-- .../optimizer/src/simplify_expressions.rs | 4 ++-- .../optimizer/src/single_distinct_to_groupby.rs | 4 ++-- datafusion/optimizer/src/type_coercion.rs | 4 ++-- .../optimizer/src/unwrap_cast_in_comparison.rs | 2 +- 11 files changed, 32 insertions(+), 26 deletions(-) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index c131682a8ef68..6a8316d02f6f0 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -367,10 +367,17 @@ impl PartialOrd for Expr { impl Expr { /// Returns the name of this expression as it should appear in a schema. This name /// will not include any CAST expressions. - pub fn name(&self) -> Result { + pub fn display_name(&self) -> Result { create_name(self) } + /// Returns the name of this expression as it should appear in a schema. This name + /// will not include any CAST expressions. 
+ #[deprecated(since = "14.0.0", note = "please use `display_name` instead")] + pub fn name(&self) -> Result { + self.display_name() + } + /// Returns a full and complete string representation of this expression. pub fn canonical_name(&self) -> String { format!("{}", self) @@ -1178,7 +1185,7 @@ mod test { assert_eq!(expected, expr.canonical_name()); assert_eq!(expected, format!("{}", expr)); assert_eq!(expected, format!("{:?}", expr)); - assert_eq!(expected, expr.name()?); + assert_eq!(expected, expr.display_name()?); Ok(()) } @@ -1194,7 +1201,7 @@ mod test { assert_eq!(expected_canonical, format!("{:?}", expr)); // note that CAST intentionally has a name that is different from its `Display` // representation. CAST does not change the name of expressions. - assert_eq!("Float32(1.23)", expr.name()?); + assert_eq!("Float32(1.23)", expr.display_name()?); Ok(()) } diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 5442a24212d21..cfcbcc9406031 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -240,7 +240,7 @@ impl ExprSchemable for Expr { )), _ => Ok(DFField::new( None, - &self.name()?, + &self.display_name()?, self.get_type(input_schema)?, self.nullable(input_schema)?, )), diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 501b4a8f11fc4..f0d94f4608a3f 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -634,7 +634,7 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr { Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)), name) } Expr::ScalarSubquery(_) => e.clone(), - _ => match e.name() { + _ => match e.display_name() { Ok(name) => match input_schema.field_with_unqualified_name(&name) { Ok(field) => Expr::Column(field.qualified_column()), // expression not provided as input, do not convert to a column reference @@ -686,7 +686,7 @@ pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result { let 
field = plan.schema().field_from_column(col)?; Ok(Expr::Column(field.qualified_column())) } - _ => Ok(Expr::Column(Column::from_name(expr.name()?))), + _ => Ok(Expr::Column(Column::from_name(expr.display_name()?))), } } diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index 5c88962606d1b..b6a40dd05b52d 100644 --- a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -99,7 +99,7 @@ the actual optimization. This approach is used in projection push down and filte ### Expression Naming Every expression in DataFusion has a name, which is used as the column name. For example, in this example the output -contains a single column with the name "COUNT(aggregate_test_100.c9)": +contains a single column with the name `"COUNT(aggregate_test_100.c9)"`: ```text ❯ select count(c9) from aggregate_test_100; @@ -159,22 +159,18 @@ Looking at the `EXPLAIN` output we can see that the optimizer has effectively re +---------------+-------------------------------------------------+ ``` -If the expression name is not preserved, bugs such as #3704 and #3555 occur where the expected columns can not be found. - -### Problems - -There are at least three places that we have rediscovered the issue of names changing when rewriting expressions such -as #3704, #3555 and https://github.com/apache/arrow-datafusion/blob/master/datafusion/optimizer/src/simplify_expressions.rs#L316 +If the expression name is not preserved, bugs such as [#3704](https://github.com/apache/arrow-datafusion/issues/3704) +and [#3555](https://github.com/apache/arrow-datafusion/issues/3555) occur where the expected columns can not be found. ### Building Expression Names -There are currently two ways to create a "name" for an expression in the logical plan. +There are currently two ways to create a name for an expression in the logical plan. ```rust impl Expr { /// Returns the name of this expression as it should appear in a schema. 
This name /// will not include any CAST expressions. - pub fn name(&self) -> Result { + pub fn display_name(&self) -> Result { create_name(self) } @@ -185,6 +181,9 @@ impl Expr { } ``` +When comparing expressions to determine if they are equivalent, `canonical_name` should be used, and when creating a +name to be used in a schema, `display_name` should be used. + ### Utilities There are a number of utility methods provided that take care of some common tasks. diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index cea5e8c46eb5e..ed65d81952143 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -599,7 +599,7 @@ impl ExprRewriter for CommonSubexprRewriter<'_> { self.curr_index += 1; } - let expr_name = expr.name()?; + let expr_name = expr.display_name()?; // Alias this `Column` expr to it original "expr name", // `projection_push_down` optimizer use "expr name" to eliminate useless // projections. 
diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs index 129766012531d..503c31288bbd6 100644 --- a/datafusion/optimizer/src/filter_push_down.rs +++ b/datafusion/optimizer/src/filter_push_down.rs @@ -406,7 +406,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { let agg_columns = aggr_expr .iter() - .map(|x| Ok(Column::from_name(x.name()?))) + .map(|x| Ok(Column::from_name(x.display_name()?))) .collect::>>()?; used_columns.extend(agg_columns); diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 051a0ed745b23..4d8abb0b14895 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -259,7 +259,7 @@ fn optimize_plan( let mut new_window_expr = Vec::new(); { window_expr.iter().try_for_each(|expr| { - let name = &expr.name()?; + let name = &expr.display_name()?; let column = Column::from_name(name); if required_columns.contains(&column) { new_window_expr.push(expr.clone()); @@ -317,7 +317,7 @@ fn optimize_plan( // Gather all columns needed for expressions in this Aggregate let mut new_aggr_expr = Vec::new(); aggr_expr.iter().try_for_each(|expr| { - let name = &expr.name()?; + let name = &expr.display_name()?; let column = Column::from_name(name); if required_columns.contains(&column) { new_aggr_expr.push(expr.clone()); diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs index 95c640b2a7088..5e1799caf852b 100644 --- a/datafusion/optimizer/src/simplify_expressions.rs +++ b/datafusion/optimizer/src/simplify_expressions.rs @@ -306,12 +306,12 @@ impl SimplifyExpressions { .map(|e| { // We need to keep original expression name, if any. // Constant folding should not change expression name. 
- let name = &e.name(); + let name = &e.display_name(); // Apply the actual simplification logic let new_e = e.simplify(&info)?; - let new_name = &new_e.name(); + let new_name = &new_e.display_name(); if let (Ok(expr_name), Ok(new_expr_name)) = (name, new_name) { if expr_name != new_expr_name { diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 6d33437637962..e6ec0e72febf6 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -91,7 +91,7 @@ fn optimize(plan: &LogicalPlan) -> Result { fun, args, filter, .. } => { // is_single_distinct_agg ensure args.len=1 - if group_fields_set.insert(args[0].name()?) { + if group_fields_set.insert(args[0].display_name()?) { inner_group_exprs .push(args[0].clone().alias(SINGLE_DISTINCT_ALIAS)); } @@ -189,7 +189,7 @@ fn is_single_distinct_agg(plan: &LogicalPlan) -> Result { distinct_count += 1; } for e in args { - fields_set.insert(e.name()?); + fields_set.insert(e.display_name()?); } } } diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs index 89d5d660b8857..5f53ac4c913d1 100644 --- a/datafusion/optimizer/src/type_coercion.rs +++ b/datafusion/optimizer/src/type_coercion.rs @@ -94,7 +94,7 @@ fn optimize_internal( let original_expr_names: Vec> = plan .expressions() .iter() - .map(|expr| expr.name().ok()) + .map(|expr| expr.display_name().ok()) .collect(); let new_expr = plan @@ -107,7 +107,7 @@ fn optimize_internal( // ensure aggregate names don't change: // https://github.com/apache/arrow-datafusion/issues/3555 if matches!(expr, Expr::AggregateFunction { .. 
}) { - if let Some((alias, name)) = original_name.zip(expr.name().ok()) { + if let Some((alias, name)) = original_name.zip(expr.display_name().ok()) { if alias != name { return Ok(expr.alias(&alias)); } diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index 8392e28a913dc..3c33344c66372 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -110,7 +110,7 @@ fn optimize(plan: &LogicalPlan) -> Result { fn name_for_alias(expr: &Expr) -> Result { match expr { Expr::Sort { expr, .. } => name_for_alias(expr), - expr => expr.name(), + expr => expr.display_name(), } } From b5fea6aff6c381337cd2bb8dbe9fbfb60ff77d7e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Oct 2022 08:37:58 -0600 Subject: [PATCH 07/14] clippy --- datafusion/expr/src/logical_plan/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 5226399840759..0e46942c938e3 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -841,7 +841,7 @@ pub(crate) fn validate_unique_names<'a>( ) -> Result<()> { let mut unique_names = HashMap::new(); expressions.into_iter().enumerate().try_for_each(|(position, expr)| { - let name = expr.name()?; + let name = expr.display_name()?; match unique_names.get(&name) { None => { unique_names.insert(name, (position, expr)); From 45aa9a070102efc5834bdb3afce13ed0c25da2ed Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Oct 2022 10:12:21 -0600 Subject: [PATCH 08/14] Update datafusion/optimizer/README.md Co-authored-by: Andrew Lamb --- datafusion/optimizer/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index b6a40dd05b52d..7d37586f3c172 100644 --- 
a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -259,7 +259,7 @@ impl ExprRewriter for MyExprRewriter { ### optimize_children -It is quite typical for a rule to be applied recursively to all operators within a query plan. Rather than duplicate +Typically a rule is applied recursively to all operators within a query plan. Rather than duplicate that logic in each rule, an `optimize_children` method is provided. This recursively invokes the `optimize` method on the plan's children and then returns a node of the same type. From 04bf091910d18c159a44397c84bc611244594034 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Oct 2022 10:12:28 -0600 Subject: [PATCH 09/14] Update datafusion/optimizer/README.md Co-authored-by: Andrew Lamb --- datafusion/optimizer/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index 7d37586f3c172..1c0e41bbcb1ba 100644 --- a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -287,6 +287,8 @@ There should also be a test in `integration-tests.rs` that tests the rule as par The `EXPLAIN VERBOSE` command can be used to show the effect of each optimization rule on a query. +In the following example, the `type_coercion` and `simplify_expressions` passes have simplified the plan so that it returns the constant `"3.2"` rather than doing a computation at execution time. 
+ ```text ❯ explain verbose select cast(1 + 2.2 as string) as foo; +------------------------------------------------------------+---------------------------------------------------------------------------+ From 9da938a1d759a5330028d03ee57e46dcb899c6cd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Oct 2022 10:12:55 -0600 Subject: [PATCH 10/14] Update datafusion/optimizer/README.md Co-authored-by: Andrew Lamb --- datafusion/optimizer/README.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index 1c0e41bbcb1ba..74aa879641f7f 100644 --- a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -124,9 +124,6 @@ to another. For example: ### Implication -DataFusion contains an extensive set of OptimizerRules that may rewrite the plan and/or its expressions so they execute -more quickly. - Because DataFusion identifies columns using a string name, it means it is critical that the names of expressions are not changed by the optimizer when it rewrites expressions. This is typically accomplished by renaming a rewritten expression by adding an alias. From a0df099a7d471af61f9b0f8ae8fbd6a0b37c9cd4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Oct 2022 10:13:17 -0600 Subject: [PATCH 11/14] Update datafusion/optimizer/README.md Co-authored-by: Andrew Lamb --- datafusion/optimizer/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index 74aa879641f7f..48b2de6fa0b40 100644 --- a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -24,7 +24,9 @@ format. DataFusion has modular design, allowing individual crates to be re-used in other projects. -This crate is a submodule of DataFusion that provides a query optimizer for logical plans. 
+This crate is a submodule of DataFusion that provides a query optimizer for logical plans, and +contains an extensive set of OptimizerRules that may rewrite the plan and/or its expressions so +they execute more quickly while still computing the same result. ## Running the Optimizer From 646e1b98d29b16a1230672076b780305b101caae Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Oct 2022 11:14:27 -0600 Subject: [PATCH 12/14] move example --- datafusion-examples/Cargo.toml | 5 + datafusion/optimizer/README.md | 6 +- datafusion/optimizer/examples/rewrite_expr.rs | 163 ------------------ 3 files changed, 8 insertions(+), 166 deletions(-) delete mode 100644 datafusion/optimizer/examples/rewrite_expr.rs diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 844bdba7a0f2b..0d42fe0fa8e88 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -36,7 +36,12 @@ required-features = ["datafusion/avro"] [dev-dependencies] arrow-flight = "24.0.0" async-trait = "0.1.41" +arrow = "24.0.0" datafusion = { path = "../datafusion/core" } +datafusion-sql = { path = "../datafusion/sql" } +datafusion-expr = { path = "../datafusion/expr" } +datafusion-optimizer = { path = "../datafusion/optimizer" } +datafusion-common = { path = "../datafusion/common" } futures = "0.3" num_cpus = "1.13.0" object_store = { version = "0.5.0", features = ["aws"] } diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md index 48b2de6fa0b40..b8edeea87ac13 100644 --- a/datafusion/optimizer/README.md +++ b/datafusion/optimizer/README.md @@ -67,8 +67,8 @@ let optimizer = Optimizer::with_rules(vec![ ## Writing Optimization Rules -Please refer to the [examples](examples) to learn more about the general approach to writing optimizer rules and -then move onto studying the existing rules. 
+Please refer to the [rewrite_expr example](../../datafusion-examples/examples/rewrite_expr.rs) to learn more about +the general approach to writing optimizer rules and then move onto studying the existing rules. All rules must implement the `OptimizerRule` trait. @@ -286,7 +286,7 @@ There should also be a test in `integration-tests.rs` that tests the rule as par The `EXPLAIN VERBOSE` command can be used to show the effect of each optimization rule on a query. -In the following example, the `type_coercion` and `simplify_expressions` passes have simplified the plan so that it returns the constant `"3.2"` rather than doing a computation at execution time. +In the following example, the `type_coercion` and `simplify_expressions` passes have simplified the plan so that it returns the constant `"3.2"` rather than doing a computation at execution time. ```text ❯ explain verbose select cast(1 + 2.2 as string) as foo; diff --git a/datafusion/optimizer/examples/rewrite_expr.rs b/datafusion/optimizer/examples/rewrite_expr.rs deleted file mode 100644 index 5e93bfd73787b..0000000000000 --- a/datafusion/optimizer/examples/rewrite_expr.rs +++ /dev/null @@ -1,163 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::{DataFusionError, Result}; -use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter}; -use datafusion_expr::{AggregateUDF, Expr, Filter, LogicalPlan, ScalarUDF, TableSource}; -use datafusion_optimizer::optimizer::Optimizer; -use datafusion_optimizer::{utils, OptimizerConfig, OptimizerRule}; -use datafusion_sql::planner::{ContextProvider, SqlToRel}; -use datafusion_sql::sqlparser::dialect::PostgreSqlDialect; -use datafusion_sql::sqlparser::parser::Parser; -use datafusion_sql::TableReference; -use std::any::Any; -use std::sync::Arc; - -pub fn main() -> Result<()> { - // produce a logical plan using the datafusion-sql crate - let dialect = PostgreSqlDialect {}; - let sql = "SELECT * FROM person WHERE age BETWEEN 21 AND 32"; - let statements = Parser::parse_sql(&dialect, sql)?; - - // produce a logical plan using the datafusion-sql crate - let context_provider = MyContextProvider {}; - let sql_to_rel = SqlToRel::new(&context_provider); - let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?; - println!( - "Unoptimized Logical Plan:\n\n{}\n", - logical_plan.display_indent() - ); - - // now run the optimizer with our custom rule - let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]); - let mut optimizer_config = OptimizerConfig::default().with_skip_failing_rules(false); - let optimized_plan = - optimizer.optimize(&logical_plan, &mut optimizer_config, observe)?; - println!( - "Optimized Logical Plan:\n\n{}\n", - optimized_plan.display_indent() - ); - - Ok(()) -} - -fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) { - println!( - "After applying rule '{}':\n{}\n", - rule.name(), - plan.display_indent() - ) -} - -struct MyRule {} - -impl OptimizerRule for MyRule { - fn name(&self) -> &str { - "my_rule" - } - - fn optimize( - &self, - plan: &LogicalPlan, - _config: &mut OptimizerConfig, - ) -> Result { - // recurse down and optimize 
children first - let plan = utils::optimize_children(self, plan, _config)?; - - match plan { - LogicalPlan::Filter(filter) => { - let mut expr_rewriter = MyExprRewriter {}; - let predicate = filter.predicate().clone(); - let predicate = predicate.rewrite(&mut expr_rewriter)?; - Ok(LogicalPlan::Filter(Filter::try_new( - predicate, - filter.input().clone(), - )?)) - } - _ => Ok(plan.clone()), - } - } -} - -struct MyExprRewriter {} - -impl ExprRewriter for MyExprRewriter { - fn mutate(&mut self, expr: Expr) -> Result { - match expr { - Expr::Between { - negated, - expr, - low, - high, - } => { - let expr: Expr = expr.as_ref().clone(); - let low: Expr = low.as_ref().clone(); - let high: Expr = high.as_ref().clone(); - if negated { - Ok(expr.clone().lt(low).or(expr.gt(high))) - } else { - Ok(expr.clone().gt_eq(low).and(expr.lt_eq(high))) - } - } - _ => Ok(expr.clone()), - } - } -} - -struct MyContextProvider {} - -impl ContextProvider for MyContextProvider { - fn get_table_provider(&self, name: TableReference) -> Result> { - if name.table() == "person" { - Ok(Arc::new(MyTableSource { - schema: Arc::new(Schema::new(vec![ - Field::new("name", DataType::Utf8, false), - Field::new("age", DataType::UInt8, false), - ])), - })) - } else { - Err(DataFusionError::Plan("table not found".to_string())) - } - } - - fn get_function_meta(&self, _name: &str) -> Option> { - None - } - - fn get_aggregate_meta(&self, _name: &str) -> Option> { - None - } - - fn get_variable_type(&self, _variable_names: &[String]) -> Option { - None - } -} - -struct MyTableSource { - schema: SchemaRef, -} - -impl TableSource for MyTableSource { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} From c33b0f8ddc836fc8adfe8dc242211caba963cdcc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Oct 2022 11:14:47 -0600 Subject: [PATCH 13/14] move example --- datafusion-examples/examples/rewrite_expr.rs | 163 +++++++++++++++++++ 1 file changed, 163 
insertions(+) create mode 100644 datafusion-examples/examples/rewrite_expr.rs diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs new file mode 100644 index 0000000000000..5e93bfd73787b --- /dev/null +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter}; +use datafusion_expr::{AggregateUDF, Expr, Filter, LogicalPlan, ScalarUDF, TableSource}; +use datafusion_optimizer::optimizer::Optimizer; +use datafusion_optimizer::{utils, OptimizerConfig, OptimizerRule}; +use datafusion_sql::planner::{ContextProvider, SqlToRel}; +use datafusion_sql::sqlparser::dialect::PostgreSqlDialect; +use datafusion_sql::sqlparser::parser::Parser; +use datafusion_sql::TableReference; +use std::any::Any; +use std::sync::Arc; + +pub fn main() -> Result<()> { + // produce a logical plan using the datafusion-sql crate + let dialect = PostgreSqlDialect {}; + let sql = "SELECT * FROM person WHERE age BETWEEN 21 AND 32"; + let statements = Parser::parse_sql(&dialect, sql)?; + + // produce a logical plan using the datafusion-sql crate + let context_provider = MyContextProvider {}; + let sql_to_rel = SqlToRel::new(&context_provider); + let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?; + println!( + "Unoptimized Logical Plan:\n\n{}\n", + logical_plan.display_indent() + ); + + // now run the optimizer with our custom rule + let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]); + let mut optimizer_config = OptimizerConfig::default().with_skip_failing_rules(false); + let optimized_plan = + optimizer.optimize(&logical_plan, &mut optimizer_config, observe)?; + println!( + "Optimized Logical Plan:\n\n{}\n", + optimized_plan.display_indent() + ); + + Ok(()) +} + +fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) { + println!( + "After applying rule '{}':\n{}\n", + rule.name(), + plan.display_indent() + ) +} + +struct MyRule {} + +impl OptimizerRule for MyRule { + fn name(&self) -> &str { + "my_rule" + } + + fn optimize( + &self, + plan: &LogicalPlan, + _config: &mut OptimizerConfig, + ) -> Result { + // recurse down and optimize 
children first + let plan = utils::optimize_children(self, plan, _config)?; + + match plan { + LogicalPlan::Filter(filter) => { + let mut expr_rewriter = MyExprRewriter {}; + let predicate = filter.predicate().clone(); + let predicate = predicate.rewrite(&mut expr_rewriter)?; + Ok(LogicalPlan::Filter(Filter::try_new( + predicate, + filter.input().clone(), + )?)) + } + _ => Ok(plan.clone()), + } + } +} + +struct MyExprRewriter {} + +impl ExprRewriter for MyExprRewriter { + fn mutate(&mut self, expr: Expr) -> Result { + match expr { + Expr::Between { + negated, + expr, + low, + high, + } => { + let expr: Expr = expr.as_ref().clone(); + let low: Expr = low.as_ref().clone(); + let high: Expr = high.as_ref().clone(); + if negated { + Ok(expr.clone().lt(low).or(expr.gt(high))) + } else { + Ok(expr.clone().gt_eq(low).and(expr.lt_eq(high))) + } + } + _ => Ok(expr.clone()), + } + } +} + +struct MyContextProvider {} + +impl ContextProvider for MyContextProvider { + fn get_table_provider(&self, name: TableReference) -> Result> { + if name.table() == "person" { + Ok(Arc::new(MyTableSource { + schema: Arc::new(Schema::new(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::UInt8, false), + ])), + })) + } else { + Err(DataFusionError::Plan("table not found".to_string())) + } + } + + fn get_function_meta(&self, _name: &str) -> Option> { + None + } + + fn get_aggregate_meta(&self, _name: &str) -> Option> { + None + } + + fn get_variable_type(&self, _variable_names: &[String]) -> Option { + None + } +} + +struct MyTableSource { + schema: SchemaRef, +} + +impl TableSource for MyTableSource { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} From 5b279cbb6cd65c9a0a832d4c72649ce5139d055d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Oct 2022 12:50:34 -0600 Subject: [PATCH 14/14] toml fmt --- datafusion-examples/Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff 
--git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 0d42fe0fa8e88..2530f890f683e 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -34,14 +34,14 @@ path = "examples/avro_sql.rs" required-features = ["datafusion/avro"] [dev-dependencies] +arrow = "24.0.0" arrow-flight = "24.0.0" async-trait = "0.1.41" -arrow = "24.0.0" datafusion = { path = "../datafusion/core" } -datafusion-sql = { path = "../datafusion/sql" } +datafusion-common = { path = "../datafusion/common" } datafusion-expr = { path = "../datafusion/expr" } datafusion-optimizer = { path = "../datafusion/optimizer" } -datafusion-common = { path = "../datafusion/common" } +datafusion-sql = { path = "../datafusion/sql" } futures = "0.3" num_cpus = "1.13.0" object_store = { version = "0.5.0", features = ["aws"] }