From 8b15468f1dbebe4926009bd359b511fe3a695e8f Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Tue, 28 Jan 2025 21:06:18 +0800 Subject: [PATCH 1/3] fix: FULL OUTER JOIN and LIMIT produces wrong results --- datafusion/optimizer/src/push_down_limit.rs | 1 - datafusion/sqllogictest/test_files/joins.slt | 159 +++++++++++++++++-- 2 files changed, 147 insertions(+), 13 deletions(-) diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 8a3aa4bb84599..4da112d5153a3 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -255,7 +255,6 @@ fn push_down_join(mut join: Join, limit: usize) -> Transformed { match join.join_type { Left => (Some(limit), None), Right => (None, Some(limit)), - Full => (Some(limit), Some(limit)), _ => (None, None), } }; diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 496c6c609e45e..ac02aeb6fea4e 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4240,10 +4240,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 LIMIT 2; logical_plan 01)Limit: skip=0, fetch=2 02)--Full Join: t0.c1 = t1.c1 -03)----Limit: skip=0, fetch=2 -04)------TableScan: t0 projection=[c1, c2], fetch=2 -05)----Limit: skip=0, fetch=2 -06)------TableScan: t1 projection=[c1, c2, c3], fetch=2 +03)----TableScan: t0 projection=[c1, c2] +04)----TableScan: t1 projection=[c1, c2, c3] physical_plan 01)CoalesceBatchesExec: target_batch_size=3, fetch=2 02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)] @@ -4257,10 +4255,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c2 >= t1.c2 LIMIT 2; logical_plan 01)Limit: skip=0, fetch=2 02)--Full Join: Filter: t0.c2 >= t1.c2 -03)----Limit: skip=0, fetch=2 -04)------TableScan: t0 projection=[c1, c2], fetch=2 -05)----Limit: skip=0, fetch=2 -06)------TableScan: t1 projection=[c1, c2, c3], fetch=2 +03)----TableScan: t0 projection=[c1, c2] +04)----TableScan: t1 projection=[c1, c2, c3] physical_plan 01)GlobalLimitExec: skip=0, fetch=2 02)--NestedLoopJoinExec: join_type=Full, filter=c2@0 >= c2@1 @@ -4274,16 +4270,155 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 AND t0.c2 >= t1.c2 LIMIT logical_plan 01)Limit: skip=0, fetch=2 02)--Full Join: t0.c1 = t1.c1 Filter: t0.c2 >= t1.c2 -03)----Limit: skip=0, fetch=2 -04)------TableScan: t0 projection=[c1, c2], fetch=2 -05)----Limit: skip=0, fetch=2 -06)------TableScan: t1 projection=[c1, c2, c3], fetch=2 +03)----TableScan: t0 projection=[c1, c2] +04)----TableScan: t1 projection=[c1, c2, c3] physical_plan 01)CoalesceBatchesExec: target_batch_size=3, fetch=2 02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1 03)----MemoryExec: partitions=1, partition_sizes=[1] 04)----MemoryExec: partitions=1, partition_sizes=[1] +## Add more test cases for join limit pushdown +statement ok +drop table t1 + +## Test limit pushdown through OUTER JOIN including left/right and full outer join cases +statement ok +set datafusion.execution.target_partitions = 1; + +### Limit pushdown through join + +# Note we use csv as MemoryExec does not support limit push down (so doesn't manifest +# bugs if limits are improperly pushed down) +query I +COPY (values (1), (2), (3), (4), (5)) TO 'test_files/scratch/limit/t1.csv' +STORED AS CSV +---- +5 + +# store t2 in different order so the top N rows are not the same as the top N rows of t1 +query I +COPY (values (5), (4), (3), (2), (1)) TO 'test_files/scratch/limit/t2.csv' +STORED AS CSV +---- +5 + +statement ok +create external table t1(a int) stored as CSV location 'test_files/scratch/limit/t1.csv'; + +statement ok +create external table t2(b int) stored as CSV location 'test_files/scratch/limit/t2.csv'; + +###### +## LEFT JOIN w/ LIMIT +###### +query II +select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +2 2 +1 1 + +# the output of this query should be two rows from the previous query +# there should be no nulls +query II +select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +2 2 +1 1 + +# can only push down to t1 (preserved side) +query TT +explain select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Left Join: t1.a = t2.b +03)----Limit: skip=0, fetch=2 +04)------TableScan: t1 projection=[a], fetch=2 +05)----TableScan: t2 projection=[b] +physical_plan +01)CoalesceBatchesExec: target_batch_size=3, fetch=2 +02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)] +03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], limit=2, has_header=true +04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], has_header=true + +###### +## RIGHT JOIN w/ LIMIT +###### + +query II +select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +5 5 +4 4 + +# the output of this query should be two rows from the previous query +# there should be no nulls +query II +select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +5 5 +4 4 + +# can only push down to t2 (preserved side) +query TT +explain select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Right Join: t1.a = t2.b +03)----TableScan: t1 projection=[a] +04)----Limit: skip=0, fetch=2 +05)------TableScan: t2 projection=[b], fetch=2 +physical_plan +01)CoalesceBatchesExec: target_batch_size=3, fetch=2 +02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)] +03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], has_header=true +04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], limit=2, has_header=true + +###### +## FULL JOIN w/ LIMIT +###### +query II rowsort +select * from t1 FULL JOIN t2 ON t1.a = t2.b; +---- +1 1 +2 2 +3 3 +4 4 +5 5 + +# the output of this query should be two rows from the previous query +# there should be no nulls +# Reproducer for https://github.com/apache/datafusion/issues/14335 +query II +select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +5 5 +4 4 + + +# can't push limit for full outer join +query TT +explain select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Full Join: t1.a = t2.b +03)----TableScan: t1 projection=[a] +04)----TableScan: t2 projection=[b] +physical_plan +01)CoalesceBatchesExec: target_batch_size=3, fetch=2 +02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)] +03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], has_header=true +04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], has_header=true + +statement ok +drop table t1; + +statement ok +drop table t2; + # Test Utf8View as Join Key # Issue: https://github.com/apache/datafusion/issues/12468 statement ok From 7cb9e334d1289baab1891b970bf6709c7587270c Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Tue, 28 Jan 2025 21:13:12 +0800 Subject: [PATCH 2/3] Fix minor slt testing --- datafusion/sqllogictest/test_files/joins.slt | 3 --- 1 file changed, 3 deletions(-) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index ac02aeb6fea4e..79006bdb256df 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4279,9 +4279,6 @@ physical_plan 04)----MemoryExec: partitions=1, partition_sizes=[1] ## Add more test cases for join limit pushdown -statement ok -drop table t1 - ## Test limit pushdown through OUTER JOIN including left/right and full outer join cases statement ok set datafusion.execution.target_partitions = 1; From 8b63074f68fb18916e0d8648090dfb57c5626c2c Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Tue, 28 Jan 2025 21:26:21 +0800 Subject: [PATCH 3/3] fix test --- datafusion/sqllogictest/test_files/joins.slt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 79006bdb256df..ac02aeb6fea4e 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4279,6 +4279,9 @@ physical_plan 04)----MemoryExec: partitions=1, partition_sizes=[1] ## Add more test cases for join limit pushdown +statement ok +drop table t1 + ## Test limit pushdown through OUTER JOIN including left/right and full outer join cases statement ok set datafusion.execution.target_partitions = 1;