-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Refactor DecorrelateWhereExists and add back Distinct if needs #5345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2ce552b
45f3212
af25a6e
dbaf64c
0c28e11
47b1cd5
0b4d252
573c48b
8e6c726
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,14 +16,16 @@ | |
| // under the License. | ||
|
|
||
| use crate::optimizer::ApplyOrder; | ||
| use crate::utils::{conjunction, extract_join_filters, split_conjunction}; | ||
| use crate::utils::{ | ||
| collect_subquery_cols, conjunction, extract_join_filters, split_conjunction, | ||
| }; | ||
| use crate::{OptimizerConfig, OptimizerRule}; | ||
| use datafusion_common::{Column, DataFusionError, Result}; | ||
| use datafusion_expr::{ | ||
| logical_plan::{Filter, JoinType, Subquery}, | ||
| logical_plan::{Distinct, Filter, JoinType, Subquery}, | ||
| Expr, LogicalPlan, LogicalPlanBuilder, | ||
| }; | ||
| use std::collections::BTreeSet; | ||
|
|
||
| use std::sync::Arc; | ||
|
|
||
| /// Optimizer rule for rewriting subquery filters to joins | ||
|
|
@@ -142,69 +144,70 @@ fn optimize_exists( | |
| query_info: &SubqueryInfo, | ||
| outer_input: &LogicalPlan, | ||
| ) -> Result<Option<LogicalPlan>> { | ||
| let maybe_subqury_filter = match query_info.query.subquery.as_ref() { | ||
| LogicalPlan::Distinct(subqry_distinct) => match subqry_distinct.input.as_ref() { | ||
| LogicalPlan::Projection(subqry_proj) => &subqry_proj.input, | ||
| _ => { | ||
| return Ok(None); | ||
| } | ||
| }, | ||
| LogicalPlan::Projection(subqry_proj) => &subqry_proj.input, | ||
| _ => { | ||
| // Subquery currently only supports distinct or projection | ||
| return Ok(None); | ||
| } | ||
| } | ||
| .as_ref(); | ||
| let subquery = query_info.query.subquery.as_ref(); | ||
| if let Some((join_filter, optimized_subquery)) = optimize_subquery(subquery)? { | ||
| // join our sub query into the main plan | ||
| let join_type = match query_info.negated { | ||
| true => JoinType::LeftAnti, | ||
| false => JoinType::LeftSemi, | ||
| }; | ||
|
|
||
| let new_plan = LogicalPlanBuilder::from(outer_input.clone()) | ||
| .join( | ||
| optimized_subquery, | ||
| join_type, | ||
| (Vec::<Column>::new(), Vec::<Column>::new()), | ||
| Some(join_filter), | ||
| )? | ||
| .build()?; | ||
|
|
||
| // extract join filters | ||
| let (join_filters, subquery_input) = extract_join_filters(maybe_subqury_filter)?; | ||
| // cannot optimize non-correlated subquery | ||
| if join_filters.is_empty() { | ||
| return Ok(None); | ||
| Ok(Some(new_plan)) | ||
| } else { | ||
| Ok(None) | ||
| } | ||
|
|
||
| let input_schema = subquery_input.schema(); | ||
| let subquery_cols: BTreeSet<Column> = | ||
| join_filters | ||
| .iter() | ||
| .try_fold(BTreeSet::new(), |mut cols, expr| { | ||
| let using_cols: Vec<Column> = expr | ||
| .to_columns()? | ||
| } | ||
| /// Optimize the subquery and extract the possible join filter. | ||
| /// This function can't optimize non-correlated subquery, and will return None. | ||
| fn optimize_subquery(subquery: &LogicalPlan) -> Result<Option<(Expr, LogicalPlan)>> { | ||
| match subquery { | ||
| LogicalPlan::Distinct(subqry_distinct) => { | ||
| let distinct_input = &subqry_distinct.input; | ||
| let optimized_plan = | ||
| optimize_subquery(distinct_input)?.map(|(filters, right)| { | ||
| ( | ||
| filters, | ||
| LogicalPlan::Distinct(Distinct { | ||
| input: Arc::new(right), | ||
| }), | ||
| ) | ||
| }); | ||
| Ok(optimized_plan) | ||
| } | ||
| LogicalPlan::Projection(projection) => { | ||
| // extract join filters | ||
| let (join_filters, subquery_input) = extract_join_filters(&projection.input)?; | ||
| // cannot optimize non-correlated subquery | ||
| if join_filters.is_empty() { | ||
| return Ok(None); | ||
| } | ||
| let input_schema = subquery_input.schema(); | ||
| let project_exprs: Vec<Expr> = | ||
| collect_subquery_cols(&join_filters, input_schema.clone())? | ||
| .into_iter() | ||
| .filter(|col| input_schema.has_column(col)) | ||
| .collect::<_>(); | ||
|
|
||
| cols.extend(using_cols); | ||
| Result::<_, DataFusionError>::Ok(cols) | ||
| .map(Expr::Column) | ||
| .collect(); | ||
| let right = LogicalPlanBuilder::from(subquery_input) | ||
| .project(project_exprs)? | ||
| .build()?; | ||
|
|
||
| // join_filters is not empty. | ||
| let join_filter = conjunction(join_filters).ok_or_else(|| { | ||
| DataFusionError::Internal("join filters should not be empty".to_string()) | ||
| })?; | ||
|
|
||
| let projection_exprs: Vec<Expr> = | ||
| subquery_cols.into_iter().map(Expr::Column).collect(); | ||
|
|
||
| let right = LogicalPlanBuilder::from(subquery_input) | ||
| .project(projection_exprs)? | ||
| .build()?; | ||
|
|
||
| let join_filter = conjunction(join_filters); | ||
|
|
||
| // join our sub query into the main plan | ||
| let join_type = match query_info.negated { | ||
| true => JoinType::LeftAnti, | ||
| false => JoinType::LeftSemi, | ||
| }; | ||
|
|
||
| // TODO: add Distinct if the original plan is a Distinct. | ||
| let new_plan = LogicalPlanBuilder::from(outer_input.clone()) | ||
| .join( | ||
| right, | ||
| join_type, | ||
| (Vec::<Column>::new(), Vec::<Column>::new()), | ||
| join_filter, | ||
| )? | ||
| .build()?; | ||
|
|
||
| Ok(Some(new_plan)) | ||
| Ok(Some((join_filter, right))) | ||
| } | ||
| _ => Ok(None), | ||
| } | ||
| } | ||
|
|
||
| struct SubqueryInfo { | ||
|
|
@@ -670,4 +673,76 @@ mod tests { | |
|
|
||
| assert_plan_eq(&plan, expected) | ||
| } | ||
|
|
||
| #[test] | ||
| fn exists_distinct_subquery() -> Result<()> { | ||
| let table_scan = test_table_scan()?; | ||
| let subquery_scan = test_table_scan_with_name("sq")?; | ||
| let subquery = LogicalPlanBuilder::from(subquery_scan) | ||
| .filter((lit(1u32) + col("sq.a")).gt(col("test.a") * lit(2u32)))? | ||
| .project(vec![col("sq.c")])? | ||
| .distinct()? | ||
| .build()?; | ||
| let plan = LogicalPlanBuilder::from(table_scan) | ||
| .filter(exists(Arc::new(subquery)))? | ||
| .project(vec![col("test.b")])? | ||
| .build()?; | ||
|
|
||
| let expected = "Projection: test.b [b:UInt32]\ | ||
| \n LeftSemi Join: Filter: UInt32(1) + sq.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]\ | ||
| \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ | ||
| \n Distinct: [a:UInt32]\ | ||
| \n Projection: sq.a [a:UInt32]\ | ||
| \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; | ||
|
|
||
| assert_plan_eq(&plan, expected) | ||
| } | ||
|
|
||
| #[test] | ||
| fn exists_distinct_expr_subquery() -> Result<()> { | ||
| let table_scan = test_table_scan()?; | ||
| let subquery_scan = test_table_scan_with_name("sq")?; | ||
| let subquery = LogicalPlanBuilder::from(subquery_scan) | ||
| .filter((lit(1u32) + col("sq.a")).gt(col("test.a") * lit(2u32)))? | ||
| .project(vec![col("sq.b") + col("sq.c")])? | ||
| .distinct()? | ||
| .build()?; | ||
| let plan = LogicalPlanBuilder::from(table_scan) | ||
| .filter(exists(Arc::new(subquery)))? | ||
| .project(vec![col("test.b")])? | ||
| .build()?; | ||
|
|
||
| let expected = "Projection: test.b [b:UInt32]\ | ||
| \n LeftSemi Join: Filter: UInt32(1) + sq.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]\ | ||
| \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ | ||
| \n Distinct: [a:UInt32]\ | ||
| \n Projection: sq.a [a:UInt32]\ | ||
| \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; | ||
|
|
||
| assert_plan_eq(&plan, expected) | ||
| } | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I find the behaviors of
I think I am not sure if the way of this pr is appropriate.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think it likely depends on how the join operators are implemented If the However, since there is no equality predicate (the predicate is
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is a good job.
I'm also not sure about it
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@ygf11 @jackwener @alamb You can test try those SQLs in Postgres: You can see that for I think in DataFusion, I like the implementation in this PR and the generated plan is more consistent. For the correctness of NAAJ, we have another ticket to track this and we can make the Hash Join itself Null aware. |
||
|
|
||
| #[test] | ||
| fn exists_distinct_subquery_with_literal() -> Result<()> { | ||
| let table_scan = test_table_scan()?; | ||
| let subquery_scan = test_table_scan_with_name("sq")?; | ||
| let subquery = LogicalPlanBuilder::from(subquery_scan) | ||
| .filter((lit(1u32) + col("sq.a")).gt(col("test.a") * lit(2u32)))? | ||
| .project(vec![lit(1u32), col("sq.c")])? | ||
| .distinct()? | ||
| .build()?; | ||
| let plan = LogicalPlanBuilder::from(table_scan) | ||
| .filter(exists(Arc::new(subquery)))? | ||
| .project(vec![col("test.b")])? | ||
| .build()?; | ||
|
|
||
| let expected = "Projection: test.b [b:UInt32]\ | ||
| \n LeftSemi Join: Filter: UInt32(1) + sq.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]\ | ||
| \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ | ||
| \n Distinct: [a:UInt32]\ | ||
| \n Projection: sq.a [a:UInt32]\ | ||
| \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; | ||
|
|
||
| assert_plan_eq(&plan, expected) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure the behavior of
Distinctis correct, so do not handleAggregatehere.Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will take a closer look at this PR tomorrow. If you do not know how to handle
Aggregatehere, you can just leave it here and only handle theDistinctcase.A simplest approach is checking whether there are out reference used by
Aggregateexpressions, if there are, return Err and else add the correlated columns to be part of the group by columns. (Only allow out reference columns referred inFilterexpressions orJoinexpressions, this will limit the supported cases ofSubqueries, but more safe.)For example if the original inner aggregate is
group by inner_aand there is correlation condition likeouter_b = inner_b, them add theinner_bto be part of the group by conditions. It is not a perfect solution and sometimes might cause some bug.SparkSQL has the similar logic in the rule
pullOutCorrelatedPredicatesThe latest SparkSQL's implementation also has bug here (need to differ the original Aggregate is Scalar Aggregate or Vector Aggregate)
SQL to reproduce the bug, you can have a try on both PostgreSQL and SparkSQL
PostgreSQL does not support decorrelate
Subquerieswhich includeAggregates. SparkSQL supports some cases but not all the cases and the implementation has bug.I'm going to implement a general subquery decorrelation rule based on the two well known papers.
#5492
Orthogonal Optimization of Subqueries and Aggregation
https://dl.acm.org/doi/10.1145/375663.375748
Unnesting Arbitrary Queries
https://cs.emis.de/LNI/Proceedings/Proceedings241/383.pdf
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job !
may related #5368