perf!: remove shuffle buffer #5912
Merged
westonpace merged 3 commits into lance-format:main on Feb 10, 2026
Conversation
This removes a buffer in the shuffler that accumulated batches for batched writes to temporary storage. This was configured with a public `buffer_size` parameter, hence the breaking change.

Previously, when we shuffled data we accumulated this many batches for each partition in memory and then flushed them all to disk at once. This may have been intended as an optimization in the original implementation of the shuffler, which supported external shuffling through arbitrary object storage. However, the shuffler was subsequently hardcoded to use local disk (where this kind of buffering serves no benefit), and even on remote object storage we already have a layer of buffering in the storage writer.

Instead of buffering batches, just write them directly to the `FileWriter`. This results in much more predictable memory usage and also faster index builds.
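To make the memory-usage claim concrete, here is a minimal std-only sketch (hypothetical `MockWriter` type, not the Lance API) contrasting the old buffer-then-flush pattern with the new direct-write pattern. The same rows reach disk either way; only the peak number of batches held in memory per partition differs:

```rust
/// Stand-in for a file writer; counts rows written to "disk".
struct MockWriter {
    rows_written: usize,
}

impl MockWriter {
    fn write_batch(&mut self, batch_rows: usize) {
        self.rows_written += batch_rows;
    }
}

/// Old pattern: accumulate `buffer_size` batches, then flush them all at once.
/// Returns the peak number of batches buffered in memory.
fn shuffle_buffered(batches: &[usize], buffer_size: usize, writer: &mut MockWriter) -> usize {
    let mut buffer = Vec::new();
    let mut peak_buffered = 0;
    for (counter, &rows) in batches.iter().enumerate() {
        buffer.push(rows);
        peak_buffered = peak_buffered.max(buffer.len());
        if (counter + 1) % buffer_size == 0 {
            for rows in buffer.drain(..) {
                writer.write_batch(rows);
            }
        }
    }
    // Flush whatever is left over at the end.
    for rows in buffer.drain(..) {
        writer.write_batch(rows);
    }
    peak_buffered
}

/// New pattern: hand each batch straight to the writer; peak buffering is one batch.
fn shuffle_direct(batches: &[usize], writer: &mut MockWriter) -> usize {
    for &rows in batches {
        writer.write_batch(rows);
    }
    1
}

fn main() {
    let batches = vec![100; 10]; // ten batches of 100 rows each
    let mut w1 = MockWriter { rows_written: 0 };
    let mut w2 = MockWriter { rows_written: 0 };
    let peak_old = shuffle_buffered(&batches, 4, &mut w1);
    let peak_new = shuffle_direct(&batches, &mut w2);
    // Same data reaches disk either way; only peak memory differs.
    assert_eq!(w1.rows_written, w2.rows_written);
    println!("old peak buffered batches: {peak_old}, new: {peak_new}");
}
```

With many partitions, the old pattern's peak memory scales with `buffer_size * num_partitions`, which is what made usage hard to predict.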
Codecov Report: ✅ All modified and coverable lines are covered by tests.
westonpace (Member) approved these changes on Feb 9, 2026, leaving a comment:
Nice work! The writers will already do their own buffering if they need to, so I agree this extra layer of buffering is not needed.
```diff
 if !batches.is_empty() {
     partition_sizes[part_id] += batches.iter().map(|b| b.num_rows()).sum::<usize>();
-    futs.push(writer.write_batches(batches.iter()));
+    writers[part_id].write_batches(batches.iter()).await?;
```
westonpace (Member) commented:
We can do this in a follow-up, but it might be nice to still do all the writes in parallel, e.g. keep the `futs` Vec. `shuffled` is a `Vec` and not any kind of stream/iterator, so the data is all in memory already. (I think the important point is getting rid of the `if counter % self.buffer_size == 0`.)
```rust
let mut futs = vec![];
if !batches.is_empty() {
    futs.push(writers[part_id].write_batches(batches.iter()));
}
try_join_all(futs).await?;
```
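As a rough std-only sketch of that collect-then-join pattern (using scoped threads as a stand-in for async futures and `try_join_all`; the `parallel_sums` helper and its partition data are made up for illustration):

```rust
use std::thread;

/// Process each non-empty partition on its own thread, then join all results.
fn parallel_sums(partitions: &[Vec<usize>]) -> Vec<usize> {
    thread::scope(|s| {
        // Collect one handle per non-empty partition -- this mirrors pushing a
        // future into `futs` guarded by `if !batches.is_empty()`.
        let handles: Vec<_> = partitions
            .iter()
            .filter(|batches| !batches.is_empty())
            .map(|batches| s.spawn(move || batches.iter().sum::<usize>()))
            .collect();
        // Wait for everything at once, analogous to `try_join_all(futs).await?`.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let partitions = vec![vec![10, 20], vec![], vec![5]];
    let totals = parallel_sums(&partitions);
    println!("{totals:?}");
}
```

The key point is the same as in the suggestion: the writes start concurrently and are awaited together, rather than each one blocking the loop.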