diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index b3ed8d9653fe1..99db81d34d8fa 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -401,7 +401,8 @@ async fn test_static_filter_pushdown_through_hash_join() { " ); - // Test left join - filters should NOT be pushed down + // Test left join: filter on preserved (build) side is pushed down, + // filter on non-preserved (probe) side is NOT pushed down. let join = Arc::new( HashJoinExec::try_new( TestScanBuilder::new(Arc::clone(&build_side_schema)) @@ -425,25 +426,30 @@ async fn test_static_filter_pushdown_through_hash_join() { ); let join_schema = join.schema(); - let filter = col_lit_predicate("a", "aa", &join_schema); - let plan = - Arc::new(FilterExec::try_new(filter, join).unwrap()) as Arc; + // Filter on build side column (preserved): should be pushed down + let left_filter = col_lit_predicate("a", "aa", &join_schema); + // Filter on probe side column (not preserved): should NOT be pushed down + let right_filter = col_lit_predicate("e", "ba", &join_schema); + let filter = + Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap()); + let plan = Arc::new(FilterExec::try_new(right_filter, filter).unwrap()) + as Arc; - // Test that filters are NOT pushed down for left join insta::assert_snapshot!( OptimizationTest::new(plan, FilterPushdown::new(), true), @r" OptimizationTest: input: - - FilterExec: a@0 = aa - - HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true + - FilterExec: e@4 = ba + - FilterExec: a@0 = aa + - HashJoinExec: mode=Partitioned, join_type=Left, 
on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true output: Ok: - - FilterExec: a@0 = aa + - FilterExec: e@4 = ba - HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true " ); @@ -1722,6 +1728,218 @@ async fn test_hashjoin_parent_filter_pushdown() { ); } +#[test] +fn test_hashjoin_parent_filter_pushdown_same_column_names() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("build_val", DataType::Utf8, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .build(); + + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("probe_val", DataType::Utf8, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .build(); + + let on = vec![( + col("id", &build_side_schema).unwrap(), + col("id", &probe_side_schema).unwrap(), + )]; + let join = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ); + + let join_schema = join.schema(); + + let build_id_filter = col_lit_predicate("id", "aa", 
&join_schema); + let probe_val_filter = col_lit_predicate("probe_val", "x", &join_schema); + + let filter = + Arc::new(FilterExec::try_new(build_id_filter, Arc::clone(&join) as _).unwrap()); + let plan = Arc::new(FilterExec::try_new(probe_val_filter, filter).unwrap()) + as Arc; + + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: probe_val@3 = x + - FilterExec: id@0 = aa + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, id@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, build_val], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, probe_val], file_type=test, pushdown_supported=true + output: + Ok: + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, id@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, build_val], file_type=test, pushdown_supported=true, predicate=id@0 = aa + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, probe_val], file_type=test, pushdown_supported=true, predicate=probe_val@1 = x + " + ); +} + +#[test] +fn test_hashjoin_parent_filter_pushdown_mark_join() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + let left_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("val", DataType::Utf8, false), + ])); + let left_scan = TestScanBuilder::new(Arc::clone(&left_schema)) + .with_support(true) + .build(); + + let right_schema = + Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)])); + let right_scan = TestScanBuilder::new(Arc::clone(&right_schema)) + .with_support(true) + .build(); + + let on = vec![( + col("id", &left_schema).unwrap(), + col("id", &right_schema).unwrap(), + )]; + let join = Arc::new( + HashJoinExec::try_new( + left_scan, + right_scan, + on, + 
None, + &JoinType::LeftMark, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ); + + let join_schema = join.schema(); + + let left_filter = col_lit_predicate("val", "x", &join_schema); + let mark_filter = col_lit_predicate("mark", true, &join_schema); + + let filter = + Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap()); + let plan = Arc::new(FilterExec::try_new(mark_filter, filter).unwrap()) + as Arc; + + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: mark@2 = true + - FilterExec: val@1 = x + - HashJoinExec: mode=Partitioned, join_type=LeftMark, on=[(id@0, id@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, val], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: mark@2 = true + - HashJoinExec: mode=Partitioned, join_type=LeftMark, on=[(id@0, id@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, val], file_type=test, pushdown_supported=true, predicate=val@1 = x + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true + " + ); +} + +/// Test that filters on join key columns are pushed to both sides of semi/anti joins. +/// For LeftSemi/LeftAnti, the output only contains left columns, but filters on +/// join key columns can also be pushed to the right (non-preserved) side because +/// the equijoin condition guarantees the key values match. 
+#[test] +fn test_hashjoin_parent_filter_pushdown_semi_anti_join() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + let left_schema = Arc::new(Schema::new(vec![ + Field::new("k", DataType::Utf8, false), + Field::new("v", DataType::Utf8, false), + ])); + let left_scan = TestScanBuilder::new(Arc::clone(&left_schema)) + .with_support(true) + .build(); + + let right_schema = Arc::new(Schema::new(vec![ + Field::new("k", DataType::Utf8, false), + Field::new("w", DataType::Utf8, false), + ])); + let right_scan = TestScanBuilder::new(Arc::clone(&right_schema)) + .with_support(true) + .build(); + + let on = vec![( + col("k", &left_schema).unwrap(), + col("k", &right_schema).unwrap(), + )]; + + let join = Arc::new( + HashJoinExec::try_new( + left_scan, + right_scan, + on, + None, + &JoinType::LeftSemi, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ); + + let join_schema = join.schema(); + // Filter on join key column: k = 'x' — should be pushed to BOTH sides + let key_filter = col_lit_predicate("k", "x", &join_schema); + // Filter on non-key column: v = 'y' — should only be pushed to the left side + let val_filter = col_lit_predicate("v", "y", &join_schema); + + let filter = + Arc::new(FilterExec::try_new(key_filter, Arc::clone(&join) as _).unwrap()); + let plan = Arc::new(FilterExec::try_new(val_filter, filter).unwrap()) + as Arc; + + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: v@1 = y + - FilterExec: k@0 = x + - HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(k@0, k@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, v], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, w], file_type=test, pushdown_supported=true + output: + Ok: + - 
HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(k@0, k@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, v], file_type=test, pushdown_supported=true, predicate=k@0 = x AND v@1 = y + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, w], file_type=test, pushdown_supported=true, predicate=k@0 = x + " + ); +} + /// Integration test for dynamic filter pushdown with TopK. /// We use an integration test because there are complex interactions in the optimizer rules /// that the unit tests applying a single optimizer rule do not cover. diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 689f629f7bac8..7e82b9e8239e0 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -37,14 +37,13 @@ use std::collections::HashSet; use std::sync::Arc; -use arrow_schema::Schema; +use arrow_schema::SchemaRef; use datafusion_common::{ Result, - tree_node::{TreeNode, TreeNodeRecursion}, + tree_node::{Transformed, TreeNode}, }; -use datafusion_physical_expr::{expressions::Column, utils::reassign_expr_columns}; +use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use itertools::Itertools; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum FilterPushdownPhase { @@ -310,53 +309,80 @@ pub struct ChildFilterDescription { pub(crate) self_filters: Vec>, } -/// A utility for checking whether a filter expression can be pushed down -/// to a child node based on column availability. +/// Validates and remaps filter column references to a target schema in one step. /// -/// This checker validates that all columns referenced in a filter expression -/// exist in the target schema. If any column in the filter is not present -/// in the schema, the filter cannot be pushed down to that child. 
-pub(crate) struct FilterColumnChecker<'a> { - column_names: HashSet<&'a str>, +/// When pushing filters from a parent to a child node, we need to: +/// 1. Verify that all columns referenced by the filter exist in the target +/// 2. Remap column indices to match the target schema +/// +/// `allowed_indices` controls which column indices (in the parent schema) are +/// considered valid. For single-input nodes this defaults to +/// `0..child_schema.len()` (all columns are reachable). For join nodes it is +/// restricted to the subset of output columns that map to the target child, +/// which is critical when different sides have same-named columns. +pub(crate) struct FilterRemapper { + /// The target schema to remap column indices into. + child_schema: SchemaRef, + /// Only columns at these indices (in the *parent* schema) are considered + /// valid. For non-join nodes this defaults to `0..child_schema.len()`. + allowed_indices: HashSet, } -impl<'a> FilterColumnChecker<'a> { - /// Creates a new [`FilterColumnChecker`] from the given schema. - /// - /// Extracts all column names from the schema's fields to build - /// a lookup set for efficient column existence checks. - pub(crate) fn new(input_schema: &'a Schema) -> Self { - let column_names: HashSet<&str> = input_schema - .fields() - .iter() - .map(|f| f.name().as_str()) - .collect(); - Self { column_names } +impl FilterRemapper { + /// Create a remapper that accepts any column whose index falls within + /// `0..child_schema.len()` and whose name exists in the target schema. + pub(crate) fn new(child_schema: SchemaRef) -> Self { + let allowed_indices = (0..child_schema.fields().len()).collect(); + Self { + child_schema, + allowed_indices, + } } - /// Checks whether a filter expression can be pushed down to the child - /// whose schema was used to create this checker. - /// - /// Returns `true` if all [`Column`] references in the filter expression - /// exist in the target schema, `false` otherwise. 
+ /// Create a remapper that only accepts columns at the given indices. + /// This is used by join nodes to restrict pushdown to one side of the + /// join when both sides have same-named columns. + fn with_allowed_indices( + child_schema: SchemaRef, + allowed_indices: HashSet, + ) -> Self { + Self { + child_schema, + allowed_indices, + } + } + + /// Try to remap a filter's column references to the target schema. /// - /// This method traverses the entire expression tree, checking each - /// column reference against the available column names. - pub(crate) fn can_pushdown(&self, filter: &Arc) -> bool { - let mut can_apply = true; - filter - .apply(|expr| { - if let Some(column) = expr.as_any().downcast_ref::() - && !self.column_names.contains(column.name()) + /// Validates and remaps in a single tree traversal: for each column, + /// checks that its index is in the allowed set and that + /// its name exists in the target schema, then remaps the index. + /// Returns `Some(remapped)` if all columns are valid, or `None` if any + /// column fails validation. 
+ pub(crate) fn try_remap( + &self, + filter: &Arc, + ) -> Result>> { + let mut all_valid = true; + let transformed = Arc::clone(filter).transform_down(|expr| { + if let Some(col) = expr.as_any().downcast_ref::() { + if self.allowed_indices.contains(&col.index()) + && let Ok(new_index) = self.child_schema.index_of(col.name()) { - can_apply = false; - return Ok(TreeNodeRecursion::Stop); + Ok(Transformed::yes(Arc::new(Column::new( + col.name(), + new_index, + )))) + } else { + all_valid = false; + Ok(Transformed::complete(expr)) } + } else { + Ok(Transformed::no(expr)) + } + })?; - Ok(TreeNodeRecursion::Continue) - }) - .expect("infallible traversal"); - can_apply + Ok(all_valid.then_some(transformed.data)) } } @@ -372,24 +398,41 @@ impl ChildFilterDescription { parent_filters: &[Arc], child: &Arc, ) -> Result { - let child_schema = child.schema(); + let remapper = FilterRemapper::new(child.schema()); + Self::remap_filters(parent_filters, &remapper) + } - // Build a set of column names in the child schema for quick lookup - let checker = FilterColumnChecker::new(&child_schema); + /// Like [`Self::from_child`], but restricts which parent-level columns are + /// considered reachable through this child. + /// + /// `allowed_indices` is the set of column indices (in the *parent* + /// schema) that map to this child's side of a join. A filter is only + /// eligible for pushdown when **every** column index it references + /// appears in `allowed_indices`. + /// + /// This prevents incorrect pushdown when different join sides have + /// columns with the same name: matching on index ensures a filter + /// referencing the right side's `k@2` is not pushed to the left side + /// which also has a column named `k` but at a different index. 
+ pub fn from_child_with_allowed_indices( + parent_filters: &[Arc], + allowed_indices: HashSet, + child: &Arc, + ) -> Result { + let remapper = + FilterRemapper::with_allowed_indices(child.schema(), allowed_indices); + Self::remap_filters(parent_filters, &remapper) + } - // Analyze each parent filter + fn remap_filters( + parent_filters: &[Arc], + remapper: &FilterRemapper, + ) -> Result { let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); - for filter in parent_filters { - if checker.can_pushdown(filter) { - // All columns exist in child - we can push down - // Need to reassign column indices to match child schema - let reassigned_filter = - reassign_expr_columns(Arc::clone(filter), &child_schema)?; - child_parent_filters - .push(PushedDownPredicate::supported(reassigned_filter)); + if let Some(remapped) = remapper.try_remap(filter)? { + child_parent_filters.push(PushedDownPredicate::supported(remapped)); } else { - // Some columns don't exist in child - cannot push down child_parent_filters .push(PushedDownPredicate::unsupported(Arc::clone(filter))); } @@ -401,6 +444,17 @@ impl ChildFilterDescription { }) } + /// Mark all parent filters as unsupported for this child. + pub fn all_unsupported(parent_filters: &[Arc]) -> Self { + Self { + parent_filters: parent_filters + .iter() + .map(|f| PushedDownPredicate::unsupported(Arc::clone(f))) + .collect(), + self_filters: vec![], + } + } + /// Add a self filter (from the current node) to be pushed down to this child. 
pub fn with_self_filter(mut self, filter: Arc) -> Self { self.self_filters.push(filter); @@ -476,15 +530,9 @@ impl FilterDescription { children: &[&Arc], ) -> Self { let mut desc = Self::new(); - let child_filters = parent_filters - .iter() - .map(|f| PushedDownPredicate::unsupported(Arc::clone(f))) - .collect_vec(); for _ in 0..children.len() { - desc = desc.with_child(ChildFilterDescription { - parent_filters: child_filters.clone(), - self_filters: vec![], - }); + desc = + desc.with_child(ChildFilterDescription::all_unsupported(parent_filters)); } desc } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 6b18b56413b71..7e31f719d7ea1 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashSet; use std::fmt; use std::mem::size_of; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -24,7 +25,7 @@ use std::{any::Any, vec}; use crate::ExecutionPlanProperties; use crate::execution_plan::{EmissionType, boundedness_from_children}; use crate::filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPhase, + ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; use crate::joins::Map; @@ -80,7 +81,7 @@ use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumula use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; -use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; +use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit}; use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; @@ -720,7 +721,9 @@ impl HashJoinExec { } 
fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool { - if !config.optimizer.enable_join_dynamic_filter_pushdown { + if self.join_type != JoinType::Inner + || !config.optimizer.enable_join_dynamic_filter_pushdown + { return false; } @@ -1407,26 +1410,107 @@ impl ExecutionPlan for HashJoinExec { parent_filters: Vec>, config: &ConfigOptions, ) -> Result { - // Other types of joins can support *some* filters, but restrictions are complex and error prone. - // For now we don't support them. - // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs - // See https://github.com/apache/datafusion/issues/16973 for tracking. - if self.join_type != JoinType::Inner { - return Ok(FilterDescription::all_unsupported( - &parent_filters, - &self.children(), - )); + // This is the physical-plan equivalent of `push_down_all_join` in + // `datafusion/optimizer/src/push_down_filter.rs`. That function uses `lr_is_preserved` + // to decide which parent predicates can be pushed past a logical join to its children, + // then checks column references to route each predicate to the correct side. + // + // We apply the same two-level logic here: + // 1. `lr_is_preserved` gates whether a side is eligible at all. + // 2. For each filter, we check that all column references belong to the + // target child (using `column_indices` to map output column positions + // to join sides). This is critical for correctness: name-based matching + // alone (as done by `ChildFilterDescription::from_child`) can incorrectly + // push filters when different join sides have columns with the same name + // (e.g. nested mark joins both producing "mark" columns). 
+ let (left_preserved, right_preserved) = lr_is_preserved(self.join_type); + + // Build the set of allowed column indices for each side + let column_indices: Vec = match self.projection.as_ref() { + Some(projection) => projection + .iter() + .map(|i| self.column_indices[*i].clone()) + .collect(), + None => self.column_indices.clone(), + }; + + let (mut left_allowed, mut right_allowed) = (HashSet::new(), HashSet::new()); + column_indices + .iter() + .enumerate() + .for_each(|(output_idx, ci)| { + match ci.side { + JoinSide::Left => left_allowed.insert(output_idx), + JoinSide::Right => right_allowed.insert(output_idx), + // Mark columns - don't allow pushdown to either side + JoinSide::None => false, + }; + }); + + // For semi/anti joins, the non-preserved side's columns are not in the + // output, but filters on join key columns can still be pushed there. + // We find output columns that are join keys on the preserved side and + // add their output indices to the non-preserved side's allowed set. + // The name-based remap in FilterRemapper will then match them to the + // corresponding column in the non-preserved child's schema. 
+ match self.join_type { + JoinType::LeftSemi | JoinType::LeftAnti => { + let left_key_indices: HashSet = self + .on + .iter() + .filter_map(|(left_key, _)| { + left_key + .as_any() + .downcast_ref::() + .map(|c| c.index()) + }) + .collect(); + for (output_idx, ci) in column_indices.iter().enumerate() { + if ci.side == JoinSide::Left && left_key_indices.contains(&ci.index) { + right_allowed.insert(output_idx); + } + } + } + JoinType::RightSemi | JoinType::RightAnti => { + let right_key_indices: HashSet = self + .on + .iter() + .filter_map(|(_, right_key)| { + right_key + .as_any() + .downcast_ref::() + .map(|c| c.index()) + }) + .collect(); + for (output_idx, ci) in column_indices.iter().enumerate() { + if ci.side == JoinSide::Right && right_key_indices.contains(&ci.index) + { + left_allowed.insert(output_idx); + } + } + } + _ => {} } - // Get basic filter descriptions for both children - let left_child = crate::filter_pushdown::ChildFilterDescription::from_child( - &parent_filters, - self.left(), - )?; - let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child( - &parent_filters, - self.right(), - )?; + let left_child = if left_preserved { + ChildFilterDescription::from_child_with_allowed_indices( + &parent_filters, + left_allowed, + self.left(), + )? + } else { + ChildFilterDescription::all_unsupported(&parent_filters) + }; + + let mut right_child = if right_preserved { + ChildFilterDescription::from_child_with_allowed_indices( + &parent_filters, + right_allowed, + self.right(), + )? 
+ } else { + ChildFilterDescription::all_unsupported(&parent_filters) + }; // Add dynamic filters in Post phase if enabled if matches!(phase, FilterPushdownPhase::Post) @@ -1448,19 +1532,6 @@ impl ExecutionPlan for HashJoinExec { child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { - // Note: this check shouldn't be necessary because we already marked all parent filters as unsupported for - // non-inner joins in `gather_filters_for_pushdown`. - // However it's a cheap check and serves to inform future devs touching this function that they need to be really - // careful pushing down filters through non-inner joins. - if self.join_type != JoinType::Inner { - // Other types of joins can support *some* filters, but restrictions are complex and error prone. - // For now we don't support them. - // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs - return Ok(FilterPushdownPropagation::all_unsupported( - child_pushdown_result, - )); - } - let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone()); assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child @@ -1501,6 +1572,27 @@ impl ExecutionPlan for HashJoinExec { } } +/// Determines which sides of a join are "preserved" for filter pushdown. +/// +/// A preserved side means filters on that side's columns can be safely pushed +/// below the join. This mirrors the logic in the logical optimizer's +/// `lr_is_preserved` in `datafusion/optimizer/src/push_down_filter.rs`. 
+fn lr_is_preserved(join_type: JoinType) -> (bool, bool) { + match join_type { + JoinType::Inner => (true, true), + JoinType::Left => (true, false), + JoinType::Right => (false, true), + JoinType::Full => (false, false), + // Filters in semi/anti joins are either on the preserved side, or on join keys, + // as all output columns come from the preserved side. Join key filters can be + // safely pushed down into the other side. + JoinType::LeftSemi | JoinType::LeftAnti => (true, true), + JoinType::RightSemi | JoinType::RightAnti => (true, true), + JoinType::LeftMark => (true, false), + JoinType::RightMark => (false, true), + } +} + /// Accumulator for collecting min/max bounds from build-side data during hash join. /// /// This struct encapsulates the logic for progressively computing column bounds @@ -5715,4 +5807,18 @@ mod tests { .contains("null_aware anti join only supports single column join key") ); } + + #[test] + fn test_lr_is_preserved() { + assert_eq!(lr_is_preserved(JoinType::Inner), (true, true)); + assert_eq!(lr_is_preserved(JoinType::Left), (true, false)); + assert_eq!(lr_is_preserved(JoinType::Right), (false, true)); + assert_eq!(lr_is_preserved(JoinType::Full), (false, false)); + assert_eq!(lr_is_preserved(JoinType::LeftSemi), (true, true)); + assert_eq!(lr_is_preserved(JoinType::LeftAnti), (true, true)); + assert_eq!(lr_is_preserved(JoinType::LeftMark), (true, false)); + assert_eq!(lr_is_preserved(JoinType::RightSemi), (true, true)); + assert_eq!(lr_is_preserved(JoinType::RightAnti), (true, true)); + assert_eq!(lr_is_preserved(JoinType::RightMark), (false, true)); + } } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 76711a8f835f4..a3a27e81f733b 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -29,8 +29,8 @@ use super::{ use crate::column_rewriter::PhysicalColumnRewriter; use crate::execution_plan::CardinalityEffect; use 
crate::filter_pushdown::{ - ChildFilterDescription, ChildPushdownResult, FilterColumnChecker, FilterDescription, - FilterPushdownPhase, FilterPushdownPropagation, PushedDownPredicate, + ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, FilterRemapper, PushedDownPredicate, }; use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef}; use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr}; @@ -51,7 +51,7 @@ use datafusion_execution::TaskContext; use datafusion_expr::ExpressionPlacement; use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_expr::projection::Projector; -use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; +use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql}; use datafusion_physical_expr_common::sort_expr::{ LexOrdering, LexRequirement, PhysicalSortExpr, @@ -384,22 +384,19 @@ impl ExecutionPlan for ProjectionExec { // expand alias column to original expr in parent filters let invert_alias_map = self.collect_reverse_alias()?; let output_schema = self.schema(); - let checker = FilterColumnChecker::new(&output_schema); + let remapper = FilterRemapper::new(output_schema); let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); for filter in parent_filters { - if !checker.can_pushdown(&filter) { + // Check that column exists in child, then reassign column indices to match child schema + if let Some(reassigned) = remapper.try_remap(&filter)? 
{ + // rewrite filter expression using invert alias map + let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map); + let rewritten = reassigned.rewrite(&mut rewriter)?.data; + child_parent_filters.push(PushedDownPredicate::supported(rewritten)); + } else { child_parent_filters.push(PushedDownPredicate::unsupported(filter)); - continue; } - // All columns exist in child - we can push down - // Need to reassign column indices to match child schema - let reassigned_filter = reassign_expr_columns(filter, &output_schema)?; - // rewrite filter expression using invert alias map - let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map); - let rewritten = reassigned_filter.rewrite(&mut rewriter)?.data; - - child_parent_filters.push(PushedDownPredicate::supported(rewritten)); } Ok(FilterDescription::new().with_child(ChildFilterDescription { diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index 1b037ee2b83af..275b0c9dd490f 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -187,6 +187,197 @@ physical_plan statement ok SET datafusion.optimizer.enable_join_dynamic_filter_pushdown = true; +# Test 2b: Dynamic filter pushdown for non-inner join types +# LEFT JOIN: optimizer swaps to physical Right join (build=right_parquet, probe=left_parquet). +# Dynamic filter is NOT pushed because Right join needs all probe rows in output. 
+query TT +EXPLAIN SELECT l.*, r.info +FROM left_parquet l +LEFT JOIN right_parquet r ON l.id = r.id; +---- +logical_plan +01)Projection: l.id, l.data, r.info +02)--Left Join: l.id = r.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id, info] +physical_plan +01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info] +02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(id@0, id@0)], projection=[info@1, id@2, data@3] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet + +# LEFT JOIN correctness: all left rows appear, unmatched right rows produce NULLs +query ITT +SELECT l.id, l.data, r.info +FROM left_parquet l +LEFT JOIN right_parquet r ON l.id = r.id +ORDER BY l.id; +---- +1 left1 right1 +2 left2 NULL +3 left3 right3 +4 left4 NULL +5 left5 right5 + +# RIGHT JOIN: optimizer swaps to physical Left join (build=right_parquet, probe=left_parquet). +# No self-generated dynamic filter (only Inner joins get that), but parent filters +# on the preserved (build) side can still push down. 
+query TT
+EXPLAIN SELECT l.*, r.info
+FROM left_parquet l
+RIGHT JOIN right_parquet r ON l.id = r.id;
+----
+logical_plan
+01)Projection: l.id, l.data, r.info
+02)--Right Join: l.id = r.id
+03)----SubqueryAlias: l
+04)------TableScan: left_parquet projection=[id, data]
+05)----SubqueryAlias: r
+06)------TableScan: right_parquet projection=[id, info]
+physical_plan
+01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
+02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
+04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
+
+# RIGHT JOIN correctness: all right rows appear; right rows without a left match would get NULLs (every right id matches here)
+query ITT
+SELECT l.id, l.data, r.info
+FROM left_parquet l
+RIGHT JOIN right_parquet r ON l.id = r.id
+ORDER BY r.id;
+----
+1 left1 right1
+3 left3 right3
+5 left5 right5
+
+# FULL JOIN: dynamic filter should NOT be pushed (both sides must preserve all rows)
+query TT
+EXPLAIN SELECT l.id, r.id as rid, l.data, r.info
+FROM left_parquet l
+FULL JOIN right_parquet r ON l.id = r.id;
+----
+logical_plan
+01)Projection: l.id, r.id AS rid, l.data, r.info
+02)--Full Join: l.id = r.id
+03)----SubqueryAlias: l
+04)------TableScan: left_parquet projection=[id, data]
+05)----SubqueryAlias: r
+06)------TableScan: right_parquet projection=[id, info]
+physical_plan
+01)ProjectionExec: expr=[id@2 as id, id@0 as rid, data@3 as data, info@1 as info]
+02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(id@0, id@0)]
+03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet + +# LEFT SEMI JOIN: optimizer swaps to RightSemi (build=right_parquet, probe=left_parquet). +# No self-generated dynamic filter (only Inner joins), but parent filters on +# the preserved (probe) side can push down. +query TT +EXPLAIN SELECT l.* +FROM left_parquet l +WHERE l.id IN (SELECT r.id FROM right_parquet r); +---- +logical_plan +01)LeftSemi Join: l.id = __correlated_sq_1.id +02)--SubqueryAlias: l +03)----TableScan: left_parquet projection=[id, data] +04)--SubqueryAlias: __correlated_sq_1 +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet + +# LEFT ANTI JOIN: no self-generated dynamic filter, but parent filters can push +# to the preserved (left/build) side. 
+query TT +EXPLAIN SELECT l.* +FROM left_parquet l +WHERE l.id NOT IN (SELECT r.id FROM right_parquet r); +---- +logical_plan +01)LeftAnti Join: l.id = __correlated_sq_1.id +02)--SubqueryAlias: l +03)----TableScan: left_parquet projection=[id, data] +04)--SubqueryAlias: __correlated_sq_1 +05)----SubqueryAlias: r +06)------TableScan: right_parquet projection=[id] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet + +# Test 2c: Parent dynamic filter (from TopK) pushed through semi/anti joins +# Sort on the join key (id) so the TopK dynamic filter pushes to BOTH sides. + +# SEMI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join key) +# that pushes through the RightSemi join to both the build and probe sides. 
+query TT +EXPLAIN SELECT l.* +FROM left_parquet l +WHERE l.id IN (SELECT r.id FROM right_parquet r) +ORDER BY l.id LIMIT 2; +---- +logical_plan +01)Sort: l.id ASC NULLS LAST, fetch=2 +02)--LeftSemi Join: l.id = __correlated_sq_1.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: __correlated_sq_1 +06)------SubqueryAlias: r +07)--------TableScan: right_parquet projection=[id] +physical_plan +01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] + +# Correctness check +query IT +SELECT l.* +FROM left_parquet l +WHERE l.id IN (SELECT r.id FROM right_parquet r) +ORDER BY l.id LIMIT 2; +---- +1 left1 +3 left3 + +# ANTI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join key) +# that pushes through the LeftAnti join to both the preserved and non-preserved sides. 
+query TT +EXPLAIN SELECT l.* +FROM left_parquet l +WHERE l.id NOT IN (SELECT r.id FROM right_parquet r) +ORDER BY l.id LIMIT 2; +---- +logical_plan +01)Sort: l.id ASC NULLS LAST, fetch=2 +02)--LeftAnti Join: l.id = __correlated_sq_1.id +03)----SubqueryAlias: l +04)------TableScan: left_parquet projection=[id, data] +05)----SubqueryAlias: __correlated_sq_1 +06)------SubqueryAlias: r +07)--------TableScan: right_parquet projection=[id] +physical_plan +01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] + +# Correctness check +query IT +SELECT l.* +FROM left_parquet l +WHERE l.id NOT IN (SELECT r.id FROM right_parquet r) +ORDER BY l.id LIMIT 2; +---- +2 left2 +4 left4 + # Test 3: Test independent control # Disable TopK, keep Join enabled @@ -438,6 +629,97 @@ physical_plan 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet 04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] +# Test 6: Regression test for issue #20213 - dynamic filter applied to wrong table +# when subquery join has same column names on both sides. 
+# +# The bug: when an outer join pushes a DynamicFilter for column "k" through an +# inner join where both sides have a column named "k", the name-based routing +# incorrectly pushed the filter to BOTH sides instead of only the correct one. +# This caused wrong results (0 rows instead of expected). + +# Create tables with same column names (k, v) on both sides +statement ok +CREATE TABLE issue_20213_t1(k INT, v INT) AS +SELECT i as k, i as v FROM generate_series(1, 1000) t(i); + +statement ok +CREATE TABLE issue_20213_t2(k INT, v INT) AS +SELECT i + 100 as k, i as v FROM generate_series(1, 100) t(i); + +# Use small row groups to make statistics-based pruning more likely to manifest the bug +statement ok +SET datafusion.execution.parquet.max_row_group_size = 10; + +query I +COPY issue_20213_t1 TO 'test_files/scratch/dynamic_filter_pushdown_config/issue_20213_t1.parquet' STORED AS PARQUET; +---- +1000 + +query I +COPY issue_20213_t2 TO 'test_files/scratch/dynamic_filter_pushdown_config/issue_20213_t2.parquet' STORED AS PARQUET; +---- +100 + +# Reset row group size +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1000000; + +statement ok +CREATE EXTERNAL TABLE t1_20213(k INT, v INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/issue_20213_t1.parquet'; + +statement ok +CREATE EXTERNAL TABLE t2_20213(k INT, v INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/issue_20213_t2.parquet'; + +# The query from issue #20213: subquery joins t1 and t2 on v, then outer +# join uses t2's k column. The dynamic filter on k from the outer join +# must only apply to t2 (k range 101-200), NOT to t1 (k range 1-1000). 
+query I +SELECT count(*) FROM ( + SELECT t2_20213.k as k, t1_20213.k as k2 + FROM t1_20213 + JOIN t2_20213 ON t1_20213.v = t2_20213.v +) a +JOIN t2_20213 b ON a.k = b.k +WHERE b.v < 10; +---- +9 + +# Also verify with SELECT * to catch row-level correctness +query IIII rowsort +SELECT * FROM ( + SELECT t2_20213.k as k, t1_20213.k as k2 + FROM t1_20213 + JOIN t2_20213 ON t1_20213.v = t2_20213.v +) a +JOIN t2_20213 b ON a.k = b.k +WHERE b.v < 10; +---- +101 1 101 1 +102 2 102 2 +103 3 103 3 +104 4 104 4 +105 5 105 5 +106 6 106 6 +107 7 107 7 +108 8 108 8 +109 9 109 9 + +statement ok +DROP TABLE issue_20213_t1; + +statement ok +DROP TABLE issue_20213_t2; + +statement ok +DROP TABLE t1_20213; + +statement ok +DROP TABLE t2_20213; + # Cleanup statement ok