From 09e51781602d6c68ceb583d21b4dbcea795155c6 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Fri, 6 Feb 2026 22:37:49 +0000 Subject: [PATCH 1/6] Support parent dynamic filters for more join types --- .../physical_optimizer/filter_pushdown.rs | 165 ++++++++++++++++-- .../physical-plan/src/filter_pushdown.rs | 85 +++++++-- .../physical-plan/src/joins/hash_join/exec.rs | 113 ++++++++---- datafusion/physical-plan/src/projection.rs | 2 +- .../dynamic_filter_pushdown_config.slt | 124 +++++++++++++ 5 files changed, 428 insertions(+), 61 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index b3ed8d9653fe1..783babf0b2de1 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -401,7 +401,8 @@ async fn test_static_filter_pushdown_through_hash_join() { " ); - // Test left join - filters should NOT be pushed down + // Test left join: filter on preserved (build) side is pushed down, + // filter on non-preserved (probe) side is NOT pushed down. let join = Arc::new( HashJoinExec::try_new( TestScanBuilder::new(Arc::clone(&build_side_schema)) @@ -425,25 +426,30 @@ async fn test_static_filter_pushdown_through_hash_join() { ); let join_schema = join.schema(); - let filter = col_lit_predicate("a", "aa", &join_schema); - let plan = - Arc::new(FilterExec::try_new(filter, join).unwrap()) as Arc; + // Filter on build side column (preserved): should be pushed down + let left_filter = col_lit_predicate("a", "aa", &join_schema); + // Filter on probe side column (not preserved): should NOT be pushed down + let right_filter = col_lit_predicate("e", "ba", &join_schema); + let filter = + Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap()); + let plan = Arc::new(FilterExec::try_new(right_filter, filter).unwrap()) + as Arc; - // Test that filters are NOT pushed down for left join insta::assert_snapshot!( OptimizationTest::new(plan, FilterPushdown::new(), true), @r" OptimizationTest: input: - - FilterExec: a@0 = aa - - HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true + - FilterExec: e@4 = ba + - FilterExec: a@0 = aa + - HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true output: Ok: - - FilterExec: a@0 = aa + - FilterExec: e@4 = ba - HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true " ); @@ -1722,6 +1728,143 @@ async fn test_hashjoin_parent_filter_pushdown() { ); } +#[test] +fn test_hashjoin_parent_filter_pushdown_same_column_names() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("build_val", DataType::Utf8, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .build(); + + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("probe_val", DataType::Utf8, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .build(); + + let on = vec![( + col("id", &build_side_schema).unwrap(), + col("id", &probe_side_schema).unwrap(), + )]; + let join = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ); + + let join_schema = join.schema(); + + let build_id_filter = col_lit_predicate("id", "aa", &join_schema); + let probe_val_filter = col_lit_predicate("probe_val", "x", &join_schema); + + let filter = + Arc::new(FilterExec::try_new(build_id_filter, Arc::clone(&join) as _).unwrap()); + let plan = Arc::new(FilterExec::try_new(probe_val_filter, filter).unwrap()) + as Arc; + + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: probe_val@3 = x + - FilterExec: id@0 = aa + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, id@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, build_val], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, probe_val], file_type=test, pushdown_supported=true + output: + Ok: + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, id@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, build_val], file_type=test, pushdown_supported=true, predicate=id@0 = aa + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, probe_val], file_type=test, pushdown_supported=true, predicate=probe_val@1 = x + " + ); +} + +#[test] +fn test_hashjoin_parent_filter_pushdown_mark_join() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + let left_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("val", DataType::Utf8, false), + ])); + let left_scan = TestScanBuilder::new(Arc::clone(&left_schema)) + .with_support(true) + .build(); + + let right_schema = + Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)])); + let right_scan = TestScanBuilder::new(Arc::clone(&right_schema)) + .with_support(true) + .build(); + + let on = vec![( + col("id", &left_schema).unwrap(), + col("id", &right_schema).unwrap(), + )]; + let join = Arc::new( + HashJoinExec::try_new( + left_scan, + right_scan, + on, + None, + &JoinType::LeftMark, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ); + + let join_schema = join.schema(); + + let left_filter = col_lit_predicate("val", "x", &join_schema); + let mark_filter = col_lit_predicate("mark", true, &join_schema); + + let filter = + Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap()); + let plan = Arc::new(FilterExec::try_new(mark_filter, filter).unwrap()) + as Arc; + + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: mark@2 = true + - FilterExec: val@1 = x + - HashJoinExec: mode=Partitioned, join_type=LeftMark, on=[(id@0, id@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, val], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: mark@2 = true + - HashJoinExec: mode=Partitioned, join_type=LeftMark, on=[(id@0, id@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, val], file_type=test, pushdown_supported=true, predicate=val@1 = x + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true + " + ); +} + /// Integration test for dynamic filter pushdown with TopK. /// We use an integration test because there are complex interactions in the optimizer rules /// that the unit tests applying a single optimizer rule do not cover. diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 689f629f7bac8..c63e67ac78748 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -44,7 +44,6 @@ use datafusion_common::{ }; use datafusion_physical_expr::{expressions::Column, utils::reassign_expr_columns}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use itertools::Itertools; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum FilterPushdownPhase { @@ -317,7 +316,8 @@ pub struct ChildFilterDescription { /// exist in the target schema. If any column in the filter is not present /// in the schema, the filter cannot be pushed down to that child. pub(crate) struct FilterColumnChecker<'a> { - column_names: HashSet<&'a str>, + /// The set of `(column_name, column_index)` pairs that are allowed. + allowed_columns: HashSet<(&'a str, usize)>, } impl<'a> FilterColumnChecker<'a> { @@ -325,13 +325,24 @@ impl<'a> FilterColumnChecker<'a> { /// /// Extracts all column names from the schema's fields to build /// a lookup set for efficient column existence checks. - pub(crate) fn new(input_schema: &'a Schema) -> Self { - let column_names: HashSet<&str> = input_schema + pub(crate) fn new_schema(input_schema: &'a Schema) -> Self { + let allowed_columns: HashSet<(&str, usize)> = input_schema .fields() .iter() - .map(|f| f.name().as_str()) + .enumerate() + .map(|(i, f)| (f.name().as_str(), i)) .collect(); - Self { column_names } + Self { allowed_columns } + } + + /// Creates a new [`FilterColumnChecker`] from an explicit set of + /// allowed `(name, index)` pairs. + /// + /// This is used by join nodes that need to restrict pushdown to columns + /// belonging to a specific side of the join, even when different sides + /// have columns with the same name (e.g. nested mark joins). + pub(crate) fn new_columns(allowed_columns: HashSet<(&'a str, usize)>) -> Self { + Self { allowed_columns } } /// Checks whether a filter expression can be pushed down to the child @@ -347,7 +358,9 @@ impl<'a> FilterColumnChecker<'a> { filter .apply(|expr| { if let Some(column) = expr.as_any().downcast_ref::() - && !self.column_names.contains(column.name()) + && !self + .allowed_columns + .contains(&(column.name(), column.index())) { can_apply = false; return Ok(TreeNodeRecursion::Stop); @@ -375,7 +388,7 @@ impl ChildFilterDescription { let child_schema = child.schema(); // Build a set of column names in the child schema for quick lookup - let checker = FilterColumnChecker::new(&child_schema); + let checker = FilterColumnChecker::new_schema(&child_schema); // Analyze each parent filter let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); @@ -401,6 +414,52 @@ impl ChildFilterDescription { }) } + /// Like [`Self::from_child`], but restricts which parent-level columns are + /// considered reachable through this child. + /// + /// `allowed_columns` is a set of `(column_name, column_index)` pairs + /// (in the *parent* schema) that map to this child's side of a join. + /// A filter is only eligible for pushdown when **every** column it + /// references appears in `allowed_columns`. This prevents incorrect + /// pushdown when different join sides have columns with the same name + pub fn from_child_with_allowed_columns( + parent_filters: &[Arc], + allowed_columns: HashSet<(&str, usize)>, + child: &Arc, + ) -> Result { + let child_schema = child.schema(); + let checker = FilterColumnChecker::new_columns(allowed_columns); + + let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); + for filter in parent_filters { + if checker.can_pushdown(filter) { + let reassigned_filter = + reassign_expr_columns(Arc::clone(filter), &child_schema)?; + child_parent_filters + .push(PushedDownPredicate::supported(reassigned_filter)); + } else { + child_parent_filters + .push(PushedDownPredicate::unsupported(Arc::clone(filter))); + } + } + + Ok(Self { + parent_filters: child_parent_filters, + self_filters: vec![], + }) + } + + /// Mark all parent filters as unsupported for this child. + pub fn all_unsupported(parent_filters: &[Arc]) -> Self { + Self { + parent_filters: parent_filters + .iter() + .map(|f| PushedDownPredicate::unsupported(Arc::clone(f))) + .collect(), + self_filters: vec![], + } + } + /// Add a self filter (from the current node) to be pushed down to this child. pub fn with_self_filter(mut self, filter: Arc) -> Self { self.self_filters.push(filter); @@ -476,15 +535,9 @@ impl FilterDescription { children: &[&Arc], ) -> Self { let mut desc = Self::new(); - let child_filters = parent_filters - .iter() - .map(|f| PushedDownPredicate::unsupported(Arc::clone(f))) - .collect_vec(); for _ in 0..children.len() { - desc = desc.with_child(ChildFilterDescription { - parent_filters: child_filters.clone(), - self_filters: vec![], - }); + desc = + desc.with_child(ChildFilterDescription::all_unsupported(parent_filters)); } desc } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 6b18b56413b71..44c09e2a3cfc8 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -24,7 +24,7 @@ use std::{any::Any, vec}; use crate::ExecutionPlanProperties; use crate::execution_plan::{EmissionType, boundedness_from_children}; use crate::filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPhase, + ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; use crate::joins::Map; @@ -720,7 +720,9 @@ impl HashJoinExec { } fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool { - if !config.optimizer.enable_join_dynamic_filter_pushdown { + if self.join_type != JoinType::Inner + || !config.optimizer.enable_join_dynamic_filter_pushdown + { return false; } @@ -1407,26 +1409,68 @@ impl ExecutionPlan for HashJoinExec { parent_filters: Vec>, config: &ConfigOptions, ) -> Result { - // Other types of joins can support *some* filters, but restrictions are complex and error prone. - // For now we don't support them. - // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs - // See https://github.com/apache/datafusion/issues/16973 for tracking. - if self.join_type != JoinType::Inner { - return Ok(FilterDescription::all_unsupported( - &parent_filters, - &self.children(), - )); + // This is the physical-plan equivalent of `push_down_all_join` in + // `datafusion/optimizer/src/push_down_filter.rs`. That function uses `lr_is_preserved` + // to decide which parent predicates can be pushed past a logical join to its children, + // then checks column references to route each predicate to the correct side. + // + // We apply the same two-level logic here: + // 1. `lr_is_preserved` gates whether a side is eligible at all. + // 2. For each filter, we check that all column references belong to the + // target child (using `column_indices` to map output column positions + // to join sides). This is critical for correctness: name-based matching + // alone (as done by `ChildFilterDescription::from_child`) can incorrectly + // push filters when different join sides have columns with the same name + // (e.g. nested mark joins both producing "mark" columns). + let (left_preserved, right_preserved) = lr_is_preserved(self.join_type); + + // Build the set of allowed (name, index) pairs for each side + let output_schema = self.schema(); + let output_fields = output_schema.fields(); + let column_indices: Vec = match self.projection.as_ref() { + Some(projection) => projection + .iter() + .map(|i| self.column_indices[*i].clone()) + .collect(), + None => self.column_indices.clone(), + }; + + let mut left_allowed = std::collections::HashSet::new(); + let mut right_allowed = std::collections::HashSet::new(); + for (i, ci) in column_indices.iter().enumerate() { + let name = output_fields[i].name().as_str(); + match ci.side { + JoinSide::Left => { + left_allowed.insert((name, i)); + } + JoinSide::Right => { + right_allowed.insert((name, i)); + } + JoinSide::None => { + // Mark columns - don't allow pushdown to either side + } + } } - // Get basic filter descriptions for both children - let left_child = crate::filter_pushdown::ChildFilterDescription::from_child( - &parent_filters, - self.left(), - )?; - let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child( - &parent_filters, - self.right(), - )?; + let left_child = if left_preserved { + ChildFilterDescription::from_child_with_allowed_columns( + &parent_filters, + left_allowed, + self.left(), + )? + } else { + ChildFilterDescription::all_unsupported(&parent_filters) + }; + + let mut right_child = if right_preserved { + ChildFilterDescription::from_child_with_allowed_columns( + &parent_filters, + right_allowed, + self.right(), + )? + } else { + ChildFilterDescription::all_unsupported(&parent_filters) + }; // Add dynamic filters in Post phase if enabled if matches!(phase, FilterPushdownPhase::Post) @@ -1448,19 +1492,6 @@ impl ExecutionPlan for HashJoinExec { child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { - // Note: this check shouldn't be necessary because we already marked all parent filters as unsupported for - // non-inner joins in `gather_filters_for_pushdown`. - // However it's a cheap check and serves to inform future devs touching this function that they need to be really - // careful pushing down filters through non-inner joins. - if self.join_type != JoinType::Inner { - // Other types of joins can support *some* filters, but restrictions are complex and error prone. - // For now we don't support them. - // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs - return Ok(FilterPushdownPropagation::all_unsupported( - child_pushdown_result, - )); - } - let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone()); assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child @@ -1501,6 +1532,22 @@ impl ExecutionPlan for HashJoinExec { } } +/// Determines which sides of a join are "preserved" for filter pushdown. +/// +/// A preserved side means filters on that side's columns can be safely pushed +/// below the join. This mirrors the logic in the logical optimizer's +/// `lr_is_preserved` in `datafusion/optimizer/src/push_down_filter.rs`. +fn lr_is_preserved(join_type: JoinType) -> (bool, bool) { + match join_type { + JoinType::Inner => (true, true), + JoinType::Left => (true, false), + JoinType::Right => (false, true), + JoinType::Full => (false, false), + JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => (true, false), + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => (false, true), + } +} + /// Accumulator for collecting min/max bounds from build-side data during hash join. /// /// This struct encapsulates the logic for progressively computing column bounds diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 76711a8f835f4..573f4ad0afed3 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -384,7 +384,7 @@ impl ExecutionPlan for ProjectionExec { // expand alias column to original expr in parent filters let invert_alias_map = self.collect_reverse_alias()?; let output_schema = self.schema(); - let checker = FilterColumnChecker::new(&output_schema); + let checker = FilterColumnChecker::new_schema(&output_schema); let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); for filter in parent_filters { diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index 1b037ee2b83af..dbc8a4ab8dc07 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -187,6 +187,130 @@ physical_plan statement ok SET datafusion.optimizer.enable_join_dynamic_filter_pushdown = true; +# Test 2b: Dynamic filter pushdown for non-inner join types +# LEFT JOIN: optimizer swaps to physical Right join (build=right_parquet, probe=left_parquet). +# Dynamic filter is NOT pushed because Right join needs all probe rows in output. +query TT +EXPLAIN SELECT l.*, r.info +FROM left_parquet l +LEFT JOIN right_parquet r ON l.id = r.id; +---- +logical_plan +01)Projection: l.id, l.data, r.info +02)--Left Join: l.id = r.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id, info] +physical_plan +01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info] +02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(id@0, id@0)], projection=[info@1, id@2, data@3] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet + +# LEFT JOIN correctness: all left rows appear, unmatched right rows produce NULLs +query ITT +SELECT l.id, l.data, r.info +FROM left_parquet l +LEFT JOIN right_parquet r ON l.id = r.id +ORDER BY l.id; +---- +1 left1 right1 +2 left2 NULL +3 left3 right3 +4 left4 NULL +5 left5 right5 + +# RIGHT JOIN: optimizer swaps to physical Left join (build=right_parquet, probe=left_parquet). +# No self-generated dynamic filter (only Inner joins get that), but parent filters +# on the preserved (build) side can still push down. +query TT +EXPLAIN SELECT l.*, r.info +FROM left_parquet l +RIGHT JOIN right_parquet r ON l.id = r.id; +---- +logical_plan +01)Projection: l.id, l.data, r.info +02)--Right Join: l.id = r.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id, info] +physical_plan +01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info] +02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@0, id@0)], projection=[info@1, id@2, data@3] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet + +# RIGHT JOIN correctness: all right rows appear, unmatched left rows produce NULLs +query ITT +SELECT l.id, l.data, r.info +FROM left_parquet l +RIGHT JOIN right_parquet r ON l.id = r.id +ORDER BY r.id; +---- +1 left1 right1 +3 left3 right3 +5 left5 right5 + +# FULL JOIN: dynamic filter should NOT be pushed (both sides must preserve all rows) +query TT +EXPLAIN SELECT l.id, r.id as rid, l.data, r.info +FROM left_parquet l +FULL JOIN right_parquet r ON l.id = r.id; +---- +logical_plan +01)Projection: l.id, r.id AS rid, l.data, r.info +02)--Full Join: l.id = r.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id, info] +physical_plan +01)ProjectionExec: expr=[id@2 as id, id@0 as rid, data@3 as data, info@1 as info] +02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(id@0, id@0)] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet + +# LEFT SEMI JOIN: optimizer swaps to RightSemi (build=right_parquet, probe=left_parquet). +# No self-generated dynamic filter (only Inner joins), but parent filters on +# the preserved (probe) side can push down. +query TT +EXPLAIN SELECT l.* +FROM left_parquet l +WHERE l.id IN (SELECT r.id FROM right_parquet r); +---- +logical_plan +01)LeftSemi Join: l.id = __correlated_sq_1.id +02)--SubqueryAlias: l +03)----TableScan: left_parquet projection=[id, data] +04)--SubqueryAlias: __correlated_sq_1 +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet + +# LEFT ANTI JOIN: no self-generated dynamic filter, but parent filters can push +# to the preserved (left/build) side. +query TT +EXPLAIN SELECT l.* +FROM left_parquet l +WHERE l.id NOT IN (SELECT r.id FROM right_parquet r); +---- +logical_plan +01)LeftAnti Join: l.id = __correlated_sq_1.id +02)--SubqueryAlias: l +03)----TableScan: left_parquet projection=[id, data] +04)--SubqueryAlias: __correlated_sq_1 +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet + # Test 3: Test independent control # Disable TopK, keep Join enabled From d2d0c71961273d72bd39fa896c382bef91baabf9 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Thu, 12 Feb 2026 09:20:16 +0000 Subject: [PATCH 2/6] Use index check only for joins, add slt proving we close #20213 --- .../physical-plan/src/filter_pushdown.rs | 65 +++++++++---- .../physical-plan/src/joins/hash_join/exec.rs | 14 +++ datafusion/physical-plan/src/projection.rs | 2 +- .../dynamic_filter_pushdown_config.slt | 91 +++++++++++++++++++ 4 files changed, 151 insertions(+), 21 deletions(-) diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index c63e67ac78748..f7b8214b086be 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -316,8 +316,8 @@ pub struct ChildFilterDescription { /// exist in the target schema. If any column in the filter is not present /// in the schema, the filter cannot be pushed down to that child. pub(crate) struct FilterColumnChecker<'a> { - /// The set of `(column_name, column_index)` pairs that are allowed. - allowed_columns: HashSet<(&'a str, usize)>, + /// Column names that are allowed. + column_names: HashSet<&'a str>, } impl<'a> FilterColumnChecker<'a> { @@ -325,24 +325,13 @@ impl<'a> FilterColumnChecker<'a> { /// /// Extracts all column names from the schema's fields to build /// a lookup set for efficient column existence checks. - pub(crate) fn new_schema(input_schema: &'a Schema) -> Self { - let allowed_columns: HashSet<(&str, usize)> = input_schema + pub(crate) fn new(input_schema: &'a Schema) -> Self { + let column_names: HashSet<&str> = input_schema .fields() .iter() - .enumerate() - .map(|(i, f)| (f.name().as_str(), i)) + .map(|f| f.name().as_str()) .collect(); - Self { allowed_columns } - } - - /// Creates a new [`FilterColumnChecker`] from an explicit set of - /// allowed `(name, index)` pairs. - /// - /// This is used by join nodes that need to restrict pushdown to columns - /// belonging to a specific side of the join, even when different sides - /// have columns with the same name (e.g. nested mark joins). - pub(crate) fn new_columns(allowed_columns: HashSet<(&'a str, usize)>) -> Self { - Self { allowed_columns } + Self { column_names } } /// Checks whether a filter expression can be pushed down to the child @@ -353,6 +342,42 @@ impl<'a> FilterColumnChecker<'a> { /// /// This method traverses the entire expression tree, checking each /// column reference against the available column names. + pub(crate) fn can_pushdown(&self, filter: &Arc) -> bool { + let mut can_apply = true; + filter + .apply(|expr| { + if let Some(column) = expr.as_any().downcast_ref::() + && !self.column_names.contains(column.name()) + { + can_apply = false; + return Ok(TreeNodeRecursion::Stop); + } + + Ok(TreeNodeRecursion::Continue) + }) + .expect("infallible traversal"); + can_apply + } +} + +/// Like [`FilterColumnChecker`] but matches on both column name and index. +/// +/// This is used by join nodes that need to restrict pushdown to columns +/// belonging to a specific side of the join, even when different sides +/// have columns with the same name (e.g. nested mark joins producing +/// identically-named "mark" columns). +pub(crate) struct FilterColumnIndexChecker<'a> { + /// The set of `(column_name, column_index)` pairs that are allowed. + allowed_columns: HashSet<(&'a str, usize)>, +} + +impl<'a> FilterColumnIndexChecker<'a> { + pub(crate) fn new(allowed_columns: HashSet<(&'a str, usize)>) -> Self { + Self { allowed_columns } + } + + /// Checks whether a filter expression can be pushed down, requiring + /// that every [`Column`] reference matches an allowed `(name, index)` pair. pub(crate) fn can_pushdown(&self, filter: &Arc) -> bool { let mut can_apply = true; filter @@ -387,8 +412,8 @@ impl ChildFilterDescription { ) -> Result { let child_schema = child.schema(); - // Build a set of column names in the child schema for quick lookup - let checker = FilterColumnChecker::new_schema(&child_schema); + // Build a set of column (name, index) pairs in the child schema for quick lookup + let checker = FilterColumnChecker::new(&child_schema); // Analyze each parent filter let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); @@ -428,7 +453,7 @@ impl ChildFilterDescription { child: &Arc, ) -> Result { let child_schema = child.schema(); - let checker = FilterColumnChecker::new_columns(allowed_columns); + let checker = FilterColumnIndexChecker::new(allowed_columns); let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); for filter in parent_filters { diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 44c09e2a3cfc8..e7c66d2cdd62d 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -5762,4 +5762,18 @@ mod tests { .contains("null_aware anti join only supports single column join key") ); } + + #[test] + fn test_lr_is_preserved() { + assert_eq!(lr_is_preserved(JoinType::Inner), (true, true)); + assert_eq!(lr_is_preserved(JoinType::Left), (true, false)); + assert_eq!(lr_is_preserved(JoinType::Right), (false, true)); + assert_eq!(lr_is_preserved(JoinType::Full), (false, false)); + assert_eq!(lr_is_preserved(JoinType::LeftSemi), (true, false)); + assert_eq!(lr_is_preserved(JoinType::LeftAnti), (true, false)); + assert_eq!(lr_is_preserved(JoinType::LeftMark), (true, false)); + assert_eq!(lr_is_preserved(JoinType::RightSemi), (false, true)); + assert_eq!(lr_is_preserved(JoinType::RightAnti), (false, true)); + assert_eq!(lr_is_preserved(JoinType::RightMark), (false, true)); + } } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 573f4ad0afed3..76711a8f835f4 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -384,7 +384,7 @@ impl ExecutionPlan for ProjectionExec { // expand alias column to original expr in parent filters let invert_alias_map = self.collect_reverse_alias()?; let output_schema = self.schema(); - let checker = FilterColumnChecker::new_schema(&output_schema); + let checker = FilterColumnChecker::new(&output_schema); let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); for filter in parent_filters { diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index dbc8a4ab8dc07..dd6aea1984338 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -562,6 +562,97 @@ physical_plan 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet 04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] +# Test 6: Regression test for issue #20213 - dynamic filter applied to wrong table +# when subquery join has same column names on both sides. +# +# The bug: when an outer join pushes a DynamicFilter for column "k" through an +# inner join where both sides have a column named "k", the name-based routing +# incorrectly pushed the filter to BOTH sides instead of only the correct one. +# With small row groups this caused wrong results (0 rows instead of expected). + +# Create tables with same column names (k, v) on both sides +statement ok +CREATE TABLE issue_20213_t1(k INT, v INT) AS +SELECT i as k, i as v FROM generate_series(1, 1000) t(i); + +statement ok +CREATE TABLE issue_20213_t2(k INT, v INT) AS +SELECT i + 100 as k, i as v FROM generate_series(1, 100) t(i); + +# Use small row groups to trigger statistics-based pruning +statement ok +SET datafusion.execution.parquet.max_row_group_size = 10; + +query I +COPY issue_20213_t1 TO 'test_files/scratch/dynamic_filter_pushdown_config/issue_20213_t1.parquet' STORED AS PARQUET; +---- +1000 + +query I +COPY issue_20213_t2 TO 'test_files/scratch/dynamic_filter_pushdown_config/issue_20213_t2.parquet' STORED AS PARQUET; +---- +100 + +# Reset row group size +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1000000; + +statement ok +CREATE EXTERNAL TABLE t1_20213(k INT, v INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/issue_20213_t1.parquet'; + +statement ok +CREATE EXTERNAL TABLE t2_20213(k INT, v INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/issue_20213_t2.parquet'; + +# The query from issue #20213: subquery joins t1 and t2 on v, then outer +# join uses t2's k column. The dynamic filter on k from the outer join +# must only apply to t2 (k range 101-200), NOT to t1 (k range 1-1000). +query I +SELECT count(*) FROM ( + SELECT t2_20213.k as k, t1_20213.k as k2 + FROM t1_20213 + JOIN t2_20213 ON t1_20213.v = t2_20213.v +) a +JOIN t2_20213 b ON a.k = b.k +WHERE b.v < 10; +---- +9 + +# Also verify with SELECT * to catch row-level correctness +query IIII rowsort +SELECT * FROM ( + SELECT t2_20213.k as k, t1_20213.k as k2 + FROM t1_20213 + JOIN t2_20213 ON t1_20213.v = t2_20213.v +) a +JOIN t2_20213 b ON a.k = b.k +WHERE b.v < 10; +---- +101 1 101 1 +102 2 102 2 +103 3 103 3 +104 4 104 4 +105 5 105 5 +106 6 106 6 +107 7 107 7 +108 8 108 8 +109 9 109 9 + +statement ok +DROP TABLE issue_20213_t1; + +statement ok +DROP TABLE issue_20213_t2; + +statement ok +DROP TABLE t1_20213; + +statement ok +DROP TABLE t2_20213; + # Cleanup statement ok From 7c954bd70b78cb321a0582ef44374fd95254771b Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Thu, 12 Feb 2026 15:21:47 +0000 Subject: [PATCH 3/6] Refactor FilterColumnChecker -> FilterRemapper --- .../physical-plan/src/filter_pushdown.rs | 207 +++++++++--------- .../physical-plan/src/joins/hash_join/exec.rs | 13 +- datafusion/physical-plan/src/projection.rs | 25 +-- 3 files changed, 119 insertions(+), 126 deletions(-) diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index f7b8214b086be..44cbf1abbc3a4 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -37,7 +37,7 @@ use std::collections::HashSet; use std::sync::Arc; -use arrow_schema::Schema; +use arrow_schema::SchemaRef; use datafusion_common::{ Result, tree_node::{TreeNode, TreeNodeRecursion}, @@ -309,92 +309,109 @@ pub struct ChildFilterDescription { pub(crate) self_filters: Vec>, } -/// A utility for checking whether a filter expression can be pushed down -/// to a child node based on column availability. +/// Validates and remaps filter column references to a target schema in one step. /// -/// This checker validates that all columns referenced in a filter expression -/// exist in the target schema. If any column in the filter is not present -/// in the schema, the filter cannot be pushed down to that child. -pub(crate) struct FilterColumnChecker<'a> { - /// Column names that are allowed. - column_names: HashSet<&'a str>, +/// When pushing filters from a parent to a child node, we need to: +/// 1. Verify that all columns referenced by the filter exist in the target +/// 2. Remap column indices to match the target schema +/// +/// For join nodes, an additional constraint is needed: only columns belonging +/// to a specific side of the join should be considered valid. This is +/// controlled by an optional set of allowed column indices (in the parent +/// schema). When provided, a filter is only eligible if every column +/// reference's index appears in the allowed set. +pub(crate) struct FilterRemapper { + /// The target schema to remap column indices into. + child_schema: SchemaRef, + /// If set, only columns at these indices (in the *parent* schema) are + /// considered valid. When `None`, any column whose name exists in + /// `child_schema` is valid. + allowed_indices: Option>, } -impl<'a> FilterColumnChecker<'a> { - /// Creates a new [`FilterColumnChecker`] from the given schema. +impl FilterRemapper { + /// Create a remapper that accepts any column name present in the target schema. + pub(crate) fn new(child_schema: SchemaRef) -> Self { + Self { + child_schema, + allowed_indices: None, + } + } + + /// Create a remapper that only accepts columns at the given indices. + /// This is used by join nodes to restrict pushdown to one side of the + /// join when both sides have same-named columns. + fn with_allowed_indices( + child_schema: SchemaRef, + allowed_indices: HashSet, + ) -> Self { + Self { + child_schema, + allowed_indices: Some(allowed_indices), + } + } + + /// Try to remap a filter's column references to the target schema. /// - /// Extracts all column names from the schema's fields to build - /// a lookup set for efficient column existence checks. - pub(crate) fn new(input_schema: &'a Schema) -> Self { - let column_names: HashSet<&str> = input_schema + /// Returns `Some(remapped_filter)` if all columns pass validation, + /// or `None` if any column is not valid for this target. + pub(crate) fn try_remap( + &self, + filter: &Arc, + ) -> Result>> { + if self.all_columns_in_schema(filter) + && self + .allowed_indices + .as_ref() + .map(|allowed| Self::all_columns_in_set(filter, allowed)) + .unwrap_or(true) + { + let remapped = reassign_expr_columns(Arc::clone(filter), &self.child_schema)?; + Ok(Some(remapped)) + } else { + Ok(None) + } + } + + fn all_columns_in_schema(&self, filter: &Arc) -> bool { + let names: HashSet<&str> = self + .child_schema .fields() .iter() .map(|f| f.name().as_str()) .collect(); - Self { column_names } - } - - /// Checks whether a filter expression can be pushed down to the child - /// whose schema was used to create this checker. - /// - /// Returns `true` if all [`Column`] references in the filter expression - /// exist in the target schema, `false` otherwise. - /// - /// This method traverses the entire expression tree, checking each - /// column reference against the available column names. - pub(crate) fn can_pushdown(&self, filter: &Arc) -> bool { - let mut can_apply = true; + let mut ok = true; filter - .apply(|expr| { - if let Some(column) = expr.as_any().downcast_ref::() - && !self.column_names.contains(column.name()) + .apply(|e| { + if let Some(col) = e.as_any().downcast_ref::() + && !names.contains(col.name()) { - can_apply = false; + ok = false; return Ok(TreeNodeRecursion::Stop); } - Ok(TreeNodeRecursion::Continue) }) .expect("infallible traversal"); - can_apply + ok } -} - -/// Like [`FilterColumnChecker`] but matches on both column name and index. -/// -/// This is used by join nodes that need to restrict pushdown to columns -/// belonging to a specific side of the join, even when different sides -/// have columns with the same name (e.g. nested mark joins producing -/// identically-named "mark" columns). -pub(crate) struct FilterColumnIndexChecker<'a> { - /// The set of `(column_name, column_index)` pairs that are allowed. - allowed_columns: HashSet<(&'a str, usize)>, -} -impl<'a> FilterColumnIndexChecker<'a> { - pub(crate) fn new(allowed_columns: HashSet<(&'a str, usize)>) -> Self { - Self { allowed_columns } - } - - /// Checks whether a filter expression can be pushed down, requiring - /// that every [`Column`] reference matches an allowed `(name, index)` pair. - pub(crate) fn can_pushdown(&self, filter: &Arc) -> bool { - let mut can_apply = true; + fn all_columns_in_set( + filter: &Arc, + allowed: &HashSet, + ) -> bool { + let mut ok = true; filter - .apply(|expr| { - if let Some(column) = expr.as_any().downcast_ref::() - && !self - .allowed_columns - .contains(&(column.name(), column.index())) + .apply(|e| { + if let Some(col) = e.as_any().downcast_ref::() + && !allowed.contains(&col.index()) { - can_apply = false; + ok = false; return Ok(TreeNodeRecursion::Stop); } - Ok(TreeNodeRecursion::Continue) }) .expect("infallible traversal"); - can_apply + ok } } @@ -410,58 +427,40 @@ impl ChildFilterDescription { parent_filters: &[Arc], child: &Arc, ) -> Result { - let child_schema = child.schema(); - - // Build a set of column (name, index) pairs in the child schema for quick lookup - let checker = FilterColumnChecker::new(&child_schema); - - // Analyze each parent filter - let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); - - for filter in parent_filters { - if checker.can_pushdown(filter) { - // All columns exist in child - we can push down - // Need to reassign column indices to match child schema - let reassigned_filter = - reassign_expr_columns(Arc::clone(filter), &child_schema)?; - child_parent_filters - .push(PushedDownPredicate::supported(reassigned_filter)); - } else { - // Some columns don't exist in child - cannot push down - child_parent_filters - .push(PushedDownPredicate::unsupported(Arc::clone(filter))); - } - } - - Ok(Self { - parent_filters: child_parent_filters, - self_filters: vec![], - }) + let remapper = FilterRemapper::new(child.schema()); + Self::remap_filters(parent_filters, &remapper) } /// Like [`Self::from_child`], but restricts which parent-level columns are /// considered reachable through this child. /// - /// `allowed_columns` is a set of `(column_name, column_index)` pairs - /// (in the *parent* schema) that map to this child's side of a join. - /// A filter is only eligible for pushdown when **every** column it - /// references appears in `allowed_columns`. This prevents incorrect - /// pushdown when different join sides have columns with the same name - pub fn from_child_with_allowed_columns( + /// `allowed_indices` is the set of column indices (in the *parent* + /// schema) that map to this child's side of a join. A filter is only + /// eligible for pushdown when **every** column index it references + /// appears in `allowed_indices`. + /// + /// This prevents incorrect pushdown when different join sides have + /// columns with the same name: matching on index ensures a filter + /// referencing the right side's `k@2` is not pushed to the left side + /// which also has a column named `k` but at a different index. + pub fn from_child_with_allowed_indices( parent_filters: &[Arc], - allowed_columns: HashSet<(&str, usize)>, + allowed_indices: HashSet, child: &Arc, ) -> Result { - let child_schema = child.schema(); - let checker = FilterColumnIndexChecker::new(allowed_columns); + let remapper = + FilterRemapper::with_allowed_indices(child.schema(), allowed_indices); + Self::remap_filters(parent_filters, &remapper) + } + fn remap_filters( + parent_filters: &[Arc], + remapper: &FilterRemapper, + ) -> Result { let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); for filter in parent_filters { - if checker.can_pushdown(filter) { - let reassigned_filter = - reassign_expr_columns(Arc::clone(filter), &child_schema)?; - child_parent_filters - .push(PushedDownPredicate::supported(reassigned_filter)); + if let Some(remapped) = remapper.try_remap(filter)? { + child_parent_filters.push(PushedDownPredicate::supported(remapped)); } else { child_parent_filters .push(PushedDownPredicate::unsupported(Arc::clone(filter))); diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index e7c66d2cdd62d..6f9dd82e9f92a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1424,9 +1424,7 @@ impl ExecutionPlan for HashJoinExec { // (e.g. nested mark joins both producing "mark" columns). let (left_preserved, right_preserved) = lr_is_preserved(self.join_type); - // Build the set of allowed (name, index) pairs for each side - let output_schema = self.schema(); - let output_fields = output_schema.fields(); + // Build the set of allowed column indices for each side let column_indices: Vec = match self.projection.as_ref() { Some(projection) => projection .iter() @@ -1438,13 +1436,12 @@ impl ExecutionPlan for HashJoinExec { let mut left_allowed = std::collections::HashSet::new(); let mut right_allowed = std::collections::HashSet::new(); for (i, ci) in column_indices.iter().enumerate() { - let name = output_fields[i].name().as_str(); match ci.side { JoinSide::Left => { - left_allowed.insert((name, i)); + left_allowed.insert(i); } JoinSide::Right => { - right_allowed.insert((name, i)); + right_allowed.insert(i); } JoinSide::None => { // Mark columns - don't allow pushdown to either side @@ -1453,7 +1450,7 @@ impl ExecutionPlan for HashJoinExec { } let left_child = if left_preserved { - ChildFilterDescription::from_child_with_allowed_columns( + ChildFilterDescription::from_child_with_allowed_indices( &parent_filters, left_allowed, self.left(), @@ -1463,7 +1460,7 @@ impl ExecutionPlan for HashJoinExec { }; let mut right_child = if right_preserved { - ChildFilterDescription::from_child_with_allowed_columns( + ChildFilterDescription::from_child_with_allowed_indices( &parent_filters, right_allowed, self.right(), diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 76711a8f835f4..a3a27e81f733b 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -29,8 +29,8 @@ use super::{ use crate::column_rewriter::PhysicalColumnRewriter; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ - ChildFilterDescription, ChildPushdownResult, FilterColumnChecker, FilterDescription, - FilterPushdownPhase, FilterPushdownPropagation, PushedDownPredicate, + ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, FilterRemapper, PushedDownPredicate, }; use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef}; use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr}; @@ -51,7 +51,7 @@ use datafusion_execution::TaskContext; use datafusion_expr::ExpressionPlacement; use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_expr::projection::Projector; -use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; +use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql}; use datafusion_physical_expr_common::sort_expr::{ LexOrdering, LexRequirement, PhysicalSortExpr, @@ -384,22 +384,19 @@ impl ExecutionPlan for ProjectionExec { // expand alias column to original expr in parent filters let invert_alias_map = self.collect_reverse_alias()?; let output_schema = self.schema(); - let checker = FilterColumnChecker::new(&output_schema); + let remapper = FilterRemapper::new(output_schema); let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); for filter in parent_filters { - if !checker.can_pushdown(&filter) { + // Check that column exists in child, then reassign column indices to match child schema + if let Some(reassigned) = remapper.try_remap(&filter)? { + // rewrite filter expression using invert alias map + let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map); + let rewritten = reassigned.rewrite(&mut rewriter)?.data; + child_parent_filters.push(PushedDownPredicate::supported(rewritten)); + } else { child_parent_filters.push(PushedDownPredicate::unsupported(filter)); - continue; } - // All columns exist in child - we can push down - // Need to reassign column indices to match child schema - let reassigned_filter = reassign_expr_columns(filter, &output_schema)?; - // rewrite filter expression using invert alias map - let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map); - let rewritten = reassigned_filter.rewrite(&mut rewriter)?.data; - - child_parent_filters.push(PushedDownPredicate::supported(rewritten)); } Ok(FilterDescription::new().with_child(ChildFilterDescription { From e03028bd1b65a02b55a3f2ad06b7c27c181b2cdb Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Thu, 12 Feb 2026 15:31:35 +0000 Subject: [PATCH 4/6] Simplify how left_ and right_allowed is built --- .../physical-plan/src/joins/hash_join/exec.rs | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 6f9dd82e9f92a..e7639d2bd11dc 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashSet; use std::fmt; use std::mem::size_of; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -1433,21 +1434,15 @@ impl ExecutionPlan for HashJoinExec { None => self.column_indices.clone(), }; - let mut left_allowed = std::collections::HashSet::new(); - let mut right_allowed = std::collections::HashSet::new(); - for (i, ci) in column_indices.iter().enumerate() { + let (mut left_allowed, mut right_allowed) = (HashSet::new(), HashSet::new()); + column_indices.iter().for_each(|ci| { match ci.side { - JoinSide::Left => { - left_allowed.insert(i); - } - JoinSide::Right => { - right_allowed.insert(i); - } - JoinSide::None => { - // Mark columns - don't allow pushdown to either side - } - } - } + JoinSide::Left => left_allowed.insert(ci.index), + JoinSide::Right => right_allowed.insert(ci.index), + // Mark columns - don't allow pushdown to either side + JoinSide::None => false, + }; + }); let left_child = if left_preserved { ChildFilterDescription::from_child_with_allowed_indices( From a1b844340745a6819c3f8f5ee615ee5b865f82f9 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Thu, 12 Feb 2026 18:21:58 +0000 Subject: [PATCH 5/6] Review comments --- .../physical-plan/src/filter_pushdown.rs | 103 +++++++----------- .../physical-plan/src/joins/hash_join/exec.rs | 19 ++-- 2 files changed, 48 insertions(+), 74 deletions(-) diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 44cbf1abbc3a4..7e82b9e8239e0 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -40,9 +40,9 @@ use std::sync::Arc; use arrow_schema::SchemaRef; use datafusion_common::{ Result, - tree_node::{TreeNode, TreeNodeRecursion}, + tree_node::{Transformed, TreeNode}, }; -use datafusion_physical_expr::{expressions::Column, utils::reassign_expr_columns}; +use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -315,26 +315,27 @@ pub struct ChildFilterDescription { /// 1. Verify that all columns referenced by the filter exist in the target /// 2. Remap column indices to match the target schema /// -/// For join nodes, an additional constraint is needed: only columns belonging -/// to a specific side of the join should be considered valid. This is -/// controlled by an optional set of allowed column indices (in the parent -/// schema). When provided, a filter is only eligible if every column -/// reference's index appears in the allowed set. +/// `allowed_indices` controls which column indices (in the parent schema) are +/// considered valid. For single-input nodes this defaults to +/// `0..child_schema.len()` (all columns are reachable). For join nodes it is +/// restricted to the subset of output columns that map to the target child, +/// which is critical when different sides have same-named columns. pub(crate) struct FilterRemapper { /// The target schema to remap column indices into. child_schema: SchemaRef, - /// If set, only columns at these indices (in the *parent* schema) are - /// considered valid. When `None`, any column whose name exists in - /// `child_schema` is valid. - allowed_indices: Option>, + /// Only columns at these indices (in the *parent* schema) are considered + /// valid. For non-join nodes this defaults to `0..child_schema.len()`. + allowed_indices: HashSet, } impl FilterRemapper { - /// Create a remapper that accepts any column name present in the target schema. + /// Create a remapper that accepts any column whose index falls within + /// `0..child_schema.len()` and whose name exists in the target schema. pub(crate) fn new(child_schema: SchemaRef) -> Self { + let allowed_indices = (0..child_schema.fields().len()).collect(); Self { child_schema, - allowed_indices: None, + allowed_indices, } } @@ -347,71 +348,41 @@ impl FilterRemapper { ) -> Self { Self { child_schema, - allowed_indices: Some(allowed_indices), + allowed_indices, } } /// Try to remap a filter's column references to the target schema. /// - /// Returns `Some(remapped_filter)` if all columns pass validation, - /// or `None` if any column is not valid for this target. + /// Validates and remaps in a single tree traversal: for each column, + /// checks that its index is in the allowed set and that + /// its name exists in the target schema, then remaps the index. + /// Returns `Some(remapped)` if all columns are valid, or `None` if any + /// column fails validation. pub(crate) fn try_remap( &self, filter: &Arc, ) -> Result>> { - if self.all_columns_in_schema(filter) - && self - .allowed_indices - .as_ref() - .map(|allowed| Self::all_columns_in_set(filter, allowed)) - .unwrap_or(true) - { - let remapped = reassign_expr_columns(Arc::clone(filter), &self.child_schema)?; - Ok(Some(remapped)) - } else { - Ok(None) - } - } - - fn all_columns_in_schema(&self, filter: &Arc) -> bool { - let names: HashSet<&str> = self - .child_schema - .fields() - .iter() - .map(|f| f.name().as_str()) - .collect(); - let mut ok = true; - filter - .apply(|e| { - if let Some(col) = e.as_any().downcast_ref::() - && !names.contains(col.name()) + let mut all_valid = true; + let transformed = Arc::clone(filter).transform_down(|expr| { + if let Some(col) = expr.as_any().downcast_ref::() { + if self.allowed_indices.contains(&col.index()) + && let Ok(new_index) = self.child_schema.index_of(col.name()) { - ok = false; - return Ok(TreeNodeRecursion::Stop); + Ok(Transformed::yes(Arc::new(Column::new( + col.name(), + new_index, + )))) + } else { + all_valid = false; + Ok(Transformed::complete(expr)) } - Ok(TreeNodeRecursion::Continue) - }) - .expect("infallible traversal"); - ok - } + } else { + Ok(Transformed::no(expr)) + } + })?; - fn all_columns_in_set( - filter: &Arc, - allowed: &HashSet, - ) -> bool { - let mut ok = true; - filter - .apply(|e| { - if let Some(col) = e.as_any().downcast_ref::() - && !allowed.contains(&col.index()) - { - ok = false; - return Ok(TreeNodeRecursion::Stop); - } - Ok(TreeNodeRecursion::Continue) - }) - .expect("infallible traversal"); - ok + Ok(all_valid.then_some(transformed.data)) } } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index e7639d2bd11dc..8ecffa82c10bf 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1435,14 +1435,17 @@ impl ExecutionPlan for HashJoinExec { }; let (mut left_allowed, mut right_allowed) = (HashSet::new(), HashSet::new()); - column_indices.iter().for_each(|ci| { - match ci.side { - JoinSide::Left => left_allowed.insert(ci.index), - JoinSide::Right => right_allowed.insert(ci.index), - // Mark columns - don't allow pushdown to either side - JoinSide::None => false, - }; - }); + column_indices + .iter() + .enumerate() + .for_each(|(output_idx, ci)| { + match ci.side { + JoinSide::Left => left_allowed.insert(output_idx), + JoinSide::Right => right_allowed.insert(output_idx), + // Mark columns - don't allow pushdown to either side + JoinSide::None => false, + }; + }); let left_child = if left_preserved { ChildFilterDescription::from_child_with_allowed_indices( From 043eed145d53a03447393f12a975b5d5e0e967e5 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Sun, 15 Feb 2026 19:45:33 +0000 Subject: [PATCH 6/6] Support semi/anti join pushdown --- .../physical_optimizer/filter_pushdown.rs | 75 +++++++++++++++++++ .../physical-plan/src/joins/hash_join/exec.rs | 64 ++++++++++++++-- .../dynamic_filter_pushdown_config.slt | 71 +++++++++++++++++- 3 files changed, 201 insertions(+), 9 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 783babf0b2de1..99db81d34d8fa 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -1865,6 +1865,81 @@ fn test_hashjoin_parent_filter_pushdown_mark_join() { ); } +/// Test that filters on join key columns are pushed to both sides of semi/anti joins. +/// For LeftSemi/LeftAnti, the output only contains left columns, but filters on +/// join key columns can also be pushed to the right (non-preserved) side because +/// the equijoin condition guarantees the key values match. +#[test] +fn test_hashjoin_parent_filter_pushdown_semi_anti_join() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + let left_schema = Arc::new(Schema::new(vec![ + Field::new("k", DataType::Utf8, false), + Field::new("v", DataType::Utf8, false), + ])); + let left_scan = TestScanBuilder::new(Arc::clone(&left_schema)) + .with_support(true) + .build(); + + let right_schema = Arc::new(Schema::new(vec![ + Field::new("k", DataType::Utf8, false), + Field::new("w", DataType::Utf8, false), + ])); + let right_scan = TestScanBuilder::new(Arc::clone(&right_schema)) + .with_support(true) + .build(); + + let on = vec![( + col("k", &left_schema).unwrap(), + col("k", &right_schema).unwrap(), + )]; + + let join = Arc::new( + HashJoinExec::try_new( + left_scan, + right_scan, + on, + None, + &JoinType::LeftSemi, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ); + + let join_schema = join.schema(); + // Filter on join key column: k = 'x' — should be pushed to BOTH sides + let key_filter = col_lit_predicate("k", "x", &join_schema); + // Filter on non-key column: v = 'y' — should only be pushed to the left side + let val_filter = col_lit_predicate("v", "y", &join_schema); + + let filter = + Arc::new(FilterExec::try_new(key_filter, Arc::clone(&join) as _).unwrap()); + let plan = Arc::new(FilterExec::try_new(val_filter, filter).unwrap()) + as Arc; + + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: v@1 = y + - FilterExec: k@0 = x + - HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(k@0, k@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, v], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, w], file_type=test, pushdown_supported=true + output: + Ok: + - HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(k@0, k@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, v], file_type=test, pushdown_supported=true, predicate=k@0 = x AND v@1 = y + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, w], file_type=test, pushdown_supported=true, predicate=k@0 = x + " + ); +} + /// Integration test for dynamic filter pushdown with TopK. /// We use an integration test because there are complex interactions in the optimizer rules /// that the unit tests applying a single optimizer rule do not cover. diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 8ecffa82c10bf..7e31f719d7ea1 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -81,7 +81,7 @@ use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumula use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; -use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; +use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit}; use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; @@ -1447,6 +1447,51 @@ impl ExecutionPlan for HashJoinExec { }; }); + // For semi/anti joins, the non-preserved side's columns are not in the + // output, but filters on join key columns can still be pushed there. + // We find output columns that are join keys on the preserved side and + // add their output indices to the non-preserved side's allowed set. + // The name-based remap in FilterRemapper will then match them to the + // corresponding column in the non-preserved child's schema. + match self.join_type { + JoinType::LeftSemi | JoinType::LeftAnti => { + let left_key_indices: HashSet = self + .on + .iter() + .filter_map(|(left_key, _)| { + left_key + .as_any() + .downcast_ref::() + .map(|c| c.index()) + }) + .collect(); + for (output_idx, ci) in column_indices.iter().enumerate() { + if ci.side == JoinSide::Left && left_key_indices.contains(&ci.index) { + right_allowed.insert(output_idx); + } + } + } + JoinType::RightSemi | JoinType::RightAnti => { + let right_key_indices: HashSet = self + .on + .iter() + .filter_map(|(_, right_key)| { + right_key + .as_any() + .downcast_ref::() + .map(|c| c.index()) + }) + .collect(); + for (output_idx, ci) in column_indices.iter().enumerate() { + if ci.side == JoinSide::Right && right_key_indices.contains(&ci.index) + { + left_allowed.insert(output_idx); + } + } + } + _ => {} + } + let left_child = if left_preserved { ChildFilterDescription::from_child_with_allowed_indices( &parent_filters, @@ -1538,8 +1583,13 @@ fn lr_is_preserved(join_type: JoinType) -> (bool, bool) { JoinType::Left => (true, false), JoinType::Right => (false, true), JoinType::Full => (false, false), - JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => (true, false), - JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => (false, true), + // Filters in semi/anti joins are either on the preserved side, or on join keys, + // as all output columns come from the preserved side. Join key filters can be + // safely pushed down into the other side. + JoinType::LeftSemi | JoinType::LeftAnti => (true, true), + JoinType::RightSemi | JoinType::RightAnti => (true, true), + JoinType::LeftMark => (true, false), + JoinType::RightMark => (false, true), } } @@ -5764,11 +5814,11 @@ mod tests { assert_eq!(lr_is_preserved(JoinType::Left), (true, false)); assert_eq!(lr_is_preserved(JoinType::Right), (false, true)); assert_eq!(lr_is_preserved(JoinType::Full), (false, false)); - assert_eq!(lr_is_preserved(JoinType::LeftSemi), (true, false)); - assert_eq!(lr_is_preserved(JoinType::LeftAnti), (true, false)); + assert_eq!(lr_is_preserved(JoinType::LeftSemi), (true, true)); + assert_eq!(lr_is_preserved(JoinType::LeftAnti), (true, true)); assert_eq!(lr_is_preserved(JoinType::LeftMark), (true, false)); - assert_eq!(lr_is_preserved(JoinType::RightSemi), (false, true)); - assert_eq!(lr_is_preserved(JoinType::RightAnti), (false, true)); + assert_eq!(lr_is_preserved(JoinType::RightSemi), (true, true)); + assert_eq!(lr_is_preserved(JoinType::RightAnti), (true, true)); assert_eq!(lr_is_preserved(JoinType::RightMark), (false, true)); } } diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index dd6aea1984338..275b0c9dd490f 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -311,6 +311,73 @@ physical_plan 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet 03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet +# Test 2c: Parent dynamic filter (from TopK) pushed through semi/anti joins +# Sort on the join key (id) so the TopK dynamic filter pushes to BOTH sides. + +# SEMI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join key) +# that pushes through the RightSemi join to both the build and probe sides. +query TT +EXPLAIN SELECT l.* +FROM left_parquet l +WHERE l.id IN (SELECT r.id FROM right_parquet r) +ORDER BY l.id LIMIT 2; +---- +logical_plan +01)Sort: l.id ASC NULLS LAST, fetch=2 +02)--LeftSemi Join: l.id = __correlated_sq_1.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: __correlated_sq_1 +06)------SubqueryAlias: r +07)--------TableScan: right_parquet projection=[id] +physical_plan +01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] + +# Correctness check +query IT +SELECT l.* +FROM left_parquet l +WHERE l.id IN (SELECT r.id FROM right_parquet r) +ORDER BY l.id LIMIT 2; +---- +1 left1 +3 left3 + +# ANTI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join key) +# that pushes through the LeftAnti join to both the preserved and non-preserved sides. +query TT +EXPLAIN SELECT l.* +FROM left_parquet l +WHERE l.id NOT IN (SELECT r.id FROM right_parquet r) +ORDER BY l.id LIMIT 2; +---- +logical_plan +01)Sort: l.id ASC NULLS LAST, fetch=2 +02)--LeftAnti Join: l.id = __correlated_sq_1.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: __correlated_sq_1 +06)------SubqueryAlias: r +07)--------TableScan: right_parquet projection=[id] +physical_plan +01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] + +# Correctness check +query IT +SELECT l.* +FROM left_parquet l +WHERE l.id NOT IN (SELECT r.id FROM right_parquet r) +ORDER BY l.id LIMIT 2; +---- +2 left2 +4 left4 + # Test 3: Test independent control # Disable TopK, keep Join enabled @@ -568,7 +635,7 @@ physical_plan # The bug: when an outer join pushes a DynamicFilter for column "k" through an # inner join where both sides have a column named "k", the name-based routing # incorrectly pushed the filter to BOTH sides instead of only the correct one. -# With small row groups this caused wrong results (0 rows instead of expected). +# This caused wrong results (0 rows instead of expected). # Create tables with same column names (k, v) on both sides statement ok @@ -579,7 +646,7 @@ statement ok CREATE TABLE issue_20213_t2(k INT, v INT) AS SELECT i + 100 as k, i as v FROM generate_series(1, 100) t(i); -# Use small row groups to trigger statistics-based pruning +# Use small row groups to make statistics-based pruning more likely to manifest the bug statement ok SET datafusion.execution.parquet.max_row_group_size = 10;