From 6890d23f838b99ae07e66da7828c1e59f1795bc8 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 3 Jul 2021 15:50:33 +0200 Subject: [PATCH 1/7] Allow non-equijoin filters in join condition --- benchmarks/queries/q13.sql | 2 +- datafusion/src/sql/planner.rs | 66 +++++++++++++++++++++++++---------- 2 files changed, 48 insertions(+), 20 deletions(-) diff --git a/benchmarks/queries/q13.sql b/benchmarks/queries/q13.sql index 4bfe8c35553cb..f7ede4ecb5f40 100644 --- a/benchmarks/queries/q13.sql +++ b/benchmarks/queries/q13.sql @@ -5,7 +5,7 @@ from ( select c_custkey, - count(o_orderkey) + count(o_orderkey) c_count from customer left outer join orders on c_custkey = o_custkey diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index b86dc0f48c149..bba5495ff46cb 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -368,15 +368,29 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // parse ON expression let expr = self.sql_to_rex(sql_expr, &join_schema)?; + // expression that didn't match equi-join pattern + let mut filter = vec![]; + // extract join keys - extract_join_keys(&expr, &mut keys)?; + extract_join_keys(&expr, &mut keys, &mut filter); let (left_keys, right_keys): (Vec, Vec) = keys.into_iter().unzip(); // return the logical plan representing the join - LogicalPlanBuilder::from(left) - .join(right, join_type, left_keys, right_keys)? + let join = LogicalPlanBuilder::from(left) + .join(right, join_type, left_keys, right_keys)?; + + if filter.is_empty() { + join.build() + } else { + join.filter( + filter + .iter() + .skip(1) + .fold(filter[0].clone(), |acc, e| acc.and(e.clone())), + )? .build() + } } JoinConstraint::Using(idents) => { let keys: Vec = idents @@ -1556,32 +1570,32 @@ fn remove_join_expressions( /// foo = bar /// foo = bar AND bar = baz AND ... /// -fn extract_join_keys(expr: &Expr, accum: &mut Vec<(Column, Column)>) -> Result<()> { +fn extract_join_keys( + expr: &Expr, + accum: &mut Vec<(Column, Column)>, + accum_filter: &mut Vec, +) { match expr { Expr::BinaryExpr { left, op, right } => match op { Operator::Eq => match (left.as_ref(), right.as_ref()) { (Expr::Column(l), Expr::Column(r)) => { accum.push((l.clone(), r.clone())); - Ok(()) } - other => Err(DataFusionError::SQL(ParserError(format!( - "Unsupported expression '{:?}' in JOIN condition", - other - )))), + _other => { + accum_filter.push(expr.clone()); + } }, Operator::And => { - extract_join_keys(left, accum)?; - extract_join_keys(right, accum) + extract_join_keys(left, accum, accum_filter); + extract_join_keys(right, accum, accum_filter); + } + _other => { + accum_filter.push(expr.clone()); } - other => Err(DataFusionError::SQL(ParserError(format!( - "Unsupported expression '{:?}' in JOIN condition", - other - )))), }, - other => Err(DataFusionError::SQL(ParserError(format!( - "Unsupported expression '{:?}' in JOIN condition", - other - )))), + _other => { + accum_filter.push(expr.clone()); + } } } @@ -2701,6 +2715,20 @@ mod tests { quick_test(sql, expected); } + #[test] + fn equijoin_unsupported_expression() { + let sql = "SELECT id, order_id \ + FROM person \ + JOIN orders \ + ON id = customer_id AND order_id > 1 "; + let expected = "Projection: #person.id, #orders.order_id\ + \n Filter: #orders.order_id Gt Int64(1) + \n Join: #person.id = #orders.customer_id\ + \n TableScan: person projection=None\ + \n TableScan: orders projection=None"; + quick_test(sql, expected); + } + #[test] fn join_with_table_name() { let sql = "SELECT id, order_id \ From 71b42ba422d4d0461d73d83ce5ab376cfbe79c07 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 3 Jul 2021 15:54:07 +0200 Subject: [PATCH 2/7] Revert change to query --- benchmarks/queries/q13.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/queries/q13.sql b/benchmarks/queries/q13.sql index f7ede4ecb5f40..4bfe8c35553cb 100644 --- a/benchmarks/queries/q13.sql +++ b/benchmarks/queries/q13.sql @@ -5,7 +5,7 @@ from ( select c_custkey, - count(o_orderkey) c_count + count(o_orderkey) from customer left outer join orders on c_custkey = o_custkey From d8e07be2d2f703cab1223a8f9b82cab562a11864 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 3 Jul 2021 16:23:19 +0200 Subject: [PATCH 3/7] Fix, only do for inner join --- datafusion/src/sql/planner.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index bba5495ff46cb..e12d4e7336ab1 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -382,7 +382,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { if filter.is_empty() { join.build() - } else { + } else if join_type == JoinType::Inner { join.filter( filter .iter() @@ -390,6 +390,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .fold(filter[0].clone(), |acc, e| acc.and(e.clone())), )? .build() + } else { + Err(DataFusionError::NotImplemented(format!( + "Unsupported expressions in {:?} JOIN: {:?}", + join_type, filter + ))) } } JoinConstraint::Using(idents) => { @@ -2722,7 +2727,7 @@ mod tests { JOIN orders \ ON id = customer_id AND order_id > 1 "; let expected = "Projection: #person.id, #orders.order_id\ - \n Filter: #orders.order_id Gt Int64(1) + \n Filter: #orders.order_id Gt Int64(1)\ \n Join: #person.id = #orders.customer_id\ \n TableScan: person projection=None\ \n TableScan: orders projection=None"; From ecc6d9ae18f19f45de5169d8e908c7bdc893a996 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 3 Jul 2021 16:30:10 +0200 Subject: [PATCH 4/7] Add test --- datafusion/tests/sql.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index c06a4bb1462ee..3d578b57c211d 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -1694,6 +1694,17 @@ async fn equijoin() -> Result<()> { Ok(()) } +#[tokio::test] +async fn equijoin_unsuppored_condition() -> Result<()> { + let mut ctx = create_join_context("t1_id", "t2_id")?; + let sql = + "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id"; + let actual = execute(&mut ctx, sql).await; + let expected = vec![vec!["11", "a", "z"], vec!["22", "b", "y"]]; + assert_eq!(expected, actual); + Ok(()) +} + #[tokio::test] async fn left_join() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; From 7e6b7c27b5d5d834910ee66cb1fdee9a2ebeafa6 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 5 Jul 2021 08:43:16 +0200 Subject: [PATCH 5/7] docs update --- datafusion/src/sql/planner.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index e12d4e7336ab1..3f9a04cb978c3 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -1568,12 +1568,14 @@ fn remove_join_expressions( } } -/// Parse equijoin ON condition which could be a single Eq or multiple conjunctive Eqs +/// Extracts equijoin ON condition be a single Eq or multiple conjunctive Eqs +/// Filters matching this pattern are added to `accum` +/// Filters that don't match this pattern are added to `accum_filter` +/// Examples: /// -/// Examples -/// -/// foo = bar -/// foo = bar AND bar = baz AND ... +/// foo = bar => accum=[(foo, bar)] accum_filter=[] +/// foo = bar AND bar = baz => accum=[(foo, bar), (bar, baz)] accum_filter=[] +/// foo = bar AND baz > 1 => accum=[(foo, bar)] accum_filter=[baz > 1] /// fn extract_join_keys( expr: &Expr, From 878a4605ad6410800385a7ab37f83c1c699c56c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 7 Jul 2021 08:11:03 +0200 Subject: [PATCH 6/7] Update test name Co-authored-by: Andrew Lamb --- datafusion/tests/sql.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 3d578b57c211d..5400d7a83f924 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -1695,7 +1695,7 @@ async fn equijoin() -> Result<()> { } #[tokio::test] -async fn equijoin_unsuppored_condition() -> Result<()> { +async fn equijoin_and_other_condition() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; let sql = "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id"; From 503a137c57e0ef65830eaf6cb10edbb162ee7e58 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 7 Jul 2021 08:19:05 +0200 Subject: [PATCH 7/7] Add negative test --- datafusion/tests/sql.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 5400d7a83f924..3445df3b741b7 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -1705,6 +1705,17 @@ async fn equijoin_and_other_condition() -> Result<()> { Ok(()) } +#[tokio::test] +async fn equijoin_and_unsupported_condition() -> Result<()> { + let ctx = create_join_context("t1_id", "t2_id")?; + let sql = + "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id"; + let res = ctx.create_logical_plan(sql); + assert!(res.is_err()); + assert_eq!(format!("{}", res.unwrap_err()), "This feature is not implemented: Unsupported expressions in Left JOIN: [#t2.t2_name GtEq Utf8(\"y\")]"); + Ok(()) +} + #[tokio::test] async fn left_join() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?;