Cherry-pick unreleased yet fixed sort pushdown to spiceai-52.5 #154

krinart wants to merge 5 commits into spiceai-52.5
Conversation
…)" This reverts commit f9443ab.
### Which issue does this PR close?

Prerequisite for the following PRs:

- apache#19760
- apache#19761

### Rationale for this change

Even though the API on `MemoryPool` does not require `&mut self` for growing/shrinking the reserved size, the API on `MemoryReservation` does, making simple implementations inexpressible without synchronization primitives. For example, the following would require a `Mutex` for concurrent access to the `MemoryReservation` from different threads, even though the `MemoryPool` doesn't:

```rust
let mut stream: SendableRecordBatchStream = SendableRecordBatchStream::new();
let mem: Arc<MemoryReservation> = Arc::new(MemoryReservation::new_empty());
let mut builder = ReceiverStreamBuilder::new(10);
let tx = builder.tx();

{
    let mem = mem.clone();
    builder.spawn(async move {
        while let Some(msg) = stream.next().await {
            mem.try_grow(msg.unwrap().get_array_memory_size()); // ❌ `mem` is not mutable
            tx.send(msg).unwrap();
        }
    });
}

builder
    .build()
    .inspect_ok(|msg| mem.shrink(msg.get_array_memory_size())); // ❌ `mem` is not mutable
```

### What changes are included in this PR?

Make the methods in `MemoryReservation` take `&self` instead of `&mut self`, allowing concurrent shrinks and grows from different tasks on the same reservation.

### Are these changes tested?

Yes, by current tests.

### Are there any user-facing changes?

Users can now safely call methods of `MemoryReservation` from different tasks without synchronization primitives. This is a backwards-compatible API change: it works out of the box for current users, although, depending on their clippy configuration, they may see new "unused mut" warnings in their codebase.
### Which issue does this PR close?

This is one of a batch of PRs that attempt to improve performance in hash joins:

- apache#19759
- This PR
- apache#19761

### Rationale for this change

It adds a building block that allows eagerly collecting data on the probe side of a hash join before the build side is finished. Even though the intended use case is hash joins, the new execution node is generic and designed to work anywhere in the plan.

> [!NOTE]
> The new `BufferExec` node introduced in this PR is not yet wired up automatically.

### What changes are included in this PR?

Adds a new `BufferExec` node that can buffer up to a certain size in bytes for each partition, eagerly performing work that would otherwise be delayed. Schematically, each partition's input is polled in the background into a bounded buffer, and consumers drain batches from that buffer:

```text
┌─────────────────────────────┐
│         BufferExec          │
│ ┌────── Partition 0 ──────┐ │
│ │  [batch] [batch]        │─┼──▶ [batch]
│ └─────────────────────────┘ │
│ ┌────── Partition 1 ──────┐ │
│ │  [batch] [batch] [batch]│─┼──▶ [batch]
│ └─────────────────────────┘ │
│             ...             │
│ ┌────── Partition N ──────┐ │
│ │  [batch]                │─┼──▶ [batch]
│ └─────────────────────────┘ │
└─────────────────────────────┘
  ▲ background polls fill each partition's buffer eagerly
```

### Are these changes tested?

Yes, by new unit tests.

### Are there any user-facing changes?

Users can import a new `BufferExec` execution plan in their codebase, but no internal usage is shipped yet in this PR.
### Which issue does this PR close?

- Related to apache#17348
- Precursor to apache#21182

### Rationale for this change

Add benchmark and integration tests for sort pushdown optimization, split out from apache#21182 per [reviewer request](apache#21182 (comment)). This allows comparing benchmark results before and after the optimization lands, and the SLT diff in apache#21182 will clearly show which test expectations changed due to the optimization.

### What changes are included in this PR?

New `sort-pushdown` benchmark subcommand with 4 queries testing sort elimination:

| Query | Description |
|-------|-------------|
| Q1 | `ORDER BY l_orderkey ASC` (full scan) |
| Q2 | `ORDER BY l_orderkey ASC LIMIT 100` |
| Q3 | `SELECT * ORDER BY l_orderkey ASC` (wide) |
| Q4 | `SELECT * ORDER BY l_orderkey ASC LIMIT 100` (wide) |

