diff --git a/datafusion/physical-plan/benches/spill_io.rs b/datafusion/physical-plan/benches/spill_io.rs
index e42c8073ae1c0..40c8f7634c8c4 100644
--- a/datafusion/physical-plan/benches/spill_io.rs
+++ b/datafusion/physical-plan/benches/spill_io.rs
@@ -115,8 +115,9 @@ fn bench_spill_io(c: &mut Criterion) {
                     // - Wait for the consumer to finish processing
                     |spill_file| {
                         rt.block_on(async {
-                            let stream =
-                                spill_manager.read_spill_as_stream(spill_file).unwrap();
+                            let stream = spill_manager
+                                .read_spill_as_stream(spill_file, None)
+                                .unwrap();
                             let _ = collect(stream).await.unwrap();
                         })
                     },
@@ -519,8 +520,9 @@ fn benchmark_spill_batches_for_all_codec(
                         )
                         .unwrap()
                         .unwrap();
-                        let stream =
-                            spill_manager.read_spill_as_stream(spill_file).unwrap();
+                        let stream = spill_manager
+                            .read_spill_as_stream(spill_file, None)
+                            .unwrap();
                         let _ = collect(stream).await.unwrap();
                     })
                 },
@@ -553,7 +555,9 @@ fn benchmark_spill_batches_for_all_codec(
     let rt = Runtime::new().unwrap();
     let start = Instant::now();
     rt.block_on(async {
-        let stream = spill_manager.read_spill_as_stream(spill_file).unwrap();
+        let stream = spill_manager
+            .read_spill_as_stream(spill_file, None)
+            .unwrap();
         let _ = collect(stream).await.unwrap();
     });
     let read_time = start.elapsed();
diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs
index bb6fc751b8974..58d046cc90911 100644
--- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs
+++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs
@@ -237,7 +237,8 @@ impl MultiLevelMergeBuilder {
                 let spill_file = self.sorted_spill_files.remove(0);

                 // Not reserving any memory for this disk as we are not holding it in memory
-                self.spill_manager.read_spill_as_stream(spill_file.file)
+                self.spill_manager
+                    .read_spill_as_stream(spill_file.file, None)
             }

             // Only in memory streams, so merge them all in a single pass
@@ -274,10 +275,12 @@ impl MultiLevelMergeBuilder {
                     .spill_manager
                     .clone()
                     .with_batch_read_buffer_capacity(buffer_size)
-                    .read_spill_as_stream(spill.file)?;
+                    .read_spill_as_stream(
+                        spill.file,
+                        Some(spill.max_record_batch_memory),
+                    )?;
                 sorted_streams.push(stream);
             }
-
             let merge_sort_stream = self.create_new_merge_sort(
                 sorted_streams,
                 // If we have no sorted spill files left, this is the last run
diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs
index fab62bff840f6..33a7cdbe3f721 100644
--- a/datafusion/physical-plan/src/spill/mod.rs
+++ b/datafusion/physical-plan/src/spill/mod.rs
@@ -43,6 +43,7 @@ use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::RecordBatchStream;
 use futures::{FutureExt as _, Stream};
+use log::warn;

 /// Stream that reads spill files from disk where each batch is read in a spawned blocking task
 /// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`]
@@ -54,8 +55,16 @@ use futures::{FutureExt as _, Stream};
 struct SpillReaderStream {
     schema: SchemaRef,
     state: SpillReaderStreamState,
+    /// Maximum memory size observed among the spilled sorted record batches.
+    /// It is used to validate each RecordBatch as it is read back from the spill file.
+    /// For context on why this value is recorded and validated,
+    /// see `physical_plan/sorts/multi_level_merge.rs`.
+    max_record_batch_memory: Option<usize>,
 }

+// Small margin allowed to accommodate slight memory accounting variation
+const SPILL_BATCH_MEMORY_MARGIN: usize = 4096;
+
 /// When we poll for the next batch, we will get back both the batch and the reader,
 /// so we can call `next` again.
 type NextRecordBatchResult = Result<(StreamReader<BufReader<File>>, Option<RecordBatch>)>;
@@ -76,10 +85,15 @@ enum SpillReaderStreamState {
 }

 impl SpillReaderStream {
-    fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self {
+    fn new(
+        schema: SchemaRef,
+        spill_file: RefCountedTempFile,
+        max_record_batch_memory: Option<usize>,
+    ) -> Self {
         Self {
             schema,
             state: SpillReaderStreamState::Uninitialized(spill_file),
+            max_record_batch_memory,
         }
     }

@@ -125,6 +139,23 @@ impl SpillReaderStream {
                     Ok((reader, batch)) => {
                         match batch {
                             Some(batch) => {
+                                if let Some(max_record_batch_memory) =
+                                    self.max_record_batch_memory
+                                {
+                                    let actual_size =
+                                        get_record_batch_memory_size(&batch);
+                                    if actual_size
+                                        > max_record_batch_memory
+                                            + SPILL_BATCH_MEMORY_MARGIN
+                                    {
+                                        warn!(
+                                            "Record batch memory usage ({actual_size} bytes) exceeds the expected limit ({max_record_batch_memory} bytes) \n\
+                                            by more than the allowed tolerance ({SPILL_BATCH_MEMORY_MARGIN} bytes).\n\
+                                            This likely indicates a bug in memory accounting during spilling.\n\
+                                            Please report this issue at https://github.com/apache/datafusion/issues/17340."
+                                        );
+                                    }
+                                }
                                 self.state = SpillReaderStreamState::Waiting(reader);

                                 Poll::Ready(Some(Ok(batch)))
@@ -417,7 +448,7 @@ mod tests {
         let spilled_rows = spill_manager.metrics.spilled_rows.value();
         assert_eq!(spilled_rows, num_rows);

-        let stream = spill_manager.read_spill_as_stream(spill_file)?;
+        let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
         assert_eq!(stream.schema(), schema);

         let batches = collect(stream).await?;
@@ -481,7 +512,7 @@ mod tests {
         let spilled_rows = spill_manager.metrics.spilled_rows.value();
         assert_eq!(spilled_rows, num_rows);

-        let stream = spill_manager.read_spill_as_stream(spill_file)?;
+        let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
         assert_eq!(stream.schema(), dict_schema);
         let batches = collect(stream).await?;
         assert_eq!(batches.len(), 2);
@@ -512,7 +543,7 @@ mod tests {
         assert!(spill_file.path().exists());
         assert!(max_batch_mem > 0);

-        let stream = spill_manager.read_spill_as_stream(spill_file)?;
+        let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
         assert_eq!(stream.schema(), schema);

         let batches = collect(stream).await?;
@@ -547,7 +578,7 @@ mod tests {
         let spilled_rows = spill_manager.metrics.spilled_rows.value();
         assert_eq!(spilled_rows, num_rows);

-        let stream = spill_manager.read_spill_as_stream(spill_file)?;
+        let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
         assert_eq!(stream.schema(), schema);

         let batches = collect(stream).await?;
@@ -931,8 +962,10 @@ mod tests {
             .spill_record_batch_and_finish(&batches, "Test2")?
             .unwrap();

-        let mut stream_1 = spill_manager.read_spill_as_stream(spill_file_1)?;
-        let mut stream_2 = spill_manager.read_spill_as_stream(spill_file_2)?;
+        let mut stream_1 =
+            spill_manager.read_spill_as_stream(spill_file_1, None)?;
+        let mut stream_2 =
+            spill_manager.read_spill_as_stream(spill_file_2, None)?;

         stream_1.next().await;
         stream_2.next().await;
diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs
index ad23bd66a021a..cc39102d89819 100644
--- a/datafusion/physical-plan/src/spill/spill_manager.rs
+++ b/datafusion/physical-plan/src/spill/spill_manager.rs
@@ -174,10 +174,12 @@ impl SpillManager {
     pub fn read_spill_as_stream(
         &self,
         spill_file_path: RefCountedTempFile,
+        max_record_batch_memory: Option<usize>,
     ) -> Result<SendableRecordBatchStream> {
         let stream = Box::pin(cooperative(SpillReaderStream::new(
             Arc::clone(&self.schema),
             spill_file_path,
+            max_record_batch_memory,
         )));

         Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
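For reference, here is a minimal, self-contained sketch (not part of the patch; `exceeds_recorded_bound` is a hypothetical helper) of the tolerance check that `SpillReaderStream` now performs when a `max_record_batch_memory` bound is supplied: a batch read back from disk may measure slightly larger than the bound recorded at spill time, so only overshoots beyond `SPILL_BATCH_MEMORY_MARGIN` are reported.

```rust
// Hypothetical standalone sketch of the validation added in this patch; the
// real check lives in SpillReaderStream::poll_next_inner and emits a warn! log.
const SPILL_BATCH_MEMORY_MARGIN: usize = 4096;

/// Returns true when the batch read back from the spill file exceeds the
/// recorded per-batch bound by more than the allowed margin.
fn exceeds_recorded_bound(actual_size: usize, max_record_batch_memory: Option<usize>) -> bool {
    match max_record_batch_memory {
        Some(bound) => actual_size > bound + SPILL_BATCH_MEMORY_MARGIN,
        // `None` means no bound was recorded (e.g. the single-spill-file path),
        // so there is nothing to validate.
        None => false,
    }
}

fn main() {
    // Within the margin: accepted silently.
    assert!(!exceeds_recorded_bound(10_000 + 1_024, Some(10_000)));
    // Beyond the margin: would trigger the warning in the patch.
    assert!(exceeds_recorded_bound(10_000 + 8_192, Some(10_000)));
    // No recorded bound: the check is skipped entirely.
    assert!(!exceeds_recorded_bound(1_000_000, None));
}
```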