diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 69347f440c365..7b8c45dc04e98 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -41,7 +41,6 @@ use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
-use datafusion_physical_plan::filter_pushdown::PredicateSupport;
 use datafusion_physical_plan::filter_pushdown::PredicateSupports;
 use datafusion_physical_plan::metrics::Count;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -613,22 +612,15 @@ impl FileSource for ParquetSource {
         let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled;
 
         let mut source = self.clone();
-        let mut allowed_filters = vec![];
-        let mut remaining_filters = vec![];
-        for filter in &filters {
-            if can_expr_be_pushed_down_with_schemas(filter, &file_schema) {
-                // This filter can be pushed down
-                allowed_filters.push(Arc::clone(filter));
-            } else {
-                // This filter cannot be pushed down
-                remaining_filters.push(Arc::clone(filter));
-            }
-        }
-        if allowed_filters.is_empty() {
+        let filters = PredicateSupports::new_with_supported_check(filters, |filter| {
+            can_expr_be_pushed_down_with_schemas(filter, &file_schema)
+        });
+        if filters.is_all_unsupported() {
             // No filters can be pushed down, so we can just return the remaining filters
             // and avoid replacing the source in the physical plan.
-            return Ok(FilterPushdownPropagation::unsupported(filters));
+            return Ok(FilterPushdownPropagation::with_filters(filters));
         }
+        let allowed_filters = filters.collect_supported();
         let predicate = match source.predicate {
             Some(predicate) => conjunction(
                 std::iter::once(predicate).chain(allowed_filters.iter().cloned()),
@@ -637,23 +629,14 @@
         };
         source.predicate = Some(predicate);
         let source = Arc::new(source);
-        let filters = PredicateSupports::new(
-            allowed_filters
-                .into_iter()
-                .map(|f| {
-                    if pushdown_filters {
-                        PredicateSupport::Supported(f)
-                    } else {
-                        PredicateSupport::Unsupported(f)
-                    }
-                })
-                .chain(
-                    remaining_filters
-                        .into_iter()
-                        .map(PredicateSupport::Unsupported),
-                )
-                .collect(),
-        );
+        // If pushdown_filters is false we tell our parents that they still have to handle the filters,
+        // even if we updated the predicate to include the filters (they will only be used for stats pruning).
+        if !pushdown_filters {
+            return Ok(FilterPushdownPropagation::with_filters(
+                filters.make_unsupported(),
+            )
+            .with_updated_node(source));
+        }
         Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(source))
     }
 }
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index b81b3c8beeac1..a99c1729ceb4f 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -536,7 +536,20 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///
     /// The default implementation is a no-op that passes the result of pushdown from the children to its parent.
     ///
+    /// When returning filters via [`FilterPushdownPropagation`] the order of the filters need not match
+    /// the order they were passed in via `child_pushdown_result`, but preserving the order may make the
+    /// resulting plans easier to debug and reason about, so it is recommended.
+    ///
+    /// There are various helper methods to make implementing this method easier, see:
+    /// - [`FilterPushdownPropagation::unsupported`]: to indicate that the node does not support filter pushdown at all.
+    /// - [`FilterPushdownPropagation::transparent`]: to indicate that the node supports filter pushdown but does not involve itself in it,
+    ///   instead it simply transmits the result of pushdown into its children back up to its parent.
+    /// - [`PredicateSupports::new_with_supported_check`]: takes a callback that returns true / false for each filter to indicate pushdown support.
+    ///   This can be used alongside [`FilterPushdownPropagation::with_filters`] and [`FilterPushdownPropagation::with_updated_node`]
+    ///   to dynamically build a result with a mix of supported and unsupported filters.
+    ///
     /// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported
+    /// [`PredicateSupports::new_with_supported_check`]: crate::filter_pushdown::PredicateSupports::new_with_supported_check
     fn handle_child_pushdown_result(
         &self,
         child_pushdown_result: ChildPushdownResult,
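
As a concrete companion to the helper list in the new `handle_child_pushdown_result` docs, a node that absorbs only some of the parent's filters could be written along these lines, mirroring the `ParquetSource` change above. This is a sketch only: the `parent_filters` field on `ChildPushdownResult`, the trailing config parameter, and the `supports_predicate` / `with_absorbed_filters` helpers are stand-ins assumed for illustration, not APIs confirmed by this diff.

fn handle_child_pushdown_result(
    &self,
    child_pushdown_result: ChildPushdownResult,
    _config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
    // Re-check each filter the parent offered, preserving the original order.
    // `parent_filters` and its shape are assumptions for this sketch.
    let filters = PredicateSupports::new_with_supported_check(
        child_pushdown_result.parent_filters.collect_all(),
        |f| self.supports_predicate(f), // hypothetical node-specific check
    );
    if filters.is_all_unsupported() {
        // Nothing to absorb: hand every filter back to the parent unchanged.
        return Ok(FilterPushdownPropagation::with_filters(filters));
    }
    // Rebuild this node so it evaluates the supported filters itself
    // (`with_absorbed_filters` is a hypothetical constructor)...
    let new_node = self.with_absorbed_filters(filters.collect_supported());
    // ...and report the per-filter support back up to the parent.
    Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(new_node))
}
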
diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs
index 4e84fe36f98f3..5222a98e3dd54 100644
--- a/datafusion/physical-plan/src/filter_pushdown.rs
+++ b/datafusion/physical-plan/src/filter_pushdown.rs
@@ -31,6 +31,16 @@ pub enum PredicateSupport {
     Unsupported(Arc<dyn PhysicalExpr>),
 }
 
+impl PredicateSupport {
+    pub fn into_inner(self) -> Arc<dyn PhysicalExpr> {
+        match self {
+            PredicateSupport::Supported(expr) | PredicateSupport::Unsupported(expr) => {
+                expr
+            }
+        }
+    }
+}
+
 /// A thin wrapper around [`PredicateSupport`]s that allows for easy collection of
 /// supported and unsupported filters. Inner vector stores each predicate for one node.
 #[derive(Debug, Clone)]
@@ -60,6 +70,25 @@
         Self::new(pushdowns)
     }
 
+    /// Create a new [`PredicateSupports`] with filters marked as supported if
+    /// `check` returns true and unsupported otherwise.
+    pub fn new_with_supported_check(
+        filters: Vec<Arc<dyn PhysicalExpr>>,
+        check: impl Fn(&Arc<dyn PhysicalExpr>) -> bool,
+    ) -> Self {
+        let pushdowns = filters
+            .into_iter()
+            .map(|f| {
+                if check(&f) {
+                    PredicateSupport::Supported(f)
+                } else {
+                    PredicateSupport::Unsupported(f)
+                }
+            })
+            .collect();
+        Self::new(pushdowns)
+    }
+
     /// Transform all filters to supported, returning a new [`PredicateSupports`]
     /// with all filters as [`PredicateSupport::Supported`].
     /// This does not modify the original [`PredicateSupport`].
@@ -102,6 +131,18 @@
             .collect()
     }
 
+    /// Collect supported filters into a Vec, without removing them from the original
+    /// [`PredicateSupports`].
+    pub fn collect_supported(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        self.0
+            .iter()
+            .filter_map(|f| match f {
+                PredicateSupport::Supported(expr) => Some(Arc::clone(expr)),
+                PredicateSupport::Unsupported(_) => None,
+            })
+            .collect()
+    }
+
     /// Collect all filters into a Vec, without removing them from the original
     /// FilterPushdowns.
     pub fn collect_all(self) -> Vec<Arc<dyn PhysicalExpr>> {
@@ -132,6 +173,20 @@
     pub fn is_empty(&self) -> bool {
         self.0.is_empty()
     }
+
+    /// Check if all filters are supported.
+    pub fn is_all_supported(&self) -> bool {
+        self.0
+            .iter()
+            .all(|f| matches!(f, PredicateSupport::Supported(_)))
+    }
+
+    /// Check if all filters are unsupported.
+    pub fn is_all_unsupported(&self) -> bool {
+        self.0
+            .iter()
+            .all(|f| matches!(f, PredicateSupport::Unsupported(_)))
+    }
 }
 
 impl IntoIterator for PredicateSupports {
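
A standalone sketch of how the new `PredicateSupports` helpers compose (not code from this diff; it assumes the usual `datafusion_physical_expr::expressions` building blocks, with toy `Column` / `Literal` expressions standing in for real pushed-down filters):

use std::sync::Arc;

use datafusion_common::ScalarValue;
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::PredicateSupports;

fn main() {
    // Two toy expressions standing in for real filter predicates.
    let by_column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("val", 0));
    let by_literal: Arc<dyn PhysicalExpr> =
        Arc::new(Literal::new(ScalarValue::Boolean(Some(true))));

    // Mark bare column references as supported, everything else as unsupported.
    let supports = PredicateSupports::new_with_supported_check(
        vec![Arc::clone(&by_column), Arc::clone(&by_literal)],
        |f| f.as_any().downcast_ref::<Column>().is_some(),
    );

    assert!(!supports.is_all_supported());
    assert!(!supports.is_all_unsupported());
    // Only the column reference survives the supported-only collection.
    assert_eq!(supports.collect_supported().len(), 1);
    // `make_unsupported` downgrades everything, e.g. when pushdown is
    // disabled in the config, as in the `ParquetSource` change above.
    assert!(supports.make_unsupported().is_all_unsupported());
}
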
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index 01e0ad2fee12c..c8cd9bb441d27 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -246,3 +246,38 @@ physical_plan
 02)--FilterExec: val@0 != part@1
 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != d AND val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != d OR d != val_max@1) AND val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c, d)]
+
+# The order of filters should not matter
+query TT
+EXPLAIN select val, part from t_pushdown where part = 'a' AND part = val;
+----
+logical_plan
+01)Filter: t_pushdown.val = t_pushdown.part
+02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8("a")], partial_filters=[t_pushdown.val = t_pushdown.part]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: val@0 = part@1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet
+
+query TT
+select val, part from t_pushdown where part = 'a' AND part = val;
+----
+a a
+
+query TT
+EXPLAIN select val, part from t_pushdown where part = val AND part = 'a';
+----
+logical_plan
+01)Filter: t_pushdown.val = t_pushdown.part
+02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8("a")], partial_filters=[t_pushdown.val = t_pushdown.part]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: val@0 = part@1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet
+
+query TT
+select val, part from t_pushdown where part = val AND part = 'a';
+----
+a a
\ No newline at end of file