From 37c90e7401e2d0cd171923deed29f8acf610fe5e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 9 Jul 2025 16:53:22 -0500 Subject: [PATCH 01/15] refactor filter pushdown to remove assumptions blocking hashjoinexec pushdown --- .../physical_optimizer/filter_pushdown/mod.rs | 107 ++++++++++++ .../filter_pushdown/util.rs | 30 ++-- datafusion/datasource-parquet/src/source.rs | 24 +-- datafusion/datasource/src/file.rs | 9 +- datafusion/datasource/src/source.rs | 29 ++-- .../physical-optimizer/src/filter_pushdown.rs | 132 +++++++-------- .../physical-plan/src/coalesce_batches.rs | 4 +- .../physical-plan/src/execution_plan.rs | 6 +- datafusion/physical-plan/src/filter.rs | 50 +++--- .../physical-plan/src/filter_pushdown.rs | 154 +++++++++++++++--- .../physical-plan/src/joins/hash_join.rs | 24 ++- .../physical-plan/src/repartition/mod.rs | 4 +- 12 files changed, 390 insertions(+), 183 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 68369bc9d9061..ed44f3bc30405 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -506,6 +506,113 @@ fn schema() -> SchemaRef { Arc::clone(&TEST_SCHEMA) } +#[tokio::test] +async fn test_hashjoin_parent_filter_pushdown() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create build side with limited values + let build_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["ba", "bb"]), + ("c", Float64, [1.0, 2.0]) + ) + .unwrap()]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + 
.with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more values + let probe_batches = vec![record_batch!( + ("d", Utf8, ["aa", "ab", "ac", "ad"]), + ("e", Utf8, ["ba", "bb", "bc", "bd"]), + ("f", Float64, [1.0, 2.0, 3.0, 4.0]) + ) + .unwrap()]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("d", DataType::Utf8, false), + Field::new("e", DataType::Utf8, false), + Field::new("f", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create HashJoinExec + let on = vec![( + col("a", &build_side_schema).unwrap(), + col("d", &probe_side_schema).unwrap(), + )]; + let join = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + // Create filters that can be pushed down to different sides + // We need to create filters in the context of the join output schema + let join_schema = join.schema(); + + // Filter on build side column: a = 'aa' + let left_filter = col_lit_predicate("a", "aa", &join_schema); + // Filter on probe side column: e = 'ba' + let right_filter = col_lit_predicate("e", "ba", &join_schema); + // Filter that references both sides: a = d (should not be pushed down) + let cross_filter = Arc::new(BinaryExpr::new( + col("a", &join_schema).unwrap(), + Operator::Eq, + col("d", &join_schema).unwrap(), + )) as Arc; + // Combine all filters into a single predicate using AND + // The left and right filters will be pushed down to their respective sides + // The cross filter will remain at the top level of the join + // and will not be pushed down. 
+ let filter = Arc::new(BinaryExpr::new( + left_filter.clone(), + Operator::And, + right_filter.clone(), + )) as Arc; + let filter = Arc::new(BinaryExpr::new(filter, Operator::And, cross_filter.clone())) + as Arc; + + let plan = Arc::new(FilterExec::try_new(filter, Arc::clone(&join) as _).unwrap()) + as Arc; + + // Test that filters are pushed down correctly to each side of the join + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = aa AND e@4 = ba AND a@0 = d@3 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: a@0 = d@3 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba + " + ); +} + /// Returns a predicate that is a binary expression col = lit fn col_lit_predicate( column_name: &str, diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index d4318235bafbc..79d8895e1431f 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -29,7 +29,9 @@ use datafusion_datasource::{ use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_optimizer::PhysicalOptimizerRule; -use 
datafusion_physical_plan::filter_pushdown::FilterPushdownPhase; +use datafusion_physical_plan::filter_pushdown::{ + FilterPushdownPhase, PredicateSupportDiscriminant, +}; use datafusion_physical_plan::{ displayable, filter::FilterExec, @@ -227,19 +229,13 @@ impl FileSource for TestSource { predicate: Some(conjunction(filters.clone())), ..self.clone() }); - Ok(FilterPushdownPropagation { - filters: filters - .into_iter() - .map(PredicateSupport::Supported) - .collect(), - updated_node: Some(new_node), - }) + Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PredicateSupportDiscriminant::Supported; filters.len()], + ) + .with_updated_node(new_node)) } else { - Ok(FilterPushdownPropagation::with_filters( - filters - .into_iter() - .map(PredicateSupport::Unsupported) - .collect(), + Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PredicateSupportDiscriminant::Unsupported; filters.len()], )) } } @@ -554,19 +550,17 @@ impl ExecutionPlan for TestNode { FilterExec::try_new(Arc::clone(filter), Arc::clone(&self.input))?; let new_self = TestNode::new(false, Arc::new(new_child), self.predicate.clone()); - let mut res = - FilterPushdownPropagation::transparent(child_pushdown_result); + let mut res = FilterPushdownPropagation::all(child_pushdown_result); res.updated_node = Some(Arc::new(new_self) as Arc); Ok(res) } PredicateSupport::Supported(_) => { - let res = - FilterPushdownPropagation::transparent(child_pushdown_result); + let res = FilterPushdownPropagation::all(child_pushdown_result); Ok(res) } } } else { - let res = FilterPushdownPropagation::transparent(child_pushdown_result); + let res = FilterPushdownPropagation::all(child_pushdown_result); Ok(res) } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 8ca36e7cd3216..9c2470e578858 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -41,6 +41,7 @@ use 
datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_plan::filter_pushdown::PredicateSupportDiscriminant; use datafusion_physical_plan::filter_pushdown::{ FilterPushdownPropagation, PredicateSupport, }; @@ -622,11 +623,8 @@ impl FileSource for ParquetSource { config: &ConfigOptions, ) -> datafusion_common::Result>> { let Some(file_schema) = self.file_schema.clone() else { - return Ok(FilterPushdownPropagation::with_filters( - filters - .into_iter() - .map(PredicateSupport::Unsupported) - .collect(), + return Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PredicateSupportDiscriminant::Unsupported; filters.len()], )); }; // Determine if based on configs we should push filters down. @@ -657,7 +655,9 @@ impl FileSource for ParquetSource { { // No filters can be pushed down, so we can just return the remaining filters // and avoid replacing the source in the physical plan. - return Ok(FilterPushdownPropagation::with_filters(filters)); + return Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PredicateSupportDiscriminant::Unsupported; filters.len()], + )); } let allowed_filters = filters .iter() @@ -678,15 +678,15 @@ impl FileSource for ParquetSource { // If pushdown_filters is false we tell our parents that they still have to handle the filters, // even if we updated the predicate to include the filters (they will only be used for stats pruning). 
if !pushdown_filters { - return Ok(FilterPushdownPropagation::with_filters( - filters - .into_iter() - .map(|f| PredicateSupport::Unsupported(f.into_inner())) - .collect_vec(), + return Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PredicateSupportDiscriminant::Unsupported; filters.len()], ) .with_updated_node(source)); } - Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(source)) + Ok(FilterPushdownPropagation::with_parent_pushdown_result( + filters.iter().map(|f| f.discriminant()).collect(), + ) + .with_updated_node(source)) } fn with_schema_adapter_factory( diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index a95c07cc319cd..b11071bb1c511 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -31,7 +31,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::{not_impl_err, Result, Statistics}; use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; use datafusion_physical_plan::filter_pushdown::{ - FilterPushdownPropagation, PredicateSupport, + FilterPushdownPropagation, PredicateSupportDiscriminant, }; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; @@ -122,11 +122,8 @@ pub trait FileSource: Send + Sync { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::with_filters( - filters - .into_iter() - .map(PredicateSupport::Unsupported) - .collect(), + Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PredicateSupportDiscriminant::Unsupported; filters.len()], )) } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 92c25556382c6..16da1d11f9efd 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -42,7 +42,8 @@ use datafusion_physical_expr::{ use datafusion_physical_expr_common::sort_expr::LexOrdering; use 
datafusion_physical_plan::filter::collect_columns_from_predicate; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport, + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, + PredicateSupportDiscriminant, }; /// A source of data, typically a list of files or memory @@ -172,11 +173,8 @@ pub trait DataSource: Send + Sync + Debug { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::with_filters( - filters - .into_iter() - .map(PredicateSupport::Unsupported) - .collect(), + Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PredicateSupportDiscriminant::Unsupported; filters.len()], )) } } @@ -324,17 +322,14 @@ impl ExecutionPlan for DataSourceExec { config: &ConfigOptions, ) -> Result>> { // Push any remaining filters into our data source - let res = self.data_source.try_pushdown_filters( - child_pushdown_result - .parent_filters - .into_iter() - .map(|f| match f { - PredicateSupport::Supported(expr) => expr, - PredicateSupport::Unsupported(expr) => expr, - }) - .collect(), - config, - )?; + let parent_filters = child_pushdown_result + .parent_filters + .into_iter() + .map(|f| f.filter) + .collect_vec(); + let res = self + .data_source + .try_pushdown_filters(parent_filters, config)?; match res.updated_node { Some(data_source) => { let mut new_node = self.clone(); diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index eb7ec81b260ea..dc5905a944de7 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -38,7 +38,8 @@ use crate::PhysicalOptimizerRule; use datafusion_common::{config::ConfigOptions, Result}; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, 
PredicateSupport, + ChildFitlerPushdownResult, ChildPushdownResult, FilterPushdownPhase, + FilterPushdownPropagation, PredicateSupport, PredicateSupportDiscriminant, }; use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan}; @@ -434,30 +435,43 @@ impl PhysicalOptimizerRule for FilterPushdown { } } -/// Support state of each predicate for the children of the node. -/// These predicates are coming from the parent node. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum ParentPredicateStates { - NoChildren, - Unsupported, - Supported, -} - fn push_down_filters( node: Arc, parent_predicates: Vec>, config: &ConfigOptions, phase: FilterPushdownPhase, ) -> Result>> { - // If the node has any child, these will be rewritten as supported or unsupported - let mut parent_predicates_pushdown_states = - vec![ParentPredicateStates::NoChildren; parent_predicates.len()]; + let mut parent_filter_pushdown_supports: Vec> = + vec![vec![]; parent_predicates.len()]; let mut self_filters_pushdown_supports = vec![]; let mut new_children = Vec::with_capacity(node.children().len()); let children = node.children(); let filter_description = node.gather_filters_for_pushdown(phase, parent_predicates.clone(), config)?; + + let filter_description_parent_filters = filter_description.parent_filters(); + let filter_description_self_filters = filter_description.self_filters(); + if filter_description_parent_filters.len() != children.len() { + return Err(datafusion_common::DataFusionError::Internal( + format!( + "Filter pushdown expected FilterDescription to have parent filters for {expected_num_children}, but got {actual_num_children} for node {node_name}", + expected_num_children = children.len(), + actual_num_children = filter_description_parent_filters.len(), + node_name = node.name(), + ), + )); + } + if filter_description_self_filters.len() != children.len() { + return Err(datafusion_common::DataFusionError::Internal( + format!( + "Filter pushdown expected 
FilterDescription to have self filters for {expected_num_children}, but got {actual_num_children} for node {node_name}", + expected_num_children = children.len(), + actual_num_children = filter_description_self_filters.len(), + node_name = node.name(), + ), + )); + } for (child, parent_filters, self_filters) in izip!( children, @@ -470,31 +484,18 @@ fn push_down_filters( // and tried to be pushed down over the child similarly. let num_self_filters = self_filters.len(); - let mut parent_supported_predicate_indices = vec![]; - let mut all_predicates = self_filters; + let mut all_predicates = self_filters.clone(); // Iterate over each predicate coming from the parent - for (idx, filter) in parent_filters.into_iter().enumerate() { + for filter in parent_filters.into_iter() { // Check if we can push this filter down to our child. // These supports are defined in `gather_filters_for_pushdown()` match filter { PredicateSupport::Supported(predicate) => { // Queue this filter up for pushdown to this child all_predicates.push(predicate); - parent_supported_predicate_indices.push(idx); - // Mark this filter as supported by our children if no child has marked it as unsupported - if parent_predicates_pushdown_states[idx] - != ParentPredicateStates::Unsupported - { - parent_predicates_pushdown_states[idx] = - ParentPredicateStates::Supported; - } - } - PredicateSupport::Unsupported(_) => { - // Mark as unsupported by our children - parent_predicates_pushdown_states[idx] = - ParentPredicateStates::Unsupported; } + PredicateSupport::Unsupported(_) => {} } } @@ -513,57 +514,50 @@ fn push_down_filters( // from our parents and filters that the current node injected. We need to de-entangle // this since we do need to distinguish between them. 
let mut all_filters = result.filters.into_iter().collect_vec(); - let parent_predicates = all_filters.split_off(num_self_filters); - let self_predicates = all_filters; - self_filters_pushdown_supports.push(self_predicates); - - for (idx, result) in parent_supported_predicate_indices - .iter() - .zip(parent_predicates) + if all_filters.len() != num_self_filters + parent_predicates.len() { - let current_node_state = match result { - PredicateSupport::Supported(_) => ParentPredicateStates::Supported, - PredicateSupport::Unsupported(_) => ParentPredicateStates::Unsupported, - }; - match (current_node_state, parent_predicates_pushdown_states[*idx]) { - (r, ParentPredicateStates::NoChildren) => { - // If we have no result, use the current state from this child - parent_predicates_pushdown_states[*idx] = r; - } - (ParentPredicateStates::Supported, ParentPredicateStates::Supported) => { - // If the current child and all previous children are supported, - // the filter continues to support it - parent_predicates_pushdown_states[*idx] = - ParentPredicateStates::Supported; - } - _ => { - // Either the current child or a previous child marked this filter as unsupported - parent_predicates_pushdown_states[*idx] = - ParentPredicateStates::Unsupported; - } - } + return Err(datafusion_common::DataFusionError::Internal( + format!( + "Filter pushdown did not return the expected number of filters: expected {num_self_filters} self filters and {num_parent_filters} parent filters, but got {num_filters_from_child}. 
Likely culprit is {child}", + num_self_filters = num_self_filters, + num_parent_filters = parent_predicates.len(), + num_filters_from_child = all_filters.len(), + child = child.name(), + ), + )); + } + let parent_filters = all_filters + .split_off(num_self_filters) + .into_iter() + .collect_vec(); + self_filters_pushdown_supports.push(all_filters.into_iter().zip(self_filters).map(|(s, f)| s.wrap_expression(f)).collect()); + for (parent_filter_idx, parent_filter_support) in parent_filters.into_iter().enumerate() + { + parent_filter_pushdown_supports[parent_filter_idx] + .push(parent_filter_support); } } + // Re-create this node with new children let updated_node = with_new_children_if_necessary(Arc::clone(&node), new_children)?; - // Remap the result onto the parent filters as they were given to us. - // Any filters that were not pushed down to any children are marked as unsupported. - let parent_pushdown_result = parent_predicates_pushdown_states - .into_iter() - .zip(parent_predicates) - .map(|(state, filter)| match state { - ParentPredicateStates::NoChildren => PredicateSupport::Unsupported(filter), - ParentPredicateStates::Unsupported => PredicateSupport::Unsupported(filter), - ParentPredicateStates::Supported => PredicateSupport::Supported(filter), - }) - .collect(); + // TODO: by calling `handle_child_pushdown_result` we are assuming that the // `ExecutionPlan` implementation will not change the plan itself. // Should we have a separate method for dynamic pushdown that does not allow modifying the plan? 
let mut res = updated_node.handle_child_pushdown_result( phase, ChildPushdownResult { - parent_filters: parent_pushdown_result, + parent_filters: parent_predicates + .into_iter() + .enumerate() + .map( + |(parent_filter_idx, parent_filter)| ChildFitlerPushdownResult { + filter: parent_filter, + child_results: parent_filter_pushdown_supports[parent_filter_idx] + .clone(), + }, + ) + .collect(), self_filters: self_filters_pushdown_supports, }, config, diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index d008db48b82b0..2aa7859e7a867 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -243,9 +243,7 @@ impl ExecutionPlan for CoalesceBatchesExec { child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::transparent( - child_pushdown_result, - )) + Ok(FilterPushdownPropagation::all(child_pushdown_result)) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 375de7c7385e7..658d3b2346250 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -563,7 +563,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// they have been handled. /// - A `HashJoinExec` might ignore the pushdown result if filters need to /// be applied during the join operation. It passes the parent filters back - /// up wrapped in [`FilterPushdownPropagation::transparent`], discarding + /// up wrapped in [`FilterPushdownPropagation::any`], discarding /// any self-filters from children. 
/// /// **Example Walkthrough:** @@ -620,9 +620,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::transparent( - child_pushdown_result, - )) + Ok(FilterPushdownPropagation::all(child_pushdown_result)) } /// Injects arbitrary run-time state into this execution plan, returning a new plan diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 66886c576620d..30251bc21f07a 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -30,7 +30,7 @@ use crate::common::can_project; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, PredicateSupport, + FilterPushdownPropagation, PredicateSupport, PredicateSupportDiscriminant, }; use crate::projection::{ make_with_child, try_embed_projection, update_expr, EmbeddedProjection, @@ -481,32 +481,29 @@ impl ExecutionPlan for FilterExec { _config: &ConfigOptions, ) -> Result>> { if !matches!(phase, FilterPushdownPhase::Pre) { - return Ok(FilterPushdownPropagation::transparent( - child_pushdown_result, - )); + return Ok(FilterPushdownPropagation::all(child_pushdown_result)); } // We absorb any parent filters that were not handled by our children - let mut unhandled_filters = child_pushdown_result - .parent_filters - .iter() - .filter_map(|f| match f { - PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)), - PredicateSupport::Supported(_) => None, - }) - .collect_vec(); - assert_eq!( - child_pushdown_result.self_filters.len(), - 1, - "FilterExec should only have one child" - ); - let unsupported_self_filters = child_pushdown_result.self_filters[0] - .iter() + let unsupported_parent_filters = + child_pushdown_result.parent_filters.iter().filter_map(|f| { + matches!(f.all(), 
PredicateSupportDiscriminant::Unsupported) + .then_some(Arc::clone(&f.filter)) + }); + let unsupported_self_filters = child_pushdown_result + .self_filters + .first() + .expect("we have exactly one child") + .into_iter() .filter_map(|f| match f { - PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)), PredicateSupport::Supported(_) => None, + PredicateSupport::Unsupported(inner) => Some(inner), }) + .cloned(); + + let unhandled_filters = unsupported_parent_filters + .into_iter() + .chain(unsupported_self_filters) .collect_vec(); - unhandled_filters.extend(unsupported_self_filters); // If we have unhandled filters, we need to create a new FilterExec let filter_input = Arc::clone(self.input()); @@ -555,15 +552,12 @@ impl ExecutionPlan for FilterExec { }; Some(Arc::new(new) as _) }; - // Mark all parent filters as supported since we absorbed them - let supported_filters = child_pushdown_result - .parent_filters - .into_iter() - .map(|f| PredicateSupport::Supported(f.into_inner())) - .collect(); Ok(FilterPushdownPropagation { - filters: supported_filters, + filters: vec![ + PredicateSupportDiscriminant::Supported; + child_pushdown_result.parent_filters.len() + ], updated_node, }) } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index b50f86382edf0..eff83300fe87e 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -95,6 +95,7 @@ pub enum PredicateSupport { } impl PredicateSupport { + /// Return the wrapped expression, discarding whether it is supported or unsupported. pub fn into_inner(self) -> Arc { match self { PredicateSupport::Supported(expr) | PredicateSupport::Unsupported(expr) => { @@ -102,6 +103,95 @@ impl PredicateSupport { } } } + + /// Convert the [`PredicateSupport`] into a [`PredicateSupportDiscriminant`]. 
+ pub fn discriminant(&self) -> PredicateSupportDiscriminant { + match self { + PredicateSupport::Supported(_) => PredicateSupportDiscriminant::Supported, + PredicateSupport::Unsupported(_) => PredicateSupportDiscriminant::Unsupported, + } + } +} + +/// Discriminant for the result of pushing down a filter into a child node. +/// This is the same as [`PredicateSupport`], but without the wrapped expression. +#[derive(Debug, Clone, Copy)] +pub enum PredicateSupportDiscriminant { + Supported, + Unsupported, +} + +impl PredicateSupportDiscriminant { + pub fn and( + self, + other: PredicateSupportDiscriminant, + ) -> PredicateSupportDiscriminant { + match (self, other) { + (PredicateSupportDiscriminant::Supported, _) + | (_, PredicateSupportDiscriminant::Supported) => { + PredicateSupportDiscriminant::Supported + } + ( + PredicateSupportDiscriminant::Unsupported, + PredicateSupportDiscriminant::Unsupported, + ) => PredicateSupportDiscriminant::Unsupported, + } + } + + pub fn or(self, other: PredicateSupportDiscriminant) -> PredicateSupportDiscriminant { + match (self, other) { + (PredicateSupportDiscriminant::Supported, _) + | (_, PredicateSupportDiscriminant::Supported) => { + PredicateSupportDiscriminant::Supported + } + ( + PredicateSupportDiscriminant::Unsupported, + PredicateSupportDiscriminant::Unsupported, + ) => PredicateSupportDiscriminant::Unsupported, + } + } + + pub fn wrap_expression(self, expr: Arc) -> PredicateSupport { + match self { + PredicateSupportDiscriminant::Supported => PredicateSupport::Supported(expr), + PredicateSupportDiscriminant::Unsupported => { + PredicateSupport::Unsupported(expr) + } + } + } +} + +/// The result of pushing down a single parent filter into all children. +#[derive(Debug, Clone)] +pub struct ChildFitlerPushdownResult { + pub filter: Arc, + pub child_results: Vec, +} + +impl ChildFitlerPushdownResult { + /// Combien all child results into a single [`PredicateSupport`]. 
+ /// If any child supports the filter, it is considered supported. + /// If all children support the filter, it is considered supported. + /// If no child supports the filter, it is considered unsupported. + pub fn any(&self) -> PredicateSupportDiscriminant { + self.child_results + .iter() + .fold(PredicateSupportDiscriminant::Unsupported, |acc, result| { + acc.or(*result) + }) + } + + /// Combine all child results into a single [`PredicateSupport`]. + /// If all children support the filter, it is considered supported. + /// If any child supports the filter, it is considered supported. + /// If no child supports the filter, it is considered unsupported. + pub fn all(&self) -> PredicateSupportDiscriminant { + self.child_results + .iter() + .fold(PredicateSupportDiscriminant::Supported, |acc, result| { + acc.and(*result) + }) + } } /// The result of pushing down filters into a child node. @@ -112,21 +202,16 @@ impl PredicateSupport { /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result #[derive(Debug, Clone)] pub struct ChildPushdownResult { - /// The combined result of pushing down each parent filter into each child. - /// For example, given the fitlers `[a, b]` and children `[1, 2, 3]` the matrix of responses: - /// - // | filter | child 1 | child 2 | child 3 | result | - // |--------|-------------|-----------|-----------|-------------| - // | a | Supported | Supported | Supported | Supported | - // | b | Unsupported | Supported | Supported | Unsupported | - /// - /// That is: if any child marks a filter as unsupported or if the filter was not pushed - /// down into any child then the result is unsupported. - /// If at least one children and all children that received the filter mark it as supported - /// then the result is supported. 
- pub parent_filters: Vec, + /// The parent filters that were pushed down as received by the current node when [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::handle_child_pushdown_result) was called. + /// Note that this may *not* be the same as the filters that were passed to the children as the current node may have modified them + /// (e.g. by reassigning column indices) when it returned them from [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::handle_child_pushdown_result) in a [`FilterDescription`]. + /// Attached to each filter is a [`PredicateSupportDiscriminant`] *per child* that indicates whether the filter was supported or unsupported by each child. + /// To get combined results see [`ChildFitlerPushdownResult::any`] and [`ChildFitlerPushdownResult::all`]. + pub parent_filters: Vec, /// The result of pushing down each filter this node provided into each of it's children. - /// This is not combined with the parent filters so that nodes can treat each child independently. + /// The outer vector corresponds to each child, and the inner vector corresponds to each filter. + /// Since this node may have generated a different filter for each child the inner vector may have different lengths or the expressions may not match at all. + /// It is up to each node to interpret this result based on the filters it provided for each child in [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::handle_child_pushdown_result). 
pub self_filters: Vec>, } @@ -140,23 +225,44 @@ pub struct ChildPushdownResult { /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result #[derive(Debug, Clone)] pub struct FilterPushdownPropagation { - pub filters: Vec, + pub filters: Vec, pub updated_node: Option, } impl FilterPushdownPropagation { - /// Create a new [`FilterPushdownPropagation`] that tells the parent node - /// that echoes back up to the parent the result of pushing down the filters - /// into the children. - pub fn transparent(child_pushdown_result: ChildPushdownResult) -> Self { + /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter + /// is supported if it was supported by *all* children. + pub fn all(child_pushdown_result: ChildPushdownResult) -> Self { + let filters = child_pushdown_result + .parent_filters + .into_iter() + .map(|result| result.all()) + .collect(); + Self { + filters, + updated_node: None, + } + } + + /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter + /// is supported if it was supported by *any* child. + pub fn any(child_pushdown_result: ChildPushdownResult) -> Self { + let filters = child_pushdown_result + .parent_filters + .into_iter() + .map(|result| result.any()) + .collect(); Self { - filters: child_pushdown_result.parent_filters, + filters, updated_node: None, } } /// Create a new [`FilterPushdownPropagation`] with the specified filter support. - pub fn with_filters(filters: Vec) -> Self { + /// This transmits up to our parent node what the result of pushing down the filters into our node and possibly our subtree was. + pub fn with_parent_pushdown_result( + filters: Vec, + ) -> Self { Self { filters, updated_node: None, @@ -164,6 +270,8 @@ impl FilterPushdownPropagation { } /// Bind an updated node to the [`FilterPushdownPropagation`]. 
+ /// Use this when the current node wants to update itself in the tree or replace itself with a new node (e.g. one of its children).
+ /// You do not need to call this if one of the children of the current node may have updated itself, that is handled by the optimizer.
 pub fn with_updated_node(mut self, updated_node: T) -> Self {
 self.updated_node = Some(updated_node);
 self
@@ -175,7 +283,6 @@ pub struct ChildFilterDescription {
 /// Description of which parent filters can be pushed down into this node.
 /// Since we need to transmit filter pushdown results back to this node's parent
 /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down.
- /// We do this using a [`PredicateSupport`] which simplifies manipulating supported/unsupported filters.
 pub(crate) parent_filters: Vec,
 /// Description of which filters this node is pushing down to its children.
 /// Since this is not transmitted back to the parents we can have variable sized inner arrays
@@ -244,6 +351,9 @@ impl ChildFilterDescription {
 }
 }
 
+/// Result of pushing down filters into children.
+/// If a parent filter is supported by *any* child, we will tell the parent node that it is supported.
+/// If a parent filter is unsupported by *all* children, we will tell the parent node that it is unsupported.
 #[derive(Debug, Clone)]
 pub struct FilterDescription {
 /// A filter description for each child.
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index c8c4c0806f030..070eaca1d39fe 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -35,6 +35,10 @@ use super::{ use super::{JoinOn, JoinOnRef}; use crate::execution_plan::{boundedness_from_children, EmissionType}; use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64}; +use crate::filter_pushdown::{ + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, +}; use crate::projection::{ try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData, ProjectionExec, @@ -79,7 +83,7 @@ use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; -use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; use datafusion_physical_expr_common::datum::compare_op_for_nested; use ahash::RandomState; @@ -944,6 +948,24 @@ impl ExecutionPlan for HashJoinExec { try_embed_projection(projection, self) } } + + fn gather_filters_for_pushdown( + &self, + _phase: FilterPushdownPhase, + parent_filters: Vec>, + _config: &datafusion_common::config::ConfigOptions, + ) -> Result { + FilterDescription::from_children(parent_filters, &self.children()) + } + + fn handle_child_pushdown_result( + &self, + _phase: FilterPushdownPhase, + child_pushdown_result: ChildPushdownResult, + _config: &datafusion_common::config::ConfigOptions, + ) -> Result>> { + Ok(FilterPushdownPropagation::any(child_pushdown_result)) + } } /// Reads the left (build) side of the input, buffering it in memory, to build a diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 37dc3a7675901..69a4e7cd2bd63 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ 
b/datafusion/physical-plan/src/repartition/mod.rs @@ -821,9 +821,7 @@ impl ExecutionPlan for RepartitionExec { child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::transparent( - child_pushdown_result, - )) + Ok(FilterPushdownPropagation::all(child_pushdown_result)) } } From c6bfa01e46513ed6ee8b6ca658dc2f3e86473ec8 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 9 Jul 2025 17:11:37 -0500 Subject: [PATCH 02/15] fix assertion --- datafusion/physical-optimizer/src/filter_pushdown.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index dc5905a944de7..29baa3477a2e3 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -499,6 +499,8 @@ fn push_down_filters( } } + let num_parent_filters = all_predicates.len() - num_self_filters; + // Any filters that could not be pushed down to a child are marked as not-supported to our parents let result = push_down_filters(Arc::clone(child), all_predicates, config, phase)?; @@ -514,13 +516,13 @@ fn push_down_filters( // from our parents and filters that the current node injected. We need to de-entangle // this since we do need to distinguish between them. let mut all_filters = result.filters.into_iter().collect_vec(); - if all_filters.len() != num_self_filters + parent_predicates.len() + if all_filters.len() != num_self_filters + num_parent_filters { return Err(datafusion_common::DataFusionError::Internal( format!( "Filter pushdown did not return the expected number of filters: expected {num_self_filters} self filters and {num_parent_filters} parent filters, but got {num_filters_from_child}. 
Likely culprit is {child}", num_self_filters = num_self_filters, - num_parent_filters = parent_predicates.len(), + num_parent_filters = num_parent_filters, num_filters_from_child = all_filters.len(), child = child.name(), ), From 1c4b3999f2781ae555e4f80bbb20f9f3c76ab351 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 9 Jul 2025 19:21:34 -0500 Subject: [PATCH 03/15] fix --- .../physical-optimizer/src/filter_pushdown.rs | 49 ++++++++++++++----- .../physical-plan/src/filter_pushdown.rs | 11 ++--- 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index 29baa3477a2e3..f2037e47447c2 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -449,7 +449,7 @@ fn push_down_filters( let children = node.children(); let filter_description = node.gather_filters_for_pushdown(phase, parent_predicates.clone(), config)?; - + let filter_description_parent_filters = filter_description.parent_filters(); let filter_description_self_filters = filter_description.self_filters(); if filter_description_parent_filters.len() != children.len() { @@ -473,11 +473,13 @@ fn push_down_filters( )); } - for (child, parent_filters, self_filters) in izip!( + for (child_idx, (child, parent_filters, self_filters)) in izip!( children, filter_description.parent_filters(), filter_description.self_filters() - ) { + ) + .enumerate() + { // Here, `parent_filters` are the predicates which are provided by the parent node of // the current node, and tried to be pushed down over the child which the loop points // currently. 
`self_filters` are the predicates which are provided by the current node, @@ -486,16 +488,23 @@ fn push_down_filters( let num_self_filters = self_filters.len(); let mut all_predicates = self_filters.clone(); + // Track which parent filters are supported for this child + let mut parent_filter_indices = vec![]; + // Iterate over each predicate coming from the parent - for filter in parent_filters.into_iter() { + for (parent_filter_idx, filter) in parent_filters.into_iter().enumerate() { // Check if we can push this filter down to our child. // These supports are defined in `gather_filters_for_pushdown()` match filter { PredicateSupport::Supported(predicate) => { // Queue this filter up for pushdown to this child all_predicates.push(predicate); + parent_filter_indices.push(parent_filter_idx); + } + PredicateSupport::Unsupported(_) => { + // This filter won't be pushed down to this child + // Will be marked as unsupported later in the initialization loop } - PredicateSupport::Unsupported(_) => {} } } @@ -516,8 +525,7 @@ fn push_down_filters( // from our parents and filters that the current node injected. We need to de-entangle // this since we do need to distinguish between them. let mut all_filters = result.filters.into_iter().collect_vec(); - if all_filters.len() != num_self_filters + num_parent_filters - { + if all_filters.len() != num_self_filters + num_parent_filters { return Err(datafusion_common::DataFusionError::Internal( format!( "Filter pushdown did not return the expected number of filters: expected {num_self_filters} self filters and {num_parent_filters} parent filters, but got {num_filters_from_child}. 
Likely culprit is {child}", @@ -532,11 +540,30 @@ fn push_down_filters( .split_off(num_self_filters) .into_iter() .collect_vec(); - self_filters_pushdown_supports.push(all_filters.into_iter().zip(self_filters).map(|(s, f)| s.wrap_expression(f)).collect()); - for (parent_filter_idx, parent_filter_support) in parent_filters.into_iter().enumerate() + self_filters_pushdown_supports.push( + all_filters + .into_iter() + .zip(self_filters) + .map(|(s, f)| s.wrap_expression(f)) + .collect(), + ); + + // Start by marking all parent filters as unsupported for this child + for original_parent_idx in 0..parent_predicates.len() { + parent_filter_pushdown_supports[original_parent_idx] + .push(PredicateSupportDiscriminant::Unsupported); + assert_eq!( + parent_filter_pushdown_supports[original_parent_idx].len(), + child_idx + 1, + "Parent filter pushdown supports should have the same length as the number of children" + ); + } + // Map results from pushed-down filters back to original parent filter indices + for (result_idx, parent_filter_support) in parent_filters.into_iter().enumerate() { - parent_filter_pushdown_supports[parent_filter_idx] - .push(parent_filter_support); + let original_parent_idx = parent_filter_indices[result_idx]; + parent_filter_pushdown_supports[original_parent_idx][child_idx] = + parent_filter_support; } } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index eff83300fe87e..9c73b7288b7bf 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -127,14 +127,11 @@ impl PredicateSupportDiscriminant { other: PredicateSupportDiscriminant, ) -> PredicateSupportDiscriminant { match (self, other) { - (PredicateSupportDiscriminant::Supported, _) - | (_, PredicateSupportDiscriminant::Supported) => { - PredicateSupportDiscriminant::Supported - } ( - PredicateSupportDiscriminant::Unsupported, - PredicateSupportDiscriminant::Unsupported, - ) => 
PredicateSupportDiscriminant::Unsupported, + PredicateSupportDiscriminant::Supported, + PredicateSupportDiscriminant::Supported, + ) => PredicateSupportDiscriminant::Supported, + _ => PredicateSupportDiscriminant::Unsupported, } } From 3166ed07034f112b917dadc83af08de9c64316f8 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 9 Jul 2025 19:38:35 -0500 Subject: [PATCH 04/15] fix any/all --- .../physical-plan/src/filter_pushdown.rs | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 9c73b7288b7bf..a32adf63a6fbd 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -171,11 +171,16 @@ impl ChildFitlerPushdownResult { /// If all children support the filter, it is considered supported. /// If no child supports the filter, it is considered unsupported. pub fn any(&self) -> PredicateSupportDiscriminant { - self.child_results - .iter() - .fold(PredicateSupportDiscriminant::Unsupported, |acc, result| { - acc.or(*result) - }) + if self.child_results.is_empty() { + // If there are no children, filters cannot be supported + PredicateSupportDiscriminant::Unsupported + } else { + self.child_results + .iter() + .fold(PredicateSupportDiscriminant::Unsupported, |acc, result| { + acc.or(*result) + }) + } } /// Combine all child results into a single [`PredicateSupport`]. @@ -183,11 +188,16 @@ impl ChildFitlerPushdownResult { /// If any child supports the filter, it is considered supported. /// If no child supports the filter, it is considered unsupported. 
pub fn all(&self) -> PredicateSupportDiscriminant { - self.child_results - .iter() - .fold(PredicateSupportDiscriminant::Supported, |acc, result| { - acc.and(*result) - }) + if self.child_results.is_empty() { + // If there are no children, filters cannot be supported + PredicateSupportDiscriminant::Unsupported + } else { + self.child_results + .iter() + .fold(PredicateSupportDiscriminant::Supported, |acc, result| { + acc.and(*result) + }) + } } } From 1f1da8712f257ed4cafecab6bf4897836952e930 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 9 Jul 2025 19:51:08 -0500 Subject: [PATCH 05/15] remove hashjoinexec impl --- .../physical_optimizer/filter_pushdown/mod.rs | 107 ------------------ .../physical-plan/src/joins/hash_join.rs | 20 +--- 2 files changed, 1 insertion(+), 126 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index ed44f3bc30405..68369bc9d9061 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -506,113 +506,6 @@ fn schema() -> SchemaRef { Arc::clone(&TEST_SCHEMA) } -#[tokio::test] -async fn test_hashjoin_parent_filter_pushdown() { - use datafusion_common::JoinType; - use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; - - // Create build side with limited values - let build_batches = vec![record_batch!( - ("a", Utf8, ["aa", "ab"]), - ("b", Utf8, ["ba", "bb"]), - ("c", Float64, [1.0, 2.0]) - ) - .unwrap()]; - let build_side_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Utf8, false), - Field::new("c", DataType::Float64, false), - ])); - let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) - .with_support(true) - .with_batches(build_batches) - .build(); - - // Create probe side with more values - let 
probe_batches = vec![record_batch!( - ("d", Utf8, ["aa", "ab", "ac", "ad"]), - ("e", Utf8, ["ba", "bb", "bc", "bd"]), - ("f", Float64, [1.0, 2.0, 3.0, 4.0]) - ) - .unwrap()]; - let probe_side_schema = Arc::new(Schema::new(vec![ - Field::new("d", DataType::Utf8, false), - Field::new("e", DataType::Utf8, false), - Field::new("f", DataType::Float64, false), - ])); - let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) - .with_support(true) - .with_batches(probe_batches) - .build(); - - // Create HashJoinExec - let on = vec![( - col("a", &build_side_schema).unwrap(), - col("d", &probe_side_schema).unwrap(), - )]; - let join = Arc::new( - HashJoinExec::try_new( - build_scan, - probe_scan, - on, - None, - &JoinType::Inner, - None, - PartitionMode::Partitioned, - datafusion_common::NullEquality::NullEqualsNothing, - ) - .unwrap(), - ); - - // Create filters that can be pushed down to different sides - // We need to create filters in the context of the join output schema - let join_schema = join.schema(); - - // Filter on build side column: a = 'aa' - let left_filter = col_lit_predicate("a", "aa", &join_schema); - // Filter on probe side column: e = 'ba' - let right_filter = col_lit_predicate("e", "ba", &join_schema); - // Filter that references both sides: a = d (should not be pushed down) - let cross_filter = Arc::new(BinaryExpr::new( - col("a", &join_schema).unwrap(), - Operator::Eq, - col("d", &join_schema).unwrap(), - )) as Arc; - // Combine all filters into a single predicate using AND - // The left and right filters will be pushed down to their respective sides - // The cross filter will remain at the top level of the join - // and will not be pushed down. 
- let filter = Arc::new(BinaryExpr::new( - left_filter.clone(), - Operator::And, - right_filter.clone(), - )) as Arc; - let filter = Arc::new(BinaryExpr::new(filter, Operator::And, cross_filter.clone())) - as Arc; - - let plan = Arc::new(FilterExec::try_new(filter, Arc::clone(&join) as _).unwrap()) - as Arc; - - // Test that filters are pushed down correctly to each side of the join - insta::assert_snapshot!( - OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true), - @r" - OptimizationTest: - input: - - FilterExec: a@0 = aa AND e@4 = ba AND a@0 = d@3 - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true - output: - Ok: - - FilterExec: a@0 = d@3 - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba - " - ); -} - /// Returns a predicate that is a binary expression col = lit fn col_lit_predicate( column_name: &str, diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 070eaca1d39fe..78e9a54f02b6f 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -83,7 +83,7 @@ use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; -use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; +use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr_common::datum::compare_op_for_nested; use 
ahash::RandomState; @@ -948,24 +948,6 @@ impl ExecutionPlan for HashJoinExec { try_embed_projection(projection, self) } } - - fn gather_filters_for_pushdown( - &self, - _phase: FilterPushdownPhase, - parent_filters: Vec>, - _config: &datafusion_common::config::ConfigOptions, - ) -> Result { - FilterDescription::from_children(parent_filters, &self.children()) - } - - fn handle_child_pushdown_result( - &self, - _phase: FilterPushdownPhase, - child_pushdown_result: ChildPushdownResult, - _config: &datafusion_common::config::ConfigOptions, - ) -> Result>> { - Ok(FilterPushdownPropagation::any(child_pushdown_result)) - } } /// Reads the left (build) side of the input, buffering it in memory, to build a From e279b85bf20211c6fcedea5516b64f1d419bfc6c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 9 Jul 2025 19:53:01 -0500 Subject: [PATCH 06/15] lint --- datafusion/physical-plan/src/filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 30251bc21f07a..1156c6c1c3a95 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -493,7 +493,7 @@ impl ExecutionPlan for FilterExec { .self_filters .first() .expect("we have exactly one child") - .into_iter() + .iter() .filter_map(|f| match f { PredicateSupport::Supported(_) => None, PredicateSupport::Unsupported(inner) => Some(inner), From c9a330c5ff3e984824067fdc23a5fb5d047c94c7 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 9 Jul 2025 20:06:40 -0500 Subject: [PATCH 07/15] resolve merge --- datafusion/physical-plan/src/joins/hash_join.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 78e9a54f02b6f..c8c4c0806f030 100644 --- 
a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -35,10 +35,6 @@ use super::{ use super::{JoinOn, JoinOnRef}; use crate::execution_plan::{boundedness_from_children, EmissionType}; use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64}; -use crate::filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, -}; use crate::projection::{ try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData, ProjectionExec, From 3ae2e4eaddd164300b936692cd6a866e6c6864b3 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 9 Jul 2025 20:12:17 -0500 Subject: [PATCH 08/15] fix imports --- datafusion/datasource/src/source.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 16da1d11f9efd..6572bd97b07cf 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -42,8 +42,7 @@ use datafusion_physical_expr::{ use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::filter::collect_columns_from_predicate; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, - PredicateSupportDiscriminant, + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport, PredicateSupportDiscriminant }; /// A source of data, typically a list of files or memory From 7e7d3664afbcf55a08bd75463b6b6d440c750930 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 9 Jul 2025 20:14:49 -0500 Subject: [PATCH 09/15] fix merge --- datafusion/datasource/src/source.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs 
index 6572bd97b07cf..98a3a2f94f3bc 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -42,7 +42,8 @@ use datafusion_physical_expr::{ use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::filter::collect_columns_from_predicate; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport, PredicateSupportDiscriminant + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, + PredicateSupportDiscriminant, }; /// A source of data, typically a list of files or memory @@ -328,7 +329,7 @@ impl ExecutionPlan for DataSourceExec { .collect_vec(); let res = self .data_source - .try_pushdown_filters(parent_filters, config)?; + .try_pushdown_filters(parent_filters.clone(), config)?; match res.updated_node { Some(data_source) => { let mut new_node = self.clone(); @@ -340,9 +341,10 @@ impl ExecutionPlan for DataSourceExec { let filter = conjunction( res.filters .iter() - .filter_map(|f| match f { - PredicateSupport::Supported(expr) => Some(Arc::clone(expr)), - PredicateSupport::Unsupported(_) => None, + .zip(parent_filters) + .filter_map(|(s, f)| match s { + PredicateSupportDiscriminant::Supported => Some(f), + PredicateSupportDiscriminant::Unsupported => None, }) .collect_vec(), ); From 20c74b9bedc049338c66ca04d11f052df3487d5e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 9 Jul 2025 20:24:06 -0500 Subject: [PATCH 10/15] fix typo --- datafusion/physical-plan/src/filter_pushdown.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index a32adf63a6fbd..808573726a695 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -166,7 +166,7 @@ pub struct ChildFitlerPushdownResult { } 
impl ChildFitlerPushdownResult { - /// Combien all child results into a single [`PredicateSupport`]. + /// Combine all child results into a single [`PredicateSupport`]. /// If any child supports the filter, it is considered supported. /// If all children support the filter, it is considered supported. /// If no child supports the filter, it is considered unsupported. From 542d3f0c27063894017387db5bd83c2f78a12eb1 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 9 Jul 2025 23:41:13 -0500 Subject: [PATCH 11/15] lint --- datafusion/physical-optimizer/src/filter_pushdown.rs | 10 +++++----- datafusion/physical-plan/src/execution_plan.rs | 9 +++++---- datafusion/physical-plan/src/filter_pushdown.rs | 8 ++++---- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index f2037e47447c2..d5fc5c3729460 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -38,7 +38,7 @@ use crate::PhysicalOptimizerRule; use datafusion_common::{config::ConfigOptions, Result}; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::{ - ChildFitlerPushdownResult, ChildPushdownResult, FilterPushdownPhase, + ChildFilterPushdownResult, ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport, PredicateSupportDiscriminant, }; use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan}; @@ -549,11 +549,11 @@ fn push_down_filters( ); // Start by marking all parent filters as unsupported for this child - for original_parent_idx in 0..parent_predicates.len() { - parent_filter_pushdown_supports[original_parent_idx] + for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() { + parent_filter_pushdown_support 
.push(PredicateSupportDiscriminant::Unsupported); assert_eq!( - parent_filter_pushdown_supports[original_parent_idx].len(), + parent_filter_pushdown_support.len(), child_idx + 1, "Parent filter pushdown supports should have the same length as the number of children" ); @@ -580,7 +580,7 @@ fn push_down_filters( .into_iter() .enumerate() .map( - |(parent_filter_idx, parent_filter)| ChildFitlerPushdownResult { + |(parent_filter_idx, parent_filter)| ChildFilterPushdownResult { filter: parent_filter, child_results: parent_filter_pushdown_supports[parent_filter_idx] .clone(), diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 658d3b2346250..d35339b41cbad 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -597,10 +597,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// /// **Helper Methods for Customization:** /// There are various helper methods to simplify implementing this method: - /// - [`FilterPushdownPropagation::transparent`]: Indicates that the node - /// supports filter pushdown but does not modify it, simply transmitting - /// the children's pushdown results back up to its parent. - /// - [`FilterPushdownPropagation::with_filters`]: Allows adding filters + /// - [`FilterPushdownPropagation::any`]: Marks all parent filters as + /// supported as long as at least one child supports them. + /// - [`FilterPushdownPropagation::all`]: Marks all parent filters as + /// supported as long as all children support them. + /// - [`FilterPushdownPropagation::with_parent_pushdown_result`]: Allows adding filters /// to the propagation result, indicating which filters are supported by /// the current node. 
/// - [`FilterPushdownPropagation::with_updated_node`]: Allows updating the diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 808573726a695..52ae2522af53a 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -160,12 +160,12 @@ impl PredicateSupportDiscriminant { /// The result of pushing down a single parent filter into all children. #[derive(Debug, Clone)] -pub struct ChildFitlerPushdownResult { +pub struct ChildFilterPushdownResult { pub filter: Arc, pub child_results: Vec, } -impl ChildFitlerPushdownResult { +impl ChildFilterPushdownResult { /// Combine all child results into a single [`PredicateSupport`]. /// If any child supports the filter, it is considered supported. /// If all children support the filter, it is considered supported. @@ -213,8 +213,8 @@ pub struct ChildPushdownResult { /// Note that this may *not* be the same as the filters that were passed to the children as the current node may have modified them /// (e.g. by reassigning column indices) when it returned them from [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::handle_child_pushdown_result) in a [`FilterDescription`]. /// Attached to each filter is a [`PredicateSupportDiscriminant`] *per child* that indicates whether the filter was supported or unsupported by each child. - /// To get combined results see [`ChildFitlerPushdownResult::any`] and [`ChildFitlerPushdownResult::all`]. - pub parent_filters: Vec, + /// To get combined results see [`ChildFilterPushdownResult::any`] and [`ChildFilterPushdownResult::all`]. + pub parent_filters: Vec, /// The result of pushing down each filter this node provided into each of it's children. /// The outer vector corresponds to each child, and the inner vector corresponds to each filter. 
/// Since this node may have generated a different filter for each child the inner vector may have different lengths or the expressions may not match at all. From 46f04697de2b873711ee7cf6f9ea803ae6aef216 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 10 Jul 2025 08:39:43 -0400 Subject: [PATCH 12/15] Improve some documentation in filter pushdown (#32) --- .../physical-plan/src/filter_pushdown.rs | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 52ae2522af53a..b9169725f94ee 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -90,12 +90,14 @@ impl std::fmt::Display for FilterPushdownPhase { /// could not handle. #[derive(Debug, Clone)] pub enum PredicateSupport { + /// The predicate was successfully pushed down into the child node. Supported(Arc), + /// The predicate could not be pushed down into the child node. Unsupported(Arc), } impl PredicateSupport { - /// Return the wrapped expression, discarding whether it is supported or unsupported. + /// Return the wrapped [`PhysicalExpr`], discarding whether it is supported or unsupported. pub fn into_inner(self) -> Arc { match self { PredicateSupport::Supported(expr) | PredicateSupport::Unsupported(expr) => { @@ -113,11 +115,16 @@ impl PredicateSupport { } } -/// Discriminant for the result of pushing down a filter into a child node. +/// Used by [`FilterPushdownPropagation`] and [`PredicateSupport`] to +/// communicate the result of attempting to push down a filter into a child +/// node. +/// /// This is the same as [`PredicateSupport`], but without the wrapped expression. #[derive(Debug, Clone, Copy)] pub enum PredicateSupportDiscriminant { + /// The predicate was successfully pushed down into the child node. Supported, + /// The predicate could not be pushed down into the child node. 
Unsupported, } @@ -202,6 +209,7 @@ impl ChildFilterPushdownResult { } /// The result of pushing down filters into a child node. +/// /// This is the result provided to nodes in [`ExecutionPlan::handle_child_pushdown_result`]. /// Nodes process this result and convert it into a [`FilterPushdownPropagation`] /// that is returned to their parent. @@ -222,17 +230,20 @@ pub struct ChildPushdownResult { pub self_filters: Vec>, } -/// The result of pushing down filters into a node that it returns to its parent. -/// This is what nodes return from [`ExecutionPlan::handle_child_pushdown_result`] to communicate +/// The result of pushing down filters into a node. +/// +/// Returned from [`ExecutionPlan::handle_child_pushdown_result`] to communicate /// to the optimizer: /// -/// 1. What to do with any parent filters that were not completely handled by the children. +/// 1. What to do with any parent filters that were could not be pushed down into the children. /// 2. If the node needs to be replaced in the execution plan with a new node or not. /// /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result #[derive(Debug, Clone)] pub struct FilterPushdownPropagation { + /// What filters were pushed into the parent node. 
pub filters: Vec, + /// The updated node, if it was updated during pushdown pub updated_node: Option, } From fb6b8a2c6497f7ff243ea24e0a8cb289b3f67572 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 10 Jul 2025 08:28:06 -0500 Subject: [PATCH 13/15] refactor to a struct with a field instead of enum --- .../filter_pushdown/util.rs | 24 ++-- datafusion/datasource-parquet/src/source.rs | 26 ++-- datafusion/datasource/src/file.rs | 6 +- datafusion/datasource/src/source.rs | 9 +- .../physical-optimizer/src/filter_pushdown.rs | 15 +-- .../physical-plan/src/execution_plan.rs | 6 +- datafusion/physical-plan/src/filter.rs | 18 +-- .../physical-plan/src/filter_pushdown.rs | 125 ++++++++---------- 8 files changed, 102 insertions(+), 127 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 79d8895e1431f..507322f8b34c2 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -29,15 +29,13 @@ use datafusion_datasource::{ use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_optimizer::PhysicalOptimizerRule; -use datafusion_physical_plan::filter_pushdown::{ - FilterPushdownPhase, PredicateSupportDiscriminant, -}; +use datafusion_physical_plan::filter_pushdown::{FilterPushdownPhase, PushedDown}; use datafusion_physical_plan::{ displayable, filter::FilterExec, filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, - FilterPushdownPropagation, PredicateSupport, + FilterPushdownPropagation, }, metrics::ExecutionPlanMetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, @@ -230,12 +228,12 @@ impl FileSource for TestSource { ..self.clone() }); Ok(FilterPushdownPropagation::with_parent_pushdown_result( - 
vec![PredicateSupportDiscriminant::Supported; filters.len()], + vec![PushedDown::Yes; filters.len()], ) .with_updated_node(new_node)) } else { Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PredicateSupportDiscriminant::Unsupported; filters.len()], + vec![PushedDown::No; filters.len()], )) } } @@ -543,18 +541,22 @@ impl ExecutionPlan for TestNode { assert_eq!(self_pushdown_result.len(), 1); let self_pushdown_result: Vec<_> = self_pushdown_result.into_iter().collect(); - match &self_pushdown_result[0] { - PredicateSupport::Unsupported(filter) => { + let first_pushdown_result = self_pushdown_result[0].clone(); + + match &first_pushdown_result.discriminant { + PushedDown::No => { // We have a filter to push down - let new_child = - FilterExec::try_new(Arc::clone(filter), Arc::clone(&self.input))?; + let new_child = FilterExec::try_new( + Arc::clone(&first_pushdown_result.predicate), + Arc::clone(&self.input), + )?; let new_self = TestNode::new(false, Arc::new(new_child), self.predicate.clone()); let mut res = FilterPushdownPropagation::all(child_pushdown_result); res.updated_node = Some(Arc::new(new_self) as Arc); Ok(res) } - PredicateSupport::Supported(_) => { + PushedDown::Yes => { let res = FilterPushdownPropagation::all(child_pushdown_result); Ok(res) } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 9c2470e578858..0c73661a6de66 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -41,9 +41,9 @@ use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use datafusion_physical_plan::filter_pushdown::PredicateSupportDiscriminant; +use datafusion_physical_plan::filter_pushdown::PushedDown; use datafusion_physical_plan::filter_pushdown::{ - 
FilterPushdownPropagation, PredicateSupport, + FilterPushdownPropagation, PushedDownPredicate, }; use datafusion_physical_plan::metrics::Count; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -624,7 +624,7 @@ impl FileSource for ParquetSource { ) -> datafusion_common::Result>> { let Some(file_schema) = self.file_schema.clone() else { return Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PredicateSupportDiscriminant::Unsupported; filters.len()], + vec![PushedDown::No; filters.len()], )); }; // Determine if based on configs we should push filters down. @@ -639,31 +639,31 @@ impl FileSource for ParquetSource { let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled; let mut source = self.clone(); - let filters: Vec = filters + let filters: Vec = filters .into_iter() .map(|filter| { if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) { - PredicateSupport::Supported(filter) + PushedDownPredicate::supported(filter) } else { - PredicateSupport::Unsupported(filter) + PushedDownPredicate::unsupported(filter) } }) .collect(); if filters .iter() - .all(|f| matches!(f, PredicateSupport::Unsupported(_))) + .all(|f| matches!(f.discriminant, PushedDown::No)) { // No filters can be pushed down, so we can just return the remaining filters // and avoid replacing the source in the physical plan. 
return Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PredicateSupportDiscriminant::Unsupported; filters.len()], + vec![PushedDown::No; filters.len()], )); } let allowed_filters = filters .iter() - .filter_map(|f| match f { - PredicateSupport::Supported(expr) => Some(Arc::clone(expr)), - PredicateSupport::Unsupported(_) => None, + .filter_map(|f| match f.discriminant { + PushedDown::Yes => Some(Arc::clone(&f.predicate)), + PushedDown::No => None, }) .collect_vec(); let predicate = match source.predicate { @@ -679,12 +679,12 @@ impl FileSource for ParquetSource { // even if we updated the predicate to include the filters (they will only be used for stats pruning). if !pushdown_filters { return Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PredicateSupportDiscriminant::Unsupported; filters.len()], + vec![PushedDown::No; filters.len()], ) .with_updated_node(source)); } Ok(FilterPushdownPropagation::with_parent_pushdown_result( - filters.iter().map(|f| f.discriminant()).collect(), + filters.iter().map(|f| f.discriminant).collect(), ) .with_updated_node(source)) } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index b11071bb1c511..29fa38a8ee36e 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -30,9 +30,7 @@ use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::{not_impl_err, Result, Statistics}; use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; -use datafusion_physical_plan::filter_pushdown::{ - FilterPushdownPropagation, PredicateSupportDiscriminant, -}; +use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; @@ -123,7 +121,7 @@ pub trait FileSource: Send + Sync { _config: &ConfigOptions, ) -> Result>> { 
Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PredicateSupportDiscriminant::Unsupported; filters.len()], + vec![PushedDown::No; filters.len()], )) } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 98a3a2f94f3bc..a002d4992cb26 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -42,8 +42,7 @@ use datafusion_physical_expr::{ use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::filter::collect_columns_from_predicate; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, - PredicateSupportDiscriminant, + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown, }; /// A source of data, typically a list of files or memory @@ -174,7 +173,7 @@ pub trait DataSource: Send + Sync + Debug { _config: &ConfigOptions, ) -> Result>> { Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PredicateSupportDiscriminant::Unsupported; filters.len()], + vec![PushedDown::No; filters.len()], )) } } @@ -343,8 +342,8 @@ impl ExecutionPlan for DataSourceExec { .iter() .zip(parent_filters) .filter_map(|(s, f)| match s { - PredicateSupportDiscriminant::Supported => Some(f), - PredicateSupportDiscriminant::Unsupported => None, + PushedDown::Yes => Some(f), + PushedDown::No => None, }) .collect_vec(), ); diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index d5fc5c3729460..66ccc1a798537 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -39,7 +39,7 @@ use datafusion_common::{config::ConfigOptions, Result}; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::{ ChildFilterPushdownResult, ChildPushdownResult, FilterPushdownPhase, - FilterPushdownPropagation, 
PredicateSupport, PredicateSupportDiscriminant, + FilterPushdownPropagation, PushedDown, }; use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan}; @@ -441,7 +441,7 @@ fn push_down_filters( config: &ConfigOptions, phase: FilterPushdownPhase, ) -> Result>> { - let mut parent_filter_pushdown_supports: Vec> = + let mut parent_filter_pushdown_supports: Vec> = vec![vec![]; parent_predicates.len()]; let mut self_filters_pushdown_supports = vec![]; let mut new_children = Vec::with_capacity(node.children().len()); @@ -495,13 +495,13 @@ fn push_down_filters( for (parent_filter_idx, filter) in parent_filters.into_iter().enumerate() { // Check if we can push this filter down to our child. // These supports are defined in `gather_filters_for_pushdown()` - match filter { - PredicateSupport::Supported(predicate) => { + match filter.discriminant { + PushedDown::Yes => { // Queue this filter up for pushdown to this child - all_predicates.push(predicate); + all_predicates.push(filter.predicate); parent_filter_indices.push(parent_filter_idx); } - PredicateSupport::Unsupported(_) => { + PushedDown::No => { // This filter won't be pushed down to this child // Will be marked as unsupported later in the initialization loop } @@ -550,8 +550,7 @@ fn push_down_filters( // Start by marking all parent filters as unsupported for this child for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() { - parent_filter_pushdown_support - .push(PredicateSupportDiscriminant::Unsupported); + parent_filter_pushdown_support.push(PushedDown::No); assert_eq!( parent_filter_pushdown_support.len(), child_idx + 1, diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index d35339b41cbad..b905ae6e091cf 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -18,7 +18,7 @@ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, 
VerboseDisplay}; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, PredicateSupport, + FilterPushdownPropagation, PushedDownPredicate, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; @@ -525,7 +525,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { let mut desc = FilterDescription::new(); let child_filters = parent_filters .iter() - .map(|f| PredicateSupport::Unsupported(Arc::clone(f))) + .map(|f| PushedDownPredicate::unsupported(Arc::clone(f))) .collect_vec(); for _ in 0..self.children().len() { desc = desc.with_child(ChildFilterDescription { @@ -614,7 +614,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// operator may or may not be allowed to modify the plan. See /// [`FilterPushdownPhase`] for more details on phase-specific behavior. /// - /// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported + /// [`PushedDownPredicate::supported`]: crate::filter_pushdown::PushedDownPredicate::supported fn handle_child_pushdown_result( &self, _phase: FilterPushdownPhase, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 1156c6c1c3a95..9688c4fb6d595 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -30,7 +30,7 @@ use crate::common::can_project; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, PredicateSupport, PredicateSupportDiscriminant, + FilterPushdownPropagation, PushedDown, PushedDownPredicate, }; use crate::projection::{ make_with_child, try_embed_projection, update_expr, EmbeddedProjection, @@ -455,7 +455,7 @@ impl ExecutionPlan for FilterExec { // For non-pre phase, filters pass through unchanged let filter_supports = parent_filters .into_iter() - 
.map(PredicateSupport::Supported) + .map(PushedDownPredicate::supported) .collect(); return Ok(FilterDescription::new().with_child(ChildFilterDescription { parent_filters: filter_supports, @@ -486,17 +486,16 @@ impl ExecutionPlan for FilterExec { // We absorb any parent filters that were not handled by our children let unsupported_parent_filters = child_pushdown_result.parent_filters.iter().filter_map(|f| { - matches!(f.all(), PredicateSupportDiscriminant::Unsupported) - .then_some(Arc::clone(&f.filter)) + matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter)) }); let unsupported_self_filters = child_pushdown_result .self_filters .first() .expect("we have exactly one child") .iter() - .filter_map(|f| match f { - PredicateSupport::Supported(_) => None, - PredicateSupport::Unsupported(inner) => Some(inner), + .filter_map(|f| match f.discriminant { + PushedDown::Yes => None, + PushedDown::No => Some(&f.predicate), }) .cloned(); @@ -554,10 +553,7 @@ impl ExecutionPlan for FilterExec { }; Ok(FilterPushdownPropagation { - filters: vec![ - PredicateSupportDiscriminant::Supported; - child_pushdown_result.parent_filters.len() - ], + filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()], updated_node, }) } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index b9169725f94ee..3089d1cd0ebe7 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -86,81 +86,63 @@ impl std::fmt::Display for FilterPushdownPhase { /// The result of a plan for pushing down a filter into a child node. /// This contains references to filters so that nodes can mutate a filter /// before pushing it down to a child node (e.g. to adjust a projection) -/// or can directly take ownership of `Unsupported` filters that their children +/// or can directly take ownership of filters that their children /// could not handle. 
#[derive(Debug, Clone)] -pub enum PredicateSupport { - /// The predicate was successfully pushed down into the child node. - Supported(Arc), - /// The predicate could not be pushed down into the child node. - Unsupported(Arc), +pub struct PushedDownPredicate { + pub discriminant: PushedDown, + pub predicate: Arc, } -impl PredicateSupport { - /// Return the wrapped [`PhysicalExpr`], discarding whether it is supported or unsupported. +impl PushedDownPredicate { + /// Return the wrapped expression, discarding whether it is supported or unsupported. pub fn into_inner(self) -> Arc { - match self { - PredicateSupport::Supported(expr) | PredicateSupport::Unsupported(expr) => { - expr - } + self.predicate + } + + /// Create a new [`PushedDownPredicate`] with supported pushdown. + pub fn supported(predicate: Arc) -> Self { + Self { + discriminant: PushedDown::Yes, + predicate, } } - /// Convert the [`PredicateSupport`] into a [`PredicateSupportDiscriminant`]. - pub fn discriminant(&self) -> PredicateSupportDiscriminant { - match self { - PredicateSupport::Supported(_) => PredicateSupportDiscriminant::Supported, - PredicateSupport::Unsupported(_) => PredicateSupportDiscriminant::Unsupported, + /// Create a new [`PushedDownPredicate`] with unsupported pushdown. + pub fn unsupported(predicate: Arc) -> Self { + Self { + discriminant: PushedDown::No, + predicate, } } } -/// Used by [`FilterPushdownPropagation`] and [`PredicateSupport`] to -/// communicate the result of attempting to push down a filter into a child -/// node. -/// -/// This is the same as [`PredicateSupport`], but without the wrapped expression. +/// Discriminant for the result of pushing down a filter into a child node. #[derive(Debug, Clone, Copy)] -pub enum PredicateSupportDiscriminant { - /// The predicate was successfully pushed down into the child node. - Supported, - /// The predicate could not be pushed down into the child node. 
- Unsupported, +pub enum PushedDown { + Yes, + No, } -impl PredicateSupportDiscriminant { - pub fn and( - self, - other: PredicateSupportDiscriminant, - ) -> PredicateSupportDiscriminant { +impl PushedDown { + pub fn and(self, other: PushedDown) -> PushedDown { match (self, other) { - ( - PredicateSupportDiscriminant::Supported, - PredicateSupportDiscriminant::Supported, - ) => PredicateSupportDiscriminant::Supported, - _ => PredicateSupportDiscriminant::Unsupported, + (PushedDown::Yes, PushedDown::Yes) => PushedDown::Yes, + _ => PushedDown::No, } } - pub fn or(self, other: PredicateSupportDiscriminant) -> PredicateSupportDiscriminant { + pub fn or(self, other: PushedDown) -> PushedDown { match (self, other) { - (PredicateSupportDiscriminant::Supported, _) - | (_, PredicateSupportDiscriminant::Supported) => { - PredicateSupportDiscriminant::Supported - } - ( - PredicateSupportDiscriminant::Unsupported, - PredicateSupportDiscriminant::Unsupported, - ) => PredicateSupportDiscriminant::Unsupported, + (PushedDown::Yes, _) | (_, PushedDown::Yes) => PushedDown::Yes, + (PushedDown::No, PushedDown::No) => PushedDown::No, } } - pub fn wrap_expression(self, expr: Arc) -> PredicateSupport { - match self { - PredicateSupportDiscriminant::Supported => PredicateSupport::Supported(expr), - PredicateSupportDiscriminant::Unsupported => { - PredicateSupport::Unsupported(expr) - } + pub fn wrap_expression(self, expr: Arc) -> PushedDownPredicate { + PushedDownPredicate { + discriminant: self, + predicate: expr, } } } @@ -169,7 +151,7 @@ impl PredicateSupportDiscriminant { #[derive(Debug, Clone)] pub struct ChildFilterPushdownResult { pub filter: Arc, - pub child_results: Vec, + pub child_results: Vec, } impl ChildFilterPushdownResult { @@ -177,16 +159,14 @@ impl ChildFilterPushdownResult { /// If any child supports the filter, it is considered supported. /// If all children support the filter, it is considered supported. 
/// If no child supports the filter, it is considered unsupported. - pub fn any(&self) -> PredicateSupportDiscriminant { + pub fn any(&self) -> PushedDown { if self.child_results.is_empty() { // If there are no children, filters cannot be supported - PredicateSupportDiscriminant::Unsupported + PushedDown::No } else { self.child_results .iter() - .fold(PredicateSupportDiscriminant::Unsupported, |acc, result| { - acc.or(*result) - }) + .fold(PushedDown::No, |acc, result| acc.or(*result)) } } @@ -194,16 +174,14 @@ impl ChildFilterPushdownResult { /// If all children support the filter, it is considered supported. /// If any child supports the filter, it is considered supported. /// If no child supports the filter, it is considered unsupported. - pub fn all(&self) -> PredicateSupportDiscriminant { + pub fn all(&self) -> PushedDown { if self.child_results.is_empty() { // If there are no children, filters cannot be supported - PredicateSupportDiscriminant::Unsupported + PushedDown::No } else { self.child_results .iter() - .fold(PredicateSupportDiscriminant::Supported, |acc, result| { - acc.and(*result) - }) + .fold(PushedDown::Yes, |acc, result| acc.and(*result)) } } } @@ -220,14 +198,14 @@ pub struct ChildPushdownResult { /// The parent filters that were pushed down as received by the current node when [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::handle_child_pushdown_result) was called. /// Note that this may *not* be the same as the filters that were passed to the children as the current node may have modified them /// (e.g. by reassigning column indices) when it returned them from [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::handle_child_pushdown_result) in a [`FilterDescription`]. - /// Attached to each filter is a [`PredicateSupportDiscriminant`] *per child* that indicates whether the filter was supported or unsupported by each child. 
+ /// Attached to each filter is a [`PushedDown`] *per child* that indicates whether the filter was supported or unsupported by each child. /// To get combined results see [`ChildFilterPushdownResult::any`] and [`ChildFilterPushdownResult::all`]. pub parent_filters: Vec, /// The result of pushing down each filter this node provided into each of it's children. /// The outer vector corresponds to each child, and the inner vector corresponds to each filter. /// Since this node may have generated a different filter for each child the inner vector may have different lengths or the expressions may not match at all. /// It is up to each node to interpret this result based on the filters it provided for each child in [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::handle_child_pushdown_result). - pub self_filters: Vec>, + pub self_filters: Vec>, } /// The result of pushing down filters into a node. @@ -241,9 +219,13 @@ pub struct ChildPushdownResult { /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result #[derive(Debug, Clone)] pub struct FilterPushdownPropagation { +<<<<<<< HEAD /// What filters were pushed into the parent node. pub filters: Vec, /// The updated node, if it was updated during pushdown +======= + pub filters: Vec, +>>>>>>> ecbe71f8c (refactor to a struct with a field instead of enum) pub updated_node: Option, } @@ -278,9 +260,7 @@ impl FilterPushdownPropagation { /// Create a new [`FilterPushdownPropagation`] with the specified filter support. /// This transmits up to our parent node what the result of pushing down the filters into our node and possibly our subtree was. - pub fn with_parent_pushdown_result( - filters: Vec, - ) -> Self { + pub fn with_parent_pushdown_result(filters: Vec) -> Self { Self { filters, updated_node: None, @@ -301,7 +281,7 @@ pub struct ChildFilterDescription { /// Description of which parent filters can be pushed down into this node. 
/// Since we need to transmit filter pushdown results back to this node's parent /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down. - pub(crate) parent_filters: Vec, + pub(crate) parent_filters: Vec, /// Description of which filters this node is pushing down to its children. /// Since this is not transmitted back to the parents we can have variable sized inner arrays /// instead of having to track supported/unsupported. @@ -342,11 +322,12 @@ impl ChildFilterDescription { // Need to reassign column indices to match child schema let reassigned_filter = reassign_predicate_columns(Arc::clone(filter), &child_schema, false)?; - child_parent_filters.push(PredicateSupport::Supported(reassigned_filter)); + child_parent_filters + .push(PushedDownPredicate::supported(reassigned_filter)); } else { // Some columns don't exist in child - cannot push down child_parent_filters - .push(PredicateSupport::Unsupported(Arc::clone(filter))); + .push(PushedDownPredicate::unsupported(Arc::clone(filter))); } } @@ -419,7 +400,7 @@ impl FilterDescription { Ok(desc) } - pub fn parent_filters(&self) -> Vec> { + pub fn parent_filters(&self) -> Vec> { self.child_filter_descriptions .iter() .map(|d| &d.parent_filters) From 39d1d0c42a8ce10d291440593127f1f0e15a11ad Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 10 Jul 2025 08:37:18 -0500 Subject: [PATCH 14/15] better docs, rename methods --- .../filter_pushdown/util.rs | 7 +-- .../physical-plan/src/coalesce_batches.rs | 2 +- .../physical-plan/src/execution_plan.rs | 2 +- datafusion/physical-plan/src/filter.rs | 2 +- .../physical-plan/src/filter_pushdown.rs | 51 ++++++++++++------- .../physical-plan/src/repartition/mod.rs | 2 +- 6 files changed, 40 insertions(+), 26 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 
507322f8b34c2..ea12bea1cf89a 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -552,17 +552,18 @@ impl ExecutionPlan for TestNode { )?; let new_self = TestNode::new(false, Arc::new(new_child), self.predicate.clone()); - let mut res = FilterPushdownPropagation::all(child_pushdown_result); + let mut res = + FilterPushdownPropagation::if_all(child_pushdown_result); res.updated_node = Some(Arc::new(new_self) as Arc); Ok(res) } PushedDown::Yes => { - let res = FilterPushdownPropagation::all(child_pushdown_result); + let res = FilterPushdownPropagation::if_all(child_pushdown_result); Ok(res) } } } else { - let res = FilterPushdownPropagation::all(child_pushdown_result); + let res = FilterPushdownPropagation::if_all(child_pushdown_result); Ok(res) } } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 2aa7859e7a867..d98530d28e918 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -243,7 +243,7 @@ impl ExecutionPlan for CoalesceBatchesExec { child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::all(child_pushdown_result)) + Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index b905ae6e091cf..15e504a4d8f0b 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -621,7 +621,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::all(child_pushdown_result)) + Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } /// Injects arbitrary run-time state into this 
execution plan, returning a new plan diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 9688c4fb6d595..8157e1b721a68 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -481,7 +481,7 @@ impl ExecutionPlan for FilterExec { _config: &ConfigOptions, ) -> Result>> { if !matches!(phase, FilterPushdownPhase::Pre) { - return Ok(FilterPushdownPropagation::all(child_pushdown_result)); + return Ok(FilterPushdownPropagation::if_all(child_pushdown_result)); } // We absorb any parent filters that were not handled by our children let unsupported_parent_filters = diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 3089d1cd0ebe7..a3e94a75c8e77 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -95,7 +95,7 @@ pub struct PushedDownPredicate { } impl PushedDownPredicate { - /// Return the wrapped expression, discarding whether it is supported or unsupported. + /// Return the wrapped [`PhysicalExpr`], discarding whether it is supported or unsupported. pub fn into_inner(self) -> Arc { self.predicate } @@ -120,11 +120,14 @@ impl PushedDownPredicate { /// Discriminant for the result of pushing down a filter into a child node. #[derive(Debug, Clone, Copy)] pub enum PushedDown { + /// The predicate was successfully pushed down into the child node. Yes, + /// The predicate could not be pushed down into the child node. No, } impl PushedDown { + /// Logical AND operation: returns `Yes` only if both operands are `Yes`. pub fn and(self, other: PushedDown) -> PushedDown { match (self, other) { (PushedDown::Yes, PushedDown::Yes) => PushedDown::Yes, @@ -132,6 +135,7 @@ impl PushedDown { } } + /// Logical OR operation: returns `Yes` if either operand is `Yes`. 
pub fn or(self, other: PushedDown) -> PushedDown { match (self, other) { (PushedDown::Yes, _) | (_, PushedDown::Yes) => PushedDown::Yes, @@ -139,6 +143,7 @@ impl PushedDown { } } + /// Wrap a [`PhysicalExpr`] with this pushdown result. pub fn wrap_expression(self, expr: Arc) -> PushedDownPredicate { PushedDownPredicate { discriminant: self, @@ -155,10 +160,9 @@ pub struct ChildFilterPushdownResult { } impl ChildFilterPushdownResult { - /// Combine all child results into a single [`PredicateSupport`]. - /// If any child supports the filter, it is considered supported. - /// If all children support the filter, it is considered supported. - /// If no child supports the filter, it is considered unsupported. + /// Combine all child results using OR logic. + /// Returns `Yes` if **any** child supports the filter. + /// Returns `No` if **all** children reject the filter or if there are no children. pub fn any(&self) -> PushedDown { if self.child_results.is_empty() { // If there are no children, filters cannot be supported @@ -170,10 +174,9 @@ impl ChildFilterPushdownResult { } } - /// Combine all child results into a single [`PredicateSupport`]. - /// If all children support the filter, it is considered supported. - /// If any child supports the filter, it is considered supported. - /// If no child supports the filter, it is considered unsupported. + /// Combine all child results using AND logic. + /// Returns `Yes` if **all** children support the filter. + /// Returns `No` if **any** child rejects the filter or if there are no children. pub fn all(&self) -> PushedDown { if self.child_results.is_empty() { // If there are no children, filters cannot be supported @@ -219,20 +222,16 @@ pub struct ChildPushdownResult { /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result #[derive(Debug, Clone)] pub struct FilterPushdownPropagation { -<<<<<<< HEAD /// What filters were pushed into the parent node. 
- pub filters: Vec, - /// The updated node, if it was updated during pushdown -======= pub filters: Vec, ->>>>>>> ecbe71f8c (refactor to a struct with a field instead of enum) + /// The updated node, if it was updated during pushdown pub updated_node: Option, } impl FilterPushdownPropagation { /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter /// is supported if it was supported by *all* children. - pub fn all(child_pushdown_result: ChildPushdownResult) -> Self { + pub fn if_all(child_pushdown_result: ChildPushdownResult) -> Self { let filters = child_pushdown_result .parent_filters .into_iter() @@ -246,7 +245,7 @@ impl FilterPushdownPropagation { /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter /// is supported if it was supported by *any* child. - pub fn any(child_pushdown_result: ChildPushdownResult) -> Self { + pub fn if_any(child_pushdown_result: ChildPushdownResult) -> Self { let filters = child_pushdown_result .parent_filters .into_iter() @@ -276,6 +275,11 @@ impl FilterPushdownPropagation { } } +/// Describes filter pushdown for a single child node. +/// +/// This structure contains two types of filters: +/// - **Parent filters**: Filters received from the parent node, marked as supported or unsupported +/// - **Self filters**: Filters generated by the current node to be pushed down to this child #[derive(Debug, Clone)] pub struct ChildFilterDescription { /// Description of which parent filters can be pushed down into this node. @@ -291,6 +295,10 @@ pub struct ChildFilterDescription { impl ChildFilterDescription { /// Build a child filter description by analyzing which parent filters can be pushed to a specific child. 
     ///
+    /// This method performs column analysis to determine which filters can be pushed down:
+    /// - If all columns referenced by a filter exist in the child's schema, it can be pushed down
+    /// - Otherwise, it cannot be pushed down to that child
+    ///
     /// See [`FilterDescription::from_children`] for more details
     pub fn from_child(
         parent_filters: &[Arc<dyn PhysicalExpr>],
@@ -350,9 +358,14 @@ impl ChildFilterDescription {
     }
 }
 
-/// Result of pushing down filters into children.
-/// If a parent filter is supported by *any* child, we will tell the parent node that it is supported.
-/// If a parent filter is unsupported by *all* children, we will tell the parent node that it is unsupported.
+/// Describes how filters should be pushed down to children.
+///
+/// This structure contains filter descriptions for each child node, specifying:
+/// - Which parent filters can be pushed down to each child
+/// - Which self-generated filters should be pushed down to each child
+///
+/// The filter routing is determined by column analysis - filters can only be pushed
+/// to children whose schemas contain all the referenced columns.
 #[derive(Debug, Clone)]
 pub struct FilterDescription {
     /// A filter description for each child.
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 69a4e7cd2bd63..754a208126eea 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -821,7 +821,7 @@ impl ExecutionPlan for RepartitionExec {
         child_pushdown_result: ChildPushdownResult,
         _config: &ConfigOptions,
     ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
-        Ok(FilterPushdownPropagation::all(child_pushdown_result))
+        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
     }
 }
 

From caa3cb62748641c9834ca52d9c7dc8a7002558a7 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Thu, 10 Jul 2025 08:54:43 -0500
Subject: [PATCH 15/15] fix cargo doc

---
 datafusion/physical-plan/src/execution_plan.rs             | 6 +++---
 datafusion/sqllogictest/src/engines/postgres_engine/mod.rs | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 15e504a4d8f0b..6d51bf195dc6f 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -563,7 +563,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// they have been handled.
     /// - A `HashJoinExec` might ignore the pushdown result if filters need to
     ///   be applied during the join operation. It passes the parent filters back
-    ///   up wrapped in [`FilterPushdownPropagation::any`], discarding
+    ///   up wrapped in [`FilterPushdownPropagation::if_any`], discarding
     ///   any self-filters from children.
/// /// **Example Walkthrough:** @@ -597,9 +597,9 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// /// **Helper Methods for Customization:** /// There are various helper methods to simplify implementing this method: - /// - [`FilterPushdownPropagation::any`]: Marks all parent filters as + /// - [`FilterPushdownPropagation::if_any`]: Marks all parent filters as /// supported as long as at least one child supports them. - /// - [`FilterPushdownPropagation::all`]: Marks all parent filters as + /// - [`FilterPushdownPropagation::if_all`]: Marks all parent filters as /// supported as long as all children support them. /// - [`FilterPushdownPropagation::with_parent_pushdown_result`]: Allows adding filters /// to the propagation result, indicating which filters are supported by diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs index 68816626bf672..375f06d34b44f 100644 --- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs +++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs @@ -93,7 +93,7 @@ impl Postgres { let spawned_task = SpawnedTask::spawn(async move { if let Err(e) = connection.await { - log::error!("Postgres connection error: {:?}", e); + log::error!("Postgres connection error: {e:?}"); } });