**Describe the bug**
When using a grouping (I tested with distinct_on) in combination with a FairSpillPool, not all memory seems to be attributed to the right kind of reservation. While the grouping itself handles allocation errors gracefully by spilling, the same reservation is also shared with a BatchBuilder that does not.
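For context, a FairSpillPool splits its budget between unspillable consumers and a fair share per spillable consumer, and only spillable consumers are expected to recover from ResourcesExhausted by spilling. A minimal sketch of the two kinds of reservations (consumer names and sizes are made up, not taken from the reproducer):

```rust
use std::sync::Arc;

use datafusion::execution::memory_pool::{FairSpillPool, MemoryConsumer, MemoryPool};

fn main() {
    // A small pool so the fair-share limit is easy to hit.
    let pool: Arc<dyn MemoryPool> = Arc::new(FairSpillPool::new(1024));

    // Spillable consumer: capped at its fair share of the pool and
    // expected to react to ResourcesExhausted by spilling.
    let mut spillable = MemoryConsumer::new("group_by")
        .with_can_spill(true)
        .register(&pool);

    // Unspillable consumer: may use whatever the spillable side has not taken.
    let mut unspillable = MemoryConsumer::new("batch_builder").register(&pool);

    unspillable.try_grow(512).unwrap();

    // The spillable reservation may only grow within its fair share
    // ((1024 - 512) / 1 here); exceeding it yields ResourcesExhausted.
    println!("{:?}", spillable.try_grow(1024));
}
```

In this bug, the BatchBuilder grows a reservation of the spillable kind even though it cannot spill.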
**To Reproduce**
(Tested on Linux, Fedora 40)

- Clone my reproducer: https://github.com/Ablu/datafusion-repro-resource-allocation (chances are that this can be simplified further; the assumed runtime setup is sketched below)
- Generate the test file: `cargo run --release -- generate out.parquet`
- Increase the open file limit: `ulimit -nS 102400`
- Attempt to run the deduplication query: `cargo run --release -- deduplicate out.parquet dedup.parquet`
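For reference, the reproducer configures the runtime roughly along these lines (a sketch only; the pool size is a placeholder and the actual setup in the linked repo may differ):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

fn make_context() -> Result<SessionContext> {
    // Cap query memory with a FairSpillPool (the size here is a placeholder).
    let runtime = RuntimeEnv::new(
        RuntimeConfig::new().with_memory_pool(Arc::new(FairSpillPool::new(62_500_000))),
    )?;
    Ok(SessionContext::new_with_config_rt(
        SessionConfig::new(),
        Arc::new(runtime),
    ))
}
```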
The last step fails with:

```text
Failure to write back final result: ResourcesExhausted("Failed to allocate additional 8939616 bytes for GroupedHashAggregateStream[10] with 58373092 bytes already allocated - maximum available is 62500000")
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
```
It looks like it tries to acquire memory from the smaller "spillable" part of the pool, while it should probably allocate via a non-spillable reservation.
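If that analysis is right, one possible direction (a sketch of the idea only, not a proposed patch) would be to give the BatchBuilder its own unspillable reservation on the same pool instead of sharing the spillable one:

```rust
use std::sync::Arc;

use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};

/// Hypothetical helper: derive an unspillable reservation for the merge's
/// BatchBuilder from the same pool that the spillable operator uses.
fn batch_builder_reservation(pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
    // No `with_can_spill(true)`: this consumer cannot spill, so a
    // FairSpillPool would account for it outside the spillable fair share.
    MemoryConsumer::new("BatchBuilder").register(pool)
}
```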
As a hack, I removed the offending reservation calls:
```diff
diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs
index d32c60697..d0305c0bf 100644
--- a/datafusion/physical-plan/src/sorts/builder.rs
+++ b/datafusion/physical-plan/src/sorts/builder.rs
@@ -65,15 +65,15 @@ impl BatchBuilder {
             indices: Vec::with_capacity(batch_size),
             reservation,
         }
     }

     /// Append a new batch in `stream_idx`
     pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
-        self.reservation.try_grow(batch.get_array_memory_size())?;
+        // dbg!(self.reservation.try_grow(batch.get_array_memory_size()))?;
         let batch_idx = self.batches.len();
         self.batches.push((stream_idx, batch));
         self.cursors[stream_idx] = BatchCursor {
             batch_idx,
             row_idx: 0,
         };
         Ok(())
@@ -137,15 +137,15 @@ impl BatchBuilder {
             let retain = stream_cursor.batch_idx == batch_idx;
             batch_idx += 1;

             if retain {
                 stream_cursor.batch_idx = retained;
                 retained += 1;
             } else {
-                self.reservation.shrink(batch.get_array_memory_size());
+                // self.reservation.shrink(batch.get_array_memory_size());
             }
             retain
         });

         Ok(Some(RecordBatch::try_new(
             Arc::clone(&self.schema),
             columns,
```
That avoids the immediate error. It looks like we progress further, but eventually end up in some kind of deadlock (which I have not fully understood yet, but it does not seem to be related to this hack).
**Expected behavior**
The query and the final write-back should succeed after reading back the spilled data.
**Additional context**
Originally discussed/analyzed on Discord: https://discord.com/channels/885562378132000778/1166447479609376850/1260302583637999756