diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index bc321b227ee52..71feb5555d3d1 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -518,6 +518,23 @@ config_namespace! { /// batches and merged. pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024 + /// Maximum size in bytes for individual spill files before rotating to a new file. + /// + /// When operators spill data to disk (e.g., RepartitionExec), they write + /// multiple batches to the same file until this size limit is reached, then rotate + /// to a new file. This reduces syscall overhead compared to one-file-per-batch + /// while preventing files from growing too large. + /// + /// A larger value reduces file creation overhead but may hold more disk space. + /// A smaller value creates more files but allows finer-grained space reclamation + /// as files can be deleted once fully consumed. + /// + /// Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators + /// may create spill files larger than the limit. + /// + /// Default: 128 MB + pub max_spill_file_size_bytes: usize, default = 128 * 1024 * 1024 + /// Number of files to read in parallel when inferring schema and statistics pub meta_fetch_concurrency: usize, default = 32 diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index 82f2d75ac1b57..f1c768be5b48c 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -293,11 +293,13 @@ impl DiskManager { let dir_index = rng().random_range(0..local_dirs.len()); Ok(RefCountedTempFile { - _parent_temp_dir: Arc::clone(&local_dirs[dir_index]), - tempfile: Builder::new() - .tempfile_in(local_dirs[dir_index].as_ref()) - .map_err(DataFusionError::IoError)?, - current_file_disk_usage: 0, + parent_temp_dir: Arc::clone(&local_dirs[dir_index]), + tempfile: Arc::new( + Builder::new() + .tempfile_in(local_dirs[dir_index].as_ref()) + .map_err(DataFusionError::IoError)?, + ), + current_file_disk_usage: Arc::new(AtomicU64::new(0)), disk_manager: Arc::clone(self), }) } @@ -311,26 +313,50 @@ impl DiskManager { /// must invoke [`Self::update_disk_usage`] to update the global disk usage counter. /// This ensures the disk manager can properly enforce usage limits configured by /// [`DiskManager::with_max_temp_directory_size`]. +/// +/// This type is Clone-able, allowing multiple references to the same underlying file. +/// The file is deleted only when the last reference is dropped. +/// +/// The parent temporary directory is also kept alive as long as any reference to +/// this file exists, preventing premature cleanup of the directory. +/// +/// Once all references to this file are dropped, the file is deleted, and the +/// disk usage is subtracted from the disk manager's total. #[derive(Debug)] pub struct RefCountedTempFile { /// The reference to the directory in which temporary files are created to ensure /// it is not cleaned up prior to the NamedTempFile - _parent_temp_dir: Arc, - tempfile: NamedTempFile, + parent_temp_dir: Arc, + /// The underlying temporary file, wrapped in Arc to allow cloning + tempfile: Arc, /// Tracks the current disk usage of this temporary file. See /// [`Self::update_disk_usage`] for more details. - current_file_disk_usage: u64, + /// + /// This is wrapped in `Arc` so that all clones share the same + /// disk usage tracking, preventing incorrect accounting when clones are dropped. 
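+    ///
+    /// Calling [`Self::update_disk_usage`] through any clone refreshes this shared
+    /// counter, so [`Self::current_disk_usage`] reports the same value for every clone.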
+ current_file_disk_usage: Arc, /// The disk manager that created and manages this temporary file disk_manager: Arc, } +impl Clone for RefCountedTempFile { + fn clone(&self) -> Self { + Self { + parent_temp_dir: Arc::clone(&self.parent_temp_dir), + tempfile: Arc::clone(&self.tempfile), + current_file_disk_usage: Arc::clone(&self.current_file_disk_usage), + disk_manager: Arc::clone(&self.disk_manager), + } + } +} + impl RefCountedTempFile { pub fn path(&self) -> &Path { self.tempfile.path() } pub fn inner(&self) -> &NamedTempFile { - &self.tempfile + self.tempfile.as_ref() } /// Updates the global disk usage counter after modifications to the underlying file. @@ -342,11 +368,14 @@ impl RefCountedTempFile { let metadata = self.tempfile.as_file().metadata()?; let new_disk_usage = metadata.len(); + // Get the old disk usage + let old_disk_usage = self.current_file_disk_usage.load(Ordering::Relaxed); + // Update the global disk usage by: // 1. Subtracting the old file size from the global counter self.disk_manager .used_disk_space - .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed); + .fetch_sub(old_disk_usage, Ordering::Relaxed); // 2. Adding the new file size to the global counter self.disk_manager .used_disk_space @@ -362,23 +391,29 @@ impl RefCountedTempFile { } // 4. Update the local file size tracking - self.current_file_disk_usage = new_disk_usage; + self.current_file_disk_usage + .store(new_disk_usage, Ordering::Relaxed); Ok(()) } pub fn current_disk_usage(&self) -> u64 { - self.current_file_disk_usage + self.current_file_disk_usage.load(Ordering::Relaxed) } } /// When the temporary file is dropped, subtract its disk usage from the disk manager's total impl Drop for RefCountedTempFile { fn drop(&mut self) { - // Subtract the current file's disk usage from the global counter - self.disk_manager - .used_disk_space - .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed); + // Only subtract disk usage when this is the last reference to the file + // Check if we're the last one by seeing if there's only one strong reference + // left to the underlying tempfile (the one we're holding) + if Arc::strong_count(&self.tempfile) == 1 { + let current_usage = self.current_file_disk_usage.load(Ordering::Relaxed); + self.disk_manager + .used_disk_space + .fetch_sub(current_usage, Ordering::Relaxed); + } } } @@ -533,4 +568,190 @@ mod tests { Ok(()) } + + #[test] + fn test_disk_usage_basic() -> Result<()> { + use std::io::Write; + + let dm = Arc::new(DiskManagerBuilder::default().build()?); + let mut temp_file = dm.create_tmp_file("Testing")?; + + // Initially, disk usage should be 0 + assert_eq!(dm.used_disk_space(), 0); + assert_eq!(temp_file.current_disk_usage(), 0); + + // Write some data to the file + temp_file.inner().as_file().write_all(b"hello world")?; + temp_file.update_disk_usage()?; + + // Disk usage should now reflect the written data + let expected_usage = temp_file.current_disk_usage(); + assert!(expected_usage > 0); + assert_eq!(dm.used_disk_space(), expected_usage); + + // Write more data + temp_file.inner().as_file().write_all(b" more data")?; + temp_file.update_disk_usage()?; + + // Disk usage should increase + let new_usage = temp_file.current_disk_usage(); + assert!(new_usage > expected_usage); + assert_eq!(dm.used_disk_space(), new_usage); + + // Drop the file + drop(temp_file); + + // Disk usage should return to 0 + assert_eq!(dm.used_disk_space(), 0); + + Ok(()) + } + + #[test] + fn test_disk_usage_with_clones() -> Result<()> { + use std::io::Write; + + let dm = 
Arc::new(DiskManagerBuilder::default().build()?); + let mut temp_file = dm.create_tmp_file("Testing")?; + + // Write some data + temp_file.inner().as_file().write_all(b"test data")?; + temp_file.update_disk_usage()?; + + let usage_after_write = temp_file.current_disk_usage(); + assert!(usage_after_write > 0); + assert_eq!(dm.used_disk_space(), usage_after_write); + + // Clone the file + let clone1 = temp_file.clone(); + let clone2 = temp_file.clone(); + + // All clones should see the same disk usage + assert_eq!(clone1.current_disk_usage(), usage_after_write); + assert_eq!(clone2.current_disk_usage(), usage_after_write); + + // Global disk usage should still be the same (not multiplied by number of clones) + assert_eq!(dm.used_disk_space(), usage_after_write); + + // Write more data through one clone + clone1.inner().as_file().write_all(b" more data")?; + let mut mutable_clone1 = clone1; + mutable_clone1.update_disk_usage()?; + + let new_usage = mutable_clone1.current_disk_usage(); + assert!(new_usage > usage_after_write); + + // All clones should see the updated disk usage + assert_eq!(temp_file.current_disk_usage(), new_usage); + assert_eq!(clone2.current_disk_usage(), new_usage); + assert_eq!(mutable_clone1.current_disk_usage(), new_usage); + + // Global disk usage should reflect the new size (not multiplied) + assert_eq!(dm.used_disk_space(), new_usage); + + // Drop one clone + drop(mutable_clone1); + + // Disk usage should NOT change (other clones still exist) + assert_eq!(dm.used_disk_space(), new_usage); + assert_eq!(temp_file.current_disk_usage(), new_usage); + assert_eq!(clone2.current_disk_usage(), new_usage); + + // Drop another clone + drop(clone2); + + // Disk usage should still NOT change (original still exists) + assert_eq!(dm.used_disk_space(), new_usage); + assert_eq!(temp_file.current_disk_usage(), new_usage); + + // Drop the original + drop(temp_file); + + // Now disk usage should return to 0 (last reference dropped) + assert_eq!(dm.used_disk_space(), 0); + + Ok(()) + } + + #[test] + fn test_disk_usage_clones_dropped_out_of_order() -> Result<()> { + use std::io::Write; + + let dm = Arc::new(DiskManagerBuilder::default().build()?); + let mut temp_file = dm.create_tmp_file("Testing")?; + + // Write data + temp_file.inner().as_file().write_all(b"test")?; + temp_file.update_disk_usage()?; + + let usage = temp_file.current_disk_usage(); + assert_eq!(dm.used_disk_space(), usage); + + // Create multiple clones + let clone1 = temp_file.clone(); + let clone2 = temp_file.clone(); + let clone3 = temp_file.clone(); + + // Drop the original first (out of order) + drop(temp_file); + + // Disk usage should still be tracked (clones exist) + assert_eq!(dm.used_disk_space(), usage); + assert_eq!(clone1.current_disk_usage(), usage); + + // Drop clones in different order + drop(clone2); + assert_eq!(dm.used_disk_space(), usage); + + drop(clone1); + assert_eq!(dm.used_disk_space(), usage); + + // Drop the last clone + drop(clone3); + + // Now disk usage should be 0 + assert_eq!(dm.used_disk_space(), 0); + + Ok(()) + } + + #[test] + fn test_disk_usage_multiple_files() -> Result<()> { + use std::io::Write; + + let dm = Arc::new(DiskManagerBuilder::default().build()?); + + // Create multiple temp files + let mut file1 = dm.create_tmp_file("Testing1")?; + let mut file2 = dm.create_tmp_file("Testing2")?; + + // Write to first file + file1.inner().as_file().write_all(b"file1")?; + file1.update_disk_usage()?; + let usage1 = file1.current_disk_usage(); + + assert_eq!(dm.used_disk_space(), usage1); + 
+ // Write to second file + file2.inner().as_file().write_all(b"file2 data")?; + file2.update_disk_usage()?; + let usage2 = file2.current_disk_usage(); + + // Global usage should be sum of both files + assert_eq!(dm.used_disk_space(), usage1 + usage2); + + // Drop first file + drop(file1); + + // Usage should only reflect second file + assert_eq!(dm.used_disk_space(), usage2); + + // Drop second file + drop(file2); + + // Usage should be 0 + assert_eq!(dm.used_disk_space(), 0); + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 8174f71c31afa..890113de3d3dd 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -34,11 +34,9 @@ use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; use crate::hash_utils::create_hashes; use crate::metrics::{BaselineMetrics, SpillMetrics}; use crate::projection::{all_columns, make_with_child, update_expr, ProjectionExec}; -use crate::repartition::distributor_channels::{ - channels, partition_aware_channels, DistributionReceiver, DistributionSender, -}; use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::spill::spill_manager::SpillManager; +use crate::spill::spill_pool::{self, SpillPoolWriter}; use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; @@ -51,7 +49,6 @@ use datafusion_common::utils::transpose; use datafusion_common::{internal_err, ColumnStatistics, HashMap}; use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_common_runtime::SpawnedTask; -use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; @@ -67,27 +64,101 @@ use log::trace; use parking_lot::Mutex; mod distributor_channels; +use distributor_channels::{ + channels, partition_aware_channels, DistributionReceiver, DistributionSender, +}; -/// A batch in the repartition queue - either in memory or spilled to disk +/// A batch in the repartition queue - either in memory or spilled to disk. +/// +/// This enum represents the two states a batch can be in during repartitioning. +/// The decision to spill is made based on memory availability when sending a batch +/// to an output partition. +/// +/// # Batch Flow with Spilling +/// +/// ```text +/// Input Stream ──▶ Partition Logic ──▶ try_grow() +/// │ +/// ┌───────────────┴────────────────┐ +/// │ │ +/// ▼ ▼ +/// try_grow() succeeds try_grow() fails +/// (Memory Available) (Memory Pressure) +/// │ │ +/// ▼ ▼ +/// RepartitionBatch::Memory spill_writer.push_batch() +/// (batch held in memory) (batch written to disk) +/// │ │ +/// │ ▼ +/// │ RepartitionBatch::Spilled +/// │ (marker - no batch data) +/// │ │ +/// └────────┬───────────────────────┘ +/// │ +/// ▼ +/// Send to channel +/// │ +/// ▼ +/// Output Stream (poll) +/// │ +/// ┌──────────────┴─────────────┐ +/// │ │ +/// ▼ ▼ +/// RepartitionBatch::Memory RepartitionBatch::Spilled +/// Return batch immediately Poll spill_stream (blocks) +/// │ │ +/// └────────┬───────────────────┘ +/// │ +/// ▼ +/// Return batch +/// (FIFO order preserved) +/// ``` +/// +/// See [`RepartitionExec`] for overall architecture and [`StreamState`] for +/// the state machine that handles reading these batches. 
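+///
+/// A simplified sketch of the send-side decision made in `RepartitionExec::pull_from_input`
+/// (local names shortened; metrics, error handling, and channel hang-up handling omitted):
+///
+/// ```ignore
+/// let batch_to_send = match reservation.lock().try_grow(size) {
+///     // Memory available: keep the batch in memory, counted against the reservation
+///     Ok(_) => RepartitionBatch::Memory(batch),
+///     // Memory pressure: write the batch to this partition's SpillPool and
+///     // send only a marker through the channel
+///     Err(_) => {
+///         spill_writer.push_batch(&batch)?;
+///         RepartitionBatch::Spilled
+///     }
+/// };
+/// // ...then `batch_to_send` is sent through the output partition's channel
+/// ```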
#[derive(Debug)] enum RepartitionBatch { /// Batch held in memory (counts against memory reservation) Memory(RecordBatch), - /// Batch spilled to disk (one file per batch for queue semantics) - /// File automatically deleted when dropped via reference counting - /// The size field stores the original batch size for validation when reading back - Spilled { - spill_file: RefCountedTempFile, - size: usize, - }, + /// Marker indicating a batch was spilled to the partition's SpillPool. + /// The actual batch can be retrieved by reading from the SpillPoolStream. + /// This variant contains no data itself - it's just a signal to the reader + /// to fetch the next batch from the spill stream. + Spilled, } type MaybeBatch = Option>; type InputPartitionsToCurrentPartitionSender = Vec>; type InputPartitionsToCurrentPartitionReceiver = Vec>; -/// Channels and resources for a single output partition -#[derive(Debug)] +/// Output channel with its associated memory reservation and spill writer +struct OutputChannel { + sender: DistributionSender, + reservation: SharedMemoryReservation, + spill_writer: SpillPoolWriter, +} + +/// Channels and resources for a single output partition. +/// +/// Each output partition has channels to receive data from all input partitions. +/// To handle memory pressure, each (input, output) pair gets its own +/// [`SpillPool`](crate::spill::spill_pool) channel via [`spill_pool::channel`]. +/// +/// # Structure +/// +/// For an output partition receiving from N input partitions: +/// - `tx`: N senders (one per input) for sending batches to this output +/// - `rx`: N receivers (one per input) for receiving batches at this output +/// - `spill_writers`: N spill writers (one per input) for writing spilled data +/// - `spill_readers`: N spill readers (one per input) for reading spilled data +/// +/// This 1:1 mapping between input partitions and spill channels ensures that +/// batches from each input are processed in FIFO order, even when some batches +/// are spilled to disk and others remain in memory. +/// +/// See [`RepartitionExec`] for the overall N×M architecture. +/// +/// [`spill_pool::channel`]: crate::spill::spill_pool::channel struct PartitionChannels { /// Senders for each input partition to send data to this output partition tx: InputPartitionsToCurrentPartitionSender, @@ -95,20 +166,32 @@ struct PartitionChannels { rx: InputPartitionsToCurrentPartitionReceiver, /// Memory reservation for this output partition reservation: SharedMemoryReservation, - /// Spill manager for handling disk spills for this output partition - spill_manager: Arc, + /// Spill writers for writing spilled data. + /// SpillPoolWriter is Clone, so multiple writers can share state in non-preserve-order mode. + spill_writers: Vec, + /// Spill readers for reading spilled data - one per input partition (FIFO semantics). + /// Each (input, output) pair gets its own reader to maintain proper ordering. + spill_readers: Vec, } -#[derive(Debug)] struct ConsumingInputStreamsState { /// Channels for sending batches from input partitions to output partitions. /// Key is the partition number. channels: HashMap, - /// Helper that ensures that that background job is killed once it is no longer needed. + /// Helper that ensures that background jobs are killed once they are no longer needed. 
abort_helper: Arc>>, } +impl Debug for ConsumingInputStreamsState { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ConsumingInputStreamsState") + .field("num_channels", &self.channels.len()) + .field("abort_helper", &self.abort_helper) + .finish() + } +} + /// Inner state of [`RepartitionExec`]. enum RepartitionExecState { /// Not initialized yet. This is the default state stored in the RepartitionExec node @@ -171,6 +254,7 @@ impl RepartitionExecState { Ok(()) } + #[expect(clippy::too_many_arguments)] fn consume_input_streams( &mut self, input: Arc, @@ -179,6 +263,7 @@ impl RepartitionExecState { preserve_order: bool, name: String, context: Arc, + spill_manager: SpillManager, ) -> Result<&mut ConsumingInputStreamsState> { let streams_and_metrics = match self { RepartitionExecState::NotInitialized => { @@ -202,17 +287,19 @@ impl RepartitionExecState { let num_input_partitions = streams_and_metrics.len(); let num_output_partitions = partitioning.partition_count(); + let spill_manager = Arc::new(spill_manager); + let (txs, rxs) = if preserve_order { - let (txs, rxs) = + // Create partition-aware channels with one channel per (input, output) pair + // This provides backpressure while maintaining proper ordering + let (txs_all, rxs_all) = partition_aware_channels(num_input_partitions, num_output_partitions); // Take transpose of senders and receivers. `state.channels` keeps track of entries per output partition - let txs = transpose(txs); - let rxs = transpose(rxs); + let txs = transpose(txs_all); + let rxs = transpose(rxs_all); (txs, rxs) } else { - // create one channel per *output* partition - // note we use a custom channel that ensures there is always data for each receiver - // but limits the amount of buffering if required. 
+ // Create one channel per *output* partition with backpressure let (txs, rxs) = channels(num_output_partitions); // Clone sender for each input partitions let txs = txs @@ -230,19 +317,34 @@ impl RepartitionExecState { .with_can_spill(true) .register(context.memory_pool()), )); - let spill_metrics = SpillMetrics::new(&metrics, partition); - let spill_manager = Arc::new(SpillManager::new( - Arc::clone(&context.runtime_env()), - spill_metrics, - input.schema(), - )); + + // Create spill channels based on mode: + // - preserve_order: one spill channel per (input, output) pair for proper FIFO ordering + // - non-preserve-order: one shared spill channel per output partition since all inputs + // share the same receiver + let max_file_size = context + .session_config() + .options() + .execution + .max_spill_file_size_bytes; + let num_spill_channels = if preserve_order { + num_input_partitions + } else { + 1 + }; + let (spill_writers, spill_readers): (Vec<_>, Vec<_>) = (0 + ..num_spill_channels) + .map(|_| spill_pool::channel(max_file_size, Arc::clone(&spill_manager))) + .unzip(); + channels.insert( partition, PartitionChannels { tx, rx, reservation, - spill_manager, + spill_readers, + spill_writers, }, ); } @@ -255,34 +357,38 @@ impl RepartitionExecState { let txs: HashMap<_, _> = channels .iter() .map(|(partition, channels)| { + // In preserve_order mode: each input gets its own spill writer (index i) + // In non-preserve-order mode: all inputs share spill writer 0 via clone + let spill_writer_idx = if preserve_order { i } else { 0 }; ( *partition, - ( - channels.tx[i].clone(), - Arc::clone(&channels.reservation), - Arc::clone(&channels.spill_manager), - ), + OutputChannel { + sender: channels.tx[i].clone(), + reservation: Arc::clone(&channels.reservation), + spill_writer: channels.spill_writers[spill_writer_idx] + .clone(), + }, ) }) .collect(); + // Extract senders for wait_for_task before moving txs + let senders: HashMap<_, _> = txs + .iter() + .map(|(partition, channel)| (*partition, channel.sender.clone())) + .collect(); + let input_task = SpawnedTask::spawn(RepartitionExec::pull_from_input( stream, - txs.clone(), + txs, partitioning.clone(), metrics, )); // In a separate task, wait for each input to be done // (and pass along any errors, including panic!s) - let wait_for_task = SpawnedTask::spawn(RepartitionExec::wait_for_task( - input_task, - txs.into_iter() - .map(|(partition, (tx, _reservation, _spill_manager))| { - (partition, tx) - }) - .collect(), - )); + let wait_for_task = + SpawnedTask::spawn(RepartitionExec::wait_for_task(input_task, senders)); spawned_tasks.push(wait_for_task); } *self = Self::ConsumingInputStreams(ConsumingInputStreamsState { @@ -511,6 +617,38 @@ impl BatchPartitioner { /// arbitrary interleaving (and thus unordered) unless /// [`Self::with_preserve_order`] specifies otherwise. /// +/// # Spilling Architecture +/// +/// RepartitionExec uses [`SpillPool`](crate::spill::spill_pool) channels to handle +/// memory pressure during repartitioning. Each (input partition, output partition) +/// pair gets its own SpillPool channel for FIFO ordering. 
+/// +/// ```text +/// Input Partitions (N) Output Partitions (M) +/// ──────────────────── ───────────────────── +/// +/// Input 0 ──┐ ┌──▶ Output 0 +/// │ ┌──────────────┐ │ +/// ├─▶│ SpillPool │────┤ +/// │ │ [In0→Out0] │ │ +/// Input 1 ──┤ └──────────────┘ ├──▶ Output 1 +/// │ │ +/// │ ┌──────────────┐ │ +/// ├─▶│ SpillPool │────┤ +/// │ │ [In1→Out0] │ │ +/// Input 2 ──┤ └──────────────┘ ├──▶ Output 2 +/// │ │ +/// │ ... (N×M SpillPools total) +/// │ │ +/// │ ┌──────────────┐ │ +/// └─▶│ SpillPool │────┘ +/// │ [InN→OutM] │ +/// └──────────────┘ +/// +/// Each SpillPool maintains FIFO order for its (input, output) pair. +/// See `RepartitionBatch` for details on the memory/spill decision logic. +/// ``` +/// /// # Footnote /// /// The "Exchange Operator" was first described in the 1989 paper @@ -590,7 +728,7 @@ impl RepartitionExec { &self.cache.partitioning } - /// Get preserve_order flag of the RepartitionExecutor + /// Get preserve_order flag of the RepartitionExec /// `true` means `SortPreservingRepartitionExec`, `false` means `RepartitionExec` pub fn preserve_order(&self) -> bool { self.preserve_order @@ -696,6 +834,8 @@ impl ExecutionPlan for RepartitionExec { partition ); + let spill_metrics = SpillMetrics::new(&self.metrics, partition); + let input = Arc::clone(&self.input); let partitioning = self.partitioning().clone(); let metrics = self.metrics.clone(); @@ -704,6 +844,12 @@ impl ExecutionPlan for RepartitionExec { let schema = self.schema(); let schema_captured = Arc::clone(&schema); + let spill_manager = SpillManager::new( + Arc::clone(&context.runtime_env()), + spill_metrics, + input.schema(), + ); + // Get existing ordering to use for merging let sort_exprs = self.sort_exprs().cloned(); @@ -717,11 +863,11 @@ impl ExecutionPlan for RepartitionExec { )?; } - let stream = futures::stream::once(async move { - let num_input_partitions = input.output_partitioning().partition_count(); + let num_input_partitions = input.output_partitioning().partition_count(); + let stream = futures::stream::once(async move { // lock scope - let (mut rx, reservation, spill_manager, abort_helper) = { + let (rx, reservation, spill_readers, abort_helper) = { // lock mutexes let mut state = state.lock(); let state = state.consume_input_streams( @@ -731,6 +877,7 @@ impl ExecutionPlan for RepartitionExec { preserve_order, name.clone(), Arc::clone(&context), + spill_manager.clone(), )?; // now return stream for the specified *output* partition which will @@ -738,7 +885,7 @@ impl ExecutionPlan for RepartitionExec { let PartitionChannels { rx, reservation, - spill_manager, + spill_readers, .. 
} = state .channels @@ -748,7 +895,7 @@ impl ExecutionPlan for RepartitionExec { ( rx, reservation, - spill_manager, + spill_readers, Arc::clone(&state.abort_helper), ) }; @@ -759,17 +906,20 @@ impl ExecutionPlan for RepartitionExec { if preserve_order { // Store streams from all the input partitions: + // Each input partition gets its own spill reader to maintain proper FIFO ordering let input_streams = rx .into_iter() - .map(|receiver| { - Box::pin(PerPartitionStream { - schema: Arc::clone(&schema_captured), + .zip(spill_readers) + .map(|(receiver, spill_stream)| { + // In preserve_order mode, each receiver corresponds to exactly one input partition + Box::pin(PerPartitionStream::new( + Arc::clone(&schema_captured), receiver, - _drop_helper: Arc::clone(&abort_helper), - reservation: Arc::clone(&reservation), - spill_manager: Arc::clone(&spill_manager), - state: RepartitionStreamState::ReceivingFromChannel, - }) as SendableRecordBatchStream + Arc::clone(&abort_helper), + Arc::clone(&reservation), + spill_stream, + 1, // Each receiver handles one input partition + )) as SendableRecordBatchStream }) .collect::>(); // Note that receiver size (`rx.len()`) and `num_input_partitions` are same. @@ -788,18 +938,25 @@ impl ExecutionPlan for RepartitionExec { .with_batch_size(context.session_config().batch_size()) .with_fetch(fetch) .with_reservation(merge_reservation) + .with_spill_manager(spill_manager) .build() } else { - Ok(Box::pin(RepartitionStream { - num_input_partitions, - num_input_partitions_processed: 0, - schema: input.schema(), - input: rx.swap_remove(0), - _drop_helper: abort_helper, + // Non-preserve-order case: single input stream, so use the first spill reader + let spill_stream = spill_readers + .into_iter() + .next() + .expect("at least one spill reader should exist"); + + Ok(Box::pin(PerPartitionStream::new( + schema_captured, + rx.into_iter() + .next() + .expect("at least one receiver should exist"), + abort_helper, reservation, - spill_manager, - state: RepartitionStreamState::ReceivingFromChannel, - }) as SendableRecordBatchStream) + spill_stream, + num_input_partitions, + )) as SendableRecordBatchStream) } }) .try_flatten(); @@ -1034,17 +1191,10 @@ impl RepartitionExec { /// Pulls data from the specified input plan, feeding it to the /// output partitions based on the desired partitioning /// - /// txs hold the output sending channels for each output partition + /// `output_channels` holds the output sending channels for each output partition async fn pull_from_input( mut stream: SendableRecordBatchStream, - mut output_channels: HashMap< - usize, - ( - DistributionSender, - SharedMemoryReservation, - Arc, - ), - >, + mut output_channels: HashMap, partitioning: Partitioning, metrics: RepartitionMetrics, ) -> Result<()> { @@ -1076,37 +1226,27 @@ impl RepartitionExec { let timer = metrics.send_time[partition].timer(); // if there is still a receiver, send to it - if let Some((tx, reservation, spill_manager)) = - output_channels.get_mut(&partition) - { + if let Some(channel) = output_channels.get_mut(&partition) { let (batch_to_send, is_memory_batch) = - match reservation.lock().try_grow(size) { + match channel.reservation.lock().try_grow(size) { Ok(_) => { // Memory available - send in-memory batch (RepartitionBatch::Memory(batch), true) } Err(_) => { - // We're memory limited - spill this single batch to its own file - let spill_file = spill_manager - .spill_record_batch_and_finish( - &[batch], - &format!( - "RepartitionExec spill partition {partition}" - ), - )? 
- // Note that we handled empty batch above, so this is safe - .expect("non-empty batch should produce spill file"); - - // Store size for validation when reading back - (RepartitionBatch::Spilled { spill_file, size }, false) + // We're memory limited - spill to SpillPool + // SpillPool handles file handle reuse and rotation + channel.spill_writer.push_batch(&batch)?; + // Send marker indicating batch was spilled + (RepartitionBatch::Spilled, false) } }; - if tx.send(Some(Ok(batch_to_send))).await.is_err() { + if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() { // If the other end has hung up, it was an early shutdown (e.g. LIMIT) // Only shrink memory if it was a memory batch if is_memory_batch { - reservation.lock().shrink(size); + channel.reservation.lock().shrink(size); } output_channels.remove(&partition); } @@ -1138,6 +1278,8 @@ impl RepartitionExec { } } + // Spill writers will auto-finalize when dropped + // No need for explicit flush Ok(()) } @@ -1180,7 +1322,7 @@ impl RepartitionExec { // Input task completed successfully Ok(Ok(())) => { // notify each output partition that this input partition has no more data - for (_, tx) in txs { + for (_partition, tx) in txs { tx.send(None).await.ok(); } } @@ -1188,118 +1330,55 @@ impl RepartitionExec { } } -enum RepartitionStreamState { - /// Waiting for next item from channel - ReceivingFromChannel, - /// Reading a spilled batch from disk (stream reads via tokio::fs) - ReadingSpilledBatch(SendableRecordBatchStream), -} - -struct RepartitionStream { - /// Number of input partitions that will be sending batches to this output channel - num_input_partitions: usize, - - /// Number of input partitions that have finished sending batches to this output channel - num_input_partitions_processed: usize, - - /// Schema wrapped by Arc - schema: SchemaRef, - - /// channel containing the repartitioned batches - input: DistributionReceiver, - - /// Handle to ensure background tasks are killed when no longer needed. - _drop_helper: Arc>>, - - /// Memory reservation. 
- reservation: SharedMemoryReservation, - - /// Spill manager for reading spilled batches - spill_manager: Arc, - - /// Current state of the stream - state: RepartitionStreamState, -} - -impl Stream for RepartitionStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - loop { - match &mut self.state { - RepartitionStreamState::ReceivingFromChannel => { - let value = futures::ready!(self.input.recv().poll_unpin(cx)); - match value { - Some(Some(v)) => match v { - Ok(RepartitionBatch::Memory(batch)) => { - // Release memory and return - self.reservation - .lock() - .shrink(batch.get_array_memory_size()); - return Poll::Ready(Some(Ok(batch))); - } - Ok(RepartitionBatch::Spilled { spill_file, size }) => { - // Read from disk - SpillReaderStream uses tokio::fs internally - // Pass the original size for validation - let stream = self - .spill_manager - .read_spill_as_stream(spill_file, Some(size))?; - self.state = - RepartitionStreamState::ReadingSpilledBatch(stream); - // Continue loop to poll the stream immediately - } - Err(e) => { - return Poll::Ready(Some(Err(e))); - } - }, - Some(None) => { - self.num_input_partitions_processed += 1; - - if self.num_input_partitions - == self.num_input_partitions_processed - { - // all input partitions have finished sending batches - return Poll::Ready(None); - } else { - // other partitions still have data to send - continue; - } - } - None => { - return Poll::Ready(None); - } - } - } - RepartitionStreamState::ReadingSpilledBatch(stream) => { - match futures::ready!(stream.poll_next_unpin(cx)) { - Some(Ok(batch)) => { - // Return batch and stay in ReadingSpilledBatch state to read more batches - return Poll::Ready(Some(Ok(batch))); - } - Some(Err(e)) => { - self.state = RepartitionStreamState::ReceivingFromChannel; - return Poll::Ready(Some(Err(e))); - } - None => { - // Spill stream ended - go back to receiving from channel - self.state = RepartitionStreamState::ReceivingFromChannel; - continue; - } - } - } - } - } - } -} - -impl RecordBatchStream for RepartitionStream { - /// Get the schema - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } +/// State for tracking whether we're reading from memory channel or spill stream. +/// +/// This state machine ensures proper ordering when batches are mixed between memory +/// and spilled storage. When a [`RepartitionBatch::Spilled`] marker is received, +/// the stream must block on the spill stream until the corresponding batch arrives. +/// +/// # State Machine +/// +/// ```text +/// ┌─────────────────┐ +/// ┌───▶│ ReadingMemory │◀───┐ +/// │ └────────┬────────┘ │ +/// │ │ │ +/// │ Poll channel │ +/// │ │ │ +/// │ ┌──────────┼─────────────┐ +/// │ │ │ │ +/// │ ▼ ▼ │ +/// │ Memory Spilled │ +/// Got batch │ batch marker │ +/// from spill │ │ │ │ +/// │ │ ▼ │ +/// │ │ ┌──────────────────┐ │ +/// │ │ │ ReadingSpilled │ │ +/// │ │ └────────┬─────────┘ │ +/// │ │ │ │ +/// │ │ Poll spill_stream │ +/// │ │ │ │ +/// │ │ ▼ │ +/// │ │ Get batch │ +/// │ │ │ │ +/// └──┴───────────┴────────────┘ +/// │ +/// ▼ +/// Return batch +/// (Order preserved within +/// (input, output) pair) +/// ``` +/// +/// The transition to `ReadingSpilled` blocks further channel polling to maintain +/// FIFO ordering - we cannot read the next item from the channel until the spill +/// stream provides the current batch. 
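+///
+/// In pseudocode, `PerPartitionStream::poll_next` (further below) drives the states as:
+///
+/// ```ignore
+/// loop {
+///     match state {
+///         // Poll the channel; a `Spilled` marker only switches states
+///         ReadingMemory => match channel.recv() {
+///             Memory(batch) => return batch,
+///             Spilled => state = ReadingSpilled,
+///         },
+///         // Do not touch the channel again until the spilled batch arrives
+///         ReadingSpilled => {
+///             let batch = spill_stream.next().await;
+///             state = ReadingMemory;
+///             return batch;
+///         }
+///     }
+/// }
+/// ```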
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum StreamState { + /// Reading from the memory channel (normal operation) + ReadingMemory, + /// Waiting for a spilled batch from the spill stream. + /// Must not poll channel until spilled batch is received to preserve ordering. + ReadingSpilled, } /// This struct converts a receiver to a stream. @@ -1317,11 +1396,37 @@ struct PerPartitionStream { /// Memory reservation. reservation: SharedMemoryReservation, - /// Spill manager for reading spilled batches - spill_manager: Arc, + /// Infinite stream for reading from the spill pool + spill_stream: SendableRecordBatchStream, + + /// Internal state indicating if we are reading from memory or spill stream + state: StreamState, - /// Current state of the stream - state: RepartitionStreamState, + /// Number of input partitions that have not yet finished. + /// In non-preserve-order mode, multiple input partitions send to the same channel, + /// each sending None when complete. We must wait for all of them. + remaining_partitions: usize, +} + +impl PerPartitionStream { + fn new( + schema: SchemaRef, + receiver: DistributionReceiver, + drop_helper: Arc>>, + reservation: SharedMemoryReservation, + spill_stream: SendableRecordBatchStream, + num_input_partitions: usize, + ) -> Self { + Self { + schema, + receiver, + _drop_helper: drop_helper, + reservation, + spill_stream, + state: StreamState::ReadingMemory, + remaining_partitions: num_input_partitions, + } + } } impl Stream for PerPartitionStream { @@ -1331,55 +1436,74 @@ impl Stream for PerPartitionStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + use futures::StreamExt; + loop { - match &mut self.state { - RepartitionStreamState::ReceivingFromChannel => { - let value = futures::ready!(self.receiver.recv().poll_unpin(cx)); + match self.state { + StreamState::ReadingMemory => { + // Poll the memory channel for next message + let value = match self.receiver.recv().poll_unpin(cx) { + Poll::Ready(v) => v, + Poll::Pending => { + // Nothing from channel, wait + return Poll::Pending; + } + }; + match value { Some(Some(v)) => match v { Ok(RepartitionBatch::Memory(batch)) => { - // Release memory and return + // Release memory and return batch self.reservation .lock() .shrink(batch.get_array_memory_size()); return Poll::Ready(Some(Ok(batch))); } - Ok(RepartitionBatch::Spilled { spill_file, size }) => { - // Read from disk - SpillReaderStream uses tokio::fs internally - // Pass the original size for validation - let stream = self - .spill_manager - .read_spill_as_stream(spill_file, Some(size))?; - self.state = - RepartitionStreamState::ReadingSpilledBatch(stream); - // Continue loop to poll the stream immediately + Ok(RepartitionBatch::Spilled) => { + // Batch was spilled, transition to reading from spill stream + // We must block on spill stream until we get the batch + // to preserve ordering + self.state = StreamState::ReadingSpilled; + continue; } Err(e) => { return Poll::Ready(Some(Err(e))); } }, Some(None) => { - // Input partition has finished sending batches + // One input partition finished + self.remaining_partitions -= 1; + if self.remaining_partitions == 0 { + // All input partitions finished + return Poll::Ready(None); + } + // Continue to poll for more data from other partitions + continue; + } + None => { + // Channel closed unexpectedly return Poll::Ready(None); } - None => return Poll::Ready(None), } } - - RepartitionStreamState::ReadingSpilledBatch(stream) => { - match futures::ready!(stream.poll_next_unpin(cx)) { - 
Some(Ok(batch)) => { - // Return batch and stay in ReadingSpilledBatch state to read more batches + StreamState::ReadingSpilled => { + // Poll spill stream for the spilled batch + match self.spill_stream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(batch))) => { + self.state = StreamState::ReadingMemory; return Poll::Ready(Some(Ok(batch))); } - Some(Err(e)) => { - self.state = RepartitionStreamState::ReceivingFromChannel; + Poll::Ready(Some(Err(e))) => { return Poll::Ready(Some(Err(e))); } - None => { - // Spill stream ended - go back to receiving from channel - self.state = RepartitionStreamState::ReceivingFromChannel; - continue; + Poll::Ready(None) => { + // Spill stream ended, keep draining the memory channel + self.state = StreamState::ReadingMemory; + } + Poll::Pending => { + // Spilled batch not ready yet, must wait + // This preserves ordering by blocking until spill data arrives + return Poll::Pending; } } } @@ -2140,12 +2264,105 @@ mod tests { ) .unwrap() } + + /// Create batches with sequential values for ordering tests + fn create_ordered_batches(num_batches: usize) -> Vec { + let schema = test_schema(); + (0..num_batches) + .map(|i| { + let start = (i * 8) as u32; + RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(UInt32Array::from( + (start..start + 8).collect::>(), + ))], + ) + .unwrap() + }) + .collect() + } + + #[tokio::test] + async fn test_repartition_ordering_with_spilling() -> Result<()> { + // Test that repartition preserves ordering when spilling occurs + // This tests the state machine fix where we must block on spill_stream + // when a Spilled marker is received, rather than continuing to poll the channel + + let schema = test_schema(); + // Create batches with sequential values: batch 0 has [0,1,2,3,4,5,6,7], + // batch 1 has [8,9,10,11,12,13,14,15], etc. 
+ let partition = create_ordered_batches(20); + let input_partitions = vec![partition]; + + // Use RoundRobinBatch to ensure predictable ordering + let partitioning = Partitioning::RoundRobinBatch(2); + + // Set up context with very tight memory limit to force spilling + let runtime = RuntimeEnvBuilder::default() + .with_memory_limit(1, 1.0) + .build_arc()?; + + let task_ctx = TaskContext::default().with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); + + // create physical plan + let exec = + TestMemoryExec::try_new_exec(&input_partitions, Arc::clone(&schema), None)?; + let exec = RepartitionExec::try_new(exec, partitioning)?; + + // Collect all output partitions + let mut all_batches = Vec::new(); + for i in 0..exec.partitioning().partition_count() { + let mut partition_batches = Vec::new(); + let mut stream = exec.execute(i, Arc::clone(&task_ctx))?; + while let Some(result) = stream.next().await { + let batch = result?; + partition_batches.push(batch); + } + all_batches.push(partition_batches); + } + + // Verify spilling occurred + let metrics = exec.metrics().unwrap(); + assert!( + metrics.spill_count().unwrap() > 0, + "Expected spilling to occur, but spill_count = 0" + ); + + // Verify ordering is preserved within each partition + // With RoundRobinBatch, even batches go to partition 0, odd batches to partition 1 + for (partition_idx, batches) in all_batches.iter().enumerate() { + let mut last_value = None; + for batch in batches { + let array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + for i in 0..array.len() { + let value = array.value(i); + if let Some(last) = last_value { + assert!( + value > last, + "Ordering violated in partition {partition_idx}: {value} is not greater than {last}" + ); + } + last_value = Some(value); + } + } + } + + Ok(()) + } } #[cfg(test)] mod test { + use arrow::array::record_batch; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::assert_batches_eq; use super::*; use crate::test::TestMemoryExec; @@ -2229,6 +2446,204 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_preserve_order_with_spilling() -> Result<()> { + use datafusion_execution::runtime_env::RuntimeEnvBuilder; + use datafusion_execution::TaskContext; + + // Create sorted input data across multiple partitions + // Partition1: [1,3], [5,7], [9,11] + // Partition2: [2,4], [6,8], [10,12] + let batch1 = record_batch!(("c0", UInt32, [1, 3])).unwrap(); + let batch2 = record_batch!(("c0", UInt32, [2, 4])).unwrap(); + let batch3 = record_batch!(("c0", UInt32, [5, 7])).unwrap(); + let batch4 = record_batch!(("c0", UInt32, [6, 8])).unwrap(); + let batch5 = record_batch!(("c0", UInt32, [9, 11])).unwrap(); + let batch6 = record_batch!(("c0", UInt32, [10, 12])).unwrap(); + let schema = batch1.schema(); + let sort_exprs = LexOrdering::new([PhysicalSortExpr { + expr: col("c0", &schema).unwrap(), + options: SortOptions::default().asc(), + }]) + .unwrap(); + let partition1 = vec![batch1.clone(), batch3.clone(), batch5.clone()]; + let partition2 = vec![batch2.clone(), batch4.clone(), batch6.clone()]; + let input_partitions = vec![partition1, partition2]; + + // Set up context with tight memory limit to force spilling + // Sorting needs some non-spillable memory, so 64 bytes should force spilling while still allowing the query to complete + let runtime = RuntimeEnvBuilder::default() + .with_memory_limit(64, 1.0) + .build_arc()?; + + let task_ctx = TaskContext::default().with_runtime(runtime); + let task_ctx = 
Arc::new(task_ctx); + + // Create physical plan with order preservation + let exec = TestMemoryExec::try_new(&input_partitions, Arc::clone(&schema), None)? + .try_with_sort_information(vec![sort_exprs.clone(), sort_exprs])?; + let exec = Arc::new(TestMemoryExec::update_cache(Arc::new(exec))); + // Repartition into 3 partitions with order preservation + // We expect 1 batch per output partition after repartitioning + let exec = RepartitionExec::try_new(exec, Partitioning::RoundRobinBatch(3))? + .with_preserve_order(); + + let mut batches = vec![]; + + // Collect all partitions - should succeed by spilling to disk + for i in 0..exec.partitioning().partition_count() { + let mut stream = exec.execute(i, Arc::clone(&task_ctx))?; + while let Some(result) = stream.next().await { + let batch = result?; + batches.push(batch); + } + } + + #[rustfmt::skip] + let expected = [ + [ + "+----+", + "| c0 |", + "+----+", + "| 1 |", + "| 2 |", + "| 3 |", + "| 4 |", + "+----+", + ], + [ + "+----+", + "| c0 |", + "+----+", + "| 5 |", + "| 6 |", + "| 7 |", + "| 8 |", + "+----+", + ], + [ + "+----+", + "| c0 |", + "+----+", + "| 9 |", + "| 10 |", + "| 11 |", + "| 12 |", + "+----+", + ], + ]; + + for (batch, expected) in batches.iter().zip(expected.iter()) { + assert_batches_eq!(expected, std::slice::from_ref(batch)); + } + + // We should have spilled ~ all of the data. + // - We spill data during the repartitioning phase + // - We may also spill during the final merge sort + let all_batches = [batch1, batch2, batch3, batch4, batch5, batch6]; + let metrics = exec.metrics().unwrap(); + assert!( + metrics.spill_count().unwrap() > input_partitions.len(), + "Expected spill_count > {} for order-preserving repartition, but got {:?}", + input_partitions.len(), + metrics.spill_count() + ); + assert!( + metrics.spilled_bytes().unwrap() + > all_batches + .iter() + .map(|b| b.get_array_memory_size()) + .sum::(), + "Expected spilled_bytes > {} for order-preserving repartition, got {}", + all_batches + .iter() + .map(|b| b.get_array_memory_size()) + .sum::(), + metrics.spilled_bytes().unwrap() + ); + assert!( + metrics.spilled_rows().unwrap() + >= all_batches.iter().map(|b| b.num_rows()).sum::(), + "Expected spilled_rows > {} for order-preserving repartition, got {}", + all_batches.iter().map(|b| b.num_rows()).sum::(), + metrics.spilled_rows().unwrap() + ); + + Ok(()) + } + + #[tokio::test] + async fn test_hash_partitioning_with_spilling() -> Result<()> { + use datafusion_execution::runtime_env::RuntimeEnvBuilder; + use datafusion_execution::TaskContext; + + // Create input data similar to the round-robin test + let batch1 = record_batch!(("c0", UInt32, [1, 3])).unwrap(); + let batch2 = record_batch!(("c0", UInt32, [2, 4])).unwrap(); + let batch3 = record_batch!(("c0", UInt32, [5, 7])).unwrap(); + let batch4 = record_batch!(("c0", UInt32, [6, 8])).unwrap(); + let schema = batch1.schema(); + + let partition1 = vec![batch1.clone(), batch3.clone()]; + let partition2 = vec![batch2.clone(), batch4.clone()]; + let input_partitions = vec![partition1, partition2]; + + // Set up context with memory limit to test hash partitioning with spilling infrastructure + let runtime = RuntimeEnvBuilder::default() + .with_memory_limit(1, 1.0) + .build_arc()?; + + let task_ctx = TaskContext::default().with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); + + // Create physical plan with hash partitioning + let exec = TestMemoryExec::try_new(&input_partitions, Arc::clone(&schema), None)?; + let exec = 
Arc::new(TestMemoryExec::update_cache(Arc::new(exec))); + // Hash partition into 2 partitions by column c0 + let hash_expr = col("c0", &schema)?; + let exec = + RepartitionExec::try_new(exec, Partitioning::Hash(vec![hash_expr], 2))?; + + // Collect all partitions concurrently using JoinSet - this prevents deadlock + // where the distribution channel gate closes when all output channels are full + let mut join_set = tokio::task::JoinSet::new(); + for i in 0..exec.partitioning().partition_count() { + let stream = exec.execute(i, Arc::clone(&task_ctx))?; + join_set.spawn(async move { + let mut count = 0; + futures::pin_mut!(stream); + while let Some(result) = stream.next().await { + let batch = result?; + count += batch.num_rows(); + } + Ok::(count) + }); + } + + // Wait for all partitions and sum the rows + let mut total_rows = 0; + while let Some(result) = join_set.join_next().await { + total_rows += result.unwrap()?; + } + + // Verify we got all rows back + let all_batches = [batch1, batch2, batch3, batch4]; + let expected_rows: usize = all_batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, expected_rows); + + // Verify metrics are available + let metrics = exec.metrics().unwrap(); + // Just verify the metrics can be retrieved (spilling may or may not occur) + let spill_count = metrics.spill_count().unwrap_or(0); + assert!(spill_count > 0); + let spilled_bytes = metrics.spilled_bytes().unwrap_or(0); + assert!(spilled_bytes > 0); + let spilled_rows = metrics.spilled_rows().unwrap_or(0); + assert!(spilled_rows > 0); + + Ok(()) + } + #[tokio::test] async fn test_repartition() -> Result<()> { let schema = test_schema(); diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs index 14917e23b7921..e7f354a73b4cd 100644 --- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs +++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs @@ -88,6 +88,12 @@ impl InProgressSpillFile { Ok(()) } + /// Returns a reference to the in-progress file, if it exists. + /// This can be used to get the file path for creating readers before the file is finished. + pub fn file(&self) -> Option<&RefCountedTempFile> { + self.in_progress_file.as_ref() + } + /// Finalizes the file, returning the completed file reference. /// If there are no batches spilled before, it returns `None`. pub fn finish(&mut self) -> Result> { diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 5b9a91e781b16..58fd016a63dd7 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -19,6 +19,11 @@ pub(crate) mod in_progress_spill_file; pub(crate) mod spill_manager; +pub mod spill_pool; + +// Re-export SpillManager for doctests only (hidden from public docs) +#[doc(hidden)] +pub use spill_manager::SpillManager; use std::fs::File; use std::io::BufReader; diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index cc39102d89819..6fd97a8e2e6a0 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -72,6 +72,11 @@ impl SpillManager { self } + /// Returns the schema for batches managed by this SpillManager + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + /// Creates a temporary file for in-progress operations, returning an error /// message if file creation fails. 
The file can be used to append batches /// incrementally and then finish the file when done. diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs new file mode 100644 index 0000000000000..bbe54ca45caa3 --- /dev/null +++ b/datafusion/physical-plan/src/spill/spill_pool.rs @@ -0,0 +1,1425 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use futures::{Stream, StreamExt}; +use std::collections::VecDeque; +use std::sync::Arc; +use std::task::Waker; + +use parking_lot::Mutex; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_execution::disk_manager::RefCountedTempFile; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; + +use super::in_progress_spill_file::InProgressSpillFile; +use super::spill_manager::SpillManager; + +/// Shared state between the writer and readers of a spill pool. +/// This contains the queue of files and coordination state. +/// +/// # Locking Design +/// +/// This struct uses **fine-grained locking** with nested `Arc>`: +/// - `SpillPoolShared` is wrapped in `Arc>` (outer lock) +/// - Each `ActiveSpillFileShared` is wrapped in `Arc>` (inner lock) +/// +/// This enables: +/// 1. **Short critical sections**: The outer lock is held only for queue operations +/// 2. **I/O outside locks**: Disk I/O happens while holding only the file-specific lock +/// 3. **Concurrent operations**: Reader can access the queue while writer does I/O +/// +/// **Lock ordering discipline**: Never hold both locks simultaneously to prevent deadlock. +/// Always: acquire outer lock → release outer lock → acquire inner lock (if needed). +struct SpillPoolShared { + /// Queue of ALL files (including the current write file if it exists). + /// Readers always read from the front of this queue (FIFO). + /// Each file has its own lock to enable concurrent reader/writer access. + files: VecDeque>>, + /// SpillManager for creating files and tracking metrics + spill_manager: Arc, + /// Pool-level waker to notify when new files are available (single reader) + waker: Option, + /// Whether the writer has been dropped (no more files will be added) + writer_dropped: bool, + /// Writer's reference to the current file (shared by all cloned writers). + /// Has its own lock to allow I/O without blocking queue access. 
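+    ///
+    /// Lock ordering: this inner lock is only acquired after the outer
+    /// `SpillPoolShared` lock has been released (see `SpillPoolWriter::push_batch`
+    /// and the writer's `Drop` impl), never while the outer lock is held.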
+ current_write_file: Option>>, +} + +impl SpillPoolShared { + /// Creates a new shared pool state + fn new(spill_manager: Arc) -> Self { + Self { + files: VecDeque::new(), + spill_manager, + waker: None, + writer_dropped: false, + current_write_file: None, + } + } + + /// Registers a waker to be notified when new data is available (pool-level) + fn register_waker(&mut self, waker: Waker) { + self.waker = Some(waker); + } + + /// Wakes the pool-level reader + fn wake(&mut self) { + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } +} + +/// Writer for a spill pool. Provides coordinated write access with FIFO semantics. +/// +/// Created by [`channel`]. See that function for architecture diagrams and usage examples. +/// +/// The writer is `Clone`, allowing multiple writers to coordinate on the same pool. +/// All clones share the same current write file and coordinate file rotation. +/// The writer automatically manages file rotation based on the `max_file_size_bytes` +/// configured in [`channel`]. When the last writer clone is dropped, it finalizes the +/// current file so readers can access all written data. +#[derive(Clone)] +pub struct SpillPoolWriter { + /// Maximum size in bytes before rotating to a new file. + /// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`. + max_file_size_bytes: usize, + /// Shared state with readers (includes current_write_file for coordination) + shared: Arc>, +} + +impl SpillPoolWriter { + /// Spills a batch to the pool, rotating files when necessary. + /// + /// If the current file would exceed `max_file_size_bytes` after adding + /// this batch, the file is finalized and a new one is started. + /// + /// See [`channel`] for overall architecture and examples. + /// + /// # File Rotation Logic + /// + /// ```text + /// push_batch() + /// │ + /// ▼ + /// Current file exists? + /// │ + /// ├─ No ──▶ Create new file ──▶ Add to shared queue + /// │ Wake readers + /// ▼ + /// Write batch to current file + /// │ + /// ▼ + /// estimated_size > max_file_size_bytes? + /// │ + /// ├─ No ──▶ Keep current file for next batch + /// │ + /// ▼ + /// Yes: finish() current file + /// Mark writer_finished = true + /// Wake readers + /// │ + /// ▼ + /// Next push_batch() creates new file + /// ``` + /// + /// # Errors + /// + /// Returns an error if disk I/O fails or disk quota is exceeded. 
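+    ///
+    /// # Example
+    ///
+    /// A sketch of the write side, assuming a `spill_manager` built as in the
+    /// [`channel`] example. With such a small limit, each sufficiently large batch
+    /// ends up in its own file:
+    ///
+    /// ```ignore
+    /// // Rotate to a new spill file once the current one exceeds ~1 KB
+    /// let (writer, reader) = spill_pool::channel(1024, spill_manager);
+    /// for batch in batches {
+    ///     // Empty batches are skipped; others are appended to the current file,
+    ///     // which is finalized when its estimated size exceeds the limit
+    ///     writer.push_batch(&batch)?;
+    /// }
+    /// // Dropping the last writer finalizes the current file so the reader
+    /// // can drain everything that was written
+    /// drop(writer);
+    /// ```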
+ pub fn push_batch(&self, batch: &RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + // Skip empty batches + return Ok(()); + } + + let batch_size = batch.get_array_memory_size(); + + // Fine-grained locking: Lock shared state briefly for queue access + let mut shared = self.shared.lock(); + + // Create new file if we don't have one yet + if shared.current_write_file.is_none() { + let spill_manager = Arc::clone(&shared.spill_manager); + // Release shared lock before disk I/O (fine-grained locking) + drop(shared); + + let writer = spill_manager.create_in_progress_file("SpillPool")?; + // Clone the file so readers can access it immediately + let file = writer.file().expect("InProgressSpillFile should always have a file when it is first created").clone(); + + let file_shared = Arc::new(Mutex::new(ActiveSpillFileShared { + writer: Some(writer), + file: Some(file), // Set immediately so readers can access it + batches_written: 0, + estimated_size: 0, + writer_finished: false, + waker: None, + })); + + // Re-acquire lock and push to shared queue + shared = self.shared.lock(); + shared.files.push_back(Arc::clone(&file_shared)); + shared.current_write_file = Some(file_shared); + shared.wake(); // Wake readers waiting for new files + } + + let current_write_file = shared.current_write_file.take(); + // Release shared lock before file I/O (fine-grained locking) + // This allows readers to access the queue while we do disk I/O + drop(shared); + + // Write batch to current file - lock only the specific file + if let Some(current_file) = current_write_file { + // Now lock just this file for I/O (separate from shared lock) + let mut file_shared = current_file.lock(); + + // Append the batch + if let Some(ref mut writer) = file_shared.writer { + writer.append_batch(batch)?; + file_shared.batches_written += 1; + file_shared.estimated_size += batch_size; + } + + // Wake reader waiting on this specific file + file_shared.wake(); + + // Check if we need to rotate + let needs_rotation = file_shared.estimated_size > self.max_file_size_bytes; + + if needs_rotation { + // Finish the IPC writer + if let Some(mut writer) = file_shared.writer.take() { + writer.finish()?; + } + // Mark as finished so readers know not to wait for more data + file_shared.writer_finished = true; + // Wake reader waiting on this file (it's now finished) + file_shared.wake(); + // Don't put back current_write_file - let it rotate + } else { + // Release file lock + drop(file_shared); + // Put back the current file for further writing + let mut shared = self.shared.lock(); + shared.current_write_file = Some(current_file); + } + } + + Ok(()) + } +} + +impl Drop for SpillPoolWriter { + fn drop(&mut self) { + let mut shared = self.shared.lock(); + + // Finalize the current file when the last writer is dropped + if let Some(current_file) = shared.current_write_file.take() { + // Release shared lock before locking file + drop(shared); + + let mut file_shared = current_file.lock(); + + // Finish the current writer if it exists + if let Some(mut writer) = file_shared.writer.take() { + // Ignore errors on drop - we're in destructor + let _ = writer.finish(); + } + + // Mark as finished so readers know not to wait for more data + file_shared.writer_finished = true; + + // Wake reader waiting on this file (it's now finished) + file_shared.wake(); + + drop(file_shared); + shared = self.shared.lock(); + } + + // Mark writer as dropped and wake pool-level readers + shared.writer_dropped = true; + shared.wake(); + } +} + +/// Creates a paired 
writer and reader for a spill pool with MPSC (multi-producer, single-consumer) +/// semantics. +/// +/// This is the recommended way to create a spill pool. The writer is `Clone`, allowing +/// multiple producers to coordinate writes to the same pool. The reader can consume batches +/// in FIFO order. The reader can start reading immediately after a writer appends a batch +/// to the spill file, without waiting for the file to be sealed, while writers continue to +/// write more data. +/// +/// Internally this coordinates rotating spill files based on size limits, and +/// handles asynchronous notification between the writer and reader using wakers. +/// This ensures that we manage disk usage efficiently while allowing concurrent +/// I/O between the writer and reader. +/// +/// # Data Flow Overview +/// +/// 1. Writer write batch `B0` to F1 +/// 2. Writer write batch `B1` to F1, notices the size limit exceeded, finishes F1. +/// 3. Reader read `B0` from F1 +/// 4. Reader read `B1`, no more batch to read -> wait on the waker +/// 5. Writer write batch `B2` to a new file `F2`, wake up the waiting reader. +/// 6. Reader read `B2` from F2. +/// 7. Repeat until writer is dropped. +/// +/// # Architecture +/// +/// ```text +/// ┌─────────────────────────────────────────────────────────────────────────┐ +/// │ SpillPool │ +/// │ │ +/// │ Writer Side Shared State Reader Side │ +/// │ ─────────── ──────────── ─────────── │ +/// │ │ +/// │ SpillPoolWriter ┌────────────────────┐ SpillPoolReader │ +/// │ │ │ VecDeque │ │ │ +/// │ │ │ ┌────┐┌────┐ │ │ │ +/// │ push_batch() │ │ F1 ││ F2 │ ... │ next().await │ +/// │ │ │ └────┘└────┘ │ │ │ +/// │ ▼ │ (FIFO order) │ ▼ │ +/// │ ┌─────────┐ │ │ ┌──────────┐ │ +/// │ │Current │───────▶│ Coordination: │◀───│ Current │ │ +/// │ │Write │ │ - Wakers │ │ Read │ │ +/// │ │File │ │ - Batch counts │ │ File │ │ +/// │ └─────────┘ │ - Writer status │ └──────────┘ │ +/// │ │ └────────────────────┘ │ │ +/// │ │ │ │ +/// │ Size > limit? Read all batches? │ +/// │ │ │ │ +/// │ ▼ ▼ │ +/// │ Rotate to new file Pop from queue │ +/// └─────────────────────────────────────────────────────────────────────────┘ +/// +/// Writer produces → Shared FIFO queue → Reader consumes +/// ``` +/// +/// # File State Machine +/// +/// Each file in the pool coordinates between writer and reader: +/// +/// ```text +/// Writer View Reader View +/// ─────────── ─────────── +/// +/// Created writer: Some(..) batches_read: 0 +/// batches_written: 0 (waiting for data) +/// │ +/// ▼ +/// Writing append_batch() Can read if: +/// batches_written++ batches_read < batches_written +/// wake readers +/// │ │ +/// │ ▼ +/// ┌──────┴──────┐ poll_next() → batch +/// │ │ batches_read++ +/// ▼ ▼ +/// Size > limit? More data? +/// │ │ +/// │ └─▶ Yes ──▶ Continue writing +/// ▼ +/// finish() Reader catches up: +/// writer_finished = true batches_read == batches_written +/// wake readers │ +/// │ ▼ +/// └─────────────────────▶ Returns Poll::Ready(None) +/// File complete, pop from queue +/// ``` +/// +/// # Arguments +/// +/// * `max_file_size_bytes` - Maximum size per file before rotation. When a file +/// exceeds this size, the writer automatically rotates to a new file. +/// * `spill_manager` - Manager for file creation and metrics tracking +/// +/// # Returns +/// +/// A tuple of `(SpillPoolWriter, SendableRecordBatchStream)` that share the same +/// underlying pool. The reader is returned as a stream for immediate use with +/// async stream combinators. 
+///
+/// # Example
+///
+/// ```
+/// use std::sync::Arc;
+/// use arrow::array::{ArrayRef, Int32Array};
+/// use arrow::datatypes::{DataType, Field, Schema};
+/// use arrow::record_batch::RecordBatch;
+/// use datafusion_execution::runtime_env::RuntimeEnv;
+/// use futures::StreamExt;
+///
+/// # use datafusion_physical_plan::spill::spill_pool;
+/// # use datafusion_physical_plan::spill::SpillManager; // Re-exported for doctests
+/// # use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
+/// #
+/// # #[tokio::main]
+/// # async fn main() -> datafusion_common::Result<()> {
+/// # // Setup for the example (typically comes from TaskContext in production)
+/// # let env = Arc::new(RuntimeEnv::default());
+/// # let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+/// # let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+/// # let spill_manager = Arc::new(SpillManager::new(env, metrics, schema.clone()));
+/// #
+/// // Create channel with 1MB file size limit
+/// let (writer, mut reader) = spill_pool::channel(1024 * 1024, spill_manager);
+///
+/// // Spawn writer task to produce batches
+/// let write_handle = tokio::spawn(async move {
+///     for i in 0..5 {
+///         let array: ArrayRef = Arc::new(Int32Array::from(vec![i; 100]));
+///         let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
+///         writer.push_batch(&batch).unwrap();
+///     }
+///     // Writer dropped here, finalizing current file
+/// });
+///
+/// // Reader consumes batches in FIFO order (can run concurrently with writer)
+/// let mut batches_read = 0;
+/// while let Some(result) = reader.next().await {
+///     let batch = result?;
+///     batches_read += 1;
+///     // Process batch...
+///     if batches_read == 5 {
+///         break; // Got all expected batches
+///     }
+/// }
+///
+/// write_handle.await.unwrap();
+/// assert_eq!(batches_read, 5);
+/// # Ok(())
+/// # }
+/// ```
+///
+/// # Why rotate files?
+///
+/// File rotation ensures we do not accumulate unreferenced disk usage.
+/// If we used a single file for all spilled data, the beginning of that file would
+/// fill with data that has already been read by the reader but cannot be reclaimed,
+/// because a file cannot be truncated from the start.
+///
+/// Consider a query like `SELECT * FROM large_table WHERE false`.
+/// This query produces no output rows, but a spilling operator sitting between the
+/// scan and the filter would still see the entire `large_table` flow through it and
+/// could spill all of that data to disk, using up to `size(large_table)` bytes of
+/// disk space.
+/// With file rotation, as long as the reader keeps up with the writer, each file can
+/// be deleted once it has been fully read, bounding the maximum disk usage to
+/// roughly `max_file_size_bytes`.
+pub fn channel(
+    max_file_size_bytes: usize,
+    spill_manager: Arc<SpillManager>,
+) -> (SpillPoolWriter, SendableRecordBatchStream) {
+    let schema = Arc::clone(spill_manager.schema());
+    let shared = Arc::new(Mutex::new(SpillPoolShared::new(spill_manager)));
+
+    let writer = SpillPoolWriter {
+        max_file_size_bytes,
+        shared: Arc::clone(&shared),
+    };
+
+    let reader = SpillPoolReader::new(shared, schema);
+
+    (writer, Box::pin(reader))
+}
+
+/// Shared state between writer and readers for an active spill file.
+/// Protected by a Mutex to coordinate between concurrent readers and the writer.
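+///
+/// Locking note: in the code below the pool-level `SpillPoolShared` mutex and this
+/// per-file mutex are never held at the same time; the pool lock is dropped before
+/// a per-file lock is taken, so per-file I/O does not block access to the file queue.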
+struct ActiveSpillFileShared {
+    /// Writer handle - taken (set to None) when finish() is called
+    writer: Option<InProgressSpillFile>,
+    /// The spill file, set as soon as the writer creates it.
+    /// Taken by the reader when creating a stream (the file stays open via file handles).
+    file: Option<RefCountedTempFile>,
+    /// Total number of batches written to this file
+    batches_written: usize,
+    /// Estimated size in bytes of data written to this file
+    estimated_size: usize,
+    /// Whether the writer has finished writing to this file
+    writer_finished: bool,
+    /// Waker for reader waiting on this specific file (SPSC: only one reader)
+    waker: Option<Waker>,
+}
+
+impl ActiveSpillFileShared {
+    /// Registers a waker to be notified when new data is written to this file
+    fn register_waker(&mut self, waker: Waker) {
+        self.waker = Some(waker);
+    }
+
+    /// Wakes the reader waiting on this file
+    fn wake(&mut self) {
+        if let Some(waker) = self.waker.take() {
+            waker.wake();
+        }
+    }
+}
+
+/// Reader state for a SpillFile (owned by individual SpillFile instances).
+/// This is kept separate from the shared state to avoid holding locks during I/O.
+struct SpillFileReader {
+    /// The actual stream reading from disk
+    stream: SendableRecordBatchStream,
+    /// Number of batches this reader has consumed
+    batches_read: usize,
+}
+
+struct SpillFile {
+    /// Shared coordination state (contains writer and batch counts)
+    shared: Arc<Mutex<ActiveSpillFileShared>>,
+    /// Reader state (lazy-initialized, owned by this SpillFile)
+    reader: Option<SpillFileReader>,
+    /// Spill manager for creating readers
+    spill_manager: Arc<SpillManager>,
+}
+
+impl Stream for SpillFile {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        use std::task::Poll;
+
+        // Step 1: Lock shared state and check coordination
+        let (should_read, file) = {
+            let mut shared = self.shared.lock();
+
+            // Determine if we can read
+            let batches_read = self.reader.as_ref().map_or(0, |r| r.batches_read);
+
+            if batches_read < shared.batches_written {
+                // More data available to read - take the file if we don't have a reader yet
+                let file = if self.reader.is_none() {
+                    shared.file.take()
+                } else {
+                    None
+                };
+                (true, file)
+            } else if shared.writer_finished {
+                // No more data and writer is done - EOF
+                return Poll::Ready(None);
+            } else {
+                // Caught up to writer, but writer still active - register waker and wait
+                shared.register_waker(cx.waker().clone());
+                return Poll::Pending;
+            }
+        }; // Lock released here
+
+        // Step 2: Lazy-create reader stream if needed
+        if self.reader.is_none() && should_read {
+            if let Some(file) = file {
+                match self.spill_manager.read_spill_as_stream(file, None) {
+                    Ok(stream) => {
+                        self.reader = Some(SpillFileReader {
+                            stream,
+                            batches_read: 0,
+                        });
+                    }
+                    Err(e) => return Poll::Ready(Some(Err(e))),
+                }
+            } else {
+                // File not available yet (writer hasn't finished or already taken)
+                // Register waker and wait for file to be ready
+                let mut shared = self.shared.lock();
+                shared.register_waker(cx.waker().clone());
+                return Poll::Pending;
+            }
+        }
+
+        // Step 3: Poll the reader stream (no lock held)
+        if let Some(reader) = &mut self.reader {
+            match reader.stream.poll_next_unpin(cx) {
+                Poll::Ready(Some(Ok(batch))) => {
+                    // Successfully read a batch - increment counter
+                    reader.batches_read += 1;
+                    Poll::Ready(Some(Ok(batch)))
+                }
+                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
+                Poll::Ready(None) => {
+                    // Stream exhausted unexpectedly
+                    // This shouldn't happen if coordination is correct, but handle gracefully
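+                    // Note: returning `None` here only ends this per-file stream;
+                    // the outer `SpillPoolReader` re-checks `writer_finished` on this
+                    // file before deciding whether to pop it and advance to the next
+                    // file or end the overall stream.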
+                    Poll::Ready(None)
+                }
+                Poll::Pending => Poll::Pending,
+            }
+        } else {
+            // Should not reach here, but handle gracefully
+            Poll::Ready(None)
+        }
+    }
+}
+
+/// A stream that reads from a SpillPool in FIFO order.
+///
+/// Created by [`channel`]. See that function for architecture diagrams and usage examples.
+///
+/// The stream automatically handles file rotation and reads from completed files.
+/// When no data is available, it returns `Poll::Pending` and registers a waker to
+/// be notified when the writer produces more data.
+///
+/// # Infinite Stream Semantics
+///
+/// This stream never returns `None` just because it has caught up with the writer -
+/// it keeps waiting for the writer to produce more data. The stream ends only when:
+/// - The reader is dropped
+/// - The writer is dropped AND all queued data has been consumed
+///
+/// This makes it suitable for continuous streaming scenarios where the writer may
+/// produce data intermittently.
+pub struct SpillPoolReader {
+    /// Shared reference to the spill pool
+    shared: Arc<Mutex<SpillPoolShared>>,
+    /// Current SpillFile we're reading from
+    current_file: Option<SpillFile>,
+    /// Schema of the spilled data
+    schema: SchemaRef,
+}
+
+impl SpillPoolReader {
+    /// Creates a new reader from shared pool state.
+    ///
+    /// This is private - use the `channel()` function to create a reader/writer pair.
+    ///
+    /// # Arguments
+    ///
+    /// * `shared` - Shared reference to the pool state
+    /// * `schema` - Schema of the spilled batches
+    fn new(shared: Arc<Mutex<SpillPoolShared>>, schema: SchemaRef) -> Self {
+        Self {
+            shared,
+            current_file: None,
+            schema,
+        }
+    }
+}
+
+impl Stream for SpillPoolReader {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        use std::task::Poll;
+
+        loop {
+            // If we have a current file, try to read from it
+            if let Some(ref mut file) = self.current_file {
+                match file.poll_next_unpin(cx) {
+                    Poll::Ready(Some(Ok(batch))) => {
+                        // Got a batch, return it
+                        return Poll::Ready(Some(Ok(batch)));
+                    }
+                    Poll::Ready(Some(Err(e))) => {
+                        // Error reading batch
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                    Poll::Ready(None) => {
+                        // Current file stream exhausted
+                        // Check if this file is marked as writer_finished
+                        let writer_finished = { file.shared.lock().writer_finished };
+
+                        if writer_finished {
+                            // File is complete, pop it from the queue and move to next
+                            let mut shared = self.shared.lock();
+                            shared.files.pop_front();
+                            drop(shared); // Release lock
+
+                            // Clear current file and continue loop to get next file
+                            self.current_file = None;
+                            continue;
+                        } else {
+                            // Stream exhausted but writer not finished - unexpected
+                            // This shouldn't happen with proper coordination
+                            return Poll::Ready(None);
+                        }
+                    }
+                    Poll::Pending => {
+                        // File not ready yet (waiting for writer)
+                        // Register waker so we get notified when writer adds more batches
+                        let mut shared = self.shared.lock();
+                        shared.register_waker(cx.waker().clone());
+                        return Poll::Pending;
+                    }
+                }
+            }
+
+            // No current file, need to get the next one
+            let mut shared = self.shared.lock();
+
+            // Peek at the front of the queue (don't pop yet)
+            if let Some(file_shared) = shared.files.front() {
+                // Create a SpillFile from the shared state
+                let spill_manager = Arc::clone(&shared.spill_manager);
+                let file_shared = Arc::clone(file_shared);
+                drop(shared); // Release lock before creating SpillFile
+
+                self.current_file = Some(SpillFile {
+                    shared: file_shared,
+                    reader: None,
+                    spill_manager,
+                });
+
+                // Continue loop to poll the new file
+                continue;
+            }
+
+            // No files in queue -
check if writer is done + if shared.writer_dropped { + // Writer is done and no more files will be added - EOF + return Poll::Ready(None); + } + + // Writer still active, register waker that will get notified when new files are added + shared.register_waker(cx.waker().clone()); + return Poll::Pending; + } + } +} + +impl RecordBatchStream for SpillPoolReader { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metrics::{ExecutionPlanMetricsSet, SpillMetrics}; + use arrow::array::{ArrayRef, Int32Array}; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common_runtime::SpawnedTask; + use datafusion_execution::runtime_env::RuntimeEnv; + use futures::StreamExt; + + fn create_test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])) + } + + fn create_test_batch(start: i32, count: usize) -> RecordBatch { + let schema = create_test_schema(); + let a: ArrayRef = Arc::new(Int32Array::from( + (start..start + count as i32).collect::>(), + )); + RecordBatch::try_new(schema, vec![a]).unwrap() + } + + fn create_spill_channel( + max_file_size: usize, + ) -> (SpillPoolWriter, SendableRecordBatchStream) { + let env = Arc::new(RuntimeEnv::default()); + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let schema = create_test_schema(); + let spill_manager = Arc::new(SpillManager::new(env, metrics, schema)); + + channel(max_file_size, spill_manager) + } + + fn create_spill_channel_with_metrics( + max_file_size: usize, + ) -> (SpillPoolWriter, SendableRecordBatchStream, SpillMetrics) { + let env = Arc::new(RuntimeEnv::default()); + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let schema = create_test_schema(); + let spill_manager = Arc::new(SpillManager::new(env, metrics.clone(), schema)); + + let (writer, reader) = channel(max_file_size, spill_manager); + (writer, reader, metrics) + } + + #[tokio::test] + async fn test_basic_write_and_read() -> Result<()> { + let (writer, mut reader) = create_spill_channel(1024 * 1024); + + // Write one batch + let batch1 = create_test_batch(0, 10); + writer.push_batch(&batch1)?; + + // Read the batch + let result = reader.next().await.unwrap()?; + assert_eq!(result.num_rows(), 10); + + // Write another batch + let batch2 = create_test_batch(10, 5); + writer.push_batch(&batch2)?; + // Read the second batch + let result = reader.next().await.unwrap()?; + assert_eq!(result.num_rows(), 5); + + Ok(()) + } + + #[tokio::test] + async fn test_single_batch_write_read() -> Result<()> { + let (writer, mut reader) = create_spill_channel(1024 * 1024); + + // Write one batch + let batch = create_test_batch(0, 5); + writer.push_batch(&batch)?; + + // Read it back + let result = reader.next().await.unwrap()?; + assert_eq!(result.num_rows(), 5); + + // Verify the actual data + let col = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), 0); + assert_eq!(col.value(4), 4); + + Ok(()) + } + + #[tokio::test] + async fn test_multiple_batches_sequential() -> Result<()> { + let (writer, mut reader) = create_spill_channel(1024 * 1024); + + // Write multiple batches + for i in 0..5 { + let batch = create_test_batch(i * 10, 10); + writer.push_batch(&batch)?; + } + + // Read all batches and verify FIFO order + for i in 0..5 { + let result = reader.next().await.unwrap()?; + assert_eq!(result.num_rows(), 10); + + let col = result + .column(0) + .as_any() + .downcast_ref::() + 
.unwrap(); + assert_eq!(col.value(0), i * 10, "Batch {i} not in FIFO order"); + } + + Ok(()) + } + + #[tokio::test] + async fn test_empty_writer() -> Result<()> { + let (_writer, reader) = create_spill_channel(1024 * 1024); + + // Reader should pend since no batches were written + let mut reader = reader; + let result = + tokio::time::timeout(std::time::Duration::from_millis(100), reader.next()) + .await; + + assert!(result.is_err(), "Reader should timeout on empty writer"); + + Ok(()) + } + + #[tokio::test] + async fn test_empty_batch_skipping() -> Result<()> { + let (writer, mut reader) = create_spill_channel(1024 * 1024); + + // Write empty batch + let empty_batch = create_test_batch(0, 0); + writer.push_batch(&empty_batch)?; + + // Write non-empty batch + let batch = create_test_batch(0, 5); + writer.push_batch(&batch)?; + + // Should only read the non-empty batch + let result = reader.next().await.unwrap()?; + assert_eq!(result.num_rows(), 5); + + Ok(()) + } + + #[tokio::test] + async fn test_rotation_triggered_by_size() -> Result<()> { + // Set a small max_file_size to trigger rotation after one batch + let batch1 = create_test_batch(0, 10); + let batch_size = batch1.get_array_memory_size() + 1; + + let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size); + + // Write first batch (should fit in first file) + writer.push_batch(&batch1)?; + + // Check metrics after first batch - file created but not finalized yet + assert_eq!( + metrics.spill_file_count.value(), + 1, + "Should have created 1 file after first batch" + ); + assert_eq!( + metrics.spilled_bytes.value(), + 0, + "Spilled bytes should be 0 before file finalization" + ); + assert_eq!( + metrics.spilled_rows.value(), + 10, + "Should have spilled 10 rows from first batch" + ); + + // Write second batch (should trigger rotation - finalize first file) + let batch2 = create_test_batch(10, 10); + assert!( + batch2.get_array_memory_size() <= batch_size, + "batch2 size {} exceeds limit {batch_size}", + batch2.get_array_memory_size(), + ); + assert!( + batch1.get_array_memory_size() + batch2.get_array_memory_size() > batch_size, + "Combined size {} does not exceed limit to trigger rotation", + batch1.get_array_memory_size() + batch2.get_array_memory_size() + ); + writer.push_batch(&batch2)?; + + // Check metrics after rotation - first file finalized, but second file not created yet + // (new file created lazily on next push_batch call) + assert_eq!( + metrics.spill_file_count.value(), + 1, + "Should still have 1 file (second file not created until next write)" + ); + assert!( + metrics.spilled_bytes.value() > 0, + "Spilled bytes should be > 0 after first file finalized (got {})", + metrics.spilled_bytes.value() + ); + assert_eq!( + metrics.spilled_rows.value(), + 20, + "Should have spilled 20 total rows (10 + 10)" + ); + + // Write a third batch to confirm rotation occurred (creates second file) + let batch3 = create_test_batch(20, 5); + writer.push_batch(&batch3)?; + + // Now check that second file was created + assert_eq!( + metrics.spill_file_count.value(), + 2, + "Should have created 2 files after writing to new file" + ); + assert_eq!( + metrics.spilled_rows.value(), + 25, + "Should have spilled 25 total rows (10 + 10 + 5)" + ); + + // Read all three batches + let result1 = reader.next().await.unwrap()?; + assert_eq!(result1.num_rows(), 10); + + let result2 = reader.next().await.unwrap()?; + assert_eq!(result2.num_rows(), 10); + + let result3 = reader.next().await.unwrap()?; + 
assert_eq!(result3.num_rows(), 5); + + Ok(()) + } + + #[tokio::test] + async fn test_multiple_rotations() -> Result<()> { + let batches = (0..10) + .map(|i| create_test_batch(i * 10, 10)) + .collect::>(); + + let batch_size = batches[0].get_array_memory_size() * 2 + 1; + + // Very small max_file_size to force frequent rotations + let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size); + + // Write many batches to cause multiple rotations + for i in 0..10 { + let batch = create_test_batch(i * 10, 10); + writer.push_batch(&batch)?; + } + + // Check metrics after all writes - should have multiple files due to rotations + // With batch_size = 2 * one_batch + 1, each file fits ~2 batches before rotating + // 10 batches should create multiple files (exact count depends on rotation timing) + let file_count = metrics.spill_file_count.value(); + assert!( + file_count >= 4, + "Should have created at least 4 files with multiple rotations (got {file_count})" + ); + assert!( + metrics.spilled_bytes.value() > 0, + "Spilled bytes should be > 0 after rotations (got {})", + metrics.spilled_bytes.value() + ); + assert_eq!( + metrics.spilled_rows.value(), + 100, + "Should have spilled 100 total rows (10 batches * 10 rows)" + ); + + // Read all batches and verify order + for i in 0..10 { + let result = reader.next().await.unwrap()?; + assert_eq!(result.num_rows(), 10); + + let col = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + col.value(0), + i * 10, + "Batch {i} not in correct order after rotations" + ); + } + + Ok(()) + } + + #[tokio::test] + async fn test_single_batch_larger_than_limit() -> Result<()> { + // Very small limit + let (writer, mut reader, metrics) = create_spill_channel_with_metrics(100); + + // Write a batch that exceeds the limit + let large_batch = create_test_batch(0, 100); + writer.push_batch(&large_batch)?; + + // Check metrics after large batch - should trigger rotation immediately + assert_eq!( + metrics.spill_file_count.value(), + 1, + "Should have created 1 file for large batch" + ); + assert_eq!( + metrics.spilled_rows.value(), + 100, + "Should have spilled 100 rows from large batch" + ); + + // Should still write and read successfully + let result = reader.next().await.unwrap()?; + assert_eq!(result.num_rows(), 100); + + // Next batch should go to a new file + let batch2 = create_test_batch(100, 10); + writer.push_batch(&batch2)?; + + // Check metrics after second batch - should have rotated to a new file + assert_eq!( + metrics.spill_file_count.value(), + 2, + "Should have created 2 files after rotation" + ); + assert_eq!( + metrics.spilled_rows.value(), + 110, + "Should have spilled 110 total rows (100 + 10)" + ); + + let result2 = reader.next().await.unwrap()?; + assert_eq!(result2.num_rows(), 10); + + Ok(()) + } + + #[tokio::test] + async fn test_very_small_max_file_size() -> Result<()> { + // Test with just 1 byte max (extreme case) + let (writer, mut reader) = create_spill_channel(1); + + // Any batch will exceed this limit + let batch = create_test_batch(0, 5); + writer.push_batch(&batch)?; + + // Should still work + let result = reader.next().await.unwrap()?; + assert_eq!(result.num_rows(), 5); + + Ok(()) + } + + #[tokio::test] + async fn test_exact_size_boundary() -> Result<()> { + // Create a batch and measure its approximate size + let batch = create_test_batch(0, 10); + let batch_size = batch.get_array_memory_size(); + + // Set max_file_size to exactly the batch size + let (writer, mut reader, metrics) = 
create_spill_channel_with_metrics(batch_size); + + // Write first batch (exactly at the size limit) + writer.push_batch(&batch)?; + + // Check metrics after first batch - should NOT rotate yet (size == limit, not >) + assert_eq!( + metrics.spill_file_count.value(), + 1, + "Should have created 1 file after first batch at exact boundary" + ); + assert_eq!( + metrics.spilled_rows.value(), + 10, + "Should have spilled 10 rows from first batch" + ); + + // Write second batch (exceeds the limit, should trigger rotation) + let batch2 = create_test_batch(10, 10); + writer.push_batch(&batch2)?; + + // Check metrics after second batch - rotation triggered, first file finalized + // Note: second file not created yet (lazy creation on next write) + assert_eq!( + metrics.spill_file_count.value(), + 1, + "Should still have 1 file after rotation (second file created lazily)" + ); + assert_eq!( + metrics.spilled_rows.value(), + 20, + "Should have spilled 20 total rows (10 + 10)" + ); + // Verify first file was finalized by checking spilled_bytes + assert!( + metrics.spilled_bytes.value() > 0, + "Spilled bytes should be > 0 after file finalization (got {})", + metrics.spilled_bytes.value() + ); + + // Both should be readable + let result1 = reader.next().await.unwrap()?; + assert_eq!(result1.num_rows(), 10); + + let result2 = reader.next().await.unwrap()?; + assert_eq!(result2.num_rows(), 10); + + // Spill another batch, now we should see the second file created + let batch3 = create_test_batch(20, 5); + writer.push_batch(&batch3)?; + assert_eq!( + metrics.spill_file_count.value(), + 2, + "Should have created 2 files after writing to new file" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_concurrent_reader_writer() -> Result<()> { + let (writer, mut reader) = create_spill_channel(1024 * 1024); + + // Spawn writer task + let writer_handle = SpawnedTask::spawn(async move { + for i in 0..10 { + let batch = create_test_batch(i * 10, 10); + writer.push_batch(&batch).unwrap(); + // Small delay to simulate real concurrent work + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + } + }); + + // Reader task (runs concurrently) + let reader_handle = SpawnedTask::spawn(async move { + let mut count = 0; + for i in 0..10 { + let result = reader.next().await.unwrap().unwrap(); + assert_eq!(result.num_rows(), 10); + + let col = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), i * 10); + count += 1; + } + count + }); + + // Wait for both to complete + writer_handle.await.unwrap(); + let batches_read = reader_handle.await.unwrap(); + assert_eq!(batches_read, 10); + + Ok(()) + } + + #[tokio::test] + async fn test_reader_catches_up_to_writer() -> Result<()> { + let (writer, mut reader) = create_spill_channel(1024 * 1024); + + #[derive(Clone, Copy, Debug, PartialEq, Eq)] + enum ReadWriteEvent { + ReadStart, + Read(usize), + Write(usize), + } + + let events = Arc::new(Mutex::new(vec![])); + // Start reader first (will pend) + let reader_events = Arc::clone(&events); + let reader_handle = SpawnedTask::spawn(async move { + reader_events.lock().push(ReadWriteEvent::ReadStart); + let result = reader.next().await.unwrap().unwrap(); + reader_events + .lock() + .push(ReadWriteEvent::Read(result.num_rows())); + let result = reader.next().await.unwrap().unwrap(); + reader_events + .lock() + .push(ReadWriteEvent::Read(result.num_rows())); + }); + + // Give reader time to start pending + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + + // Now write a 
batch (should wake the reader) + let batch = create_test_batch(0, 5); + events.lock().push(ReadWriteEvent::Write(batch.num_rows())); + writer.push_batch(&batch)?; + + // Wait for the reader to process + let processed = async { + loop { + if events.lock().len() >= 3 { + break; + } + tokio::time::sleep(std::time::Duration::from_micros(500)).await; + } + }; + tokio::time::timeout(std::time::Duration::from_secs(1), processed) + .await + .unwrap(); + + // Write another batch + let batch = create_test_batch(5, 10); + events.lock().push(ReadWriteEvent::Write(batch.num_rows())); + writer.push_batch(&batch)?; + + // Reader should complete + reader_handle.await.unwrap(); + let events = events.lock().clone(); + assert_eq!( + events, + vec![ + ReadWriteEvent::ReadStart, + ReadWriteEvent::Write(5), + ReadWriteEvent::Read(5), + ReadWriteEvent::Write(10), + ReadWriteEvent::Read(10) + ] + ); + + Ok(()) + } + + #[tokio::test] + async fn test_reader_starts_after_writer_finishes() -> Result<()> { + let (writer, reader) = create_spill_channel(128); + + // Writer writes all data + for i in 0..5 { + let batch = create_test_batch(i * 10, 10); + writer.push_batch(&batch)?; + } + + drop(writer); + + // Now start reader + let mut reader = reader; + let mut count = 0; + for i in 0..5 { + let result = reader.next().await.unwrap()?; + assert_eq!(result.num_rows(), 10); + + let col = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), i * 10); + count += 1; + } + + assert_eq!(count, 5, "Should read all batches after writer finishes"); + + Ok(()) + } + + #[tokio::test] + async fn test_writer_drop_finalizes_file() -> Result<()> { + let env = Arc::new(RuntimeEnv::default()); + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let schema = create_test_schema(); + let spill_manager = + Arc::new(SpillManager::new(Arc::clone(&env), metrics.clone(), schema)); + + let (writer, mut reader) = channel(1024 * 1024, spill_manager); + + // Write some batches + for i in 0..5 { + let batch = create_test_batch(i * 10, 10); + writer.push_batch(&batch)?; + } + + // Check metrics before drop - spilled_bytes should be 0 since file isn't finalized yet + let spilled_bytes_before = metrics.spilled_bytes.value(); + assert_eq!( + spilled_bytes_before, 0, + "Spilled bytes should be 0 before writer is dropped" + ); + + // Explicitly drop the writer - this should finalize the current file + drop(writer); + + // Check metrics after drop - spilled_bytes should be > 0 now + let spilled_bytes_after = metrics.spilled_bytes.value(); + assert!( + spilled_bytes_after > 0, + "Spilled bytes should be > 0 after writer is dropped (got {spilled_bytes_after})" + ); + + // Verify reader can still read all batches + let mut count = 0; + for i in 0..5 { + let result = reader.next().await.unwrap()?; + assert_eq!(result.num_rows(), 10); + + let col = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), i * 10); + count += 1; + } + + assert_eq!(count, 5, "Should read all batches after writer is dropped"); + + Ok(()) + } + + #[tokio::test] + async fn test_disk_usage_decreases_as_files_consumed() -> Result<()> { + use datafusion_execution::runtime_env::RuntimeEnvBuilder; + + // Test configuration + const NUM_BATCHES: usize = 3; + const ROWS_PER_BATCH: usize = 100; + + // Step 1: Create a test batch and measure its size + let batch = create_test_batch(0, ROWS_PER_BATCH); + let batch_size = batch.get_array_memory_size(); + + // Step 2: Configure file rotation to 
approximately 1 batch per file + // Create a custom RuntimeEnv so we can access the DiskManager + let runtime = Arc::new(RuntimeEnvBuilder::default().build()?); + let disk_manager = Arc::clone(&runtime.disk_manager); + + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let schema = create_test_schema(); + let spill_manager = Arc::new(SpillManager::new(runtime, metrics.clone(), schema)); + + let (writer, mut reader) = channel(batch_size, spill_manager); + + // Step 3: Write NUM_BATCHES batches to create approximately NUM_BATCHES files + for i in 0..NUM_BATCHES { + let start = (i * ROWS_PER_BATCH) as i32; + writer.push_batch(&create_test_batch(start, ROWS_PER_BATCH))?; + } + + // Check how many files were created (should be at least a few due to file rotation) + let file_count = metrics.spill_file_count.value(); + assert_eq!( + file_count, + NUM_BATCHES - 1, + "Expected at {} files with rotation, got {file_count}", + NUM_BATCHES - 1 + ); + + // Step 4: Verify initial disk usage reflects all files + let initial_disk_usage = disk_manager.used_disk_space(); + assert!( + initial_disk_usage > 0, + "Expected disk usage > 0 after writing batches, got {initial_disk_usage}" + ); + + // Step 5: Read NUM_BATCHES - 1 batches (all but 1) + // As each file is fully consumed, it should be dropped and disk usage should decrease + for i in 0..(NUM_BATCHES - 1) { + let result = reader.next().await.unwrap()?; + assert_eq!(result.num_rows(), ROWS_PER_BATCH); + + let col = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), (i * ROWS_PER_BATCH) as i32); + } + + // Step 6: Verify disk usage decreased but is not zero (at least 1 batch remains) + let partial_disk_usage = disk_manager.used_disk_space(); + assert!( + partial_disk_usage > 0 + && partial_disk_usage < (batch_size * NUM_BATCHES * 2) as u64, + "Disk usage should be > 0 with remaining batches" + ); + assert!( + partial_disk_usage < initial_disk_usage, + "Disk usage should have decreased after reading most batches: initial={initial_disk_usage}, partial={partial_disk_usage}" + ); + + // Step 7: Read the final batch + let result = reader.next().await.unwrap()?; + assert_eq!(result.num_rows(), ROWS_PER_BATCH); + + // Step 8: Drop writer first to signal no more data will be written + // The reader has infinite stream semantics and will wait for the writer + // to be dropped before returning None + drop(writer); + + // Verify we've read all batches - now the reader should return None + assert!( + reader.next().await.is_none(), + "Should have no more batches to read" + ); + + // Step 9: Drop reader to release all references + drop(reader); + + // Step 10: Verify complete cleanup - disk usage should be 0 + let final_disk_usage = disk_manager.used_disk_space(); + assert_eq!( + final_disk_usage, 0, + "Disk usage should be 0 after all files dropped, got {final_disk_usage}" + ); + + Ok(()) + } +} diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index c674057151492..35e5dc5745996 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -223,6 +223,7 @@ datafusion.execution.keep_partition_by_columns false datafusion.execution.listing_table_factory_infer_partitions true datafusion.execution.listing_table_ignore_subdirectory true datafusion.execution.max_buffered_batches_per_output_file 2 +datafusion.execution.max_spill_file_size_bytes 134217728 
datafusion.execution.meta_fetch_concurrency 32 datafusion.execution.minimum_parallel_output_files 4 datafusion.execution.objectstore_writer_buffer_size 10485760 @@ -343,6 +344,7 @@ datafusion.execution.keep_partition_by_columns false Should DataFusion keep the datafusion.execution.listing_table_factory_infer_partitions true Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema). datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption +datafusion.execution.max_spill_file_size_bytes 134217728 Maximum size in bytes for individual spill files before rotating to a new file. When operators spill data to disk (e.g., RepartitionExec), they write multiple batches to the same file until this size limit is reached, then rotate to a new file. This reduces syscall overhead compared to one-file-per-batch while preventing files from growing too large. A larger value reduces file creation overhead but may hold more disk space. A smaller value creates more files but allows finer-grained space reclamation as files can be deleted once fully consumed. Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators may create spill files larger than the limit. Default: 128 MB datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. datafusion.execution.objectstore_writer_buffer_size 10485760 Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 7ca5eb8f7be45..403546873d912 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -114,6 +114,7 @@ The following configuration settings are available: | datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. 
| | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | +| datafusion.execution.max_spill_file_size_bytes | 134217728 | Maximum size in bytes for individual spill files before rotating to a new file. When operators spill data to disk (e.g., RepartitionExec), they write multiple batches to the same file until this size limit is reached, then rotate to a new file. This reduces syscall overhead compared to one-file-per-batch while preventing files from growing too large. A larger value reduces file creation overhead but may hold more disk space. A smaller value creates more files but allows finer-grained space reclamation as files can be deleted once fully consumed. Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators may create spill files larger than the limit. Default: 128 MB | | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |
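
For reviewers, a minimal end-to-end sketch (not part of the diff) of how a spilling operator might wire the new `datafusion.execution.max_spill_file_size_bytes` setting into `spill_pool::channel`. The `spill_roundtrip` helper and the `task_ctx` parameter are illustrative assumptions; the spill-pool calls mirror the doctest added above, and the config value is assumed to be read through the usual `options().execution` accessor.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
use datafusion_physical_plan::spill::{spill_pool, SpillManager};
use futures::StreamExt;

/// Hypothetical helper: spill one batch through the pool and drain it back.
async fn spill_roundtrip(task_ctx: Arc<TaskContext>) -> datafusion_common::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), /* partition */ 0);
    let spill_manager = Arc::new(SpillManager::new(
        task_ctx.runtime_env(),
        metrics,
        Arc::clone(&schema),
    ));

    // Files rotate once they grow past this limit (default 134217728 = 128 MB).
    let max_file_size = task_ctx
        .session_config()
        .options()
        .execution
        .max_spill_file_size_bytes;

    let (writer, mut reader) = spill_pool::channel(max_file_size, spill_manager);

    // Producer side: push batches; the pool rotates spill files as the limit is hit.
    let array: ArrayRef = Arc::new(Int32Array::from(vec![1; 1024]));
    writer.push_batch(&RecordBatch::try_new(schema, vec![array])?)?;
    drop(writer); // finalize the in-progress file so the reader can drain and finish

    // Consumer side: batches come back in FIFO order; files are deleted once consumed.
    while let Some(batch) = reader.next().await {
        let _batch = batch?;
    }
    Ok(())
}
```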