diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index b3ed8d9653fe1..86db33e24ec11 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -401,7 +401,8 @@ async fn test_static_filter_pushdown_through_hash_join() { " ); - // Test left join - filters should NOT be pushed down + // Test left join: filter on preserved (build) side is pushed down, + // filter on non-preserved (probe) side is NOT pushed down. let join = Arc::new( HashJoinExec::try_new( TestScanBuilder::new(Arc::clone(&build_side_schema)) @@ -429,7 +430,6 @@ async fn test_static_filter_pushdown_through_hash_join() { let plan = Arc::new(FilterExec::try_new(filter, join).unwrap()) as Arc; - // Test that filters are NOT pushed down for left join insta::assert_snapshot!( OptimizationTest::new(plan, FilterPushdown::new(), true), @r" @@ -441,10 +441,9 @@ async fn test_static_filter_pushdown_through_hash_join() { - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true output: Ok: - - FilterExec: a@0 = aa - - HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true + - HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true " ); } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 689f629f7bac8..34ef8282b6db0 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -44,7 +44,6 @@ use datafusion_common::{ }; use datafusion_physical_expr::{expressions::Column, utils::reassign_expr_columns}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use itertools::Itertools; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum FilterPushdownPhase { @@ -317,21 +316,33 @@ pub struct ChildFilterDescription { /// exist in the target schema. If any column in the filter is not present /// in the schema, the filter cannot be pushed down to that child. pub(crate) struct FilterColumnChecker<'a> { - column_names: HashSet<&'a str>, + /// The set of `(column_name, column_index)` pairs that are allowed. + allowed_columns: HashSet<(&'a str, usize)>, } impl<'a> FilterColumnChecker<'a> { /// Creates a new [`FilterColumnChecker`] from the given schema. /// - /// Extracts all column names from the schema's fields to build + /// Extracts all column `(name, index)` pairs from the schema's fields to build /// a lookup set for efficient column existence checks. - pub(crate) fn new(input_schema: &'a Schema) -> Self { - let column_names: HashSet<&str> = input_schema + pub(crate) fn new_schema(input_schema: &'a Schema) -> Self { + let allowed_columns: HashSet<(&str, usize)> = input_schema .fields() .iter() - .map(|f| f.name().as_str()) + .enumerate() + .map(|(i, f)| (f.name().as_str(), i)) .collect(); - Self { column_names } + Self { allowed_columns } + } + + /// Creates a new [`FilterColumnChecker`] from an explicit set of + /// allowed `(name, index)` pairs. + /// + /// This is used by join nodes that need to restrict pushdown to columns + /// belonging to a specific side of the join, even when different sides + /// have columns with the same name. + pub(crate) fn new_columns(allowed_columns: HashSet<(&'a str, usize)>) -> Self { + Self { allowed_columns } } /// Checks whether a filter expression can be pushed down to the child @@ -347,7 +358,9 @@ impl<'a> FilterColumnChecker<'a> { filter .apply(|expr| { if let Some(column) = expr.as_any().downcast_ref::() - && !self.column_names.contains(column.name()) + && !self + .allowed_columns + .contains(&(column.name(), column.index())) { can_apply = false; return Ok(TreeNodeRecursion::Stop); @@ -375,7 +388,7 @@ impl ChildFilterDescription { let child_schema = child.schema(); // Build a set of column names in the child schema for quick lookup - let checker = FilterColumnChecker::new(&child_schema); + let checker = FilterColumnChecker::new_schema(&child_schema); // Analyze each parent filter let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); @@ -401,6 +414,52 @@ impl ChildFilterDescription { }) } + /// Like [`Self::from_child`], but restricts which parent-level columns are + /// considered reachable through this child. + /// + /// `allowed_columns` is a set of `(column_name, column_index)` pairs + /// (in the *parent* schema) that map to this child's side of a join. + /// A filter is only eligible for pushdown when **every** column it + /// references appears in `allowed_columns`. This prevents incorrect + /// pushdown when different join sides have columns with the same name. + pub fn from_child_with_allowed_columns( + parent_filters: &[Arc], + allowed_columns: HashSet<(&str, usize)>, + child: &Arc, + ) -> Result { + let child_schema = child.schema(); + let checker = FilterColumnChecker::new_columns(allowed_columns); + + let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); + for filter in parent_filters { + if checker.can_pushdown(filter) { + let reassigned_filter = + reassign_expr_columns(Arc::clone(filter), &child_schema)?; + child_parent_filters + .push(PushedDownPredicate::supported(reassigned_filter)); + } else { + child_parent_filters + .push(PushedDownPredicate::unsupported(Arc::clone(filter))); + } + } + + Ok(Self { + parent_filters: child_parent_filters, + self_filters: vec![], + }) + } + + /// Mark all parent filters as unsupported for this child. + pub fn all_unsupported(parent_filters: &[Arc]) -> Self { + Self { + parent_filters: parent_filters + .iter() + .map(|f| PushedDownPredicate::unsupported(Arc::clone(f))) + .collect(), + self_filters: vec![], + } + } + /// Add a self filter (from the current node) to be pushed down to this child. pub fn with_self_filter(mut self, filter: Arc) -> Self { self.self_filters.push(filter); @@ -476,15 +535,9 @@ impl FilterDescription { children: &[&Arc], ) -> Self { let mut desc = Self::new(); - let child_filters = parent_filters - .iter() - .map(|f| PushedDownPredicate::unsupported(Arc::clone(f))) - .collect_vec(); for _ in 0..children.len() { - desc = desc.with_child(ChildFilterDescription { - parent_filters: child_filters.clone(), - self_filters: vec![], - }); + desc = + desc.with_child(ChildFilterDescription::all_unsupported(parent_filters)); } desc } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 6b18b56413b71..26ebb04b3d7ae 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -24,7 +24,7 @@ use std::{any::Any, vec}; use crate::ExecutionPlanProperties; use crate::execution_plan::{EmissionType, boundedness_from_children}; use crate::filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPhase, + ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; use crate::joins::Map; @@ -720,7 +720,9 @@ impl HashJoinExec { } fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool { - if !config.optimizer.enable_join_dynamic_filter_pushdown { + if self.join_type != JoinType::Inner + || !config.optimizer.enable_join_dynamic_filter_pushdown + { return false; } @@ -1407,32 +1409,76 @@ impl ExecutionPlan for HashJoinExec { parent_filters: Vec>, config: &ConfigOptions, ) -> Result { - // Other types of joins can support *some* filters, but restrictions are complex and error prone. - // For now we don't support them. - // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs - // See https://github.com/apache/datafusion/issues/16973 for tracking. - if self.join_type != JoinType::Inner { - return Ok(FilterDescription::all_unsupported( - &parent_filters, - &self.children(), - )); + // This is the physical-plan equivalent of `push_down_all_join` in + // `datafusion/optimizer/src/push_down_filter.rs`. That function uses + // `lr_is_preserved` to decide which parent predicates can be pushed + // past a logical join to its children, then checks column references + // to route each predicate to the correct side. + // + // We apply the same two-level logic here: + // 1. `lr_is_preserved` gates whether a side is eligible at all. + // 2. For each filter, we check that all column references belong to + // the target child using `(name, index)` pairs via + // `from_child_with_allowed_columns`. This is critical for + // correctness: name-based matching alone can incorrectly push + // filters when different join sides have columns with the same + // name (see https://github.com/apache/datafusion/issues/20213). + let (left_preserved, right_preserved) = lr_is_preserved(self.join_type); + + // Build the set of allowed (name, index) pairs for each side. + // When a projection is present, the output schema differs from the + // raw join schema, so we need to map through the projection. + let output_schema = self.schema(); + let output_fields = output_schema.fields(); + let column_indices: Vec = match self.projection.as_ref() { + Some(projection) => projection + .iter() + .map(|i| self.column_indices[*i].clone()) + .collect(), + None => self.column_indices.clone(), + }; + + let mut left_allowed = std::collections::HashSet::new(); + let mut right_allowed = std::collections::HashSet::new(); + for (i, ci) in column_indices.iter().enumerate() { + let name = output_fields[i].name().as_str(); + match ci.side { + JoinSide::Left => { + left_allowed.insert((name, i)); + } + JoinSide::Right => { + right_allowed.insert((name, i)); + } + JoinSide::None => { + // Mark columns — don't allow pushdown to either side + } + } } - // Get basic filter descriptions for both children - let left_child = crate::filter_pushdown::ChildFilterDescription::from_child( - &parent_filters, - self.left(), - )?; - let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child( - &parent_filters, - self.right(), - )?; + let left_child = if left_preserved { + ChildFilterDescription::from_child_with_allowed_columns( + &parent_filters, + left_allowed, + self.left(), + )? + } else { + ChildFilterDescription::all_unsupported(&parent_filters) + }; + + let mut right_child = if right_preserved { + ChildFilterDescription::from_child_with_allowed_columns( + &parent_filters, + right_allowed, + self.right(), + )? + } else { + ChildFilterDescription::all_unsupported(&parent_filters) + }; // Add dynamic filters in Post phase if enabled if matches!(phase, FilterPushdownPhase::Post) && self.allow_join_dynamic_filter_pushdown(config) { - // Add actual dynamic filter to right side (probe side) let dynamic_filter = Self::create_dynamic_filter(&self.on); right_child = right_child.with_self_filter(dynamic_filter); } @@ -1448,19 +1494,6 @@ impl ExecutionPlan for HashJoinExec { child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { - // Note: this check shouldn't be necessary because we already marked all parent filters as unsupported for - // non-inner joins in `gather_filters_for_pushdown`. - // However it's a cheap check and serves to inform future devs touching this function that they need to be really - // careful pushing down filters through non-inner joins. - if self.join_type != JoinType::Inner { - // Other types of joins can support *some* filters, but restrictions are complex and error prone. - // For now we don't support them. - // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs - return Ok(FilterPushdownPropagation::all_unsupported( - child_pushdown_result, - )); - } - let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone()); assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child @@ -1501,6 +1534,22 @@ impl ExecutionPlan for HashJoinExec { } } +/// Determines which sides of a join are "preserved" for filter pushdown. +/// +/// A preserved side means filters on that side's columns can be safely pushed +/// below the join. This mirrors the logic in the logical optimizer's +/// `lr_is_preserved` in `datafusion/optimizer/src/push_down_filter.rs`. +fn lr_is_preserved(join_type: JoinType) -> (bool, bool) { + match join_type { + JoinType::Inner => (true, true), + JoinType::Left => (true, false), + JoinType::Right => (false, true), + JoinType::Full => (false, false), + JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => (true, false), + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => (false, true), + } +} + /// Accumulator for collecting min/max bounds from build-side data during hash join. /// /// This struct encapsulates the logic for progressively computing column bounds @@ -5715,4 +5764,18 @@ mod tests { .contains("null_aware anti join only supports single column join key") ); } + + #[test] + fn test_lr_is_preserved() { + assert_eq!(lr_is_preserved(JoinType::Inner), (true, true)); + assert_eq!(lr_is_preserved(JoinType::Left), (true, false)); + assert_eq!(lr_is_preserved(JoinType::Right), (false, true)); + assert_eq!(lr_is_preserved(JoinType::Full), (false, false)); + assert_eq!(lr_is_preserved(JoinType::LeftSemi), (true, false)); + assert_eq!(lr_is_preserved(JoinType::LeftAnti), (true, false)); + assert_eq!(lr_is_preserved(JoinType::LeftMark), (true, false)); + assert_eq!(lr_is_preserved(JoinType::RightSemi), (false, true)); + assert_eq!(lr_is_preserved(JoinType::RightAnti), (false, true)); + assert_eq!(lr_is_preserved(JoinType::RightMark), (false, true)); + } } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 76711a8f835f4..573f4ad0afed3 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -384,7 +384,7 @@ impl ExecutionPlan for ProjectionExec { // expand alias column to original expr in parent filters let invert_alias_map = self.collect_reverse_alias()?; let output_schema = self.schema(); - let checker = FilterColumnChecker::new(&output_schema); + let checker = FilterColumnChecker::new_schema(&output_schema); let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); for filter in parent_filters { diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index b112d70f427f1..3f3d10ed421c4 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -419,6 +419,97 @@ physical_plan 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet 04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] +# Test 6: Regression test for issue #20213 - dynamic filter applied to wrong table +# when subquery join has same column names on both sides. +# +# The bug: when an outer join pushes a DynamicFilter for column "k" through an +# inner join where both sides have a column named "k", the name-based routing +# incorrectly pushed the filter to BOTH sides instead of only the correct one. +# With small row groups this caused wrong results (0 rows instead of expected). + +# Create tables with same column names (k, v) on both sides +statement ok +CREATE TABLE issue_20213_t1(k INT, v INT) AS +SELECT i as k, i as v FROM generate_series(1, 1000) t(i); + +statement ok +CREATE TABLE issue_20213_t2(k INT, v INT) AS +SELECT i + 100 as k, i as v FROM generate_series(1, 100) t(i); + +# Use small row groups to trigger statistics-based pruning +statement ok +SET datafusion.execution.parquet.max_row_group_size = 10; + +query I +COPY issue_20213_t1 TO 'test_files/scratch/dynamic_filter_pushdown_config/issue_20213_t1.parquet' STORED AS PARQUET; +---- +1000 + +query I +COPY issue_20213_t2 TO 'test_files/scratch/dynamic_filter_pushdown_config/issue_20213_t2.parquet' STORED AS PARQUET; +---- +100 + +# Reset row group size +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1000000; + +statement ok +CREATE EXTERNAL TABLE t1_20213(k INT, v INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/issue_20213_t1.parquet'; + +statement ok +CREATE EXTERNAL TABLE t2_20213(k INT, v INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/issue_20213_t2.parquet'; + +# The query from issue #20213: subquery joins t1 and t2 on v, then outer +# join uses t2's k column. The dynamic filter on k from the outer join +# must only apply to t2 (k range 101-200), NOT to t1 (k range 1-1000). +query I +SELECT count(*) FROM ( + SELECT t2_20213.k as k, t1_20213.k as k2 + FROM t1_20213 + JOIN t2_20213 ON t1_20213.v = t2_20213.v +) a +JOIN t2_20213 b ON a.k = b.k +WHERE b.v < 10; +---- +9 + +# Also verify with SELECT * to catch row-level correctness +query IIII rowsort +SELECT * FROM ( + SELECT t2_20213.k as k, t1_20213.k as k2 + FROM t1_20213 + JOIN t2_20213 ON t1_20213.v = t2_20213.v +) a +JOIN t2_20213 b ON a.k = b.k +WHERE b.v < 10; +---- +101 1 101 1 +102 2 102 2 +103 3 103 3 +104 4 104 4 +105 5 105 5 +106 6 106 6 +107 7 107 7 +108 8 108 8 +109 9 109 9 + +statement ok +DROP TABLE issue_20213_t1; + +statement ok +DROP TABLE issue_20213_t2; + +statement ok +DROP TABLE t1_20213; + +statement ok +DROP TABLE t2_20213; + # Cleanup statement ok