From 94318f3e747f893b0f7dfb0f2c4b4a15a1304591 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 1 Jul 2025 01:29:01 -0500 Subject: [PATCH 01/15] wip --- .../physical_optimizer/filter_pushdown/mod.rs | 101 +++++++ .../physical-plan/src/coalesce_batches.rs | 3 +- .../physical-plan/src/execution_plan.rs | 21 +- datafusion/physical-plan/src/filter.rs | 85 ++---- .../physical-plan/src/filter_pushdown.rs | 281 +++++++----------- .../physical-plan/src/joins/hash_join.rs | 16 +- .../physical-plan/src/repartition/mod.rs | 3 +- datafusion/physical-plan/src/sorts/sort.rs | 20 +- 8 files changed, 283 insertions(+), 247 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index f1ef365c92205..60d89deaab742 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -506,6 +506,107 @@ fn schema() -> SchemaRef { Arc::clone(&TEST_SCHEMA) } +#[tokio::test] +async fn test_hashjoin_parent_filter_pushdown() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create build side with limited values + let build_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["ba", "bb"]), + ("c", Float64, [1.0, 2.0]) + ) + .unwrap()]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more values + let probe_batches = vec![record_batch!( + ("d", Utf8, ["aa", "ab", "ac", "ad"]), + ("e", Utf8, ["ba", "bb", "bc", "bd"]), + ("f", Float64, [1.0, 2.0, 3.0, 4.0]) + ) + 
.unwrap()]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("d", DataType::Utf8, false), + Field::new("e", DataType::Utf8, false), + Field::new("f", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create HashJoinExec + let on = vec![( + col("a", &build_side_schema).unwrap(), + col("d", &probe_side_schema).unwrap(), + )]; + let join = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + // Create filters that can be pushed down to different sides + // We need to create filters in the context of the join output schema + let join_schema = join.schema(); + + // Filter on build side column: a = 'aa' + let left_filter = col_lit_predicate("a", "aa", &join_schema); + // Filter on probe side column: e = 'ba' + let right_filter = col_lit_predicate("e", "ba", &join_schema); + // Filter that references both sides: a = d (should not be pushed down) + let cross_filter = Arc::new(BinaryExpr::new( + col("a", &join_schema).unwrap(), + Operator::Eq, + col("d", &join_schema).unwrap(), + )) as Arc; + + let filter = + Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap()); + let filter = Arc::new(FilterExec::try_new(right_filter, filter).unwrap()); + let plan = Arc::new(FilterExec::try_new(cross_filter, filter).unwrap()) + as Arc; + + // Test that filters are pushed down correctly to each side of the join + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = d@3 + - FilterExec: e@4 = ba + - FilterExec: a@0 = aa + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], 
file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: e@4 = ba AND a@0 = d@3 AND a@0 = aa + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba + " + ); +} + /// Returns a predicate that is a binary expression col = lit fn col_lit_predicate( column_name: &str, diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 78bd4b4fc3a0b..d008db48b82b0 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -234,8 +234,7 @@ impl ExecutionPlan for CoalesceBatchesExec { parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { - Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters)) + FilterDescription::from_children(parent_filters, &self.children()) } fn handle_child_pushdown_result( diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 90385c58a6ac2..b3a8792bfdb1b 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -17,8 +17,8 @@ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; use crate::filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, + ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, PredicateSupport, PredicateSupports, }; pub use crate::metrics::Metric; pub use 
crate::ordering::InputOrderMode; @@ -520,10 +520,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { - Ok( - FilterDescription::new_with_child_count(self.children().len()) - .all_parent_filters_unsupported(parent_filters), - ) + // Default implementation: mark all filters as unsupported for all children + let mut desc = FilterDescription::new(); + for _child in self.children() { + let child_filters = parent_filters + .iter() + .map(|f| PredicateSupport::Unsupported(Arc::clone(f))) + .collect(); + desc = desc.with_child(ChildFilterDescription { + parent_filters: PredicateSupports::new(child_filters), + self_filters: vec![], + }); + } + Ok(desc) } /// Handle the result of a child pushdown. diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 252af9ebcd496..d842cbaa8cddf 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -16,7 +16,6 @@ // under the License. 
use std::any::Any; -use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -28,8 +27,8 @@ use super::{ use crate::common::can_project; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, + ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, PredicateSupport, PredicateSupports, }; use crate::projection::{ make_with_child, try_embed_projection, update_expr, EmbeddedProjection, @@ -46,9 +45,6 @@ use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; use datafusion_common::{ internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue, }; @@ -65,7 +61,6 @@ use datafusion_physical_expr::{ use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::stream::{Stream, StreamExt}; -use itertools::Itertools; use log::trace; const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20; @@ -455,56 +450,29 @@ impl ExecutionPlan for FilterExec { _config: &ConfigOptions, ) -> Result { if !matches!(phase, FilterPushdownPhase::Pre) { - return Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters)); + // For non-pre phase, filters pass through unchanged + let filter_supports = parent_filters + .into_iter() + .map(PredicateSupport::Supported) + .collect(); + return Ok(FilterDescription::new().with_child(ChildFilterDescription { + parent_filters: PredicateSupports::new(filter_supports), + self_filters: vec![], + })); } - let self_filter = split_conjunction(&self.predicate) - .into_iter() - .cloned() - .collect_vec(); - - let parent_filters = if let Some(projection_indices) = 
self.projection.as_ref() { - // We need to invert the projection on any referenced columns in the filter - // Create a mapping from the output columns to the input columns (the inverse of the projection) - let inverse_projection = projection_indices - .iter() - .enumerate() - .map(|(i, &p)| (p, i)) - .collect::>(); - parent_filters + + let child = ChildFilterDescription::from_child( + parent_filters, + &self.input(), + )? + .with_self_filters( + split_conjunction(&self.predicate) .into_iter() - .map(|f| { - f.transform_up(|expr| { - let mut res = - if let Some(col) = expr.as_any().downcast_ref::() { - let index = col.index(); - let index_in_input_schema = - inverse_projection.get(&index).ok_or_else(|| { - DataFusionError::Internal(format!( - "Column {index} not found in projection" - )) - })?; - Transformed::yes(Arc::new(Column::new( - col.name(), - *index_in_input_schema, - )) as _) - } else { - Transformed::no(expr) - }; - // Columns can only exist in the leaves, no need to try all nodes - res.tnr = TreeNodeRecursion::Jump; - Ok(res) - }) - .data() - }) - .collect::>>()? 
- } else { - parent_filters - }; + .cloned() + .collect(), + ); - Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters) - .with_self_filters_for_children(vec![self_filter])) + Ok(FilterDescription::new().with_child(child)) } fn handle_child_pushdown_result( @@ -577,8 +545,15 @@ impl ExecutionPlan for FilterExec { }; Some(Arc::new(new) as _) }; + // Mark all parent filters as supported since we absorbed them + let supported_filters = child_pushdown_result + .parent_filters + .into_iter() + .map(|f| PredicateSupport::Supported(f.into_inner())) + .collect(); + Ok(FilterPushdownPropagation { - filters: child_pushdown_result.parent_filters.make_supported(), + filters: PredicateSupports::new(supported_filters), updated_node, }) } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 725abd7fc8b5d..7a0490db26dcf 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -15,9 +15,12 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashSet; use std::sync::Arc; use std::vec::IntoIter; +use datafusion_common::Result; +use datafusion_physical_expr::utils::{collect_columns, reassign_predicate_columns}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; #[derive(Debug, Clone, Copy)] @@ -83,27 +86,17 @@ impl PredicateSupport { } } -/// A thin wrapper around [`PredicateSupport`]s that allows for easy collection of -/// supported and unsupported filters. Inner vector stores each predicate for one node. +/// A collection of filters with their support status. #[derive(Debug, Clone)] pub struct PredicateSupports(Vec); impl PredicateSupports { - /// Create a new FilterPushdowns with the given filters and their pushdown status. + /// Create a new PredicateSupports with the given filters and their pushdown status. 
pub fn new(pushdowns: Vec) -> Self { Self(pushdowns) } - /// Create a new [`PredicateSupport`] with all filters as supported. - pub fn all_supported(filters: Vec>) -> Self { - let pushdowns = filters - .into_iter() - .map(PredicateSupport::Supported) - .collect(); - Self::new(pushdowns) - } - - /// Create a new [`PredicateSupport`] with all filters as unsupported. + /// Create a new PredicateSupports with all filters as unsupported. pub fn all_unsupported(filters: Vec>) -> Self { let pushdowns = filters .into_iter() @@ -112,57 +105,7 @@ impl PredicateSupports { Self::new(pushdowns) } - /// Create a new [`PredicateSupport`] with filterrs marked as supported if - /// `f` returns true and unsupported otherwise. - pub fn new_with_supported_check( - filters: Vec>, - check: impl Fn(&Arc) -> bool, - ) -> Self { - let pushdowns = filters - .into_iter() - .map(|f| { - if check(&f) { - PredicateSupport::Supported(f) - } else { - PredicateSupport::Unsupported(f) - } - }) - .collect(); - Self::new(pushdowns) - } - - /// Transform all filters to supported, returning a new [`PredicateSupports`] - /// with all filters as [`PredicateSupport::Supported`]. - /// This does not modify the original [`PredicateSupport`]. - pub fn make_supported(self) -> Self { - let pushdowns = self - .0 - .into_iter() - .map(|f| match f { - PredicateSupport::Supported(expr) => PredicateSupport::Supported(expr), - PredicateSupport::Unsupported(expr) => PredicateSupport::Supported(expr), - }) - .collect(); - Self::new(pushdowns) - } - - /// Transform all filters to unsupported, returning a new [`PredicateSupports`] - /// with all filters as [`PredicateSupport::Supported`]. - /// This does not modify the original [`PredicateSupport`]. 
- pub fn make_unsupported(self) -> Self { - let pushdowns = self - .0 - .into_iter() - .map(|f| match f { - PredicateSupport::Supported(expr) => PredicateSupport::Unsupported(expr), - u @ PredicateSupport::Unsupported(_) => u, - }) - .collect(); - Self::new(pushdowns) - } - - /// Collect unsupported filters into a Vec, without removing them from the original - /// [`PredicateSupport`]. + /// Collect unsupported filters into a Vec. pub fn collect_unsupported(&self) -> Vec> { self.0 .iter() @@ -173,26 +116,13 @@ impl PredicateSupports { .collect() } - /// Collect supported filters into a Vec, without removing them from the original - /// [`PredicateSupport`]. - pub fn collect_supported(&self) -> Vec> { + /// Collect all filters (both supported and unsupported) into a Vec. + pub fn collect_all(&self) -> Vec> { self.0 .iter() - .filter_map(|f| match f { - PredicateSupport::Supported(expr) => Some(Arc::clone(expr)), - PredicateSupport::Unsupported(_) => None, - }) - .collect() - } - - /// Collect all filters into a Vec, without removing them from the original - /// FilterPushdowns. - pub fn collect_all(self) -> Vec> { - self.0 - .into_iter() .map(|f| match f { - PredicateSupport::Supported(expr) - | PredicateSupport::Unsupported(expr) => expr, + PredicateSupport::Supported(expr) => Arc::clone(expr), + PredicateSupport::Unsupported(expr) => Arc::clone(expr), }) .collect() } @@ -201,34 +131,10 @@ impl PredicateSupports { self.0 } - /// Return an iterator over the inner `Vec`. + /// Return an iterator over the inner Vec. pub fn iter(&self) -> impl Iterator { self.0.iter() } - - /// Return the number of filters in the inner `Vec`. - pub fn len(&self) -> usize { - self.0.len() - } - - /// Check if the inner `Vec` is empty. - pub fn is_empty(&self) -> bool { - self.0.is_empty() - } - - /// Check if all filters are supported. 
- pub fn is_all_supported(&self) -> bool { - self.0 - .iter() - .all(|f| matches!(f, PredicateSupport::Supported(_))) - } - - /// Check if all filters are unsupported. - pub fn is_all_unsupported(&self) -> bool { - self.0 - .iter() - .all(|f| matches!(f, PredicateSupport::Unsupported(_))) - } } impl IntoIterator for PredicateSupports { @@ -317,16 +223,16 @@ impl FilterPushdownPropagation { } #[derive(Debug, Clone)] -struct ChildFilterDescription { +pub struct ChildFilterDescription { /// Description of which parent filters can be pushed down into this node. /// Since we need to transmit filter pushdown results back to this node's parent /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down. /// We do this using a [`PredicateSupport`] which simplifies manipulating supported/unsupported filters. - parent_filters: PredicateSupports, + pub(crate) parent_filters: PredicateSupports, /// Description of which filters this node is pushing down to its children. /// Since this is not transmitted back to the parents we can have variable sized inner arrays /// instead of having to track supported/unsupported. - self_filters: Vec>, + pub(crate) self_filters: Vec>, } impl ChildFilterDescription { @@ -336,6 +242,65 @@ impl ChildFilterDescription { self_filters: vec![], } } + + /// Build a child filter description by analyzing which parent filters can be pushed to a specific child. 
+ pub fn from_child( + parent_filters: Vec>, + child: &Arc, + ) -> Result { + let child_schema = child.schema(); + + // Get column names from child schema for quick lookup + let child_column_names: HashSet<&str> = child_schema + .fields() + .iter() + .map(|f| f.name().as_str()) + .collect(); + + // Analyze each parent filter + let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); + + for filter in &parent_filters { + // Check which columns the filter references + let referenced_columns = collect_columns(filter); + + // Check if all referenced columns exist in the child schema + let all_columns_exist = referenced_columns + .iter() + .all(|col| child_column_names.contains(col.name())); + + if all_columns_exist { + // All columns exist in child - we can push down + // Need to reassign column indices to match child schema + let reassigned_filter = reassign_predicate_columns( + Arc::clone(filter), + &child_schema, + false, + )?; + child_parent_filters.push(PredicateSupport::Supported(reassigned_filter)); + } else { + // Some columns don't exist in child - cannot push down + child_parent_filters.push(PredicateSupport::Unsupported(Arc::clone(filter))); + } + } + + Ok(Self { + parent_filters: PredicateSupports::new(child_parent_filters), + self_filters: vec![], + }) + } + + /// Add a self filter (from the current node) to be pushed down to this child. + pub fn with_self_filter(mut self, filter: Arc) -> Self { + self.self_filters.push(filter); + self + } + + /// Add multiple self filters. 
+ pub fn with_self_filters(mut self, filters: Vec>) -> Self { + self.self_filters.extend(filters); + self + } } #[derive(Debug, Clone)] @@ -347,12 +312,47 @@ pub struct FilterDescription { } impl FilterDescription { + /// Create a new empty FilterDescription + pub fn new() -> Self { + Self { + child_filter_descriptions: vec![], + } + } + pub fn new_with_child_count(num_children: usize) -> Self { Self { child_filter_descriptions: vec![ChildFilterDescription::new(); num_children], } } + /// Add a child filter description + pub fn with_child(mut self, child: ChildFilterDescription) -> Self { + self.child_filter_descriptions.push(child); + self + } + + /// Build a filter description by analyzing which parent filters can be pushed to each child. + /// This method automatically determines filter routing based on column analysis: + /// - If all columns referenced by a filter exist in a child's schema, it can be pushed down + /// - Otherwise, it cannot be pushed down to that child + pub fn from_children( + parent_filters: Vec>, + children: &[&Arc], + ) -> Result { + let mut desc = Self::new(); + + // For each child, create a ChildFilterDescription + for child in children { + desc = desc.with_child(ChildFilterDescription::from_child( + parent_filters.clone(), + child, + )?); + } + + Ok(desc) + } + + pub fn parent_filters(&self) -> Vec { self.child_filter_descriptions .iter() @@ -369,69 +369,6 @@ impl FilterDescription { .collect() } - /// Mark all parent filters as supported for all children. - /// This is the case if the node allows filters to be pushed down through it - /// without any modification. - /// This broadcasts the parent filters to all children. - /// If handling of parent filters is different for each child then you should set the - /// field direclty. - /// For example, nodes like [`RepartitionExec`] that let filters pass through it transparently - /// use this to mark all parent filters as supported. 
- /// - /// [`RepartitionExec`]: crate::repartition::RepartitionExec - pub fn all_parent_filters_supported( - mut self, - parent_filters: Vec>, - ) -> Self { - let supported = PredicateSupports::all_supported(parent_filters); - for child in &mut self.child_filter_descriptions { - child.parent_filters = supported.clone(); - } - self - } - /// Mark all parent filters as unsupported for all children. - /// This is the case if the node does not allow filters to be pushed down through it. - /// This broadcasts the parent filters to all children. - /// If handling of parent filters is different for each child then you should set the - /// field direclty. - /// For example, the default implementation of filter pushdwon in [`ExecutionPlan`] - /// assumes that filters cannot be pushed down to children. - /// - /// [`ExecutionPlan`]: crate::ExecutionPlan - pub fn all_parent_filters_unsupported( - mut self, - parent_filters: Vec>, - ) -> Self { - let unsupported = PredicateSupports::all_unsupported(parent_filters); - for child in &mut self.child_filter_descriptions { - child.parent_filters = unsupported.clone(); - } - self - } - /// Add a filter generated / owned by the current node to be pushed down to all children. - /// This assumes that there is a single filter that that gets pushed down to all children - /// equally. - /// If there are multiple filters or pushdown to children is not homogeneous then - /// you should set the field directly. - /// For example: - /// - `TopK` uses this to push down a single filter to all children, it can use this method. - /// - `HashJoinExec` pushes down a filter only to the probe side, it cannot use this method. 
- pub fn with_self_filter(mut self, predicate: Arc) -> Self { - for child in &mut self.child_filter_descriptions { - child.self_filters = vec![Arc::clone(&predicate)]; - } - self - } - - pub fn with_self_filters_for_children( - mut self, - filters: Vec>>, - ) -> Self { - for (child, filters) in self.child_filter_descriptions.iter_mut().zip(filters) { - child.self_filters = filters; - } - self - } } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 770399290dca5..75ff55048b75e 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -34,6 +34,7 @@ use super::{ }; use super::{JoinOn, JoinOnRef}; use crate::execution_plan::{boundedness_from_children, EmissionType}; +use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::projection::{ try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData, ProjectionExec, @@ -78,7 +79,7 @@ use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; -use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; use datafusion_physical_expr_common::datum::compare_op_for_nested; use ahash::RandomState; @@ -943,6 +944,19 @@ impl ExecutionPlan for HashJoinExec { try_embed_projection(projection, self) } } + + fn gather_filters_for_pushdown( + &self, + _phase: FilterPushdownPhase, + parent_filters: Vec>, + _config: &datafusion_common::config::ConfigOptions, + ) -> Result { + // Use the new from_children API - it automatically handles column analysis + FilterDescription::from_children( + parent_filters, + &self.children(), + ) + } } /// Reads the left (build) side of the input, buffering it in memory, to build a diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 620bfa2809a90..37dc3a7675901 
100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -812,8 +812,7 @@ impl ExecutionPlan for RepartitionExec { parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { - Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters)) + FilterDescription::from_children(parent_filters, &self.children()) } fn handle_child_pushdown_result( diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 21f98fd012605..68103c73b6de9 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -27,7 +27,7 @@ use std::sync::Arc; use crate::common::spawn_buffered; use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; use crate::expressions::PhysicalSortExpr; -use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; +use crate::filter_pushdown::{ChildFilterDescription, FilterDescription, FilterPushdownPhase}; use crate::limit::LimitStream; use crate::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, @@ -1268,19 +1268,21 @@ impl ExecutionPlan for SortExec { config: &datafusion_common::config::ConfigOptions, ) -> Result { if !matches!(phase, FilterPushdownPhase::Post) { - return Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters)); + return FilterDescription::from_children(parent_filters, &self.children()) } + + let mut child = ChildFilterDescription::from_child( + parent_filters, + &self.input(), + )?; + if let Some(filter) = &self.filter { if config.optimizer.enable_dynamic_filter_pushdown { - let filter = Arc::clone(filter) as Arc; - return Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters) - .with_self_filter(filter)); + child = child.with_self_filter(Arc::clone(filter) as Arc); } } - Ok(FilterDescription::new_with_child_count(1) - 
.all_parent_filters_supported(parent_filters)) + + Ok(FilterDescription::new().with_child(child)) } } From 91176f919e057d1973cc98cec84267da01b25e6e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 1 Jul 2025 10:56:51 -0500 Subject: [PATCH 02/15] implement filter passthrough for joins and refactor fitler pushdown helpers to simplify APIs --- .../filter_pushdown/util.rs | 27 ++-- datafusion/datasource-parquet/src/source.rs | 51 +++++-- datafusion/datasource/src/file.rs | 11 +- datafusion/datasource/src/source.rs | 18 ++- .../physical-optimizer/src/filter_pushdown.rs | 31 ++--- .../physical-plan/src/execution_plan.rs | 13 +- datafusion/physical-plan/src/filter.rs | 44 +++--- .../physical-plan/src/filter_pushdown.rs | 126 +++--------------- .../physical-plan/src/joins/hash_join.rs | 5 +- datafusion/physical-plan/src/sorts/sort.rs | 18 +-- 10 files changed, 156 insertions(+), 188 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index e793af8ed4b03..22ddd23d62385 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -34,8 +34,8 @@ use datafusion_physical_plan::{ displayable, filter::FilterExec, filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPropagation, - PredicateSupport, PredicateSupports, + ChildFilterDescription, ChildPushdownResult, FilterDescription, + FilterPushdownPropagation, PredicateSupport, }, metrics::ExecutionPlanMetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, @@ -228,11 +228,19 @@ impl FileSource for TestSource { ..self.clone() }); Ok(FilterPushdownPropagation { - filters: PredicateSupports::all_supported(filters), + filters: filters + .into_iter() + .map(PredicateSupport::Supported) + .collect(), updated_node: Some(new_node), }) } else { - 
Ok(FilterPushdownPropagation::unsupported(filters)) + Ok(FilterPushdownPropagation::with_filters( + filters + .into_iter() + .map(PredicateSupport::Unsupported) + .collect(), + )) } } @@ -515,9 +523,12 @@ impl ExecutionPlan for TestNode { parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { - Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters) - .with_self_filter(Arc::clone(&self.predicate))) + // Since TestNode marks all parent filters as supported and adds its own filter, + // we use from_child to create a description with all parent filters supported + let child = &self.input; + let child_desc = ChildFilterDescription::from_child(parent_filters, child)? + .with_self_filter(Arc::clone(&self.predicate)); + Ok(FilterDescription::new().with_child(child_desc)) } fn handle_child_pushdown_result( @@ -534,7 +545,7 @@ impl ExecutionPlan for TestNode { let self_pushdown_result = child_pushdown_result.self_filters[0].clone(); // And pushed down 1 filter assert_eq!(self_pushdown_result.len(), 1); - let self_pushdown_result = self_pushdown_result.into_inner(); + let self_pushdown_result: Vec<_> = self_pushdown_result.into_iter().collect(); match &self_pushdown_result[0] { PredicateSupport::Unsupported(filter) => { diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index b7c5b5d37686d..41895751ea9e7 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -41,8 +41,9 @@ use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; -use datafusion_physical_plan::filter_pushdown::PredicateSupports; +use datafusion_physical_plan::filter_pushdown::{ + FilterPushdownPropagation, 
PredicateSupport, +}; use datafusion_physical_plan::metrics::Count; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; @@ -621,7 +622,12 @@ impl FileSource for ParquetSource { config: &ConfigOptions, ) -> datafusion_common::Result>> { let Some(file_schema) = self.file_schema.clone() else { - return Ok(FilterPushdownPropagation::unsupported(filters)); + return Ok(FilterPushdownPropagation::with_filters( + filters + .into_iter() + .map(PredicateSupport::Unsupported) + .collect(), + )); }; // Determine if based on configs we should push filters down. // If either the table / scan itself or the config has pushdown enabled, @@ -635,20 +641,36 @@ impl FileSource for ParquetSource { let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled; let mut source = self.clone(); - let filters = PredicateSupports::new_with_supported_check(filters, |filter| { - can_expr_be_pushed_down_with_schemas(filter, &file_schema) - }); - if filters.is_all_unsupported() { + let filters: Vec = filters + .into_iter() + .map(|filter| { + if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) { + PredicateSupport::Supported(filter) + } else { + PredicateSupport::Unsupported(filter) + } + }) + .collect(); + if filters + .iter() + .all(|f| matches!(f, PredicateSupport::Unsupported(_))) + { // No filters can be pushed down, so we can just return the remaining filters // and avoid replacing the source in the physical plan. 
return Ok(FilterPushdownPropagation::with_filters(filters)); } - let allowed_filters = filters.collect_supported(); + let allowed_filters = filters + .iter() + .filter_map(|f| match f { + PredicateSupport::Supported(expr) => Some(Arc::clone(expr)), + PredicateSupport::Unsupported(_) => None, + }) + .collect::>(); let predicate = match source.predicate { - Some(predicate) => conjunction( - std::iter::once(predicate).chain(allowed_filters.iter().cloned()), - ), - None => conjunction(allowed_filters.iter().cloned()), + Some(predicate) => { + conjunction(std::iter::once(predicate).chain(allowed_filters)) + } + None => conjunction(allowed_filters), }; source.predicate = Some(predicate); source = source.with_pushdown_filters(pushdown_filters); @@ -657,7 +679,10 @@ impl FileSource for ParquetSource { // even if we updated the predicate to include the filters (they will only be used for stats pruning). if !pushdown_filters { return Ok(FilterPushdownPropagation::with_filters( - filters.make_unsupported(), + filters + .into_iter() + .map(|f| PredicateSupport::Unsupported(f.into_inner())) + .collect::>(), ) .with_updated_node(source)); } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index c5f21ebf1a0f3..a95c07cc319cd 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -30,7 +30,9 @@ use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::{not_impl_err, Result, Statistics}; use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; -use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; +use datafusion_physical_plan::filter_pushdown::{ + FilterPushdownPropagation, PredicateSupport, +}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; @@ -120,7 +122,12 @@ pub trait FileSource: Send + Sync { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { - 
Ok(FilterPushdownPropagation::unsupported(filters)) + Ok(FilterPushdownPropagation::with_filters( + filters + .into_iter() + .map(PredicateSupport::Unsupported) + .collect(), + )) } /// Set optional schema adapter factory. diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 4dda95b0856b1..78d7e295604d9 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -38,7 +38,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport, }; /// A source of data, typically a list of files or memory @@ -168,7 +168,12 @@ pub trait DataSource: Send + Sync + Debug { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::unsupported(filters)) + Ok(FilterPushdownPropagation::with_filters( + filters + .into_iter() + .map(PredicateSupport::Unsupported) + .collect(), + )) } } @@ -316,7 +321,14 @@ impl ExecutionPlan for DataSourceExec { ) -> Result>> { // Push any remaining filters into our data source let res = self.data_source.try_pushdown_filters( - child_pushdown_result.parent_filters.collect_all(), + child_pushdown_result + .parent_filters + .into_iter() + .map(|f| match f { + PredicateSupport::Supported(expr) => expr, + PredicateSupport::Unsupported(expr) => expr, + }) + .collect(), config, )?; match res.updated_node { diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index 885280576b4b8..257bb7bc6d38f 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -22,8 +22,7 @@ 
use crate::PhysicalOptimizerRule; use datafusion_common::{config::ConfigOptions, Result}; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, - PredicateSupport, PredicateSupports, + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport, }; use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan}; @@ -497,10 +496,10 @@ fn push_down_filters( // Our child doesn't know the difference between filters that were passed down // from our parents and filters that the current node injected. We need to de-entangle // this since we do need to distinguish between them. - let mut all_filters = result.filters.into_inner(); + let mut all_filters: Vec<_> = result.filters.into_iter().collect(); let parent_predicates = all_filters.split_off(num_self_filters); let self_predicates = all_filters; - self_filters_pushdown_supports.push(PredicateSupports::new(self_predicates)); + self_filters_pushdown_supports.push(self_predicates); for (idx, result) in parent_supported_predicate_indices .iter() @@ -533,21 +532,15 @@ fn push_down_filters( let updated_node = with_new_children_if_necessary(Arc::clone(&node), new_children)?; // Remap the result onto the parent filters as they were given to us. // Any filters that were not pushed down to any children are marked as unsupported. 
- let parent_pushdown_result = PredicateSupports::new( - parent_predicates_pushdown_states - .into_iter() - .zip(parent_predicates) - .map(|(state, filter)| match state { - ParentPredicateStates::NoChildren => { - PredicateSupport::Unsupported(filter) - } - ParentPredicateStates::Unsupported => { - PredicateSupport::Unsupported(filter) - } - ParentPredicateStates::Supported => PredicateSupport::Supported(filter), - }) - .collect(), - ); + let parent_pushdown_result = parent_predicates_pushdown_states + .into_iter() + .zip(parent_predicates) + .map(|(state, filter)| match state { + ParentPredicateStates::NoChildren => PredicateSupport::Unsupported(filter), + ParentPredicateStates::Unsupported => PredicateSupport::Unsupported(filter), + ParentPredicateStates::Supported => PredicateSupport::Supported(filter), + }) + .collect(); // TODO: by calling `handle_child_pushdown_result` we are assuming that the // `ExecutionPlan` implementation will not change the plan itself. // Should we have a separate method for dynamic pushdown that does not allow modifying the plan? 
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index b3a8792bfdb1b..d223cc21362d6 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -18,7 +18,7 @@ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, PredicateSupport, PredicateSupports, + FilterPushdownPropagation, PredicateSupport, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; @@ -528,7 +528,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { .map(|f| PredicateSupport::Unsupported(Arc::clone(f))) .collect(); desc = desc.with_child(ChildFilterDescription { - parent_filters: PredicateSupports::new(child_filters), + parent_filters: child_filters, self_filters: vec![], }); } @@ -601,10 +601,9 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// - [`FilterPushdownPropagation::transparent`]: Indicates that the node /// supports filter pushdown but does not modify it, simply transmitting /// the children's pushdown results back up to its parent. - /// - [`PredicateSupports::new_with_supported_check`]: Takes a callback to - /// dynamically determine support for each filter, useful with - /// [`FilterPushdownPropagation::with_filters`] and - /// [`FilterPushdownPropagation::with_updated_node`] to build mixed results + /// - [`PredicateSupports::new`]: Create a new collection of filters with + /// their support status, useful with [`FilterPushdownPropagation::with_filters`] + /// and [`FilterPushdownPropagation::with_updated_node`] to build mixed results /// of supported and unsupported filters. 
/// /// **Filter Pushdown Phases:** @@ -614,7 +613,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// [`FilterPushdownPhase`] for more details on phase-specific behavior. /// /// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported - /// [`PredicateSupports::new_with_supported_check`]: crate::filter_pushdown::PredicateSupports::new_with_supported_check + /// [`PredicateSupports::new`]: crate::filter_pushdown::PredicateSupports::new fn handle_child_pushdown_result( &self, _phase: FilterPushdownPhase, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index d842cbaa8cddf..58e464e99f1cb 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -28,7 +28,7 @@ use crate::common::can_project; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, PredicateSupport, PredicateSupports, + FilterPushdownPropagation, PredicateSupport, }; use crate::projection::{ make_with_child, try_embed_projection, update_expr, EmbeddedProjection, @@ -456,21 +456,18 @@ impl ExecutionPlan for FilterExec { .map(PredicateSupport::Supported) .collect(); return Ok(FilterDescription::new().with_child(ChildFilterDescription { - parent_filters: PredicateSupports::new(filter_supports), + parent_filters: filter_supports, self_filters: vec![], })); } - let child = ChildFilterDescription::from_child( - parent_filters, - &self.input(), - )? - .with_self_filters( - split_conjunction(&self.predicate) - .into_iter() - .cloned() - .collect(), - ); + let child = ChildFilterDescription::from_child(parent_filters, self.input())? 
+ .with_self_filters( + split_conjunction(&self.predicate) + .into_iter() + .cloned() + .collect(), + ); Ok(FilterDescription::new().with_child(child)) } @@ -487,15 +484,26 @@ impl ExecutionPlan for FilterExec { )); } // We absorb any parent filters that were not handled by our children - let mut unhandled_filters = - child_pushdown_result.parent_filters.collect_unsupported(); + let mut unhandled_filters = child_pushdown_result + .parent_filters + .iter() + .filter_map(|f| match f { + PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)), + PredicateSupport::Supported(_) => None, + }) + .collect::>(); assert_eq!( child_pushdown_result.self_filters.len(), 1, "FilterExec should only have one child" ); - let unsupported_self_filters = - child_pushdown_result.self_filters[0].collect_unsupported(); + let unsupported_self_filters = child_pushdown_result.self_filters[0] + .iter() + .filter_map(|f| match f { + PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)), + PredicateSupport::Supported(_) => None, + }) + .collect::>(); unhandled_filters.extend(unsupported_self_filters); // If we have unhandled filters, we need to create a new FilterExec @@ -551,9 +559,9 @@ impl ExecutionPlan for FilterExec { .into_iter() .map(|f| PredicateSupport::Supported(f.into_inner())) .collect(); - + Ok(FilterPushdownPropagation { - filters: PredicateSupports::new(supported_filters), + filters: supported_filters, updated_node, }) } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 7a0490db26dcf..ec4647e6f16ea 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -17,7 +17,6 @@ use std::collections::HashSet; use std::sync::Arc; -use std::vec::IntoIter; use datafusion_common::Result; use datafusion_physical_expr::utils::{collect_columns, reassign_predicate_columns}; @@ -86,66 +85,6 @@ impl PredicateSupport { } } -/// A collection of filters with 
their support status. -#[derive(Debug, Clone)] -pub struct PredicateSupports(Vec); - -impl PredicateSupports { - /// Create a new PredicateSupports with the given filters and their pushdown status. - pub fn new(pushdowns: Vec) -> Self { - Self(pushdowns) - } - - /// Create a new PredicateSupports with all filters as unsupported. - pub fn all_unsupported(filters: Vec>) -> Self { - let pushdowns = filters - .into_iter() - .map(PredicateSupport::Unsupported) - .collect(); - Self::new(pushdowns) - } - - /// Collect unsupported filters into a Vec. - pub fn collect_unsupported(&self) -> Vec> { - self.0 - .iter() - .filter_map(|f| match f { - PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)), - PredicateSupport::Supported(_) => None, - }) - .collect() - } - - /// Collect all filters (both supported and unsupported) into a Vec. - pub fn collect_all(&self) -> Vec> { - self.0 - .iter() - .map(|f| match f { - PredicateSupport::Supported(expr) => Arc::clone(expr), - PredicateSupport::Unsupported(expr) => Arc::clone(expr), - }) - .collect() - } - - pub fn into_inner(self) -> Vec { - self.0 - } - - /// Return an iterator over the inner Vec. - pub fn iter(&self) -> impl Iterator { - self.0.iter() - } -} - -impl IntoIterator for PredicateSupports { - type Item = PredicateSupport; - type IntoIter = IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - /// The result of pushing down filters into a child node. /// This is the result provided to nodes in [`ExecutionPlan::handle_child_pushdown_result`]. /// Nodes process this result and convert it into a [`FilterPushdownPropagation`] @@ -166,10 +105,10 @@ pub struct ChildPushdownResult { /// down into any child then the result is unsupported. /// If at least one children and all children that received the filter mark it as supported /// then the result is supported. 
- pub parent_filters: PredicateSupports, + pub parent_filters: Vec, /// The result of pushing down each filter this node provided into each of it's children. /// This is not combined with the parent filters so that nodes can treat each child independently. - pub self_filters: Vec, + pub self_filters: Vec>, } /// The result of pushing down filters into a node that it returns to its parent. @@ -182,7 +121,7 @@ pub struct ChildPushdownResult { /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result #[derive(Debug, Clone)] pub struct FilterPushdownPropagation { - pub filters: PredicateSupports, + pub filters: Vec, pub updated_node: Option, } @@ -197,18 +136,8 @@ impl FilterPushdownPropagation { } } - /// Create a new [`FilterPushdownPropagation`] that tells the parent node - /// that none of the parent filters were not pushed down. - pub fn unsupported(parent_filters: Vec>) -> Self { - let unsupported = PredicateSupports::all_unsupported(parent_filters); - Self { - filters: unsupported, - updated_node: None, - } - } - /// Create a new [`FilterPushdownPropagation`] with the specified filter support. - pub fn with_filters(filters: PredicateSupports) -> Self { + pub fn with_filters(filters: Vec) -> Self { Self { filters, updated_node: None, @@ -228,7 +157,7 @@ pub struct ChildFilterDescription { /// Since we need to transmit filter pushdown results back to this node's parent /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down. /// We do this using a [`PredicateSupport`] which simplifies manipulating supported/unsupported filters. - pub(crate) parent_filters: PredicateSupports, + pub(crate) parent_filters: Vec, /// Description of which filters this node is pushing down to its children. /// Since this is not transmitted back to the parents we can have variable sized inner arrays /// instead of having to track supported/unsupported. 
@@ -236,13 +165,6 @@ pub struct ChildFilterDescription { } impl ChildFilterDescription { - fn new() -> Self { - Self { - parent_filters: PredicateSupports::new(vec![]), - self_filters: vec![], - } - } - /// Build a child filter description by analyzing which parent filters can be pushed to a specific child. pub fn from_child( parent_filters: Vec>, @@ -259,33 +181,31 @@ impl ChildFilterDescription { // Analyze each parent filter let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); - + for filter in &parent_filters { // Check which columns the filter references let referenced_columns = collect_columns(filter); - + // Check if all referenced columns exist in the child schema let all_columns_exist = referenced_columns .iter() .all(|col| child_column_names.contains(col.name())); - + if all_columns_exist { // All columns exist in child - we can push down // Need to reassign column indices to match child schema - let reassigned_filter = reassign_predicate_columns( - Arc::clone(filter), - &child_schema, - false, - )?; + let reassigned_filter = + reassign_predicate_columns(Arc::clone(filter), &child_schema, false)?; child_parent_filters.push(PredicateSupport::Supported(reassigned_filter)); } else { // Some columns don't exist in child - cannot push down - child_parent_filters.push(PredicateSupport::Unsupported(Arc::clone(filter))); + child_parent_filters + .push(PredicateSupport::Unsupported(Arc::clone(filter))); } } Ok(Self { - parent_filters: PredicateSupports::new(child_parent_filters), + parent_filters: child_parent_filters, self_filters: vec![], }) } @@ -311,6 +231,12 @@ pub struct FilterDescription { child_filter_descriptions: Vec, } +impl Default for FilterDescription { + fn default() -> Self { + Self::new() + } +} + impl FilterDescription { /// Create a new empty FilterDescription pub fn new() -> Self { @@ -319,12 +245,6 @@ impl FilterDescription { } } - pub fn new_with_child_count(num_children: usize) -> Self { - Self { - 
child_filter_descriptions: vec![ChildFilterDescription::new(); num_children], - } - } - /// Add a child filter description pub fn with_child(mut self, child: ChildFilterDescription) -> Self { self.child_filter_descriptions.push(child); @@ -340,7 +260,7 @@ impl FilterDescription { children: &[&Arc], ) -> Result { let mut desc = Self::new(); - + // For each child, create a ChildFilterDescription for child in children { desc = desc.with_child(ChildFilterDescription::from_child( @@ -352,8 +272,7 @@ impl FilterDescription { Ok(desc) } - - pub fn parent_filters(&self) -> Vec { + pub fn parent_filters(&self) -> Vec> { self.child_filter_descriptions .iter() .map(|d| &d.parent_filters) @@ -368,7 +287,4 @@ impl FilterDescription { .cloned() .collect() } - - - } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 75ff55048b75e..1df6725f42438 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -952,10 +952,7 @@ impl ExecutionPlan for HashJoinExec { _config: &datafusion_common::config::ConfigOptions, ) -> Result { // Use the new from_children API - it automatically handles column analysis - FilterDescription::from_children( - parent_filters, - &self.children(), - ) + FilterDescription::from_children(parent_filters, &self.children()) } } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 68103c73b6de9..e6cc4406f86cc 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -27,7 +27,9 @@ use std::sync::Arc; use crate::common::spawn_buffered; use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; use crate::expressions::PhysicalSortExpr; -use crate::filter_pushdown::{ChildFilterDescription, FilterDescription, FilterPushdownPhase}; +use crate::filter_pushdown::{ + ChildFilterDescription, FilterDescription, FilterPushdownPhase, 
+}; use crate::limit::LimitStream; use crate::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, @@ -1268,20 +1270,18 @@ impl ExecutionPlan for SortExec { config: &datafusion_common::config::ConfigOptions, ) -> Result { if !matches!(phase, FilterPushdownPhase::Post) { - return FilterDescription::from_children(parent_filters, &self.children()) + return FilterDescription::from_children(parent_filters, &self.children()); } - - let mut child = ChildFilterDescription::from_child( - parent_filters, - &self.input(), - )?; + + let mut child = ChildFilterDescription::from_child(parent_filters, self.input())?; if let Some(filter) = &self.filter { if config.optimizer.enable_dynamic_filter_pushdown { - child = child.with_self_filter(Arc::clone(filter) as Arc); + child = + child.with_self_filter(Arc::clone(filter) as Arc); } } - + Ok(FilterDescription::new().with_child(child)) } } From 3924772a0a7c0c98dea10be860f2c30d9a0228cc Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 1 Jul 2025 11:03:03 -0500 Subject: [PATCH 03/15] fix docs --- datafusion/physical-plan/src/execution_plan.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index d223cc21362d6..4c72f3f989818 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -596,15 +596,15 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// /// **Helper Methods for Customization:** /// There are various helper methods to simplify implementing this method: - /// - [`FilterPushdownPropagation::unsupported`]: Indicates that the node - /// does not support filter pushdown at all, rejecting all filters. 
/// - [`FilterPushdownPropagation::transparent`]: Indicates that the node /// supports filter pushdown but does not modify it, simply transmitting /// the children's pushdown results back up to its parent. - /// - [`PredicateSupports::new`]: Create a new collection of filters with - /// their support status, useful with [`FilterPushdownPropagation::with_filters`] - /// and [`FilterPushdownPropagation::with_updated_node`] to build mixed results - /// of supported and unsupported filters. + /// - [`FilterPushdownPropagation::with_filters`]: Allows adding filters + /// to the propagation result, indicating which filters are supported by + /// the current node. + /// - [`FilterPushdownPropagation::with_updated_node`]: Allows updating the + /// current node in the propagation result, used if the node + /// has modified its plan based on the pushdown results. /// /// **Filter Pushdown Phases:** /// There are two different phases in filter pushdown (`Pre` and others), @@ -613,7 +613,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// [`FilterPushdownPhase`] for more details on phase-specific behavior. 
/// /// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported - /// [`PredicateSupports::new`]: crate::filter_pushdown::PredicateSupports::new fn handle_child_pushdown_result( &self, _phase: FilterPushdownPhase, From c5f1e79eebee463c2b1973a19c56f3789c2a6114 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 1 Jul 2025 11:13:19 -0500 Subject: [PATCH 04/15] cleanup --- datafusion/datasource-parquet/src/source.rs | 4 ++-- datafusion/physical-plan/src/filter.rs | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 41895751ea9e7..8ca36e7cd3216 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -665,7 +665,7 @@ impl FileSource for ParquetSource { PredicateSupport::Supported(expr) => Some(Arc::clone(expr)), PredicateSupport::Unsupported(_) => None, }) - .collect::>(); + .collect_vec(); let predicate = match source.predicate { Some(predicate) => { conjunction(std::iter::once(predicate).chain(allowed_filters)) @@ -682,7 +682,7 @@ impl FileSource for ParquetSource { filters .into_iter() .map(|f| PredicateSupport::Unsupported(f.into_inner())) - .collect::>(), + .collect_vec(), ) .with_updated_node(source)); } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 58e464e99f1cb..69f600981b318 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -20,6 +20,8 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; +use itertools::Itertools; + use super::{ ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -491,7 +493,7 @@ impl ExecutionPlan for FilterExec { PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)), 
PredicateSupport::Supported(_) => None, }) - .collect::>(); + .collect_vec(); assert_eq!( child_pushdown_result.self_filters.len(), 1, @@ -503,7 +505,7 @@ impl ExecutionPlan for FilterExec { PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)), PredicateSupport::Supported(_) => None, }) - .collect::>(); + .collect_vec(); unhandled_filters.extend(unsupported_self_filters); // If we have unhandled filters, we need to create a new FilterExec From 110ac64da16294fde8921888bd820f1ea83e2193 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 1 Jul 2025 11:38:56 -0500 Subject: [PATCH 05/15] lint --- datafusion/physical-plan/src/execution_plan.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 4c72f3f989818..a2e56a5d92e7e 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -600,11 +600,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// supports filter pushdown but does not modify it, simply transmitting /// the children's pushdown results back up to its parent. /// - [`FilterPushdownPropagation::with_filters`]: Allows adding filters - /// to the propagation result, indicating which filters are supported by - /// the current node. + /// to the propagation result, indicating which filters are supported by + /// the current node. /// - [`FilterPushdownPropagation::with_updated_node`]: Allows updating the - /// current node in the propagation result, used if the node - /// has modified its plan based on the pushdown results. + /// current node in the propagation result, used if the node + /// has modified its plan based on the pushdown results. 
/// /// **Filter Pushdown Phases:** /// There are two different phases in filter pushdown (`Pre` and others), From adf8080d2d19d97de249546982662e98c99127d1 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 1 Jul 2025 21:56:15 -0500 Subject: [PATCH 06/15] remove hash join bits --- .../physical_optimizer/filter_pushdown/mod.rs | 101 ------------------ .../physical-plan/src/joins/hash_join.rs | 13 +-- 2 files changed, 1 insertion(+), 113 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 60d89deaab742..f1ef365c92205 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -506,107 +506,6 @@ fn schema() -> SchemaRef { Arc::clone(&TEST_SCHEMA) } -#[tokio::test] -async fn test_hashjoin_parent_filter_pushdown() { - use datafusion_common::JoinType; - use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; - - // Create build side with limited values - let build_batches = vec![record_batch!( - ("a", Utf8, ["aa", "ab"]), - ("b", Utf8, ["ba", "bb"]), - ("c", Float64, [1.0, 2.0]) - ) - .unwrap()]; - let build_side_schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Utf8, false), - Field::new("c", DataType::Float64, false), - ])); - let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) - .with_support(true) - .with_batches(build_batches) - .build(); - - // Create probe side with more values - let probe_batches = vec![record_batch!( - ("d", Utf8, ["aa", "ab", "ac", "ad"]), - ("e", Utf8, ["ba", "bb", "bc", "bd"]), - ("f", Float64, [1.0, 2.0, 3.0, 4.0]) - ) - .unwrap()]; - let probe_side_schema = Arc::new(Schema::new(vec![ - Field::new("d", DataType::Utf8, false), - Field::new("e", DataType::Utf8, false), - Field::new("f", DataType::Float64, 
false), - ])); - let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) - .with_support(true) - .with_batches(probe_batches) - .build(); - - // Create HashJoinExec - let on = vec![( - col("a", &build_side_schema).unwrap(), - col("d", &probe_side_schema).unwrap(), - )]; - let join = Arc::new( - HashJoinExec::try_new( - build_scan, - probe_scan, - on, - None, - &JoinType::Inner, - None, - PartitionMode::Partitioned, - datafusion_common::NullEquality::NullEqualsNothing, - ) - .unwrap(), - ); - - // Create filters that can be pushed down to different sides - // We need to create filters in the context of the join output schema - let join_schema = join.schema(); - - // Filter on build side column: a = 'aa' - let left_filter = col_lit_predicate("a", "aa", &join_schema); - // Filter on probe side column: e = 'ba' - let right_filter = col_lit_predicate("e", "ba", &join_schema); - // Filter that references both sides: a = d (should not be pushed down) - let cross_filter = Arc::new(BinaryExpr::new( - col("a", &join_schema).unwrap(), - Operator::Eq, - col("d", &join_schema).unwrap(), - )) as Arc; - - let filter = - Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap()); - let filter = Arc::new(FilterExec::try_new(right_filter, filter).unwrap()); - let plan = Arc::new(FilterExec::try_new(cross_filter, filter).unwrap()) - as Arc; - - // Test that filters are pushed down correctly to each side of the join - insta::assert_snapshot!( - OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true), - @r" - OptimizationTest: - input: - - FilterExec: a@0 = d@3 - - FilterExec: e@4 = ba - - FilterExec: a@0 = aa - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true - output: - Ok: - - 
FilterExec: e@4 = ba AND a@0 = d@3 AND a@0 = aa - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba - " - ); -} - /// Returns a predicate that is a binary expression col = lit fn col_lit_predicate( column_name: &str, diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 1df6725f42438..770399290dca5 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -34,7 +34,6 @@ use super::{ }; use super::{JoinOn, JoinOnRef}; use crate::execution_plan::{boundedness_from_children, EmissionType}; -use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::projection::{ try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData, ProjectionExec, @@ -79,7 +78,7 @@ use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; -use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; +use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr_common::datum::compare_op_for_nested; use ahash::RandomState; @@ -944,16 +943,6 @@ impl ExecutionPlan for HashJoinExec { try_embed_projection(projection, self) } } - - fn gather_filters_for_pushdown( - &self, - _phase: FilterPushdownPhase, - parent_filters: Vec>, - _config: &datafusion_common::config::ConfigOptions, - ) -> Result { - // Use the new from_children API - it automatically handles column analysis - FilterDescription::from_children(parent_filters, &self.children()) - } } /// Reads the left (build) side of the input, buffering it in memory, to build a From 
32932e1703139de27d03d738eb96334b5f12bff8 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 1 Jul 2025 21:59:37 -0500 Subject: [PATCH 07/15] nit --- datafusion/physical-optimizer/src/filter_pushdown.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index 257bb7bc6d38f..fbbac6ed306dd 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -26,7 +26,7 @@ use datafusion_physical_plan::filter_pushdown::{ }; use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan}; -use itertools::izip; +use itertools::{izip, Itertools}; /// Attempts to recursively push given filters from the top of the tree into leafs. /// @@ -496,7 +496,7 @@ fn push_down_filters( // Our child doesn't know the difference between filters that were passed down // from our parents and filters that the current node injected. We need to de-entangle // this since we do need to distinguish between them. 
- let mut all_filters: Vec<_> = result.filters.into_iter().collect(); + let mut all_filters= result.filters.into_iter().collect_vec(); let parent_predicates = all_filters.split_off(num_self_filters); let self_predicates = all_filters; self_filters_pushdown_supports.push(self_predicates); From 54f4c95ae0fe2aa658c0d8fc1e200f67f53485d3 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 1 Jul 2025 23:21:58 -0500 Subject: [PATCH 08/15] lint --- datafusion/physical-optimizer/src/filter_pushdown.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index fbbac6ed306dd..59b26af3378b0 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -496,7 +496,7 @@ fn push_down_filters( // Our child doesn't know the difference between filters that were passed down // from our parents and filters that the current node injected. We need to de-entangle // this since we do need to distinguish between them. 
- let mut all_filters= result.filters.into_iter().collect_vec(); + let mut all_filters = result.filters.into_iter().collect_vec(); let parent_predicates = all_filters.split_off(num_self_filters); let self_predicates = all_filters; self_filters_pushdown_supports.push(self_predicates); From 7e78336ca48f20e8508f11286c3aad205eded0c2 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 2 Jul 2025 14:19:18 -0500 Subject: [PATCH 09/15] Update datafusion/physical-plan/src/filter_pushdown.rs Co-authored-by: Andrew Lamb --- datafusion/physical-plan/src/filter_pushdown.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index ec4647e6f16ea..826522010cacc 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -166,6 +166,8 @@ pub struct ChildFilterDescription { impl ChildFilterDescription { /// Build a child filter description by analyzing which parent filters can be pushed to a specific child. 
+ /// + /// See [`FilterDescription::from_children`] for more details pub fn from_child( parent_filters: Vec>, child: &Arc, From 9c81ab0f9d1bc5e506cefbbfd021fc42c08a9762 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Jul 2025 07:32:45 -0500 Subject: [PATCH 10/15] reduce clones --- .../tests/physical_optimizer/filter_pushdown/util.rs | 2 +- datafusion/physical-plan/src/filter.rs | 2 +- datafusion/physical-plan/src/filter_pushdown.rs | 10 ++++------ datafusion/physical-plan/src/sorts/sort.rs | 3 ++- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 22ddd23d62385..d4318235bafbc 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -526,7 +526,7 @@ impl ExecutionPlan for TestNode { // Since TestNode marks all parent filters as supported and adds its own filter, // we use from_child to create a description with all parent filters supported let child = &self.input; - let child_desc = ChildFilterDescription::from_child(parent_filters, child)? + let child_desc = ChildFilterDescription::from_child(&parent_filters, child)? .with_self_filter(Arc::clone(&self.predicate)); Ok(FilterDescription::new().with_child(child_desc)) } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 69f600981b318..54015c7bcdd22 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -463,7 +463,7 @@ impl ExecutionPlan for FilterExec { })); } - let child = ChildFilterDescription::from_child(parent_filters, self.input())? + let child = ChildFilterDescription::from_child(&parent_filters, self.input())? 
.with_self_filters( split_conjunction(&self.predicate) .into_iter() diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 826522010cacc..ecfec255ade19 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -169,7 +169,7 @@ impl ChildFilterDescription { /// /// See [`FilterDescription::from_children`] for more details pub fn from_child( - parent_filters: Vec>, + parent_filters: &[Arc], child: &Arc, ) -> Result { let child_schema = child.schema(); @@ -184,7 +184,7 @@ impl ChildFilterDescription { // Analyze each parent filter let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); - for filter in &parent_filters { + for filter in parent_filters { // Check which columns the filter references let referenced_columns = collect_columns(filter); @@ -265,10 +265,8 @@ impl FilterDescription { // For each child, create a ChildFilterDescription for child in children { - desc = desc.with_child(ChildFilterDescription::from_child( - parent_filters.clone(), - child, - )?); + desc = desc + .with_child(ChildFilterDescription::from_child(&parent_filters, child)?); } Ok(desc) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index e6cc4406f86cc..bb572c4315fb8 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1273,7 +1273,8 @@ impl ExecutionPlan for SortExec { return FilterDescription::from_children(parent_filters, &self.children()); } - let mut child = ChildFilterDescription::from_child(parent_filters, self.input())?; + let mut child = + ChildFilterDescription::from_child(&parent_filters, self.input())?; if let Some(filter) = &self.filter { if config.optimizer.enable_dynamic_filter_pushdown { From 963a0a942d25f59167232fa870321a669ef76548 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> 
Date: Thu, 3 Jul 2025 08:06:45 -0500 Subject: [PATCH 11/15] move vec out of loop --- datafusion/physical-plan/src/execution_plan.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index a2e56a5d92e7e..375de7c7385e7 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -33,6 +33,7 @@ pub use datafusion_physical_expr::window::WindowExpr; pub use datafusion_physical_expr::{ expressions, Distribution, Partitioning, PhysicalExpr, }; +use itertools::Itertools; use std::any::Any; use std::fmt::Debug; @@ -522,13 +523,13 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Result { // Default implementation: mark all filters as unsupported for all children let mut desc = FilterDescription::new(); - for _child in self.children() { - let child_filters = parent_filters - .iter() - .map(|f| PredicateSupport::Unsupported(Arc::clone(f))) - .collect(); + let child_filters = parent_filters + .iter() + .map(|f| PredicateSupport::Unsupported(Arc::clone(f))) + .collect_vec(); + for _ in 0..self.children().len() { desc = desc.with_child(ChildFilterDescription { - parent_filters: child_filters, + parent_filters: child_filters.clone(), self_filters: vec![], }); } From 4436495516cd5717851fad0f6dd81e050e44661f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Jul 2025 09:45:00 -0500 Subject: [PATCH 12/15] more docstrings --- .../physical-optimizer/src/filter_pushdown.rs | 14 ++++++++++++++ datafusion/physical-plan/src/filter_pushdown.rs | 16 ++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index 59b26af3378b0..3f777578e3e2e 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ 
b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -15,6 +15,20 @@ // specific language governing permissions and limitations // under the License. +//! Filter Pushdown Optimization Process +//! +//! The filter pushdown mechanism involves four key steps: +//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`] +//! on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`] +//! by inspecting its logic and children's schemas, determining which filters can be pushed to each child. +//! 2. **Optimizer Executes Pushdown**: The optimizer recursively calls [`push_down_filters`] on each child, +//! passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child. +//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children, +//! containing information about which filters were successfully pushed down vs. unsupported. +//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent, +//! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides +//! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes). + use std::sync::Arc; use crate::PhysicalOptimizerRule; diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index ecfec255ade19..a08520a140945 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -15,6 +15,22 @@ // specific language governing permissions and limitations // under the License. +//! Filter Pushdown Optimization Process +//! +//! The filter pushdown mechanism involves four key steps: +//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls `ExecutionPlan::gather_filters_for_pushdown` +//!
on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`] +//! by inspecting its logic and children's schemas, determining which filters can be pushed to each child. +//! 2. **Optimizer Executes Pushdown**: The optimizer recursively pushes down filters for each child, +//! passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child. +//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children, +//! containing information about which filters were successfully pushed down vs. unsupported. +//! 4. **Parent Responds**: The optimizer calls `ExecutionPlan::handle_child_pushdown_result` on the parent, +//! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides +//! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes). +//! +//! See also datafusion/physical-optimizer/src/filter_pushdown.rs. + use std::collections::HashSet; use std::sync::Arc; From 099b808f5659ed5567a2475ccf9ef2313ac3f890 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Jul 2025 09:48:51 -0500 Subject: [PATCH 13/15] fmt --- datafusion/physical-plan/src/filter_pushdown.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index a08520a140945..c6a6a6d8712ae 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -28,7 +28,7 @@ //! 4. **Parent Responds**: The optimizer calls `ExecutionPlan::handle_child_pushdown_result` on the parent, //! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides //! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes). -//! +//! //! See also datafusion/physical-optimizer/src/filter_pushdown.rs.
use std::collections::HashSet; From 2b3b7573e5b700dbe0aabd8f20c09e3cee406f73 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Jul 2025 10:08:51 -0500 Subject: [PATCH 14/15] fmt --- datafusion/physical-optimizer/src/filter_pushdown.rs | 4 +++- datafusion/physical-plan/src/filter_pushdown.rs | 7 +++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index 3f777578e3e2e..414c33956b686 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -21,13 +21,15 @@ //! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`] //! on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`] //! by inspecting its logic and children's schemas, determining which filters can be pushed to each child. -//! 2. **Optimizer Executes Pushdown**: The optimizer recursively calls [`push_down_filters`] on each child, +//! 2. **Optimizer Executes Pushdown**: The optimizer recursively calls `push_down_filters` in this module on each child, //! passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child. //! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children, //! containing information about which filters were successfully pushed down vs. unsupported. //! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent, //! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides //! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes). +//! +//!
[`FilterDescription`]: datafusion_physical_plan::filter_pushdown::FilterDescription use std::sync::Arc; diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index c6a6a6d8712ae..70b5ee0633932 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -18,16 +18,19 @@ //! Filter Pushdown Optimization Process //! //! The filter pushdown mechanism involves four key steps: -//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls `ExecutionPlan::gather_filters_for_pushdown` +//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`] //! on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`] //! by inspecting its logic and children's schemas, determining which filters can be pushed to each child. //! 2. **Optimizer Executes Pushdown**: The optimizer recursively pushes down filters for each child, //! passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child. //! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children, //! containing information about which filters were successfully pushed down vs. unsupported. -//! 4. **Parent Responds**: The optimizer calls `ExecutionPlan::handle_child_pushdown_result` on the parent, +//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent, //! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides //! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes). +//! +//! [`ExecutionPlan::gather_filters_for_pushdown`]: crate::ExecutionPlan::gather_filters_for_pushdown +//! [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result //! //!
See also datafusion/physical-optimizer/src/filter_pushdown.rs. From 2f4817a2e15f11c53621b37192736a5a1df7e610 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 3 Jul 2025 16:17:43 -0500 Subject: [PATCH 15/15] fmt --- datafusion/physical-optimizer/src/filter_pushdown.rs | 2 +- datafusion/physical-plan/src/filter_pushdown.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index 414c33956b686..eb7ec81b260ea 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -28,7 +28,7 @@ //! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent, //! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides //! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes). -//! +//! //! [`FilterDescription`]: datafusion_physical_plan::filter_pushdown::FilterDescription use std::sync::Arc; diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 70b5ee0633932..b50f86382edf0 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -28,7 +28,7 @@ //! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent, //! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides //! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes). -//! +//! //! [`ExecutionPlan::gather_filters_for_pushdown`]: crate::ExecutionPlan::gather_filters_for_pushdown //! [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result //!