From a7a6a0eb9545e8a881b2496598f6b391cb085295 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 12 Feb 2026 13:59:07 +0200 Subject: [PATCH 01/14] fix: Unaccounted spill sort in row_hash --- datafusion/physical-plan/src/aggregates/row_hash.rs | 13 ++++++++++++- datafusion/physical-plan/src/sorts/sort.rs | 12 +++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index de857370ce28..153a48c22cc5 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -30,7 +30,7 @@ use crate::aggregates::{ create_schema, evaluate_group_by, evaluate_many, evaluate_optional, }; use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput}; -use crate::sorts::sort::sort_batch; +use crate::sorts::sort::{get_reserved_bytes_for_record_batch, sort_batch}; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::spill_manager::SpillManager; use crate::{PhysicalExpr, aggregates, metrics}; @@ -1110,6 +1110,15 @@ impl GroupedHashAggregateStream { let Some(emit) = self.emit(EmitTo::All, true)? else { return Ok(()); }; + + // Free accumulated state now that data has been emitted into `emit`. + // This must happen before reserving sort memory so the pool has room. 
+ self.clear_shrink(self.batch_size); + self.update_memory_reservation()?; + + let sort_memory = get_reserved_bytes_for_record_batch(&emit)?; + self.reservation.try_grow(sort_memory)?; + let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?; // Spill sorted state to disk @@ -1121,6 +1130,8 @@ impl GroupedHashAggregateStream { "HashAggSpill", self.batch_size, )?; + + self.reservation.shrink(sort_memory); match spillfile { Some((spillfile, max_record_batch_memory)) => { self.spill_state.spills.push(SortedSpillFile { diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index a8361f7b2941..7b65f108e049 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -850,11 +850,13 @@ pub(crate) fn get_reserved_bytes_for_record_batch_size( /// Estimate how much memory is needed to sort a `RecordBatch`. /// This will just call `get_reserved_bytes_for_record_batch_size` with the /// memory size of the record batch and its sliced size. 
-pub(super) fn get_reserved_bytes_for_record_batch(batch: &RecordBatch) -> Result { - Ok(get_reserved_bytes_for_record_batch_size( - get_record_batch_memory_size(batch), - batch.get_sliced_size()?, - )) +pub(crate) fn get_reserved_bytes_for_record_batch(batch: &RecordBatch) -> Result { + batch.get_sliced_size().map(|sliced_size| { + get_reserved_bytes_for_record_batch_size( + get_record_batch_memory_size(batch), + sliced_size, + ) + }) } impl Debug for ExternalSorter { From 0c10f68a64840f1376071c295a9893e7824b36c6 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 12 Feb 2026 14:29:31 +0200 Subject: [PATCH 02/14] Add failing test --- .../physical-plan/src/aggregates/mod.rs | 136 ++++++++++++++++++ .../physical-plan/src/aggregates/row_hash.rs | 6 +- 2 files changed, 141 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 6cd8557421e5..1625a70c214f 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -3828,6 +3828,142 @@ mod tests { Ok(()) } + /// Tests that when the memory pool is too small to accommodate the sort + /// reservation during spill, the error is properly propagated as + /// ResourcesExhausted rather than silently producing wrong results. 
+ #[tokio::test] + async fn test_sort_reservation_fails_during_spill() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("g", DataType::Int64, false), + Field::new("a", DataType::Float64, false), + Field::new("b", DataType::Float64, false), + Field::new("c", DataType::Float64, false), + Field::new("d", DataType::Float64, false), + Field::new("e", DataType::Float64, false), + ])); + + let batches = vec![vec![ + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int64Array::from(vec![1])), + Arc::new(Float64Array::from(vec![10.0])), + Arc::new(Float64Array::from(vec![20.0])), + Arc::new(Float64Array::from(vec![30.0])), + Arc::new(Float64Array::from(vec![40.0])), + Arc::new(Float64Array::from(vec![50.0])), + ], + )?, + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int64Array::from(vec![2])), + Arc::new(Float64Array::from(vec![11.0])), + Arc::new(Float64Array::from(vec![21.0])), + Arc::new(Float64Array::from(vec![31.0])), + Arc::new(Float64Array::from(vec![41.0])), + Arc::new(Float64Array::from(vec![51.0])), + ], + )?, + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int64Array::from(vec![3])), + Arc::new(Float64Array::from(vec![12.0])), + Arc::new(Float64Array::from(vec![22.0])), + Arc::new(Float64Array::from(vec![32.0])), + Arc::new(Float64Array::from(vec![42.0])), + Arc::new(Float64Array::from(vec![52.0])), + ], + )?, + ]]; + + let scan = TestMemoryExec::try_new(&batches, Arc::clone(&schema), None)?; + + let aggr = Arc::new(AggregateExec::try_new( + AggregateMode::Single, + PhysicalGroupBy::new( + vec![(col("g", schema.as_ref())?, "g".to_string())], + vec![], + vec![vec![false]], + false, + ), + vec![ + Arc::new( + AggregateExprBuilder::new( + avg_udaf(), + vec![col("a", schema.as_ref())?], + ) + .schema(Arc::clone(&schema)) + .alias("AVG(a)") + .build()?, + ), + Arc::new( + AggregateExprBuilder::new( + avg_udaf(), + vec![col("b", schema.as_ref())?], + ) + .schema(Arc::clone(&schema)) + 
.alias("AVG(b)") + .build()?, + ), + Arc::new( + AggregateExprBuilder::new( + avg_udaf(), + vec![col("c", schema.as_ref())?], + ) + .schema(Arc::clone(&schema)) + .alias("AVG(c)") + .build()?, + ), + Arc::new( + AggregateExprBuilder::new( + avg_udaf(), + vec![col("d", schema.as_ref())?], + ) + .schema(Arc::clone(&schema)) + .alias("AVG(d)") + .build()?, + ), + Arc::new( + AggregateExprBuilder::new( + avg_udaf(), + vec![col("e", schema.as_ref())?], + ) + .schema(Arc::clone(&schema)) + .alias("AVG(e)") + .build()?, + ), + ], + vec![None, None, None, None, None], + Arc::new(scan) as Arc, + Arc::clone(&schema), + )?); + + // Pool must be large enough for accumulation to start but too small for + // sort_memory after clearing. + let task_ctx = new_spill_ctx(1, 500); + let result = collect(aggr.execute(0, Arc::clone(&task_ctx))?).await; + + match &result { + Ok(_) => panic!("Expected ResourcesExhausted error but query succeeded"), + Err(e) => { + let root = e.find_root(); + assert!( + matches!(root, DataFusionError::ResourcesExhausted(_)), + "Expected ResourcesExhausted, got: {root}", + ); + let msg = root.to_string(); + assert!( + msg.contains("Failed to reserve memory for sort during spill"), + "Expected sort reservation error, got: {msg}", + ); + } + } + + Ok(()) + } + /// Tests that PartialReduce mode: /// 1. Accepts state as input (like Final) /// 2. 
Produces state as output (like Partial) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 153a48c22cc5..da868993b5d0 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1117,7 +1117,11 @@ impl GroupedHashAggregateStream { self.update_memory_reservation()?; let sort_memory = get_reserved_bytes_for_record_batch(&emit)?; - self.reservation.try_grow(sort_memory)?; + self.reservation.try_grow(sort_memory).map_err(|e| { + DataFusionError::ResourcesExhausted(format!( + "Failed to reserve memory for sort during spill: {e}" + )) + })?; let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?; From 6239d9078a4ccbba6fa2d59e894208214859018c Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 12 Feb 2026 14:31:27 +0200 Subject: [PATCH 03/14] fix doc --- datafusion/physical-plan/src/aggregates/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 1625a70c214f..706951abe47c 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -3830,7 +3830,7 @@ mod tests { /// Tests that when the memory pool is too small to accommodate the sort /// reservation during spill, the error is properly propagated as - /// ResourcesExhausted rather than silently producing wrong results. + /// ResourcesExhausted rather than silently exceeding memory limits. 
#[tokio::test] async fn test_sort_reservation_fails_during_spill() -> Result<()> { let schema = Arc::new(Schema::new(vec![ From 0eaf5cda6efb6a8ad22036b066bb4aeb779fd1ab Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 12 Feb 2026 15:19:59 +0200 Subject: [PATCH 04/14] Use sort_chunked --- .../physical-plan/src/aggregates/row_hash.rs | 24 +++++++--- datafusion/physical-plan/src/spill/mod.rs | 14 +++--- .../physical-plan/src/spill/spill_manager.rs | 45 +++++++------------ 3 files changed, 42 insertions(+), 41 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index da868993b5d0..7416c9a71ab6 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -30,7 +30,7 @@ use crate::aggregates::{ create_schema, evaluate_group_by, evaluate_many, evaluate_optional, }; use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput}; -use crate::sorts::sort::{get_reserved_bytes_for_record_batch, sort_batch}; +use crate::sorts::sort::{get_reserved_bytes_for_record_batch, sort_batch_chunked}; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::spill_manager::SpillManager; use crate::{PhysicalExpr, aggregates, metrics}; @@ -52,6 +52,7 @@ use datafusion_physical_expr::{GroupsAccumulatorAdapter, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_common::instant::Instant; +use datafusion_common::utils::memory::get_record_batch_memory_size; use futures::ready; use futures::stream::{Stream, StreamExt}; use log::debug; @@ -1123,19 +1124,30 @@ impl GroupedHashAggregateStream { )) })?; - let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?; + // Sort the batch into chunks and spill each chunk to disk. 
This ensures we don't have to hold the entire + // batch in memory until the end of the spill, and can both drop it and release memory pressure sooner, + // as spilling may take time and other things can happen in the background. + let sorted = + sort_batch_chunked(&emit, &self.spill_state.spill_expr, self.batch_size)?; + drop(emit); + + let new_sort_memory: usize = + sorted.iter().map(get_record_batch_memory_size).sum(); + let mem_to_free = sort_memory.saturating_sub(new_sort_memory); + self.reservation.shrink(mem_to_free); // Mem pool should now only hold the memory for the sorted batches. // Spill sorted state to disk let spillfile = self .spill_state .spill_manager - .spill_record_batch_by_size_and_return_max_batch_memory( - &sorted, + .spill_record_batch_iter_and_return_max_batch_memory( + sorted.into_iter(), "HashAggSpill", - self.batch_size, )?; - self.reservation.shrink(sort_memory); + // Shrink the remaining memory we allocated + self.reservation + .shrink(sort_memory.saturating_sub(mem_to_free)); match spillfile { Some((spillfile, max_record_batch_memory)) => { self.spill_state.spills.push(SortedSpillFile { diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 4c93c03b342e..2c8bfb32b530 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -477,11 +477,12 @@ mod tests { let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema)); + let row_batches: Vec = + (0..batch1.num_rows()).map(|i| batch1.slice(i, 1)).collect(); let (spill_file, max_batch_mem) = spill_manager - .spill_record_batch_by_size_and_return_max_batch_memory( - &batch1, + .spill_record_batch_iter_and_return_max_batch_memory( + row_batches.iter(), "Test Spill", - 1, )? 
.unwrap(); assert!(spill_file.path().exists()); @@ -731,7 +732,7 @@ mod tests { let completed_file = spill_manager.spill_record_batch_and_finish(&[], "Test")?; assert!(completed_file.is_none()); - // Test write empty batch with interface `spill_record_batch_by_size_and_return_max_batch_memory()` + // Test write empty batch with interface `spill_record_batch_iter_and_return_max_batch_memory()` let empty_batch = RecordBatch::try_new( Arc::clone(&schema), vec![ @@ -740,10 +741,9 @@ mod tests { ], )?; let completed_file = spill_manager - .spill_record_batch_by_size_and_return_max_batch_memory( - &empty_batch, + .spill_record_batch_iter_and_return_max_batch_memory( + std::iter::once(&empty_batch), "Test", - 1, )?; assert!(completed_file.is_none()); diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index 6d931112ad88..8a5629a03252 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -17,19 +17,20 @@ //! Define the `SpillManager` struct, which is responsible for reading and writing `RecordBatch`es to raw files based on the provided configurations. 
+use super::{SpillReaderStream, in_progress_spill_file::InProgressSpillFile}; +use crate::coop::cooperative; +use crate::{common::spawn_buffered, metrics::SpillMetrics}; use arrow::array::StringViewArray; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; +use datafusion_common::utils::memory::get_record_batch_memory_size; use datafusion_common::{Result, config::SpillCompression}; use datafusion_execution::SendableRecordBatchStream; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::runtime_env::RuntimeEnv; +use std::borrow::Borrow; use std::sync::Arc; -use super::{SpillReaderStream, in_progress_spill_file::InProgressSpillFile}; -use crate::coop::cooperative; -use crate::{common::spawn_buffered, metrics::SpillMetrics}; - /// The `SpillManager` is responsible for the following tasks: /// - Reading and writing `RecordBatch`es to raw files based on the provided configurations. /// - Updating the associated metrics. @@ -109,38 +110,26 @@ impl SpillManager { in_progress_file.finish() } - /// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method - /// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`. - /// - /// # Errors - /// - Returns an error if spilling would exceed the disk usage limit configured - /// by `max_temp_directory_size` in `DiskManager` - pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory( + /// Spill an iterator of `RecordBatch`es to disk and return the spill file and the size of the largest batch in memory + /// Note that this expects the caller to provide *non-sliced* batches, so the memory calculation of each batch is accurate. 
+ pub(crate) fn spill_record_batch_iter_and_return_max_batch_memory( &self, - batch: &RecordBatch, + iter: impl Iterator>, request_description: &str, - row_limit: usize, ) -> Result> { - let total_rows = batch.num_rows(); - let mut batches = Vec::new(); - let mut offset = 0; - - // It's ok to calculate all slices first, because slicing is zero-copy. - while offset < total_rows { - let length = std::cmp::min(total_rows - offset, row_limit); - let sliced_batch = batch.slice(offset, length); - batches.push(sliced_batch); - offset += length; - } - let mut in_progress_file = self.create_in_progress_file(request_description)?; let mut max_record_batch_size = 0; - for batch in batches { - in_progress_file.append_batch(&batch)?; + for batch in iter { + let borrowed = batch.borrow(); + if borrowed.num_rows() == 0 { + continue; + } + in_progress_file.append_batch(borrowed)?; - max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?); + max_record_batch_size = + max_record_batch_size.max(get_record_batch_memory_size(borrowed)); } let file = in_progress_file.finish()?; From ab2cb8f7e22b5df19aabd746039153a53111fec3 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 12 Feb 2026 15:24:23 +0200 Subject: [PATCH 05/14] use try_for_each --- datafusion/physical-plan/src/spill/spill_manager.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index 8a5629a03252..91d9b0461615 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -24,7 +24,7 @@ use arrow::array::StringViewArray; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::utils::memory::get_record_batch_memory_size; -use datafusion_common::{Result, config::SpillCompression}; +use datafusion_common::{DataFusionError, Result, config::SpillCompression}; use 
datafusion_execution::SendableRecordBatchStream; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::runtime_env::RuntimeEnv; @@ -114,23 +114,24 @@ impl SpillManager { /// Note that this expects the caller to provide *non-sliced* batches, so the memory calculation of each batch is accurate. pub(crate) fn spill_record_batch_iter_and_return_max_batch_memory( &self, - iter: impl Iterator>, + mut iter: impl Iterator>, request_description: &str, ) -> Result> { let mut in_progress_file = self.create_in_progress_file(request_description)?; let mut max_record_batch_size = 0; - for batch in iter { + iter.try_for_each(|batch| { let borrowed = batch.borrow(); if borrowed.num_rows() == 0 { - continue; + return Ok(()); } in_progress_file.append_batch(borrowed)?; max_record_batch_size = max_record_batch_size.max(get_record_batch_memory_size(borrowed)); - } + Result::<_, DataFusionError>::Ok(()) + })?; let file = in_progress_file.finish()?; From 22f29dd9c2cfe2183427c50f8df8c492e99e2d90 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 12 Feb 2026 21:05:32 +0200 Subject: [PATCH 06/14] Everything passes now, row_hash is much more robust --- .../physical-plan/src/aggregates/row_hash.rs | 132 +++++++++++++----- datafusion/physical-plan/src/sorts/mod.rs | 2 + datafusion/physical-plan/src/sorts/stream.rs | 96 ++++++++++++- datafusion/physical-plan/src/spill/mod.rs | 4 +- .../physical-plan/src/spill/spill_manager.rs | 3 +- 5 files changed, 196 insertions(+), 41 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 7416c9a71ab6..162695d16d43 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -40,7 +40,7 @@ use arrow::array::*; use arrow::datatypes::SchemaRef; use datafusion_common::{ DataFusionError, Result, assert_eq_or_internal_err, assert_or_internal_err, - internal_err, + internal_err, 
resources_datafusion_err, }; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::proxy::VecAllocExt; @@ -51,6 +51,7 @@ use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{GroupsAccumulatorAdapter, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; +use crate::sorts::IncrementingSortIterator; use datafusion_common::instant::Instant; use datafusion_common::utils::memory::get_record_batch_memory_size; use futures::ready; @@ -1049,10 +1050,27 @@ impl GroupedHashAggregateStream { fn update_memory_reservation(&mut self) -> Result<()> { let acc = self.accumulators.iter().map(|x| x.size()).sum::(); - let new_size = acc + let groups_and_acc_size = acc + self.group_values.size() + self.group_ordering.size() + self.current_group_indices.allocated_size(); + + // Reserve extra headroom for sorting during potential spill. + // When OOM triggers, group_aggregate_batch has already processed the + // latest input batch, so the internal state may have grown well beyond + // the last successful reservation. The emit batch reflects this larger + // actual state, and the sort needs memory proportional to it. + // By reserving headroom equal to the data size, we trigger OOM earlier + // (before too much data accumulates), ensuring the freed reservation + // after clear_shrink is sufficient to cover the sort memory. + let sort_headroom = + if self.oom_mode == OutOfMemoryMode::Spill && !self.group_values.is_empty() { + acc + self.group_values.size() + } else { + 0 + }; + + let new_size = groups_and_acc_size + sort_headroom; let reservation_result = self.reservation.try_resize(new_size); if reservation_result.is_ok() { @@ -1114,40 +1132,82 @@ impl GroupedHashAggregateStream { // Free accumulated state now that data has been emitted into `emit`. // This must happen before reserving sort memory so the pool has room. 
- self.clear_shrink(self.batch_size); + // Use 0 to minimize allocated capacity and maximize memory available for sorting. + self.clear_shrink(0); self.update_memory_reservation()?; + // Our first strategy is to simply sort the batch eagerly, this requires the most peak memory(about 2x of the batch), + // but will also mean we can immediately drop the unsorted batch and free its memory. let sort_memory = get_reserved_bytes_for_record_batch(&emit)?; - self.reservation.try_grow(sort_memory).map_err(|e| { - DataFusionError::ResourcesExhausted(format!( - "Failed to reserve memory for sort during spill: {e}" - )) - })?; - - // Sort the batch into chunks and spill each chunk to disk. This ensures we don't have to hold the entire - // batch in memory until the end of the spill, and can both drop it and release memory pressure sooner, - // as spilling may take time and other things can happen in the background. - let sorted = - sort_batch_chunked(&emit, &self.spill_state.spill_expr, self.batch_size)?; - drop(emit); - - let new_sort_memory: usize = - sorted.iter().map(get_record_batch_memory_size).sum(); - let mem_to_free = sort_memory.saturating_sub(new_sort_memory); - self.reservation.shrink(mem_to_free); // Mem pool should now only hold the memory for the sorted batches. - - // Spill sorted state to disk - let spillfile = self - .spill_state - .spill_manager - .spill_record_batch_iter_and_return_max_batch_memory( - sorted.into_iter(), - "HashAggSpill", - )?; - - // Shrink the remaining memory we allocated - self.reservation - .shrink(sort_memory.saturating_sub(mem_to_free)); + let spillfile = match self.reservation.try_grow(sort_memory) { + Ok(_) => { + // Sort the batch into chunks and spill each chunk to disk. This ensures we don't have to hold the entire + // batch in memory until the end of the spill, and can both drop it and release memory pressure sooner, + // as spilling may take time and other things can happen in the background. 
+ let sorted = sort_batch_chunked( + &emit, + &self.spill_state.spill_expr, + self.batch_size, + )?; + drop(emit); + + let new_sort_memory: usize = + sorted.iter().map(get_record_batch_memory_size).sum(); + let mem_to_free = sort_memory.saturating_sub(new_sort_memory); + self.reservation.shrink(mem_to_free); // Mem pool should now only hold the memory for the sorted batches. + + // Spill sorted state to disk + let spillfile = self + .spill_state + .spill_manager + .spill_record_batch_iter_and_return_max_batch_memory( + sorted.into_iter().map(Ok), + "HashAggSpill", + )?; + + // Shrink the remaining memory we allocated + self.reservation + .shrink(sort_memory.saturating_sub(mem_to_free)); + + spillfile + } + Err(_) => { + // However, if we don't have that peak memory, we can fallback to sorting lazily. + // This means we hold the original batch for longer, but we only require reserving the original batch's size, + // plus a fraction of it for each batch as we emit it and write it to file. + // this is still 2x the batch memory at the *worst case*, but the larger the batch, the smaller the fraction. + let batch_size_ratio = self.batch_size as f32 / emit.num_rows() as f32; + let batch_memory = get_record_batch_memory_size(&emit); + let sort_memory = + batch_memory + (batch_memory as f32 * batch_size_ratio) as usize; + + // If we can't grow even that, we have no choice but to return an error since we can't spill to disk without sorting the data first. + self.reservation.try_grow(sort_memory).map_err(|err| { + resources_datafusion_err!( + "Failed to reserver memory for sort during spill: {err}" + ) + })?; + + let sorted_iter = IncrementingSortIterator::new( + emit, + self.spill_state.spill_expr.clone(), + self.batch_size, + ); + let spillfile = self + .spill_state + .spill_manager + .spill_record_batch_iter_and_return_max_batch_memory( + sorted_iter, + "HashAggSpill", + )?; + + // Shrink the memory we allocated for sorting as the sorting is fully done at this point. 
+ self.reservation.shrink(sort_memory); + + spillfile + } + }; + match spillfile { Some((spillfile, max_record_batch_memory)) => { self.spill_state.spills.push(SortedSpillFile { @@ -1165,14 +1225,14 @@ impl GroupedHashAggregateStream { Ok(()) } - /// Clear memory and shirk capacities to the size of the batch. + /// Clear memory and shrink capacities to the given number of rows. fn clear_shrink(&mut self, num_rows: usize) { self.group_values.clear_shrink(num_rows); self.current_group_indices.clear(); self.current_group_indices.shrink_to(num_rows); } - /// Clear memory and shirk capacities to zero. + /// Clear memory and shrink capacities to zero. fn clear_all(&mut self) { self.clear_shrink(0); } @@ -1211,7 +1271,7 @@ impl GroupedHashAggregateStream { // instead. // Spilling to disk and reading back also ensures batch size is consistent // rather than potentially having one significantly larger last batch. - self.spill()?; // TODO: use sort_batch_chunked instead? + self.spill()?; // Mark that we're switching to stream merging mode. 
self.spill_state.is_stream_merging = true; diff --git a/datafusion/physical-plan/src/sorts/mod.rs b/datafusion/physical-plan/src/sorts/mod.rs index 9c72e34fe343..3eee6b9606a7 100644 --- a/datafusion/physical-plan/src/sorts/mod.rs +++ b/datafusion/physical-plan/src/sorts/mod.rs @@ -26,3 +26,5 @@ pub mod sort; pub mod sort_preserving_merge; mod stream; pub mod streaming_merge; + +pub(crate) use stream::IncrementingSortIterator; diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 779511a865b6..588b88be1745 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -18,16 +18,20 @@ use crate::SendableRecordBatchStream; use crate::sorts::cursor::{ArrayValues, CursorArray, RowValues}; use crate::{PhysicalExpr, PhysicalSortExpr}; -use arrow::array::Array; +use arrow::array::{Array, UInt32Array}; +use arrow::compute::take_record_batch; use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, Rows, SortField}; +use arrow_ord::sort::lexsort_to_indices; use datafusion_common::{Result, internal_datafusion_err}; use datafusion_execution::memory_pool::MemoryReservation; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use futures::stream::{Fuse, StreamExt}; +use std::iter::FusedIterator; use std::marker::PhantomData; +use std::mem; use std::sync::Arc; use std::task::{Context, Poll, ready}; @@ -103,7 +107,7 @@ impl ReusableRows { self.inner[stream_idx][1] = Some(Arc::clone(rows)); // swap the current with the previous one, so that the next poll can reuse the Rows from the previous poll let [a, b] = &mut self.inner[stream_idx]; - std::mem::swap(a, b); + mem::swap(a, b); } } @@ -276,3 +280,91 @@ impl PartitionedStream for FieldCursorStream { })) } } + +pub(crate) struct IncrementingSortIterator { + batch: RecordBatch, + expressions: 
LexOrdering, + batch_size: usize, + indices: Option, + cursor: usize, +} + +impl IncrementingSortIterator { + pub(crate) fn new( + batch: RecordBatch, + expressions: LexOrdering, + batch_size: usize, + ) -> Self { + Self { + batch, + expressions, + batch_size, + cursor: 0, + indices: None, + } + } +} + +impl Iterator for IncrementingSortIterator { + type Item = Result; + + fn next(&mut self) -> Option { + if self.cursor >= self.batch.num_rows() { + return None; + } + + match self.indices.as_ref() { + None => { + let sort_columns = match self + .expressions + .iter() + .map(|expr| expr.evaluate_to_sort_column(&self.batch)) + .collect::>>() + { + Ok(cols) => cols, + Err(e) => return Some(Err(e)), + }; + + let indices = match lexsort_to_indices(&sort_columns, None) { + Ok(indices) => indices, + Err(e) => return Some(Err(e.into())), + }; + self.indices = Some(indices); + + // Call again, this time it will hit the Some(indices) branch and return the first batch + self.next() + } + Some(indices) => { + let batch_size = self.batch_size.min(self.batch.num_rows() - self.cursor); + + // Perform the take to produce the next batch + let new_batch_indices = indices.slice(self.cursor, batch_size); + let new_batch = match take_record_batch(&self.batch, &new_batch_indices) { + Ok(batch) => batch, + Err(e) => return Some(Err(e.into())), + }; + + self.cursor += batch_size; + + // If this is the last batch, we can release the memory + if self.cursor >= self.batch.num_rows() { + let schema = self.batch.schema(); + let _ = mem::replace(&mut self.batch, RecordBatch::new_empty(schema)); + self.indices = None; + } + + // Return the new batch + Some(Ok(new_batch)) + } + } + } + + fn size_hint(&self) -> (usize, Option) { + let num_rows = self.batch.num_rows(); + let batch_size = self.batch_size; + let num_batches = num_rows.div_ceil(batch_size); + (num_batches, Some(num_batches)) + } +} + +impl FusedIterator for IncrementingSortIterator {} diff --git 
a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 2c8bfb32b530..f6ce546a4223 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -481,7 +481,7 @@ mod tests { (0..batch1.num_rows()).map(|i| batch1.slice(i, 1)).collect(); let (spill_file, max_batch_mem) = spill_manager .spill_record_batch_iter_and_return_max_batch_memory( - row_batches.iter(), + row_batches.iter().map(Ok), "Test Spill", )? .unwrap(); @@ -742,7 +742,7 @@ mod tests { )?; let completed_file = spill_manager .spill_record_batch_iter_and_return_max_batch_memory( - std::iter::once(&empty_batch), + std::iter::once(Ok(&empty_batch)), "Test", )?; assert!(completed_file.is_none()); diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index 91d9b0461615..07ba6d3989bc 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -114,7 +114,7 @@ impl SpillManager { /// Note that this expects the caller to provide *non-sliced* batches, so the memory calculation of each batch is accurate. 
pub(crate) fn spill_record_batch_iter_and_return_max_batch_memory( &self, - mut iter: impl Iterator>, + mut iter: impl Iterator>>, request_description: &str, ) -> Result> { let mut in_progress_file = self.create_in_progress_file(request_description)?; @@ -122,6 +122,7 @@ impl SpillManager { let mut max_record_batch_size = 0; iter.try_for_each(|batch| { + let batch = batch?; let borrowed = batch.borrow(); if borrowed.num_rows() == 0 { return Ok(()); From d2efbb0dc2c1c087c1fd0f9bca7b6f9074034f0f Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 12 Feb 2026 21:09:44 +0200 Subject: [PATCH 07/14] fix typo --- datafusion/physical-plan/src/aggregates/row_hash.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 162695d16d43..a2e713fd4f63 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1184,7 +1184,7 @@ impl GroupedHashAggregateStream { // If we can't grow even that, we have no choice but to return an error since we can't spill to disk without sorting the data first. 
self.reservation.try_grow(sort_memory).map_err(|err| { resources_datafusion_err!( - "Failed to reserver memory for sort during spill: {err}" + "Failed to reserve memory for sort during spill: {err}" ) })?; From 1b68d8573e672e322a0c723b0c40119f55bc5f91 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Mon, 16 Feb 2026 12:27:26 +0200 Subject: [PATCH 08/14] rename, add docs --- .../physical-plan/src/aggregates/row_hash.rs | 4 ++-- datafusion/physical-plan/src/sorts/mod.rs | 2 +- datafusion/physical-plan/src/sorts/stream.rs | 22 +++++++++++++++---- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index a2e713fd4f63..d969422b9def 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -51,7 +51,7 @@ use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{GroupsAccumulatorAdapter, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use crate::sorts::IncrementingSortIterator; +use crate::sorts::IncrementalSortIterator; use datafusion_common::instant::Instant; use datafusion_common::utils::memory::get_record_batch_memory_size; use futures::ready; @@ -1188,7 +1188,7 @@ impl GroupedHashAggregateStream { ) })?; - let sorted_iter = IncrementingSortIterator::new( + let sorted_iter = IncrementalSortIterator::new( emit, self.spill_state.spill_expr.clone(), self.batch_size, diff --git a/datafusion/physical-plan/src/sorts/mod.rs b/datafusion/physical-plan/src/sorts/mod.rs index 3eee6b9606a7..a73872a175b9 100644 --- a/datafusion/physical-plan/src/sorts/mod.rs +++ b/datafusion/physical-plan/src/sorts/mod.rs @@ -27,4 +27,4 @@ pub mod sort_preserving_merge; mod stream; pub mod streaming_merge; -pub(crate) use stream::IncrementingSortIterator; +pub(crate) use stream::IncrementalSortIterator; diff --git 
a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 588b88be1745..5dc0c5682481 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -281,7 +281,21 @@ impl PartitionedStream for FieldCursorStream { } } -pub(crate) struct IncrementingSortIterator { +/// A lazy, memory-efficient sort iterator used as a fallback during aggregate +/// spill when there is not enough memory for an eager sort (which requires ~2x +/// peak memory to hold both the unsorted and sorted copies simultaneously). +/// +/// On the first call to `next()`, a sorted index array (`UInt32Array`) is +/// computed via `lexsort_to_indices`. Subsequent calls yield chunks of +/// `batch_size` rows by `take`-ing from the original batch using slices of +/// this index array. Each `take` copies data for the chunk (not zero-copy), +/// but only one chunk is live at a time since the caller consumes it before +/// requesting the next. Once all rows have been yielded, the original batch +/// and index array are dropped to free memory. +/// +/// The caller must reserve `sizeof(batch) + sizeof(one chunk)` for this iterator, +/// and free the reservation once the iterator is depleted. 
+pub(crate) struct IncrementalSortIterator { batch: RecordBatch, expressions: LexOrdering, batch_size: usize, @@ -289,7 +303,7 @@ pub(crate) struct IncrementingSortIterator { cursor: usize, } -impl IncrementingSortIterator { +impl IncrementalSortIterator { pub(crate) fn new( batch: RecordBatch, expressions: LexOrdering, @@ -305,7 +319,7 @@ impl IncrementingSortIterator { } } -impl Iterator for IncrementingSortIterator { +impl Iterator for IncrementalSortIterator { type Item = Result; fn next(&mut self) -> Option { @@ -367,4 +381,4 @@ impl Iterator for IncrementingSortIterator { } } -impl FusedIterator for IncrementingSortIterator {} +impl FusedIterator for IncrementalSortIterator {} From ebae692235cf89feb41f089e9d630f52e1a086e2 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Mon, 23 Feb 2026 15:11:25 +0200 Subject: [PATCH 09/14] Add test to guarantee notification in case of slice usage --- datafusion/physical-plan/src/sorts/stream.rs | 61 ++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 5dc0c5682481..0e19ab9e50b1 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -382,3 +382,64 @@ impl Iterator for IncrementalSortIterator { } impl FusedIterator for IncrementalSortIterator {} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field}; + use datafusion_common::DataFusionError; + use datafusion_physical_expr::expressions::col; + + /// Verifies that `take_record_batch` in `IncrementalSortIterator` actually + /// copies the data into a new allocation rather than returning a zero-copy + /// slice of the original batch. If the output arrays were slices, their + /// underlying buffer length would match the original array's length; a true + /// copy will have a buffer sized to fit only the chunk. 
+ #[test] + fn incremental_sort_iterator_copies_data() -> Result<()> { + let original_len = 10; + let batch_size = 3; + + // Build a batch with a single Int32 column of descending values + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let col_a: Int32Array = (0..original_len as i32).rev().collect(); + let batch = RecordBatch::try_new(schema, vec![Arc::new(col_a)])?; + + // Sort ascending on column "a" + let expressions = LexOrdering::new(vec![PhysicalSortExpr::new_default(col( + "a", + &batch.schema(), + )?)]) + .unwrap(); + + let mut total_rows = 0; + IncrementalSortIterator::new(batch, expressions, batch_size).try_for_each( + |result| { + let chunk = result?; + total_rows += chunk.num_rows(); + + // Every output column must be a fresh allocation whose length + // equals the chunk size, NOT the original array length. + chunk.columns().iter().for_each(|arr| { + assert_eq!( + arr.len(), + chunk.num_rows(), + "array len should equal chunk row count" + ); + assert_ne!( + arr.len(), + original_len, + "array len must differ from original batch length \ + (data should be copied, not sliced)" + ); + }); + + Result::<_, DataFusionError>::Ok(()) + }, + )?; + + assert_eq!(total_rows, original_len); + Ok(()) + } +} From 4d24cd9e53c74821ff626dab57907e05876bbd93 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Mon, 23 Feb 2026 16:39:35 +0200 Subject: [PATCH 10/14] Remove previous logic in favor of incremental one in agg, fix test --- .../physical-plan/src/aggregates/row_hash.rs | 95 +++++-------------- datafusion/physical-plan/src/sorts/stream.rs | 25 ++--- 2 files changed, 34 insertions(+), 86 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index d969422b9def..42292517ea37 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -30,7 +30,6 @@ use crate::aggregates::{ create_schema, 
evaluate_group_by, evaluate_many, evaluate_optional, }; use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput}; -use crate::sorts::sort::{get_reserved_bytes_for_record_batch, sort_batch_chunked}; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::spill_manager::SpillManager; use crate::{PhysicalExpr, aggregates, metrics}; @@ -1136,77 +1135,33 @@ impl GroupedHashAggregateStream { self.clear_shrink(0); self.update_memory_reservation()?; - // Our first strategy is to simply sort the batch eagerly, this requires the most peak memory(about 2x of the batch), - // but will also mean we can immediately drop the unsorted batch and free its memory. - let sort_memory = get_reserved_bytes_for_record_batch(&emit)?; - let spillfile = match self.reservation.try_grow(sort_memory) { - Ok(_) => { - // Sort the batch into chunks and spill each chunk to disk. This ensures we don't have to hold the entire - // batch in memory until the end of the spill, and can both drop it and release memory pressure sooner, - // as spilling may take time and other things can happen in the background. - let sorted = sort_batch_chunked( - &emit, - &self.spill_state.spill_expr, - self.batch_size, - )?; - drop(emit); - - let new_sort_memory: usize = - sorted.iter().map(get_record_batch_memory_size).sum(); - let mem_to_free = sort_memory.saturating_sub(new_sort_memory); - self.reservation.shrink(mem_to_free); // Mem pool should now only hold the memory for the sorted batches. 
- - // Spill sorted state to disk - let spillfile = self - .spill_state - .spill_manager - .spill_record_batch_iter_and_return_max_batch_memory( - sorted.into_iter().map(Ok), - "HashAggSpill", - )?; - - // Shrink the remaining memory we allocated - self.reservation - .shrink(sort_memory.saturating_sub(mem_to_free)); + let batch_size_ratio = self.batch_size as f32 / emit.num_rows() as f32; + let batch_memory = get_record_batch_memory_size(&emit); + let sort_memory = + batch_memory + (batch_memory as f32 * batch_size_ratio) as usize; - spillfile - } - Err(_) => { - // However, if we don't have that peak memory, we can fallback to sorting lazily. - // This means we hold the original batch for longer, but we only require reserving the original batch's size, - // plus a fraction of it for each batch as we emit it and write it to file. - // this is still 2x the batch memory at the *worst case*, but the larger the batch, the smaller the fraction. - let batch_size_ratio = self.batch_size as f32 / emit.num_rows() as f32; - let batch_memory = get_record_batch_memory_size(&emit); - let sort_memory = - batch_memory + (batch_memory as f32 * batch_size_ratio) as usize; - - // If we can't grow even that, we have no choice but to return an error since we can't spill to disk without sorting the data first. - self.reservation.try_grow(sort_memory).map_err(|err| { - resources_datafusion_err!( - "Failed to reserve memory for sort during spill: {err}" - ) - })?; - - let sorted_iter = IncrementalSortIterator::new( - emit, - self.spill_state.spill_expr.clone(), - self.batch_size, - ); - let spillfile = self - .spill_state - .spill_manager - .spill_record_batch_iter_and_return_max_batch_memory( - sorted_iter, - "HashAggSpill", - )?; - - // Shrink the memory we allocated for sorting as the sorting is fully done at this point. 
- self.reservation.shrink(sort_memory); + // If we can't grow even that, we have no choice but to return an error since we can't spill to disk without sorting the data first. + self.reservation.try_grow(sort_memory).map_err(|err| { + resources_datafusion_err!( + "Failed to reserve memory for sort during spill: {err}" + ) + })?; - spillfile - } - }; + let sorted_iter = IncrementalSortIterator::new( + emit, + self.spill_state.spill_expr.clone(), + self.batch_size, + ); + let spillfile = self + .spill_state + .spill_manager + .spill_record_batch_iter_and_return_max_batch_memory( + sorted_iter, + "HashAggSpill", + )?; + + // Shrink the memory we allocated for sorting as the sorting is fully done at this point. + self.reservation.shrink(sort_memory); match spillfile { Some((spillfile, max_record_batch_memory)) => { diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 0e19ab9e50b1..d023cdc1d710 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -386,8 +386,8 @@ impl FusedIterator for IncrementalSortIterator {} #[cfg(test)] mod tests { use super::*; - use arrow::array::Int32Array; - use arrow::datatypes::{DataType, Field}; + use arrow::array::{AsArray, Int32Array}; + use arrow::datatypes::{DataType, Field, Int32Type}; use datafusion_common::DataFusionError; use datafusion_physical_expr::expressions::col; @@ -403,7 +403,7 @@ mod tests { // Build a batch with a single Int32 column of descending values let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); - let col_a: Int32Array = (0..original_len as i32).rev().collect(); + let col_a: Int32Array = Int32Array::from(vec![0; original_len]); let batch = RecordBatch::try_new(schema, vec![Arc::new(col_a)])?; // Sort ascending on column "a" @@ -414,25 +414,18 @@ mod tests { .unwrap(); let mut total_rows = 0; - IncrementalSortIterator::new(batch, expressions, batch_size).try_for_each( + 
IncrementalSortIterator::new(batch.clone(), expressions, batch_size).try_for_each( |result| { let chunk = result?; total_rows += chunk.num_rows(); // Every output column must be a fresh allocation whose length // equals the chunk size, NOT the original array length. - chunk.columns().iter().for_each(|arr| { - assert_eq!( - arr.len(), - chunk.num_rows(), - "array len should equal chunk row count" - ); - assert_ne!( - arr.len(), - original_len, - "array len must differ from original batch length \ - (data should be copied, not sliced)" - ); + chunk.columns().iter().zip(batch.columns()).for_each(|(arr, original_arr)| { + let (_, scalar_buf, _) = arr.as_primitive::().clone().into_parts(); + let (_, original_scalar_buf, _) = original_arr.as_primitive::().clone().into_parts(); + + assert!(!scalar_buf.inner().ptr_eq(original_scalar_buf.inner()), "Expected a copy of the data for each chunk, but got a slice that shares the same buffer as the original array"); }); Result::<_, DataFusionError>::Ok(()) From 38691252929033f2755de50538bb2be6849e574c Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Mon, 23 Feb 2026 16:47:46 +0200 Subject: [PATCH 11/14] suffering driven development --- datafusion/physical-plan/src/sorts/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index d023cdc1d710..ff7f259dd134 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -425,7 +425,7 @@ mod tests { let (_, scalar_buf, _) = arr.as_primitive::().clone().into_parts(); let (_, original_scalar_buf, _) = original_arr.as_primitive::().clone().into_parts(); - assert!(!scalar_buf.inner().ptr_eq(original_scalar_buf.inner()), "Expected a copy of the data for each chunk, but got a slice that shares the same buffer as the original array"); + assert_ne!(scalar_buf.inner().data_ptr(), original_scalar_buf.inner().data_ptr(), "Expected a 
copy of the data for each chunk, but got a slice that shares the same buffer as the original array"); }); Result::<_, DataFusionError>::Ok(()) From e6110264ba16e23f8bf043116ca18ec976c2c2d9 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Mon, 23 Feb 2026 16:58:50 +0200 Subject: [PATCH 12/14] use IncrementalSortIterator --- datafusion/physical-plan/src/sorts/sort.rs | 35 ++-------------------- 1 file changed, 2 insertions(+), 33 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 0315af023fd2..66dbabe18c04 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -37,6 +37,7 @@ use crate::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, }; use crate::projection::{ProjectionExec, make_with_child, update_ordering}; +use crate::sorts::IncrementalSortIterator; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::get_record_batch_memory_size; use crate::spill::in_progress_spill_file::InProgressSpillFile; @@ -726,7 +727,6 @@ impl ExternalSorter { // Sort the batch immediately and get all output batches let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?; - drop(batch); // Free the old reservation and grow it to match the actual sorted output size reservation.free(); @@ -900,38 +900,7 @@ pub fn sort_batch_chunked( expressions: &LexOrdering, batch_size: usize, ) -> Result> { - let sort_columns = expressions - .iter() - .map(|expr| expr.evaluate_to_sort_column(batch)) - .collect::>>()?; - - let indices = lexsort_to_indices(&sort_columns, None)?; - - // Split indices into chunks of batch_size - let num_rows = indices.len(); - let num_chunks = num_rows.div_ceil(batch_size); - - let result_batches = (0..num_chunks) - .map(|chunk_idx| { - let start = chunk_idx * batch_size; - let end = (start + batch_size).min(num_rows); - let chunk_len = end - start; - - // Create a 
slice of indices for this chunk - let chunk_indices = indices.slice(start, chunk_len); - - // Take the columns using this chunk of indices - let columns = take_arrays(batch.columns(), &chunk_indices, None)?; - - let options = RecordBatchOptions::new().with_row_count(Some(chunk_len)); - let chunk_batch = - RecordBatch::try_new_with_options(batch.schema(), columns, &options)?; - - Ok(chunk_batch) - }) - .collect::>>()?; - - Ok(result_batches) + IncrementalSortIterator::new(batch.clone(), expressions.clone(), batch_size).collect() } /// Sort execution plan. From 2ca0be81c6f7c6d359c8a31e6ebe50206c973c5c Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Mon, 23 Feb 2026 17:26:24 +0200 Subject: [PATCH 13/14] use limits and better reservation --- datafusion/physical-plan/src/aggregates/row_hash.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 42292517ea37..dfcf76d34271 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -31,7 +31,7 @@ use crate::aggregates::{ }; use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput}; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; -use crate::spill::spill_manager::SpillManager; +use crate::spill::spill_manager::{GetSlicedSize, SpillManager}; use crate::{PhysicalExpr, aggregates, metrics}; use crate::{RecordBatchStream, SendableRecordBatchStream}; @@ -1137,8 +1137,9 @@ impl GroupedHashAggregateStream { let batch_size_ratio = self.batch_size as f32 / emit.num_rows() as f32; let batch_memory = get_record_batch_memory_size(&emit); - let sort_memory = - batch_memory + (batch_memory as f32 * batch_size_ratio) as usize; + let sort_memory = (batch_memory + + (emit.get_sliced_size()? 
as f32 * batch_size_ratio) as usize) + .min(batch_memory * 2); // If we can't grow even that, we have no choice but to return an error since we can't spill to disk without sorting the data first. self.reservation.try_grow(sort_memory).map_err(|err| { resources_datafusion_err!( From 5db82ae4e175539e4d175539ec6c32891d6a4255508b26b3d Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Mon, 23 Feb 2026 17:30:49 +0200 Subject: [PATCH 14/14] new memory calculation, add comment --- datafusion/physical-plan/src/aggregates/row_hash.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index dfcf76d34271..1bab3f649bdf 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1137,6 +1137,11 @@ impl GroupedHashAggregateStream { let batch_size_ratio = self.batch_size as f32 / emit.num_rows() as f32; let batch_memory = get_record_batch_memory_size(&emit); + // The worst case for a sort is 2x the original underlying buffers (regardless of slicing). + // First we get the underlying buffers' size, then we get the sliced ("actual") size of the batch, + // and multiply it by the ratio of batch_size to actual size to get the estimated memory needed for sorting the batch. + // If something goes wrong in get_sliced_size() (e.g. double counting), + // we fall back to the worst case. let sort_memory = (batch_memory + + (emit.get_sliced_size()? as f32 * batch_size_ratio) as usize) + .min(batch_memory * 2);