From 7549e8b4b4c758653a110e0497e60cce8fee36f1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 1 Jul 2024 16:15:02 -0700 Subject: [PATCH 1/5] fix: Incorrect LEFT JOIN evaluation result on OR conditions --- datafusion/optimizer/src/push_down_filter.rs | 7 +- datafusion/sqllogictest/test_files/join.slt | 73 ++++++++++++++++++++ 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index fa432ad76de53..c827d6b5434e7 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -424,8 +424,9 @@ fn push_down_all_join( } } + let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join.join_type)?; + if !on_filter.is_empty() { - let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join.join_type)?; for on in on_filter { if on_left_preserved && can_pushdown_join_predicate(&on, left_schema)? { left_push.push(on) @@ -441,11 +442,11 @@ fn push_down_all_join( // Extract from OR clause, generate new predicates for both side of join if possible. // We only track the unpushable predicates above. 
- if left_preserved { + if on_left_preserved { left_push.extend(extract_or_clauses_for_join(&keep_predicates, left_schema)); left_push.extend(extract_or_clauses_for_join(&join_conditions, left_schema)); } - if right_preserved { + if on_right_preserved { right_push.extend(extract_or_clauses_for_join(&keep_predicates, right_schema)); right_push.extend(extract_or_clauses_for_join(&join_conditions, right_schema)); } diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt index 6732d3e9108b1..338e6b8ecdd90 100644 --- a/datafusion/sqllogictest/test_files/join.slt +++ b/datafusion/sqllogictest/test_files/join.slt @@ -793,3 +793,76 @@ DROP TABLE companies statement ok DROP TABLE leads + +# create tables +statement ok +CREATE TABLE employees(emp_id INT, name VARCHAR); + +statement ok +CREATE TABLE department(emp_id INT, dept_name VARCHAR); + +statement ok +INSERT INTO employees (emp_id, name) VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol'); + +statement ok +INSERT INTO department (emp_id, dept_name) VALUES (1, 'HR'), (3, 'Engineering'), (4, 'Sales'); + +query TT +EXPLAIN SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +LEFT JOIN department AS d +ON (e.name = 'Alice' OR e.name = 'Bob'); +---- +logical_plan +01)Left Join: Filter: e.name = Utf8("Alice") OR e.name = Utf8("Bob") +02)--SubqueryAlias: e +03)----TableScan: employees projection=[emp_id, name] +04)--SubqueryAlias: d +05)----TableScan: department projection=[dept_name] +physical_plan +01)ProjectionExec: expr=[emp_id@1 as emp_id, name@2 as name, dept_name@0 as dept_name] +02)--NestedLoopJoinExec: join_type=Right, filter=name@0 = Alice OR name@0 = Bob +03)----MemoryExec: partitions=1, partition_sizes=[1] +04)----MemoryExec: partitions=1, partition_sizes=[1] + +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +LEFT JOIN department AS d +ON (e.name = 'Alice' OR e.name = 'Bob'); +---- +1 Alice HR +2 Bob HR +1 Alice Engineering +2 Bob Engineering +1 
Alice Sales +2 Bob Sales +3 Carol NULL + +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM employees e +LEFT JOIN department d +ON (e.name = 'NotExist1' OR e.name = 'NotExist2'); +---- +1 Alice NULL +2 Bob NULL +3 Carol NULL + +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM employees e +LEFT JOIN department d +ON (e.name = 'Alice' OR e.name = 'NotExist'); +---- +1 Alice HR +1 Alice Engineering +1 Alice Sales +2 Bob NULL +3 Carol NULL + +statement ok +DROP TABLE employees + +statement ok +DROP TABLE department From a506bed921d774bd52b307d57922cb59260fbb35 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 2 Jul 2024 07:08:34 -0400 Subject: [PATCH 2/5] Add a few more test cases --- datafusion/sqllogictest/test_files/join.slt | 72 +++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt index 338e6b8ecdd90..fe5cc0db8ec09 100644 --- a/datafusion/sqllogictest/test_files/join.slt +++ b/datafusion/sqllogictest/test_files/join.slt @@ -794,6 +794,11 @@ DROP TABLE companies statement ok DROP TABLE leads +#### +## Test ON clause predicates are not pushed past join for OUTER JOINs +#### + + # create tables statement ok CREATE TABLE employees(emp_id INT, name VARCHAR); @@ -807,6 +812,7 @@ INSERT INTO employees (emp_id, name) VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol statement ok INSERT INTO department (emp_id, dept_name) VALUES (1, 'HR'), (3, 'Engineering'), (4, 'Sales'); +# Can not push the ON filter below an OUTER JOIN query TT EXPLAIN SELECT e.emp_id, e.name, d.dept_name FROM employees AS e @@ -839,6 +845,36 @@ ON (e.name = 'Alice' OR e.name = 'Bob'); 2 Bob Sales 3 Carol NULL +# Can not push the ON filter below a RIGHT OUTER JOIN either +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM department AS d +RIGHT JOIN employees AS e +ON (e.name = 'Alice' OR e.name = 'Bob'); +---- +1 Alice HR +2 Bob HR +1 Alice Engineering +2 Bob Engineering +1 Alice Sales +2 Bob Sales +3 Carol NULL + +# 
Can not push the ON filter below a FULL OUTER JOIN either +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM department AS d +FULL JOIN employees AS e +ON (e.name = 'Alice' OR e.name = 'Bob'); +---- +1 Alice HR +2 Bob HR +1 Alice Engineering +2 Bob Engineering +1 Alice Sales +2 Bob Sales +3 Carol NULL + query ITT SELECT e.emp_id, e.name, d.dept_name FROM employees e @@ -861,6 +897,42 @@ ON (e.name = 'Alice' OR e.name = 'NotExist'); 2 Bob NULL 3 Carol NULL +# Can push the ON filter below the JOIN for INNER JOIN (expect to see a filter below the join) +query TT +EXPLAIN SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +JOIN department AS d +ON (e.name = 'Alice' OR e.name = 'Bob'); +---- +logical_plan +01)CrossJoin: +02)--SubqueryAlias: e +03)----Filter: employees.name = Utf8("Alice") OR employees.name = Utf8("Bob") +04)------TableScan: employees projection=[emp_id, name] +05)--SubqueryAlias: d +06)----TableScan: department projection=[dept_name] +physical_plan +01)CrossJoinExec +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----FilterExec: name@1 = Alice OR name@1 = Bob +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)--MemoryExec: partitions=1, partition_sizes=[1] + +# expect no row for Carol +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +JOIN department AS d +ON (e.name = 'Alice' OR e.name = 'Bob'); +---- +1 Alice HR +1 Alice Engineering +1 Alice Sales +2 Bob HR +2 Bob Engineering +2 Bob Sales + + statement ok DROP TABLE employees From 17b2d431e44c07fdb6ffc3f4c81c3add0ba0783a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 2 Jul 2024 06:48:37 -0700 Subject: [PATCH 3/5] Don't push join filter predicates into join_conditions --- datafusion/optimizer/src/push_down_filter.rs | 23 +++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index c827d6b5434e7..a1848e2178795 100644 --- 
a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -424,6 +424,7 @@ fn push_down_all_join( } } + let mut on_filter_join_conditions = vec![]; let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join.join_type)?; if !on_filter.is_empty() { @@ -435,22 +436,37 @@ fn push_down_all_join( { right_push.push(on) } else { - join_conditions.push(on) + on_filter_join_conditions.push(on) } } } // Extract from OR clause, generate new predicates for both side of join if possible. // We only track the unpushable predicates above. - if on_left_preserved { + if left_preserved { left_push.extend(extract_or_clauses_for_join(&keep_predicates, left_schema)); left_push.extend(extract_or_clauses_for_join(&join_conditions, left_schema)); } - if on_right_preserved { + if left_preserved { right_push.extend(extract_or_clauses_for_join(&keep_predicates, right_schema)); right_push.extend(extract_or_clauses_for_join(&join_conditions, right_schema)); } + // For predicates from the join filter, we should check whether a join side is preserved + // in terms of join filtering. 
+ if on_left_preserved { + left_push.extend(extract_or_clauses_for_join( + &on_filter_join_conditions, + left_schema, + )); + } + if on_right_preserved { + right_push.extend(extract_or_clauses_for_join( + &on_filter_join_conditions, + right_schema, + )); + } + if let Some(predicate) = conjunction(left_push) { join.left = Arc::new(LogicalPlan::Filter(Filter::try_new(predicate, join.left)?)); } @@ -460,6 +476,7 @@ fn push_down_all_join( } // Add any new join conditions as the non join predicates + join_conditions.extend(on_filter_join_conditions); join.filter = conjunction(join_conditions); // wrap the join on the filter whose predicates must be kept, if any From 68e883812457331d610503cc7dac6aa845e94803 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 2 Jul 2024 07:03:33 -0700 Subject: [PATCH 4/5] Add test case and fix typo --- datafusion/optimizer/src/push_down_filter.rs | 2 +- datafusion/sqllogictest/test_files/join.slt | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index a1848e2178795..664fc93a762a5 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -447,7 +447,7 @@ fn push_down_all_join( left_push.extend(extract_or_clauses_for_join(&keep_predicates, left_schema)); left_push.extend(extract_or_clauses_for_join(&join_conditions, left_schema)); } - if left_preserved { + if right_preserved { right_push.extend(extract_or_clauses_for_join(&keep_predicates, right_schema)); right_push.extend(extract_or_clauses_for_join(&join_conditions, right_schema)); } diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt index fe5cc0db8ec09..5133f39f02b6d 100644 --- a/datafusion/sqllogictest/test_files/join.slt +++ b/datafusion/sqllogictest/test_files/join.slt @@ -932,6 +932,16 @@ ON (e.name = 'Alice' OR e.name = 'Bob'); 2 Bob Engineering 2 Bob 
Sales +# OR conditions on Filter (not join filter) +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +LEFT JOIN department AS d +ON e.emp_id = d.emp_id +WHERE (e.name = 'Alice' OR e.name = 'Carol'); +---- +1 Alice HR +3 Carol Engineering statement ok DROP TABLE employees From 70bf59fb1464c6277a00009eed3939eeedeea21d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 2 Jul 2024 07:32:48 -0700 Subject: [PATCH 5/5] Add test case --- datafusion/sqllogictest/test_files/join.slt | 38 +++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt index 5133f39f02b6d..3c89109145d70 100644 --- a/datafusion/sqllogictest/test_files/join.slt +++ b/datafusion/sqllogictest/test_files/join.slt @@ -943,6 +943,44 @@ WHERE (e.name = 'Alice' OR e.name = 'Carol'); 1 Alice HR 3 Carol Engineering +# Push down OR conditions on Filter through LEFT JOIN if possible +query TT +EXPLAIN SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +LEFT JOIN department AS d +ON e.emp_id = d.emp_id +WHERE ((dept_name != 'Engineering' AND e.name = 'Alice') OR (name != 'Alice' AND e.name = 'Carol')); +---- +logical_plan +01)Filter: d.dept_name != Utf8("Engineering") AND e.name = Utf8("Alice") OR e.name != Utf8("Alice") AND e.name = Utf8("Carol") +02)--Projection: e.emp_id, e.name, d.dept_name +03)----Left Join: e.emp_id = d.emp_id +04)------SubqueryAlias: e +05)--------Filter: employees.name = Utf8("Alice") OR employees.name != Utf8("Alice") AND employees.name = Utf8("Carol") +06)----------TableScan: employees projection=[emp_id, name] +07)------SubqueryAlias: d +08)--------TableScan: department projection=[emp_id, dept_name] +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--FilterExec: dept_name@2 != Engineering AND name@1 = Alice OR name@1 != Alice AND name@1 = Carol +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 
+04)------CoalesceBatchesExec: target_batch_size=8192 +05)--------HashJoinExec: mode=CollectLeft, join_type=Left, on=[(emp_id@0, emp_id@0)], projection=[emp_id@0, name@1, dept_name@3] +06)----------CoalesceBatchesExec: target_batch_size=8192 +07)------------FilterExec: name@1 = Alice OR name@1 != Alice AND name@1 = Carol +08)--------------MemoryExec: partitions=1, partition_sizes=[1] +09)----------MemoryExec: partitions=1, partition_sizes=[1] + +query ITT +SELECT e.emp_id, e.name, d.dept_name +FROM employees AS e +LEFT JOIN department AS d +ON e.emp_id = d.emp_id +WHERE ((dept_name != 'Engineering' AND e.name = 'Alice') OR (name != 'Alice' AND e.name = 'Carol')); +---- +1 Alice HR +3 Carol Engineering + statement ok DROP TABLE employees