From 1b1b5865df935db830c4c6785e882b2e78731bc6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 3 Apr 2026 13:42:21 -0400 Subject: [PATCH 01/16] Add Dynamic work scheduling in FileStream --- Cargo.lock | 1 + datafusion/datasource/Cargo.toml | 1 + .../datasource/src/file_scan_config/mod.rs | 136 +++- .../datasource/src/file_stream/builder.rs | 21 +- datafusion/datasource/src/file_stream/mod.rs | 593 ++++++++++++++++-- .../datasource/src/file_stream/scan_state.rs | 24 +- .../datasource/src/file_stream/work_source.rs | 102 +++ datafusion/datasource/src/morsel/mocks.rs | 12 +- datafusion/datasource/src/source.rs | 92 ++- datafusion/datasource/src/test_util.rs | 12 +- 10 files changed, 920 insertions(+), 74 deletions(-) create mode 100644 datafusion/datasource/src/file_stream/work_source.rs diff --git a/Cargo.lock b/Cargo.lock index a67cffe08a5ca..3fea730f283ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1969,6 +1969,7 @@ dependencies = [ "liblzma", "log", "object_store", + "parking_lot", "rand 0.9.2", "tempfile", "tokio", diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml index 4027521658977..40e2271f45205 100644 --- a/datafusion/datasource/Cargo.toml +++ b/datafusion/datasource/Cargo.toml @@ -64,6 +64,7 @@ itertools = { workspace = true } liblzma = { workspace = true, optional = true } log = { workspace = true } object_store = { workspace = true } +parking_lot = { workspace = true } rand = { workspace = true } tempfile = { workspace = true, optional = true } tokio = { workspace = true } diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index f4e4e0a4dec0d..e75476f6ee551 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -24,7 +24,8 @@ use crate::file_groups::FileGroup; use crate::{ PartitionedFile, display::FileGroupsDisplay, file::FileSource, file_compression_type::FileCompressionType, file_stream::FileStreamBuilder, - source::DataSource, statistics::MinMaxStatistics, + file_stream::work_source::SharedWorkSource, source::DataSource, + statistics::MinMaxStatistics, }; use arrow::datatypes::FieldRef; use arrow::datatypes::{DataType, Schema, SchemaRef}; @@ -38,6 +39,7 @@ use datafusion_execution::{ }; use datafusion_expr::Operator; +use crate::source::OpenArgs; use datafusion_physical_expr::expressions::{BinaryExpr, Column}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::utils::reassign_expr_columns; @@ -580,6 +582,15 @@ impl DataSource for FileScanConfig { partition: usize, context: Arc, ) -> Result { + self.open_with_args(OpenArgs::new(partition, context)) + } + + fn open_with_args(&self, args: OpenArgs) -> Result { + let OpenArgs { + partition, + context, + sibling_state, + } = args; let object_store = context.runtime_env().object_store(&self.object_store_url)?; let batch_size = self .batch_size @@ -589,8 +600,17 @@ impl DataSource for FileScanConfig { let morselizer = source.create_morselizer(object_store, self, partition)?; + // Extract the shared work source from the sibling state if it exists. + // This allows multiple sibling streams to steal work from a single + // shared queue of unopened files. + let shared_work_source = sibling_state + .as_ref() + .and_then(|state| state.downcast_ref::()) + .cloned(); + let stream = FileStreamBuilder::new(self) .with_partition(partition) + .with_shared_work_source(shared_work_source) .with_morselizer(morselizer) .with_metrics(source.metrics()) .build()?; @@ -991,6 +1011,20 @@ impl DataSource for FileScanConfig { // Delegate to the file source self.file_source.apply_expressions(f) } + + /// Create any shared state that should be passed between sibling streams + /// during one execution. + /// + /// This returns `None` when sibling streams must not share work, such as + /// when file order must be preserved or the file groups define the output + /// partitioning needed for the rest of the plan + fn create_sibling_state(&self) -> Option> { + if self.preserve_order || self.partitioned_by_file_group { + return None; + } + + Some(Arc::new(SharedWorkSource::from_config(self)) as Arc) + } } impl FileScanConfig { @@ -1368,19 +1402,33 @@ mod tests { use super::*; use crate::TableSchema; + use crate::source::DataSourceExec; use crate::test_util::col; use crate::{ generate_test_files, test_util::MockSource, tests::aggr_test_schema, verify_sort_integrity, }; + use arrow::array::{Int32Array, RecordBatch}; use arrow::datatypes::Field; use datafusion_common::ColumnStatistics; use datafusion_common::stats::Precision; + use datafusion_common::tree_node::TreeNodeRecursion; + use datafusion_common::{Result, assert_batches_eq, internal_err}; + use datafusion_execution::TaskContext; use datafusion_expr::SortExpr; + use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::create_physical_sort_expr; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::projection::ProjectionExpr; + use datafusion_physical_expr::projection::ProjectionExprs; + use datafusion_physical_plan::ExecutionPlan; + use datafusion_physical_plan::execution_plan::collect; + use futures::FutureExt as _; + use futures::StreamExt as _; + use futures::stream; + use object_store::ObjectStore; + use std::fmt::Debug; #[derive(Clone)] struct InexactSortPushdownSource { @@ -1400,7 +1448,7 @@ mod tests { impl FileSource for InexactSortPushdownSource { fn create_file_opener( &self, - _object_store: Arc, + _object_store: Arc, _base_config: &FileScanConfig, _partition: usize, ) -> Result> { @@ -2288,6 +2336,88 @@ mod tests { assert_eq!(partition_stats.total_byte_size, Precision::Exact(800)); } + /// Regression test for reusing a `DataSourceExec` after its execution-local + /// shared work queue has been drained. + /// + /// This test uses a single file group with two files so the scan creates a + /// shared unopened-file queue. Executing after `reset_state` must recreate + /// the shared queue and return the same rows again. + #[tokio::test] + async fn reset_state_recreates_shared_work_source() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int32, + false, + )])); + let file_source = Arc::new( + MockSource::new(Arc::clone(&schema)) + .with_file_opener(Arc::new(ResetStateTestFileOpener { schema })), + ); + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_group(FileGroup::new(vec![ + PartitionedFile::new("file1.parquet", 100), + PartitionedFile::new("file2.parquet", 100), + ])) + .build(); + + let exec: Arc = DataSourceExec::from_data_source(config); + let task_ctx = Arc::new(TaskContext::default()); + + // Running the same scan after resetting the state, should + // produce the same answer. + let first_run = collect(Arc::clone(&exec), Arc::clone(&task_ctx)).await?; + let reset_exec = exec.reset_state()?; + let second_run = collect(reset_exec, task_ctx).await?; + + let expected = [ + "+-------+", + "| value |", + "+-------+", + "| 1 |", + "| 2 |", + "+-------+", + ]; + assert_batches_eq!(expected, &first_run); + assert_batches_eq!(expected, &second_run); + + Ok(()) + } + + /// Test-only `FileOpener` that turns file names like `file1.parquet` into a + /// single-batch stream containing that numeric value + #[derive(Debug)] + struct ResetStateTestFileOpener { + schema: SchemaRef, + } + + impl crate::file_stream::FileOpener for ResetStateTestFileOpener { + fn open( + &self, + file: PartitionedFile, + ) -> Result { + let value = file + .object_meta + .location + .as_ref() + .trim_start_matches("file") + .trim_end_matches(".parquet") + .parse::() + .expect("invalid test file name"); + let schema = Arc::clone(&self.schema); + Ok(async move { + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(Int32Array::from(vec![value]))], + ) + .expect("test batch should be valid"); + Ok(stream::iter(vec![Ok(batch)]).boxed()) + } + .boxed()) + } + } + #[test] fn test_output_partitioning_not_partitioned_by_file_group() { let file_schema = aggr_test_schema(); @@ -2476,7 +2606,7 @@ mod tests { impl FileSource for ExactSortPushdownSource { fn create_file_opener( &self, - _object_store: Arc, + _object_store: Arc, _base_config: &FileScanConfig, _partition: usize, ) -> Result> { diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs index efe9c39ce3b38..7034e902550a9 100644 --- a/datafusion/datasource/src/file_stream/builder.rs +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use crate::file_scan_config::FileScanConfig; use crate::file_stream::scan_state::ScanState; +use crate::file_stream::work_source::{SharedWorkSource, WorkSource}; use crate::morsel::{FileOpenerMorselizer, Morselizer}; use datafusion_common::{Result, internal_err}; use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; @@ -33,10 +34,11 @@ pub struct FileStreamBuilder<'a> { morselizer: Option>, metrics: Option<&'a ExecutionPlanMetricsSet>, on_error: OnError, + shared_work_source: Option, } impl<'a> FileStreamBuilder<'a> { - /// Create a new builder. + /// Create a new builder for [`FileStream`]. pub fn new(config: &'a FileScanConfig) -> Self { Self { config, @@ -44,6 +46,7 @@ impl<'a> FileStreamBuilder<'a> { morselizer: None, metrics: None, on_error: OnError::Fail, + shared_work_source: None, } } @@ -81,6 +84,15 @@ impl<'a> FileStreamBuilder<'a> { self } + /// Configure the [`SharedWorkSource`] for sibling work stealing. + pub(crate) fn with_shared_work_source( + mut self, + shared_work_source: Option, + ) -> Self { + self.shared_work_source = shared_work_source; + self + } + /// Build the configured [`FileStream`]. pub fn build(self) -> Result { let Self { @@ -89,6 +101,7 @@ impl<'a> FileStreamBuilder<'a> { morselizer, metrics, on_error, + shared_work_source, } = self; let Some(partition) = partition else { @@ -106,10 +119,14 @@ impl<'a> FileStreamBuilder<'a> { "FileStreamBuilder invalid partition index: {partition}" ); }; + let work_source = match shared_work_source { + Some(shared) => WorkSource::Shared(shared), + None => WorkSource::Local(file_group.into_inner().into()), + }; let file_stream_metrics = FileStreamMetrics::new(metrics, partition); let scan_state = Box::new(ScanState::new( - file_group.into_inner(), + work_source, config.limit, morselizer, on_error, diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index ff71f16023080..b9aabacf64c11 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -24,6 +24,7 @@ mod builder; mod metrics; mod scan_state; +pub(crate) mod work_source; use std::pin::Pin; use std::sync::Arc; @@ -175,6 +176,7 @@ mod tests { IoFutureId, MockMorselizer, MockPlanBuilder, MockPlanner, MorselId, PendingPlannerBuilder, PollsToResolve, }; + use crate::source::DataSource; use crate::tests::make_partition; use crate::{PartitionedFile, TableSchema}; use arrow::array::{AsArray, RecordBatch}; @@ -184,14 +186,22 @@ mod tests { use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{FutureExt as _, StreamExt as _}; + use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; - use crate::file_stream::{FileOpenFuture, FileOpener, FileStreamBuilder, OnError}; + use crate::file_stream::{ + FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError, + work_source::SharedWorkSource, + }; use crate::test_util::MockSource; use datafusion_common::{assert_batches_eq, exec_err, internal_err}; + /// Test identifier for one `FileStream` partition. + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] + struct PartitionId(usize); + /// Test `FileOpener` which will simulate errors during file opening or scanning #[derive(Default)] struct TestOpener { @@ -758,8 +768,8 @@ mod tests { async fn morsel_two_ios_one_batch() -> Result<()> { let test = FileStreamMorselTest::new().with_file( MockPlanner::builder("file1.parquet") - .add_plan(PendingPlannerBuilder::new(IoFutureId(1)).build()) - .add_plan(PendingPlannerBuilder::new(IoFutureId(2)).build()) + .add_plan(PendingPlannerBuilder::new(IoFutureId(1))) + .add_plan(PendingPlannerBuilder::new(IoFutureId(2))) .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42)) .return_none(), ); @@ -871,8 +881,7 @@ mod tests { async fn morsel_ready_child_planner() -> Result<()> { let child_planner = MockPlanner::builder("child planner") .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42)) - .return_none() - .build(); + .return_none(); let test = FileStreamMorselTest::new().with_file( MockPlanner::builder("file1.parquet") @@ -1001,11 +1010,265 @@ mod tests { Ok(()) } - /// Tests how FileStream opens and processes files. + /// Return a morsel test with two partitions: + /// Partition 0: file1, file2, file3 + /// Partition 1: file4 + /// + /// Partition 1 has only 1 file but it polled first 4 times + fn two_partition_morsel_test() -> FileStreamMorselTest { + FileStreamMorselTest::new() + // Partition 0 has three files + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file3.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(12), 103)) + .return_none(), + ) + // Partition 1 has only one file, but is polled first + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file4.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(13), 201)) + .return_none(), + ) + .with_reads(vec![ + PartitionId(1), + PartitionId(1), + PartitionId(1), + PartitionId(1), + PartitionId(1), + ]) + } + + /// Verifies that an idle sibling stream can steal shared files from + /// another stream once it exhausts its own local work. + #[tokio::test] + async fn morsel_shared_files_can_be_stolen() -> Result<()> { + let test = two_partition_morsel_test().with_file_stream_events(false); + + // Partition 0 starts with 3 files, but Partition 1 is polled first. + // Since Partition is polled first, it will run all the files even those + // that were assigned to Partition 0. + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Done + ----- Partition 1 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that a stream that must preserve order keeps its files local + /// and therefore cannot steal from a sibling shared queue. + #[tokio::test] + async fn morsel_preserve_order_keeps_files_local() -> Result<()> { + // same fixture as `morsel_shared_files_can_be_stolen` but marked as + // preserve-order + let test = two_partition_morsel_test() + .with_preserve_order(true) + .with_file_stream_events(false); + + // Even though that Partition 1 is polled first, it can not steal files + // from partition 0. The three files originally assigned to Partition 0 + // must be evaluated by Partition 0. + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Done + ----- Partition 1 ----- + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that `partitioned_by_file_group` disables shared work stealing. + #[tokio::test] + async fn morsel_partitioned_by_file_group_keeps_files_local() -> Result<()> { + // same fixture as `morsel_shared_files_can_be_stolen` but marked as + // preserve-partitioned + let test = two_partition_morsel_test() + .with_partitioned_by_file_group(true) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Done + ----- Partition 1 ----- + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that an empty sibling can immediately steal shared files when + /// it is polled before the stream that originally owned them. + #[tokio::test] + async fn morsel_empty_sibling_can_steal() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + // Poll the empty sibling first so it steals both files. + .with_reads(vec![PartitionId(1), PartitionId(1), PartitionId(1)]) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Done + ----- Partition 1 ----- + Batch: 101 + Batch: 102 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Ensures that if a sibling is built and polled + /// before another sibling has been built and contributed its files to the + /// shared queue, the first sibling does not finish prematurely. + #[tokio::test] + async fn morsel_empty_sibling_can_finish_before_shared_work_exists() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + // Build streams lazily so partition 1 can poll the shared queue + // before partition 0 has contributed its files. Once partition 0 + // is built, a later poll of partition 1 can still steal one of + // them from the shared queue. + .with_build_streams_on_first_read(true) + .with_reads(vec![PartitionId(1), PartitionId(0), PartitionId(1)]) + .with_file_stream_events(false); + + // Partition 1 polls too early once, then later steals one file after + // partition 0 has populated the shared queue. + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 102 + Done + ----- Partition 1 ----- + Batch: 101 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that one fast sibling can drain shared files that originated + /// in more than one other partition. + #[tokio::test] + async fn morsel_one_sibling_can_drain_multiple_siblings() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + // Partition 1 has two files + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file3.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(12), 103)) + .return_none(), + ) + // Partition 2 starts empty but is polled first, so it should drain + // the shared queue across both sibling partitions. + .with_reads(vec![ + PartitionId(2), + PartitionId(2), + PartitionId(1), + PartitionId(2), + ]) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Done + ----- Partition 1 ----- + Batch: 103 + Done + ----- Partition 2 ----- + Batch: 101 + Batch: 102 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Tests how one or more `FileStream`s consume morselized file work. #[derive(Clone)] struct FileStreamMorselTest { morselizer: MockMorselizer, - file_names: Vec, + partition_files: BTreeMap>, + preserve_order: bool, + partitioned_by_file_group: bool, + file_stream_events: bool, + build_streams_on_first_read: bool, + reads: Vec, limit: Option, } @@ -1014,75 +1277,238 @@ mod tests { fn new() -> Self { Self { morselizer: MockMorselizer::new(), - file_names: vec![], + partition_files: BTreeMap::new(), + preserve_order: false, + partitioned_by_file_group: false, + file_stream_events: true, + build_streams_on_first_read: false, + reads: vec![], limit: None, } } - /// Adds one file and its root planner to the test input. - fn with_file(mut self, planner: impl Into) -> Self { + /// Adds one file and its root planner to partition 0. + fn with_file(self, planner: impl Into) -> Self { + self.with_file_in_partition(PartitionId(0), planner) + } + + /// Adds one file and its root planner to the specified input partition. + fn with_file_in_partition( + mut self, + partition: PartitionId, + planner: impl Into, + ) -> Self { let planner = planner.into(); - self.file_names.push(planner.file_path().to_string()); - self.morselizer = self.morselizer.with_file(planner); + let file_path = planner.file_path().to_string(); + self.morselizer = self.morselizer.with_planner(planner); + self.partition_files + .entry(partition) + .or_default() + .push(file_path); self } - /// Sets a global output limit for the stream. + /// Marks the stream (and all partitions) to preserve the specified file + /// order. + fn with_preserve_order(mut self, preserve_order: bool) -> Self { + self.preserve_order = preserve_order; + self + } + + /// Marks the test scan as pre-partitioned by file group, which should + /// force each stream to keep its own files local. + fn with_partitioned_by_file_group( + mut self, + partitioned_by_file_group: bool, + ) -> Self { + self.partitioned_by_file_group = partitioned_by_file_group; + self + } + + /// Controls whether scheduler events are included in the snapshot. + /// + /// When disabled, `run()` still includes the event section header but + /// replaces the trace with a fixed placeholder so tests can focus only + /// on the output batches. + fn with_file_stream_events(mut self, file_stream_events: bool) -> Self { + self.file_stream_events = file_stream_events; + self + } + + /// Controls whether streams are all built up front or lazily on their + /// first read. + /// + /// The default builds all streams before polling begins, which matches + /// normal execution. Tests may enable lazy creation to model races + /// where one sibling polls before another has contributed its files to + /// the shared queue. + fn with_build_streams_on_first_read( + mut self, + build_streams_on_first_read: bool, + ) -> Self { + self.build_streams_on_first_read = build_streams_on_first_read; + self + } + + /// Sets the partition polling order. + /// + /// `run()` polls these partitions in the listed order first. After + /// those explicit reads are exhausted, it completes to round + /// robin across all configured partitions, skipping any streams that + /// have already finished. + /// + /// This allows testing early scheduling decisions explicit in a test + /// while avoiding a fully scripted poll trace for the remainder. + fn with_reads(mut self, reads: Vec) -> Self { + self.reads = reads; + self + } + + /// Sets a global output limit for all streams created by this test. fn with_limit(mut self, limit: usize) -> Self { self.limit = Some(limit); self } - /// Runs the test returns combined output and scheduler trace text as a String. + /// Runs the test and returns combined stream output and scheduler + /// trace text. async fn run(self) -> Result { let observer = self.morselizer.observer().clone(); observer.clear(); - let config = self.test_config(); let metrics_set = ExecutionPlanMetricsSet::new(); - let mut stream = FileStreamBuilder::new(&config) - .with_partition(0) - .with_morselizer(Box::new(self.morselizer)) - .with_metrics(&metrics_set) - .build()?; + let partition_count = self.num_partitions(); - let mut stream_contents = Vec::new(); - while let Some(result) = stream.next().await { - match result { - Ok(batch) => { - let col = batch.column(0).as_primitive::(); - let batch_id = col.value(0); - stream_contents.push(format!("Batch: {batch_id}")); - } - Err(e) => { - // Pull the actual message for external errors rather than - // relying on DataFusionError formatting, which changes - // if backtraces are enabled, etc - let message = if let DataFusionError::External(generic) = e { - generic.to_string() - } else { - e.to_string() - }; - stream_contents.push(format!("Error: {message}")); - } + let mut partitions = (0..partition_count) + .map(|_| PartitionState::new()) + .collect::>(); + + let mut build_order = Vec::new(); + for partition in self.reads.iter().map(|partition| partition.0) { + if !build_order.contains(&partition) { + build_order.push(partition); + } + } + for partition in 0..partition_count { + if !build_order.contains(&partition) { + build_order.push(partition); } } - stream_contents.push("Done".to_string()); - Ok(format!( - "----- Output Stream -----\n{}\n----- File Stream Events -----\n{}", - stream_contents.join("\n"), + let config = self.test_config(); + // `DataSourceExec::execute` creates one execution-local shared + // state object via `create_sibling_state()` and then passes it + // to `open_with_sibling_state(...)`. These tests build + // `FileStream`s directly, bypassing `DataSourceExec`, so they must + // perform the same setup explicitly when exercising sibling-stream + // work stealing. + let shared_work_source = config.create_sibling_state().and_then(|state| { + state.as_ref().downcast_ref::().cloned() + }); + if !self.build_streams_on_first_read { + for partition in build_order { + let stream = FileStreamBuilder::new(&config) + .with_partition(partition) + .with_shared_work_source(shared_work_source.clone()) + .with_morselizer(Box::new(self.morselizer.clone())) + .with_metrics(&metrics_set) + .build()?; + partitions[partition].set_stream(stream); + } + } + + let mut initial_reads: VecDeque<_> = self.reads.into(); + let mut next_round_robin = 0; + + while !initial_reads.is_empty() + || partitions.iter().any(PartitionState::is_active) + { + let partition = if let Some(partition) = initial_reads.pop_front() { + partition.0 + } else { + let partition = next_round_robin; + next_round_robin = (next_round_robin + 1) % partition_count.max(1); + partition + }; + + let partition_state = &mut partitions[partition]; + + if self.build_streams_on_first_read && !partition_state.built { + let stream = FileStreamBuilder::new(&config) + .with_partition(partition) + .with_shared_work_source(shared_work_source.clone()) + .with_morselizer(Box::new(self.morselizer.clone())) + .with_metrics(&metrics_set) + .build()?; + partition_state.set_stream(stream); + } + + let Some(stream) = partition_state.stream.as_mut() else { + continue; + }; + + match stream.next().await { + Some(result) => partition_state.push_output(format_result(result)), + None => partition_state.finish(), + } + } + + let output_text = if partition_count == 1 { + format!( + "----- Output Stream -----\n{}", + partitions[0].output.join("\n") + ) + } else { + partitions + .into_iter() + .enumerate() + .map(|(partition, state)| { + format!( + "----- Partition {} -----\n{}", + partition, + state.output.join("\n") + ) + }) + .collect::>() + .join("\n") + }; + + let file_stream_events = if self.file_stream_events { observer.format_events() + } else { + "(omitted due to with_file_stream_events(false))".to_string() + }; + + Ok(format!( + "{output_text}\n----- File Stream Events -----\n{file_stream_events}", )) } - /// Builds the `FileScanConfig` for the configured mock file set. + /// Returns the number of configured partitions, including empty ones + /// that appear only in the explicit read schedule. + fn num_partitions(&self) -> usize { + self.partition_files + .keys() + .map(|partition| partition.0 + 1) + .chain(self.reads.iter().map(|partition| partition.0 + 1)) + .max() + .unwrap_or(1) + } + + /// Builds a `FileScanConfig` covering every configured partition. fn test_config(&self) -> FileScanConfig { - let file_group = self - .file_names - .iter() - .map(|name| PartitionedFile::new(name, 10)) - .collect(); + let file_groups = (0..self.num_partitions()) + .map(|partition| { + self.partition_files + .get(&PartitionId(partition)) + .into_iter() + .flat_map(|files| files.iter()) + .map(|name| PartitionedFile::new(name, 10)) + .collect::>() + .into() + }) + .collect::>(); + let table_schema = TableSchema::new( Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])), vec![], @@ -1091,9 +1517,76 @@ mod tests { ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema)), ) - .with_file_group(file_group) + .with_file_groups(file_groups) .with_limit(self.limit) + .with_preserve_order(self.preserve_order) + .with_partitioned_by_file_group(self.partitioned_by_file_group) .build() } } + + /// Formats one stream poll result into a stable snapshot line. + fn format_result(result: Result) -> String { + match result { + Ok(batch) => { + let col = batch.column(0).as_primitive::(); + let batch_id = col.value(0); + format!("Batch: {batch_id}") + } + Err(e) => { + // Pull the actual message for external errors rather than + // relying on DataFusionError formatting, which changes if + // backtraces are enabled, etc. + let message = if let DataFusionError::External(generic) = e { + generic.to_string() + } else { + e.to_string() + }; + format!("Error: {message}") + } + } + } + + /// Test-only state for one stream partition in [`FileStreamMorselTest`]. + struct PartitionState { + /// Whether the `FileStream` for this partition has been built yet. + built: bool, + /// The live stream, if this partition has not finished yet. + stream: Option, + /// Snapshot lines produced by this partition. + output: Vec, + } + + impl PartitionState { + /// Create an unbuilt partition with no output yet. + fn new() -> Self { + Self { + built: false, + stream: None, + output: vec![], + } + } + + /// Returns true if this partition might still produce output. + fn is_active(&self) -> bool { + !self.built || self.stream.is_some() + } + + /// Records that this partition's stream has been built. + fn set_stream(&mut self, stream: FileStream) { + self.stream = Some(stream); + self.built = true; + } + + /// Records one formatted output line for this partition. + fn push_output(&mut self, line: String) { + self.output.push(line); + } + + /// Marks this partition as finished. + fn finish(&mut self) { + self.push_output("Done".to_string()); + self.stream = None; + } + } } diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs index 025164c29c8f2..fdae1bcf7e074 100644 --- a/datafusion/datasource/src/file_stream/scan_state.rs +++ b/datafusion/datasource/src/file_stream/scan_state.rs @@ -19,7 +19,6 @@ use datafusion_common::internal_datafusion_err; use std::collections::VecDeque; use std::task::{Context, Poll}; -use crate::PartitionedFile; use crate::morsel::{Morsel, MorselPlanner, Morselizer, PendingMorselPlanner}; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result}; @@ -27,6 +26,7 @@ use datafusion_physical_plan::metrics::ScopedTimerGuard; use futures::stream::BoxStream; use futures::{FutureExt as _, StreamExt as _}; +use super::work_source::WorkSource; use super::{FileStreamMetrics, OnError}; /// State [`FileStreamState::Scan`]. @@ -45,7 +45,7 @@ use super::{FileStreamMetrics, OnError}; /// # State Transitions /// /// ```text -/// file_iter +/// work_source /// | /// v /// morselizer.plan_file(file) @@ -62,8 +62,8 @@ use super::{FileStreamMetrics, OnError}; /// /// [`FileStreamState::Scan`]: super::FileStreamState::Scan pub(super) struct ScanState { - /// Files that still need to be planned. - file_iter: VecDeque, + /// Unopened files that still need to be planned for this stream. + work_source: WorkSource, /// Remaining row limit, if any. remain: Option, /// The morselizer used to plan files. @@ -76,7 +76,10 @@ pub(super) struct ScanState { ready_morsels: VecDeque>, /// The active reader, if any. reader: Option>>, - /// The currently outstanding I/O, if any. + /// The single planner currently blocked on I/O, if any. + /// + /// Once the I/O completes, yields the next planner and is pushed back + /// onto `ready_planners`. pending_planner: Option, /// Metrics for the active scan queues. metrics: FileStreamMetrics, @@ -84,15 +87,14 @@ pub(super) struct ScanState { impl ScanState { pub(super) fn new( - file_iter: impl Into>, + work_source: WorkSource, remain: Option, morselizer: Box, on_error: OnError, metrics: FileStreamMetrics, ) -> Self { - let file_iter = file_iter.into(); Self { - file_iter, + work_source, remain, morselizer, on_error, @@ -170,7 +172,7 @@ impl ScanState { (batch, false) } else { let batch = batch.slice(0, *remain); - let done = 1 + self.file_iter.len(); + let done = 1 + self.work_source.len(); self.metrics.files_processed.add(done); *remain = 0; (batch, true) @@ -263,8 +265,8 @@ impl ScanState { }; } - // No outstanding work remains, so morselize the next unopened file. - let part_file = match self.file_iter.pop_front() { + // No outstanding work remains, so begin planning the next unopened file. + let part_file = match self.work_source.pop_front() { Some(part_file) => part_file, None => return ScanAndReturn::Done(None), }; diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs new file mode 100644 index 0000000000000..1dcb6082c47c7 --- /dev/null +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::VecDeque; +use std::sync::Arc; + +use crate::PartitionedFile; +use crate::file_groups::FileGroup; +use crate::file_scan_config::FileScanConfig; +use parking_lot::Mutex; + +/// Source of work for `ScanState`. +/// +/// Streams that may share work across siblings use [`WorkSource::Shared`], +/// while streams that can not share work (e.g. because they must preserve file +/// order) use [`WorkSource::Local`]. +#[derive(Debug, Clone)] +pub(super) enum WorkSource { + /// Files this stream will plan locally without sharing them. + Local(VecDeque), + /// Files shared with sibling streams. + Shared(SharedWorkSource), +} + +impl WorkSource { + /// Pop the next file to plan from this work source. + pub(super) fn pop_front(&mut self) -> Option { + match self { + Self::Local(files) => files.pop_front(), + Self::Shared(shared) => shared.pop_front(), + } + } + + /// Return the number of files that are still waiting to be planned. + pub(super) fn len(&self) -> usize { + match self { + Self::Local(files) => files.len(), + Self::Shared(shared) => shared.len(), + } + } +} + +/// Shared source of work for sibling `FileStream`s +/// +/// The queue is created once per execution and shared by all reorderable +/// sibling streams for that execution. Whichever stream becomes idle first may +/// take the next unopened file from the front of the queue. +/// +/// It uses a [`Mutex`] internally to provide thread-safe access +/// to the shared file queue. +#[derive(Debug, Clone)] +pub(crate) struct SharedWorkSource { + inner: Arc, +} + +#[derive(Debug, Default)] +pub(super) struct SharedWorkSourceInner { + files: Mutex>, +} + +impl SharedWorkSource { + /// Create a shared work source containing the provided unopened files. + pub(crate) fn new(files: impl IntoIterator) -> Self { + let files = files.into_iter().collect(); + Self { + inner: Arc::new(SharedWorkSourceInner { + files: Mutex::new(files), + }), + } + } + + /// Create a shared work source for the unopened files in `config`. + pub(crate) fn from_config(config: &FileScanConfig) -> Self { + Self::new(config.file_groups.iter().flat_map(FileGroup::iter).cloned()) + } + + /// Pop the next file from the shared work queue. + /// + /// Returns `None` if the queue is empty + fn pop_front(&self) -> Option { + self.inner.files.lock().pop_front() + } + + /// Return the number of files still waiting in the shared queue. + fn len(&self) -> usize { + self.inner.files.lock().len() + } +} diff --git a/datafusion/datasource/src/morsel/mocks.rs b/datafusion/datasource/src/morsel/mocks.rs index cd1fa3732ffea..ceb0e720691a7 100644 --- a/datafusion/datasource/src/morsel/mocks.rs +++ b/datafusion/datasource/src/morsel/mocks.rs @@ -295,8 +295,11 @@ impl MockPlanBuilder { } /// Add a ready child planner - pub(crate) fn with_ready_planner(self, ready_planners: MockPlanner) -> Self { - self.with_ready_planners(vec![ready_planners]) + pub(crate) fn with_ready_planner( + self, + ready_planner: impl Into, + ) -> Self { + self.with_ready_planners(vec![ready_planner.into()]) } /// Add ready child planners produced by this planning step. @@ -430,8 +433,9 @@ impl MockMorselizer { &self.observer } - /// Associates a file path with the planner spec used to open it. - pub(crate) fn with_file(mut self, planner: MockPlanner) -> Self { + /// Specify the return planner for the specified file_path + pub(crate) fn with_planner(mut self, planner: impl Into) -> Self { + let planner = planner.into(); self.files.insert(planner.file_path.clone(), planner); self } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 81e15d0a2a092..afed439788463 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -20,7 +20,7 @@ use std::any::Any; use std::fmt; use std::fmt::{Debug, Formatter}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_plan::execution_plan::{ @@ -123,12 +123,23 @@ use datafusion_physical_plan::filter_pushdown::{ /// └─────────────────────┘ /// ``` pub trait DataSource: Send + Sync + Debug { + /// Open the specified output partition and return its stream of + /// [`RecordBatch`]es. + /// + /// This should be used by data sources that do not need any sibling + /// coordination. Data sources that want to use per-execution shared state + /// (for example, to reorder work across partitions at runtime) should + /// implement [`Self::open_with_args`] instead. + /// + /// [`RecordBatch`]: arrow::record_batch::RecordBatch fn open( &self, partition: usize, context: Arc, ) -> Result; + fn as_any(&self) -> &dyn Any; + /// Format this source for display in explain plans fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result; @@ -246,6 +257,55 @@ pub trait DataSource: Send + Sync + Debug { ) -> Option> { None } + + /// Create per execution state to share across sibling instances of this + /// data source during one execution. + /// + /// Returns `None` (the default) if this data source has + /// no sibling-shared execution state. + fn create_sibling_state(&self) -> Option> { + None + } + + /// Open a partition using optional sibling-shared execution state. + /// + /// The default implementation ignores the additional state and delegates to + /// [`Self::open`]. + fn open_with_args(&self, args: OpenArgs) -> Result { + self.open(args.partition, args.context) + } +} + +/// Arguments for [`DataSource::open_with_args`] +#[derive(Debug, Clone)] +pub struct OpenArgs { + /// Which partition to open + pub partition: usize, + /// The task context for execution + pub context: Arc, + /// Optional sibling-shared execution state, see + /// [`DataSource::create_sibling_state`] for details. + pub sibling_state: Option>, +} + +impl OpenArgs { + /// Create a new OpenArgs with required arguments + pub fn new(partition: usize, context: Arc) -> Self { + Self { + partition, + context, + sibling_state: None, + } + } + + /// Set sibling shared state + pub fn with_shared_state( + mut self, + sibling_state: Option>, + ) -> Self { + self.sibling_state = sibling_state; + self + } } /// [`ExecutionPlan`] that reads one or more files @@ -266,6 +326,12 @@ pub struct DataSourceExec { data_source: Arc, /// Cached plan properties such as sort order cache: Arc, + /// Per executon state shared across partitions of this plan. + /// + /// Created by [`DataSource::create_sibling_state`] + /// and then passed to + /// [`DataSource::open_with_args`]. + execution_state: Arc>>>, } impl DisplayAs for DataSourceExec { @@ -339,8 +405,15 @@ impl ExecutionPlan for DataSourceExec { partition: usize, context: Arc, ) -> Result { - let stream = self.data_source.open(partition, Arc::clone(&context))?; + let shared_state = self + .execution_state + .get_or_init(|| self.data_source.create_sibling_state()) + .clone(); + let args = OpenArgs::new(partition, Arc::clone(&context)) + .with_shared_state(shared_state); + let stream = self.data_source.open_with_args(args)?; let batch_size = context.session_config().batch_size(); + log::debug!( "Batch splitting enabled for partition {partition}: batch_size={batch_size}" ); @@ -377,8 +450,13 @@ impl ExecutionPlan for DataSourceExec { fn with_fetch(&self, limit: Option) -> Option> { let data_source = self.data_source.with_fetch(limit)?; let cache = Arc::clone(&self.cache); + let execution_state = Arc::new(OnceLock::new()); - Some(Arc::new(Self { data_source, cache })) + Some(Arc::new(Self { + data_source, + cache, + execution_state, + })) } fn fetch(&self) -> Option { @@ -471,6 +549,12 @@ impl ExecutionPlan for DataSourceExec { as Arc }) } + + fn reset_state(self: Arc) -> Result> { + let mut new_exec = Arc::unwrap_or_clone(self); + new_exec.execution_state = Arc::new(OnceLock::new()); + Ok(Arc::new(new_exec)) + } } impl DataSourceExec { @@ -484,6 +568,7 @@ impl DataSourceExec { Self { data_source, cache: Arc::new(cache), + execution_state: Arc::new(OnceLock::new()), } } @@ -495,6 +580,7 @@ impl DataSourceExec { pub fn with_data_source(mut self, data_source: Arc) -> Self { self.cache = Arc::new(Self::compute_properties(&data_source)); self.data_source = data_source; + self.execution_state = Arc::new(OnceLock::new()); self } diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs index 3a9e78943b07b..5ce0f1419d11d 100644 --- a/datafusion/datasource/src/test_util.rs +++ b/datafusion/datasource/src/test_util.rs @@ -34,6 +34,7 @@ pub(crate) struct MockSource { filter: Option>, table_schema: crate::table_schema::TableSchema, projection: crate::projection::SplitProjection, + file_opener: Option>, } impl Default for MockSource { @@ -45,6 +46,7 @@ impl Default for MockSource { filter: None, projection: crate::projection::SplitProjection::unprojected(&table_schema), table_schema, + file_opener: None, } } } @@ -57,6 +59,7 @@ impl MockSource { filter: None, projection: crate::projection::SplitProjection::unprojected(&table_schema), table_schema, + file_opener: None, } } @@ -64,6 +67,11 @@ impl MockSource { self.filter = Some(filter); self } + + pub fn with_file_opener(mut self, file_opener: Arc) -> Self { + self.file_opener = Some(file_opener); + self + } } impl FileSource for MockSource { @@ -73,7 +81,9 @@ impl FileSource for MockSource { _base_config: &FileScanConfig, _partition: usize, ) -> Result> { - unimplemented!() + self.file_opener.clone().ok_or_else(|| { + datafusion_common::internal_datafusion_err!("MockSource missing FileOpener") + }) } fn as_any(&self) -> &dyn std::any::Any { From 5287210ff44c2a1d1372c6badc641659511fe8c2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 14 Apr 2026 12:56:48 -0400 Subject: [PATCH 02/16] fix: typos --- datafusion/datasource/src/source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index afed439788463..4bdf7bf795f41 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -326,7 +326,7 @@ pub struct DataSourceExec { data_source: Arc, /// Cached plan properties such as sort order cache: Arc, - /// Per executon state shared across partitions of this plan. + /// Per execution state shared across partitions of this plan. /// /// Created by [`DataSource::create_sibling_state`] /// and then passed to From 4cefb3e7d90c6324b32f7ad73e56d063017bfc01 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 15 Apr 2026 15:14:57 -0400 Subject: [PATCH 03/16] Update datafusion/datasource/src/file_stream/mod.rs Co-authored-by: Oleks V --- datafusion/datasource/src/file_stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index b9aabacf64c11..34b92ee5338b9 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -1059,7 +1059,7 @@ mod tests { let test = two_partition_morsel_test().with_file_stream_events(false); // Partition 0 starts with 3 files, but Partition 1 is polled first. - // Since Partition is polled first, it will run all the files even those + // Since Partition 1 is polled first, it will run all the files even those // that were assigned to Partition 0. insta::assert_snapshot!(test.run().await.unwrap(), @r" ----- Partition 0 ----- From cd793122b0dfb4b6f376c9fbd074f91f4c0bedab Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 16 Apr 2026 14:05:45 -0400 Subject: [PATCH 04/16] Add test for file opened/closed metrics --- datafusion/datasource/src/file_stream/mod.rs | 84 ++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index 34b92ee5338b9..e277690cff810 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -1208,6 +1208,72 @@ mod tests { Ok(()) } + /// Verifies that a sibling hitting its limit does not count shared files + /// left in the queue as already processed by that stream. + #[tokio::test] + async fn morsel_shared_limit_does_not_double_count_files_processed() -> Result<()> { + let test = two_partition_morsel_test(); + let unlimited_config = test.test_config(); + let limited_config = test.clone().with_limit(1).test_config(); + let shared_work_source = limited_config + .create_sibling_state() + .and_then(|state| state.as_ref().downcast_ref::().cloned()) + .expect("shared work source"); + let limited_metrics = ExecutionPlanMetricsSet::new(); + let unlimited_metrics = ExecutionPlanMetricsSet::new(); + + let limited_stream = FileStreamBuilder::new(&limited_config) + .with_partition(1) + .with_shared_work_source(Some(shared_work_source.clone())) + .with_morselizer(Box::new(test.morselizer.clone())) + .with_metrics(&limited_metrics) + .build()?; + + let unlimited_stream = FileStreamBuilder::new(&unlimited_config) + .with_partition(0) + .with_shared_work_source(Some(shared_work_source)) + .with_morselizer(Box::new(test.morselizer)) + .with_metrics(&unlimited_metrics) + .build()?; + + let limited_output = drain_stream_output(limited_stream).await?; + let unlimited_output = drain_stream_output(unlimited_stream).await?; + + insta::assert_snapshot!(format!( + "----- Limited Stream -----\n{limited_output}\n----- Unlimited Stream -----\n{unlimited_output}" + ), @r" + ----- Limited Stream ----- + Batch: 101 + ----- Unlimited Stream ----- + Batch: 102 + Batch: 103 + Batch: 201 + "); + + assert_eq!( + metric_count(&limited_metrics, "files_opened"), + 1, + "the limited stream should only open the file that produced its output" + ); + assert_eq!( + metric_count(&limited_metrics, "files_processed"), + 1, + "the limited stream should only mark its own file as processed" + ); + assert_eq!( + metric_count(&unlimited_metrics, "files_opened"), + 3, + "the draining stream should open the remaining shared files" + ); + assert_eq!( + metric_count(&unlimited_metrics, "files_processed"), + 3, + "the draining stream should process exactly the files it opened" + ); + + Ok(()) + } + /// Verifies that one fast sibling can drain shared files that originated /// in more than one other partition. #[tokio::test] @@ -1547,6 +1613,24 @@ mod tests { } } + async fn drain_stream_output(stream: FileStream) -> Result { + let output = stream + .collect::>() + .await + .into_iter() + .map(|result| result.map(|batch| format_result(Ok(batch)))) + .collect::>>()?; + Ok(output.join("\n")) + } + + fn metric_count(metrics: &ExecutionPlanMetricsSet, name: &str) -> usize { + metrics + .clone_inner() + .sum_by_name(name) + .unwrap_or_else(|| panic!("missing metric: {name}")) + .as_usize() + } + /// Test-only state for one stream partition in [`FileStreamMorselTest`]. struct PartitionState { /// Whether the `FileStream` for this partition has been built yet. From c58a9a8746a7d78abd6c14562a847cc1ef2907f5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 16 Apr 2026 14:07:30 -0400 Subject: [PATCH 05/16] properly account for limit --- datafusion/datasource/src/file_stream/scan_state.rs | 2 +- datafusion/datasource/src/file_stream/work_source.rs | 12 ++++-------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs index fdae1bcf7e074..21125cd08896c 100644 --- a/datafusion/datasource/src/file_stream/scan_state.rs +++ b/datafusion/datasource/src/file_stream/scan_state.rs @@ -172,7 +172,7 @@ impl ScanState { (batch, false) } else { let batch = batch.slice(0, *remain); - let done = 1 + self.work_source.len(); + let done = 1 + self.work_source.skipped_on_limit(); self.metrics.files_processed.add(done); *remain = 0; (batch, true) diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs index 1dcb6082c47c7..7f31dacca9592 100644 --- a/datafusion/datasource/src/file_stream/work_source.rs +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -45,11 +45,12 @@ impl WorkSource { } } - /// Return the number of files that are still waiting to be planned. - pub(super) fn len(&self) -> usize { + /// Return how many queued files should be counted as already processed + /// when this stream stops early after hitting a global limit. + pub(super) fn skipped_on_limit(&self) -> usize { match self { Self::Local(files) => files.len(), - Self::Shared(shared) => shared.len(), + Self::Shared(_) => 0, } } } @@ -94,9 +95,4 @@ impl SharedWorkSource { fn pop_front(&self) -> Option { self.inner.files.lock().pop_front() } - - /// Return the number of files still waiting in the shared queue. - fn len(&self) -> usize { - self.inner.files.lock().len() - } } From 910c5208b934b5ca12aaa076b612706a6a866e69 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 13 Apr 2026 06:38:47 +0000 Subject: [PATCH 06/16] feat: reorder row groups by statistics during sort pushdown When sort pushdown is active, reorder row groups within each file by their min/max statistics to match the requested sort order. This helps TopK queries find optimal values first via dynamic filter pushdown. - Add reorder_by_statistics to PreparedAccessPlan that sorts row_group_indexes by the first sort column's min values - Pass sort order from ParquetSource::try_pushdown_sort through to the opener via sort_order_for_reorder field - Reorder happens after pruning but before reverse (they compose) - Gracefully skips reorder when statistics unavailable, sort expr is not a simple column, row_selection present, or <=1 row groups Closes #21317 --- .../datasource-parquet/src/access_plan.rs | 326 ++++++++++++++++++ .../src/access_plan_optimizer.rs | 107 ++++++ datafusion/datasource-parquet/src/opener.rs | 22 +- datafusion/datasource-parquet/src/source.rs | 10 +- 4 files changed, 462 insertions(+), 3 deletions(-) create mode 100644 datafusion/datasource-parquet/src/access_plan_optimizer.rs diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index ca4d097c37a44..d98f64044db37 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -16,7 +16,12 @@ // under the License. use crate::sort::reverse_row_selection; +use arrow::datatypes::Schema; use datafusion_common::{Result, assert_eq_or_internal_err}; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use log::debug; +use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; @@ -377,6 +382,106 @@ impl PreparedAccessPlan { }) } + /// Reorder row groups by their min statistics for the given sort order. + /// + /// This helps TopK queries find optimal values first. For ASC sort, + /// row groups with the smallest min values come first. For DESC sort, + /// row groups with the largest min values come first. + /// + /// Gracefully skips reordering when: + /// - There is a row_selection (too complex to remap) + /// - 0 or 1 row groups (nothing to reorder) + /// - Sort expression is not a simple column reference + /// - Statistics are unavailable + pub(crate) fn reorder_by_statistics( + mut self, + sort_order: &LexOrdering, + file_metadata: &ParquetMetaData, + arrow_schema: &Schema, + ) -> Result { + // Skip if row_selection present (too complex to remap) + if self.row_selection.is_some() { + debug!("Skipping RG reorder: row_selection present"); + return Ok(self); + } + + // Nothing to reorder + if self.row_group_indexes.len() <= 1 { + return Ok(self); + } + + // Get the first sort expression + // LexOrdering is guaranteed non-empty, so first() returns &PhysicalSortExpr + let first_sort_expr = sort_order.first(); + + // Extract column name from sort expression + let column: &Column = match first_sort_expr.expr.as_any().downcast_ref::() + { + Some(col) => col, + None => { + debug!("Skipping RG reorder: sort expr is not a simple column"); + return Ok(self); + } + }; + + let descending = first_sort_expr.options.descending; + + // Build statistics converter for this column + let converter = match StatisticsConverter::try_new( + column.name(), + arrow_schema, + file_metadata.file_metadata().schema_descr(), + ) { + Ok(c) => c, + Err(e) => { + debug!("Skipping RG reorder: cannot create stats converter: {e}"); + return Ok(self); + } + }; + + // Get min values for the selected row groups + let rg_metadata: Vec<&RowGroupMetaData> = self + .row_group_indexes + .iter() + .map(|&idx| file_metadata.row_group(idx)) + .collect(); + + let min_values = match converter.row_group_mins(rg_metadata.iter().copied()) { + Ok(vals) => vals, + Err(e) => { + debug!("Skipping RG reorder: cannot get min values: {e}"); + return Ok(self); + } + }; + + // Sort indices by min values + let sort_options = arrow::compute::SortOptions { + descending, + nulls_first: first_sort_expr.options.nulls_first, + }; + let sorted_indices = match arrow::compute::sort_to_indices( + &min_values, + Some(sort_options), + None, + ) { + Ok(indices) => indices, + Err(e) => { + debug!("Skipping RG reorder: sort failed: {e}"); + return Ok(self); + } + }; + + // Apply the reordering + let original_indexes = self.row_group_indexes.clone(); + self.row_group_indexes = sorted_indices + .values() + .iter() + .map(|&i| original_indexes[i as usize]) + .collect(); + + Ok(self) + } + /// Reverse the access plan for reverse scanning pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result { // Get the row group indexes before reversing @@ -614,4 +719,225 @@ mod test { .unwrap(); Arc::new(SchemaDescriptor::new(Arc::new(schema))) } + + // ---- reorder_by_statistics tests ---- + + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_physical_expr::expressions::Column; + use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; + use parquet::basic::Type as PhysicalType; + use parquet::file::metadata::FileMetaData; + use parquet::file::statistics::Statistics; + use parquet::schema::types::Type as SchemaType; + + /// Create ParquetMetaData with row groups that have Int32 min/max stats + fn make_metadata_with_stats(min_max_pairs: &[(i32, i32)]) -> ParquetMetaData { + let field = SchemaType::primitive_type_builder("id", PhysicalType::INT32) + .build() + .unwrap(); + let schema = SchemaType::group_type_builder("schema") + .with_fields(vec![Arc::new(field)]) + .build() + .unwrap(); + let schema_descr = Arc::new(SchemaDescriptor::new(Arc::new(schema))); + + let row_groups: Vec = min_max_pairs + .iter() + .map(|(min, max)| { + let stats = + Statistics::int32(Some(*min), Some(*max), None, Some(100), false); + let column = ColumnChunkMetaData::builder(schema_descr.column(0)) + .set_num_values(100) + .set_statistics(stats) + .build() + .unwrap(); + RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(100) + .set_column_metadata(vec![column]) + .build() + .unwrap() + }) + .collect(); + + let file_meta = FileMetaData::new( + 1, + min_max_pairs.len() as i64 * 100, + None, + None, + schema_descr, + None, + ); + ParquetMetaData::new(file_meta, row_groups) + } + + fn make_sort_order_asc() -> LexOrdering { + LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new( + "id", 0, + )))]) + .unwrap() + } + + fn make_sort_order_desc() -> LexOrdering { + use arrow::compute::SortOptions; + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("id", 0)), + SortOptions { + descending: true, + nulls_first: false, + }, + )]) + .unwrap() + } + + fn make_arrow_schema() -> Schema { + Schema::new(vec![Field::new("id", DataType::Int32, false)]) + } + + #[test] + fn test_reorder_by_statistics_asc() { + // RGs in wrong order: [50-99, 200-299, 1-30] + let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should be reordered: RG2(1-30), RG0(50-99), RG1(200-299) + assert_eq!(plan.row_group_indexes, vec![2, 0, 1]); + } + + #[test] + fn test_reorder_by_statistics_desc() { + // RGs: [50-99, 200-299, 1-30] + let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_desc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // DESC: largest min first: RG1(200-299), RG0(50-99), RG2(1-30) + assert_eq!(plan.row_group_indexes, vec![1, 0, 2]); + } + + #[test] + fn test_reorder_by_statistics_single_rg() { + let metadata = make_metadata_with_stats(&[(1, 100)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Single RG, no reorder + assert_eq!(plan.row_group_indexes, vec![0]); + } + + #[test] + fn test_reorder_by_statistics_with_skipped_rgs() { + // 4 RGs but only 0, 2, 3 are selected (RG1 was pruned) + let metadata = + make_metadata_with_stats(&[(300, 400), (100, 200), (1, 50), (50, 99)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0, 2, 3], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Reorder selected RGs by min: RG2(1-50), RG3(50-99), RG0(300-400) + assert_eq!(plan.row_group_indexes, vec![2, 3, 0]); + } + + #[test] + fn test_reorder_by_statistics_skips_with_row_selection() { + let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let selection = RowSelection::from(vec![ + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(100), + ]); + + let plan = PreparedAccessPlan::new(vec![0, 1], Some(selection)).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should NOT reorder because row_selection is present + assert_eq!(plan.row_group_indexes, vec![0, 1]); + } + + #[test] + fn test_reorder_by_statistics_already_sorted() { + // Already in correct ASC order + let metadata = make_metadata_with_stats(&[(1, 30), (50, 99), (200, 299)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Already sorted, order preserved + assert_eq!(plan.row_group_indexes, vec![0, 1, 2]); + } + + #[test] + fn test_reorder_by_statistics_skips_non_column_expr() { + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::BinaryExpr; + + let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); + let schema = make_arrow_schema(); + + // Sort expression is a binary expression (id + 1), not a simple column + let expr = Arc::new(BinaryExpr::new( + Arc::new(Column::new("id", 0)), + Operator::Plus, + Arc::new(datafusion_physical_expr::expressions::Literal::new( + datafusion_common::ScalarValue::Int32(Some(1)), + )), + )); + let sort_order = + LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap(); + + let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should NOT reorder because sort expr is not a simple column + assert_eq!(plan.row_group_indexes, vec![0, 1]); + } + + #[test] + fn test_reorder_by_statistics_skips_missing_column() { + let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); + // Schema has "id" but sort order references "nonexistent" + let schema = make_arrow_schema(); + let sort_order = LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new( + Column::new("nonexistent", 99), + ))]) + .unwrap(); + + let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should NOT reorder because column not found in schema + assert_eq!(plan.row_group_indexes, vec![0, 1]); + } } diff --git a/datafusion/datasource-parquet/src/access_plan_optimizer.rs b/datafusion/datasource-parquet/src/access_plan_optimizer.rs new file mode 100644 index 0000000000000..885dc0b5656ee --- /dev/null +++ b/datafusion/datasource-parquet/src/access_plan_optimizer.rs @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`AccessPlanOptimizer`] trait and implementations for optimizing +//! row group access order during parquet scans. +//! +//! Applied after row group pruning but before building the decoder, +//! these optimizers reorder (or reverse) the row groups to improve +//! query performance — e.g., placing the "best" row groups first +//! so TopK's dynamic filter threshold tightens quickly. + +use crate::access_plan::PreparedAccessPlan; +use arrow::datatypes::Schema; +use datafusion_common::Result; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use parquet::file::metadata::ParquetMetaData; +use std::fmt::Debug; + +/// Optimizes the row group access order for a prepared access plan. +/// +/// Implementations can reorder, reverse, or otherwise transform the +/// row group read order to improve scan performance. The optimizer +/// is applied once per file, after all pruning passes are complete. +/// +/// # Examples +/// +/// - [`ReverseRowGroups`]: simple O(n) reversal for DESC on ASC-sorted data +/// - [`ReorderByStatistics`]: sort row groups by min/max statistics +/// so TopK queries find optimal values first +pub(crate) trait AccessPlanOptimizer: Send + Sync + Debug { + /// Transform the prepared access plan. + /// + /// Implementations should return the plan unchanged if they cannot + /// apply their optimization (e.g., missing statistics). + fn optimize( + &self, + plan: PreparedAccessPlan, + file_metadata: &ParquetMetaData, + arrow_schema: &Schema, + ) -> Result; +} + +/// Reverse the row group order — simple O(n) reversal. +/// +/// Used as a fallback when the sort column has no statistics available. +/// For ASC-sorted files with a DESC query, reversing row groups places +/// the highest-value row groups first. +#[derive(Debug)] +pub(crate) struct ReverseRowGroups; + +impl AccessPlanOptimizer for ReverseRowGroups { + fn optimize( + &self, + plan: PreparedAccessPlan, + file_metadata: &ParquetMetaData, + _arrow_schema: &Schema, + ) -> Result { + plan.reverse(file_metadata) + } +} + +/// Reorder row groups by min/max statistics of the sort column. +/// +/// For ASC sort: row groups with the smallest min come first. +/// For DESC sort: row groups with the largest max come first. +/// +/// This is more effective than [`ReverseRowGroups`] when row groups +/// are out of order (e.g., append-heavy workloads), because it uses +/// actual statistics rather than assuming the original order is sorted. +/// +/// Gracefully falls back to the original order when statistics are +/// unavailable, the sort expression is not a simple column, etc. +#[derive(Debug)] +pub(crate) struct ReorderByStatistics { + sort_order: LexOrdering, +} + +impl ReorderByStatistics { + pub(crate) fn new(sort_order: LexOrdering) -> Self { + Self { sort_order } + } +} + +impl AccessPlanOptimizer for ReorderByStatistics { + fn optimize( + &self, + plan: PreparedAccessPlan, + file_metadata: &ParquetMetaData, + arrow_schema: &Schema, + ) -> Result { + plan.reorder_by_statistics(&self.sort_order, file_metadata, arrow_schema) + } +} diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bad1c684b47f5..5a381f6bf975f 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -50,6 +50,7 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::{ PhysicalExpr, is_dynamic_physical_expr, }; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory, PruningMetrics, @@ -136,6 +137,10 @@ pub(super) struct ParquetMorselizer { pub max_predicate_cache_size: Option, /// Whether to read row groups in reverse order pub reverse_row_groups: bool, + /// Optional sort order used to reorder row groups by their min/max statistics. + /// When set, row groups are reordered before reading so that row groups likely + /// to contain optimal values (for TopK queries) are read first. + pub sort_order_for_reorder: Option, } impl fmt::Debug for ParquetMorselizer { @@ -286,6 +291,7 @@ struct PreparedParquetOpen { predicate_creation_errors: Count, max_predicate_cache_size: Option, reverse_row_groups: bool, + sort_order_for_reorder: Option, preserve_order: bool, #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option>, @@ -655,6 +661,7 @@ impl ParquetMorselizer { predicate_creation_errors, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + sort_order_for_reorder: self.sort_order_for_reorder.clone(), preserve_order: self.preserve_order, #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, @@ -1126,6 +1133,16 @@ impl RowGroupsPrunedParquetOpen { // Prepare the access plan (extract row groups and row selection) let mut prepared_plan = access_plan.prepare(rg_metadata)?; + // Reorder row groups by statistics if sort order is known. + // This helps TopK queries find optimal values first. + if let Some(sort_order) = &prepared.sort_order_for_reorder { + prepared_plan = prepared_plan.reorder_by_statistics( + sort_order, + file_metadata.as_ref(), + &prepared.physical_file_schema, + )?; + } + // Potentially reverse the access plan for performance. // See `ParquetSource::try_pushdown_sort` for the rationale. if prepared.reverse_row_groups { @@ -1644,6 +1661,7 @@ mod test { use datafusion_physical_expr_adapter::{ DefaultPhysicalExprAdapterFactory, replace_columns_with_literals, }; + use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::StreamExt; use futures::stream::BoxStream; @@ -1675,6 +1693,7 @@ mod test { coerce_int96: Option, max_predicate_cache_size: Option, reverse_row_groups: bool, + sort_order_for_reorder: Option, preserve_order: bool, } @@ -1701,6 +1720,7 @@ mod test { coerce_int96: None, max_predicate_cache_size: None, reverse_row_groups: false, + sort_order_for_reorder: None, preserve_order: false, } } @@ -1816,6 +1836,7 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + sort_order_for_reorder: self.sort_order_for_reorder, } } } @@ -1840,7 +1861,6 @@ mod test { let Some(planner) = planners.pop_front() else { return Ok(Box::pin(futures::stream::empty())); - }; if let Some(mut plan) = planner.plan()? { morsels.extend(plan.take_morsels()); diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index a014c8b2726e7..5586e29a9eae5 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -55,7 +55,7 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; -use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use itertools::Itertools; use object_store::ObjectStore; #[cfg(feature = "parquet_encryption")] @@ -294,6 +294,8 @@ pub struct ParquetSource { /// so we still need to sort them after reading, so the reverse scan is inexact. /// Used to optimize ORDER BY ... DESC on sorted data. reverse_row_groups: bool, + /// Optional sort order used to reorder row groups by min/max statistics. + sort_order_for_reorder: Option, } impl ParquetSource { @@ -319,6 +321,7 @@ impl ParquetSource { #[cfg(feature = "parquet_encryption")] encryption_factory: None, reverse_row_groups: false, + sort_order_for_reorder: None, } } @@ -580,6 +583,7 @@ impl FileSource for ParquetSource { encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, + sort_order_for_reorder: self.sort_order_for_reorder.clone(), })) } @@ -821,7 +825,9 @@ impl FileSource for ParquetSource { // Return Inexact because we're only reversing row group order, // not guaranteeing perfect row-level ordering - let new_source = self.clone().with_reverse_row_groups(true); + let sort_order = LexOrdering::new(order.iter().cloned()); + let mut new_source = self.clone().with_reverse_row_groups(true); + new_source.sort_order_for_reorder = sort_order; Ok(SortOrderPushdownResult::Inexact { inner: Arc::new(new_source) as Arc, }) From 10803e2f29369a5df84ab602871bbeade7746994 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 13 Apr 2026 06:48:06 +0000 Subject: [PATCH 07/16] test: add SLT tests for row group reorder by statistics --- .../sqllogictest/test_files/sort_pushdown.slt | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index b6c75f3977010..4f41c4b08aecf 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2271,6 +2271,89 @@ DROP TABLE tg_src_high; statement ok DROP TABLE tg_buffer; +# =========================================================== +# Test H: Row group reorder by statistics for TopK queries. +# When a file has multiple row groups with overlapping or +# out-of-order statistics, sort pushdown returns Inexact and +# `reorder_by_statistics` reorders row groups within the file +# so TopK finds optimal values first. +# =========================================================== + +# Create a table with 30 rows and write to parquet with small row groups +# so we get multiple row groups per file. Rows are inserted in a mixed +# order so row groups span overlapping ranges (forcing Inexact path). +statement ok +CREATE TABLE th_mixed(id INT, value INT) AS VALUES + (15, 150), (5, 50), (25, 250), + (10, 100), (20, 200), (1, 10), + (30, 300), (3, 30), (18, 180); + +# Write with row_group_size=3 → 3 rows per RG, 3 RGs total +# RG statistics (unsorted order): RG0(5-25), RG1(1-20), RG2(3-30) +# Note: files are overlapping → Inexact path → TopK retained +query I +COPY (SELECT * FROM th_mixed) +TO 'test_files/scratch/sort_pushdown/th_reorder/data.parquet' +STORED AS PARQUET +OPTIONS ('format.max_row_group_size' '3'); +---- +9 + +statement ok +SET datafusion.execution.target_partitions = 1; + +statement ok +CREATE EXTERNAL TABLE th_reorder(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/th_reorder/data.parquet'; + +# Test H.1: ASC ORDER BY with LIMIT — reorder helps TopK find min values first +# Results must be correct regardless of RG reorder. +query II +SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3; +---- +1 10 +3 30 +5 50 + +# Test H.2: DESC ORDER BY with LIMIT — reorder + reverse compose +query II +SELECT * FROM th_reorder ORDER BY id DESC LIMIT 3; +---- +30 300 +25 250 +20 200 + +# Test H.3: Full sort (no LIMIT) — output must still be correctly sorted +query II +SELECT * FROM th_reorder ORDER BY id ASC; +---- +1 10 +3 30 +5 50 +10 100 +15 150 +18 180 +20 200 +25 250 +30 300 + +# Test H.4: ORDER BY expression (not a simple column) — reorder should +# gracefully skip, results still correct +query II +SELECT id, value FROM th_reorder ORDER BY id + 1 ASC LIMIT 3; +---- +1 10 +3 30 +5 50 + +# Cleanup Test H +statement ok +DROP TABLE th_mixed; + +statement ok +DROP TABLE th_reorder; + # Reset settings (SLT runner uses target_partitions=4, not system default) statement ok SET datafusion.execution.target_partitions = 4; From 714477aeaebc4c5f0a3eb661199620ede11815a3 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 13 Apr 2026 06:56:39 +0000 Subject: [PATCH 08/16] test: add EXPLAIN assertions for row group reorder tests --- .../sqllogictest/test_files/sort_pushdown.slt | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 4f41c4b08aecf..a9d512228fc14 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2307,7 +2307,19 @@ CREATE EXTERNAL TABLE th_reorder(id INT, value INT) STORED AS PARQUET LOCATION 'test_files/scratch/sort_pushdown/th_reorder/data.parquet'; -# Test H.1: ASC ORDER BY with LIMIT — reorder helps TopK find min values first +# Test H.1: ASC ORDER BY with LIMIT — Inexact path (file has no declared ordering) +# Plan: SortExec(TopK) preserved. RG reorder happens inside DataSourceExec +# (not visible in EXPLAIN, but verified by unit tests in access_plan.rs). +query TT +EXPLAIN SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3; +---- +logical_plan +01)Sort: th_reorder.id ASC NULLS LAST, fetch=3 +02)--TableScan: th_reorder projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/th_reorder/data.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] + # Results must be correct regardless of RG reorder. query II SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3; @@ -2317,6 +2329,16 @@ SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3; 5 50 # Test H.2: DESC ORDER BY with LIMIT — reorder + reverse compose +query TT +EXPLAIN SELECT * FROM th_reorder ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: th_reorder.id DESC NULLS FIRST, fetch=3 +02)--TableScan: th_reorder projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/th_reorder/data.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] + query II SELECT * FROM th_reorder ORDER BY id DESC LIMIT 3; ---- From 768e44d4329785ec3ea6e4f0e64f0d9cc153d670 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 13 Apr 2026 07:35:19 +0000 Subject: [PATCH 09/16] fix: use max statistics for DESC sort reorder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For overlapping row group ranges, sorting by min for DESC can pick a worse first RG. Example: RG0(50-60) vs RG1(40-100) — min DESC picks RG0 first (max=60), but RG1 contains the largest values (max=100). Use min for ASC and max for DESC to correctly prioritize the row group most likely to contain the optimal values for TopK. --- .../datasource-parquet/src/access_plan.rs | 77 ++++++++++++++----- 1 file changed, 58 insertions(+), 19 deletions(-) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index d98f64044db37..3f1a0a675e361 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -439,37 +439,51 @@ impl PreparedAccessPlan { } }; - // Get min values for the selected row groups + // Get the relevant statistics for the selected row groups. + // For ASC sort: use min values — we want the RG with the smallest min + // to come first (best candidate for "smallest values"). + // For DESC sort: use max values — we want the RG with the largest max + // to come first (best candidate for "largest values"). Using min for + // DESC can pick a worse first RG when ranges overlap (e.g., RG0 50-60 + // vs RG1 40-100 — RG1 has larger values but smaller min). let rg_metadata: Vec<&RowGroupMetaData> = self .row_group_indexes .iter() .map(|&idx| file_metadata.row_group(idx)) .collect(); - let min_values = match converter.row_group_mins(rg_metadata.iter().copied()) { - Ok(vals) => vals, - Err(e) => { - debug!("Skipping RG reorder: cannot get min values: {e}"); - return Ok(self); + let stat_values = if descending { + match converter.row_group_maxes(rg_metadata.iter().copied()) { + Ok(vals) => vals, + Err(e) => { + debug!("Skipping RG reorder: cannot get max values: {e}"); + return Ok(self); + } + } + } else { + match converter.row_group_mins(rg_metadata.iter().copied()) { + Ok(vals) => vals, + Err(e) => { + debug!("Skipping RG reorder: cannot get min values: {e}"); + return Ok(self); + } } }; - // Sort indices by min values + // Sort indices by statistic values (min for ASC, max for DESC) let sort_options = arrow::compute::SortOptions { descending, nulls_first: first_sort_expr.options.nulls_first, }; - let sorted_indices = match arrow::compute::sort_to_indices( - &min_values, - Some(sort_options), - None, - ) { - Ok(indices) => indices, - Err(e) => { - debug!("Skipping RG reorder: sort failed: {e}"); - return Ok(self); - } - }; + let sorted_indices = + match arrow::compute::sort_to_indices(&stat_values, Some(sort_options), None) + { + Ok(indices) => indices, + Err(e) => { + debug!("Skipping RG reorder: sort failed: {e}"); + return Ok(self); + } + }; // Apply the reordering let original_indexes = self.row_group_indexes.clone(); @@ -821,7 +835,7 @@ mod test { .reorder_by_statistics(&sort_order, &metadata, &schema) .unwrap(); - // DESC: largest min first: RG1(200-299), RG0(50-99), RG2(1-30) + // DESC: largest max first: RG1(max=299), RG0(max=99), RG2(max=30) assert_eq!(plan.row_group_indexes, vec![1, 0, 2]); } @@ -922,6 +936,31 @@ mod test { assert_eq!(plan.row_group_indexes, vec![0, 1]); } + #[test] + fn test_reorder_by_statistics_desc_uses_max_for_overlapping_rgs() { + // Overlapping ranges where min DESC would pick worse RG than max DESC: + // RG0: 50-60 (small range, moderate max) + // RG1: 40-100 (wide range, high max but lower min) + // RG2: 20-30 (low max) + // + // For ORDER BY DESC LIMIT N: + // Using min DESC: [RG0(50), RG1(40), RG2(20)] → reads RG0 first (max=60 only) + // Using max DESC: [RG1(100), RG0(60), RG2(30)] → reads RG1 first (max=100) + // + // RG1 is the better first choice for DESC because it contains the largest values. + let metadata = make_metadata_with_stats(&[(50, 60), (40, 100), (20, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_desc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Expected: RG1 (max=100) first, then RG0 (max=60), then RG2 (max=30) + assert_eq!(plan.row_group_indexes, vec![1, 0, 2]); + } + #[test] fn test_reorder_by_statistics_skips_missing_column() { let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); From 1d06fe771850a55abe3c917106e7842faa9f6ea2 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 14 Apr 2026 06:23:36 +0000 Subject: [PATCH 10/16] fix: prevent reorder+reverse double-reordering of row groups When sort_order_for_reorder is set, reorder_by_statistics already handles the sort direction (min for ASC, max for DESC). Applying reverse on top would undo the reorder. Use else-if so only one strategy is applied. Also adds sort_pushdown_inexact benchmark with pushdown_filters enabled to measure RG reorder benefit on wide-row TopK queries. --- benchmarks/src/sort_pushdown.rs | 9 ++++++++- datafusion/datasource-parquet/src/opener.rs | 13 +++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/benchmarks/src/sort_pushdown.rs b/benchmarks/src/sort_pushdown.rs index e7fce1921e7a8..e2a4615a3ef39 100644 --- a/benchmarks/src/sort_pushdown.rs +++ b/benchmarks/src/sort_pushdown.rs @@ -159,7 +159,14 @@ impl RunOpt { async fn benchmark_query(&self, query_id: usize) -> Result> { let sql = self.load_query(query_id)?; - let config = self.common.config()?; + let mut config = self.common.config()?; + // Enable parquet filter pushdown + late materialization. This is + // essential for the Inexact sort pushdown path: TopK's dynamic + // filter is pushed to the parquet reader, so only sort-column + // rows pass the filter's Decode non-sort columns are skipped for + // rows that don't pass the filter — this is where RG reorder's + // tight-threshold-first strategy pays off for wide-row queries. + config.options_mut().execution.parquet.pushdown_filters = true; let rt = self.common.build_runtime()?; let state = SessionStateBuilder::new() .with_config(config) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 5a381f6bf975f..fa5bbaced9878 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1134,18 +1134,19 @@ impl RowGroupsPrunedParquetOpen { let mut prepared_plan = access_plan.prepare(rg_metadata)?; // Reorder row groups by statistics if sort order is known. - // This helps TopK queries find optimal values first. + // This helps TopK queries find optimal values first by placing + // row groups with optimal min/max values at the front. + // When reorder is active, skip reverse — reorder already encodes + // the direction (uses min for ASC, max for DESC). if let Some(sort_order) = &prepared.sort_order_for_reorder { prepared_plan = prepared_plan.reorder_by_statistics( sort_order, file_metadata.as_ref(), &prepared.physical_file_schema, )?; - } - - // Potentially reverse the access plan for performance. - // See `ParquetSource::try_pushdown_sort` for the rationale. - if prepared.reverse_row_groups { + } else if prepared.reverse_row_groups { + // Fallback: simple reverse when no sort order statistics available. + // See `ParquetSource::try_pushdown_sort` for the rationale. prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; } From d1357631630038ff08df4d0e5905e192cffdc909 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 16 Apr 2026 10:41:56 +0000 Subject: [PATCH 11/16] fix: rebase conflicts and compilation errors --- datafusion/datasource-parquet/src/access_plan.rs | 3 +-- datafusion/datasource-parquet/src/opener.rs | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index 3f1a0a675e361..77b538bd52249 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -415,8 +415,7 @@ impl PreparedAccessPlan { let first_sort_expr = sort_order.first(); // Extract column name from sort expression - let column: &Column = match first_sort_expr.expr.as_any().downcast_ref::() - { + let column: &Column = match first_sort_expr.expr.downcast_ref::() { Some(col) => col, None => { debug!("Skipping RG reorder: sort expr is not a simple column"); diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index fa5bbaced9878..d91fc9900f445 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1862,6 +1862,7 @@ mod test { let Some(planner) = planners.pop_front() else { return Ok(Box::pin(futures::stream::empty())); + }; if let Some(mut plan) = planner.plan()? { morsels.extend(plan.take_morsels()); From 32b1b951d31f5ffbb86a36e2c80b1f90f3136da4 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 16 Apr 2026 12:13:51 +0000 Subject: [PATCH 12/16] refactor: introduce AccessPlanOptimizer trait for row group reordering Address review feedback: extract row group optimization into a trait instead of post-pass flags. prepare_with_optimizer() integrates the optimization into the access plan preparation step. - AccessPlanOptimizer trait with ReverseRowGroups and ReorderByStatistics - prepare_with_optimizer() applies optimizer inside prepare flow - Original prepare() unchanged for backward compatibility --- .../datasource-parquet/src/access_plan.rs | 15 ++++++- datafusion/datasource-parquet/src/mod.rs | 1 + datafusion/datasource-parquet/src/opener.rs | 44 ++++++++++++------- 3 files changed, 42 insertions(+), 18 deletions(-) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index 77b538bd52249..32085fa8a177a 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -351,9 +351,22 @@ impl ParquetAccessPlan { ) -> Result { let row_group_indexes = self.row_group_indexes(); let row_selection = self.into_overall_row_selection(row_group_meta_data)?; - PreparedAccessPlan::new(row_group_indexes, row_selection) } + + /// Like [`prepare`](Self::prepare), but also applies an + /// [`AccessPlanOptimizer`] to reorder/reverse row groups after + /// preparing the plan. + pub(crate) fn prepare_with_optimizer( + self, + row_group_meta_data: &[RowGroupMetaData], + file_metadata: &ParquetMetaData, + arrow_schema: &Schema, + optimizer: &dyn crate::access_plan_optimizer::AccessPlanOptimizer, + ) -> Result { + let plan = self.prepare(row_group_meta_data)?; + optimizer.optimize(plan, file_metadata, arrow_schema) + } } /// Represents a prepared, fully resolved [`ParquetAccessPlan`] diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 9a907f4118a86..de42c527845fb 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -25,6 +25,7 @@ #![cfg_attr(test, allow(clippy::needless_pass_by_value))] pub mod access_plan; +pub(crate) mod access_plan_optimizer; pub mod file_format; pub mod metadata; mod metrics; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index d91fc9900f445..41dacd04dc3e8 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1130,25 +1130,35 @@ impl RowGroupsPrunedParquetOpen { ); } - // Prepare the access plan (extract row groups and row selection) - let mut prepared_plan = access_plan.prepare(rg_metadata)?; - - // Reorder row groups by statistics if sort order is known. - // This helps TopK queries find optimal values first by placing - // row groups with optimal min/max values at the front. - // When reorder is active, skip reverse — reorder already encodes - // the direction (uses min for ASC, max for DESC). - if let Some(sort_order) = &prepared.sort_order_for_reorder { - prepared_plan = prepared_plan.reorder_by_statistics( - sort_order, + // Build the access plan optimizer from sort pushdown hints. + // ReorderByStatistics is preferred (handles both ASC and DESC via + // min/max stats). ReverseRowGroups is a fallback when no statistics + // are available on the sort column. + let optimizer: Option< + Box, + > = if let Some(sort_order) = &prepared.sort_order_for_reorder { + Some(Box::new( + crate::access_plan_optimizer::ReorderByStatistics::new( + sort_order.clone(), + ), + )) + } else if prepared.reverse_row_groups { + Some(Box::new(crate::access_plan_optimizer::ReverseRowGroups)) + } else { + None + }; + + // Prepare the access plan and apply row group optimizer if configured. + let prepared_plan = if let Some(opt) = &optimizer { + access_plan.prepare_with_optimizer( + rg_metadata, file_metadata.as_ref(), &prepared.physical_file_schema, - )?; - } else if prepared.reverse_row_groups { - // Fallback: simple reverse when no sort order statistics available. - // See `ParquetSource::try_pushdown_sort` for the rationale. - prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; - } + opt.as_ref(), + )? + } else { + access_plan.prepare(rg_metadata)? + }; let arrow_reader_metrics = ArrowReaderMetrics::enabled(); let read_plan = build_projection_read_plan( From 0a72fe020136fdc149fb3c0ed3a1de3b4f43a8b2 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 16 Apr 2026 12:20:27 +0000 Subject: [PATCH 13/16] chore: remove benchmark from this PR (tracked in #21582) --- benchmarks/bench.sh | 130 ------------------ .../queries/sort_pushdown_inexact/q1.sql | 8 -- .../queries/sort_pushdown_inexact/q2.sql | 7 - .../queries/sort_pushdown_inexact/q3.sql | 8 -- .../queries/sort_pushdown_inexact/q4.sql | 7 - benchmarks/src/sort_pushdown.rs | 9 +- 6 files changed, 1 insertion(+), 168 deletions(-) delete mode 100644 benchmarks/queries/sort_pushdown_inexact/q1.sql delete mode 100644 benchmarks/queries/sort_pushdown_inexact/q2.sql delete mode 100644 benchmarks/queries/sort_pushdown_inexact/q3.sql delete mode 100644 benchmarks/queries/sort_pushdown_inexact/q4.sql diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index aa1ec477345c6..9dce4cf77b933 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -109,9 +109,6 @@ clickbench_extended: ClickBench \"inspired\" queries against a single parquet # Sort Pushdown Benchmarks sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1) sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files -sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder -sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder -sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario) # Sorted Data Benchmarks (ORDER BY Optimization) clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization) @@ -319,9 +316,6 @@ main() { sort_pushdown|sort_pushdown_sorted) data_sort_pushdown ;; - sort_pushdown_inexact|sort_pushdown_inexact_unsorted|sort_pushdown_inexact_overlap) - data_sort_pushdown_inexact - ;; sort_tpch) # same data as for tpch data_tpch "1" "parquet" @@ -528,15 +522,6 @@ main() { sort_pushdown_sorted) run_sort_pushdown_sorted ;; - sort_pushdown_inexact) - run_sort_pushdown_inexact - ;; - sort_pushdown_inexact_unsorted) - run_sort_pushdown_inexact_unsorted - ;; - sort_pushdown_inexact_overlap) - run_sort_pushdown_inexact_overlap - ;; sort_tpch) run_sort_tpch "1" ;; @@ -1152,121 +1137,6 @@ run_sort_pushdown_sorted() { debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${SORT_PUSHDOWN_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } -# Generates data for sort pushdown Inexact benchmark. -# -# Produces a single large lineitem parquet file where row groups have -# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally -# sorted, RGs shuffled). This simulates append-heavy workloads where data -# is written in batches at different times. -data_sort_pushdown_inexact() { - INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem" - if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A ${INEXACT_DIR}/*.parquet 2>/dev/null)" ]; then - echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}" - return - fi - - echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..." - - # Re-use the sort_pushdown data as the source (generate if missing) - data_sort_pushdown - - mkdir -p "${INEXACT_DIR}" - SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem" - - # Use datafusion-cli to bucket rows into 64 groups by a deterministic - # scrambler, then sort within each bucket by orderkey. This produces - # ~64 RG-sized segments where each has a tight orderkey range but the - # segments appear in scrambled (non-sorted) order in the file. - (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c " - CREATE EXTERNAL TABLE src - STORED AS PARQUET - LOCATION '${SRC_DIR}'; - - COPY ( - SELECT * FROM src - ORDER BY - (l_orderkey * 1664525 + 1013904223) % 64, - l_orderkey - ) - TO '${INEXACT_DIR}/shuffled.parquet' - STORED AS PARQUET - OPTIONS ('format.max_row_group_size' '100000'); - ") - - echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}" - ls -la "${INEXACT_DIR}" - - # Also generate a file with partially overlapping row groups. - # Simulates streaming data with network delays: each chunk is mostly - # in order but has a small overlap with the next chunk (±5% of the - # chunk range). This is the pattern described by @adriangb — data - # arriving with timestamps that are generally increasing but with - # network-induced jitter causing small overlaps between row groups. - OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem" - if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A ${OVERLAP_DIR}/*.parquet 2>/dev/null)" ]; then - echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}" - return - fi - - echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..." - mkdir -p "${OVERLAP_DIR}" - - (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c " - CREATE EXTERNAL TABLE src - STORED AS PARQUET - LOCATION '${SRC_DIR}'; - - -- Add jitter to l_orderkey: shift each row by a random-ish offset - -- proportional to its position. This creates overlap between adjacent - -- row groups while preserving the general ascending trend. - -- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500 - -- This adds ±2500 jitter, creating ~5K overlap between adjacent 100K-row RGs. - COPY ( - SELECT * FROM src - ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500 - ) - TO '${OVERLAP_DIR}/overlapping.parquet' - STORED AS PARQUET - OPTIONS ('format.max_row_group_size' '100000'); - ") - - echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}" - ls -la "${OVERLAP_DIR}" -} - -# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics). -# Enables pushdown_filters so TopK's dynamic filter is pushed to the parquet -# reader for late materialization (only needed for Inexact path). -run_sort_pushdown_inexact() { - INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact" - RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json" - echo "Running sort pushdown Inexact benchmark (--sorted, DESC, reverse scan path)..." - DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ - debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} -} - -# Runs the sort pushdown Inexact benchmark WITHOUT declared ordering. -# Tests the Unsupported path in try_pushdown_sort where RG reorder by -# statistics can still help TopK queries without any file ordering guarantee. -run_sort_pushdown_inexact_unsorted() { - INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact" - RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_unsorted.json" - echo "Running sort pushdown Inexact benchmark (no WITH ORDER, Unsupported path)..." - DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ - debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} -} - -# Runs the sort pushdown benchmark with partially overlapping RGs. -# Simulates streaming data with network jitter — RGs are mostly in order -# but have small overlaps (±2500 orderkey jitter between adjacent RGs). -run_sort_pushdown_inexact_overlap() { - OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap" - RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json" - echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..." - DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ - debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} -} - # Runs the sort integration benchmark run_sort_tpch() { SCALE_FACTOR=$1 diff --git a/benchmarks/queries/sort_pushdown_inexact/q1.sql b/benchmarks/queries/sort_pushdown_inexact/q1.sql deleted file mode 100644 index d772bc486a12b..0000000000000 --- a/benchmarks/queries/sort_pushdown_inexact/q1.sql +++ /dev/null @@ -1,8 +0,0 @@ --- Inexact path: TopK + DESC LIMIT on ASC-declared file. --- With RG reorder, the first RG read contains the highest max value, --- so TopK's threshold tightens quickly and subsequent RGs get filtered --- efficiently via dynamic filter pushdown. -SELECT l_orderkey, l_partkey, l_suppkey -FROM lineitem -ORDER BY l_orderkey DESC -LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown_inexact/q2.sql b/benchmarks/queries/sort_pushdown_inexact/q2.sql deleted file mode 100644 index 6e2bef44fc37e..0000000000000 --- a/benchmarks/queries/sort_pushdown_inexact/q2.sql +++ /dev/null @@ -1,7 +0,0 @@ --- Inexact path: TopK + DESC LIMIT with larger fetch (1000). --- Larger LIMIT means more row_replacements; RG reorder reduces the --- total replacement count by tightening the threshold faster. -SELECT l_orderkey, l_partkey, l_suppkey -FROM lineitem -ORDER BY l_orderkey DESC -LIMIT 1000 diff --git a/benchmarks/queries/sort_pushdown_inexact/q3.sql b/benchmarks/queries/sort_pushdown_inexact/q3.sql deleted file mode 100644 index d858ec79a67c9..0000000000000 --- a/benchmarks/queries/sort_pushdown_inexact/q3.sql +++ /dev/null @@ -1,8 +0,0 @@ --- Inexact path: wide projection (all columns) + DESC LIMIT. --- Shows the row-level filter benefit: with a tight threshold from the --- first RG, subsequent RGs skip decoding non-sort columns for filtered --- rows — bigger wins for wide tables. -SELECT * -FROM lineitem -ORDER BY l_orderkey DESC -LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown_inexact/q4.sql b/benchmarks/queries/sort_pushdown_inexact/q4.sql deleted file mode 100644 index bd2efc5d3b992..0000000000000 --- a/benchmarks/queries/sort_pushdown_inexact/q4.sql +++ /dev/null @@ -1,7 +0,0 @@ --- Inexact path: wide projection + DESC LIMIT with larger fetch. --- Combines wide-row row-level filter benefit with larger LIMIT to --- demonstrate cumulative gains from RG reorder. -SELECT * -FROM lineitem -ORDER BY l_orderkey DESC -LIMIT 1000 diff --git a/benchmarks/src/sort_pushdown.rs b/benchmarks/src/sort_pushdown.rs index e2a4615a3ef39..e7fce1921e7a8 100644 --- a/benchmarks/src/sort_pushdown.rs +++ b/benchmarks/src/sort_pushdown.rs @@ -159,14 +159,7 @@ impl RunOpt { async fn benchmark_query(&self, query_id: usize) -> Result> { let sql = self.load_query(query_id)?; - let mut config = self.common.config()?; - // Enable parquet filter pushdown + late materialization. This is - // essential for the Inexact sort pushdown path: TopK's dynamic - // filter is pushed to the parquet reader, so only sort-column - // rows pass the filter's Decode non-sort columns are skipped for - // rows that don't pass the filter — this is where RG reorder's - // tight-threshold-first strategy pays off for wide-row queries. - config.options_mut().execution.parquet.pushdown_filters = true; + let config = self.common.config()?; let rt = self.common.build_runtime()?; let state = SessionStateBuilder::new() .with_config(config) From ad8f3562b955f148be792601be704c203b1252a9 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 17 Apr 2026 14:10:54 +0000 Subject: [PATCH 14/16] fix: resolve doc link for AccessPlanOptimizer --- datafusion/datasource-parquet/src/access_plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index 32085fa8a177a..f8bcb5b74c19e 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -355,7 +355,7 @@ impl ParquetAccessPlan { } /// Like [`prepare`](Self::prepare), but also applies an - /// [`AccessPlanOptimizer`] to reorder/reverse row groups after + /// `AccessPlanOptimizer` to reorder/reverse row groups after /// preparing the plan. pub(crate) fn prepare_with_optimizer( self, From 2eb2351a18050864f28e2c27cee45fcf4dbebf2c Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 17 Apr 2026 14:32:50 +0000 Subject: [PATCH 15/16] fix: restore benchmark files from upstream main --- benchmarks/bench.sh | 130 ++++++++++++++++++ .../queries/sort_pushdown_inexact/q1.sql | 8 ++ .../queries/sort_pushdown_inexact/q2.sql | 7 + .../queries/sort_pushdown_inexact/q3.sql | 8 ++ .../queries/sort_pushdown_inexact/q4.sql | 7 + 5 files changed, 160 insertions(+) create mode 100644 benchmarks/queries/sort_pushdown_inexact/q1.sql create mode 100644 benchmarks/queries/sort_pushdown_inexact/q2.sql create mode 100644 benchmarks/queries/sort_pushdown_inexact/q3.sql create mode 100644 benchmarks/queries/sort_pushdown_inexact/q4.sql diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 9dce4cf77b933..aa1ec477345c6 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -109,6 +109,9 @@ clickbench_extended: ClickBench \"inspired\" queries against a single parquet # Sort Pushdown Benchmarks sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1) sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files +sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder +sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder +sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario) # Sorted Data Benchmarks (ORDER BY Optimization) clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization) @@ -316,6 +319,9 @@ main() { sort_pushdown|sort_pushdown_sorted) data_sort_pushdown ;; + sort_pushdown_inexact|sort_pushdown_inexact_unsorted|sort_pushdown_inexact_overlap) + data_sort_pushdown_inexact + ;; sort_tpch) # same data as for tpch data_tpch "1" "parquet" @@ -522,6 +528,15 @@ main() { sort_pushdown_sorted) run_sort_pushdown_sorted ;; + sort_pushdown_inexact) + run_sort_pushdown_inexact + ;; + sort_pushdown_inexact_unsorted) + run_sort_pushdown_inexact_unsorted + ;; + sort_pushdown_inexact_overlap) + run_sort_pushdown_inexact_overlap + ;; sort_tpch) run_sort_tpch "1" ;; @@ -1137,6 +1152,121 @@ run_sort_pushdown_sorted() { debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${SORT_PUSHDOWN_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } +# Generates data for sort pushdown Inexact benchmark. +# +# Produces a single large lineitem parquet file where row groups have +# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally +# sorted, RGs shuffled). This simulates append-heavy workloads where data +# is written in batches at different times. +data_sort_pushdown_inexact() { + INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem" + if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A ${INEXACT_DIR}/*.parquet 2>/dev/null)" ]; then + echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}" + return + fi + + echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..." + + # Re-use the sort_pushdown data as the source (generate if missing) + data_sort_pushdown + + mkdir -p "${INEXACT_DIR}" + SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem" + + # Use datafusion-cli to bucket rows into 64 groups by a deterministic + # scrambler, then sort within each bucket by orderkey. This produces + # ~64 RG-sized segments where each has a tight orderkey range but the + # segments appear in scrambled (non-sorted) order in the file. + (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c " + CREATE EXTERNAL TABLE src + STORED AS PARQUET + LOCATION '${SRC_DIR}'; + + COPY ( + SELECT * FROM src + ORDER BY + (l_orderkey * 1664525 + 1013904223) % 64, + l_orderkey + ) + TO '${INEXACT_DIR}/shuffled.parquet' + STORED AS PARQUET + OPTIONS ('format.max_row_group_size' '100000'); + ") + + echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}" + ls -la "${INEXACT_DIR}" + + # Also generate a file with partially overlapping row groups. + # Simulates streaming data with network delays: each chunk is mostly + # in order but has a small overlap with the next chunk (±5% of the + # chunk range). This is the pattern described by @adriangb — data + # arriving with timestamps that are generally increasing but with + # network-induced jitter causing small overlaps between row groups. + OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem" + if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A ${OVERLAP_DIR}/*.parquet 2>/dev/null)" ]; then + echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}" + return + fi + + echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..." + mkdir -p "${OVERLAP_DIR}" + + (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c " + CREATE EXTERNAL TABLE src + STORED AS PARQUET + LOCATION '${SRC_DIR}'; + + -- Add jitter to l_orderkey: shift each row by a random-ish offset + -- proportional to its position. This creates overlap between adjacent + -- row groups while preserving the general ascending trend. + -- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500 + -- This adds ±2500 jitter, creating ~5K overlap between adjacent 100K-row RGs. + COPY ( + SELECT * FROM src + ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500 + ) + TO '${OVERLAP_DIR}/overlapping.parquet' + STORED AS PARQUET + OPTIONS ('format.max_row_group_size' '100000'); + ") + + echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}" + ls -la "${OVERLAP_DIR}" +} + +# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics). +# Enables pushdown_filters so TopK's dynamic filter is pushed to the parquet +# reader for late materialization (only needed for Inexact path). +run_sort_pushdown_inexact() { + INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact" + RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json" + echo "Running sort pushdown Inexact benchmark (--sorted, DESC, reverse scan path)..." + DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ + debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} +} + +# Runs the sort pushdown Inexact benchmark WITHOUT declared ordering. +# Tests the Unsupported path in try_pushdown_sort where RG reorder by +# statistics can still help TopK queries without any file ordering guarantee. +run_sort_pushdown_inexact_unsorted() { + INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact" + RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_unsorted.json" + echo "Running sort pushdown Inexact benchmark (no WITH ORDER, Unsupported path)..." + DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ + debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} +} + +# Runs the sort pushdown benchmark with partially overlapping RGs. +# Simulates streaming data with network jitter — RGs are mostly in order +# but have small overlaps (±2500 orderkey jitter between adjacent RGs). +run_sort_pushdown_inexact_overlap() { + OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap" + RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json" + echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..." + DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ + debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} +} + # Runs the sort integration benchmark run_sort_tpch() { SCALE_FACTOR=$1 diff --git a/benchmarks/queries/sort_pushdown_inexact/q1.sql b/benchmarks/queries/sort_pushdown_inexact/q1.sql new file mode 100644 index 0000000000000..d772bc486a12b --- /dev/null +++ b/benchmarks/queries/sort_pushdown_inexact/q1.sql @@ -0,0 +1,8 @@ +-- Inexact path: TopK + DESC LIMIT on ASC-declared file. +-- With RG reorder, the first RG read contains the highest max value, +-- so TopK's threshold tightens quickly and subsequent RGs get filtered +-- efficiently via dynamic filter pushdown. +SELECT l_orderkey, l_partkey, l_suppkey +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown_inexact/q2.sql b/benchmarks/queries/sort_pushdown_inexact/q2.sql new file mode 100644 index 0000000000000..6e2bef44fc37e --- /dev/null +++ b/benchmarks/queries/sort_pushdown_inexact/q2.sql @@ -0,0 +1,7 @@ +-- Inexact path: TopK + DESC LIMIT with larger fetch (1000). +-- Larger LIMIT means more row_replacements; RG reorder reduces the +-- total replacement count by tightening the threshold faster. +SELECT l_orderkey, l_partkey, l_suppkey +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 1000 diff --git a/benchmarks/queries/sort_pushdown_inexact/q3.sql b/benchmarks/queries/sort_pushdown_inexact/q3.sql new file mode 100644 index 0000000000000..d858ec79a67c9 --- /dev/null +++ b/benchmarks/queries/sort_pushdown_inexact/q3.sql @@ -0,0 +1,8 @@ +-- Inexact path: wide projection (all columns) + DESC LIMIT. +-- Shows the row-level filter benefit: with a tight threshold from the +-- first RG, subsequent RGs skip decoding non-sort columns for filtered +-- rows — bigger wins for wide tables. +SELECT * +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown_inexact/q4.sql b/benchmarks/queries/sort_pushdown_inexact/q4.sql new file mode 100644 index 0000000000000..bd2efc5d3b992 --- /dev/null +++ b/benchmarks/queries/sort_pushdown_inexact/q4.sql @@ -0,0 +1,7 @@ +-- Inexact path: wide projection + DESC LIMIT with larger fetch. +-- Combines wide-row row-level filter benefit with larger LIMIT to +-- demonstrate cumulative gains from RG reorder. +SELECT * +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 1000 From 10e6be9ce97d33bc5aa6070a40c4e74cce99b319 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Sat, 18 Apr 2026 13:00:19 +0800 Subject: [PATCH 16/16] fix: compose reorder and reverse as sequential steps instead of mutually exclusive Previously reorder_by_statistics and reverse_row_groups were mutually exclusive (else-if). This meant DESC queries on unsorted data could only get one optimization. Now they compose: reorder always sorts RGs by min ASC, then reverse flips for DESC. This ensures correct results for both sorted and unsorted inputs without regression. Also removes prepare_with_optimizer in favor of calling optimize() directly on each optimizer, and simplifies reorder_by_statistics to always use min ASC (direction handled by reverse). --- .../datasource-parquet/src/access_plan.rs | 87 +++++++------------ datafusion/datasource-parquet/src/opener.rs | 60 ++++++++----- 2 files changed, 71 insertions(+), 76 deletions(-) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index f8bcb5b74c19e..2c877ba091294 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -353,20 +353,6 @@ impl ParquetAccessPlan { let row_selection = self.into_overall_row_selection(row_group_meta_data)?; PreparedAccessPlan::new(row_group_indexes, row_selection) } - - /// Like [`prepare`](Self::prepare), but also applies an - /// `AccessPlanOptimizer` to reorder/reverse row groups after - /// preparing the plan. - pub(crate) fn prepare_with_optimizer( - self, - row_group_meta_data: &[RowGroupMetaData], - file_metadata: &ParquetMetaData, - arrow_schema: &Schema, - optimizer: &dyn crate::access_plan_optimizer::AccessPlanOptimizer, - ) -> Result { - let plan = self.prepare(row_group_meta_data)?; - optimizer.optimize(plan, file_metadata, arrow_schema) - } } /// Represents a prepared, fully resolved [`ParquetAccessPlan`] @@ -436,8 +422,6 @@ impl PreparedAccessPlan { } }; - let descending = first_sort_expr.options.descending; - // Build statistics converter for this column let converter = match StatisticsConverter::try_new( column.name(), @@ -451,40 +435,31 @@ impl PreparedAccessPlan { } }; - // Get the relevant statistics for the selected row groups. - // For ASC sort: use min values — we want the RG with the smallest min - // to come first (best candidate for "smallest values"). - // For DESC sort: use max values — we want the RG with the largest max - // to come first (best candidate for "largest values"). Using min for - // DESC can pick a worse first RG when ranges overlap (e.g., RG0 50-60 - // vs RG1 40-100 — RG1 has larger values but smaller min). + // Always sort by min values in ASC order to align row groups with + // the file's declared output ordering. Direction (DESC) is handled + // separately by ReverseRowGroups which is applied AFTER reorder. + // + // This composable design avoids the problem where reorder(DESC) + // followed by reverse would double-flip the order, and ensures + // that for already-sorted data, reorder is a no-op and reverse + // gives the correct DESC order (including placing small tail RGs first). let rg_metadata: Vec<&RowGroupMetaData> = self .row_group_indexes .iter() .map(|&idx| file_metadata.row_group(idx)) .collect(); - let stat_values = if descending { - match converter.row_group_maxes(rg_metadata.iter().copied()) { - Ok(vals) => vals, - Err(e) => { - debug!("Skipping RG reorder: cannot get max values: {e}"); - return Ok(self); - } - } - } else { - match converter.row_group_mins(rg_metadata.iter().copied()) { - Ok(vals) => vals, - Err(e) => { - debug!("Skipping RG reorder: cannot get min values: {e}"); - return Ok(self); - } + let stat_values = match converter.row_group_mins(rg_metadata.iter().copied()) { + Ok(vals) => vals, + Err(e) => { + debug!("Skipping RG reorder: cannot get min values: {e}"); + return Ok(self); } }; - // Sort indices by statistic values (min for ASC, max for DESC) + // Always sort ASC by min values — direction is handled by reverse let sort_options = arrow::compute::SortOptions { - descending, + descending: false, nulls_first: first_sort_expr.options.nulls_first, }; let sorted_indices = @@ -836,7 +811,11 @@ mod test { } #[test] - fn test_reorder_by_statistics_desc() { + fn test_reorder_by_statistics_desc_sorts_asc() { + // reorder_by_statistics always sorts by min ASC regardless of sort + // direction. DESC is handled separately by ReverseRowGroups which + // is applied after reorder in the optimizer pipeline. + // // RGs: [50-99, 200-299, 1-30] let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]); let schema = make_arrow_schema(); @@ -847,8 +826,9 @@ mod test { .reorder_by_statistics(&sort_order, &metadata, &schema) .unwrap(); - // DESC: largest max first: RG1(max=299), RG0(max=99), RG2(max=30) - assert_eq!(plan.row_group_indexes, vec![1, 0, 2]); + // Always ASC by min: RG2(min=1), RG0(min=50), RG1(min=200) + // Reverse is applied separately for DESC queries. + assert_eq!(plan.row_group_indexes, vec![2, 0, 1]); } #[test] @@ -949,17 +929,14 @@ mod test { } #[test] - fn test_reorder_by_statistics_desc_uses_max_for_overlapping_rgs() { - // Overlapping ranges where min DESC would pick worse RG than max DESC: - // RG0: 50-60 (small range, moderate max) - // RG1: 40-100 (wide range, high max but lower min) - // RG2: 20-30 (low max) - // - // For ORDER BY DESC LIMIT N: - // Using min DESC: [RG0(50), RG1(40), RG2(20)] → reads RG0 first (max=60 only) - // Using max DESC: [RG1(100), RG0(60), RG2(30)] → reads RG1 first (max=100) + fn test_reorder_by_statistics_overlapping_rgs_sorts_asc() { + // Overlapping ranges — reorder always uses min ASC: + // RG0: 50-60 + // RG1: 40-100 (lower min, wider range) + // RG2: 20-30 (lowest min) // - // RG1 is the better first choice for DESC because it contains the largest values. + // Sorted by min ASC: [RG2(20), RG1(40), RG0(50)] + // For DESC queries, ReverseRowGroups is applied after to flip order. let metadata = make_metadata_with_stats(&[(50, 60), (40, 100), (20, 30)]); let schema = make_arrow_schema(); let sort_order = make_sort_order_desc(); @@ -969,8 +946,8 @@ mod test { .reorder_by_statistics(&sort_order, &metadata, &schema) .unwrap(); - // Expected: RG1 (max=100) first, then RG0 (max=60), then RG2 (max=30) - assert_eq!(plan.row_group_indexes, vec![1, 0, 2]); + // Always ASC by min: RG2(min=20), RG1(min=40), RG0(min=50) + assert_eq!(plan.row_group_indexes, vec![2, 1, 0]); } #[test] diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 41dacd04dc3e8..2bcffbab24212 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1130,35 +1130,53 @@ impl RowGroupsPrunedParquetOpen { ); } - // Build the access plan optimizer from sort pushdown hints. - // ReorderByStatistics is preferred (handles both ASC and DESC via - // min/max stats). ReverseRowGroups is a fallback when no statistics - // are available on the sort column. - let optimizer: Option< + // Row group ordering optimization (two composable steps): + // + // 1. reorder_by_statistics: sort RGs by min values (ASC) to align + // with the file's declared output ordering. This fixes out-of-order + // RGs (e.g., from append-heavy workloads) without changing direction. + // Skipped gracefully when statistics are unavailable. + // + // 2. reverse: flip the order for DESC queries. Applied AFTER reorder + // so the reversed order is correct whether or not reorder changed + // anything. Also handles row_selection remapping. + // + // For sorted data: reorder is a no-op, reverse gives perfect DESC. + // For unsorted data: reorder fixes the order, reverse flips for DESC. + let reorder_optimizer: Option< Box, - > = if let Some(sort_order) = &prepared.sort_order_for_reorder { - Some(Box::new( - crate::access_plan_optimizer::ReorderByStatistics::new( - sort_order.clone(), - ), - )) - } else if prepared.reverse_row_groups { + > = prepared.sort_order_for_reorder.as_ref().map(|sort_order| { + Box::new(crate::access_plan_optimizer::ReorderByStatistics::new( + sort_order.clone(), + )) as Box + }); + + let reverse_optimizer: Option< + Box, + > = if prepared.reverse_row_groups { Some(Box::new(crate::access_plan_optimizer::ReverseRowGroups)) } else { None }; - // Prepare the access plan and apply row group optimizer if configured. - let prepared_plan = if let Some(opt) = &optimizer { - access_plan.prepare_with_optimizer( - rg_metadata, + // Prepare the access plan and apply optimizers in order: + // 1. reorder (fix out-of-order RGs to match declared ordering) + // 2. reverse (flip for DESC queries) + let mut prepared_plan = access_plan.prepare(rg_metadata)?; + if let Some(opt) = &reorder_optimizer { + prepared_plan = opt.optimize( + prepared_plan, file_metadata.as_ref(), &prepared.physical_file_schema, - opt.as_ref(), - )? - } else { - access_plan.prepare(rg_metadata)? - }; + )?; + } + if let Some(opt) = &reverse_optimizer { + prepared_plan = opt.optimize( + prepared_plan, + file_metadata.as_ref(), + &prepared.physical_file_schema, + )?; + } let arrow_reader_metrics = ArrowReaderMetrics::enabled(); let read_plan = build_projection_read_plan(