From 2dcdc6710eb6e096608c3a1a6cd3b86193e036ed Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Mon, 2 May 2022 18:42:27 +0300 Subject: [PATCH 1/5] naive in subquery implementation --- datafusion/core/src/execution/context.rs | 2 + .../core/src/optimizer/filter_push_down.rs | 66 +--- datafusion/core/src/optimizer/mod.rs | 1 + .../src/optimizer/subquery_filter_to_join.rs | 312 ++++++++++++++++++ datafusion/core/src/optimizer/utils.rs | 37 ++- 5 files changed, 367 insertions(+), 51 deletions(-) create mode 100644 datafusion/core/src/optimizer/subquery_filter_to_join.rs diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 01a5eefa9dc01..895e5bc1e71dc 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -72,6 +72,7 @@ use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::projection_push_down::ProjectionPushDown; use crate::optimizer::simplify_expressions::SimplifyExpressions; use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy; +use crate::optimizer::subquery_filter_to_join::SubqueryFilterToJoin; use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec; @@ -1199,6 +1200,7 @@ impl SessionState { // Simplify expressions first to maximize the chance // of applying other optimizations Arc::new(SimplifyExpressions::new()), + Arc::new(SubqueryFilterToJoin::new()), Arc::new(EliminateFilter::new()), Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateLimit::new()), diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs index 19535de86b9f5..0fd107b40dead 100644 --- a/datafusion/core/src/optimizer/filter_push_down.rs +++ b/datafusion/core/src/optimizer/filter_push_down.rs @@ -14,20 +14,17 @@ //! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan +use crate::error::Result; use crate::execution::context::ExecutionProps; use crate::logical_expr::TableProviderFilterPushDown; use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union}; use crate::logical_plan::{ - and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan, + col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan, }; use crate::logical_plan::{DFSchema, Expr}; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; -use crate::{error::Result, logical_plan::Operator}; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::collections::{HashMap, HashSet}; /// Filter Push Down optimizer rule pushes filter clauses down the plan /// # Introduction @@ -95,23 +92,6 @@ fn push_down(state: &State, plan: &LogicalPlan) -> Result { utils::from_plan(plan, &expr, &new_inputs) } -/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with -/// its predicate be all `predicates` ANDed. -fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan { - // reduce filters to a single filter with an AND - let predicate = predicates - .iter() - .skip(1) - .fold(predicates[0].clone(), |acc, predicate| { - and(acc, (*predicate).to_owned()) - }); - - LogicalPlan::Filter(Filter { - predicate, - input: Arc::new(plan), - }) -} - // remove all filters from `filters` that are in `predicate_columns` fn remove_filters( filters: &[(Expr, HashSet)], @@ -150,7 +130,7 @@ fn issue_filters( return push_down(&state, plan); } - let plan = add_filter(plan.clone(), &predicates); + let plan = utils::add_filter(plan.clone(), &predicates); state.filters = remove_filters(&state.filters, &predicate_columns); @@ -158,24 +138,6 @@ fn issue_filters( push_down(&state, &plan) } -/// converts "A AND B AND C" => [A, B, C] -fn split_members<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) { - match predicate { - Expr::BinaryExpr { - right, - op: Operator::And, - left, - } => { - split_members(left, predicates); - split_members(right, predicates); - } - Expr::Alias(expr, _) => { - split_members(expr, predicates); - } - other => predicates.push(other), - } -} - // For a given JOIN logical plan, determine whether each side of the join is preserved. // We say a join side is preserved if the join returns all or a subset of the rows from // the relevant side, such that each row of the output table directly maps to a row of @@ -289,7 +251,7 @@ fn optimize_join( Ok(plan) } else { // wrap the join on the filter whose predicates must be kept - let plan = add_filter(plan, &to_keep.0); + let plan = utils::add_filter(plan, &to_keep.0); state.filters = remove_filters(&state.filters, &to_keep.1); Ok(plan) @@ -305,7 +267,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { LogicalPlan::Analyze { .. } => push_down(&state, plan), LogicalPlan::Filter(Filter { input, predicate }) => { let mut predicates = vec![]; - split_members(predicate, &mut predicates); + utils::split_conjunction(predicate, &mut predicates); // Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.) let mut no_col_predicates = vec![]; @@ -328,7 +290,10 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { // As those contain only literals, they could be optimized using constant folding // and removal of WHERE TRUE / WHERE FALSE if !no_col_predicates.is_empty() { - Ok(add_filter(optimize(input, state)?, &no_col_predicates)) + Ok(utils::add_filter( + optimize(input, state)?, + &no_col_predicates, + )) } else { optimize(input, state) } @@ -592,17 +557,18 @@ fn rewrite(expr: &Expr, projection: &HashMap) -> Result { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; use crate::datasource::TableProvider; + use crate::logical_plan::plan::provider_as_source; use crate::logical_plan::{ - lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder, Operator, + and, col, lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder, + Operator, }; use crate::physical_plan::ExecutionPlan; + use crate::prelude::JoinType; use crate::test::*; - use crate::{ - logical_plan::{col, plan::provider_as_source}, - prelude::JoinType, - }; use arrow::datatypes::SchemaRef; use async_trait::async_trait; diff --git a/datafusion/core/src/optimizer/mod.rs b/datafusion/core/src/optimizer/mod.rs index 9f12ecea81df0..b274ab645f54c 100644 --- a/datafusion/core/src/optimizer/mod.rs +++ b/datafusion/core/src/optimizer/mod.rs @@ -28,4 +28,5 @@ pub mod optimizer; pub mod projection_push_down; pub mod simplify_expressions; pub mod single_distinct_to_groupby; +pub mod subquery_filter_to_join; pub mod utils; diff --git a/datafusion/core/src/optimizer/subquery_filter_to_join.rs b/datafusion/core/src/optimizer/subquery_filter_to_join.rs new file mode 100644 index 0000000000000..1da5a94c94cc2 --- /dev/null +++ b/datafusion/core/src/optimizer/subquery_filter_to_join.rs @@ -0,0 +1,312 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Optimizer rule for rewriting subquery filters to joins +//! +//! It handles standalone parts of logical conjunction expressions, i.e. +//! ```text +//! WHERE t1.f IN (SELECT f FROM t2) AND t2.f = 'x' +//! ``` +//! will be rewritten, but +//! ```text +//! WHERE t1.f IN (SELECT f FROM t2) OR t2.f = 'x' +//! ``` +//! won't +use std::sync::Arc; + +use crate::error::{DataFusionError, Result}; +use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::{Filter, Join}; +use crate::logical_plan::{ + build_join_schema, Expr, JoinConstraint, JoinType, LogicalPlan, +}; +use crate::optimizer::optimizer::OptimizerRule; +use crate::optimizer::utils; + +/// Optimizer rule for rewriting subquery filters to joins +#[derive(Default)] +pub struct SubqueryFilterToJoin {} + +impl SubqueryFilterToJoin { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl OptimizerRule for SubqueryFilterToJoin { + fn optimize( + &self, + plan: &LogicalPlan, + execution_props: &ExecutionProps, + ) -> Result { + match plan { + LogicalPlan::Filter(Filter { predicate, input }) => { + // Splitting filter expression into components by AND + let mut filters = vec![]; + utils::split_conjunction(predicate, &mut filters); + + // Searching for subquery-based filters + let (subquery_filters, regular_filters): (Vec<&Expr>, Vec<&Expr>) = + filters + .into_iter() + .partition(|&e| matches!(e, Expr::InSubquery { .. })); + + // Check all subquery filters could be rewritten + let mut subqueries_in_regular = vec![]; + regular_filters.iter().try_for_each(|&e| { + extract_subquery_filters(e, &mut subqueries_in_regular) + })?; + + if !subqueries_in_regular.is_empty() { + return Err(DataFusionError::NotImplemented( + "InSubquery allowed only as part of AND conjunction".to_string(), + )); + }; + + // Apply optimizer rule to current input + let mut new_input = self.optimize(input, execution_props)?; + + // Add subquery joins to new_input + subquery_filters.iter().try_for_each(|&e| match e { + Expr::InSubquery { + expr, + subquery, + negated, + } => { + let right_input = + self.optimize(&*subquery.subquery, execution_props)?; + let right_schema = right_input.schema(); + if right_schema.fields().len() != 1 { + return Err(DataFusionError::Plan( + "Only single column allowed in InSubquery".to_string(), + )); + }; + + let right_key = right_schema.field(0).qualified_column(); + let left_key = match *expr.clone() { + Expr::Column(col) => col, + _ => return Err(DataFusionError::NotImplemented( + "Filtering by expression not implemented for InSubquery" + .to_string(), + )), + }; + + let join_type = match negated { + true => JoinType::Anti, + false => JoinType::Semi, + }; + + let schema = build_join_schema( + new_input.schema(), + right_schema, + &join_type, + )?; + + new_input = LogicalPlan::Join(Join { + left: Arc::new(new_input.clone()), + right: Arc::new(right_input), + on: vec![(left_key, right_key)], + join_type, + join_constraint: JoinConstraint::On, + schema: Arc::new(schema), + null_equals_null: false, + }); + + Ok(()) + } + _ => Err(DataFusionError::Plan( + "Unknown expression while rewriting subquery to joins" + .to_string(), + )), + })?; + + // Apply regular filters to join output if some or just return join + if regular_filters.is_empty() { + Ok(new_input) + } else { + Ok(utils::add_filter(new_input, ®ular_filters)) + } + } + _ => { + // Apply the optimization to all inputs of the plan + utils::optimize_children(self, plan, execution_props) + } + } + } + + fn name(&self) -> &str { + "subquery_filter_to_join" + } +} + +fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec) -> Result<()> { + utils::expr_sub_expressions(expression)? + .into_iter() + .try_for_each(|se| match se { + Expr::InSubquery { .. } => { + extracted.push(se); + Ok(()) + } + _ => extract_subquery_filters(&se, extracted), + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::logical_plan::{ + and, binary_expr, col, in_subquery, lit, not_in_subquery, LogicalPlanBuilder, + Operator, + }; + use crate::test::*; + + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { + let rule = SubqueryFilterToJoin::new(); + let optimized_plan = rule + .optimize(plan, &ExecutionProps::new()) + .expect("failed to optimize plan"); + let formatted_plan = format!("{}", optimized_plan.display_indent_schema()); + assert_eq!(formatted_plan, expected); + } + + fn test_subquery() -> Result> { + let table_scan = test_table_scan()?; + Ok(Arc::new( + LogicalPlanBuilder::from(table_scan) + .project(vec![col("c")])? + .build()?, + )) + } + + /// Test for single IN subquery filter + #[test] + fn in_subquery_simple() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(in_subquery(col("c"), test_subquery()?))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: #test.b [b:UInt32]\ + \n Semi Join: #test.c = #test.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #test.c [c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + /// Test for single NOT IN subquery filter + #[test] + fn not_in_subquery_simple() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(not_in_subquery(col("c"), test_subquery()?))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: #test.b [b:UInt32]\ + \n Anti Join: #test.c = #test.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #test.c [c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + /// Test for several IN subquery expressions + #[test] + fn in_subquery_multiple() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(and( + in_subquery(col("c"), test_subquery()?), + in_subquery(col("b"), test_subquery()?), + ))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: #test.b [b:UInt32]\ + \n Semi Join: #test.b = #test.c [a:UInt32, b:UInt32, c:UInt32]\ + \n Semi Join: #test.c = #test.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #test.c [c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #test.c [c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + /// Test for IN subquery with additional filters + #[test] + fn in_subquery_with_filters() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(and( + in_subquery(col("c"), test_subquery()?), + and( + binary_expr(col("a"), Operator::Eq, lit(1_u32)), + binary_expr(col("b"), Operator::Lt, lit(30_u32)), + ), + ))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: #test.b [b:UInt32]\ + \n Filter: #test.a = UInt32(1) AND #test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\ + \n Semi Join: #test.c = #test.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #test.c [c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + /// Test for nested IN subqueries + #[test] + fn in_subquery_nested() -> Result<()> { + let table_scan = test_table_scan()?; + + let subquery = LogicalPlanBuilder::from(table_scan.clone()) + .filter(in_subquery(col("a"), test_subquery()?))? + .project(vec![col("a")])? + .build()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .filter(in_subquery(col("b"), Arc::new(subquery)))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: #test.b [b:UInt32]\ + \n Semi Join: #test.b = #test.a [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #test.a [a:UInt32]\ + \n Semi Join: #test.a = #test.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #test.c [c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } +} diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index df36761fec40b..48855df9f8e8a 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -25,7 +25,7 @@ use datafusion_expr::logical_plan::{ }; use crate::logical_plan::{ - build_join_schema, Column, CreateMemoryTable, DFSchemaRef, Expr, ExprVisitable, + and, build_join_schema, Column, CreateMemoryTable, DFSchemaRef, Expr, ExprVisitable, Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Recursion, Repartition, Union, Values, }; @@ -556,6 +556,41 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result { } } +/// converts "A AND B AND C" => [A, B, C] +pub fn split_conjunction<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) { + match predicate { + Expr::BinaryExpr { + right, + op: Operator::And, + left, + } => { + split_conjunction(left, predicates); + split_conjunction(right, predicates); + } + Expr::Alias(expr, _) => { + split_conjunction(expr, predicates); + } + other => predicates.push(other), + } +} + +/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with +/// its predicate be all `predicates` ANDed. +pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan { + // reduce filters to a single filter with an AND + let predicate = predicates + .iter() + .skip(1) + .fold(predicates[0].clone(), |acc, predicate| { + and(acc, (*predicate).to_owned()) + }); + + LogicalPlan::Filter(Filter { + predicate, + input: Arc::new(plan), + }) +} + #[cfg(test)] mod tests { use super::*; From 4a2fd2c4d2a380ff1e072962610228a7f40fd04a Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Tue, 3 May 2022 07:55:19 +0300 Subject: [PATCH 2/5] 16 and 18 tpch queries enabled in benchmark --- benchmarks/src/bin/tpch.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index ef8abb01c2d2c..15b7f987a20ec 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -1074,6 +1074,16 @@ mod tests { run_query(14).await } + #[tokio::test] + async fn run_q16() -> Result<()> { + run_query(16).await + } + + #[tokio::test] + async fn run_q18() -> Result<()> { + run_query(18).await + } + #[tokio::test] async fn run_q19() -> Result<()> { run_query(19).await From 4f15cd12eddcb235cb781731081094dec9d6b712 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Tue, 3 May 2022 08:51:45 +0300 Subject: [PATCH 3/5] rollback rewriting instead of fail --- .../src/optimizer/subquery_filter_to_join.rs | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/optimizer/subquery_filter_to_join.rs b/datafusion/core/src/optimizer/subquery_filter_to_join.rs index 1da5a94c94cc2..3e89328746daf 100644 --- a/datafusion/core/src/optimizer/subquery_filter_to_join.rs +++ b/datafusion/core/src/optimizer/subquery_filter_to_join.rs @@ -56,6 +56,9 @@ impl OptimizerRule for SubqueryFilterToJoin { ) -> Result { match plan { LogicalPlan::Filter(Filter { predicate, input }) => { + // Apply optimizer rule to current input + let optimized_input = self.optimize(input, execution_props)?; + // Splitting filter expression into components by AND let mut filters = vec![]; utils::split_conjunction(predicate, &mut filters); @@ -67,22 +70,25 @@ impl OptimizerRule for SubqueryFilterToJoin { .partition(|&e| matches!(e, Expr::InSubquery { .. })); // Check all subquery filters could be rewritten + // + // In case of expressions which could not be rewritten + // return original filter with optimized input let mut subqueries_in_regular = vec![]; regular_filters.iter().try_for_each(|&e| { extract_subquery_filters(e, &mut subqueries_in_regular) })?; if !subqueries_in_regular.is_empty() { - return Err(DataFusionError::NotImplemented( - "InSubquery allowed only as part of AND conjunction".to_string(), - )); + return Ok(LogicalPlan::Filter(Filter { + predicate: predicate.clone(), + input: Arc::new(optimized_input), + })); }; - // Apply optimizer rule to current input - let mut new_input = self.optimize(input, execution_props)?; - // Add subquery joins to new_input - subquery_filters.iter().try_for_each(|&e| match e { + // optimized_input value should retain for possible optimization rollback + let mut new_input = optimized_input.clone(); + let opt_result = subquery_filters.iter().try_for_each(|&e| match e { Expr::InSubquery { expr, subquery, @@ -133,7 +139,16 @@ impl OptimizerRule for SubqueryFilterToJoin { "Unknown expression while rewriting subquery to joins" .to_string(), )), - })?; + }); + + // In case of expressions which could not be rewritten + // return original filter with optimized input + if opt_result.is_err() { + return Ok(LogicalPlan::Filter(Filter { + predicate: predicate.clone(), + input: Arc::new(optimized_input), + })); + } // Apply regular filters to join output if some or just return join if regular_filters.is_empty() { From 4b13c2fc7394f6a60ff1e089c29ac247a91788af Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Tue, 3 May 2022 17:06:30 +0300 Subject: [PATCH 4/5] try_fold used for input plan rewriting --- .../src/optimizer/subquery_filter_to_join.rs | 123 +++++++++--------- 1 file changed, 65 insertions(+), 58 deletions(-) diff --git a/datafusion/core/src/optimizer/subquery_filter_to_join.rs b/datafusion/core/src/optimizer/subquery_filter_to_join.rs index 3e89328746daf..fcbbc5f392d59 100644 --- a/datafusion/core/src/optimizer/subquery_filter_to_join.rs +++ b/datafusion/core/src/optimizer/subquery_filter_to_join.rs @@ -87,68 +87,75 @@ impl OptimizerRule for SubqueryFilterToJoin { // Add subquery joins to new_input // optimized_input value should retain for possible optimization rollback - let mut new_input = optimized_input.clone(); - let opt_result = subquery_filters.iter().try_for_each(|&e| match e { - Expr::InSubquery { - expr, - subquery, - negated, - } => { - let right_input = - self.optimize(&*subquery.subquery, execution_props)?; - let right_schema = right_input.schema(); - if right_schema.fields().len() != 1 { - return Err(DataFusionError::Plan( - "Only single column allowed in InSubquery".to_string(), - )); - }; - - let right_key = right_schema.field(0).qualified_column(); - let left_key = match *expr.clone() { - Expr::Column(col) => col, - _ => return Err(DataFusionError::NotImplemented( - "Filtering by expression not implemented for InSubquery" - .to_string(), - )), - }; - - let join_type = match negated { - true => JoinType::Anti, - false => JoinType::Semi, - }; - - let schema = build_join_schema( - new_input.schema(), - right_schema, - &join_type, - )?; - - new_input = LogicalPlan::Join(Join { - left: Arc::new(new_input.clone()), - right: Arc::new(right_input), - on: vec![(left_key, right_key)], - join_type, - join_constraint: JoinConstraint::On, - schema: Arc::new(schema), - null_equals_null: false, - }); - - Ok(()) + let opt_result = subquery_filters.iter().try_fold( + optimized_input.clone(), + |input, &e| match e { + Expr::InSubquery { + expr, + subquery, + negated, + } => { + let right_input = self.optimize( + &*subquery.subquery, + execution_props + )?; + let right_schema = right_input.schema(); + if right_schema.fields().len() != 1 { + return Err(DataFusionError::Plan( + "Only single column allowed in InSubquery" + .to_string(), + )); + }; + + let right_key = right_schema.field(0).qualified_column(); + let left_key = match *expr.clone() { + Expr::Column(col) => col, + _ => return Err(DataFusionError::NotImplemented( + "Filtering by expression not implemented for InSubquery" + .to_string(), + )), + }; + + let join_type = if *negated { + JoinType::Anti + } else { + JoinType::Semi + }; + + let schema = build_join_schema( + optimized_input.schema(), + right_schema, + &join_type, + )?; + + Ok(LogicalPlan::Join(Join { + left: Arc::new(input), + right: Arc::new(right_input), + on: vec![(left_key, right_key)], + join_type, + join_constraint: JoinConstraint::On, + schema: Arc::new(schema), + null_equals_null: false, + })) + } + _ => Err(DataFusionError::Plan( + "Unknown expression while rewriting subquery to joins" + .to_string(), + )), } - _ => Err(DataFusionError::Plan( - "Unknown expression while rewriting subquery to joins" - .to_string(), - )), - }); + ); // In case of expressions which could not be rewritten // return original filter with optimized input - if opt_result.is_err() { - return Ok(LogicalPlan::Filter(Filter { - predicate: predicate.clone(), - input: Arc::new(optimized_input), - })); - } + let new_input = match opt_result { + Ok(plan) => plan, + Err(_) => { + return Ok(LogicalPlan::Filter(Filter { + predicate: predicate.clone(), + input: Arc::new(optimized_input), + })) + } + }; // Apply regular filters to join output if some or just return join if regular_filters.is_empty() { From 271695a930859fe93c7efe6fbda9d830808ef504 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Wed, 4 May 2022 09:05:12 +0300 Subject: [PATCH 5/5] test readability & negative test cases --- .../src/optimizer/subquery_filter_to_join.rs | 119 +++++++++++++----- 1 file changed, 87 insertions(+), 32 deletions(-) diff --git a/datafusion/core/src/optimizer/subquery_filter_to_join.rs b/datafusion/core/src/optimizer/subquery_filter_to_join.rs index fcbbc5f392d59..5f4583c28f75d 100644 --- a/datafusion/core/src/optimizer/subquery_filter_to_join.rs +++ b/datafusion/core/src/optimizer/subquery_filter_to_join.rs @@ -192,7 +192,7 @@ fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec) -> Res mod tests { use super::*; use crate::logical_plan::{ - and, binary_expr, col, in_subquery, lit, not_in_subquery, LogicalPlanBuilder, + and, binary_expr, col, in_subquery, lit, not_in_subquery, or, LogicalPlanBuilder, Operator, }; use crate::test::*; @@ -206,8 +206,8 @@ mod tests { assert_eq!(formatted_plan, expected); } - fn test_subquery() -> Result> { - let table_scan = test_table_scan()?; + fn test_subquery_with_name(name: &str) -> Result> { + let table_scan = test_table_scan_with_name(name)?; Ok(Arc::new( LogicalPlanBuilder::from(table_scan) .project(vec![col("c")])? @@ -220,15 +220,15 @@ mod tests { fn in_subquery_simple() -> Result<()> { let table_scan = test_table_scan()?; let plan = LogicalPlanBuilder::from(table_scan) - .filter(in_subquery(col("c"), test_subquery()?))? + .filter(in_subquery(col("c"), test_subquery_with_name("sq")?))? .project(vec![col("test.b")])? .build()?; let expected = "Projection: #test.b [b:UInt32]\ - \n Semi Join: #test.c = #test.c [a:UInt32, b:UInt32, c:UInt32]\ + \n Semi Join: #test.c = #sq.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: #test.c [c:UInt32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + \n Projection: #sq.c [c:UInt32]\ + \n TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(()) @@ -239,15 +239,15 @@ mod tests { fn not_in_subquery_simple() -> Result<()> { let table_scan = test_table_scan()?; let plan = LogicalPlanBuilder::from(table_scan) - .filter(not_in_subquery(col("c"), test_subquery()?))? + .filter(not_in_subquery(col("c"), test_subquery_with_name("sq")?))? .project(vec![col("test.b")])? .build()?; let expected = "Projection: #test.b [b:UInt32]\ - \n Anti Join: #test.c = #test.c [a:UInt32, b:UInt32, c:UInt32]\ + \n Anti Join: #test.c = #sq.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: #test.c [c:UInt32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + \n Projection: #sq.c [c:UInt32]\ + \n TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(()) @@ -259,32 +259,32 @@ mod tests { let table_scan = test_table_scan()?; let plan = LogicalPlanBuilder::from(table_scan) .filter(and( - in_subquery(col("c"), test_subquery()?), - in_subquery(col("b"), test_subquery()?), + in_subquery(col("c"), test_subquery_with_name("sq_1")?), + in_subquery(col("b"), test_subquery_with_name("sq_2")?), ))? .project(vec![col("test.b")])? .build()?; let expected = "Projection: #test.b [b:UInt32]\ - \n Semi Join: #test.b = #test.c [a:UInt32, b:UInt32, c:UInt32]\ - \n Semi Join: #test.c = #test.c [a:UInt32, b:UInt32, c:UInt32]\ + \n Semi Join: #test.b = #sq_2.c [a:UInt32, b:UInt32, c:UInt32]\ + \n Semi Join: #test.c = #sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: #test.c [c:UInt32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: #test.c [c:UInt32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + \n Projection: #sq_1.c [c:UInt32]\ + \n TableScan: sq_1 projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #sq_2.c [c:UInt32]\ + \n TableScan: sq_2 projection=None [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(()) } - /// Test for IN subquery with additional filters + /// Test for IN subquery with additional AND filter #[test] - fn in_subquery_with_filters() -> Result<()> { + fn in_subquery_with_and_filters() -> Result<()> { let table_scan = test_table_scan()?; let plan = LogicalPlanBuilder::from(table_scan) .filter(and( - in_subquery(col("c"), test_subquery()?), + in_subquery(col("c"), test_subquery_with_name("sq")?), and( binary_expr(col("a"), Operator::Eq, lit(1_u32)), binary_expr(col("b"), Operator::Lt, lit(30_u32)), @@ -295,10 +295,36 @@ mod tests { let expected = "Projection: #test.b [b:UInt32]\ \n Filter: #test.a = UInt32(1) AND #test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\ - \n Semi Join: #test.c = #test.c [a:UInt32, b:UInt32, c:UInt32]\ + \n Semi Join: #test.c = #sq.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: #test.c [c:UInt32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + \n Projection: #sq.c [c:UInt32]\ + \n TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + /// Test for IN subquery with additional OR filter + /// filter expression not modified + #[test] + fn in_subquery_with_or_filters() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(or( + and( + binary_expr(col("a"), Operator::Eq, lit(1_u32)), + binary_expr(col("b"), Operator::Lt, lit(30_u32)), + ), + in_subquery(col("c"), test_subquery_with_name("sq")?), + ))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: #test.b [b:UInt32]\ + \n Filter: #test.a = UInt32(1) AND #test.b < UInt32(30) OR #test.c IN (\ + Subquery: Projection: #sq.c\ + \n TableScan: sq projection=None) [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(()) @@ -309,8 +335,8 @@ mod tests { fn in_subquery_nested() -> Result<()> { let table_scan = test_table_scan()?; - let subquery = LogicalPlanBuilder::from(table_scan.clone()) - .filter(in_subquery(col("a"), test_subquery()?))? + let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?) + .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))? .project(vec![col("a")])? .build()?; @@ -320,13 +346,42 @@ mod tests { .build()?; let expected = "Projection: #test.b [b:UInt32]\ - \n Semi Join: #test.b = #test.a [a:UInt32, b:UInt32, c:UInt32]\ + \n Semi Join: #test.b = #sq.a [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: #test.a [a:UInt32]\ - \n Semi Join: #test.a = #test.c [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #sq.a [a:UInt32]\ + \n Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #sq_nested.c [c:UInt32]\ + \n TableScan: sq_nested projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + /// Test for filter input modification in case filter not supported + /// Outer filter expression not modified while inner converted to join + #[test] + fn in_subquery_input_modified() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(in_subquery(col("c"), test_subquery_with_name("sq_inner")?))? + .project_with_alias(vec![col("b"), col("c")], Some("wrapped".to_string()))? + .filter(or( + binary_expr(col("b"), Operator::Lt, lit(30_u32)), + in_subquery(col("c"), test_subquery_with_name("sq_outer")?), + ))? + .project(vec![col("b")])? + .build()?; + + let expected = "Projection: #wrapped.b [b:UInt32]\ + \n Filter: #wrapped.b < UInt32(30) OR #wrapped.c IN (\ + Subquery: Projection: #sq_outer.c\ + \n TableScan: sq_outer projection=None) [b:UInt32, c:UInt32]\ + \n Projection: #test.b, #test.c, alias=wrapped [b:UInt32, c:UInt32]\ + \n Semi Join: #test.c = #sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: #test.c [c:UInt32]\ - \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + \n Projection: #sq_inner.c [c:UInt32]\ + \n TableScan: sq_inner projection=None [a:UInt32, b:UInt32, c:UInt32]"; assert_optimized_plan_eq(&plan, expected); Ok(())