From c680f0d913218cfc9146c3d2d995b215fa066ab2 Mon Sep 17 00:00:00 2001 From: Helgi Sigurbjarnarson Date: Fri, 20 Feb 2026 04:09:57 -0800 Subject: [PATCH] Extend dynamic filter to joins that preserve probe side ON The dynamic filter from HashJoinExec was previously gated to Inner joins only. PR #20192 refactored the join filter pushdown infrastructure, which makes extending self-generated filters to join types that preserve probe side on as defined by by `on_lr_is_preserved` function. This PR promotes that function to the `JoinType` and uses it to determine what self-generated dynamic join filter to push down. This enables dynamic filter for hash joins to Left, LeftSemi, RightSemi, LeftAnti and LeftMark. --- datafusion/common/src/join_type.rs | 29 ++ .../physical_optimizer/filter_pushdown.rs | 274 ++++++++++++++++++ datafusion/optimizer/src/push_down_filter.rs | 22 +- .../physical-plan/src/joins/hash_join/exec.rs | 5 +- .../dynamic_filter_pushdown_config.slt | 99 ++++++- .../test_files/projection_pushdown.slt | 2 +- 6 files changed, 391 insertions(+), 40 deletions(-) diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs index e6a90db2dc3eb..8855e993f2bc7 100644 --- a/datafusion/common/src/join_type.rs +++ b/datafusion/common/src/join_type.rs @@ -97,6 +97,35 @@ impl JoinType { } } + /// Whether each side of the join is preserved for ON-clause filter pushdown. + /// + /// It is only correct to push ON-clause filters below a join for preserved + /// inputs. + /// + /// # "Preserved" input definition + /// + /// A join side is preserved if the join returns all or a subset of the rows + /// from that side, such that each output row directly maps to an input row. + /// If a side is not preserved, the join can produce extra null rows that + /// don't map to any input row. + /// + /// # Return Value + /// + /// A tuple of booleans - (left_preserved, right_preserved). + pub fn on_lr_is_preserved(&self) -> (bool, bool) { + match self { + JoinType::Inner => (true, true), + JoinType::Left => (false, true), + JoinType::Right => (true, false), + JoinType::Full => (false, false), + JoinType::LeftSemi | JoinType::RightSemi => (true, true), + JoinType::LeftAnti => (false, true), + JoinType::RightAnti => (true, false), + JoinType::LeftMark => (false, true), + JoinType::RightMark => (true, false), + } + } + /// Does the join type support swapping inputs? pub fn supports_swap(&self) -> bool { matches!( diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 99db81d34d8fa..ac64727481148 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -4086,3 +4086,277 @@ async fn test_filter_with_projection_pushdown() { ]; assert_batches_eq!(expected, &result); } + +#[tokio::test] +async fn test_hashjoin_dynamic_filter_pushdown_left_join() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create build side with limited values + let build_batches = vec![ + record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["ba", "bb"]), + ("c", Float64, [1.0, 2.0]) + ) + .unwrap(), + ]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more values (some won't match) + let probe_batches = vec![ + record_batch!( + ("a", Utf8, ["aa", "ab", "ac", "ad"]), + ("b", Utf8, ["ba", "bb", "bc", "bd"]), + ("e", Float64, [1.0, 2.0, 3.0, 4.0]) + ) + .unwrap(), + ]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("e", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create HashJoinExec with Left join and CollectLeft mode + let on = vec![ + ( + col("a", &build_side_schema).unwrap(), + col("a", &probe_side_schema).unwrap(), + ), + ( + col("b", &build_side_schema).unwrap(), + col("b", &probe_side_schema).unwrap(), + ), + ]; + let plan = Arc::new( + HashJoinExec::try_new( + build_scan, + Arc::clone(&probe_scan), + on, + None, + &JoinType::Left, + None, + PartitionMode::CollectLeft, + datafusion_common::NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ) as Arc; + + // Expect the dynamic filter predicate to be pushed down into the probe side DataSource + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true + output: + Ok: + - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + ", + ); + + // Actually apply the optimization and execute the plan + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + config.optimizer.enable_dynamic_filter_pushdown = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); + + // Test that dynamic filter linking survives with_new_children + let children = plan.children().into_iter().map(Arc::clone).collect(); + let plan = plan.with_new_children(children).unwrap(); + + let config = SessionConfig::new().with_batch_size(10); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx)) + .await + .unwrap(); + + // After execution, verify the dynamic filter was populated with bounds and IN-list + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] + " + ); + + // Verify result correctness: left join preserves all build (left) rows. + // All build rows match probe rows here, so we get 2 matched rows. + // The dynamic filter pruned unmatched probe rows (ac, ad) at scan time, + // which is safe because those probe rows would never match any build row. + let result = format!("{}", pretty_format_batches(&batches).unwrap()); + insta::assert_snapshot!( + result, + @r" + +----+----+-----+----+----+-----+ + | a | b | c | a | b | e | + +----+----+-----+----+----+-----+ + | aa | ba | 1.0 | aa | ba | 1.0 | + | ab | bb | 2.0 | ab | bb | 2.0 | + +----+----+-----+----+----+-----+ + " + ); +} + +#[tokio::test] +async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create build side with limited values + let build_batches = vec![ + record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["ba", "bb"]), + ("c", Float64, [1.0, 2.0]) + ) + .unwrap(), + ]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more values (some won't match) + let probe_batches = vec![ + record_batch!( + ("a", Utf8, ["aa", "ab", "ac", "ad"]), + ("b", Utf8, ["ba", "bb", "bc", "bd"]), + ("e", Float64, [1.0, 2.0, 3.0, 4.0]) + ) + .unwrap(), + ]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("e", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create HashJoinExec with LeftSemi join and CollectLeft mode + let on = vec![ + ( + col("a", &build_side_schema).unwrap(), + col("a", &probe_side_schema).unwrap(), + ), + ( + col("b", &build_side_schema).unwrap(), + col("b", &probe_side_schema).unwrap(), + ), + ]; + let plan = Arc::new( + HashJoinExec::try_new( + build_scan, + Arc::clone(&probe_scan), + on, + None, + &JoinType::LeftSemi, + None, + PartitionMode::CollectLeft, + datafusion_common::NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ) as Arc; + + // Expect the dynamic filter predicate to be pushed down into the probe side DataSource + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true + output: + Ok: + - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + ", + ); + + // Actually apply the optimization and execute the plan + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + config.optimizer.enable_dynamic_filter_pushdown = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); + + // Test that dynamic filter linking survives with_new_children + let children = plan.children().into_iter().map(Arc::clone).collect(); + let plan = plan.with_new_children(children).unwrap(); + + let config = SessionConfig::new().with_batch_size(10); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx)) + .await + .unwrap(); + + // After execution, verify the dynamic filter was populated with bounds and IN-list + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] + " + ); + + // Verify result correctness: left semi join returns only build (left) rows + // that have at least one matching probe row. Output schema is build-side columns only. + let result = format!("{}", pretty_format_batches(&batches).unwrap()); + insta::assert_snapshot!( + result, + @r" + +----+----+-----+ + | a | b | c | + +----+----+-----+ + | aa | ba | 1.0 | + | ab | bb | 2.0 | + +----+----+-----+ + " + ); +} diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index b1c0960386c2c..f1664f267b298 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -176,27 +176,9 @@ pub(crate) fn lr_is_preserved(join_type: JoinType) -> (bool, bool) { } } -/// For a given JOIN type, determine whether each input of the join is preserved -/// for the join condition (`ON` clause filters). -/// -/// It is only correct to push filters below a join for preserved inputs. -/// -/// # Return Value -/// A tuple of booleans - (left_preserved, right_preserved). -/// -/// See [`lr_is_preserved`] for a definition of "preserved". +/// See [`JoinType::on_lr_is_preserved`] for details. pub(crate) fn on_lr_is_preserved(join_type: JoinType) -> (bool, bool) { - match join_type { - JoinType::Inner => (true, true), - JoinType::Left => (false, true), - JoinType::Right => (true, false), - JoinType::Full => (false, false), - JoinType::LeftSemi | JoinType::RightSemi => (true, true), - JoinType::LeftAnti => (false, true), - JoinType::RightAnti => (true, false), - JoinType::LeftMark => (false, true), - JoinType::RightMark => (true, false), - } + join_type.on_lr_is_preserved() } /// Evaluates the columns referenced in the given expression to see if they refer diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index eda7e93effa2c..b6ce472bbef85 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -837,9 +837,8 @@ impl HashJoinExec { } fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool { - if self.join_type != JoinType::Inner - || !config.optimizer.enable_join_dynamic_filter_pushdown - { + let (_, probe_preserved) = self.join_type.on_lr_is_preserved(); + if !probe_preserved || !config.optimizer.enable_join_dynamic_filter_pushdown { return false; } diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index 275b0c9dd490f..9e8673cef6607 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -222,8 +222,7 @@ ORDER BY l.id; 5 left5 right5 # RIGHT JOIN: optimizer swaps to physical Left join (build=right_parquet, probe=left_parquet). -# No self-generated dynamic filter (only Inner joins get that), but parent filters -# on the preserved (build) side can still push down. +# Physical Left join generates a self-generated dynamic filter on the probe side. query TT EXPLAIN SELECT l.*, r.info FROM left_parquet l @@ -240,7 +239,7 @@ physical_plan 01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info] 02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@0, id@0)], projection=[info@1, id@2, data@3] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] # RIGHT JOIN correctness: all right rows appear, unmatched left rows produce NULLs query ITT @@ -272,9 +271,8 @@ physical_plan 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet 04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet -# LEFT SEMI JOIN: optimizer swaps to RightSemi (build=right_parquet, probe=left_parquet). -# No self-generated dynamic filter (only Inner joins), but parent filters on -# the preserved (probe) side can push down. +# LEFT SEMI JOIN: optimizer swaps to RightSemi (build=right_parquet, probe=left_parquet) +# and pushes the self-generated filter to the right side (left parquet). query TT EXPLAIN SELECT l.* FROM left_parquet l @@ -290,10 +288,40 @@ logical_plan physical_plan 01)HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] -# LEFT ANTI JOIN: no self-generated dynamic filter, but parent filters can push -# to the preserved (left/build) side. +# LEFT SEMI JOIN (physical LeftSemi): reverse table roles so optimizer keeps LeftSemi +# (right_parquet has 3 rows < left_parquet has 5 rows, so no swap occurs). +# Physical LeftSemi generates a self-generated dynamic filter on the probe side. +query TT +EXPLAIN SELECT r.* +FROM right_parquet r +WHERE r.id IN (SELECT l.id FROM left_parquet l); +---- +logical_plan +01)LeftSemi Join: r.id = __correlated_sq_1.id +02)--SubqueryAlias: r +03)----TableScan: right_parquet projection=[id, info] +04)--SubqueryAlias: __correlated_sq_1 +05)----SubqueryAlias: l +06)------TableScan: left_parquet projection=[id] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, id@0)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] + +# LEFT SEMI (physical LeftSemi) correctness: only right rows with matching left ids +query IT rowsort +SELECT r.* +FROM right_parquet r +WHERE r.id IN (SELECT l.id FROM left_parquet l); +---- +1 right1 +3 right3 +5 right5 + +# LEFT ANTI JOIN: both self generated and parent filters can push to the +# preserved (left/build) side. query TT EXPLAIN SELECT l.* FROM left_parquet l @@ -309,13 +337,50 @@ logical_plan physical_plan 01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] + +# LEFT MARK JOIN: the OR prevents decorrelation to LeftSemi, so the optimizer +# uses LeftMark. Self-generated dynamic filter pushes to the probe side. +query TT +EXPLAIN SELECT r.id, r.info +FROM right_parquet r +WHERE EXISTS (SELECT 1 FROM left_parquet l WHERE r.id = l.id) + OR r.id = 999; +---- +logical_plan +01)Projection: r.id, r.info +02)--Filter: __correlated_sq_1.mark OR r.id = Int32(999) +03)----LeftMark Join: r.id = __correlated_sq_1.id +04)------SubqueryAlias: r +05)--------TableScan: right_parquet projection=[id, info] +06)------SubqueryAlias: __correlated_sq_1 +07)--------SubqueryAlias: l +08)----------TableScan: left_parquet projection=[id] +physical_plan +01)FilterExec: mark@2 OR id@0 = 999, projection=[id@0, info@1] +02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(id@0, id@0)] +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet +05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] + +# LEFT MARK correctness: all right rows match EXISTS, so all 3 appear +query IT rowsort +SELECT r.id, r.info +FROM right_parquet r +WHERE EXISTS (SELECT 1 FROM left_parquet l WHERE r.id = l.id) + OR r.id = 999; +---- +1 right1 +3 right3 +5 right5 # Test 2c: Parent dynamic filter (from TopK) pushed through semi/anti joins # Sort on the join key (id) so the TopK dynamic filter pushes to BOTH sides. -# SEMI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join key) -# that pushes through the RightSemi join to both the build and probe sides. +# SEMI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join +# key) that pushes through the RightSemi join to both the build and probe sides +# as well as the HashJoinExec pushing the self-generated filter to the +# right-hand side of the join. query TT EXPLAIN SELECT l.* FROM left_parquet l @@ -334,7 +399,7 @@ physical_plan 01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] 02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] # Correctness check query IT @@ -346,8 +411,10 @@ ORDER BY l.id LIMIT 2; 1 left1 3 left3 -# ANTI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join key) -# that pushes through the LeftAnti join to both the preserved and non-preserved sides. +# ANTI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join +# key) that pushes through the LeftAnti join to both the preserved and +# non-preserved sides. The HashJoin pushes the self-generated filter to the +# right hand side of the LeftAnti join. query TT EXPLAIN SELECT l.* FROM left_parquet l @@ -366,7 +433,7 @@ physical_plan 01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] 02)--HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] # Correctness check query IT diff --git a/datafusion/sqllogictest/test_files/projection_pushdown.slt b/datafusion/sqllogictest/test_files/projection_pushdown.slt index dbb77b33c21b7..1c89923080b69 100644 --- a/datafusion/sqllogictest/test_files/projection_pushdown.slt +++ b/datafusion/sqllogictest/test_files/projection_pushdown.slt @@ -1604,7 +1604,7 @@ physical_plan 02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@1, id@0)], projection=[__datafusion_extracted_2@0, id@1, __datafusion_extracted_3@3] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[get_field(s@1, value) as __datafusion_extracted_2, id], file_type=parquet 04)----FilterExec: __datafusion_extracted_1@0 > 5, projection=[id@1, __datafusion_extracted_3@2] -05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[get_field(s@1, level) as __datafusion_extracted_1, id, get_field(s@1, level) as __datafusion_extracted_3], file_type=parquet +05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[get_field(s@1, level) as __datafusion_extracted_1, id, get_field(s@1, level) as __datafusion_extracted_3], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify correctness - left join with level > 5 condition # Only join_right rows with level > 5 are matched: id=1 (level=10), id=4 (level=8)