diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index ef8abb01c2d2c..15b7f987a20ec 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -1074,6 +1074,16 @@ mod tests { run_query(14).await } + #[tokio::test] + async fn run_q16() -> Result<()> { + run_query(16).await + } + + #[tokio::test] + async fn run_q18() -> Result<()> { + run_query(18).await + } + #[tokio::test] async fn run_q19() -> Result<()> { run_query(19).await diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 01a5eefa9dc01..895e5bc1e71dc 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -72,6 +72,7 @@ use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::projection_push_down::ProjectionPushDown; use crate::optimizer::simplify_expressions::SimplifyExpressions; use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy; +use crate::optimizer::subquery_filter_to_join::SubqueryFilterToJoin; use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec; @@ -1199,6 +1200,7 @@ impl SessionState { // Simplify expressions first to maximize the chance // of applying other optimizations Arc::new(SimplifyExpressions::new()), + Arc::new(SubqueryFilterToJoin::new()), Arc::new(EliminateFilter::new()), Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateLimit::new()), diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs index 19535de86b9f5..0fd107b40dead 100644 --- a/datafusion/core/src/optimizer/filter_push_down.rs +++ b/datafusion/core/src/optimizer/filter_push_down.rs @@ -14,20 +14,17 @@ //! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan +use crate::error::Result; use crate::execution::context::ExecutionProps; use crate::logical_expr::TableProviderFilterPushDown; use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union}; use crate::logical_plan::{ - and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan, + col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan, }; use crate::logical_plan::{DFSchema, Expr}; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; -use crate::{error::Result, logical_plan::Operator}; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::collections::{HashMap, HashSet}; /// Filter Push Down optimizer rule pushes filter clauses down the plan /// # Introduction @@ -95,23 +92,6 @@ fn push_down(state: &State, plan: &LogicalPlan) -> Result { utils::from_plan(plan, &expr, &new_inputs) } -/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with -/// its predicate be all `predicates` ANDed. -fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan { - // reduce filters to a single filter with an AND - let predicate = predicates - .iter() - .skip(1) - .fold(predicates[0].clone(), |acc, predicate| { - and(acc, (*predicate).to_owned()) - }); - - LogicalPlan::Filter(Filter { - predicate, - input: Arc::new(plan), - }) -} - // remove all filters from `filters` that are in `predicate_columns` fn remove_filters( filters: &[(Expr, HashSet)], @@ -150,7 +130,7 @@ fn issue_filters( return push_down(&state, plan); } - let plan = add_filter(plan.clone(), &predicates); + let plan = utils::add_filter(plan.clone(), &predicates); state.filters = remove_filters(&state.filters, &predicate_columns); @@ -158,24 +138,6 @@ fn issue_filters( push_down(&state, &plan) } -/// converts "A AND B AND C" => [A, B, C] -fn split_members<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) { - match predicate { - Expr::BinaryExpr { - right, - op: Operator::And, - left, - } => { - split_members(left, predicates); - split_members(right, predicates); - } - Expr::Alias(expr, _) => { - split_members(expr, predicates); - } - other => predicates.push(other), - } -} - // For a given JOIN logical plan, determine whether each side of the join is preserved. // We say a join side is preserved if the join returns all or a subset of the rows from // the relevant side, such that each row of the output table directly maps to a row of @@ -289,7 +251,7 @@ fn optimize_join( Ok(plan) } else { // wrap the join on the filter whose predicates must be kept - let plan = add_filter(plan, &to_keep.0); + let plan = utils::add_filter(plan, &to_keep.0); state.filters = remove_filters(&state.filters, &to_keep.1); Ok(plan) @@ -305,7 +267,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { LogicalPlan::Analyze { .. } => push_down(&state, plan), LogicalPlan::Filter(Filter { input, predicate }) => { let mut predicates = vec![]; - split_members(predicate, &mut predicates); + utils::split_conjunction(predicate, &mut predicates); // Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.) let mut no_col_predicates = vec![]; @@ -328,7 +290,10 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { // As those contain only literals, they could be optimized using constant folding // and removal of WHERE TRUE / WHERE FALSE if !no_col_predicates.is_empty() { - Ok(add_filter(optimize(input, state)?, &no_col_predicates)) + Ok(utils::add_filter( + optimize(input, state)?, + &no_col_predicates, + )) } else { optimize(input, state) } @@ -592,17 +557,18 @@ fn rewrite(expr: &Expr, projection: &HashMap) -> Result { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; use crate::datasource::TableProvider; + use crate::logical_plan::plan::provider_as_source; use crate::logical_plan::{ - lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder, Operator, + and, col, lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder, + Operator, }; use crate::physical_plan::ExecutionPlan; + use crate::prelude::JoinType; use crate::test::*; - use crate::{ - logical_plan::{col, plan::provider_as_source}, - prelude::JoinType, - }; use arrow::datatypes::SchemaRef; use async_trait::async_trait; diff --git a/datafusion/core/src/optimizer/mod.rs b/datafusion/core/src/optimizer/mod.rs index 9f12ecea81df0..b274ab645f54c 100644 --- a/datafusion/core/src/optimizer/mod.rs +++ b/datafusion/core/src/optimizer/mod.rs @@ -28,4 +28,5 @@ pub mod optimizer; pub mod projection_push_down; pub mod simplify_expressions; pub mod single_distinct_to_groupby; +pub mod subquery_filter_to_join; pub mod utils; diff --git a/datafusion/core/src/optimizer/subquery_filter_to_join.rs b/datafusion/core/src/optimizer/subquery_filter_to_join.rs new file mode 100644 index 0000000000000..5f4583c28f75d --- /dev/null +++ b/datafusion/core/src/optimizer/subquery_filter_to_join.rs @@ -0,0 +1,389 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Optimizer rule for rewriting subquery filters to joins +//! +//! It handles standalone parts of logical conjunction expressions, i.e. +//! ```text +//! WHERE t1.f IN (SELECT f FROM t2) AND t2.f = 'x' +//! ``` +//! will be rewritten, but +//! ```text +//! WHERE t1.f IN (SELECT f FROM t2) OR t2.f = 'x' +//! ``` +//! won't +use std::sync::Arc; + +use crate::error::{DataFusionError, Result}; +use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::{Filter, Join}; +use crate::logical_plan::{ + build_join_schema, Expr, JoinConstraint, JoinType, LogicalPlan, +}; +use crate::optimizer::optimizer::OptimizerRule; +use crate::optimizer::utils; + +/// Optimizer rule for rewriting subquery filters to joins +#[derive(Default)] +pub struct SubqueryFilterToJoin {} + +impl SubqueryFilterToJoin { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl OptimizerRule for SubqueryFilterToJoin { + fn optimize( + &self, + plan: &LogicalPlan, + execution_props: &ExecutionProps, + ) -> Result { + match plan { + LogicalPlan::Filter(Filter { predicate, input }) => { + // Apply optimizer rule to current input + let optimized_input = self.optimize(input, execution_props)?; + + // Splitting filter expression into components by AND + let mut filters = vec![]; + utils::split_conjunction(predicate, &mut filters); + + // Searching for subquery-based filters + let (subquery_filters, regular_filters): (Vec<&Expr>, Vec<&Expr>) = + filters + .into_iter() + .partition(|&e| matches!(e, Expr::InSubquery { .. })); + + // Check all subquery filters could be rewritten + // + // In case of expressions which could not be rewritten + // return original filter with optimized input + let mut subqueries_in_regular = vec![]; + regular_filters.iter().try_for_each(|&e| { + extract_subquery_filters(e, &mut subqueries_in_regular) + })?; + + if !subqueries_in_regular.is_empty() { + return Ok(LogicalPlan::Filter(Filter { + predicate: predicate.clone(), + input: Arc::new(optimized_input), + })); + }; + + // Add subquery joins to new_input + // optimized_input value should retain for possible optimization rollback + let opt_result = subquery_filters.iter().try_fold( + optimized_input.clone(), + |input, &e| match e { + Expr::InSubquery { + expr, + subquery, + negated, + } => { + let right_input = self.optimize( + &*subquery.subquery, + execution_props + )?; + let right_schema = right_input.schema(); + if right_schema.fields().len() != 1 { + return Err(DataFusionError::Plan( + "Only single column allowed in InSubquery" + .to_string(), + )); + }; + + let right_key = right_schema.field(0).qualified_column(); + let left_key = match *expr.clone() { + Expr::Column(col) => col, + _ => return Err(DataFusionError::NotImplemented( + "Filtering by expression not implemented for InSubquery" + .to_string(), + )), + }; + + let join_type = if *negated { + JoinType::Anti + } else { + JoinType::Semi + }; + + let schema = build_join_schema( + optimized_input.schema(), + right_schema, + &join_type, + )?; + + Ok(LogicalPlan::Join(Join { + left: Arc::new(input), + right: Arc::new(right_input), + on: vec![(left_key, right_key)], + join_type, + join_constraint: JoinConstraint::On, + schema: Arc::new(schema), + null_equals_null: false, + })) + } + _ => Err(DataFusionError::Plan( + "Unknown expression while rewriting subquery to joins" + .to_string(), + )), + } + ); + + // In case of expressions which could not be rewritten + // return original filter with optimized input + let new_input = match opt_result { + Ok(plan) => plan, + Err(_) => { + return Ok(LogicalPlan::Filter(Filter { + predicate: predicate.clone(), + input: Arc::new(optimized_input), + })) + } + }; + + // Apply regular filters to join output if some or just return join + if regular_filters.is_empty() { + Ok(new_input) + } else { + Ok(utils::add_filter(new_input, ®ular_filters)) + } + } + _ => { + // Apply the optimization to all inputs of the plan + utils::optimize_children(self, plan, execution_props) + } + } + } + + fn name(&self) -> &str { + "subquery_filter_to_join" + } +} + +fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec) -> Result<()> { + utils::expr_sub_expressions(expression)? + .into_iter() + .try_for_each(|se| match se { + Expr::InSubquery { .. } => { + extracted.push(se); + Ok(()) + } + _ => extract_subquery_filters(&se, extracted), + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::logical_plan::{ + and, binary_expr, col, in_subquery, lit, not_in_subquery, or, LogicalPlanBuilder, + Operator, + }; + use crate::test::*; + + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { + let rule = SubqueryFilterToJoin::new(); + let optimized_plan = rule + .optimize(plan, &ExecutionProps::new()) + .expect("failed to optimize plan"); + let formatted_plan = format!("{}", optimized_plan.display_indent_schema()); + assert_eq!(formatted_plan, expected); + } + + fn test_subquery_with_name(name: &str) -> Result> { + let table_scan = test_table_scan_with_name(name)?; + Ok(Arc::new( + LogicalPlanBuilder::from(table_scan) + .project(vec![col("c")])? + .build()?, + )) + } + + /// Test for single IN subquery filter + #[test] + fn in_subquery_simple() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(in_subquery(col("c"), test_subquery_with_name("sq")?))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: #test.b [b:UInt32]\ + \n Semi Join: #test.c = #sq.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #sq.c [c:UInt32]\ + \n TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + /// Test for single NOT IN subquery filter + #[test] + fn not_in_subquery_simple() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(not_in_subquery(col("c"), test_subquery_with_name("sq")?))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: #test.b [b:UInt32]\ + \n Anti Join: #test.c = #sq.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #sq.c [c:UInt32]\ + \n TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + /// Test for several IN subquery expressions + #[test] + fn in_subquery_multiple() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(and( + in_subquery(col("c"), test_subquery_with_name("sq_1")?), + in_subquery(col("b"), test_subquery_with_name("sq_2")?), + ))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: #test.b [b:UInt32]\ + \n Semi Join: #test.b = #sq_2.c [a:UInt32, b:UInt32, c:UInt32]\ + \n Semi Join: #test.c = #sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #sq_1.c [c:UInt32]\ + \n TableScan: sq_1 projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #sq_2.c [c:UInt32]\ + \n TableScan: sq_2 projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + /// Test for IN subquery with additional AND filter + #[test] + fn in_subquery_with_and_filters() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(and( + in_subquery(col("c"), test_subquery_with_name("sq")?), + and( + binary_expr(col("a"), Operator::Eq, lit(1_u32)), + binary_expr(col("b"), Operator::Lt, lit(30_u32)), + ), + ))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: #test.b [b:UInt32]\ + \n Filter: #test.a = UInt32(1) AND #test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\ + \n Semi Join: #test.c = #sq.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #sq.c [c:UInt32]\ + \n TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + /// Test for IN subquery with additional OR filter + /// filter expression not modified + #[test] + fn in_subquery_with_or_filters() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(or( + and( + binary_expr(col("a"), Operator::Eq, lit(1_u32)), + binary_expr(col("b"), Operator::Lt, lit(30_u32)), + ), + in_subquery(col("c"), test_subquery_with_name("sq")?), + ))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: #test.b [b:UInt32]\ + \n Filter: #test.a = UInt32(1) AND #test.b < UInt32(30) OR #test.c IN (\ + Subquery: Projection: #sq.c\ + \n TableScan: sq projection=None) [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + /// Test for nested IN subqueries + #[test] + fn in_subquery_nested() -> Result<()> { + let table_scan = test_table_scan()?; + + let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?) + .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))? + .project(vec![col("a")])? + .build()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .filter(in_subquery(col("b"), Arc::new(subquery)))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: #test.b [b:UInt32]\ + \n Semi Join: #test.b = #sq.a [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #sq.a [a:UInt32]\ + \n Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #sq_nested.c [c:UInt32]\ + \n TableScan: sq_nested projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + /// Test for filter input modification in case filter not supported + /// Outer filter expression not modified while inner converted to join + #[test] + fn in_subquery_input_modified() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(in_subquery(col("c"), test_subquery_with_name("sq_inner")?))? + .project_with_alias(vec![col("b"), col("c")], Some("wrapped".to_string()))? + .filter(or( + binary_expr(col("b"), Operator::Lt, lit(30_u32)), + in_subquery(col("c"), test_subquery_with_name("sq_outer")?), + ))? + .project(vec![col("b")])? + .build()?; + + let expected = "Projection: #wrapped.b [b:UInt32]\ + \n Filter: #wrapped.b < UInt32(30) OR #wrapped.c IN (\ + Subquery: Projection: #sq_outer.c\ + \n TableScan: sq_outer projection=None) [b:UInt32, c:UInt32]\ + \n Projection: #test.b, #test.c, alias=wrapped [b:UInt32, c:UInt32]\ + \n Semi Join: #test.c = #sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\ + \n Projection: #sq_inner.c [c:UInt32]\ + \n TableScan: sq_inner projection=None [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } +} diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index df36761fec40b..48855df9f8e8a 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -25,7 +25,7 @@ use datafusion_expr::logical_plan::{ }; use crate::logical_plan::{ - build_join_schema, Column, CreateMemoryTable, DFSchemaRef, Expr, ExprVisitable, + and, build_join_schema, Column, CreateMemoryTable, DFSchemaRef, Expr, ExprVisitable, Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Recursion, Repartition, Union, Values, }; @@ -556,6 +556,41 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result { } } +/// converts "A AND B AND C" => [A, B, C] +pub fn split_conjunction<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) { + match predicate { + Expr::BinaryExpr { + right, + op: Operator::And, + left, + } => { + split_conjunction(left, predicates); + split_conjunction(right, predicates); + } + Expr::Alias(expr, _) => { + split_conjunction(expr, predicates); + } + other => predicates.push(other), + } +} + +/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with +/// its predicate be all `predicates` ANDed. +pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan { + // reduce filters to a single filter with an AND + let predicate = predicates + .iter() + .skip(1) + .fold(predicates[0].clone(), |acc, predicate| { + and(acc, (*predicate).to_owned()) + }); + + LogicalPlan::Filter(Filter { + predicate, + input: Arc::new(plan), + }) +} + #[cfg(test)] mod tests { use super::*;