From de8630635ae402de381d8a3e9a447b670a23d0bc Mon Sep 17 00:00:00 2001
From: Qi Zhu <821684824@qq.com>
Date: Sat, 18 Apr 2026 15:53:38 +0800
Subject: [PATCH 1/2] feat: initialize TopK dynamic filter threshold from
 parquet row group statistics

Before reading any parquet data, scan row group min/max statistics to
compute an initial threshold for TopK's dynamic filter. This allows
row-level filtering to benefit immediately from the first file opened,
rather than waiting until TopK processes enough rows to build a
threshold organically.

Algorithm (single-column sort):
- DESC LIMIT K: threshold = max(min) across RGs with num_rows >= K
  Filter: col > threshold
- ASC LIMIT K: threshold = min(max) across RGs with num_rows >= K
  Filter: col < threshold

The DynamicFilterPhysicalExpr is shared across all partitions, so each
file's threshold update is visible to subsequent files globally.

Graceful fallback: skips initialization when statistics are unavailable,
column is not found, or sort is multi-column.
--- datafusion/datasource-parquet/src/opener.rs | 569 +++++++++++++++++- .../sqllogictest/test_files/sort_pushdown.slt | 142 +++++ 2 files changed, 710 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bad1c684b47f..95102ede8ff6 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -24,7 +24,7 @@ use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, }; -use arrow::array::{RecordBatch, RecordBatchOptions}; +use arrow::array::{Array, RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; @@ -45,6 +45,10 @@ use datafusion_common::{ ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err, }; use datafusion_datasource::{PartitionedFile, TableSchema}; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::{ + BinaryExpr, Column, DynamicFilterPhysicalExpr, lit, +}; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::{ @@ -67,6 +71,7 @@ use log::debug; use parquet::DecodeResult; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; +use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy, }; @@ -1075,6 +1080,25 @@ impl RowGroupsPrunedParquetOpen { let file_metadata = Arc::clone(reader_metadata.metadata()); let rg_metadata = file_metadata.row_groups(); + // Initialize TopK dynamic filter from row group statistics. 
+ // This sets an initial threshold before any data is read, so that + // subsequent row filtering can benefit immediately. + // Uses reverse_row_groups to infer sort direction: + // true => DESC (file declared ASC, reversed for DESC queries) + // false => ASC + if let (Some(predicate), Some(limit)) = (&prepared.predicate, prepared.limit) + && let Err(e) = try_init_topk_threshold( + predicate, + limit, + prepared.reverse_row_groups, + rg_metadata, + &prepared.physical_file_schema, + reader_metadata.parquet_schema(), + ) + { + debug!("Skipping TopK threshold initialization from statistics: {e}"); + } + // Filter pushdown: evaluate predicates during scan let row_filter = if let Some(predicate) = prepared .pushdown_filters @@ -1218,6 +1242,191 @@ impl RowGroupsPrunedParquetOpen { } } +/// Attempt to initialize a TopK dynamic filter threshold from row group statistics. +/// +/// Before any parquet data is read, this function scans the row group min/max +/// statistics to compute an initial threshold for the TopK dynamic filter. +/// By setting this threshold early, subsequent row group pruning and row-level +/// filtering can benefit immediately. +/// +/// **Algorithm (single-column sort only):** +/// +/// For `ORDER BY col DESC LIMIT K`: +/// - For each row group where `num_rows >= K`: the min value of that RG is a +/// lower bound on the K-th largest value. +/// - `threshold = max(min)` across all qualifying row groups. +/// - Filter: `col > threshold` +/// +/// For `ORDER BY col ASC LIMIT K`: +/// - For each row group where `num_rows >= K`: the max value is an upper bound +/// on the K-th smallest value. +/// - `threshold = min(max)` across qualifying row groups. 
+/// - Filter: `col < threshold` +fn try_init_topk_threshold( + predicate: &Arc, + limit: usize, + reverse_row_groups: bool, + rg_metadata: &[parquet::file::metadata::RowGroupMetaData], + arrow_schema: &Schema, + parquet_schema: &parquet::schema::types::SchemaDescriptor, +) -> Result<()> { + // Find the DynamicFilterPhysicalExpr in the predicate tree. + let dynamic_filter = match find_dynamic_filter(predicate) { + Some(df) => df, + None => return Ok(()), // No dynamic filter found, nothing to do + }; + + // Only handle single-column sort for now. + let children = dynamic_filter.children(); + if children.len() != 1 { + debug!( + "Skipping TopK threshold initialization: expected 1 sort column, found {}", + children.len() + ); + return Ok(()); + } + + // The child must be a Column expression so we can look up statistics. + let col_expr: Arc = Arc::clone(children[0]); + let col_any: &dyn std::any::Any = col_expr.as_ref(); + let column = col_any.downcast_ref::().ok_or_else(|| { + DataFusionError::Internal( + "TopK threshold init: sort child is not a Column expression".to_string(), + ) + })?; + + let col_name = column.name(); + + // Build a statistics converter for the sort column. + let converter = StatisticsConverter::try_new(col_name, arrow_schema, parquet_schema) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + // Determine sort direction from reverse_row_groups: + // reverse_row_groups = true => DESC sort (file is ASC, we reverse) + // reverse_row_groups = false => ASC sort + let is_descending = reverse_row_groups; + + // Compute the threshold. + let threshold = if is_descending { + // DESC: threshold = max(min) across RGs with num_rows >= limit + let mins = converter + .row_group_mins(rg_metadata.iter()) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + compute_best_threshold_from_stats(&mins, rg_metadata, limit, true)? 
+ } else { + // ASC: threshold = min(max) across RGs with num_rows >= limit + let maxes = converter + .row_group_maxes(rg_metadata.iter()) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + compute_best_threshold_from_stats(&maxes, rg_metadata, limit, false)? + }; + + let threshold = match threshold { + Some(t) => t, + None => { + debug!("No qualifying row groups for TopK threshold initialization"); + return Ok(()); + } + }; + + // Build the filter expression: col > threshold (DESC) or col < threshold (ASC) + let op = if is_descending { + Operator::Gt + } else { + Operator::Lt + }; + + let filter_expr: Arc = Arc::new(BinaryExpr::new( + Arc::clone(&col_expr), + op, + lit(threshold.clone()), + )); + + debug!( + "Initializing TopK dynamic filter from statistics: {} {} {}", + col_name, + if is_descending { ">" } else { "<" }, + threshold + ); + + dynamic_filter.update(filter_expr)?; + + Ok(()) +} + +/// Find a [`DynamicFilterPhysicalExpr`] in the predicate tree. +/// +/// Returns the first `DynamicFilterPhysicalExpr` found (as an `Arc`) by +/// checking the predicate itself and recursively walking its children. +fn find_dynamic_filter( + expr: &Arc, +) -> Option> { + // Try to downcast this expression directly. + // PhysicalExpr: Any + Send + Sync, so trait upcasting allows the coercion. + let cloned = Arc::clone(expr); + let any_arc: Arc = cloned; + if let Ok(df) = Arc::downcast::(any_arc) { + return Some(df); + } + + // Recursively check children + for child in expr.children() { + if let Some(df) = find_dynamic_filter(child) { + return Some(df); + } + } + + None +} + +/// Compute the best threshold from row group statistics. +/// +/// For `want_max = true` (DESC): finds the maximum value from the stats array +/// across row groups with `num_rows >= limit`. +/// +/// For `want_max = false` (ASC): finds the minimum value from the stats array +/// across row groups with `num_rows >= limit`. 
+fn compute_best_threshold_from_stats( + stats: &arrow::array::ArrayRef, + rg_metadata: &[parquet::file::metadata::RowGroupMetaData], + limit: usize, + want_max: bool, +) -> Result> { + let mut best: Option = None; + + for (i, rg) in rg_metadata.iter().enumerate() { + // Only consider row groups with enough rows + if (rg.num_rows() as usize) < limit { + continue; + } + + // Skip null statistics + if i >= stats.len() || stats.is_null(i) { + continue; + } + + let value = ScalarValue::try_from_array(stats.as_ref(), i)?; + if value.is_null() { + continue; + } + + best = Some(match best { + None => value, + Some(current) => { + if want_max { + // Keep the maximum + if value > current { value } else { current } + } else { + // Keep the minimum + if value < current { value } else { current } + } + } + }); + } + + Ok(best) +} + /// State for a stream that decodes a single Parquet file using a push-based decoder. /// /// The [`transition`](Self::transition) method drives the decoder in a loop: it requests @@ -2720,4 +2929,362 @@ mod test { "without page index all rows are returned" ); } + + // --------------------------------------------------------------- + // Helper: build RowGroupMetaData with a given num_rows + // --------------------------------------------------------------- + + /// Create a minimal `SchemaDescriptor` with a single Int64 column named `id`. + fn make_int64_schema_descr() -> parquet::schema::types::SchemaDescPtr { + use parquet::basic::Type as PhysicalType; + use parquet::schema::types::Type as SchemaType; + + let field = SchemaType::primitive_type_builder("id", PhysicalType::INT64) + .build() + .unwrap(); + let schema = SchemaType::group_type_builder("schema") + .with_fields(vec![Arc::new(field)]) + .build() + .unwrap(); + Arc::new(parquet::schema::types::SchemaDescriptor::new(Arc::new( + schema, + ))) + } + + /// Build a vector of `RowGroupMetaData`, one per entry in `row_counts`. 
+ fn make_rg_metadata( + row_counts: &[i64], + ) -> Vec { + let schema_descr = make_int64_schema_descr(); + row_counts + .iter() + .map(|&num_rows| { + let column = parquet::file::metadata::ColumnChunkMetaData::builder( + schema_descr.column(0), + ) + .set_num_values(num_rows) + .build() + .unwrap(); + parquet::file::metadata::RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(num_rows) + .set_column_metadata(vec![column]) + .build() + .unwrap() + }) + .collect() + } + + // --------------------------------------------------------------- + // Tests for compute_best_threshold_from_stats + // --------------------------------------------------------------- + + #[test] + fn test_compute_threshold_desc_picks_max_of_mins() { + use arrow::array::{ArrayRef, Int64Array}; + + let stats: ArrayRef = + Arc::new(Int64Array::from(vec![Some(10), Some(50), Some(30)])); + let rg_meta = make_rg_metadata(&[100_000, 100_000, 100_000]); + + let result = + compute_best_threshold_from_stats(&stats, &rg_meta, 100, true).unwrap(); + assert_eq!(result, Some(ScalarValue::Int64(Some(50)))); + } + + #[test] + fn test_compute_threshold_asc_picks_min_of_maxes() { + use arrow::array::{ArrayRef, Int64Array}; + + let stats: ArrayRef = + Arc::new(Int64Array::from(vec![Some(100), Some(50), Some(80)])); + let rg_meta = make_rg_metadata(&[100_000, 100_000, 100_000]); + + let result = + compute_best_threshold_from_stats(&stats, &rg_meta, 100, false).unwrap(); + assert_eq!(result, Some(ScalarValue::Int64(Some(50)))); + } + + #[test] + fn test_compute_threshold_skips_small_rgs() { + use arrow::array::{ArrayRef, Int64Array}; + + // RG 0: 100K rows, value=10 + // RG 1: 50 rows (< limit=100), value=999 — should be skipped + // RG 2: 100K rows, value=30 + let stats: ArrayRef = + Arc::new(Int64Array::from(vec![Some(10), Some(999), Some(30)])); + let rg_meta = make_rg_metadata(&[100_000, 50, 100_000]); + + let result = + compute_best_threshold_from_stats(&stats, &rg_meta, 100, true).unwrap(); + // 
want_max=true => max(10, 30) = 30 (skipping the 999 from the small RG) + assert_eq!(result, Some(ScalarValue::Int64(Some(30)))); + } + + #[test] + fn test_compute_threshold_skips_null_stats() { + use arrow::array::{ArrayRef, Int64Array}; + + let stats: ArrayRef = Arc::new(Int64Array::from(vec![Some(10), None, Some(30)])); + let rg_meta = make_rg_metadata(&[100_000, 100_000, 100_000]); + + let result = + compute_best_threshold_from_stats(&stats, &rg_meta, 100, true).unwrap(); + // Null entry is skipped, max(10, 30) = 30 + assert_eq!(result, Some(ScalarValue::Int64(Some(30)))); + } + + #[test] + fn test_compute_threshold_all_rgs_too_small() { + use arrow::array::{ArrayRef, Int64Array}; + + let stats: ArrayRef = Arc::new(Int64Array::from(vec![Some(10), Some(50)])); + let rg_meta = make_rg_metadata(&[5, 10]); // all < limit=100 + + let result = + compute_best_threshold_from_stats(&stats, &rg_meta, 100, true).unwrap(); + assert_eq!(result, None); + } + + #[test] + fn test_compute_threshold_empty_metadata() { + use arrow::array::{ArrayRef, Int64Array}; + + let stats: ArrayRef = Arc::new(Int64Array::from(Vec::>::new())); + let rg_meta: Vec = vec![]; + + let result = + compute_best_threshold_from_stats(&stats, &rg_meta, 100, true).unwrap(); + assert_eq!(result, None); + } + + // --------------------------------------------------------------- + // Tests for find_dynamic_filter + // --------------------------------------------------------------- + + #[test] + fn test_find_dynamic_filter_direct_match() { + let col_expr = Arc::new(Column::new("id", 0)) as Arc; + let dynamic_filter: Arc = Arc::new( + DynamicFilterPhysicalExpr::new(vec![col_expr], super::lit(true)), + ); + + let result = find_dynamic_filter(&dynamic_filter); + assert!( + result.is_some(), + "should find direct DynamicFilterPhysicalExpr" + ); + } + + #[test] + fn test_find_dynamic_filter_nested_in_conjunction() { + let col_expr = Arc::new(Column::new("id", 0)) as Arc; + let dynamic_filter: Arc = Arc::new( + 
DynamicFilterPhysicalExpr::new(vec![col_expr], super::lit(true)), + ); + // Wrap it: (id > 0) AND dynamic_filter + let left: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("id", 0)), + Operator::Gt, + super::lit(0i64), + )); + let conjunction: Arc = + Arc::new(BinaryExpr::new(left, Operator::And, dynamic_filter)); + + let result = find_dynamic_filter(&conjunction); + assert!( + result.is_some(), + "should find DynamicFilterPhysicalExpr nested in AND" + ); + } + + #[test] + fn test_find_dynamic_filter_none_when_absent() { + let col_expr: Arc = Arc::new(Column::new("id", 0)); + let result = find_dynamic_filter(&col_expr); + assert!( + result.is_none(), + "plain Column has no DynamicFilterPhysicalExpr" + ); + } + + // --------------------------------------------------------------- + // Tests for try_init_topk_threshold + // --------------------------------------------------------------- + + /// Write a small parquet file and return its metadata so we can build + /// realistic `RowGroupMetaData` with actual statistics. 
+ fn make_parquet_metadata_with_stats( + row_counts: &[Vec], + ) -> ( + Vec, + Schema, + parquet::schema::types::SchemaDescPtr, + ) { + use parquet::arrow::ArrowWriter; + use parquet::file::reader::FileReader; + use parquet::file::serialized_reader::SerializedFileReader; + + let schema = + Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); + + // Write each row_counts entry as a separate row group + let mut buf = Vec::new(); + let mut writer = + ArrowWriter::try_new(&mut buf, Arc::clone(&schema), None).unwrap(); + for values in row_counts { + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(arrow::array::Int64Array::from(values.clone()))], + ) + .unwrap(); + writer.write(&batch).unwrap(); + writer.flush().unwrap(); // flush after each batch -> separate RG + } + writer.close().unwrap(); + + // Read back metadata + let reader = SerializedFileReader::new(bytes::Bytes::from(buf)).unwrap(); + let file_metadata = reader.metadata(); + let rg_metadata: Vec<_> = file_metadata.row_groups().to_vec(); + let parquet_schema = file_metadata.file_metadata().schema_descr_ptr(); + (rg_metadata, schema.as_ref().clone(), parquet_schema) + } + + #[test] + fn test_try_init_topk_threshold_desc() { + // Two RGs: + // RG0: values [1..=200] => min=1, max=200 + // RG1: values [100..=300] => min=100, max=300 + // DESC sort, limit=10: threshold = max(min) = max(1, 100) = 100 + // Filter should be: id > 100 + let rg0_values: Vec = (1..=200).collect(); + let rg1_values: Vec = (100..=300).collect(); + let (rg_metadata, arrow_schema, parquet_schema) = + make_parquet_metadata_with_stats(&[rg0_values, rg1_values]); + + let col_expr = Arc::new(Column::new("id", 0)) as Arc; + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_expr)], + super::lit(true), + )); + let predicate: Arc = dynamic_filter.clone(); + + try_init_topk_threshold( + &predicate, + 10, + true, // DESC => reverse_row_groups=true + &rg_metadata, + 
&arrow_schema, + &parquet_schema, + ) + .unwrap(); + + // Verify the dynamic filter was updated + let current = dynamic_filter.current().unwrap(); + let display = format!("{current}"); + assert!( + display.contains(">"), + "DESC should produce Gt operator, got: {display}" + ); + assert!( + display.contains("100"), + "threshold should be 100 (max of mins), got: {display}" + ); + } + + #[test] + fn test_try_init_topk_threshold_asc() { + // Two RGs: + // RG0: values [1..=200] => min=1, max=200 + // RG1: values [100..=300] => min=100, max=300 + // ASC sort, limit=10: threshold = min(max) = min(200, 300) = 200 + // Filter should be: id < 200 + let rg0_values: Vec = (1..=200).collect(); + let rg1_values: Vec = (100..=300).collect(); + let (rg_metadata, arrow_schema, parquet_schema) = + make_parquet_metadata_with_stats(&[rg0_values, rg1_values]); + + let col_expr = Arc::new(Column::new("id", 0)) as Arc; + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_expr)], + super::lit(true), + )); + let predicate: Arc = dynamic_filter.clone(); + + try_init_topk_threshold( + &predicate, + 10, + false, // ASC => reverse_row_groups=false + &rg_metadata, + &arrow_schema, + &parquet_schema, + ) + .unwrap(); + + let current = dynamic_filter.current().unwrap(); + let display = format!("{current}"); + assert!( + display.contains("<"), + "ASC should produce Lt operator, got: {display}" + ); + assert!( + display.contains("200"), + "threshold should be 200 (min of maxes), got: {display}" + ); + } + + #[test] + fn test_try_init_topk_threshold_no_dynamic_filter() { + let rg0_values: Vec = (1..=200).collect(); + let (rg_metadata, arrow_schema, parquet_schema) = + make_parquet_metadata_with_stats(&[rg0_values]); + + // Plain predicate with no DynamicFilterPhysicalExpr + let predicate: Arc = Arc::new(Column::new("id", 0)); + + let result = try_init_topk_threshold( + &predicate, + 10, + true, + &rg_metadata, + &arrow_schema, + &parquet_schema, + ); + 
assert!(result.is_ok(), "should be a no-op when no dynamic filter"); + } + + #[test] + fn test_try_init_topk_threshold_multi_column_skipped() { + let rg0_values: Vec = (1..=200).collect(); + let (rg_metadata, arrow_schema, parquet_schema) = + make_parquet_metadata_with_stats(&[rg0_values]); + + // DynamicFilterPhysicalExpr with 2 children => should be skipped + let col1 = Arc::new(Column::new("id", 0)) as Arc; + let col2 = Arc::new(Column::new("id", 0)) as Arc; + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![col1, col2], + super::lit(true), + )); + let predicate: Arc = dynamic_filter.clone(); + + let result = try_init_topk_threshold( + &predicate, + 10, + true, + &rg_metadata, + &arrow_schema, + &parquet_schema, + ); + assert!( + result.is_ok(), + "multi-column sort should be skipped gracefully" + ); + + // Filter should still be the original `super::lit(true)` + let current = dynamic_filter.current().unwrap(); + let display = format!("{current}"); + assert_eq!(display, "true", "filter should remain unchanged: {display}"); + } } diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index b6c75f397701..78a69481a636 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2271,6 +2271,145 @@ DROP TABLE tg_src_high; statement ok DROP TABLE tg_buffer; +# ============================================================================ +# Test H: TopK threshold initialization from row group statistics +# ============================================================================ +# When TopK's dynamic filter is pushed to parquet, we can initialize +# its threshold from row group statistics before reading any data. +# This test verifies correctness with scrambled multi-file data. 
+ +statement ok +SET datafusion.execution.target_partitions = 1; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = true; + +statement ok +SET datafusion.optimizer.enable_sort_pushdown = true; + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 5; + +# Create 3 files with non-overlapping but scrambled id ranges: +# file_a.parquet: ids 7-10 (highest range, alphabetically first) +# file_b.parquet: ids 1-3 (lowest range) +# file_c.parquet: ids 4-6 (middle range) +# Alphabetical order: a(7-10), b(1-3), c(4-6) — NOT sorted by id. +# TopK DESC should read file_a first (highest min) after reorder. + +statement ok +CREATE TABLE th_high(id INT, value INT) AS VALUES +(7, 700), (8, 800), (9, 900), (10, 1000); + +statement ok +CREATE TABLE th_low(id INT, value INT) AS VALUES +(1, 100), (2, 200), (3, 300); + +statement ok +CREATE TABLE th_mid(id INT, value INT) AS VALUES +(4, 400), (5, 500), (6, 600); + +query I +COPY (SELECT * FROM th_high ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/topk_stats/file_a.parquet'; +---- +4 + +query I +COPY (SELECT * FROM th_low ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/topk_stats/file_b.parquet'; +---- +3 + +query I +COPY (SELECT * FROM th_mid ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/topk_stats/file_c.parquet'; +---- +3 + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +CREATE EXTERNAL TABLE th_scrambled(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/topk_stats/' +WITH ORDER (id ASC); + +# Test H.1: DESC LIMIT should return correct results despite scrambled file order +query II +SELECT * FROM th_scrambled ORDER BY id DESC LIMIT 3; +---- +10 1000 +9 900 +8 800 + +# Test H.2: ASC LIMIT should also work correctly +query II +SELECT * FROM th_scrambled ORDER BY id ASC LIMIT 3; +---- +1 100 +2 200 +3 300 + +# Test H.3: Larger LIMIT spanning multiple files +query II +SELECT * FROM th_scrambled ORDER 
BY id DESC LIMIT 7; +---- +10 1000 +9 900 +8 800 +7 700 +6 600 +5 500 +4 400 + +# Test H.4: DESC with EXPLAIN to verify dynamic filter is pushed down +query TT +EXPLAIN SELECT * FROM th_scrambled ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: th_scrambled.id DESC NULLS FIRST, fetch=3 +02)--TableScan: th_scrambled projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/topk_stats/file_a.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/topk_stats/file_c.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/topk_stats/file_b.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] + +# Test H.5: With multiple partitions — TopK filter is shared across partitions +statement ok +SET datafusion.execution.target_partitions = 4; + +query II +SELECT * FROM th_scrambled ORDER BY id DESC LIMIT 5; +---- +10 1000 +9 900 +8 800 +7 700 +6 600 + +query II +SELECT * FROM th_scrambled ORDER BY id ASC LIMIT 5; +---- +1 100 +2 200 +3 300 +4 400 +5 500 + +# Cleanup Test H +statement ok +DROP TABLE th_high; + +statement ok +DROP TABLE th_low; + +statement ok +DROP TABLE th_mid; + +statement ok +DROP TABLE th_scrambled; + # Reset settings (SLT runner uses target_partitions=4, not system default) statement ok SET datafusion.execution.target_partitions = 4; @@ -2280,3 +2419,6 @@ SET datafusion.execution.collect_statistics = true; statement ok SET datafusion.optimizer.enable_sort_pushdown = true; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = false; From fbbaf61875c902bfd3a8f6e7213f420548198280 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Sat, 18 Apr 2026 15:56:43 +0800 Subject: [PATCH 2/2] feat: initialize TopK dynamic filter threshold from parquet statistics Before reading any 
parquet data, scan row group min/max statistics to compute an initial threshold for TopK's dynamic filter. This allows row-level filtering to benefit immediately from the first file opened, rather than waiting until TopK processes enough rows to build a threshold organically. Algorithm (single-column sort): - DESC LIMIT K: threshold = max(min) across RGs with num_rows >= K Filter: col > threshold - ASC LIMIT K: threshold = min(max) across RGs with num_rows >= K Filter: col < threshold Sort direction is read from sort_options on DynamicFilterPhysicalExpr, which is now set by SortExec::create_filter() for TopK queries. This makes the optimization work for ALL TopK queries on parquet, not just those with sort pushdown. The DynamicFilterPhysicalExpr is shared across all partitions, so each file's threshold update is visible to subsequent files globally. Graceful fallback: skips initialization when sort_options is absent, statistics are unavailable, column not found, or multi-column sort. --- datafusion/datasource-parquet/src/opener.rs | 84 ++++++++++++++----- .../src/expressions/dynamic_filters.rs | 28 +++++++ datafusion/physical-plan/src/sorts/sort.rs | 11 ++- 3 files changed, 101 insertions(+), 22 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 95102ede8ff6..16d314a2065f 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1083,14 +1083,12 @@ impl RowGroupsPrunedParquetOpen { // Initialize TopK dynamic filter from row group statistics. // This sets an initial threshold before any data is read, so that // subsequent row filtering can benefit immediately. - // Uses reverse_row_groups to infer sort direction: - // true => DESC (file declared ASC, reversed for DESC queries) - // false => ASC + // Sort direction is read from the DynamicFilterPhysicalExpr's + // sort_options (set by SortExec for TopK queries). 
if let (Some(predicate), Some(limit)) = (&prepared.predicate, prepared.limit) && let Err(e) = try_init_topk_threshold( predicate, limit, - prepared.reverse_row_groups, rg_metadata, &prepared.physical_file_schema, reader_metadata.parquet_schema(), @@ -1265,7 +1263,6 @@ impl RowGroupsPrunedParquetOpen { fn try_init_topk_threshold( predicate: &Arc, limit: usize, - reverse_row_groups: bool, rg_metadata: &[parquet::file::metadata::RowGroupMetaData], arrow_schema: &Schema, parquet_schema: &parquet::schema::types::SchemaDescriptor, @@ -1276,17 +1273,25 @@ fn try_init_topk_threshold( None => return Ok(()), // No dynamic filter found, nothing to do }; + // Read sort options from the dynamic filter (set by SortExec for TopK). + let sort_options = match dynamic_filter.sort_options() { + Some(opts) => opts, + None => return Ok(()), // No sort options, cannot determine direction + }; + // Only handle single-column sort for now. - let children = dynamic_filter.children(); - if children.len() != 1 { + if sort_options.len() != 1 { debug!( "Skipping TopK threshold initialization: expected 1 sort column, found {}", - children.len() + sort_options.len() ); return Ok(()); } + let is_descending = sort_options[0].descending; + // The child must be a Column expression so we can look up statistics. + let children = dynamic_filter.children(); let col_expr: Arc = Arc::clone(children[0]); let col_any: &dyn std::any::Any = col_expr.as_ref(); let column = col_any.downcast_ref::().ok_or_else(|| { @@ -1301,11 +1306,6 @@ fn try_init_topk_threshold( let converter = StatisticsConverter::try_new(col_name, arrow_schema, parquet_schema) .map_err(|e| DataFusionError::External(Box::new(e)))?; - // Determine sort direction from reverse_row_groups: - // reverse_row_groups = true => DESC sort (file is ASC, we reverse) - // reverse_row_groups = false => ASC sort - let is_descending = reverse_row_groups; - // Compute the threshold. 
let threshold = if is_descending { // DESC: threshold = max(min) across RGs with num_rows >= limit @@ -3164,16 +3164,20 @@ mod test { make_parquet_metadata_with_stats(&[rg0_values, rg1_values]); let col_expr = Arc::new(Column::new("id", 0)) as Arc; - let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + let desc_opts = arrow::compute::SortOptions { + descending: true, + nulls_first: true, + }; + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new_with_sort_options( vec![Arc::clone(&col_expr)], super::lit(true), + vec![desc_opts], )); let predicate: Arc = dynamic_filter.clone(); try_init_topk_threshold( &predicate, 10, - true, // DESC => reverse_row_groups=true &rg_metadata, &arrow_schema, &parquet_schema, @@ -3206,16 +3210,20 @@ mod test { make_parquet_metadata_with_stats(&[rg0_values, rg1_values]); let col_expr = Arc::new(Column::new("id", 0)) as Arc; - let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + let asc_opts = arrow::compute::SortOptions { + descending: false, + nulls_first: false, + }; + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new_with_sort_options( vec![Arc::clone(&col_expr)], super::lit(true), + vec![asc_opts], )); let predicate: Arc = dynamic_filter.clone(); try_init_topk_threshold( &predicate, 10, - false, // ASC => reverse_row_groups=false &rg_metadata, &arrow_schema, &parquet_schema, @@ -3246,7 +3254,6 @@ mod test { let result = try_init_topk_threshold( &predicate, 10, - true, &rg_metadata, &arrow_schema, &parquet_schema, @@ -3254,25 +3261,60 @@ mod test { assert!(result.is_ok(), "should be a no-op when no dynamic filter"); } + #[test] + fn test_try_init_topk_threshold_no_sort_options() { + let rg0_values: Vec = (1..=200).collect(); + let (rg_metadata, arrow_schema, parquet_schema) = + make_parquet_metadata_with_stats(&[rg0_values]); + + // DynamicFilterPhysicalExpr WITHOUT sort_options => should be skipped + let col_expr = Arc::new(Column::new("id", 0)) as Arc; + let dynamic_filter = 
Arc::new(DynamicFilterPhysicalExpr::new( + vec![col_expr], + super::lit(true), + )); + let predicate: Arc = dynamic_filter.clone(); + + let result = try_init_topk_threshold( + &predicate, + 10, + &rg_metadata, + &arrow_schema, + &parquet_schema, + ); + assert!( + result.is_ok(), + "no sort_options should be skipped gracefully" + ); + + let current = dynamic_filter.current().unwrap(); + let display = format!("{current}"); + assert_eq!(display, "true", "filter should remain unchanged: {display}"); + } + #[test] fn test_try_init_topk_threshold_multi_column_skipped() { let rg0_values: Vec = (1..=200).collect(); let (rg_metadata, arrow_schema, parquet_schema) = make_parquet_metadata_with_stats(&[rg0_values]); - // DynamicFilterPhysicalExpr with 2 children => should be skipped + // DynamicFilterPhysicalExpr with 2 sort columns => should be skipped let col1 = Arc::new(Column::new("id", 0)) as Arc; let col2 = Arc::new(Column::new("id", 0)) as Arc; - let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + let opts = arrow::compute::SortOptions { + descending: true, + nulls_first: true, + }; + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new_with_sort_options( vec![col1, col2], super::lit(true), + vec![opts, opts], )); let predicate: Arc = dynamic_filter.clone(); let result = try_init_topk_threshold( &predicate, 10, - true, &rg_metadata, &arrow_schema, &parquet_schema, diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 47398d87e26a..5671426f50a1 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use arrow::compute::SortOptions; use parking_lot::RwLock; use std::{fmt::Display, hash::Hash, sync::Arc}; use tokio::sync::watch; @@ -74,6 +75,11 @@ pub struct DynamicFilterPhysicalExpr { /// But this can have overhead in production, so it's only included in our tests. data_type: Arc>>, nullable: Arc>>, + /// Optional sort options for each child expression. + /// When set (e.g., by SortExec for TopK), downstream consumers like the + /// parquet reader can use these to initialize the filter threshold from + /// column statistics before reading any data. + sort_options: Option>, } #[derive(Debug)] @@ -177,9 +183,30 @@ impl DynamicFilterPhysicalExpr { state_watch, data_type: Arc::new(RwLock::new(None)), nullable: Arc::new(RwLock::new(None)), + sort_options: None, } } + /// Create a new [`DynamicFilterPhysicalExpr`] with sort options. + /// + /// Sort options indicate the sort direction for each child expression, + /// enabling downstream consumers (e.g., parquet readers) to initialize + /// the filter threshold from column statistics before reading data. + pub fn new_with_sort_options( + children: Vec>, + inner: Arc, + sort_options: Vec, + ) -> Self { + let mut this = Self::new(children, inner); + this.sort_options = Some(sort_options); + this + } + + /// Returns the sort options for each child expression, if available. 
+ pub fn sort_options(&self) -> Option<&[SortOptions]> { + self.sort_options.as_deref() + } + fn remap_children( children: &[Arc], remapped_children: Option<&Vec>>, @@ -368,6 +395,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { state_watch: self.state_watch.clone(), data_type: Arc::clone(&self.data_type), nullable: Arc::clone(&self.nullable), + sort_options: self.sort_options.clone(), })) } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 6c02af8dec6d..89fb558789d3 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -915,8 +915,17 @@ impl SortExec { .iter() .map(|sort_expr| Arc::clone(&sort_expr.expr)) .collect::>(); + let sort_options = self + .expr + .iter() + .map(|sort_expr| sort_expr.options) + .collect::>(); Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new( - DynamicFilterPhysicalExpr::new(children, lit(true)), + DynamicFilterPhysicalExpr::new_with_sort_options( + children, + lit(true), + sort_options, + ), )))) }