diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 4efb67a37c991..8ab7f06dc7b5b 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -614,6 +614,13 @@ config_namespace! { /// during aggregations, if possible pub enable_topk_aggregation: bool, default = true + /// When set to true attempts to push down dynamic filters generated by operators into the file scan phase. + /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer + /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. + /// This means that if we already have 10 timestamps in the year 2025 + /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. + pub enable_dynamic_filter_pushdown: bool, default = true + /// When set to true, the optimizer will insert filters before a join between /// a nullable and non-nullable column to filter out nulls on the nullable side. This /// filter can add additional overhead when the file format does not fully support diff --git a/datafusion/core/tests/fuzz_cases/mod.rs b/datafusion/core/tests/fuzz_cases/mod.rs index 8ccc2a5bc1310..9e01621c02574 100644 --- a/datafusion/core/tests/fuzz_cases/mod.rs +++ b/datafusion/core/tests/fuzz_cases/mod.rs @@ -21,6 +21,7 @@ mod join_fuzz; mod merge_fuzz; mod sort_fuzz; mod sort_query_fuzz; +mod topk_filter_pushdown; mod aggregation_fuzzer; mod equivalence; diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs new file mode 100644 index 0000000000000..a5934882cbcc6 --- /dev/null +++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
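As a quick illustration of the new `datafusion.optimizer.enable_dynamic_filter_pushdown` option documented above, here is a minimal sketch (not part of the patch) of toggling it through the same string key the tests below use; the table `t` and its `timestamp` column are hypothetical:

use datafusion::prelude::{SessionConfig, SessionContext};

async fn topk_with_dynamic_filters() -> datafusion::error::Result<()> {
    // Enable (the default) or disable the new optimizer flag for this session.
    let cfg = SessionConfig::new()
        .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true);
    let ctx = SessionContext::new_with_config(cfg);
    // `t` is assumed to be registered elsewhere with a `timestamp` column.
    let _batches = ctx
        .sql("SELECT * FROM t ORDER BY timestamp DESC LIMIT 10")
        .await?
        .collect()
        .await?;
    Ok(())
}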
+ +use std::collections::HashMap; +use std::sync::{Arc, LazyLock}; + +use arrow::array::{Int32Array, StringArray, StringDictionaryBuilder}; +use arrow::datatypes::Int32Type; +use arrow::record_batch::RecordBatch; +use arrow::util::pretty::pretty_format_batches; +use arrow_schema::{DataType, Field, Schema}; +use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_datasource::ListingTableUrl; +use datafusion_datasource_parquet::ParquetFormat; +use datafusion_execution::object_store::ObjectStoreUrl; +use itertools::Itertools; +use object_store::memory::InMemory; +use object_store::path::Path; +use object_store::{ObjectStore, PutPayload}; +use parquet::arrow::ArrowWriter; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use tokio::sync::Mutex; +use tokio::task::JoinSet; + +#[derive(Clone)] +struct TestDataSet { + store: Arc, + schema: Arc, +} + +/// List of in memory parquet files with UTF8 data +// Use a mutex rather than LazyLock to allow for async initialization +static TESTFILES: LazyLock>> = + LazyLock::new(|| Mutex::new(vec![])); + +async fn test_files() -> Vec { + let files_mutex = &TESTFILES; + let mut files = files_mutex.lock().await; + if !files.is_empty() { + return (*files).clone(); + } + + let mut rng = StdRng::seed_from_u64(0); + + for nulls_in_ids in [false, true] { + for nulls_in_names in [false, true] { + for nulls_in_departments in [false, true] { + let store = Arc::new(InMemory::new()); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, nulls_in_ids), + Field::new("name", DataType::Utf8, nulls_in_names), + Field::new( + "department", + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Utf8), + ), + nulls_in_departments, + ), + ])); + + let name_choices = if nulls_in_names { + [Some("Alice"), Some("Bob"), None, Some("David"), None] + } else { + [ + Some("Alice"), + Some("Bob"), + Some("Charlie"), + Some("David"), + Some("Eve"), + ] + }; + + let department_choices = if nulls_in_departments { + [ + Some("Theater"), + Some("Engineering"), + None, + Some("Arts"), + None, + ] + } else { + [ + Some("Theater"), + Some("Engineering"), + Some("Healthcare"), + Some("Arts"), + Some("Music"), + ] + }; + + // Generate 5 files, some with overlapping or repeated ids some without + for i in 0..5 { + let num_batches = rng.random_range(1..3); + let mut batches = Vec::with_capacity(num_batches); + for _ in 0..num_batches { + let num_rows = 25; + let ids = Int32Array::from_iter((0..num_rows).map(|file| { + if nulls_in_ids { + if rng.random_bool(1.0 / 10.0) { + None + } else { + Some(rng.random_range(file..file + 5)) + } + } else { + Some(rng.random_range(file..file + 5)) + } + })); + let names = StringArray::from_iter((0..num_rows).map(|_| { + // randomly select a name + let idx = rng.random_range(0..name_choices.len()); + name_choices[idx].map(|s| s.to_string()) + })); + let mut departments = StringDictionaryBuilder::::new(); + for _ in 0..num_rows { + // randomly select a department + let idx = rng.random_range(0..department_choices.len()); + departments.append_option(department_choices[idx].as_ref()); + } + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(ids), + Arc::new(names), + Arc::new(departments.finish()), + ], + ) + .unwrap(); + batches.push(batch); + } + let mut buf = vec![]; + { + let mut writer = + ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap(); + for batch in batches { + 
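+                        // Writing and then flushing each batch closes the current row group,
+                        // so every batch lands in its own row group and gives the scan
+                        // multiple row groups per file to prune against.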
writer.write(&batch).unwrap(); + writer.flush().unwrap(); + } + writer.flush().unwrap(); + writer.finish().unwrap(); + } + let payload = PutPayload::from(buf); + let path = Path::from(format!("file_{i}.parquet")); + store.put(&path, payload).await.unwrap(); + } + files.push(TestDataSet { store, schema }); + } + } + } + (*files).clone() +} + +struct RunResult { + results: Vec, + explain_plan: String, +} + +async fn run_query_with_config( + query: &str, + config: SessionConfig, + dataset: TestDataSet, +) -> RunResult { + let store = dataset.store; + let schema = dataset.schema; + let ctx = SessionContext::new_with_config(config); + let url = ObjectStoreUrl::parse("memory://").unwrap(); + ctx.register_object_store(url.as_ref(), store.clone()); + + let format = Arc::new( + ParquetFormat::default() + .with_options(ctx.state().table_options().parquet.clone()), + ); + let options = ListingOptions::new(format); + let table_path = ListingTableUrl::parse("memory:///").unwrap(); + let config = ListingTableConfig::new(table_path) + .with_listing_options(options) + .with_schema(schema); + let table = Arc::new(ListingTable::try_new(config).unwrap()); + + ctx.register_table("test_table", table).unwrap(); + + let results = ctx.sql(query).await.unwrap().collect().await.unwrap(); + let explain_batches = ctx + .sql(&format!("EXPLAIN ANALYZE {query}")) + .await + .unwrap() + .collect() + .await + .unwrap(); + let explain_plan = pretty_format_batches(&explain_batches).unwrap().to_string(); + RunResult { + results, + explain_plan, + } +} + +#[derive(Debug)] +struct RunQueryResult { + query: String, + result: Vec, + expected: Vec, +} + +impl RunQueryResult { + fn expected_formated(&self) -> String { + format!("{}", pretty_format_batches(&self.expected).unwrap()) + } + + fn result_formated(&self) -> String { + format!("{}", pretty_format_batches(&self.result).unwrap()) + } + + fn is_ok(&self) -> bool { + self.expected_formated() == self.result_formated() + } +} + +/// Iterate over each line in the plan and check that one of them has `DataSourceExec` and `DynamicFilterPhysicalExpr` in the same line. 
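+/// For example, a matching plan line looks roughly like (illustrative; exact text varies):
+/// `DataSourceExec: file_groups={...}, projection=[...], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]`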
+fn has_dynamic_filter_expr_pushdown(plan: &str) -> bool { + for line in plan.lines() { + if line.contains("DataSourceExec") && line.contains("DynamicFilterPhysicalExpr") { + return true; + } + } + false +} + +async fn run_query( + query: String, + cfg: SessionConfig, + dataset: TestDataSet, +) -> RunQueryResult { + let cfg_with_dynamic_filters = cfg + .clone() + .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true); + let cfg_without_dynamic_filters = cfg + .clone() + .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", false); + + let expected_result = + run_query_with_config(&query, cfg_without_dynamic_filters, dataset.clone()).await; + let result = + run_query_with_config(&query, cfg_with_dynamic_filters, dataset.clone()).await; + // Check that dynamic filters were actually pushed down + if !has_dynamic_filter_expr_pushdown(&result.explain_plan) { + panic!( + "Dynamic filter was not pushed down in query: {query}\n\n{}", + result.explain_plan + ); + } + + RunQueryResult { + query: query.to_string(), + result: result.results, + expected: expected_result.results, + } +} + +struct TestCase { + query: String, + cfg: SessionConfig, + dataset: TestDataSet, +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_fuzz_topk_filter_pushdown() { + let order_columns = ["id", "name", "department"]; + let order_directions = ["ASC", "DESC"]; + let null_orders = ["NULLS FIRST", "NULLS LAST"]; + + let start = datafusion_common::instant::Instant::now(); + let mut orders: HashMap> = HashMap::new(); + for order_column in &order_columns { + for order_direction in &order_directions { + for null_order in &null_orders { + // if there is a vec for this column insert the order, otherwise create a new vec + let ordering = format!("{order_column} {order_direction} {null_order}"); + match orders.get_mut(*order_column) { + Some(order_vec) => { + order_vec.push(ordering); + } + None => { + orders.insert(order_column.to_string(), vec![ordering]); + } + } + } + } + } + + let mut queries = vec![]; + + for limit in [1, 10] { + for num_order_by_columns in [1, 2, 3] { + for order_columns in ["id", "name", "department"] + .iter() + .combinations(num_order_by_columns) + { + for orderings in order_columns + .iter() + .map(|col| orders.get(**col).unwrap()) + .multi_cartesian_product() + { + let query = format!( + "SELECT * FROM test_table ORDER BY {} LIMIT {}", + orderings.into_iter().join(", "), + limit + ); + queries.push(query); + } + } + } + } + + queries.sort_unstable(); + println!( + "Generated {} queries in {:?}", + queries.len(), + start.elapsed() + ); + + let start = datafusion_common::instant::Instant::now(); + let datasets = test_files().await; + println!("Generated test files in {:?}", start.elapsed()); + + let mut test_cases = vec![]; + for enable_filter_pushdown in [true, false] { + for query in &queries { + for dataset in &datasets { + let mut cfg = SessionConfig::new(); + cfg = cfg.set_bool( + "datafusion.optimizer.enable_dynamic_filter_pushdown", + enable_filter_pushdown, + ); + test_cases.push(TestCase { + query: query.to_string(), + cfg, + dataset: dataset.clone(), + }); + } + } + } + + let start = datafusion_common::instant::Instant::now(); + let mut join_set = JoinSet::new(); + for tc in test_cases { + join_set.spawn(run_query(tc.query, tc.cfg, tc.dataset)); + } + let mut results = join_set.join_all().await; + results.sort_unstable_by(|a, b| a.query.cmp(&b.query)); + println!("Ran {} test cases in {:?}", results.len(), start.elapsed()); + + let failures = results + .iter() 
+ .filter(|result| !result.is_ok()) + .collect::>(); + + for failure in &failures { + println!("Failure:"); + println!("Query:\n{}", failure.query); + println!("\nExpected:\n{}", failure.expected_formated()); + println!("\nResult:\n{}", failure.result_formated()); + println!("\n\n"); + } + + if !failures.is_empty() { + panic!("Some test cases failed"); + } else { + println!("All test cases passed"); + } +} diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index a28933d97bcd1..80c9199cf5614 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -17,28 +17,41 @@ use std::sync::{Arc, LazyLock}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::{ + array::record_batch, + datatypes::{DataType, Field, Schema, SchemaRef}, + util::pretty::pretty_format_batches, +}; +use arrow_schema::SortOptions; use datafusion::{ logical_expr::Operator, physical_plan::{ expressions::{BinaryExpr, Column, Literal}, PhysicalExpr, }, + prelude::{ParquetReadOptions, SessionConfig, SessionContext}, scalar::ScalarValue, }; use datafusion_common::config::ConfigOptions; +use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_functions_aggregate::count::count_udaf; -use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::{aggregate::AggregateExprBuilder, Partitioning}; -use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; +use datafusion_physical_expr::{expressions::col, LexOrdering, PhysicalSortExpr}; +use datafusion_physical_optimizer::{ + filter_pushdown::FilterPushdown, PhysicalOptimizerRule, +}; use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, coalesce_batches::CoalesceBatchesExec, filter::FilterExec, repartition::RepartitionExec, + sorts::sort::SortExec, + ExecutionPlan, }; -use util::{OptimizationTest, TestNode, TestScanBuilder}; +use futures::StreamExt; +use object_store::{memory::InMemory, ObjectStore}; +use util::{format_plan_for_test, OptimizationTest, TestNode, TestScanBuilder}; mod util; @@ -346,6 +359,134 @@ fn test_node_handles_child_pushdown_result() { ); } +#[tokio::test] +async fn test_topk_dynamic_filter_pushdown() { + let batches = vec![ + record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["bd", "bc"]), + ("c", Float64, [1.0, 2.0]) + ) + .unwrap(), + record_batch!( + ("a", Utf8, ["ac", "ad"]), + ("b", Utf8, ["bb", "ba"]), + ("c", Float64, [2.0, 1.0]) + ) + .unwrap(), + ]; + let scan = TestScanBuilder::new(schema()) + .with_support(true) + .with_batches(batches) + .build(); + let plan = Arc::new( + SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new( + col("b", &schema()).unwrap(), + SortOptions::new(true, false), // descending, nulls_first + )]) + .unwrap(), + Arc::clone(&scan), + ) + .with_fetch(Some(1)), + ) as Arc; + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown{}, true), + @r" + OptimizationTest: + input: + - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + " + ); + + // Actually apply the optimization to the plan and put some data through it to check that the filter is updated to reflect the TopK state + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + let plan = FilterPushdown::new().optimize(plan, &config).unwrap(); + let config = SessionConfig::new().with_batch_size(2); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + // Iterate one batch + stream.next().await.unwrap().unwrap(); + // Now check what our filter looks like + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false], filter=[b@1 > bd] + - DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@1 > bd ] + " + ); +} + +/// Integration test for dynamic filter pushdown with TopK. +/// We use an integration test because there are complex interactions in the optimizer rules +/// that the unit tests applying a single optimizer rule do not cover. +#[tokio::test] +async fn test_topk_dynamic_filter_pushdown_integration() { + let store = Arc::new(InMemory::new()) as Arc; + let mut cfg = SessionConfig::new(); + cfg.options_mut().execution.parquet.pushdown_filters = true; + cfg.options_mut().execution.parquet.max_row_group_size = 128; + let ctx = SessionContext::new_with_config(cfg); + ctx.register_object_store( + ObjectStoreUrl::parse("memory://").unwrap().as_ref(), + Arc::clone(&store), + ); + ctx.sql( + r" +COPY ( + SELECT 1372708800 + value AS t + FROM generate_series(0, 99999) + ORDER BY t + ) TO 'memory:///1.parquet' +STORED AS PARQUET; + ", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Register the file with the context + ctx.register_parquet( + "topk_pushdown", + "memory:///1.parquet", + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + // Create a TopK query that will use dynamic filter pushdown + let df = ctx + .sql(r"EXPLAIN ANALYZE SELECT t FROM topk_pushdown ORDER BY t LIMIT 10;") + .await + .unwrap(); + let batches = df.collect().await.unwrap(); + let explain = format!("{}", pretty_format_batches(&batches).unwrap()); + + assert!(explain.contains("output_rows=128")); // Read 1 row group + assert!(explain.contains("t@0 < 1372708809")); // Dynamic filter was applied + assert!( + explain.contains("pushdown_rows_matched=128, pushdown_rows_pruned=99872"), + "{explain}" + ); + // Pushdown pruned most rows +} + /// Schema: /// a: String /// b: String diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 958dd8f59e933..1d4fd110ef0d6 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -271,6 +271,11 @@ impl TestScanBuilder { self } + pub fn with_batches(mut self, batches: Vec) -> Self { + self.batches = batches; + self + } + pub fn build(self) -> Arc { let source = Arc::new(TestSource::new(self.support, self.batches)); let 
base_config = FileScanConfigBuilder::new( @@ -426,6 +431,15 @@ fn format_lines(s: &str) -> Vec { s.trim().split('\n').map(|s| s.to_string()).collect() } +pub fn format_plan_for_test(plan: &Arc) -> String { + let mut out = String::new(); + for line in format_execution_plan(plan) { + out.push_str(&format!(" - {line}\n")); + } + out.push('\n'); + out +} + #[derive(Debug)] pub(crate) struct TestNode { inject_filter: bool, diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 756fb638af2b5..09a8da61cdfa5 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -288,8 +288,9 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { } fn snapshot(&self) -> Result>> { + let inner = self.current()?; // Return the current expression as a snapshot. - Ok(Some(self.current()?)) + Ok(Some(inner)) } } diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index d77207fbbcd76..8f46133ed0bbb 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -43,6 +43,7 @@ pub use case::{case, CaseExpr}; pub use cast::{cast, CastExpr}; pub use column::{col, with_new_schema, Column}; pub use datafusion_expr::utils::format_state_name; +pub use dynamic_filters::DynamicFilterPhysicalExpr; pub use in_list::{in_list, InListExpr}; pub use is_not_null::{is_not_null, IsNotNullExpr}; pub use is_null::{is_null, IsNullExpr}; diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 478ce39eecb9e..bf08e4a9c243c 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1308,6 +1308,7 @@ pub fn ensure_distribution( .downcast_ref::() .map(|output| output.fetch()) .unwrap_or(None), + None, )?; } } diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 8a71b28486a2a..78360ab30fdee 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -57,6 +57,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::{Distribution, Partitioning}; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -403,7 +404,7 @@ pub fn parallelize_sorts( && requirements.plan.output_partitioning().partition_count() <= 1 { // Take the initial sort expressions and requirements - let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?; + let (sort_exprs, fetch, filter) = get_sort_exprs(&requirements.plan)?; let sort_reqs = LexRequirement::from(sort_exprs.clone()); let sort_exprs = sort_exprs.clone(); @@ -417,7 +418,7 @@ pub fn parallelize_sorts( // deals with the children and their children and so on. 
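+        // The dynamic filter extracted by `get_sort_exprs` above is forwarded to
+        // `add_sort_above_with_check` below, so the re-inserted SortExec keeps sharing
+        // the same `DynamicFilterPhysicalExpr` instance as the original TopK sort.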
requirements = requirements.children.swap_remove(0); - requirements = add_sort_above_with_check(requirements, sort_reqs, fetch)?; + requirements = add_sort_above_with_check(requirements, sort_reqs, fetch, filter)?; let spm = SortPreservingMergeExec::new(sort_exprs, Arc::clone(&requirements.plan)); @@ -513,6 +514,7 @@ pub fn ensure_sorting( .downcast_ref::() .map(|output| output.fetch()) .unwrap_or(None), + None, ); child = update_sort_ctx_children_data(child, true)?; } @@ -644,7 +646,7 @@ fn adjust_window_sort_removal( // Satisfy the ordering requirement so that the window can run: let mut child_node = window_tree.children.swap_remove(0); if let Some(reqs) = reqs { - child_node = add_sort_above(child_node, reqs.into_single(), None); + child_node = add_sort_above(child_node, reqs.into_single(), None, None); } let child_plan = Arc::clone(&child_node.plan); window_tree.children.push(child_node); @@ -803,15 +805,20 @@ fn remove_corresponding_sort_from_sub_plan( Ok(node) } +/// Return type for get_sort_exprs function +type SortExprsResult<'a> = ( + &'a LexOrdering, + Option, + Option>, +); + /// Converts an [ExecutionPlan] trait object to a [LexOrdering] reference when possible. -fn get_sort_exprs( - sort_any: &Arc, -) -> Result<(&LexOrdering, Option)> { +fn get_sort_exprs(sort_any: &Arc) -> Result> { if let Some(sort_exec) = sort_any.as_any().downcast_ref::() { - Ok((sort_exec.expr(), sort_exec.fetch())) + Ok((sort_exec.expr(), sort_exec.fetch(), sort_exec.filter())) } else if let Some(spm) = sort_any.as_any().downcast_ref::() { - Ok((spm.expr(), spm.fetch())) + Ok((spm.expr(), spm.fetch(), None)) } else { plan_err!("Given ExecutionPlan is not a SortExec or a SortPreservingMergeExec") } diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index bd7b0060c3be3..1fe1ed3e21b10 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{internal_err, HashSet, JoinSide, Result}; use datafusion_expr::JoinType; -use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr}; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ add_offset_to_physical_sort_exprs, EquivalenceProperties, @@ -57,6 +57,7 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; pub struct ParentRequirements { ordering_requirement: Option, fetch: Option, + filter: Option>, } pub type SortPushDown = PlanContext; @@ -70,6 +71,8 @@ pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) { // If the parent has a fetch value, assign it to the children // Or use the fetch value of the child. 
fetch: child.plan.fetch(), + // If the parent has a filter, assign it to the children + filter: sort_push_down.data.filter.clone(), }; } } @@ -95,6 +98,7 @@ fn pushdown_sorts_helper( ) -> Result> { let plan = sort_push_down.plan; let parent_fetch = sort_push_down.data.fetch; + let parent_filter = sort_push_down.data.filter.clone(); let Some(parent_requirement) = sort_push_down.data.ordering_requirement.clone() else { @@ -114,6 +118,18 @@ fn pushdown_sorts_helper( sort_push_down.data.fetch = fetch; sort_push_down.data.ordering_requirement = Some(OrderingRequirements::from(sort_ordering)); + let filter = plan + .as_any() + .downcast_ref::() + .and_then(|s| s.filter().clone()); + match filter { + Some(filter) => { + sort_push_down.data.filter = Some(filter); + } + None => { + sort_push_down.data.filter = parent_filter.clone(); + } + } // Recursive call to helper, so it doesn't transform_down and miss // the new node (previous child of sort): return pushdown_sorts_helper(sort_push_down); @@ -131,11 +147,20 @@ fn pushdown_sorts_helper( return internal_err!("SortExec should have output ordering"); }; + let filter = plan + .as_any() + .downcast_ref::() + .and_then(|s| s.filter().clone()); + let sort_fetch = plan.fetch(); let parent_is_stricter = eqp.requirements_compatible( parent_requirement.first().clone(), sort_ordering.clone().into(), ); + let sort_filter = plan + .as_any() + .downcast_ref::() + .and_then(|s| s.filter().clone()); // Remove the current sort as we are either going to prove that it is // unnecessary, or replace it with a stricter sort. @@ -152,17 +177,27 @@ fn pushdown_sorts_helper( sort_push_down, parent_requirement.into_single(), parent_fetch, + filter.clone(), ); // Update pushdown requirements: sort_push_down.children[0].data = ParentRequirements { ordering_requirement: Some(OrderingRequirements::from(sort_ordering)), fetch: sort_fetch, + filter, }; return Ok(Transformed::yes(sort_push_down)); } else { // Sort was unnecessary, just propagate the stricter fetch and // ordering requirements: sort_push_down.data.fetch = min_fetch(sort_fetch, parent_fetch); + match sort_filter { + Some(filter) => { + sort_push_down.data.filter = Some(filter); + } + None => { + sort_push_down.data.filter = parent_filter.clone(); + } + } let current_is_stricter = eqp.requirements_compatible( sort_ordering.clone().into(), parent_requirement.first().clone(), @@ -194,9 +229,22 @@ fn pushdown_sorts_helper( // For operators that can take a sort pushdown, continue with updated // requirements: let current_fetch = sort_push_down.plan.fetch(); + let current_filter = sort_push_down + .plan + .as_any() + .downcast_ref::() + .and_then(|s| s.filter().clone()); for (child, order) in sort_push_down.children.iter_mut().zip(adjusted) { child.data.ordering_requirement = order; child.data.fetch = min_fetch(current_fetch, parent_fetch); + match current_filter { + Some(ref filter) => { + child.data.filter = Some(Arc::clone(filter)); + } + None => { + child.data.filter = parent_filter.clone(); + } + } } sort_push_down.data.ordering_requirement = None; } else { @@ -205,6 +253,7 @@ fn pushdown_sorts_helper( sort_push_down, parent_requirement.into_single(), parent_fetch, + parent_filter, ); assign_initial_requirements(&mut sort_push_down); } diff --git a/datafusion/physical-optimizer/src/topk_aggregation.rs b/datafusion/physical-optimizer/src/topk_aggregation.rs index bff0b1e49684f..33166284774be 100644 --- a/datafusion/physical-optimizer/src/topk_aggregation.rs +++ b/datafusion/physical-optimizer/src/topk_aggregation.rs 
@@ -130,10 +130,13 @@ impl TopKAggregation { Ok(Transformed::no(plan)) }; let child = Arc::clone(child).transform_down(closure).data().ok()?; - let sort = SortExec::new(sort.expr().clone(), child) + let mut new_sort = SortExec::new(sort.expr().clone(), child) .with_fetch(sort.fetch()) .with_preserve_partitioning(sort.preserve_partitioning()); - Some(Arc::new(sort)) + if let Some(filter) = sort.filter() { + new_sort = new_sort.with_filter(Arc::clone(&filter)); + } + Some(Arc::new(new_sort)) } } diff --git a/datafusion/physical-optimizer/src/utils.rs b/datafusion/physical-optimizer/src/utils.rs index 3655e555a7440..431925033678b 100644 --- a/datafusion/physical-optimizer/src/utils.rs +++ b/datafusion/physical-optimizer/src/utils.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use datafusion_common::Result; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::{LexOrdering, LexRequirement}; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; @@ -39,6 +40,7 @@ pub fn add_sort_above( node: PlanContext, sort_requirements: LexRequirement, fetch: Option, + filter: Option>, ) -> PlanContext { let mut sort_reqs: Vec<_> = sort_requirements.into(); sort_reqs.retain(|sort_expr| { @@ -55,6 +57,9 @@ pub fn add_sort_above( if node.plan.output_partitioning().partition_count() > 1 { new_sort = new_sort.with_preserve_partitioning(true); } + if let Some(filter) = filter { + new_sort = new_sort.with_filter(filter); + } PlanContext::new(Arc::new(new_sort), T::default(), vec![node]) } @@ -65,13 +70,14 @@ pub fn add_sort_above_with_check( node: PlanContext, sort_requirements: LexRequirement, fetch: Option, + filter: Option>, ) -> Result> { if !node .plan .equivalence_properties() .ordering_satisfy_requirement(sort_requirements.clone())? { - Ok(add_sort_above(node, sort_requirements, fetch)) + Ok(add_sort_above(node, sort_requirements, fetch, filter)) } else { Ok(node) } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index c31f17291e197..1ffaf7f1641b8 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -27,6 +27,7 @@ use std::sync::Arc; use crate::common::spawn_buffered; use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; use crate::expressions::PhysicalSortExpr; +use crate::filter_pushdown::FilterDescription; use crate::limit::LimitStream; use crate::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, @@ -52,7 +53,9 @@ use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; +use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr}; use datafusion_physical_expr::LexOrdering; +use datafusion_physical_expr::PhysicalExpr; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; @@ -843,6 +846,8 @@ pub struct SortExec { common_sort_prefix: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. 
cache: PlanProperties, + /// Filter matching the state of the sort for dynamic filter pushdown + filter: Option>, } impl SortExec { @@ -861,6 +866,7 @@ impl SortExec { fetch: None, common_sort_prefix: sort_prefix, cache, + filter: None, } } @@ -906,6 +912,17 @@ impl SortExec { if fetch.is_some() && is_pipeline_friendly { cache = cache.with_boundedness(Boundedness::Bounded); } + let filter = fetch.is_some().then(|| { + // If we already have a filter, keep it. Otherwise, create a new one. + self.filter.clone().unwrap_or_else(|| { + let children = self + .expr + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true))) + }) + }); SortExec { input: Arc::clone(&self.input), expr: self.expr.clone(), @@ -914,9 +931,15 @@ impl SortExec { common_sort_prefix: self.common_sort_prefix.clone(), fetch, cache, + filter, } } + pub fn with_filter(mut self, filter: Arc) -> Self { + self.filter = Some(filter); + self + } + /// Input schema pub fn input(&self) -> &Arc { &self.input @@ -932,6 +955,11 @@ impl SortExec { self.fetch } + /// If `Some(filter)`, returns the filter expression that matches the state of the sort. + pub fn filter(&self) -> Option> { + self.filter.clone() + } + fn output_partitioning_helper( input: &Arc, preserve_partitioning: bool, @@ -1009,6 +1037,13 @@ impl DisplayAs for SortExec { match self.fetch { Some(fetch) => { write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?; + if let Some(filter) = &self.filter { + if let Ok(current) = filter.current() { + if !current.eq(&lit(true)) { + write!(f, ", filter=[{current}]")?; + } + } + } if !self.common_sort_prefix.is_empty() { write!(f, ", sort_prefix=[")?; let mut first = true; @@ -1079,9 +1114,10 @@ impl ExecutionPlan for SortExec { self: Arc, children: Vec>, ) -> Result> { - let new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) + let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) .with_fetch(self.fetch) .with_preserve_partitioning(self.preserve_partitioning); + new_sort.filter = self.filter.clone(); Ok(Arc::new(new_sort)) } @@ -1122,6 +1158,7 @@ impl ExecutionPlan for SortExec { context.session_config().batch_size(), context.runtime_env(), &self.metrics_set, + self.filter.clone(), )?; Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), @@ -1228,6 +1265,23 @@ impl ExecutionPlan for SortExec { .with_preserve_partitioning(self.preserve_partitioning()), ))) } + + fn gather_filters_for_pushdown( + &self, + parent_filters: Vec>, + config: &datafusion_common::config::ConfigOptions, + ) -> Result { + if let Some(filter) = &self.filter { + if config.optimizer.enable_dynamic_filter_pushdown { + let filter = Arc::clone(filter) as Arc; + return Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters) + .with_self_filter(filter)); + } + } + Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters)) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index d8b6f0e400b83..0d97c11b2b7cb 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -17,6 +17,12 @@ //! 
TopK: Combination of Sort / LIMIT +use arrow::{ + array::{Array, AsArray}, + compute::{interleave_record_batch, FilterBuilder}, + row::{RowConverter, Rows, SortField}, +}; +use datafusion_expr::{ColumnarValue, Operator}; use std::mem::size_of; use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc}; @@ -25,14 +31,18 @@ use crate::spill::get_record_batch_memory_size; use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; use arrow::array::{ArrayRef, RecordBatch}; -use arrow::compute::interleave_record_batch; use arrow::datatypes::SchemaRef; -use arrow::row::{RowConverter, Rows, SortField}; -use datafusion_common::{internal_datafusion_err, HashMap, Result}; +use datafusion_common::{ + internal_datafusion_err, internal_err, HashMap, Result, ScalarValue, +}; use datafusion_execution::{ memory_pool::{MemoryConsumer, MemoryReservation}, runtime_env::RuntimeEnv, }; +use datafusion_physical_expr::{ + expressions::{is_not_null, is_null, lit, BinaryExpr, DynamicFilterPhysicalExpr}, + PhysicalExpr, +}; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; /// Global TopK @@ -110,6 +120,8 @@ pub struct TopK { common_sort_prefix_converter: Option, /// Common sort prefix between the input and the sort expressions to allow early exit optimization common_sort_prefix: Arc<[PhysicalSortExpr]>, + /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown + filter: Option>, /// If true, indicates that all rows of subsequent batches are guaranteed /// to be greater (by byte order, after row conversion) than the top K, /// which means the top K won't change and the computation can be finished early. @@ -148,6 +160,7 @@ impl TopK { batch_size: usize, runtime: Arc, metrics: &ExecutionPlanMetricsSet, + filter: Option>, ) -> Result { let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]")) .register(&runtime.memory_pool); @@ -179,6 +192,7 @@ impl TopK { common_sort_prefix_converter: prefix_row_converter, common_sort_prefix: Arc::from(common_sort_prefix), finished: false, + filter, }) } @@ -189,7 +203,7 @@ impl TopK { let baseline = self.metrics.baseline.clone(); let _timer = baseline.elapsed_compute().timer(); - let sort_keys: Vec = self + let mut sort_keys: Vec = self .expr .iter() .map(|expr| { @@ -198,39 +212,195 @@ impl TopK { }) .collect::>>()?; + let mut selected_rows = None; + + if let Some(filter) = self.filter.as_ref() { + // If a filter is provided, update it with the new rows + let filter = filter.current()?; + let filtered = filter.evaluate(&batch)?; + let num_rows = batch.num_rows(); + let array = filtered.into_array(num_rows)?; + let filter = array.as_boolean().clone(); + let true_count = filter.true_count(); + if true_count == 0 { + // nothing to filter, so no need to update + return Ok(()); + } + // only update the keys / rows if the filter does not match all rows + if true_count < num_rows { + let filter_predicate = FilterBuilder::new(&filter); + let filter_predicate = if sort_keys.len() > 1 { + // Optimize filter when it has multiple sort keys + filter_predicate.optimize().build() + } else { + filter_predicate.build() + }; + selected_rows = Some(filter); + sort_keys = sort_keys + .iter() + .map(|key| filter_predicate.filter(key).map_err(|x| x.into())) + .collect::>>()?; + } + }; // reuse existing `Rows` to avoid reallocations let rows = &mut self.scratch_rows; rows.clear(); self.row_converter.append(rows, &sort_keys)?; - // TODO make this algorithmically better?: - // Idea: filter out rows >= self.heap.max() 
early (before passing to `RowConverter`) - // this avoids some work and also might be better vectorizable. let mut batch_entry = self.heap.register_batch(batch.clone()); - for (index, row) in rows.iter().enumerate() { + + let replacements = match selected_rows { + Some(filter) => { + self.find_new_topk_items(filter.values().set_indices(), &mut batch_entry) + } + None => self.find_new_topk_items(0..sort_keys[0].len(), &mut batch_entry), + }; + + if replacements > 0 { + self.metrics.row_replacements.add(replacements); + + self.heap.insert_batch_entry(batch_entry); + + // conserve memory + self.heap.maybe_compact()?; + + // update memory reservation + self.reservation.try_resize(self.size())?; + + // flag the topK as finished if we know that all + // subsequent batches are guaranteed to be greater (by byte order, after row conversion) than the top K, + // which means the top K won't change and the computation can be finished early. + self.attempt_early_completion(&batch)?; + + // update the filter representation of our TopK heap + self.update_filter()?; + } + + Ok(()) + } + + fn find_new_topk_items( + &mut self, + items: impl Iterator, + batch_entry: &mut RecordBatchEntry, + ) -> usize { + let mut replacements = 0; + let rows = &mut self.scratch_rows; + for (index, row) in items.zip(rows.iter()) { match self.heap.max() { // heap has k items, and the new row is greater than the // current max in the heap ==> it is not a new topk Some(max_row) if row.as_ref() >= max_row.row() => {} // don't yet have k items or new item is lower than the currently k low values None | Some(_) => { - self.heap.add(&mut batch_entry, row, index); - self.metrics.row_replacements.add(1); + self.heap.add(batch_entry, row, index); + replacements += 1; } } } - self.heap.insert_batch_entry(batch_entry); + replacements + } - // conserve memory - self.heap.maybe_compact()?; + /// Update the filter representation of our TopK heap. + /// For example, given the sort expression `ORDER BY a DESC, b ASC LIMIT 3`, + /// and the current heap values `[(1, 5), (1, 4), (2, 3)]`, + /// the filter will be updated to: + /// + /// ```sql + /// (a > 1 OR (a = 1 AND b < 5)) AND + /// (a > 1 OR (a = 1 AND b < 4)) AND + /// (a > 2 OR (a = 2 AND b < 3)) + /// ``` + fn update_filter(&mut self) -> Result<()> { + let Some(filter) = &self.filter else { + return Ok(()); + }; + let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else { + return Ok(()); + }; - // update memory reservation - self.reservation.try_resize(self.size())?; + // Create filter expressions for each threshold + let mut filters: Vec> = + Vec::with_capacity(thresholds.len()); - // flag the topK as finished if we know that all - // subsequent batches are guaranteed to be greater (by byte order, after row conversion) than the top K, - // which means the top K won't change and the computation can be finished early. 
- self.attempt_early_completion(&batch)?; + let mut prev_sort_expr: Option> = None; + for (sort_expr, value) in self.expr.iter().zip(thresholds.iter()) { + // Create the appropriate operator based on sort order + let op = if sort_expr.options.descending { + // For descending sort, we want col > threshold (exclude smaller values) + Operator::Gt + } else { + // For ascending sort, we want col < threshold (exclude larger values) + Operator::Lt + }; + + let value_null = value.is_null(); + + let comparison = Arc::new(BinaryExpr::new( + Arc::clone(&sort_expr.expr), + op, + lit(value.clone()), + )); + + let comparison_with_null = match (sort_expr.options.nulls_first, value_null) { + // For nulls first, transform to (threshold.value is not null) and (threshold.expr is null or comparison) + (true, true) => lit(false), + (true, false) => Arc::new(BinaryExpr::new( + is_null(Arc::clone(&sort_expr.expr))?, + Operator::Or, + comparison, + )), + // For nulls last, transform to (threshold.value is null and threshold.expr is not null) + // or (threshold.value is not null and comparison) + (false, true) => is_not_null(Arc::clone(&sort_expr.expr))?, + (false, false) => comparison, + }; + + let mut eq_expr = Arc::new(BinaryExpr::new( + Arc::clone(&sort_expr.expr), + Operator::Eq, + lit(value.clone()), + )); + + if value_null { + eq_expr = Arc::new(BinaryExpr::new( + is_null(Arc::clone(&sort_expr.expr))?, + Operator::Or, + eq_expr, + )); + } + + // For a query like order by a, b, the filter for column `b` is only applied if + // the condition a = threshold.value (considering null equality) is met. + // Therefore, we add equality predicates for all preceding fields to the filter logic of the current field, + // and include the current field's equality predicate in `prev_sort_expr` for use with subsequent fields. 
+ match prev_sort_expr.take() { + None => { + prev_sort_expr = Some(eq_expr); + filters.push(comparison_with_null); + } + Some(p) => { + filters.push(Arc::new(BinaryExpr::new( + Arc::clone(&p), + Operator::And, + comparison_with_null, + ))); + + prev_sort_expr = + Some(Arc::new(BinaryExpr::new(p, Operator::And, eq_expr))); + } + } + } + + let dynamic_predicate = filters + .into_iter() + .reduce(|a, b| Arc::new(BinaryExpr::new(a, Operator::Or, b))); + + if let Some(predicate) = dynamic_predicate { + if !predicate.eq(&lit(true)) { + filter.update(predicate)?; + } + } Ok(()) } @@ -324,6 +494,7 @@ impl TopK { common_sort_prefix_converter: _, common_sort_prefix: _, finished: _, + filter: _, } = self; let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop @@ -566,6 +737,47 @@ impl TopKHeap { + self.store.size() + self.owned_bytes } + + fn get_threshold_values( + &self, + sort_exprs: &[PhysicalSortExpr], + ) -> Result>> { + // If the heap doesn't have k elements yet, we can't create thresholds + let max_row = match self.max() { + Some(row) => row, + None => return Ok(None), + }; + + // Get the batch that contains the max row + let batch_entry = match self.store.get(max_row.batch_id) { + Some(entry) => entry, + None => return internal_err!("Invalid batch ID in TopKRow"), + }; + + // Extract threshold values for each sort expression + let mut scalar_values = Vec::with_capacity(sort_exprs.len()); + for sort_expr in sort_exprs { + // Extract the value for this column from the max row + let expr = Arc::clone(&sort_expr.expr); + let value = expr.evaluate(&batch_entry.batch.slice(max_row.index, 1))?; + + // Convert to scalar value - should be a single value since we're evaluating on a single row batch + let scalar = match value { + ColumnarValue::Scalar(scalar) => scalar, + ColumnarValue::Array(array) if array.len() == 1 => { + // Extract the first (and only) value from the array + ScalarValue::try_from_array(&array, 0)? + } + array => { + return internal_err!("Expected a scalar value, got {:?}", array) + } + }; + + scalar_values.push(scalar); + } + + Ok(Some(scalar_values)) + } } /// Represents one of the top K rows held in this heap. 
Orders @@ -834,6 +1046,7 @@ mod tests { 2, runtime, &metrics, + None, )?; // Create the first batch with two columns: diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index dc8b7680d83e1..42d64c5ee155d 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -285,6 +285,7 @@ datafusion.format.types_info false datafusion.optimizer.allow_symmetric_joins_without_pruning true datafusion.optimizer.default_filter_selectivity 20 datafusion.optimizer.enable_distinct_aggregation_soft_limit true +datafusion.optimizer.enable_dynamic_filter_pushdown true datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_topk_aggregation true datafusion.optimizer.expand_views_at_output false @@ -396,6 +397,7 @@ datafusion.format.types_info false Show types in visual representation batches datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. +datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. 
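Because the new option is exposed through `information_schema.df_settings` (as the slt expectations above show), its current value can also be inspected from a session. A hedged sketch, assuming the information schema is enabled via the existing `SessionConfig::with_information_schema` API:

use datafusion::prelude::{SessionConfig, SessionContext};

async fn show_dynamic_filter_setting() -> datafusion::error::Result<()> {
    let cfg = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(cfg);
    ctx.sql(
        "SELECT name, value FROM information_schema.df_settings \
         WHERE name = 'datafusion.optimizer.enable_dynamic_filter_pushdown'",
    )
    .await?
    .show()
    .await?;
    Ok(())
}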
diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 9d5106bf2cafb..3398fa29018bc 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -854,7 +854,7 @@ physical_plan 01)ProjectionExec: expr=[1 as foo] 02)--SortPreservingMergeExec: [part_key@0 ASC NULLS LAST], fetch=1 03)----SortExec: TopK(fetch=1), expr=[part_key@0 ASC NULLS LAST], preserve_partitioning=[true] -04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], file_type=parquet +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query I with selection as ( diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 26fef6d666b10..9ff382d32af95 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -316,7 +316,7 @@ explain select number, letter, age from partial_sorted order by number desc, let ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] # Explain variations of the above query with different orderings, and different sort prefixes. 
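The `predicate=DynamicFilterPhysicalExpr [ true ]` suffix in these plans is the unresolved placeholder that the TopK tightens at runtime. A hedged sketch of checking the same thing programmatically, mirroring the `has_dynamic_filter_expr_pushdown` helper from the fuzz test above (`partial_sorted` and its columns come from this slt file; the surrounding plan text may differ by version):

use arrow::util::pretty::pretty_format_batches;
use datafusion::prelude::SessionContext;

async fn assert_dynamic_filter_in_plan(ctx: &SessionContext) -> datafusion::error::Result<()> {
    let batches = ctx
        .sql("EXPLAIN SELECT number, letter, age FROM partial_sorted ORDER BY number DESC LIMIT 3")
        .await?
        .collect()
        .await?;
    let plan = pretty_format_batches(&batches)?.to_string();
    // The dynamic filter should be attached to the DataSourceExec node.
    assert!(plan
        .lines()
        .any(|line| line.contains("DataSourceExec") && line.contains("DynamicFilterPhysicalExpr")));
    Ok(())
}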
@@ -326,28 +326,28 @@ explain select number, letter, age from partial_sorted order by age desc limit 3 ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[age@2 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query TT explain select number, letter, age from partial_sorted order by number desc, letter desc limit 3; ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query TT explain select number, letter, age from partial_sorted order by number asc limit 3; ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query TT explain select number, letter, age from partial_sorted order by letter asc, number desc limit 3; ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[letter@1 ASC NULLS LAST, number@0 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] # Explicit NULLS ordering cases (reversing the order of the NULLS on the number and letter orderings) query TT @@ -355,14 +355,14 @@ explain select number, letter, age from partial_sorted order by number desc, let ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC], preserve_partitioning=[false], sort_prefix=[number@0 DESC] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, 
projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] query TT explain select number, letter, age from partial_sorted order by number desc NULLS LAST, letter asc limit 3; ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC NULLS LAST, letter@1 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] # Verify that the sort prefix is correctly computed on the normalized ordering (removing redundant aliased columns) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 42282e39e41f5..fcb2360b87e3f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -101,6 +101,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | +| datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | | datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. |
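Besides the string-keyed `set_bool` route used in the tests, the generated config field can be set directly on a session's options. A minimal sketch of opting out per session, using the `options_mut` accessor that already appears in this patch:

use datafusion::prelude::{SessionConfig, SessionContext};

fn context_without_dynamic_filters() -> SessionContext {
    let mut cfg = SessionConfig::new();
    // Field generated by `config_namespace!` for the option documented above.
    cfg.options_mut().optimizer.enable_dynamic_filter_pushdown = false;
    SessionContext::new_with_config(cfg)
}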