Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
69ab5d2
implement SpillingPool
adriangb Oct 21, 2025
2c9bb58
clippy
adriangb Oct 21, 2025
bfabc5e
use buffered stream
adriangb Oct 21, 2025
f183b9e
lint
adriangb Oct 21, 2025
0cc7a63
rework
adriangb Oct 21, 2025
6dd6b6d
Add some tests
adriangb Oct 21, 2025
b25bdfa
fix lints
adriangb Oct 22, 2025
03ffc3c
bugfix
adriangb Oct 22, 2025
966a213
address pr feedback
adriangb Oct 22, 2025
1dbc730
fix build
adriangb Oct 23, 2025
e2ed52d
fix dropped task
adriangb Oct 23, 2025
c850240
updarte docstrings
adriangb Oct 23, 2025
303258e
remove wrapper struct
adriangb Oct 23, 2025
ab7f350
hide stream behind a trait to avoid making more public stuff
adriangb Oct 23, 2025
9033e84
wip on cleanup
adriangb Oct 24, 2025
cccbd73
add a detailed test
adriangb Oct 24, 2025
f43bbb6
Add slt test
adriangb Oct 24, 2025
5bf14ec
use 128MB as the default spill file size
adriangb Oct 24, 2025
39577f0
make SpillPool pub(crate)
adriangb Oct 24, 2025
468ad57
fmt
adriangb Oct 24, 2025
89dc51c
lint
adriangb Oct 24, 2025
54652ef
update slt
adriangb Oct 24, 2025
d7af8ef
remove test
adriangb Oct 30, 2025
0f6c83a
clippy
adriangb Oct 30, 2025
93779c0
Update datafusion/physical-plan/src/spill/spill_pool.rs
adriangb Nov 2, 2025
192f3ef
update docs
adriangb Nov 2, 2025
374319b
Apply suggestion from @2010YOUY01
adriangb Nov 2, 2025
b091574
lints
adriangb Nov 4, 2025
c34a5df
Add state machine for stream polling
adriangb Nov 4, 2025
e9aedf6
wip
adriangb Nov 5, 2025
30bcfc9
rewrite spilling pool, incorporate new architecture
adriangb Nov 5, 2025
df58e8b
re-incorporate gated channels
adriangb Nov 5, 2025
66790ec
add lots of docs
adriangb Nov 5, 2025
3f40fdb
add metrics for file and byte counts to tests
adriangb Nov 5, 2025
f61148a
Update datafusion/physical-plan/src/repartition/mod.rs
adriangb Nov 5, 2025
2dcf3f5
Update datafusion/physical-plan/src/repartition/mod.rs
adriangb Nov 5, 2025
e31f827
clippy
adriangb Nov 5, 2025
8cba32e
tweak docstrings
adriangb Nov 6, 2025
f1de9da
fmt
adriangb Nov 6, 2025
d05b0cf
fix and track disk usage
adriangb Nov 7, 2025
23ebd30
clean up test
adriangb Nov 7, 2025
34eb564
fix docs
adriangb Nov 7, 2025
195f72f
use a single waker
adriangb Nov 7, 2025
86364b2
fix test, make MPSC, docs, cleanup
adriangb Nov 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,23 @@ config_namespace! {
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

/// Maximum size in bytes for individual spill files before rotating to a new file.
///
/// When operators spill data to disk (e.g., RepartitionExec), they write
/// multiple batches to the same file until this size limit is reached, then rotate
/// to a new file. This reduces syscall overhead compared to one-file-per-batch
/// while preventing files from growing too large.
///
/// A larger value reduces file creation overhead but may hold more disk space.
/// A smaller value creates more files but allows finer-grained space reclamation
/// as files can be deleted once fully consumed.
///
/// Currently, only `RepartitionExec` supports spill file rotation; other spilling
/// operators may still create spill files larger than this limit.
///
/// Default: 128 MB
pub max_spill_file_size_bytes: usize, default = 128 * 1024 * 1024

/// Number of files to read in parallel when inferring schema and statistics
pub meta_fetch_concurrency: usize, default = 32

Expand Down
253 changes: 237 additions & 16 deletions datafusion/execution/src/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,13 @@ impl DiskManager {

let dir_index = rng().random_range(0..local_dirs.len());
Ok(RefCountedTempFile {
_parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
tempfile: Builder::new()
.tempfile_in(local_dirs[dir_index].as_ref())
.map_err(DataFusionError::IoError)?,
current_file_disk_usage: 0,
parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
tempfile: Arc::new(
Builder::new()
.tempfile_in(local_dirs[dir_index].as_ref())
.map_err(DataFusionError::IoError)?,
),
current_file_disk_usage: Arc::new(AtomicU64::new(0)),
disk_manager: Arc::clone(self),
})
}
Expand All @@ -311,26 +313,50 @@ impl DiskManager {
/// must invoke [`Self::update_disk_usage`] to update the global disk usage counter.
/// This ensures the disk manager can properly enforce usage limits configured by
/// [`DiskManager::with_max_temp_directory_size`].
///
/// This type is Clone-able, allowing multiple references to the same underlying file.
/// The file is deleted only when the last reference is dropped.
///
/// The parent temporary directory is also kept alive as long as any reference to
/// this file exists, preventing premature cleanup of the directory.
///
/// Once all references to this file are dropped, the file is deleted, and the
/// disk usage is subtracted from the disk manager's total.
#[derive(Debug)]
pub struct RefCountedTempFile {
/// The reference to the directory in which temporary files are created to ensure
/// it is not cleaned up prior to the NamedTempFile
_parent_temp_dir: Arc<TempDir>,
tempfile: NamedTempFile,
parent_temp_dir: Arc<TempDir>,
/// The underlying temporary file, wrapped in Arc to allow cloning
tempfile: Arc<NamedTempFile>,
/// Tracks the current disk usage of this temporary file. See
/// [`Self::update_disk_usage`] for more details.
current_file_disk_usage: u64,
///
/// This is wrapped in `Arc<AtomicU64>` so that all clones share the same
/// disk usage tracking, preventing incorrect accounting when clones are dropped.
current_file_disk_usage: Arc<AtomicU64>,
/// The disk manager that created and manages this temporary file
disk_manager: Arc<DiskManager>,
}

/// Cloning produces another handle to the *same* temporary file: every field is a
/// shared `Arc`, so only reference counts are bumped and no file data is copied.
impl Clone for RefCountedTempFile {
    fn clone(&self) -> Self {
        // Destructure exhaustively so that adding a field to the struct forces this
        // impl to be revisited instead of silently leaving the new field out.
        let Self {
            parent_temp_dir,
            tempfile,
            current_file_disk_usage,
            disk_manager,
        } = self;

        Self {
            parent_temp_dir: Arc::clone(parent_temp_dir),
            tempfile: Arc::clone(tempfile),
            current_file_disk_usage: Arc::clone(current_file_disk_usage),
            disk_manager: Arc::clone(disk_manager),
        }
    }
}

impl RefCountedTempFile {
pub fn path(&self) -> &Path {
self.tempfile.path()
}

pub fn inner(&self) -> &NamedTempFile {
&self.tempfile
self.tempfile.as_ref()
}

/// Updates the global disk usage counter after modifications to the underlying file.
Expand All @@ -342,11 +368,14 @@ impl RefCountedTempFile {
let metadata = self.tempfile.as_file().metadata()?;
let new_disk_usage = metadata.len();

// Get the old disk usage
let old_disk_usage = self.current_file_disk_usage.load(Ordering::Relaxed);

// Update the global disk usage by:
// 1. Subtracting the old file size from the global counter
self.disk_manager
.used_disk_space
.fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
.fetch_sub(old_disk_usage, Ordering::Relaxed);
// 2. Adding the new file size to the global counter
self.disk_manager
.used_disk_space
Expand All @@ -362,23 +391,29 @@ impl RefCountedTempFile {
}

// 4. Update the local file size tracking
self.current_file_disk_usage = new_disk_usage;
self.current_file_disk_usage
.store(new_disk_usage, Ordering::Relaxed);

Ok(())
}

pub fn current_disk_usage(&self) -> u64 {
self.current_file_disk_usage
self.current_file_disk_usage.load(Ordering::Relaxed)
}
}

/// When the temporary file is dropped, subtract its disk usage from the disk manager's total
impl Drop for RefCountedTempFile {
fn drop(&mut self) {
// Subtract the current file's disk usage from the global counter
self.disk_manager
.used_disk_space
.fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
// Only subtract disk usage when this is the last reference to the file
// Check if we're the last one by seeing if there's only one strong reference
// left to the underlying tempfile (the one we're holding)
//
// NOTE(review): this check is not atomic with the release of this handle's own
// `Arc`, which is only released after `drop` returns. If clones can be dropped
// concurrently from different threads, each may observe a count of 2 and skip
// the subtraction, leaving `used_disk_space` inflated. Confirm whether clones
// ever cross threads; if so, consider moving the accounting into a `Drop` impl
// on a single shared inner value so it runs exactly once.
if Arc::strong_count(&self.tempfile) == 1 {
let current_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
self.disk_manager
.used_disk_space
.fetch_sub(current_usage, Ordering::Relaxed);
}
}
}

Expand Down Expand Up @@ -533,4 +568,190 @@ mod tests {

Ok(())
}

#[test]
fn test_disk_usage_basic() -> Result<()> {
    use std::io::Write;

    let manager = Arc::new(DiskManagerBuilder::default().build()?);
    let mut spill = manager.create_tmp_file("Testing")?;

    // A freshly created file contributes nothing, locally or globally.
    assert_eq!(manager.used_disk_space(), 0);
    assert_eq!(spill.current_disk_usage(), 0);

    // First write, then refresh the accounting.
    spill.inner().as_file().write_all(b"hello world")?;
    spill.update_disk_usage()?;

    // The file's own figure and the manager's total must now agree and be non-zero.
    let after_first_write = spill.current_disk_usage();
    assert_ne!(after_first_write, 0);
    assert_eq!(manager.used_disk_space(), after_first_write);

    // Second write, refreshed the same way.
    spill.inner().as_file().write_all(b" more data")?;
    spill.update_disk_usage()?;

    // Usage grows and the two views stay in lockstep.
    let after_second_write = spill.current_disk_usage();
    assert!(after_first_write < after_second_write);
    assert_eq!(manager.used_disk_space(), after_second_write);

    // Releasing the only handle gives all of the space back.
    drop(spill);
    assert_eq!(manager.used_disk_space(), 0);

    Ok(())
}

#[test]
fn test_disk_usage_with_clones() -> Result<()> {
    use std::io::Write;

    let manager = Arc::new(DiskManagerBuilder::default().build()?);
    let mut original = manager.create_tmp_file("Testing")?;

    // Seed the file with some bytes and record them.
    original.inner().as_file().write_all(b"test data")?;
    original.update_disk_usage()?;

    let initial_usage = original.current_disk_usage();
    assert_ne!(initial_usage, 0);
    assert_eq!(manager.used_disk_space(), initial_usage);

    // Two extra handles onto the same file; `writer` will later record a write.
    let mut writer = original.clone();
    let observer = original.clone();

    // Every handle reports the same per-file figure...
    assert_eq!(writer.current_disk_usage(), initial_usage);
    assert_eq!(observer.current_disk_usage(), initial_usage);

    // ...and cloning must not have inflated the global total.
    assert_eq!(manager.used_disk_space(), initial_usage);

    // Append through one of the clones and refresh the accounting from it.
    writer.inner().as_file().write_all(b" more data")?;
    writer.update_disk_usage()?;

    let grown_usage = writer.current_disk_usage();
    assert!(initial_usage < grown_usage);

    // The update is visible through every handle.
    assert_eq!(original.current_disk_usage(), grown_usage);
    assert_eq!(observer.current_disk_usage(), grown_usage);
    assert_eq!(writer.current_disk_usage(), grown_usage);

    // The global total tracks the new size exactly once.
    assert_eq!(manager.used_disk_space(), grown_usage);

    // Dropping one clone leaves the accounting untouched while others remain.
    drop(writer);
    assert_eq!(manager.used_disk_space(), grown_usage);
    assert_eq!(original.current_disk_usage(), grown_usage);
    assert_eq!(observer.current_disk_usage(), grown_usage);

    // Same after the second clone goes away: the original is still alive.
    drop(observer);
    assert_eq!(manager.used_disk_space(), grown_usage);
    assert_eq!(original.current_disk_usage(), grown_usage);

    // Only the final handle's drop releases the space.
    drop(original);
    assert_eq!(manager.used_disk_space(), 0);

    Ok(())
}

#[test]
fn test_disk_usage_clones_dropped_out_of_order() -> Result<()> {
    use std::io::Write;

    let manager = Arc::new(DiskManagerBuilder::default().build()?);
    let mut original = manager.create_tmp_file("Testing")?;

    // Put a few bytes on disk and record them.
    original.inner().as_file().write_all(b"test")?;
    original.update_disk_usage()?;

    let tracked = original.current_disk_usage();
    assert_eq!(manager.used_disk_space(), tracked);

    // Three additional handles onto the same file.
    let first = original.clone();
    let second = original.clone();
    let last = original.clone();

    // Release the original before any of its clones.
    drop(original);

    // The surviving clones keep the usage accounted for.
    assert_eq!(manager.used_disk_space(), tracked);
    assert_eq!(first.current_disk_usage(), tracked);

    // Release two clones in reverse creation order; the total must hold after each.
    for handle in [second, first] {
        drop(handle);
        assert_eq!(manager.used_disk_space(), tracked);
    }

    // The very last handle going away finally returns the space.
    drop(last);
    assert_eq!(manager.used_disk_space(), 0);

    Ok(())
}

#[test]
fn test_disk_usage_multiple_files() -> Result<()> {
    use std::io::Write;

    let manager = Arc::new(DiskManagerBuilder::default().build()?);

    // Two independent temporary files owned by the same manager.
    let mut small = manager.create_tmp_file("Testing1")?;
    let mut large = manager.create_tmp_file("Testing2")?;

    // Only the first file has data so far, so it alone makes up the total.
    small.inner().as_file().write_all(b"file1")?;
    small.update_disk_usage()?;
    let small_usage = small.current_disk_usage();

    assert_eq!(manager.used_disk_space(), small_usage);

    // After the second file is written, the total is the sum of both.
    large.inner().as_file().write_all(b"file2 data")?;
    large.update_disk_usage()?;
    let large_usage = large.current_disk_usage();

    assert_eq!(manager.used_disk_space(), small_usage + large_usage);

    // Dropping the first file removes just its share.
    drop(small);
    assert_eq!(manager.used_disk_space(), large_usage);

    // Dropping the second file brings the total back to zero.
    drop(large);
    assert_eq!(manager.used_disk_space(), 0);

    Ok(())
}
}
Loading