From e50b86cf8f2ca8045bbe56a1b440e992299e30ed Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 28 Nov 2021 08:10:56 -0500 Subject: [PATCH 1/3] Consolidate `ConstantFolding` and `SimplifyExpression` --- datafusion/src/execution/context.rs | 2 - datafusion/src/optimizer/constant_folding.rs | 502 ----------------- datafusion/src/optimizer/mod.rs | 1 - .../src/optimizer/simplify_expressions.rs | 516 +++++++++++++++--- 4 files changed, 447 insertions(+), 574 deletions(-) delete mode 100644 datafusion/src/optimizer/constant_folding.rs diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 27116c0a4a952..e3293a0dac3b9 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -67,7 +67,6 @@ use crate::logical_plan::{ LogicalPlanBuilder, UNNAMED_TABLE, }; use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate; -use crate::optimizer::constant_folding::ConstantFolding; use crate::optimizer::filter_push_down::FilterPushDown; use crate::optimizer::limit_push_down::LimitPushDown; use crate::optimizer::optimizer::OptimizerRule; @@ -896,7 +895,6 @@ impl Default for ExecutionConfig { target_partitions: num_cpus::get(), batch_size: 8192, optimizers: vec![ - Arc::new(ConstantFolding::new()), Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateLimit::new()), Arc::new(ProjectionPushDown::new()), diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs deleted file mode 100644 index c2274f9034a58..0000000000000 --- a/datafusion/src/optimizer/constant_folding.rs +++ /dev/null @@ -1,502 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Constant folding and algebraic simplification - -use crate::error::Result; -use crate::execution::context::ExecutionProps; -use crate::logical_plan::LogicalPlan; -use crate::optimizer::optimizer::OptimizerRule; -use crate::optimizer::utils; - -/// Simplifies plans by rewriting [`Expr`]`s evaluating constants -/// and applying algebraic simplifications -/// -/// For example -/// `true && col` --> `col` where `col` is a boolean types -pub struct ConstantFolding {} - -impl ConstantFolding { - #[allow(missing_docs)] - pub fn new() -> Self { - Self {} - } -} - -impl OptimizerRule for ConstantFolding { - fn optimize( - &self, - plan: &LogicalPlan, - execution_props: &ExecutionProps, - ) -> Result { - // We need to pass down the all schemas within the plan tree to `optimize_expr` in order to - // to evaluate expression types. For example, a projection plan's schema will only include - // projected columns. With just the projected schema, it's not possible to infer types for - // expressions that references non-projected columns within the same project plan or its - // children plans. - let mut simplifier = - super::simplify_expressions::Simplifier::new(plan.all_schemas()); - - let mut const_evaluator = - super::simplify_expressions::ConstEvaluator::new(execution_props); - - match plan { - // Recurse into plan, apply optimization where possible - LogicalPlan::Filter { .. } - | LogicalPlan::Projection { .. } - | LogicalPlan::Window { .. } - | LogicalPlan::Aggregate { .. } - | LogicalPlan::Repartition(_) - | LogicalPlan::CreateExternalTable(_) - | LogicalPlan::CreateMemoryTable(_) - | LogicalPlan::DropTable(_) - | LogicalPlan::Values(_) - | LogicalPlan::Extension { .. } - | LogicalPlan::Sort { .. } - | LogicalPlan::Explain { .. } - | LogicalPlan::Analyze { .. } - | LogicalPlan::Limit(_) - | LogicalPlan::Union(_) - | LogicalPlan::Join { .. } - | LogicalPlan::CrossJoin(_) => { - // apply the optimization to all inputs of the plan - let inputs = plan.inputs(); - let new_inputs = inputs - .iter() - .map(|plan| self.optimize(plan, execution_props)) - .collect::>>()?; - - let expr = plan - .expressions() - .into_iter() - .map(|e| { - // We need to keep original expression name, if any. - // Constant folding should not change expression name. - let name = &e.name(plan.schema()); - - // TODO iterate until no changes are made - // during rewrite (evaluating constants can - // enable new simplifications and - // simplifications can enable new constant - // evaluation) - let new_e = e - // fold constants and then simplify - .rewrite(&mut const_evaluator)? - .rewrite(&mut simplifier)?; - - let new_name = &new_e.name(plan.schema()); - - if let (Ok(expr_name), Ok(new_expr_name)) = (name, new_name) { - if expr_name != new_expr_name { - Ok(new_e.alias(expr_name)) - } else { - Ok(new_e) - } - } else { - Ok(new_e) - } - }) - .collect::>>()?; - - utils::from_plan(plan, &expr, &new_inputs) - } - LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation(_) => { - Ok(plan.clone()) - } - } - } - - fn name(&self) -> &str { - "constant_folding" - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - assert_contains, - logical_plan::{col, lit, max, min, Expr, LogicalPlanBuilder, Operator}, - physical_plan::functions::BuiltinScalarFunction, - }; - - use arrow::datatypes::*; - use chrono::{DateTime, TimeZone, Utc}; - - fn test_table_scan() -> Result { - let schema = Schema::new(vec![ - Field::new("a", DataType::Boolean, false), - Field::new("b", DataType::Boolean, false), - Field::new("c", DataType::Boolean, false), - Field::new("d", DataType::UInt32, false), - ]); - LogicalPlanBuilder::scan_empty(Some("test"), &schema, None)?.build() - } - - fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { - let rule = ConstantFolding::new(); - let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) - .expect("failed to optimize plan"); - let formatted_plan = format!("{:?}", optimized_plan); - assert_eq!(formatted_plan, expected); - } - - #[test] - fn optimize_plan_eq_expr() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .filter(col("b").eq(lit(true)))? - .filter(col("c").eq(lit(false)))? - .project(vec![col("a")])? - .build()?; - - let expected = "\ - Projection: #test.a\ - \n Filter: NOT #test.c AS test.c = Boolean(false)\ - \n Filter: #test.b AS test.b = Boolean(true)\ - \n TableScan: test projection=None"; - - assert_optimized_plan_eq(&plan, expected); - Ok(()) - } - - #[test] - fn optimize_plan_not_eq_expr() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .filter(col("b").not_eq(lit(true)))? - .filter(col("c").not_eq(lit(false)))? - .limit(1)? - .project(vec![col("a")])? - .build()?; - - let expected = "\ - Projection: #test.a\ - \n Limit: 1\ - \n Filter: #test.c AS test.c != Boolean(false)\ - \n Filter: NOT #test.b AS test.b != Boolean(true)\ - \n TableScan: test projection=None"; - - assert_optimized_plan_eq(&plan, expected); - Ok(()) - } - - #[test] - fn optimize_plan_and_expr() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .filter(col("b").not_eq(lit(true)).and(col("c").eq(lit(true))))? - .project(vec![col("a")])? - .build()?; - - let expected = "\ - Projection: #test.a\ - \n Filter: NOT #test.b AND #test.c AS test.b != Boolean(true) AND test.c = Boolean(true)\ - \n TableScan: test projection=None"; - - assert_optimized_plan_eq(&plan, expected); - Ok(()) - } - - #[test] - fn optimize_plan_or_expr() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .filter(col("b").not_eq(lit(true)).or(col("c").eq(lit(false))))? - .project(vec![col("a")])? - .build()?; - - let expected = "\ - Projection: #test.a\ - \n Filter: NOT #test.b OR NOT #test.c AS test.b != Boolean(true) OR test.c = Boolean(false)\ - \n TableScan: test projection=None"; - - assert_optimized_plan_eq(&plan, expected); - Ok(()) - } - - #[test] - fn optimize_plan_not_expr() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .filter(col("b").eq(lit(false)).not())? - .project(vec![col("a")])? - .build()?; - - let expected = "\ - Projection: #test.a\ - \n Filter: #test.b AS NOT test.b = Boolean(false)\ - \n TableScan: test projection=None"; - - assert_optimized_plan_eq(&plan, expected); - Ok(()) - } - - #[test] - fn optimize_plan_support_projection() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .project(vec![col("a"), col("d"), col("b").eq(lit(false))])? - .build()?; - - let expected = "\ - Projection: #test.a, #test.d, NOT #test.b AS test.b = Boolean(false)\ - \n TableScan: test projection=None"; - - assert_optimized_plan_eq(&plan, expected); - Ok(()) - } - - #[test] - fn optimize_plan_support_aggregate() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .project(vec![col("a"), col("c"), col("b")])? - .aggregate( - vec![col("a"), col("c")], - vec![max(col("b").eq(lit(true))), min(col("b"))], - )? - .build()?; - - let expected = "\ - Aggregate: groupBy=[[#test.a, #test.c]], aggr=[[MAX(#test.b) AS MAX(test.b = Boolean(true)), MIN(#test.b)]]\ - \n Projection: #test.a, #test.c, #test.b\ - \n TableScan: test projection=None"; - - assert_optimized_plan_eq(&plan, expected); - Ok(()) - } - - #[test] - fn optimize_plan_support_values() -> Result<()> { - let expr1 = Expr::BinaryExpr { - left: Box::new(lit(1)), - op: Operator::Plus, - right: Box::new(lit(2)), - }; - let expr2 = Expr::BinaryExpr { - left: Box::new(lit(2)), - op: Operator::Minus, - right: Box::new(lit(1)), - }; - let values = vec![vec![expr1, expr2]]; - let plan = LogicalPlanBuilder::values(values)?.build()?; - - let expected = "\ - Values: (Int32(3) AS Int32(1) + Int32(2), Int32(1) AS Int32(2) - Int32(1))"; - - assert_optimized_plan_eq(&plan, expected); - Ok(()) - } - - // expect optimizing will result in an error, returning the error string - fn get_optimized_plan_err(plan: &LogicalPlan, date_time: &DateTime) -> String { - let rule = ConstantFolding::new(); - let execution_props = ExecutionProps { - query_execution_start_time: *date_time, - }; - - let err = rule - .optimize(plan, &execution_props) - .expect_err("expected optimization to fail"); - - err.to_string() - } - - fn get_optimized_plan_formatted( - plan: &LogicalPlan, - date_time: &DateTime, - ) -> String { - let rule = ConstantFolding::new(); - let execution_props = ExecutionProps { - query_execution_start_time: *date_time, - }; - - let optimized_plan = rule - .optimize(plan, &execution_props) - .expect("failed to optimize plan"); - return format!("{:?}", optimized_plan); - } - - /// Create a to_timestamp expr - fn to_timestamp_expr(arg: impl Into) -> Expr { - Expr::ScalarFunction { - args: vec![lit(arg.into())], - fun: BuiltinScalarFunction::ToTimestamp, - } - } - - #[test] - fn to_timestamp_expr_folded() { - let table_scan = test_table_scan().unwrap(); - let proj = vec![to_timestamp_expr("2020-09-08T12:00:00+00:00")]; - - let plan = LogicalPlanBuilder::from(table_scan) - .project(proj) - .unwrap() - .build() - .unwrap(); - - let expected = "Projection: TimestampNanosecond(1599566400000000000) AS totimestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\ - \n TableScan: test projection=None" - .to_string(); - let actual = get_optimized_plan_formatted(&plan, &Utc::now()); - assert_eq!(expected, actual); - } - - #[test] - fn to_timestamp_expr_wrong_arg() { - let table_scan = test_table_scan().unwrap(); - let proj = vec![to_timestamp_expr("I'M NOT A TIMESTAMP")]; - let plan = LogicalPlanBuilder::from(table_scan) - .project(proj) - .unwrap() - .build() - .unwrap(); - - let expected = "Error parsing 'I'M NOT A TIMESTAMP' as timestamp"; - let actual = get_optimized_plan_err(&plan, &Utc::now()); - assert_contains!(actual, expected); - } - - #[test] - fn cast_expr() { - let table_scan = test_table_scan().unwrap(); - let proj = vec![Expr::Cast { - expr: Box::new(lit("0")), - data_type: DataType::Int32, - }]; - let plan = LogicalPlanBuilder::from(table_scan) - .project(proj) - .unwrap() - .build() - .unwrap(); - - let expected = "Projection: Int32(0) AS CAST(Utf8(\"0\") AS Int32)\ - \n TableScan: test projection=None"; - let actual = get_optimized_plan_formatted(&plan, &Utc::now()); - assert_eq!(expected, actual); - } - - #[test] - fn cast_expr_wrong_arg() { - let table_scan = test_table_scan().unwrap(); - let proj = vec![Expr::Cast { - expr: Box::new(lit("")), - data_type: DataType::Int32, - }]; - let plan = LogicalPlanBuilder::from(table_scan) - .project(proj) - .unwrap() - .build() - .unwrap(); - - let expected = - "Cannot cast string '' to value of arrow::datatypes::types::Int32Type type"; - let actual = get_optimized_plan_err(&plan, &Utc::now()); - assert_contains!(actual, expected); - } - - fn now_expr() -> Expr { - Expr::ScalarFunction { - args: vec![], - fun: BuiltinScalarFunction::Now, - } - } - - #[test] - fn multiple_now_expr() { - let table_scan = test_table_scan().unwrap(); - let time = Utc::now(); - let proj = vec![ - now_expr(), - Expr::Alias(Box::new(now_expr()), "t2".to_string()), - ]; - let plan = LogicalPlanBuilder::from(table_scan) - .project(proj) - .unwrap() - .build() - .unwrap(); - - // expect the same timestamp appears in both exprs - let actual = get_optimized_plan_formatted(&plan, &time); - let expected = format!( - "Projection: TimestampNanosecond({}) AS now(), TimestampNanosecond({}) AS t2\ - \n TableScan: test projection=None", - time.timestamp_nanos(), - time.timestamp_nanos() - ); - - assert_eq!(actual, expected); - } - - #[test] - fn simplify_and_eval() { - // demonstrate a case where the evaluation needs to run prior - // to the simplifier for it to work - let table_scan = test_table_scan().unwrap(); - let time = Utc::now(); - // (true or false) != col --> !col - let proj = vec![lit(true).or(lit(false)).not_eq(col("a"))]; - let plan = LogicalPlanBuilder::from(table_scan) - .project(proj) - .unwrap() - .build() - .unwrap(); - - let actual = get_optimized_plan_formatted(&plan, &time); - let expected = - "Projection: NOT #test.a AS Boolean(true) OR Boolean(false) != test.a\ - \n TableScan: test projection=None"; - - assert_eq!(actual, expected); - } - - fn cast_to_int64_expr(expr: Expr) -> Expr { - Expr::Cast { - expr: expr.into(), - data_type: DataType::Int64, - } - } - - #[test] - fn now_less_than_timestamp() { - let table_scan = test_table_scan().unwrap(); - - let ts_string = "2020-09-08T12:05:00+00:00"; - let time = chrono::Utc.timestamp_nanos(1599566400000000000i64); - - // now() < cast(to_timestamp(...) as int) + 5000000000 - let plan = LogicalPlanBuilder::from(table_scan) - .filter( - cast_to_int64_expr(now_expr()) - .lt(cast_to_int64_expr(to_timestamp_expr(ts_string)) + lit(50000)), - ) - .unwrap() - .build() - .unwrap(); - - // Note that constant folder runs and folds the entire - // expression down to a single constant (true) - let expected = "Filter: Boolean(true) AS CAST(now() AS Int64) < CAST(totimestamp(Utf8(\"2020-09-08T12:05:00+00:00\")) AS Int64) + Int32(50000)\ - \n TableScan: test projection=None"; - let actual = get_optimized_plan_formatted(&plan, &time); - - assert_eq!(expected, actual); - } -} diff --git a/datafusion/src/optimizer/mod.rs b/datafusion/src/optimizer/mod.rs index 419d6bcda2424..c5cab97926dfc 100644 --- a/datafusion/src/optimizer/mod.rs +++ b/datafusion/src/optimizer/mod.rs @@ -19,7 +19,6 @@ //! some simple rules to a logical plan, such as "Projection Push Down" and "Type Coercion". pub mod common_subexpr_eliminate; -pub mod constant_folding; pub mod eliminate_limit; pub mod filter_push_down; pub mod limit_push_down; diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs index f8567abb3d2b0..35fb38a22c24f 100644 --- a/datafusion/src/optimizer/simplify_expressions.rs +++ b/datafusion/src/optimizer/simplify_expressions.rs @@ -32,13 +32,17 @@ use crate::physical_plan::planner::DefaultPhysicalPlanner; use crate::scalar::ScalarValue; use crate::{error::Result, logical_plan::Operator}; -/// Simplify expressions optimizer. +/// Simplifies plans by rewriting [`Expr`]`s evaluating constants +/// and applying algebraic simplifications +/// /// # Introduction /// It uses boolean algebra laws to simplify or reduce the number of terms in expressions. /// -/// Filter: b > 2 AND b > 2 +/// # Example: +/// `Filter: b > 2 AND b > 2` /// is optimized to -/// Filter: b > 2 +/// `Filter: b > 2` +/// pub struct SimplifyExpressions {} fn expr_contains(expr: &Expr, needle: &Expr) -> bool { @@ -262,20 +266,6 @@ fn simplify(expr: &Expr) -> Expr { } } -fn optimize(plan: &LogicalPlan) -> Result { - let new_inputs = plan - .inputs() - .iter() - .map(|input| optimize(input)) - .collect::>>()?; - let expr = plan - .expressions() - .into_iter() - .map(|x| simplify(&x)) - .collect::>(); - utils::from_plan(plan, &expr, &new_inputs) -} - impl OptimizerRule for SimplifyExpressions { fn name(&self) -> &str { "simplify_expressions" @@ -284,9 +274,62 @@ impl OptimizerRule for SimplifyExpressions { fn optimize( &self, plan: &LogicalPlan, - _execution_props: &ExecutionProps, + execution_props: &ExecutionProps, ) -> Result { - optimize(plan) + // We need to pass down the all schemas within the plan tree to `optimize_expr` in order to + // to evaluate expression types. For example, a projection plan's schema will only include + // projected columns. With just the projected schema, it's not possible to infer types for + // expressions that references non-projected columns within the same project plan or its + // children plans. + let mut simplifier = + super::simplify_expressions::Simplifier::new(plan.all_schemas()); + + let mut const_evaluator = + super::simplify_expressions::ConstEvaluator::new(execution_props); + + let new_inputs = plan + .inputs() + .iter() + .map(|input| self.optimize(input, execution_props)) + .collect::>>()?; + + let expr = plan + .expressions() + .into_iter() + .map(|e| { + // We need to keep original expression name, if any. + // Constant folding should not change expression name. + let name = &e.name(plan.schema()); + + // TODO combine simplify into Simplifier + let e = simplify(&e); + + // TODO iterate until no changes are made + // during rewrite (evaluating constants can + // enable new simplifications and + // simplifications can enable new constant + // evaluation) + let new_e = e + // fold constants and then simplify + .rewrite(&mut const_evaluator)? + .rewrite(&mut simplifier)?; + + let new_name = &new_e.name(plan.schema()); + + // TODO simplify this logic + if let (Ok(expr_name), Ok(new_expr_name)) = (name, new_name) { + if expr_name != new_expr_name { + Ok(new_e.alias(expr_name)) + } else { + Ok(new_e) + } + } else { + Ok(new_e) + } + }) + .collect::>>()?; + + utils::from_plan(plan, &expr, &new_inputs) } } @@ -733,22 +776,13 @@ mod tests { use chrono::{DateTime, TimeZone, Utc}; use super::*; + use crate::assert_contains; use crate::logical_plan::{ and, binary_expr, col, create_udf, lit, lit_timestamp_nano, DFField, Expr, LogicalPlanBuilder, }; use crate::physical_plan::functions::{make_scalar_function, BuiltinScalarFunction}; use crate::physical_plan::udf::ScalarUDF; - use crate::test::*; - - fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { - let rule = SimplifyExpressions::new(); - let optimized_plan = rule - .optimize(plan, &ExecutionProps::new()) - .expect("failed to optimize plan"); - let formatted_plan = format!("{:?}", optimized_plan); - assert_eq!(formatted_plan, expected); - } #[test] fn test_simplify_or_true() -> Result<()> { @@ -925,46 +959,6 @@ mod tests { Ok(()) } - #[test] - fn test_simplify_optimized_plan() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .project(vec![col("a")])? - .filter(and(col("b").gt(lit(1)), col("b").gt(lit(1))))? - .build()?; - - assert_optimized_plan_eq( - &plan, - "\ - Filter: #test.b > Int32(1)\ - \n Projection: #test.a\ - \n TableScan: test projection=None", - ); - Ok(()) - } - - // ((c > 5) AND (d < 6)) AND (c > 5) --> (c > 5) AND (d < 6) - #[test] - fn test_simplify_optimized_plan_with_composed_and() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .project(vec![col("a")])? - .filter(and( - and(col("a").gt(lit(5)), col("b").lt(lit(6))), - col("a").gt(lit(5)), - ))? - .build()?; - - assert_optimized_plan_eq( - &plan, - "\ - Filter: #test.a > Int32(5) AND #test.b < Int32(6)\ - \n Projection: #test.a\ - \n TableScan: test projection=None", - ); - Ok(()) - } - #[test] fn test_const_evaluator() { // true --> true @@ -1562,4 +1556,388 @@ mod tests { .unwrap(), ) } + + fn test_table_scan() -> Result { + let schema = Schema::new(vec![ + Field::new("a", DataType::Boolean, false), + Field::new("b", DataType::Boolean, false), + Field::new("c", DataType::Boolean, false), + Field::new("d", DataType::UInt32, false), + ]); + LogicalPlanBuilder::scan_empty(Some("test"), &schema, None)?.build() + } + + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { + let rule = SimplifyExpressions::new(); + let optimized_plan = rule + .optimize(plan, &ExecutionProps::new()) + .expect("failed to optimize plan"); + let formatted_plan = format!("{:?}", optimized_plan); + assert_eq!(formatted_plan, expected); + } + + #[test] + fn test_simplify_optimized_plan() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a")])? + .filter(and(col("b").gt(lit(1)), col("b").gt(lit(1))))? + .build()?; + + assert_optimized_plan_eq( + &plan, + "\ + Filter: #test.b > Int32(1)\ + \n Projection: #test.a\ + \n TableScan: test projection=None", + ); + Ok(()) + } + + // ((c > 5) AND (d < 6)) AND (c > 5) --> (c > 5) AND (d < 6) + #[test] + fn test_simplify_optimized_plan_with_composed_and() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a")])? + .filter(and( + and(col("a").gt(lit(5)), col("b").lt(lit(6))), + col("a").gt(lit(5)), + ))? + .build()?; + + assert_optimized_plan_eq( + &plan, + "\ + Filter: #test.a > Int32(5) AND #test.b < Int32(6)\ + \n Projection: #test.a\ + \n TableScan: test projection=None", + ); + Ok(()) + } + + #[test] + fn test_simplity_optimized_plan_eq_expr() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(col("b").eq(lit(true)))? + .filter(col("c").eq(lit(false)))? + .project(vec![col("a")])? + .build()?; + + let expected = "\ + Projection: #test.a\ + \n Filter: NOT #test.c AS test.c = Boolean(false)\ + \n Filter: #test.b AS test.b = Boolean(true)\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + #[test] + fn test_simplity_optimized_plan_not_eq_expr() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(col("b").not_eq(lit(true)))? + .filter(col("c").not_eq(lit(false)))? + .limit(1)? + .project(vec![col("a")])? + .build()?; + + let expected = "\ + Projection: #test.a\ + \n Limit: 1\ + \n Filter: #test.c AS test.c != Boolean(false)\ + \n Filter: NOT #test.b AS test.b != Boolean(true)\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + #[test] + fn test_simplity_optimized_plan_and_expr() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(col("b").not_eq(lit(true)).and(col("c").eq(lit(true))))? + .project(vec![col("a")])? + .build()?; + + let expected = "\ + Projection: #test.a\ + \n Filter: NOT #test.b AND #test.c AS test.b != Boolean(true) AND test.c = Boolean(true)\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + #[test] + fn test_simplity_optimized_plan_or_expr() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(col("b").not_eq(lit(true)).or(col("c").eq(lit(false))))? + .project(vec![col("a")])? + .build()?; + + let expected = "\ + Projection: #test.a\ + \n Filter: NOT #test.b OR NOT #test.c AS test.b != Boolean(true) OR test.c = Boolean(false)\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + #[test] + fn test_simplity_optimized_plan_not_expr() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(col("b").eq(lit(false)).not())? + .project(vec![col("a")])? + .build()?; + + let expected = "\ + Projection: #test.a\ + \n Filter: #test.b AS NOT test.b = Boolean(false)\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + #[test] + fn test_simplity_optimized_plan_support_projection() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("d"), col("b").eq(lit(false))])? + .build()?; + + let expected = "\ + Projection: #test.a, #test.d, NOT #test.b AS test.b = Boolean(false)\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + #[test] + fn test_simplity_optimized_plan_support_aggregate() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("c"), col("b")])? + .aggregate( + vec![col("a"), col("c")], + vec![ + crate::logical_plan::max(col("b").eq(lit(true))), + crate::logical_plan::min(col("b")), + ], + )? + .build()?; + + let expected = "\ + Aggregate: groupBy=[[#test.a, #test.c]], aggr=[[MAX(#test.b) AS MAX(test.b = Boolean(true)), MIN(#test.b)]]\ + \n Projection: #test.a, #test.c, #test.b\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + #[test] + fn test_simplity_optimized_plan_support_values() -> Result<()> { + let expr1 = Expr::BinaryExpr { + left: Box::new(lit(1)), + op: Operator::Plus, + right: Box::new(lit(2)), + }; + let expr2 = Expr::BinaryExpr { + left: Box::new(lit(2)), + op: Operator::Minus, + right: Box::new(lit(1)), + }; + let values = vec![vec![expr1, expr2]]; + let plan = LogicalPlanBuilder::values(values)?.build()?; + + let expected = "\ + Values: (Int32(3) AS Int32(1) + Int32(2), Int32(1) AS Int32(2) - Int32(1))"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + // expect optimizing will result in an error, returning the error string + fn get_optimized_plan_err(plan: &LogicalPlan, date_time: &DateTime) -> String { + let rule = SimplifyExpressions::new(); + let execution_props = ExecutionProps { + query_execution_start_time: *date_time, + }; + + let err = rule + .optimize(plan, &execution_props) + .expect_err("expected optimization to fail"); + + err.to_string() + } + + fn get_optimized_plan_formatted( + plan: &LogicalPlan, + date_time: &DateTime, + ) -> String { + let rule = SimplifyExpressions::new(); + let execution_props = ExecutionProps { + query_execution_start_time: *date_time, + }; + + let optimized_plan = rule + .optimize(plan, &execution_props) + .expect("failed to optimize plan"); + return format!("{:?}", optimized_plan); + } + + #[test] + fn to_timestamp_expr_folded() { + let table_scan = test_table_scan().unwrap(); + let proj = vec![to_timestamp_expr("2020-09-08T12:00:00+00:00")]; + + let plan = LogicalPlanBuilder::from(table_scan) + .project(proj) + .unwrap() + .build() + .unwrap(); + + let expected = "Projection: TimestampNanosecond(1599566400000000000) AS totimestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\ + \n TableScan: test projection=None" + .to_string(); + let actual = get_optimized_plan_formatted(&plan, &Utc::now()); + assert_eq!(expected, actual); + } + + #[test] + fn to_timestamp_expr_wrong_arg() { + let table_scan = test_table_scan().unwrap(); + let proj = vec![to_timestamp_expr("I'M NOT A TIMESTAMP")]; + let plan = LogicalPlanBuilder::from(table_scan) + .project(proj) + .unwrap() + .build() + .unwrap(); + + let expected = "Error parsing 'I'M NOT A TIMESTAMP' as timestamp"; + let actual = get_optimized_plan_err(&plan, &Utc::now()); + assert_contains!(actual, expected); + } + + #[test] + fn cast_expr() { + let table_scan = test_table_scan().unwrap(); + let proj = vec![Expr::Cast { + expr: Box::new(lit("0")), + data_type: DataType::Int32, + }]; + let plan = LogicalPlanBuilder::from(table_scan) + .project(proj) + .unwrap() + .build() + .unwrap(); + + let expected = "Projection: Int32(0) AS CAST(Utf8(\"0\") AS Int32)\ + \n TableScan: test projection=None"; + let actual = get_optimized_plan_formatted(&plan, &Utc::now()); + assert_eq!(expected, actual); + } + + #[test] + fn cast_expr_wrong_arg() { + let table_scan = test_table_scan().unwrap(); + let proj = vec![Expr::Cast { + expr: Box::new(lit("")), + data_type: DataType::Int32, + }]; + let plan = LogicalPlanBuilder::from(table_scan) + .project(proj) + .unwrap() + .build() + .unwrap(); + + let expected = + "Cannot cast string '' to value of arrow::datatypes::types::Int32Type type"; + let actual = get_optimized_plan_err(&plan, &Utc::now()); + assert_contains!(actual, expected); + } + + #[test] + fn multiple_now_expr() { + let table_scan = test_table_scan().unwrap(); + let time = Utc::now(); + let proj = vec![ + now_expr(), + Expr::Alias(Box::new(now_expr()), "t2".to_string()), + ]; + let plan = LogicalPlanBuilder::from(table_scan) + .project(proj) + .unwrap() + .build() + .unwrap(); + + // expect the same timestamp appears in both exprs + let actual = get_optimized_plan_formatted(&plan, &time); + let expected = format!( + "Projection: TimestampNanosecond({}) AS now(), TimestampNanosecond({}) AS t2\ + \n TableScan: test projection=None", + time.timestamp_nanos(), + time.timestamp_nanos() + ); + + assert_eq!(actual, expected); + } + + #[test] + fn simplify_and_eval() { + // demonstrate a case where the evaluation needs to run prior + // to the simplifier for it to work + let table_scan = test_table_scan().unwrap(); + let time = Utc::now(); + // (true or false) != col --> !col + let proj = vec![lit(true).or(lit(false)).not_eq(col("a"))]; + let plan = LogicalPlanBuilder::from(table_scan) + .project(proj) + .unwrap() + .build() + .unwrap(); + + let actual = get_optimized_plan_formatted(&plan, &time); + let expected = + "Projection: NOT #test.a AS Boolean(true) OR Boolean(false) != test.a\ + \n TableScan: test projection=None"; + + assert_eq!(actual, expected); + } + + #[test] + fn now_less_than_timestamp() { + let table_scan = test_table_scan().unwrap(); + + let ts_string = "2020-09-08T12:05:00+00:00"; + let time = chrono::Utc.timestamp_nanos(1599566400000000000i64); + + // now() < cast(to_timestamp(...) as int) + 5000000000 + let plan = LogicalPlanBuilder::from(table_scan) + .filter( + cast_to_int64_expr(now_expr()) + .lt(cast_to_int64_expr(to_timestamp_expr(ts_string)) + lit(50000)), + ) + .unwrap() + .build() + .unwrap(); + + // Note that constant folder runs and folds the entire + // expression down to a single constant (true) + let expected = "Filter: Boolean(true) AS CAST(now() AS Int64) < CAST(totimestamp(Utf8(\"2020-09-08T12:05:00+00:00\")) AS Int64) + Int32(50000)\ + \n TableScan: test projection=None"; + let actual = get_optimized_plan_formatted(&plan, &time); + + assert_eq!(expected, actual); + } } From 882356ed7e5e43506b50a9acb2564bfcdf37e751 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 28 Nov 2021 08:13:10 -0500 Subject: [PATCH 2/3] Update simplify tests --- datafusion/src/optimizer/simplify_expressions.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs index 35fb38a22c24f..30fdd8ca56752 100644 --- a/datafusion/src/optimizer/simplify_expressions.rs +++ b/datafusion/src/optimizer/simplify_expressions.rs @@ -1587,7 +1587,7 @@ mod tests { assert_optimized_plan_eq( &plan, "\ - Filter: #test.b > Int32(1)\ + Filter: #test.b > Int32(1) AS test.b > Int32(1) AND test.b > Int32(1)\ \n Projection: #test.a\ \n TableScan: test projection=None", ); @@ -1609,7 +1609,7 @@ mod tests { assert_optimized_plan_eq( &plan, "\ - Filter: #test.a > Int32(5) AND #test.b < Int32(6)\ + Filter: #test.a > Int32(5) AND #test.b < Int32(6) AS test.a > Int32(5) AND test.b < Int32(6) AND test.a > Int32(5)\ \n Projection: #test.a\ \n TableScan: test projection=None", ); From ba3368e210a33e7ab69bb4c95dd4b52ffe9c445e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 30 Nov 2021 07:11:06 -0500 Subject: [PATCH 3/3] simplify before other optimizations --- datafusion/src/execution/context.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index e3293a0dac3b9..17d23a86f85ca 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -895,11 +895,13 @@ impl Default for ExecutionConfig { target_partitions: num_cpus::get(), batch_size: 8192, optimizers: vec![ + // Simplify expressions first to maximize the chance + // of applying other optimizations + Arc::new(SimplifyExpressions::new()), Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateLimit::new()), Arc::new(ProjectionPushDown::new()), Arc::new(FilterPushDown::new()), - Arc::new(SimplifyExpressions::new()), Arc::new(LimitPushDown::new()), Arc::new(SingleDistinctToGroupBy::new()), ],