From 39a07ed2d52357da6538c4d60d0ef80b47b209f5 Mon Sep 17 00:00:00 2001 From: notashes Date: Wed, 11 Feb 2026 03:12:57 +0530 Subject: [PATCH 1/5] fix: disable dynamic filter pushdown for non min/max aggregates --- datafusion/physical-plan/src/aggregates/mod.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index dcfa0456ac525..64d154bb7fe32 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1129,6 +1129,16 @@ impl AggregateExec { return; } + // Don't push the dynamic filter if the query includes aggregates like + // COUNT, SUM, AVG as they usually require all of the rows matching the WHERE clause + let has_non_minmax = self.aggr_expr.iter().any(|aggr_expr| { + let fun_name = aggr_expr.fun().name(); + !fun_name.eq_ignore_ascii_case("min") && !fun_name.eq_ignore_ascii_case("max") + }); + if has_non_minmax { + return; + } + // Collect supported accumulators // It is assumed the order of aggregate expressions are not changed from `AggregateExec` // to `AggregateStream` From fa8cf6709822d1308bd42d0db7b1961b7681a504 Mon Sep 17 00:00:00 2001 From: notashes Date: Wed, 11 Feb 2026 08:47:44 +0530 Subject: [PATCH 2/5] Update datafusion/physical-plan/src/aggregates/mod.rs Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> --- datafusion/physical-plan/src/aggregates/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 64d154bb7fe32..af8d914aebe55 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1157,7 +1157,7 @@ impl AggregateExec { } else if fun_name.eq_ignore_ascii_case("max") { DynamicFilterAggregateType::Max } else { - continue; + return; }; // 2. arg should be only 1 column reference From eb69ab94a4d4b47cb468286ea7a99afad8a5fffa Mon Sep 17 00:00:00 2001 From: notashes Date: Wed, 11 Feb 2026 08:47:55 +0530 Subject: [PATCH 3/5] Update datafusion/physical-plan/src/aggregates/mod.rs Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> --- datafusion/physical-plan/src/aggregates/mod.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index af8d914aebe55..6cd8557421e5d 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1129,16 +1129,6 @@ impl AggregateExec { return; } - // Don't push the dynamic filter if the query includes aggregates like - // COUNT, SUM, AVG as they usually require all of the rows matching the WHERE clause - let has_non_minmax = self.aggr_expr.iter().any(|aggr_expr| { - let fun_name = aggr_expr.fun().name(); - !fun_name.eq_ignore_ascii_case("min") && !fun_name.eq_ignore_ascii_case("max") - }); - if has_non_minmax { - return; - } - // Collect supported accumulators // It is assumed the order of aggregate expressions are not changed from `AggregateExec` // to `AggregateStream` From 507bcad6d4a38c971247b486e7add06631a5be38 Mon Sep 17 00:00:00 2001 From: notashes Date: Wed, 11 Feb 2026 18:30:53 +0530 Subject: [PATCH 4/5] test: add SLT test for testing dynamic filter pushdown for queries with non min/max aggregates --- .../dynamic_filter_pushdown_config.slt | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index b112d70f427f1..51635c7c6d382 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -288,6 +288,42 @@ physical_plan 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/agg_data.parquet]]}, projection=[score], file_type=parquet, predicate=category@0 = alpha AND DynamicFilter [ empty ], pruning_predicate=category_null_count@2 != row_count@3 AND category_min@0 <= alpha AND alpha <= category_max@1, required_guarantees=[category in (alpha)] +# Test 4b: Mixed aggregates with non MIN/MAX should not have DynamicFilter + +# Pure MAX — DynamicFilter is expected here as we can safely prune that don't match the filter. +query TT +EXPLAIN SELECT MAX(score) FROM agg_parquet WHERE category = 'alpha'; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[max(agg_parquet.score)]] +02)--Projection: agg_parquet.score +03)----Filter: agg_parquet.category = Utf8View("alpha") +04)------TableScan: agg_parquet projection=[category, score], partial_filters=[agg_parquet.category = Utf8View("alpha")] +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_parquet.score)] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_parquet.score)] +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/agg_data.parquet]]}, projection=[score], file_type=parquet, predicate=category@0 = alpha AND DynamicFilter [ empty ], pruning_predicate=category_null_count@2 != row_count@3 AND category_min@0 <= alpha AND alpha <= category_max@1, required_guarantees=[category in (alpha)] + +# COUNT + MAX — DynamicFilter should NOT appear here in mixed aggregates +query TT +EXPLAIN SELECT COUNT(*), MAX(score) FROM agg_parquet WHERE category = 'alpha'; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*), max(agg_parquet.score) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)), max(agg_parquet.score)]] +03)----Projection: agg_parquet.score +04)------Filter: agg_parquet.category = Utf8View("alpha") +05)--------TableScan: agg_parquet projection=[category, score], partial_filters=[agg_parquet.category = Utf8View("alpha")] +physical_plan +01)ProjectionExec: expr=[count(Int64(1))@0 as count(*), max(agg_parquet.score)@1 as max(agg_parquet.score)] +02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1)), max(agg_parquet.score)] +03)----CoalescePartitionsExec +04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1)), max(agg_parquet.score)] +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/agg_data.parquet]]}, projection=[score], file_type=parquet, predicate=category@0 = alpha, pruning_predicate=category_null_count@2 != row_count@3 AND category_min@0 <= alpha AND alpha <= category_max@1, required_guarantees=[category in (alpha)] + # Disable aggregate dynamic filters only statement ok SET datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = false; From a50e3ad1569b8395a401505f1e5589c761b6816b Mon Sep 17 00:00:00 2001 From: notashes Date: Wed, 11 Feb 2026 18:52:05 +0530 Subject: [PATCH 5/5] chore: remove reundant test --- .../dynamic_filter_pushdown_config.slt | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index 51635c7c6d382..1b037ee2b83af 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -288,25 +288,8 @@ physical_plan 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/agg_data.parquet]]}, projection=[score], file_type=parquet, predicate=category@0 = alpha AND DynamicFilter [ empty ], pruning_predicate=category_null_count@2 != row_count@3 AND category_min@0 <= alpha AND alpha <= category_max@1, required_guarantees=[category in (alpha)] -# Test 4b: Mixed aggregates with non MIN/MAX should not have DynamicFilter +# Test 4b: COUNT + MAX — DynamicFilter should NOT appear here in mixed aggregates -# Pure MAX — DynamicFilter is expected here as we can safely prune that don't match the filter. -query TT -EXPLAIN SELECT MAX(score) FROM agg_parquet WHERE category = 'alpha'; ----- -logical_plan -01)Aggregate: groupBy=[[]], aggr=[[max(agg_parquet.score)]] -02)--Projection: agg_parquet.score -03)----Filter: agg_parquet.category = Utf8View("alpha") -04)------TableScan: agg_parquet projection=[category, score], partial_filters=[agg_parquet.category = Utf8View("alpha")] -physical_plan -01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_parquet.score)] -02)--CoalescePartitionsExec -03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_parquet.score)] -04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/agg_data.parquet]]}, projection=[score], file_type=parquet, predicate=category@0 = alpha AND DynamicFilter [ empty ], pruning_predicate=category_null_count@2 != row_count@3 AND category_min@0 <= alpha AND alpha <= category_max@1, required_guarantees=[category in (alpha)] - -# COUNT + MAX — DynamicFilter should NOT appear here in mixed aggregates query TT EXPLAIN SELECT COUNT(*), MAX(score) FROM agg_parquet WHERE category = 'alpha'; ----