From 7f2eb67a2fcf3cdd042908a9c95efa2c831ffeed Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 8 Jun 2025 13:39:14 -0500 Subject: [PATCH 01/14] implement dynamic filter pushdown for topk operator --- datafusion/common/src/config.rs | 7 + datafusion/core/tests/fuzz_cases/mod.rs | 1 + .../tests/fuzz_cases/topk_filter_pushdown.rs | 354 ++++++++++++++++++ .../physical_optimizer/filter_pushdown/mod.rs | 94 ++++- .../filter_pushdown/util.rs | 14 + .../physical-expr/src/expressions/mod.rs | 1 + .../physical-optimizer/src/optimizer.rs | 8 +- datafusion/physical-plan/src/sorts/sort.rs | 43 ++- datafusion/physical-plan/src/topk/mod.rs | 169 ++++++++- .../test_files/information_schema.slt | 2 + .../test_files/parquet_filter_pushdown.slt | 20 +- .../test_files/push_down_filter.slt | 31 +- datafusion/sqllogictest/test_files/topk.slt | 14 +- docs/source/user-guide/configs.md | 1 + 14 files changed, 724 insertions(+), 35 deletions(-) create mode 100644 datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 4efb67a37c991..8ab7f06dc7b5b 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -614,6 +614,13 @@ config_namespace! { /// during aggregations, if possible pub enable_topk_aggregation: bool, default = true + /// When set to true attempts to push down dynamic filters generated by operators into the file scan phase. + /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer + /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. + /// This means that if we already have 10 timestamps in the year 2025 + /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. + pub enable_dynamic_filter_pushdown: bool, default = true + /// When set to true, the optimizer will insert filters before a join between /// a nullable and non-nullable column to filter out nulls on the nullable side. This /// filter can add additional overhead when the file format does not fully support diff --git a/datafusion/core/tests/fuzz_cases/mod.rs b/datafusion/core/tests/fuzz_cases/mod.rs index 8ccc2a5bc1310..9e01621c02574 100644 --- a/datafusion/core/tests/fuzz_cases/mod.rs +++ b/datafusion/core/tests/fuzz_cases/mod.rs @@ -21,6 +21,7 @@ mod join_fuzz; mod merge_fuzz; mod sort_fuzz; mod sort_query_fuzz; +mod topk_filter_pushdown; mod aggregation_fuzzer; mod equivalence; diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs new file mode 100644 index 0000000000000..2b1b905d9390f --- /dev/null +++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs @@ -0,0 +1,354 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::{Arc, LazyLock}; + +use arrow::array::{Int32Array, StringArray, StringDictionaryBuilder}; +use arrow::datatypes::Int32Type; +use arrow::record_batch::RecordBatch; +use arrow::util::pretty::pretty_format_batches; +use arrow_schema::{DataType, Field, Schema}; +use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_datasource::ListingTableUrl; +use datafusion_datasource_parquet::ParquetFormat; +use datafusion_execution::object_store::ObjectStoreUrl; +use itertools::Itertools; +use object_store::memory::InMemory; +use object_store::path::Path; +use object_store::{ObjectStore, PutPayload}; +use parquet::arrow::ArrowWriter; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use tokio::sync::Mutex; +use tokio::task::JoinSet; + +#[derive(Clone)] +struct TestDataSet { + store: Arc, + schema: Arc, +} + +/// List of in memory parquet files with UTF8 data +// Use a mutex rather than LazyLock to allow for async initialization +static TESTFILES: LazyLock>> = + LazyLock::new(|| Mutex::new(vec![])); + +async fn test_files() -> Vec { + let files_mutex = &TESTFILES; + let mut files = files_mutex.lock().await; + if !files.is_empty() { + return (*files).clone(); + } + + let mut rng = StdRng::seed_from_u64(0); + + for nulls_in_ids in [false, true] { + for nulls_in_names in [false, true] { + for nulls_in_departments in [false, true] { + let store = Arc::new(InMemory::new()); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, nulls_in_ids), + Field::new("name", DataType::Utf8, nulls_in_names), + Field::new( + "department", + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Utf8), + ), + nulls_in_departments, + ), + ])); + + let name_choices = if nulls_in_names { + [Some("Alice"), Some("Bob"), None, Some("David"), None] + } else { + [ + Some("Alice"), + Some("Bob"), + Some("Charlie"), + Some("David"), + Some("Eve"), + ] + }; + + let department_choices = if nulls_in_departments { + [ + Some("Theater"), + Some("Engineering"), + None, + Some("Arts"), + None, + ] + } else { + [ + Some("Theater"), + Some("Engineering"), + Some("Healthcare"), + Some("Arts"), + Some("Music"), + ] + }; + + // Generate 5 files, some with overlapping or repeated ids some without + for i in 0..5 { + let num_batches = rng.gen_range(1..3); + let mut batches = Vec::with_capacity(num_batches); + for _ in 0..num_batches { + let num_rows = 25; + let ids = Int32Array::from_iter((0..num_rows).map(|file| { + if nulls_in_ids { + if rng.gen_bool(1.0 / 10.0) { + None + } else { + Some(rng.gen_range(file..file + 5)) + } + } else { + Some(rng.gen_range(file..file + 5)) + } + })); + let names = StringArray::from_iter((0..num_rows).map(|_| { + // randomly select a name + let idx = rng.gen_range(0..name_choices.len()); + name_choices[idx].map(|s| s.to_string()) + })); + let mut departments = StringDictionaryBuilder::::new(); + for _ in 0..num_rows { + // randomly select a department + let idx = 
rng.gen_range(0..department_choices.len()); + departments.append_option(department_choices[idx].as_ref()); + } + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(ids), + Arc::new(names), + Arc::new(departments.finish()), + ], + ) + .unwrap(); + batches.push(batch); + } + let mut buf = vec![]; + { + let mut writer = + ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap(); + for batch in batches { + writer.write(&batch).unwrap(); + writer.flush().unwrap(); + } + writer.flush().unwrap(); + writer.finish().unwrap(); + } + let payload = PutPayload::from(buf); + let path = Path::from(format!("file_{i}.parquet")); + store.put(&path, payload).await.unwrap(); + } + files.push(TestDataSet { store, schema }); + } + } + } + (*files).clone() +} + +async fn run_query_with_config( + query: &str, + config: SessionConfig, + dataset: TestDataSet, +) -> Vec { + let store = dataset.store; + let schema = dataset.schema; + let ctx = SessionContext::new_with_config(config); + let url = ObjectStoreUrl::parse("memory://").unwrap(); + ctx.register_object_store(url.as_ref(), store.clone()); + + let format = Arc::new( + ParquetFormat::default() + .with_options(ctx.state().table_options().parquet.clone()), + ); + let options = ListingOptions::new(format); + let table_path = ListingTableUrl::parse("memory:///").unwrap(); + let config = ListingTableConfig::new(table_path) + .with_listing_options(options) + .with_schema(schema); + let table = Arc::new(ListingTable::try_new(config).unwrap()); + + ctx.register_table("test_table", table).unwrap(); + + ctx.sql(query).await.unwrap().collect().await.unwrap() +} + +#[derive(Debug)] +struct RunQueryResult { + query: String, + result: Vec, + expected: Vec, +} + +impl RunQueryResult { + fn expected_formated(&self) -> String { + format!("{}", pretty_format_batches(&self.expected).unwrap()) + } + + fn result_formated(&self) -> String { + format!("{}", pretty_format_batches(&self.result).unwrap()) + } + + fn is_ok(&self) -> bool { + self.expected_formated() == self.result_formated() + } +} + +async fn run_query( + query: String, + cfg: SessionConfig, + dataset: TestDataSet, +) -> RunQueryResult { + let cfg_with_dynamic_filters = cfg + .clone() + .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true); + let cfg_without_dynamic_filters = cfg + .clone() + .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", false); + + let expected_result = + run_query_with_config(&query, cfg_without_dynamic_filters, dataset.clone()).await; + let result = + run_query_with_config(&query, cfg_with_dynamic_filters, dataset.clone()).await; + + RunQueryResult { + query: query.to_string(), + result, + expected: expected_result, + } +} + +struct TestCase { + query: String, + cfg: SessionConfig, + dataset: TestDataSet, +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_fuzz_topk_filter_pushdown() { + let order_columns = ["id", "name", "department"]; + let order_directions = ["ASC", "DESC"]; + let null_orders = ["NULLS FIRST", "NULLS LAST"]; + + let start = datafusion_common::instant::Instant::now(); + let mut orders: HashMap> = HashMap::new(); + for order_column in &order_columns { + for order_direction in &order_directions { + for null_order in &null_orders { + // if there is a vec for this column insert the order, otherwise create a new vec + let ordering = + format!("{} {} {}", order_column, order_direction, null_order); + match orders.get_mut(*order_column) { + Some(order_vec) => { + order_vec.push(ordering); + } + None => { + 
orders.insert(order_column.to_string(), vec![ordering]); + } + } + } + } + } + + let mut queries = vec![]; + + for limit in [1, 10] { + for num_order_by_columns in [1, 2, 3] { + for order_columns in ["id", "name", "department"] + .iter() + .combinations(num_order_by_columns) + { + for orderings in order_columns + .iter() + .map(|col| orders.get(**col).unwrap()) + .multi_cartesian_product() + { + let query = format!( + "SELECT * FROM test_table ORDER BY {} LIMIT {}", + orderings.into_iter().join(", "), + limit + ); + queries.push(query); + } + } + } + } + + queries.sort_unstable(); + println!( + "Generated {} queries in {:?}", + queries.len(), + start.elapsed() + ); + + let start = datafusion_common::instant::Instant::now(); + let datasets = test_files().await; + println!("Generated test files in {:?}", start.elapsed()); + + let mut test_cases = vec![]; + for enable_filter_pushdown in [true, false] { + for query in &queries { + for dataset in &datasets { + let mut cfg = SessionConfig::new(); + cfg = cfg.set_bool( + "datafusion.optimizer.enable_dynamic_filter_pushdown", + enable_filter_pushdown, + ); + test_cases.push(TestCase { + query: query.to_string(), + cfg, + dataset: dataset.clone(), + }); + } + } + } + + let start = datafusion_common::instant::Instant::now(); + let mut join_set = JoinSet::new(); + for tc in test_cases { + join_set.spawn(run_query(tc.query, tc.cfg, tc.dataset)); + } + let mut results = join_set.join_all().await; + results.sort_unstable_by(|a, b| a.query.cmp(&b.query)); + println!("Ran {} test cases in {:?}", results.len(), start.elapsed()); + + let failures = results + .iter() + .filter(|result| !result.is_ok()) + .collect::>(); + + for failure in &failures { + println!("Failure:"); + println!("Query:\n{}", failure.query); + println!("\nExpected:\n{}", failure.expected_formated()); + println!("\nResult:\n{}", failure.result_formated()); + println!("\n\n"); + } + + if !failures.is_empty() { + panic!("Some test cases failed"); + } else { + println!("All test cases passed"); + } +} diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index a28933d97bcd1..ed42c3bc87f6e 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -17,28 +17,40 @@ use std::sync::{Arc, LazyLock}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::{ + array::record_batch, + datatypes::{DataType, Field, Schema, SchemaRef}, +}; +use arrow_schema::SortOptions; use datafusion::{ logical_expr::Operator, physical_plan::{ expressions::{BinaryExpr, Column, Literal}, PhysicalExpr, }, + prelude::{SessionConfig, SessionContext}, scalar::ScalarValue, }; use datafusion_common::config::ConfigOptions; +use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_functions_aggregate::count::count_udaf; -use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::{aggregate::AggregateExprBuilder, Partitioning}; -use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; +use datafusion_physical_expr::{expressions::col, LexOrdering, PhysicalSortExpr}; +use datafusion_physical_optimizer::{ + filter_pushdown::FilterPushdown, PhysicalOptimizerRule, +}; use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, coalesce_batches::CoalesceBatchesExec, filter::FilterExec, repartition::RepartitionExec, + sorts::sort::SortExec, + ExecutionPlan, }; -use 
util::{OptimizationTest, TestNode, TestScanBuilder}; +use futures::StreamExt; +use object_store::memory::InMemory; +use util::{format_plan_for_test, OptimizationTest, TestNode, TestScanBuilder}; mod util; @@ -346,6 +358,80 @@ fn test_node_handles_child_pushdown_result() { ); } +#[tokio::test] +async fn test_topk_dynamic_filter_pushdown() { + // This test is a bit of a hack, but it shows that we can push down dynamic filters + // into the DataSourceExec. The test is not perfect because we don't have a real + // implementation of the dynamic filter yet, so we just use a static filter. + let batches = vec![ + record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["bd", "bc"]), + ("c", Float64, [1.0, 2.0]) + ) + .unwrap(), + record_batch!( + ("a", Utf8, ["ac", "ad"]), + ("b", Utf8, ["bb", "ba"]), + ("c", Float64, [2.0, 1.0]) + ) + .unwrap(), + ]; + let scan = TestScanBuilder::new(schema()) + .with_support(true) + .with_batches(batches) + .build(); + let plan = Arc::new( + SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new( + col("b", &schema()).unwrap(), + SortOptions::new(true, false), // descending, nulls_first + )]), + Arc::clone(&scan), + ) + .with_fetch(Some(1)), + ) as Arc; + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown{}, true), + @r" + OptimizationTest: + input: + - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + " + ); + + // Actually apply the optimization to the plan + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + let plan = FilterPushdown {}.optimize(plan, &config).unwrap(); + let config = SessionConfig::new().with_batch_size(2); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + // Iterate one batch + stream.next().await.unwrap().unwrap(); + // Now check what our filter looks like + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false], filter=[b@1 > bd] + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@1 > bd ] + " + ); +} + /// Schema: /// a: String /// b: String diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 958dd8f59e933..1d4fd110ef0d6 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -271,6 +271,11 @@ impl TestScanBuilder { self } + pub fn with_batches(mut self, batches: Vec) -> Self { + self.batches = batches; + self + } + pub fn build(self) -> Arc { let source = 
Arc::new(TestSource::new(self.support, self.batches)); let base_config = FileScanConfigBuilder::new( @@ -426,6 +431,15 @@ fn format_lines(s: &str) -> Vec { s.trim().split('\n').map(|s| s.to_string()).collect() } +pub fn format_plan_for_test(plan: &Arc) -> String { + let mut out = String::new(); + for line in format_execution_plan(plan) { + out.push_str(&format!(" - {line}\n")); + } + out.push('\n'); + out +} + #[derive(Debug)] pub(crate) struct TestNode { inject_filter: bool, diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index d77207fbbcd76..8f46133ed0bbb 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -43,6 +43,7 @@ pub use case::{case, CaseExpr}; pub use cast::{cast, CastExpr}; pub use column::{col, with_new_schema, Column}; pub use datafusion_expr::utils::format_state_name; +pub use dynamic_filters::DynamicFilterPhysicalExpr; pub use in_list::{in_list, InListExpr}; pub use is_not_null::{is_not_null, IsNotNullExpr}; pub use is_null::{is_null, IsNullExpr}; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index d5129cea9d4ef..d5af8a4b2d54e 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -96,10 +96,6 @@ impl PhysicalOptimizer { // as that rule may inject other operations in between the different AggregateExecs. // Applying the rule early means only directly-connected AggregateExecs must be examined. Arc::new(LimitedDistinctAggregation::new()), - // The FilterPushdown rule tries to push down filters as far as it can. - // For example, it will push down filtering from a `FilterExec` to - // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`. - Arc::new(FilterPushdown::new()), // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution // requirements. Please make sure that the whole plan tree is determined before this rule. // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at @@ -139,6 +135,10 @@ impl PhysicalOptimizer { // reduced by narrowing their input tables. Arc::new(ProjectionPushdown::new()), Arc::new(InsertYieldExec::new()), + // The FilterPushdown rule tries to push down filters as far as it can. + // For example, it will push down filtering from a `FilterExec` to + // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`. + Arc::new(FilterPushdown::new()), // The SanityCheckPlan rule checks whether the order and // distribution requirements of each node in the plan // is satisfied. 
It will also reject non-runnable query diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index c31f17291e197..59fdaabe78ec5 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -27,6 +27,7 @@ use std::sync::Arc; use crate::common::spawn_buffered; use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; use crate::expressions::PhysicalSortExpr; +use crate::filter_pushdown::FilterDescription; use crate::limit::LimitStream; use crate::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, @@ -53,6 +54,8 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_physical_expr::LexOrdering; +use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr}; +use datafusion_physical_expr::PhysicalExpr; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; @@ -843,6 +846,8 @@ pub struct SortExec { common_sort_prefix: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, + /// Filter matching the state of the sort for dynamic filter pushdown + filter: Option>, } impl SortExec { @@ -861,6 +866,7 @@ impl SortExec { fetch: None, common_sort_prefix: sort_prefix, cache, + filter: None, } } @@ -906,6 +912,14 @@ impl SortExec { if fetch.is_some() && is_pipeline_friendly { cache = cache.with_boundedness(Boundedness::Bounded); } + let filter = fetch.is_some().then(|| { + let children = self + .expr + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true))) + }); SortExec { input: Arc::clone(&self.input), expr: self.expr.clone(), @@ -914,6 +928,7 @@ impl SortExec { common_sort_prefix: self.common_sort_prefix.clone(), fetch, cache, + filter, } } @@ -1009,6 +1024,13 @@ impl DisplayAs for SortExec { match self.fetch { Some(fetch) => { write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?; + if let Some(filter) = &self.filter { + if let Ok(current) = filter.current() { + if !current.eq(&lit(true)) { + write!(f, ", filter=[{}]", current)?; + } + } + } if !self.common_sort_prefix.is_empty() { write!(f, ", sort_prefix=[")?; let mut first = true; @@ -1079,9 +1101,10 @@ impl ExecutionPlan for SortExec { self: Arc, children: Vec>, ) -> Result> { - let new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) + let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) .with_fetch(self.fetch) .with_preserve_partitioning(self.preserve_partitioning); + new_sort.filter = self.filter.clone(); Ok(Arc::new(new_sort)) } @@ -1122,6 +1145,7 @@ impl ExecutionPlan for SortExec { context.session_config().batch_size(), context.runtime_env(), &self.metrics_set, + self.filter.clone(), )?; Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), @@ -1228,6 +1252,23 @@ impl ExecutionPlan for SortExec { .with_preserve_partitioning(self.preserve_partitioning()), ))) } + + fn gather_filters_for_pushdown( + &self, + parent_filters: Vec>, + config: &datafusion_common::config::ConfigOptions, + ) -> Result { + if let Some(filter) = &self.filter { + if config.optimizer.enable_dynamic_filter_pushdown { + let filter = Arc::clone(filter) as Arc; + return Ok(FilterDescription::new_with_child_count(1) + 
.all_parent_filters_supported(parent_filters) + .with_self_filter(filter)); + } + } + Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters)) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index d8b6f0e400b83..307e8f1eab848 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -17,6 +17,12 @@ //! TopK: Combination of Sort / LIMIT +use arrow::{ + array::Array, + compute::interleave_record_batch, + row::{RowConverter, Rows, SortField}, +}; +use datafusion_expr::{ColumnarValue, Operator}; use std::mem::size_of; use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc}; @@ -25,15 +31,17 @@ use crate::spill::get_record_batch_memory_size; use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; use arrow::array::{ArrayRef, RecordBatch}; -use arrow::compute::interleave_record_batch; use arrow::datatypes::SchemaRef; -use arrow::row::{RowConverter, Rows, SortField}; -use datafusion_common::{internal_datafusion_err, HashMap, Result}; +use datafusion_common::{internal_datafusion_err, internal_err, HashMap, Result, ScalarValue}; use datafusion_execution::{ memory_pool::{MemoryConsumer, MemoryReservation}, runtime_env::RuntimeEnv, }; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion_physical_expr::{ + expressions::{is_not_null, is_null, lit, BinaryExpr, DynamicFilterPhysicalExpr}, + PhysicalExpr, +}; /// Global TopK /// @@ -110,6 +118,8 @@ pub struct TopK { common_sort_prefix_converter: Option, /// Common sort prefix between the input and the sort expressions to allow early exit optimization common_sort_prefix: Arc<[PhysicalSortExpr]>, + /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown + filter: Option>, /// If true, indicates that all rows of subsequent batches are guaranteed /// to be greater (by byte order, after row conversion) than the top K, /// which means the top K won't change and the computation can be finished early. @@ -148,6 +158,7 @@ impl TopK { batch_size: usize, runtime: Arc, metrics: &ExecutionPlanMetricsSet, + filter: Option>, ) -> Result { let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]")) .register(&runtime.memory_pool); @@ -179,6 +190,7 @@ impl TopK { common_sort_prefix_converter: prefix_row_converter, common_sort_prefix: Arc::from(common_sort_prefix), finished: false, + filter, }) } @@ -207,6 +219,7 @@ impl TopK { // Idea: filter out rows >= self.heap.max() early (before passing to `RowConverter`) // this avoids some work and also might be better vectorizable. let mut batch_entry = self.heap.register_batch(batch.clone()); + let mut updated = false; for (index, row) in rows.iter().enumerate() { match self.heap.max() { // heap has k items, and the new row is greater than the @@ -216,6 +229,7 @@ impl TopK { None | Some(_) => { self.heap.add(&mut batch_entry, row, index); self.metrics.row_replacements.add(1); + updated = true; } } } @@ -227,10 +241,108 @@ impl TopK { // update memory reservation self.reservation.try_resize(self.size())?; - // flag the topK as finished if we know that all - // subsequent batches are guaranteed to be greater (by byte order, after row conversion) than the top K, - // which means the top K won't change and the computation can be finished early. 
- self.attempt_early_completion(&batch)?; + if updated { + // flag the topK as finished if we know that all + // subsequent batches are guaranteed to be greater (by byte order, after row conversion) than the top K, + // which means the top K won't change and the computation can be finished early. + self.attempt_early_completion(&batch)?; + // update the filter representation of our TopK heap + self.update_filter()?; + } + + Ok(()) + } + + /// Update the filter representation of our TopK heap + fn update_filter(&mut self) -> Result<()> { + let Some(filter) = &self.filter else { + return Ok(()); + }; + if let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? { + // Create filter expressions for each threshold + let mut filters: Vec> = + Vec::with_capacity(thresholds.len()); + + let mut prev_sort_expr: Option> = None; + for (sort_expr, value) in self.expr.iter().zip(thresholds.iter()) { + // Create the appropriate operator based on sort order + let op = if sort_expr.options.descending { + // For descending sort, we want col > threshold (exclude smaller values) + Operator::Gt + } else { + // For ascending sort, we want col < threshold (exclude larger values) + Operator::Lt + }; + + let value_null = value.is_null(); + + let comparison = Arc::new(BinaryExpr::new( + Arc::clone(&sort_expr.expr), + op, + lit(value.clone()), + )); + + let comparison_with_null = + match (sort_expr.options.nulls_first, value_null) { + // For nulls first, transform to (threshold.value is not null) and (threshold.expr is null or comparison) + (true, true) => lit(false), + (true, false) => Arc::new(BinaryExpr::new( + is_null(Arc::clone(&sort_expr.expr))?, + Operator::Or, + comparison, + )), + // For nulls last, transform to (threshold.value is null and threshold.expr is not null) + // or (threshold.value is not null and comparison) + (false, true) => is_not_null(Arc::clone(&sort_expr.expr))?, + (false, false) => comparison, + }; + + let mut eq_expr = Arc::new(BinaryExpr::new( + Arc::clone(&sort_expr.expr), + Operator::Eq, + lit(value.clone()), + )); + + if value_null { + eq_expr = Arc::new(BinaryExpr::new( + is_null(Arc::clone(&sort_expr.expr))?, + Operator::Or, + eq_expr, + )); + } + + // For a query like order by a, b, the filter for column `b` is only applied if + // the condition a = threshold.value (considering null equality) is met. + // Therefore, we add equality predicates for all preceding fields to the filter logic of the current field, + // and include the current field's equality predicate in `prev_sort_expr` for use with subsequent fields. 
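+                // Illustrative example (null-handling branches above omitted for brevity):
+                // for `ORDER BY a ASC, b ASC` with the current worst (k-th) heap row (a0, b0),
+                // the per-column filters accumulated here reduce after the loop to roughly
+                // `a < a0 OR (a = a0 AND b < b0)`, so rows that cannot displace any entry
+                // in the heap can be pruned early.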
+ match prev_sort_expr.take() { + None => { + prev_sort_expr = Some(eq_expr); + filters.push(comparison_with_null); + } + Some(p) => { + filters.push(Arc::new(BinaryExpr::new( + Arc::clone(&p), + Operator::And, + comparison_with_null, + ))); + + prev_sort_expr = + Some(Arc::new(BinaryExpr::new(p, Operator::And, eq_expr))); + } + } + } + + let dynamic_predicate = filters + .into_iter() + .reduce(|a, b| Arc::new(BinaryExpr::new(a, Operator::Or, b))); + + if let Some(predicate) = dynamic_predicate { + if !predicate.eq(&lit(true)) { + filter.update(predicate)?; + } + } + } Ok(()) } @@ -324,6 +436,7 @@ impl TopK { common_sort_prefix_converter: _, common_sort_prefix: _, finished: _, + filter: _, } = self; let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop @@ -566,6 +679,47 @@ impl TopKHeap { + self.store.size() + self.owned_bytes } + + fn get_threshold_values( + &self, + sort_exprs: &[PhysicalSortExpr], + ) -> Result>> { + // If the heap doesn't have k elements yet, we can't create thresholds + let max_row = match self.max() { + Some(row) => row, + None => return Ok(None), + }; + + // Get the batch that contains the max row + let batch_entry = match self.store.get(max_row.batch_id) { + Some(entry) => entry, + None => return internal_err!("Invalid batch ID in TopKRow"), + }; + + // Extract threshold values for each sort expression + let mut scalar_values = Vec::with_capacity(sort_exprs.len()); + for sort_expr in sort_exprs { + // Extract the value for this column from the max row + let expr = Arc::clone(&sort_expr.expr); + let value = expr.evaluate(&batch_entry.batch.slice(max_row.index, 1))?; + + // Convert to scalar value - should be a single value since we're evaluating on a single row batch + let scalar = match value { + ColumnarValue::Scalar(scalar) => scalar, + ColumnarValue::Array(array) if array.len() == 1 => { + // Extract the first (and only) value from the array + ScalarValue::try_from_array(&array, 0)? + } + array => { + return internal_err!("Expected a scalar value, got {:?}", array) + } + }; + + scalar_values.push(scalar); + } + + Ok(Some(scalar_values)) + } } /// Represents one of the top K rows held in this heap. 
Orders @@ -834,6 +988,7 @@ mod tests { 2, runtime, &metrics, + None, )?; // Create the first batch with two columns: diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index dc8b7680d83e1..42d64c5ee155d 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -285,6 +285,7 @@ datafusion.format.types_info false datafusion.optimizer.allow_symmetric_joins_without_pruning true datafusion.optimizer.default_filter_selectivity 20 datafusion.optimizer.enable_distinct_aggregation_soft_limit true +datafusion.optimizer.enable_dynamic_filter_pushdown true datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_topk_aggregation true datafusion.optimizer.expand_views_at_output false @@ -396,6 +397,7 @@ datafusion.format.types_info false Show types in visual representation batches datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. +datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. 
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index e5b5f5ac878a9..9f7d8e011f97f 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -86,7 +86,10 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[] +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------ProjectionExec: expr=[a@0 as a] +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 +06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[] # When filter pushdown *is* enabled, ParquetExec can filter exactly, @@ -134,7 +137,10 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2, required_guarantees=[] +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------ProjectionExec: expr=[a@0 as a] +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 +06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2, required_guarantees=[] query I @@ -153,7 +159,10 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [b@0 ASC NULLS LAST] 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1, required_guarantees=[a in (bar)] +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------ProjectionExec: expr=[b@1 as b] 
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 +06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=a@0 = bar, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1, required_guarantees=[a in (bar)] ## cleanup statement ok @@ -229,7 +238,10 @@ EXPLAIN select * from t_pushdown where val != 'c'; logical_plan 01)Filter: t_pushdown.val != Utf8("c") 02)--TableScan: t_pushdown projection=[val, part], partial_filters=[t_pushdown.val != Utf8("c")] -physical_plan DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c)] +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c)] # If we have a mix of filters: # - The partition filters get evaluated during planning diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt index ed948dd11439a..b4c74610f0640 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter.slt @@ -192,7 +192,8 @@ explain select * from test_filter_with_limit where value = 2 limit 1; ---- physical_plan 01)CoalescePartitionsExec: fetch=1 -02)--DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-2.parquet]]}, projection=[part_key, value], limit=1, file_type=parquet, predicate=value@1 = 2, pruning_predicate=value_null_count@2 != row_count@3 AND value_min@0 <= 2 AND 2 <= value_max@1, required_guarantees=[value in (2)] +02)--CoalesceBatchesExec: target_batch_size=8192, fetch=1 +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-0.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-2.parquet]]}, projection=[part_key, value], file_type=parquet, predicate=value@1 = 2, pruning_predicate=value_null_count@2 != row_count@3 AND value_min@0 <= 2 AND 2 <= value_max@1, required_guarantees=[value in (2)] query II select * from test_filter_with_limit where value = 2 limit 1; @@ -229,43 +230,57 @@ LOCATION 'test_files/scratch/push_down_filter/t.parquet'; query TT explain select a from t where a = '100'; ---- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)] +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)] # The predicate should not have a column cast when the value is a valid i32 query TT explain select a from t where a != '100'; ---- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 != 100, pruning_predicate=a_null_count@2 != row_count@3 AND (a_min@0 != 100 OR 100 != a_max@1), required_guarantees=[a not in (100)] +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 != 100, pruning_predicate=a_null_count@2 != row_count@3 AND (a_min@0 != 100 OR 100 != a_max@1), required_guarantees=[a not in (100)] # The predicate should still have the column cast when the value is a NOT valid i32 query TT explain select a from t where a = '99999999999'; ---- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99999999999 +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99999999999 # The predicate should still have the column cast when the value is a NOT valid i32 query TT explain select a from t where a = '99.99'; ---- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99.99 +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99.99 # The predicate should still have the column cast when the value is a NOT valid i32 query TT explain select a from t where a = ''; 
---- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = # The predicate should not have a column cast when the operator is = or != and the literal can be round-trip casted without losing information. query TT explain select a from t where cast(a as string) = '100'; ---- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)] +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)] # The predicate should still have the column cast when the literal alters its string representation after round-trip casting (leading zero lost). query TT explain select a from t where CAST(a AS string) = '0123'; ---- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 0123 +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 0123 statement ok diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 26fef6d666b10..9ff382d32af95 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -316,7 +316,7 @@ explain select number, letter, age from partial_sorted order by number desc, let ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] # Explain variations of the above query with different orderings, and different sort prefixes. 
@@ -326,28 +326,28 @@ explain select number, letter, age from partial_sorted order by age desc limit 3 ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[age@2 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query TT explain select number, letter, age from partial_sorted order by number desc, letter desc limit 3; ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query TT explain select number, letter, age from partial_sorted order by number asc limit 3; ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query TT explain select number, letter, age from partial_sorted order by letter asc, number desc limit 3; ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[letter@1 ASC NULLS LAST, number@0 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] # Explicit NULLS ordering cases (reversing the order of the NULLS on the number and letter orderings) query TT @@ -355,14 +355,14 @@ explain select number, letter, age from partial_sorted order by number desc, let ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC], preserve_partitioning=[false], sort_prefix=[number@0 DESC] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, 
projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query TT explain select number, letter, age from partial_sorted order by number desc NULLS LAST, letter asc limit 3; ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC NULLS LAST, letter@1 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] # Verify that the sort prefix is correctly computed on the normalized ordering (removing redundant aliased columns) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 42282e39e41f5..fcb2360b87e3f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -101,6 +101,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | +| datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | | datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. 
| From f36c72d819cf2cd4183b5c8d0909f05616b94834 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 8 Jun 2025 15:29:43 -0500 Subject: [PATCH 02/14] fix some tests --- .../tests/fuzz_cases/topk_filter_pushdown.rs | 12 +++---- .../physical_optimizer/filter_pushdown/mod.rs | 2 +- .../physical-optimizer/src/optimizer.rs | 6 ++++ datafusion/sqllogictest/test_files/limit.slt | 2 +- .../test_files/parquet_filter_pushdown.slt | 20 +++--------- .../test_files/push_down_filter.slt | 31 +++++-------------- 6 files changed, 26 insertions(+), 47 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs index 2b1b905d9390f..2fa3575bb65a5 100644 --- a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs +++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs @@ -108,30 +108,30 @@ async fn test_files() -> Vec { // Generate 5 files, some with overlapping or repeated ids some without for i in 0..5 { - let num_batches = rng.gen_range(1..3); + let num_batches = rng.random_range(1..3); let mut batches = Vec::with_capacity(num_batches); for _ in 0..num_batches { let num_rows = 25; let ids = Int32Array::from_iter((0..num_rows).map(|file| { if nulls_in_ids { - if rng.gen_bool(1.0 / 10.0) { + if rng.random_bool(1.0 / 10.0) { None } else { - Some(rng.gen_range(file..file + 5)) + Some(rng.random_range(file..file + 5)) } } else { - Some(rng.gen_range(file..file + 5)) + Some(rng.random_range(file..file + 5)) } })); let names = StringArray::from_iter((0..num_rows).map(|_| { // randomly select a name - let idx = rng.gen_range(0..name_choices.len()); + let idx = rng.random_range(0..name_choices.len()); name_choices[idx].map(|s| s.to_string()) })); let mut departments = StringDictionaryBuilder::::new(); for _ in 0..num_rows { // randomly select a department - let idx = rng.gen_range(0..department_choices.len()); + let idx = rng.random_range(0..department_choices.len()); departments.append_option(department_choices[idx].as_ref()); } let batch = RecordBatch::try_new( diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index ed42c3bc87f6e..58815d05d6c5e 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -386,7 +386,7 @@ async fn test_topk_dynamic_filter_pushdown() { LexOrdering::new(vec![PhysicalSortExpr::new( col("b", &schema()).unwrap(), SortOptions::new(true, false), // descending, nulls_first - )]), + )]).unwrap(), Arc::clone(&scan), ) .with_fetch(Some(1)), diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index d5af8a4b2d54e..49245551645e6 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -96,6 +96,12 @@ impl PhysicalOptimizer { // as that rule may inject other operations in between the different AggregateExecs. // Applying the rule early means only directly-connected AggregateExecs must be examined. Arc::new(LimitedDistinctAggregation::new()), + // The FilterPushdown rule tries to push down filters as far as it can. + // For example, it will push down filtering from a `FilterExec` to + // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`. 
+ // This must run before EnforceSorting to ensure dynamic filters from TopK operators + // are established before sorts are potentially recreated. + Arc::new(FilterPushdown::new()), // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution // requirements. Please make sure that the whole plan tree is determined before this rule. // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 9d5106bf2cafb..3398fa29018bc 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -854,7 +854,7 @@ physical_plan 01)ProjectionExec: expr=[1 as foo] 02)--SortPreservingMergeExec: [part_key@0 ASC NULLS LAST], fetch=1 03)----SortExec: TopK(fetch=1), expr=[part_key@0 ASC NULLS LAST], preserve_partitioning=[true] -04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], file_type=parquet +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query I with selection as ( diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 9f7d8e011f97f..e5b5f5ac878a9 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -86,10 +86,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----CoalesceBatchesExec: target_batch_size=8192 -04)------ProjectionExec: expr=[a@0 as a] -05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 -06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[] +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[] # When filter pushdown *is* enabled, ParquetExec can filter exactly, @@ -137,10 +134,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a@0 ASC 
NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----CoalesceBatchesExec: target_batch_size=8192 -04)------ProjectionExec: expr=[a@0 as a] -05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 -06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2, required_guarantees=[] +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2, required_guarantees=[] query I @@ -159,10 +153,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [b@0 ASC NULLS LAST] 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----CoalesceBatchesExec: target_batch_size=8192 -04)------ProjectionExec: expr=[b@1 as b] -05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 -06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=a@0 = bar, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1, required_guarantees=[a in (bar)] +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1, required_guarantees=[a in (bar)] ## cleanup statement ok @@ -238,10 +229,7 @@ EXPLAIN select * from t_pushdown where val != 'c'; logical_plan 01)Filter: t_pushdown.val != Utf8("c") 02)--TableScan: t_pushdown projection=[val, part], partial_filters=[t_pushdown.val != Utf8("c")] -physical_plan -01)CoalesceBatchesExec: target_batch_size=8192 -02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 -03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c)] +physical_plan DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c)] # If we have a mix of filters: # - The partition filters get evaluated during planning diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt index b4c74610f0640..ed948dd11439a 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter.slt @@ -192,8 +192,7 @@ explain select * from test_filter_with_limit where value = 2 limit 1; ---- physical_plan 01)CoalescePartitionsExec: fetch=1 -02)--CoalesceBatchesExec: target_batch_size=8192, fetch=1 -03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-2.parquet]]}, projection=[part_key, value], file_type=parquet, predicate=value@1 = 2, pruning_predicate=value_null_count@2 != row_count@3 AND value_min@0 <= 2 AND 2 <= value_max@1, required_guarantees=[value in (2)] +02)--DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_filter_with_limit/part-2.parquet]]}, projection=[part_key, value], limit=1, file_type=parquet, predicate=value@1 = 2, pruning_predicate=value_null_count@2 != row_count@3 AND value_min@0 <= 2 AND 2 <= value_max@1, required_guarantees=[value in (2)] query II select * from test_filter_with_limit where value = 2 limit 1; @@ -230,57 +229,43 @@ LOCATION 'test_files/scratch/push_down_filter/t.parquet'; query TT explain select a from t where a = '100'; ---- -physical_plan -01)CoalesceBatchesExec: target_batch_size=8192 -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)] # The predicate should not have a column cast when the value is a valid i32 query TT explain select a from t where a != '100'; ---- -physical_plan -01)CoalesceBatchesExec: target_batch_size=8192 -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], 
file_type=parquet, predicate=a@0 != 100, pruning_predicate=a_null_count@2 != row_count@3 AND (a_min@0 != 100 OR 100 != a_max@1), required_guarantees=[a not in (100)] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 != 100, pruning_predicate=a_null_count@2 != row_count@3 AND (a_min@0 != 100 OR 100 != a_max@1), required_guarantees=[a not in (100)] # The predicate should still have the column cast when the value is a NOT valid i32 query TT explain select a from t where a = '99999999999'; ---- -physical_plan -01)CoalesceBatchesExec: target_batch_size=8192 -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99999999999 +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99999999999 # The predicate should still have the column cast when the value is a NOT valid i32 query TT explain select a from t where a = '99.99'; ---- -physical_plan -01)CoalesceBatchesExec: target_batch_size=8192 -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99.99 +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99.99 # The predicate should still have the column cast when the value is a NOT valid i32 query TT explain select a from t where a = ''; ---- -physical_plan -01)CoalesceBatchesExec: target_batch_size=8192 -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = # The predicate should not have a column cast when the operator is = or != and the literal can be round-trip casted without losing information. query TT explain select a from t where cast(a as string) = '100'; ---- -physical_plan -01)CoalesceBatchesExec: target_batch_size=8192 -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)] # The predicate should still have the column cast when the literal alters its string representation after round-trip casting (leading zero lost). 
query TT explain select a from t where CAST(a AS string) = '0123'; ---- -physical_plan -01)CoalesceBatchesExec: target_batch_size=8192 -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 0123 +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 0123 statement ok From b49c5aa77496833c42101f16aedfd1bc57c01994 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 8 Jun 2025 16:04:25 -0500 Subject: [PATCH 03/14] lint --- datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs | 2 +- .../core/tests/physical_optimizer/filter_pushdown/mod.rs | 8 +++----- datafusion/physical-plan/src/sorts/sort.rs | 4 ++-- datafusion/physical-plan/src/topk/mod.rs | 6 ++++-- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs index 2fa3575bb65a5..572222b7b9daa 100644 --- a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs +++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs @@ -258,7 +258,7 @@ async fn test_fuzz_topk_filter_pushdown() { for null_order in &null_orders { // if there is a vec for this column insert the order, otherwise create a new vec let ordering = - format!("{} {} {}", order_column, order_direction, null_order); + format!("{order_column} {order_direction} {null_order}"); match orders.get_mut(*order_column) { Some(order_vec) => { order_vec.push(ordering); diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 58815d05d6c5e..1f525af23dbbd 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -36,9 +36,7 @@ use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_functions_aggregate::count::count_udaf; use datafusion_physical_expr::{aggregate::AggregateExprBuilder, Partitioning}; use datafusion_physical_expr::{expressions::col, LexOrdering, PhysicalSortExpr}; -use datafusion_physical_optimizer::{ - filter_pushdown::FilterPushdown, PhysicalOptimizerRule, -}; +use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, coalesce_batches::CoalesceBatchesExec, @@ -386,7 +384,8 @@ async fn test_topk_dynamic_filter_pushdown() { LexOrdering::new(vec![PhysicalSortExpr::new( col("b", &schema()).unwrap(), SortOptions::new(true, false), // descending, nulls_first - )]).unwrap(), + )]) + .unwrap(), Arc::clone(&scan), ) .with_fetch(Some(1)), @@ -410,7 +409,6 @@ async fn test_topk_dynamic_filter_pushdown() { // Actually apply the optimization to the plan let mut config = ConfigOptions::default(); config.execution.parquet.pushdown_filters = true; - let plan = FilterPushdown {}.optimize(plan, &config).unwrap(); let config = SessionConfig::new().with_batch_size(2); let session_ctx = SessionContext::new_with_config(config); session_ctx.register_object_store( diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 59fdaabe78ec5..570d64449040e 100644 --- 
a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -53,8 +53,8 @@ use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; -use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr}; +use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr::PhysicalExpr; use futures::{StreamExt, TryStreamExt}; @@ -1027,7 +1027,7 @@ impl DisplayAs for SortExec { if let Some(filter) = &self.filter { if let Ok(current) = filter.current() { if !current.eq(&lit(true)) { - write!(f, ", filter=[{}]", current)?; + write!(f, ", filter=[{current}]")?; } } } diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 307e8f1eab848..a610a47fb90aa 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -32,16 +32,18 @@ use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; use arrow::array::{ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; -use datafusion_common::{internal_datafusion_err, internal_err, HashMap, Result, ScalarValue}; +use datafusion_common::{ + internal_datafusion_err, internal_err, HashMap, Result, ScalarValue, +}; use datafusion_execution::{ memory_pool::{MemoryConsumer, MemoryReservation}, runtime_env::RuntimeEnv, }; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_expr::{ expressions::{is_not_null, is_null, lit, BinaryExpr, DynamicFilterPhysicalExpr}, PhysicalExpr, }; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; /// Global TopK /// From f092bf5083bf94c0baeddd03d4ca2ebc228edd36 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 10 Jun 2025 06:41:15 -0500 Subject: [PATCH 04/14] pass around the filter in EnforceSorting optimizer rule --- .../tests/fuzz_cases/topk_filter_pushdown.rs | 3 +- .../physical_optimizer/filter_pushdown/mod.rs | 68 +++++++++++++++++-- .../src/expressions/dynamic_filters.rs | 3 +- .../src/enforce_distribution.rs | 1 + .../src/enforce_sorting/mod.rs | 23 ++++--- .../src/enforce_sorting/sort_pushdown.rs | 51 +++++++++++++- .../physical-optimizer/src/optimizer.rs | 6 -- .../src/topk_aggregation.rs | 7 +- datafusion/physical-optimizer/src/utils.rs | 8 ++- datafusion/physical-plan/src/sorts/sort.rs | 25 +++++-- datafusion/physical-plan/src/topk/mod.rs | 9 +-- .../test_files/parquet_filter_pushdown.slt | 35 ---------- 12 files changed, 169 insertions(+), 70 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs index 572222b7b9daa..7296502264e68 100644 --- a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs +++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs @@ -257,8 +257,7 @@ async fn test_fuzz_topk_filter_pushdown() { for order_direction in &order_directions { for null_order in &null_orders { // if there is a vec for this column insert the order, otherwise create a new vec - let ordering = - format!("{order_column} {order_direction} {null_order}"); + let ordering = format!("{order_column} {order_direction} {null_order}"); match orders.get_mut(*order_column) { Some(order_vec) => { order_vec.push(ordering); diff --git 
a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 1f525af23dbbd..7087942ab3153 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -20,6 +20,7 @@ use std::sync::{Arc, LazyLock}; use arrow::{ array::record_batch, datatypes::{DataType, Field, Schema, SchemaRef}, + util::pretty::pretty_format_batches, }; use arrow_schema::SortOptions; use datafusion::{ @@ -28,7 +29,7 @@ use datafusion::{ expressions::{BinaryExpr, Column, Literal}, PhysicalExpr, }, - prelude::{SessionConfig, SessionContext}, + prelude::{ParquetReadOptions, SessionConfig, SessionContext}, scalar::ScalarValue, }; use datafusion_common::config::ConfigOptions; @@ -36,7 +37,9 @@ use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_functions_aggregate::count::count_udaf; use datafusion_physical_expr::{aggregate::AggregateExprBuilder, Partitioning}; use datafusion_physical_expr::{expressions::col, LexOrdering, PhysicalSortExpr}; -use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; +use datafusion_physical_optimizer::{ + filter_pushdown::FilterPushdown, PhysicalOptimizerRule, +}; use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, coalesce_batches::CoalesceBatchesExec, @@ -47,7 +50,7 @@ use datafusion_physical_plan::{ }; use futures::StreamExt; -use object_store::memory::InMemory; +use object_store::{memory::InMemory, ObjectStore}; use util::{format_plan_for_test, OptimizationTest, TestNode, TestScanBuilder}; mod util; @@ -406,9 +409,10 @@ async fn test_topk_dynamic_filter_pushdown() { " ); - // Actually apply the optimization to the plan + // Actually apply the optimization to the plan and put some data through it to check that the filter is updated to reflect the TopK state let mut config = ConfigOptions::default(); config.execution.parquet.pushdown_filters = true; + let plan = FilterPushdown::new().optimize(plan, &config).unwrap(); let config = SessionConfig::new().with_batch_size(2); let session_ctx = SessionContext::new_with_config(config); session_ctx.register_object_store( @@ -430,6 +434,62 @@ async fn test_topk_dynamic_filter_pushdown() { ); } +/// Integration test for dynamic filter pushdown with TopK. +/// We use an integration test because there are complex interactions in the optimizer rules +/// that the unit tests applying a single optimizer rule do not cover. 
+#[tokio::test] +async fn test_topk_dynamic_filter_pushdown_integration() { + let store = Arc::new(InMemory::new()) as Arc; + let mut cfg = SessionConfig::new(); + cfg.options_mut().execution.parquet.pushdown_filters = true; + cfg.options_mut().execution.parquet.max_row_group_size = 128; + let ctx = SessionContext::new_with_config(cfg); + ctx.register_object_store( + ObjectStoreUrl::parse("memory://").unwrap().as_ref(), + Arc::clone(&store), + ); + ctx.sql( + r" +COPY ( + SELECT 1372708800 + value AS t + FROM generate_series(0, 99999) + ORDER BY t + ) TO 'memory:///1.parquet' +STORED AS PARQUET; + ", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Register the file with the context + ctx.register_parquet( + "topk_pushdown", + "memory:///1.parquet", + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Create a TopK query that will use dynamic filter pushdown + let df = ctx + .sql(r"EXPLAIN ANALYZE SELECT t FROM topk_pushdown ORDER BY t LIMIT 10;") + .await + .unwrap(); + let batches = df.collect().await.unwrap(); + let explain = format!("{}", pretty_format_batches(&batches).unwrap()); + + assert!(explain.contains("output_rows=128")); // Read 1 row group + assert!(explain.contains("t@0 < 1372708809")); // Dynamic filter was applied + assert!( + explain.contains("pushdown_rows_matched=128, pushdown_rows_pruned=99872"), + "{explain}" + ); + // Pushdown pruned most rows +} + /// Schema: /// a: String /// b: String diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 756fb638af2b5..09a8da61cdfa5 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -288,8 +288,9 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { } fn snapshot(&self) -> Result>> { + let inner = self.current()?; // Return the current expression as a snapshot. 
- Ok(Some(self.current()?)) + Ok(Some(inner)) } } diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 478ce39eecb9e..bf08e4a9c243c 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1308,6 +1308,7 @@ pub fn ensure_distribution( .downcast_ref::() .map(|output| output.fetch()) .unwrap_or(None), + None, )?; } } diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 8a71b28486a2a..78360ab30fdee 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -57,6 +57,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::{Distribution, Partitioning}; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -403,7 +404,7 @@ pub fn parallelize_sorts( && requirements.plan.output_partitioning().partition_count() <= 1 { // Take the initial sort expressions and requirements - let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?; + let (sort_exprs, fetch, filter) = get_sort_exprs(&requirements.plan)?; let sort_reqs = LexRequirement::from(sort_exprs.clone()); let sort_exprs = sort_exprs.clone(); @@ -417,7 +418,7 @@ pub fn parallelize_sorts( // deals with the children and their children and so on. requirements = requirements.children.swap_remove(0); - requirements = add_sort_above_with_check(requirements, sort_reqs, fetch)?; + requirements = add_sort_above_with_check(requirements, sort_reqs, fetch, filter)?; let spm = SortPreservingMergeExec::new(sort_exprs, Arc::clone(&requirements.plan)); @@ -513,6 +514,7 @@ pub fn ensure_sorting( .downcast_ref::() .map(|output| output.fetch()) .unwrap_or(None), + None, ); child = update_sort_ctx_children_data(child, true)?; } @@ -644,7 +646,7 @@ fn adjust_window_sort_removal( // Satisfy the ordering requirement so that the window can run: let mut child_node = window_tree.children.swap_remove(0); if let Some(reqs) = reqs { - child_node = add_sort_above(child_node, reqs.into_single(), None); + child_node = add_sort_above(child_node, reqs.into_single(), None, None); } let child_plan = Arc::clone(&child_node.plan); window_tree.children.push(child_node); @@ -803,15 +805,20 @@ fn remove_corresponding_sort_from_sub_plan( Ok(node) } +/// Return type for get_sort_exprs function +type SortExprsResult<'a> = ( + &'a LexOrdering, + Option, + Option>, +); + /// Converts an [ExecutionPlan] trait object to a [LexOrdering] reference when possible. 
-fn get_sort_exprs( - sort_any: &Arc, -) -> Result<(&LexOrdering, Option)> { +fn get_sort_exprs(sort_any: &Arc) -> Result> { if let Some(sort_exec) = sort_any.as_any().downcast_ref::() { - Ok((sort_exec.expr(), sort_exec.fetch())) + Ok((sort_exec.expr(), sort_exec.fetch(), sort_exec.filter())) } else if let Some(spm) = sort_any.as_any().downcast_ref::() { - Ok((spm.expr(), spm.fetch())) + Ok((spm.expr(), spm.fetch(), None)) } else { plan_err!("Given ExecutionPlan is not a SortExec or a SortPreservingMergeExec") } diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index bd7b0060c3be3..1fe1ed3e21b10 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{internal_err, HashSet, JoinSide, Result}; use datafusion_expr::JoinType; -use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr}; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ add_offset_to_physical_sort_exprs, EquivalenceProperties, @@ -57,6 +57,7 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; pub struct ParentRequirements { ordering_requirement: Option, fetch: Option, + filter: Option>, } pub type SortPushDown = PlanContext; @@ -70,6 +71,8 @@ pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) { // If the parent has a fetch value, assign it to the children // Or use the fetch value of the child. fetch: child.plan.fetch(), + // If the parent has a filter, assign it to the children + filter: sort_push_down.data.filter.clone(), }; } } @@ -95,6 +98,7 @@ fn pushdown_sorts_helper( ) -> Result> { let plan = sort_push_down.plan; let parent_fetch = sort_push_down.data.fetch; + let parent_filter = sort_push_down.data.filter.clone(); let Some(parent_requirement) = sort_push_down.data.ordering_requirement.clone() else { @@ -114,6 +118,18 @@ fn pushdown_sorts_helper( sort_push_down.data.fetch = fetch; sort_push_down.data.ordering_requirement = Some(OrderingRequirements::from(sort_ordering)); + let filter = plan + .as_any() + .downcast_ref::() + .and_then(|s| s.filter().clone()); + match filter { + Some(filter) => { + sort_push_down.data.filter = Some(filter); + } + None => { + sort_push_down.data.filter = parent_filter.clone(); + } + } // Recursive call to helper, so it doesn't transform_down and miss // the new node (previous child of sort): return pushdown_sorts_helper(sort_push_down); @@ -131,11 +147,20 @@ fn pushdown_sorts_helper( return internal_err!("SortExec should have output ordering"); }; + let filter = plan + .as_any() + .downcast_ref::() + .and_then(|s| s.filter().clone()); + let sort_fetch = plan.fetch(); let parent_is_stricter = eqp.requirements_compatible( parent_requirement.first().clone(), sort_ordering.clone().into(), ); + let sort_filter = plan + .as_any() + .downcast_ref::() + .and_then(|s| s.filter().clone()); // Remove the current sort as we are either going to prove that it is // unnecessary, or replace it with a stricter sort. 
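An aside on the propagation pattern above (a sketch, not a change in this patch): the repeated `match` blocks in this rule reduce to "keep the sort's own dynamic filter, otherwise inherit the parent's", assuming the same `Option<Arc<DynamicFilterPhysicalExpr>>` fields introduced here.

```rust
// Sketch only: equivalent to the `match` blocks in this hunk.
// Prefer the SortExec's own dynamic filter; fall back to the one
// pushed down from the parent when the sort has none.
sort_push_down.data.filter = filter.or_else(|| parent_filter.clone());
```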
@@ -152,17 +177,27 @@ fn pushdown_sorts_helper( sort_push_down, parent_requirement.into_single(), parent_fetch, + filter.clone(), ); // Update pushdown requirements: sort_push_down.children[0].data = ParentRequirements { ordering_requirement: Some(OrderingRequirements::from(sort_ordering)), fetch: sort_fetch, + filter, }; return Ok(Transformed::yes(sort_push_down)); } else { // Sort was unnecessary, just propagate the stricter fetch and // ordering requirements: sort_push_down.data.fetch = min_fetch(sort_fetch, parent_fetch); + match sort_filter { + Some(filter) => { + sort_push_down.data.filter = Some(filter); + } + None => { + sort_push_down.data.filter = parent_filter.clone(); + } + } let current_is_stricter = eqp.requirements_compatible( sort_ordering.clone().into(), parent_requirement.first().clone(), @@ -194,9 +229,22 @@ fn pushdown_sorts_helper( // For operators that can take a sort pushdown, continue with updated // requirements: let current_fetch = sort_push_down.plan.fetch(); + let current_filter = sort_push_down + .plan + .as_any() + .downcast_ref::() + .and_then(|s| s.filter().clone()); for (child, order) in sort_push_down.children.iter_mut().zip(adjusted) { child.data.ordering_requirement = order; child.data.fetch = min_fetch(current_fetch, parent_fetch); + match current_filter { + Some(ref filter) => { + child.data.filter = Some(Arc::clone(filter)); + } + None => { + child.data.filter = parent_filter.clone(); + } + } } sort_push_down.data.ordering_requirement = None; } else { @@ -205,6 +253,7 @@ fn pushdown_sorts_helper( sort_push_down, parent_requirement.into_single(), parent_fetch, + parent_filter, ); assign_initial_requirements(&mut sort_push_down); } diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index 49245551645e6..d5129cea9d4ef 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -99,8 +99,6 @@ impl PhysicalOptimizer { // The FilterPushdown rule tries to push down filters as far as it can. // For example, it will push down filtering from a `FilterExec` to // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`. - // This must run before EnforceSorting to ensure dynamic filters from TopK operators - // are established before sorts are potentially recreated. Arc::new(FilterPushdown::new()), // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution // requirements. Please make sure that the whole plan tree is determined before this rule. @@ -141,10 +139,6 @@ impl PhysicalOptimizer { // reduced by narrowing their input tables. Arc::new(ProjectionPushdown::new()), Arc::new(InsertYieldExec::new()), - // The FilterPushdown rule tries to push down filters as far as it can. - // For example, it will push down filtering from a `FilterExec` to - // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`. - Arc::new(FilterPushdown::new()), // The SanityCheckPlan rule checks whether the order and // distribution requirements of each node in the plan // is satisfied. 
It will also reject non-runnable query diff --git a/datafusion/physical-optimizer/src/topk_aggregation.rs b/datafusion/physical-optimizer/src/topk_aggregation.rs index bff0b1e49684f..33166284774be 100644 --- a/datafusion/physical-optimizer/src/topk_aggregation.rs +++ b/datafusion/physical-optimizer/src/topk_aggregation.rs @@ -130,10 +130,13 @@ impl TopKAggregation { Ok(Transformed::no(plan)) }; let child = Arc::clone(child).transform_down(closure).data().ok()?; - let sort = SortExec::new(sort.expr().clone(), child) + let mut new_sort = SortExec::new(sort.expr().clone(), child) .with_fetch(sort.fetch()) .with_preserve_partitioning(sort.preserve_partitioning()); - Some(Arc::new(sort)) + if let Some(filter) = sort.filter() { + new_sort = new_sort.with_filter(Arc::clone(&filter)); + } + Some(Arc::new(new_sort)) } } diff --git a/datafusion/physical-optimizer/src/utils.rs b/datafusion/physical-optimizer/src/utils.rs index 3655e555a7440..431925033678b 100644 --- a/datafusion/physical-optimizer/src/utils.rs +++ b/datafusion/physical-optimizer/src/utils.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use datafusion_common::Result; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::{LexOrdering, LexRequirement}; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; @@ -39,6 +40,7 @@ pub fn add_sort_above( node: PlanContext, sort_requirements: LexRequirement, fetch: Option, + filter: Option>, ) -> PlanContext { let mut sort_reqs: Vec<_> = sort_requirements.into(); sort_reqs.retain(|sort_expr| { @@ -55,6 +57,9 @@ pub fn add_sort_above( if node.plan.output_partitioning().partition_count() > 1 { new_sort = new_sort.with_preserve_partitioning(true); } + if let Some(filter) = filter { + new_sort = new_sort.with_filter(filter); + } PlanContext::new(Arc::new(new_sort), T::default(), vec![node]) } @@ -65,13 +70,14 @@ pub fn add_sort_above_with_check( node: PlanContext, sort_requirements: LexRequirement, fetch: Option, + filter: Option>, ) -> Result> { if !node .plan .equivalence_properties() .ordering_satisfy_requirement(sort_requirements.clone())? { - Ok(add_sort_above(node, sort_requirements, fetch)) + Ok(add_sort_above(node, sort_requirements, fetch, filter)) } else { Ok(node) } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 570d64449040e..1ffaf7f1641b8 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -913,12 +913,15 @@ impl SortExec { cache = cache.with_boundedness(Boundedness::Bounded); } let filter = fetch.is_some().then(|| { - let children = self - .expr - .iter() - .map(|sort_expr| Arc::clone(&sort_expr.expr)) - .collect::>(); - Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true))) + // If we already have a filter, keep it. Otherwise, create a new one. + self.filter.clone().unwrap_or_else(|| { + let children = self + .expr + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true))) + }) }); SortExec { input: Arc::clone(&self.input), @@ -932,6 +935,11 @@ impl SortExec { } } + pub fn with_filter(mut self, filter: Arc) -> Self { + self.filter = Some(filter); + self + } + /// Input schema pub fn input(&self) -> &Arc { &self.input @@ -947,6 +955,11 @@ impl SortExec { self.fetch } + /// If `Some(filter)`, returns the filter expression that matches the state of the sort. 
+ pub fn filter(&self) -> Option> { + self.filter.clone() + } + fn output_partitioning_helper( input: &Arc, preserve_partitioning: bool, diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index a610a47fb90aa..929f2873fe93c 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -243,11 +243,12 @@ impl TopK { // update memory reservation self.reservation.try_resize(self.size())?; + // flag the topK as finished if we know that all + // subsequent batches are guaranteed to be greater (by byte order, after row conversion) than the top K, + // which means the top K won't change and the computation can be finished early. + self.attempt_early_completion(&batch)?; + if updated { - // flag the topK as finished if we know that all - // subsequent batches are guaranteed to be greater (by byte order, after row conversion) than the top K, - // which means the top K won't change and the computation can be finished early. - self.attempt_early_completion(&batch)?; // update the filter representation of our TopK heap self.update_filter()?; } diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index e5b5f5ac878a9..1b6ae13fbe771 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -246,38 +246,3 @@ physical_plan 02)--FilterExec: val@0 != part@1 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != d AND val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != d OR d != val_max@1) AND val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c, d)] - -# The order of filters should not matter -query TT -EXPLAIN select val, part from t_pushdown where part = 'a' AND part = val; ----- -logical_plan -01)Filter: t_pushdown.val = t_pushdown.part -02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8("a")], partial_filters=[t_pushdown.val = t_pushdown.part] -physical_plan -01)CoalesceBatchesExec: target_batch_size=8192 -02)--FilterExec: val@0 = part@1 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet - -query TT -select val, part from t_pushdown where part = 'a' AND part = val; ----- -a a - -query TT -EXPLAIN select val, part from t_pushdown where part = val AND part = 'a'; ----- -logical_plan -01)Filter: t_pushdown.val = t_pushdown.part -02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8("a")], partial_filters=[t_pushdown.val = t_pushdown.part] -physical_plan -01)CoalesceBatchesExec: target_batch_size=8192 -02)--FilterExec: val@0 = part@1 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], 
file_type=parquet - -query TT -select val, part from t_pushdown where part = val AND part = 'a'; ----- -a a \ No newline at end of file From f034e4a0e1d710852d27ebffd18bba9a9cdebaf9 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 10 Jun 2025 12:56:42 -0500 Subject: [PATCH 05/14] check that filter pushdown happened in fuzz tests --- .../tests/fuzz_cases/topk_filter_pushdown.rs | 37 +++++++++++++++++-- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs index 7296502264e68..2568e10dcd89f 100644 --- a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs +++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs @@ -167,11 +167,17 @@ async fn test_files() -> Vec { (*files).clone() } + +struct RunResult { + results: Vec, + explain_plan: String, +} + async fn run_query_with_config( query: &str, config: SessionConfig, dataset: TestDataSet, -) -> Vec { +) -> RunResult { let store = dataset.store; let schema = dataset.schema; let ctx = SessionContext::new_with_config(config); @@ -191,7 +197,16 @@ async fn run_query_with_config( ctx.register_table("test_table", table).unwrap(); - ctx.sql(query).await.unwrap().collect().await.unwrap() + let results = ctx.sql(query).await.unwrap().collect().await.unwrap(); + let explain_batches = ctx.sql(&format!("EXPLAIN ANALYZE {query}")).await.unwrap(). + collect().await.unwrap(); + let explain_plan = pretty_format_batches(&explain_batches) + .unwrap() + .to_string(); + RunResult { + results, + explain_plan, + } } #[derive(Debug)] @@ -215,6 +230,16 @@ impl RunQueryResult { } } +/// Iterate over each line in the plan and check that one of them has `DataSourceExec` and `DynamicFilterPhysicalExpr` in the same line. 
+fn has_dynamic_filter_expr_pushdown(plan: &str) -> bool { + for line in plan.lines() { + if line.contains("DataSourceExec") && line.contains("DynamicFilterPhysicalExpr") { + return true; + } + } + false +} + async fn run_query( query: String, cfg: SessionConfig, @@ -231,11 +256,15 @@ async fn run_query( run_query_with_config(&query, cfg_without_dynamic_filters, dataset.clone()).await; let result = run_query_with_config(&query, cfg_with_dynamic_filters, dataset.clone()).await; + // Check that dynamic filters were actually pushed down + if !has_dynamic_filter_expr_pushdown(&result.explain_plan) { + panic!("Dynamic filter was not pushed down in query: {query}\n\n{}", result.explain_plan); + } RunQueryResult { query: query.to_string(), - result, - expected: expected_result, + result: result.results, + expected: expected_result.results, } } From 24afb59c9d562ac4a39bed28f9f74165fee061be Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 10 Jun 2025 13:10:41 -0500 Subject: [PATCH 06/14] fmt --- .../tests/fuzz_cases/topk_filter_pushdown.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs index 2568e10dcd89f..a5934882cbcc6 100644 --- a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs +++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs @@ -167,7 +167,6 @@ async fn test_files() -> Vec { (*files).clone() } - struct RunResult { results: Vec, explain_plan: String, @@ -198,11 +197,14 @@ async fn run_query_with_config( ctx.register_table("test_table", table).unwrap(); let results = ctx.sql(query).await.unwrap().collect().await.unwrap(); - let explain_batches = ctx.sql(&format!("EXPLAIN ANALYZE {query}")).await.unwrap(). 
- collect().await.unwrap(); - let explain_plan = pretty_format_batches(&explain_batches) + let explain_batches = ctx + .sql(&format!("EXPLAIN ANALYZE {query}")) + .await .unwrap() - .to_string(); + .collect() + .await + .unwrap(); + let explain_plan = pretty_format_batches(&explain_batches).unwrap().to_string(); RunResult { results, explain_plan, @@ -258,7 +260,10 @@ async fn run_query( run_query_with_config(&query, cfg_with_dynamic_filters, dataset.clone()).await; // Check that dynamic filters were actually pushed down if !has_dynamic_filter_expr_pushdown(&result.explain_plan) { - panic!("Dynamic filter was not pushed down in query: {query}\n\n{}", result.explain_plan); + panic!( + "Dynamic filter was not pushed down in query: {query}\n\n{}", + result.explain_plan + ); } RunQueryResult { From e718c76a058ed4c716d2a3cf53002ebfbf302822 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 13 Jun 2025 14:15:42 -0500 Subject: [PATCH 07/14] revert accidental change --- .../test_files/parquet_filter_pushdown.slt | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 1b6ae13fbe771..e5b5f5ac878a9 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -246,3 +246,38 @@ physical_plan 02)--FilterExec: val@0 != part@1 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != d AND val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != d OR d != val_max@1) AND val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c, d)] + +# The order of filters should not matter +query TT +EXPLAIN select val, part from t_pushdown where part = 'a' AND part = val; +---- +logical_plan +01)Filter: t_pushdown.val = t_pushdown.part +02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8("a")], partial_filters=[t_pushdown.val = t_pushdown.part] +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--FilterExec: val@0 = part@1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet + +query TT +select val, part from t_pushdown where part = 'a' AND part = val; +---- +a a + +query TT +EXPLAIN select val, part from t_pushdown where part = val AND part = 'a'; +---- +logical_plan +01)Filter: t_pushdown.val = t_pushdown.part +02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8("a")], partial_filters=[t_pushdown.val = t_pushdown.part] +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--FilterExec: val@0 = part@1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], 
file_type=parquet + +query TT +select val, part from t_pushdown where part = val AND part = 'a'; +---- +a a \ No newline at end of file From c11957a9adc024b4ae7e7d368b2b55d7eda8c264 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 13 Jun 2025 14:17:03 -0500 Subject: [PATCH 08/14] remove indentation --- datafusion/physical-plan/src/topk/mod.rs | 147 ++++++++++++----------- 1 file changed, 74 insertions(+), 73 deletions(-) diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 929f2873fe93c..a5fff1b84e002 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -261,89 +261,90 @@ impl TopK { let Some(filter) = &self.filter else { return Ok(()); }; - if let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? { - // Create filter expressions for each threshold - let mut filters: Vec> = - Vec::with_capacity(thresholds.len()); - - let mut prev_sort_expr: Option> = None; - for (sort_expr, value) in self.expr.iter().zip(thresholds.iter()) { - // Create the appropriate operator based on sort order - let op = if sort_expr.options.descending { - // For descending sort, we want col > threshold (exclude smaller values) - Operator::Gt - } else { - // For ascending sort, we want col < threshold (exclude larger values) - Operator::Lt - }; + let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else { + return Ok(()); + }; - let value_null = value.is_null(); + // Create filter expressions for each threshold + let mut filters: Vec> = + Vec::with_capacity(thresholds.len()); - let comparison = Arc::new(BinaryExpr::new( - Arc::clone(&sort_expr.expr), - op, - lit(value.clone()), - )); + let mut prev_sort_expr: Option> = None; + for (sort_expr, value) in self.expr.iter().zip(thresholds.iter()) { + // Create the appropriate operator based on sort order + let op = if sort_expr.options.descending { + // For descending sort, we want col > threshold (exclude smaller values) + Operator::Gt + } else { + // For ascending sort, we want col < threshold (exclude larger values) + Operator::Lt + }; - let comparison_with_null = - match (sort_expr.options.nulls_first, value_null) { - // For nulls first, transform to (threshold.value is not null) and (threshold.expr is null or comparison) - (true, true) => lit(false), - (true, false) => Arc::new(BinaryExpr::new( - is_null(Arc::clone(&sort_expr.expr))?, - Operator::Or, - comparison, - )), - // For nulls last, transform to (threshold.value is null and threshold.expr is not null) - // or (threshold.value is not null and comparison) - (false, true) => is_not_null(Arc::clone(&sort_expr.expr))?, - (false, false) => comparison, - }; - - let mut eq_expr = Arc::new(BinaryExpr::new( - Arc::clone(&sort_expr.expr), - Operator::Eq, - lit(value.clone()), + let value_null = value.is_null(); + + let comparison = Arc::new(BinaryExpr::new( + Arc::clone(&sort_expr.expr), + op, + lit(value.clone()), + )); + + let comparison_with_null = match (sort_expr.options.nulls_first, value_null) { + // For nulls first, transform to (threshold.value is not null) and (threshold.expr is null or comparison) + (true, true) => lit(false), + (true, false) => Arc::new(BinaryExpr::new( + is_null(Arc::clone(&sort_expr.expr))?, + Operator::Or, + comparison, + )), + // For nulls last, transform to (threshold.value is null and threshold.expr is not null) + // or (threshold.value is not null and comparison) + (false, true) => 
is_not_null(Arc::clone(&sort_expr.expr))?, + (false, false) => comparison, + }; + + let mut eq_expr = Arc::new(BinaryExpr::new( + Arc::clone(&sort_expr.expr), + Operator::Eq, + lit(value.clone()), + )); + + if value_null { + eq_expr = Arc::new(BinaryExpr::new( + is_null(Arc::clone(&sort_expr.expr))?, + Operator::Or, + eq_expr, )); + } - if value_null { - eq_expr = Arc::new(BinaryExpr::new( - is_null(Arc::clone(&sort_expr.expr))?, - Operator::Or, - eq_expr, - )); + // For a query like order by a, b, the filter for column `b` is only applied if + // the condition a = threshold.value (considering null equality) is met. + // Therefore, we add equality predicates for all preceding fields to the filter logic of the current field, + // and include the current field's equality predicate in `prev_sort_expr` for use with subsequent fields. + match prev_sort_expr.take() { + None => { + prev_sort_expr = Some(eq_expr); + filters.push(comparison_with_null); } - - // For a query like order by a, b, the filter for column `b` is only applied if - // the condition a = threshold.value (considering null equality) is met. - // Therefore, we add equality predicates for all preceding fields to the filter logic of the current field, - // and include the current field's equality predicate in `prev_sort_expr` for use with subsequent fields. - match prev_sort_expr.take() { - None => { - prev_sort_expr = Some(eq_expr); - filters.push(comparison_with_null); - } - Some(p) => { - filters.push(Arc::new(BinaryExpr::new( - Arc::clone(&p), - Operator::And, - comparison_with_null, - ))); - - prev_sort_expr = - Some(Arc::new(BinaryExpr::new(p, Operator::And, eq_expr))); - } + Some(p) => { + filters.push(Arc::new(BinaryExpr::new( + Arc::clone(&p), + Operator::And, + comparison_with_null, + ))); + + prev_sort_expr = + Some(Arc::new(BinaryExpr::new(p, Operator::And, eq_expr))); } } + } - let dynamic_predicate = filters - .into_iter() - .reduce(|a, b| Arc::new(BinaryExpr::new(a, Operator::Or, b))); + let dynamic_predicate = filters + .into_iter() + .reduce(|a, b| Arc::new(BinaryExpr::new(a, Operator::Or, b))); - if let Some(predicate) = dynamic_predicate { - if !predicate.eq(&lit(true)) { - filter.update(predicate)?; - } + if let Some(predicate) = dynamic_predicate { + if !predicate.eq(&lit(true)) { + filter.update(predicate)?; } } From 4152b5e84f793976e975835523f3290a26c28a9a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 13 Jun 2025 14:18:19 -0500 Subject: [PATCH 09/14] remove outdated comment --- .../core/tests/physical_optimizer/filter_pushdown/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 7087942ab3153..80c9199cf5614 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -361,9 +361,6 @@ fn test_node_handles_child_pushdown_result() { #[tokio::test] async fn test_topk_dynamic_filter_pushdown() { - // This test is a bit of a hack, but it shows that we can push down dynamic filters - // into the DataSourceExec. The test is not perfect because we don't have a real - // implementation of the dynamic filter yet, so we just use a static filter. 
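An illustrative sketch (not part of the patch) of the dynamic-filter lifecycle that the test below exercises: the filter starts as a no-op `true` and is tightened via `update` once the TopK heap fills. Only `DynamicFilterPhysicalExpr::new`, `current`, and `update` from this patch are used; the schema, column name, and threshold value are invented for the example.

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{
    col, lit, BinaryExpr, DynamicFilterPhysicalExpr,
};
use datafusion_physical_expr::PhysicalExpr;

fn dynamic_filter_lifecycle() -> Result<()> {
    // Hypothetical single-column schema for `ORDER BY b DESC LIMIT k`.
    let schema = Schema::new(vec![Field::new("b", DataType::Utf8, false)]);
    let col_b = col("b", &schema)?;

    // A freshly created filter is a no-op: it evaluates to `true` for every row.
    let filter = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![Arc::clone(&col_b)],
        lit(true),
    ));
    assert!(filter.current()?.eq(&lit(true)));

    // Once the heap holds k rows, `TopK::update_filter` swaps in the current
    // threshold, conceptually `b > 'ab'` here (value invented for the example).
    let threshold: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::clone(&col_b),
        Operator::Gt,
        lit(ScalarValue::from("ab")),
    ));
    filter.update(threshold)?;
    assert!(!filter.current()?.eq(&lit(true)));
    Ok(())
}
```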
let batches = vec![ record_batch!( ("a", Utf8, ["aa", "ab"]), From 0e86c22b619894ae0d1fda7fe40e3199af2c870b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 13 Jun 2025 14:26:34 -0500 Subject: [PATCH 10/14] add example to docstring --- datafusion/physical-plan/src/topk/mod.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index a5fff1b84e002..2da2ea8707610 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -256,7 +256,16 @@ impl TopK { Ok(()) } - /// Update the filter representation of our TopK heap + /// Update the filter representation of our TopK heap. + /// For example, given the sort expression `ORDER BY a DESC, b ASC LIMIT 3`, + /// and the current heap values `[(1, 5), (1, 4), (2, 3)]`, + /// the filter will be updated to: + /// + /// ``` + /// (a > 1 OR (a = 1 AND b < 5)) AND + /// (a > 1 OR (a = 1 AND b < 4)) AND + /// (a > 2 OR (a = 2 AND b < 3)) + /// ``` fn update_filter(&mut self) -> Result<()> { let Some(filter) = &self.filter else { return Ok(()); From a6b56f1779a6c45e51192ffffc44a6a7c42413cd Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 13 Jun 2025 16:43:39 -0500 Subject: [PATCH 11/14] fix doctest --- datafusion/physical-plan/src/topk/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 2da2ea8707610..2accd6726c267 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -261,7 +261,7 @@ impl TopK { /// and the current heap values `[(1, 5), (1, 4), (2, 3)]`, /// the filter will be updated to: /// - /// ``` + /// ```sql /// (a > 1 OR (a = 1 AND b < 5)) AND /// (a > 1 OR (a = 1 AND b < 4)) AND /// (a > 2 OR (a = 2 AND b < 3)) From 4cf574bb5a41d4ca36c47187cba554d32257dbce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Jun 2025 11:36:30 +0200 Subject: [PATCH 12/14] Filter TopK inputs based on dynamic topk filter --- datafusion/physical-plan/src/topk/mod.rs | 102 ++++++++++++++++------- 1 file changed, 73 insertions(+), 29 deletions(-) diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 2accd6726c267..8aadb334b5e82 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -18,8 +18,8 @@ //! 
TopK: Combination of Sort / LIMIT use arrow::{ - array::Array, - compute::interleave_record_batch, + array::{Array, AsArray}, + compute::{interleave_record_batch, FilterBuilder}, row::{RowConverter, Rows, SortField}, }; use datafusion_expr::{ColumnarValue, Operator}; @@ -203,7 +203,7 @@ impl TopK { let baseline = self.metrics.baseline.clone(); let _timer = baseline.elapsed_compute().timer(); - let sort_keys: Vec = self + let mut sort_keys: Vec = self .expr .iter() .map(|expr| { @@ -212,43 +212,65 @@ impl TopK { }) .collect::>>()?; + let mut selected_rows = None; + + match self.filter.as_ref() { + // If a filter is provided, update it with the new rows + Some(filter) => { + let filter = filter.current()?; + let filtered = filter.evaluate(&batch)?; + let num_rows = batch.num_rows(); + let array = filtered.into_array(num_rows)?; + let filter = array.as_boolean().clone(); + if filter.true_count() == 0 { + // nothing to filter, so no need to update + return Ok(()); + } + let filter_predicate = FilterBuilder::new(&filter); + let filter_predicate = if sort_keys.len() > 1 { + // Optimize filter when it has multiple sort keys + filter_predicate.optimize().build() + } else { + filter_predicate.build() + }; + selected_rows = Some(filter); + sort_keys = sort_keys + .iter() + .map(|key| filter_predicate.filter(key).map_err(|x| x.into())) + .collect::>>()?; + } + None => {} + } // reuse existing `Rows` to avoid reallocations let rows = &mut self.scratch_rows; rows.clear(); self.row_converter.append(rows, &sort_keys)?; - // TODO make this algorithmically better?: - // Idea: filter out rows >= self.heap.max() early (before passing to `RowConverter`) - // this avoids some work and also might be better vectorizable. let mut batch_entry = self.heap.register_batch(batch.clone()); - let mut updated = false; - for (index, row) in rows.iter().enumerate() { - match self.heap.max() { - // heap has k items, and the new row is greater than the - // current max in the heap ==> it is not a new topk - Some(max_row) if row.as_ref() >= max_row.row() => {} - // don't yet have k items or new item is lower than the currently k low values - None | Some(_) => { - self.heap.add(&mut batch_entry, row, index); - self.metrics.row_replacements.add(1); - updated = true; - } + + let replacements = match selected_rows { + Some(filter) => { + self.find_new_topk_items(filter.values().set_indices(), &mut batch_entry) } - } - self.heap.insert_batch_entry(batch_entry); + None => self.find_new_topk_items(0..sort_keys[0].len(), &mut batch_entry), + }; + + if replacements > 0 { + self.metrics.row_replacements.add(replacements); - // conserve memory - self.heap.maybe_compact()?; + self.heap.insert_batch_entry(batch_entry); - // update memory reservation - self.reservation.try_resize(self.size())?; + // conserve memory + self.heap.maybe_compact()?; - // flag the topK as finished if we know that all - // subsequent batches are guaranteed to be greater (by byte order, after row conversion) than the top K, - // which means the top K won't change and the computation can be finished early. - self.attempt_early_completion(&batch)?; + // update memory reservation + self.reservation.try_resize(self.size())?; + + // flag the topK as finished if we know that all + // subsequent batches are guaranteed to be greater (by byte order, after row conversion) than the top K, + // which means the top K won't change and the computation can be finished early. 
+ self.attempt_early_completion(&batch)?; - if updated { // update the filter representation of our TopK heap self.update_filter()?; } @@ -256,6 +278,28 @@ impl TopK { Ok(()) } + fn find_new_topk_items( + &mut self, + items: impl Iterator, + batch_entry: &mut RecordBatchEntry, + ) -> usize { + let mut replacements = 0; + let rows = &mut self.scratch_rows; + for (index, row) in items.zip(rows.iter()) { + match self.heap.max() { + // heap has k items, and the new row is greater than the + // current max in the heap ==> it is not a new topk + Some(max_row) if row.as_ref() >= max_row.row() => {} + // don't yet have k items or new item is lower than the currently k low values + None | Some(_) => { + self.heap.add(batch_entry, row, index); + replacements += 1; + } + } + } + replacements + } + /// Update the filter representation of our TopK heap. /// For example, given the sort expression `ORDER BY a DESC, b ASC LIMIT 3`, /// and the current heap values `[(1, 5), (1, 4), (2, 3)]`, From 6d39884faa497a3d934eeabc9cec2c83e900e0a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Jun 2025 11:54:34 +0200 Subject: [PATCH 13/14] Clippy --- datafusion/physical-plan/src/topk/mod.rs | 47 +++++++++++------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 8aadb334b5e82..d5c1ee950142a 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -214,33 +214,30 @@ impl TopK { let mut selected_rows = None; - match self.filter.as_ref() { + if let Some(filter) = self.filter.as_ref() { // If a filter is provided, update it with the new rows - Some(filter) => { - let filter = filter.current()?; - let filtered = filter.evaluate(&batch)?; - let num_rows = batch.num_rows(); - let array = filtered.into_array(num_rows)?; - let filter = array.as_boolean().clone(); - if filter.true_count() == 0 { - // nothing to filter, so no need to update - return Ok(()); - } - let filter_predicate = FilterBuilder::new(&filter); - let filter_predicate = if sort_keys.len() > 1 { - // Optimize filter when it has multiple sort keys - filter_predicate.optimize().build() - } else { - filter_predicate.build() - }; - selected_rows = Some(filter); - sort_keys = sort_keys - .iter() - .map(|key| filter_predicate.filter(key).map_err(|x| x.into())) - .collect::>>()?; + let filter = filter.current()?; + let filtered = filter.evaluate(&batch)?; + let num_rows = batch.num_rows(); + let array = filtered.into_array(num_rows)?; + let filter = array.as_boolean().clone(); + if filter.true_count() == 0 { + // nothing to filter, so no need to update + return Ok(()); } - None => {} - } + let filter_predicate = FilterBuilder::new(&filter); + let filter_predicate = if sort_keys.len() > 1 { + // Optimize filter when it has multiple sort keys + filter_predicate.optimize().build() + } else { + filter_predicate.build() + }; + selected_rows = Some(filter); + sort_keys = sort_keys + .iter() + .map(|key| filter_predicate.filter(key).map_err(|x| x.into())) + .collect::>>()?; + }; // reuse existing `Rows` to avoid reallocations let rows = &mut self.scratch_rows; rows.clear(); From 805aaefdf52e52f65dfe34509ca5f5086dad1dca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Jun 2025 12:48:17 +0200 Subject: [PATCH 14/14] Fast path --- datafusion/physical-plan/src/topk/mod.rs | 30 ++++++++++++++---------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git 
a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index d5c1ee950142a..0d97c11b2b7cb 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -221,22 +221,26 @@ impl TopK { let num_rows = batch.num_rows(); let array = filtered.into_array(num_rows)?; let filter = array.as_boolean().clone(); - if filter.true_count() == 0 { + let true_count = filter.true_count(); + if true_count == 0 { // nothing to filter, so no need to update return Ok(()); } - let filter_predicate = FilterBuilder::new(&filter); - let filter_predicate = if sort_keys.len() > 1 { - // Optimize filter when it has multiple sort keys - filter_predicate.optimize().build() - } else { - filter_predicate.build() - }; - selected_rows = Some(filter); - sort_keys = sort_keys - .iter() - .map(|key| filter_predicate.filter(key).map_err(|x| x.into())) - .collect::>>()?; + // only update the keys / rows if the filter does not match all rows + if true_count < num_rows { + let filter_predicate = FilterBuilder::new(&filter); + let filter_predicate = if sort_keys.len() > 1 { + // Optimize filter when it has multiple sort keys + filter_predicate.optimize().build() + } else { + filter_predicate.build() + }; + selected_rows = Some(filter); + sort_keys = sort_keys + .iter() + .map(|key| filter_predicate.filter(key).map_err(|x| x.into())) + .collect::>>()?; + } }; // reuse existing `Rows` to avoid reallocations let rows = &mut self.scratch_rows;
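
The predicate built by `update_filter` is easiest to follow with concrete numbers. The sketch below is a plain-Rust stand-in for the pushed-down expression, not the operator's `PhysicalExpr` construction: it uses the `ORDER BY a DESC, b ASC` example from the docstring with a hypothetical heap threshold of `(1, 5)`, and leaves out the `IS NULL` / `IS NOT NULL` terms the real expression adds depending on null ordering.

```rust
// A back-of-the-envelope model of the pushed-down predicate (illustrative only).
// For `ORDER BY a DESC, b ASC`, a row can still enter the heap only if it sorts
// strictly before the current threshold row: `a > ta OR (a = ta AND b < tb)`.
fn beats_threshold(row: (i32, i32), threshold: (i32, i32)) -> bool {
    let (a, b) = row;
    let (ta, tb) = threshold;
    a > ta || (a == ta && b < tb)
}

fn main() {
    // Hypothetical threshold: the current k-th best row held by the heap.
    let threshold = (1, 5);
    assert!(beats_threshold((2, 9), threshold)); // larger `a` wins under DESC
    assert!(beats_threshold((1, 4), threshold)); // ties on `a`, smaller `b` wins under ASC
    assert!(!beats_threshold((1, 6), threshold));
    assert!(!beats_threshold((0, 0), threshold));
}
```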
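
The pre-filtering added in the last three commits boils down to: evaluate the dynamic filter once per batch, bail out if no row passes, skip the copy when every row passes, and otherwise filter the sort-key arrays before row conversion. Below is a self-contained sketch of that flow under stated assumptions: the helper name `prefilter_sort_keys` and the array contents are made up for illustration and are not part of DataFusion; the arrow calls (`FilterBuilder`, `true_count`) are the same ones the patch itself uses.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BooleanArray, Int32Array};
use arrow::compute::FilterBuilder;
use arrow::error::ArrowError;

/// Keep only the sort-key rows that pass `mask`, skipping work when the mask
/// selects nothing or everything. Returns `None` when no row can improve on
/// the current top K.
fn prefilter_sort_keys(
    sort_keys: Vec<ArrayRef>,
    mask: &BooleanArray,
) -> Result<Option<Vec<ArrayRef>>, ArrowError> {
    let true_count = mask.true_count();
    if true_count == 0 {
        // Nothing passes the dynamic filter: the heap cannot change.
        return Ok(None);
    }
    if true_count == mask.len() {
        // Every row passes: filtering would only copy data, keep the keys as-is.
        return Ok(Some(sort_keys));
    }
    let builder = FilterBuilder::new(mask);
    // Optimizing the predicate pays off when it is applied to several arrays.
    let predicate = if sort_keys.len() > 1 {
        builder.optimize().build()
    } else {
        builder.build()
    };
    sort_keys
        .iter()
        .map(|key| predicate.filter(key.as_ref()))
        .collect::<Result<Vec<_>, _>>()
        .map(Some)
}

fn main() -> Result<(), ArrowError> {
    // Hypothetical single sort key and a mask produced by the dynamic filter.
    let keys: Vec<ArrayRef> = vec![Arc::new(Int32Array::from(vec![7, 2, 9, 1]))];
    let mask = BooleanArray::from(vec![false, true, false, true]);
    let filtered = prefilter_sort_keys(keys, &mask)?.expect("some rows pass");
    assert_eq!(filtered[0].len(), 2);
    Ok(())
}
```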
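
Once the keys are filtered, the operator walks only the surviving rows, pairing `set_indices()` on the boolean mask with the filtered values so each candidate still knows its position in the original batch. The sketch below is a simplified stand-in that uses `std::collections::BinaryHeap` in place of `TopKHeap` and plain `i32` keys in place of row-encoded sort keys; the function name and data are invented for illustration.

```rust
use std::collections::BinaryHeap;

use arrow::array::BooleanArray;

/// Keep the k smallest values seen so far. `filtered_values` are the sort keys
/// that survived the pre-filter, so they line up one-to-one with the set bits
/// of `mask`; `set_indices()` recovers each value's index in the original
/// batch, which the real operator needs to rebuild output rows later.
fn insert_filtered_rows(
    heap: &mut BinaryHeap<(i32, usize)>,
    k: usize,
    mask: &BooleanArray,
    filtered_values: &[i32],
) -> usize {
    let mut replacements = 0;
    let selected = mask.values().set_indices().zip(filtered_values.iter().copied());
    for (batch_index, value) in selected {
        match heap.peek() {
            // Heap already holds k rows and this one is no better than the worst: skip it.
            Some(&(max, _)) if heap.len() == k && value >= max => {}
            _ => {
                heap.push((value, batch_index));
                if heap.len() > k {
                    heap.pop();
                }
                replacements += 1;
            }
        }
    }
    replacements
}

fn main() {
    let mask = BooleanArray::from(vec![true, false, true, true]);
    // Values for batch rows 0, 2 and 3 (row 1 was filtered out).
    let filtered_values = [5, 1, 9];
    let mut heap = BinaryHeap::new();
    let replaced = insert_filtered_rows(&mut heap, 2, &mask, &filtered_values);
    assert_eq!(replaced, 2);
    // The two smallest values survive, each tagged with its original batch index.
    assert_eq!(heap.into_sorted_vec(), vec![(1, 2), (5, 0)]);
}
```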