Usage:

```bash
./bench.sh data sort_pushdown
./bench.sh run sort_pushdown          # baseline
./bench.sh run sort_pushdown_sorted   # with sort elimination
```

New SLT test groups:

- **Test A**: Non-overlapping files + WITH ORDER → Sort eliminated (single partition)
- **Test B**: Overlapping files → SortExec retained (baseline, files in original order)
- **Test C**: LIMIT queries (ASC sort elimination + DESC reverse scan)
- **Test D**: `target_partitions=2` → SPM + per-partition sort elimination
- **Test E**: Inferred ordering from Parquet metadata (no WITH ORDER) — single and multi partition

| File | Change |
|------|--------|
| `benchmarks/src/sort_pushdown.rs` | New benchmark module |
| `benchmarks/src/lib.rs` | Register module |
| `benchmarks/src/bin/dfbench.rs` | Register subcommand |
| `benchmarks/bench.sh` | Add data/run entries |
| `datafusion/sqllogictest/test_files/sort_pushdown.slt` | 5 new SLT test groups |

### Are these changes tested?

- [x] `cargo clippy -p datafusion-benchmarks` — 0 warnings
- [x] `cargo test -p datafusion-sqllogictest -- sort_pushdown` — all tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…own phase 2) (apache#21182)

- Closes apache#17348
- Closes apache#19329

### Rationale for this change

When a partition (file group) contains multiple files in the wrong order, `validated_output_ordering()` strips the ordering and `EnforceSorting` inserts an unnecessary `SortExec`, even though the files are non-overlapping and internally sorted. This PR fixes that by **sorting files within each group by min/max statistics** during sort pushdown. After sorting, the file order matches the sort key order, the ordering becomes valid, and the `SortExec` can be eliminated. This works for both single-partition and multi-partition plans with multi-file groups.

```text
Files in wrong order within a partition:    After statistics-based sorting:
[a_high(400k+), b_mid(200k), c_low(1)]      [c_low(1), b_mid(200k), a_high(400k+)]
→ ordering stripped                         → ordering valid, non-overlapping
→ SortExec stays                            → SortExec eliminated
```

### What changes are included in this PR?

When `PushdownSort` finds a `SortExec` above a file-based `DataSourceExec`:

1. **FileSource returns Exact** (natural ordering satisfies the request):
   - Sort files within each group by statistics, verify they are non-overlapping
   - `SortExec` removed, fetch (LIMIT) pushed to `DataSourceExec`
2. **FileSource returns Unsupported** (ordering stripped due to wrong file order):
   - Sort files within each group by statistics
   - Re-check: if files are now non-overlapping and the ordering is valid → upgrade to Exact
   - `SortExec` eliminated and fetch pushed down
3. **FileSource returns Inexact** (reverse scan):
   - `SortExec` kept, scan optimized with `reverse_row_groups`

| File | Change |
|------|--------|
| `datasource-parquet/src/source.rs` | ParquetSource returns `Exact` when natural ordering satisfies the request |
| `datasource/src/file_scan_config.rs` | Statistics-based file sorting, non-overlapping re-check, Unsupported→Exact upgrade |
| `physical-optimizer/src/pushdown_sort.rs` | Preserve fetch (LIMIT) when eliminating SortExec, module doc update |
| `core/tests/physical_optimizer/pushdown_sort.rs` | Updated prefix match test |
| `sqllogictest/test_files/sort_pushdown.slt` | Updated existing tests + 5 new test groups (A-E) |

### Benchmark results

Local release build, `--partitions 1`, 3 non-overlapping files with reversed naming (6M rows):

| Query | Description | Main (ms) | PR (ms) | Speedup |
|-------|-------------|-----------|---------|---------|
| Q1 | `ORDER BY ASC` (full scan) | 259 | 122 | **2.1x faster** |
| Q2 | `ORDER BY ASC LIMIT 100` | 80 | 3 | **27x faster** |
| Q3 | `SELECT * ORDER BY ASC` | 700 | 313 | **2.2x faster** |
| Q4 | `SELECT * LIMIT 100` | 342 | 7 | **49x faster** |

LIMIT queries benefit most because sort elimination plus limit pushdown means only the first ~100 rows are read.

### Are these changes tested?

- 13 new unit tests covering all sort pushdown paths
- 5 new SLT integration test groups (sort elimination, overlapping files, LIMIT, multi-partition, inferred ordering)
- All existing tests pass with no regressions
- [x] `cargo test -p datafusion-datasource` — all tests pass
- [x] `cargo test -p datafusion-datasource-parquet` — all tests pass
- [x] `cargo test -p datafusion-physical-optimizer` — all tests pass
- [x] `cargo test -p datafusion --test core_integration` — all tests pass
- [x] SLT sort/order/topk/window/union/joins tests pass (no regressions)
- [x] `cargo clippy` — 0 warnings

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pull request overview
This PR cherry-picks upstream DataFusion sort-pushdown improvements onto spiceai-52.5, including statistics-based file reordering for sort elimination, introducing BufferExec to preserve pipelining when SortExec is removed under SortPreservingMergeExec, and updating protobuf + SLT/bench coverage accordingly.
Changes:
- Add statistics-based file sorting + broader sort-elimination behavior (including new SLT coverage and a benchmark suite).
- Introduce `BufferExec` and add protobuf serialization support for it.
- Refactor `MemoryReservation` to support shared mutation (atomic size) and update call sites.
Reviewed changes
Copilot reviewed 30 out of 33 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| datafusion/sqllogictest/test_files/sort_pushdown.slt | Updates/extends SLT expectations and adds new statistics-based sort-elimination scenarios. |
| datafusion/proto/src/physical_plan/mod.rs | Adds encode/decode support for BufferExec in physical-plan protobuf conversion. |
| datafusion/proto/src/generated/prost.rs | Regenerates prost types to include BufferExecNode (tag 37). |
| datafusion/proto/src/generated/pbjson.rs | Regenerates pbjson serde support for BufferExecNode + PhysicalPlanNode variant. |
| datafusion/proto/proto/datafusion.proto | Adds BufferExecNode message and wires it into PhysicalPlanNode. |
| datafusion/physical-plan/src/work_table.rs | Adjusts memory reservation usage for updated MemoryReservation mutability model. |
| datafusion/physical-plan/src/stream.rs | Adjusts memory reservation usage for updated MemoryReservation mutability model. |
| datafusion/physical-plan/src/sorts/stream.rs | Adjusts memory reservation usage for updated MemoryReservation mutability model. |
| datafusion/physical-plan/src/sorts/sort_preserving_merge.rs | Fixes with_fetch to preserve enable_round_robin_repartition. |
| datafusion/physical-plan/src/sorts/sort.rs | Adjusts reservation parameter mutability to match new MemoryReservation API. |
| datafusion/physical-plan/src/lib.rs | Exports new buffer module. |
| datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs | Adjusts memory reservation usage for updated MemoryReservation mutability model. |
| datafusion/physical-plan/src/joins/nested_loop_join.rs | Adjusts memory reservation usage for updated MemoryReservation mutability model. |
| datafusion/physical-plan/src/joins/cross_join.rs | Adjusts memory reservation usage for updated MemoryReservation mutability model. |
| datafusion/physical-plan/src/buffer.rs | Introduces BufferExec + MemoryBufferedStream and associated tests. |
| datafusion/physical-optimizer/src/pushdown_sort.rs | Enhances sort pushdown, inserts BufferExec under SPM when eliminating SortExec, and updates exact/inexact handling. |
| datafusion/execution/src/memory_pool/pool.rs | Updates pool accounting to use the new MemoryReservation::size() API. |
| datafusion/execution/src/memory_pool/mod.rs | Refactors MemoryReservation to store size in an atomic and mutating methods to take &self. |
| datafusion/datasource/src/file_scan_config.rs | Implements statistics-based file sorting, ordering validation, null-safety checks for upgrading to Exact, and refactors ordering validation logic. |
| datafusion/datasource-parquet/src/source.rs | Allows Parquet source to return Exact when metadata ordering satisfies the request. |
| datafusion/datasource-parquet/src/file_format.rs | Adjusts memory reservation usage for updated MemoryReservation mutability model in Parquet sink paths. |
| datafusion/core/tests/physical_optimizer/test_utils.rs | Removes ExactTestScan helper previously used for exact pushdown tests. |
| datafusion/core/tests/physical_optimizer/pushdown_sort.rs | Updates tests to assert sort elimination for exact prefix matches and removes ExactTestScan-based tests. |
| datafusion/core/src/execution/context/mod.rs | Adjusts memory reservation usage for updated MemoryReservation mutability model. |
| datafusion-cli/src/exec.rs | Adjusts CLI result-reservation code for updated MemoryReservation mutability model. |
| benchmarks/src/sort_pushdown.rs | Adds new benchmark harness for measuring sort pushdown / elimination impact. |
| benchmarks/src/lib.rs | Registers the new sort-pushdown benchmark module. |
| benchmarks/src/bin/dfbench.rs | Wires the new benchmark option into the CLI. |
| benchmarks/queries/sort_pushdown/q1.sql | Adds benchmark query (ORDER BY key). |
| benchmarks/queries/sort_pushdown/q2.sql | Adds benchmark query (ORDER BY + LIMIT). |
| benchmarks/queries/sort_pushdown/q3.sql | Adds benchmark query (wide projection ORDER BY). |
| benchmarks/queries/sort_pushdown/q4.sql | Adds benchmark query (wide projection ORDER BY + LIMIT). |
| benchmarks/bench.sh | Adds bench.sh targets for sort pushdown baseline vs sorted/WITH ORDER runs. |
```rust
let task = SpawnedTask::spawn(async move {
    loop {
        // Select on both the input stream and the channel being closed.
        // By down this, we abort polling the input as soon as the consumer channel is
```
Typo in comment: “By down this” should be “By doing this”.
Suggested change:

```diff
-// By down this, we abort polling the input as soon as the consumer channel is
+// By doing this, we abort polling the input as soon as the consumer channel is
```
```rust
// CoalesceBatchesExec is transparent for sort ordering - it preserves order
// Delegate to the child and wrap with a new CoalesceBatchesExec
```
The comment in try_pushdown_sort refers to CoalesceBatchesExec (“wrap with a new CoalesceBatchesExec”), but this is BufferExec. This looks like a copy/paste and is misleading for future maintainers—update the comment to describe BufferExec’s sort-order transparency.
Suggested change:

```diff
-// CoalesceBatchesExec is transparent for sort ordering - it preserves order
-// Delegate to the child and wrap with a new CoalesceBatchesExec
+// BufferExec is transparent for sort ordering because it preserves its input order.
+// Delegate to the child and wrap any rewritten child in a new BufferExec.
```
```diff
 SortOrderPushdownResult::Exact { inner } => {
-    // Data source guarantees perfect ordering - remove the Sort operator.
-    //
-    // If the SortExec carried a fetch (LIMIT), we must preserve it.
-    // First try pushing the limit into the source via `with_fetch()`.
-    // If the source doesn't support `with_fetch`, fall back to
-    // wrapping with GlobalLimitExec.
-    //
-    // Note: LimitPushdown runs *before* PushdownSort in the optimizer
-    // pipeline, so we need to handle the limit manually here.
-    if let Some(fetch) = sort_exec.fetch() {
-        let limited = inner.with_fetch(Some(fetch)).unwrap_or_else(|| {
-            Arc::new(GlobalLimitExec::new(inner, 0, Some(fetch)))
-        });
-        Ok(Transformed::yes(limited))
-    } else {
-        Ok(Transformed::yes(inner))
-    }
+    // Preserve the fetch (LIMIT) from the original SortExec so the
+    // data source can stop reading early.
+    let inner = if let Some(fetch) = sort_exec.fetch() {
+        inner.with_fetch(Some(fetch)).unwrap_or(inner)
+    } else {
+        inner
+    };
+    Ok(Transformed::yes(inner))
```
PushdownSort removes SortExec in the Exact branch but only preserves fetch if inner.with_fetch(Some(fetch)) returns Some(_). Since ExecutionPlan::with_fetch defaults to None, this can silently drop LIMIT again for any source that can return Exact but doesn’t implement with_fetch (the same class of bug described in PR #153). Restore a fallback that preserves semantics (e.g., wrap with a limit exec when with_fetch is unsupported).
```rust
let new_spm =
    SortPreservingMergeExec::new(spm.expr().clone(), buffered)
        .with_fetch(spm.fetch());
return Ok(Transformed::yes(Arc::new(new_spm)));
}
SortOrderPushdownResult::Inexact { inner } => {
    let new_sort = SortExec::new(required_ordering.clone(), inner)
        .with_fetch(sort_child.fetch())
        .with_preserve_partitioning(true);
    let new_spm = SortPreservingMergeExec::new(
        spm.expr().clone(),
        Arc::new(new_sort),
    )
    .with_fetch(spm.fetch());
    return Ok(Transformed::yes(Arc::new(new_spm)));
```
This SPM→SortExec rewrite rebuilds a new SortPreservingMergeExec via SortPreservingMergeExec::new(...), which resets enable_round_robin_repartition to its default (true). If the original SPM was configured with with_round_robin_repartition(false) (stable merge), this transformation changes behavior. Preserve the original round-robin setting when reconstructing the SPM (may require adding a getter / builder support on SortPreservingMergeExec).
Suggested change:

```diff
-let new_spm =
-    SortPreservingMergeExec::new(spm.expr().clone(), buffered)
-        .with_fetch(spm.fetch());
-return Ok(Transformed::yes(Arc::new(new_spm)));
+let new_spm = plan.with_new_children(vec![buffered])?;
+return Ok(Transformed::yes(new_spm));
 }
 SortOrderPushdownResult::Inexact { inner } => {
     let new_sort = SortExec::new(required_ordering.clone(), inner)
         .with_fetch(sort_child.fetch())
         .with_preserve_partitioning(true);
-    let new_spm = SortPreservingMergeExec::new(
-        spm.expr().clone(),
-        Arc::new(new_sort),
-    )
-    .with_fetch(spm.fetch());
-    return Ok(Transformed::yes(Arc::new(new_spm)));
+    let new_spm =
+        plan.with_new_children(vec![Arc::new(new_sort)])?;
+    return Ok(Transformed::yes(new_spm));
```
```diff
-pub fn shrink(&mut self, capacity: usize) {
-    let new_size = self.size.checked_sub(capacity).unwrap();
+pub fn shrink(&self, capacity: usize) {
+    self.size.fetch_sub(capacity, atomic::Ordering::Relaxed);
```
MemoryReservation::shrink is documented to panic if capacity exceeds the current reservation size, but the new implementation uses fetch_sub without any checked underflow. If shrink is ever called with too-large capacity, the atomic will wrap and the pool accounting will be corrupted. Reintroduce an explicit checked-sub (panic) or use fetch_update with checked_sub like split/try_shrink.
Suggested change:

```diff
-self.size.fetch_sub(capacity, atomic::Ordering::Relaxed);
+let updated = self.size.fetch_update(
+    atomic::Ordering::Relaxed,
+    atomic::Ordering::Relaxed,
+    |prev| prev.checked_sub(capacity),
+);
+if let Err(prev) = updated {
+    panic!("Cannot free the capacity {capacity} out of allocated size {prev}");
+}
```
```rust
        write!(f, "BufferExec: capacity={}", self.capacity)
    }
    DisplayFormatType::TreeRender => {
        writeln!(f, "target_batch_size={}", self.capacity)
```
DisplayFormatType::TreeRender prints target_batch_size=... for BufferExec, but this exec is parameterized by capacity (bytes). This makes explain/tree output misleading and inconsistent with the Default/Verbose formatting. Use a label like capacity (and ensure the TreeRender output matches conventions used by other execs).
Suggested change:

```diff
-writeln!(f, "target_batch_size={}", self.capacity)
+writeln!(f, "capacity={}", self.capacity)
```