From 74efab5206920ea920e00a122d6e14420f847342 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Sun, 8 Feb 2026 06:48:14 -0800 Subject: [PATCH 1/3] perf!: remove shuffle buffer This removes a buffer in the shuffler that accumulated batches for batched writes to temporary storage. The buffer was configured through the public buffer_size option (set via with_buffer_size, default 4096), which is removed here; hence the breaking change. Previously, when we shuffled data, we accumulated in memory the per-partition output of up to buffer_size sorted input batches and then flushed all partitions to disk at once. This may have been intended as an optimization in the original implementation of the shuffler, which supported external shuffling through arbitrary object storage. However, the shuffler was subsequently hardcoded to use local disk (where this kind of buffering serves no benefit), and even on remote object storage we already have a layer of buffering in the storage writer. Instead of buffering batches, just write them directly to the FileWriter. This results in much more predictable memory usage and also faster index builds. 
--- rust/lance-index/src/vector/v3/shuffler.rs | 42 +++------------------- 1 file changed, 4 insertions(+), 38 deletions(-) diff --git a/rust/lance-index/src/vector/v3/shuffler.rs b/rust/lance-index/src/vector/v3/shuffler.rs index c1d74812b85..f054f8fe34d 100644 --- a/rust/lance-index/src/vector/v3/shuffler.rs +++ b/rust/lance-index/src/vector/v3/shuffler.rs @@ -9,7 +9,6 @@ use std::sync::Arc; use arrow::{array::AsArray, compute::sort_to_indices}; use arrow_array::{RecordBatch, UInt32Array}; use arrow_schema::Schema; -use future::try_join_all; use futures::prelude::*; use lance_arrow::{RecordBatchExt, SchemaExt}; use lance_core::{ @@ -69,7 +68,6 @@ pub struct IvfShuffler { num_partitions: usize, // options - buffer_size: usize, precomputed_shuffle_buffers: Option>, } @@ -79,16 +77,10 @@ impl IvfShuffler { object_store: Arc::new(ObjectStore::local()), output_dir, num_partitions, - buffer_size: 4096, precomputed_shuffle_buffers: None, } } - pub fn with_buffer_size(mut self, buffer_size: usize) -> Self { - self.buffer_size = buffer_size; - self - } - pub fn with_precomputed_shuffle_buffers( mut self, precomputed_shuffle_buffers: Option>, @@ -163,43 +155,17 @@ impl Shuffler for IvfShuffler { }) .buffered(get_num_compute_intensive_cpus()); - // part_id: | 0 | 1 | 3 | - // partition_buffers: |[batch,batch,..]|[batch,batch,..]|[batch,batch,..]| - let mut partition_buffers = vec![Vec::new(); num_partitions]; - - let mut counter = 0; let mut total_loss = 0.0; while let Some(shuffled) = parallel_sort_stream.next().await { let (shuffled, loss) = shuffled?; total_loss += loss; for (part_id, batches) in shuffled.into_iter().enumerate() { - let part_batches = &mut partition_buffers[part_id]; - part_batches.extend(batches); - } - - counter += 1; - - // do flush - if counter % self.buffer_size == 0 { - let mut futs = vec![]; - for (part_id, writer) in writers.iter_mut().enumerate() { - let batches = &partition_buffers[part_id]; - partition_sizes[part_id] += batches.iter().map(|b| 
b.num_rows()).sum::(); - futs.push(writer.write_batches(batches.iter())); + if !batches.is_empty() { + partition_sizes[part_id] += + batches.iter().map(|b| b.num_rows()).sum::(); + writers[part_id].write_batches(batches.iter()).await?; } - try_join_all(futs).await?; - - partition_buffers.iter_mut().for_each(|b| b.clear()); - } - } - - // final flush - for (part_id, batches) in partition_buffers.into_iter().enumerate() { - let writer = &mut writers[part_id]; - partition_sizes[part_id] += batches.iter().map(|b| b.num_rows()).sum::(); - for batch in batches.iter() { - writer.write_batch(batch).await?; } } From 33b50faa9ef62fc1d350741e215d76d402e8254b Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Sun, 8 Feb 2026 07:00:45 -0800 Subject: [PATCH 2/3] lint --- rust/lance-index/src/vector/v3/shuffler.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust/lance-index/src/vector/v3/shuffler.rs b/rust/lance-index/src/vector/v3/shuffler.rs index f054f8fe34d..ff49a27a765 100644 --- a/rust/lance-index/src/vector/v3/shuffler.rs +++ b/rust/lance-index/src/vector/v3/shuffler.rs @@ -162,8 +162,7 @@ impl Shuffler for IvfShuffler { for (part_id, batches) in shuffled.into_iter().enumerate() { if !batches.is_empty() { - partition_sizes[part_id] += - batches.iter().map(|b| b.num_rows()).sum::(); + partition_sizes[part_id] += batches.iter().map(|b| b.num_rows()).sum::(); writers[part_id].write_batches(batches.iter()).await?; } } From 8f12f82c07eeee1e097e73e92741e54d3cf99f5a Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 9 Feb 2026 07:31:11 -0800 Subject: [PATCH 3/3] make writes concurrent --- rust/lance-index/src/vector/v3/shuffler.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/rust/lance-index/src/vector/v3/shuffler.rs b/rust/lance-index/src/vector/v3/shuffler.rs index ff49a27a765..595c0a0c736 100644 --- a/rust/lance-index/src/vector/v3/shuffler.rs +++ b/rust/lance-index/src/vector/v3/shuffler.rs @@ -9,7 +9,7 @@ use std::sync::Arc; 
use arrow::{array::AsArray, compute::sort_to_indices}; use arrow_array::{RecordBatch, UInt32Array}; use arrow_schema::Schema; -use futures::prelude::*; +use futures::{future::try_join_all, prelude::*}; use lance_arrow::{RecordBatchExt, SchemaExt}; use lance_core::{ cache::LanceCache, @@ -160,12 +160,15 @@ impl Shuffler for IvfShuffler { let (shuffled, loss) = shuffled?; total_loss += loss; - for (part_id, batches) in shuffled.into_iter().enumerate() { + let mut futs = Vec::new(); + for (part_id, (writer, batches)) in writers.iter_mut().zip(shuffled.iter()).enumerate() + { if !batches.is_empty() { partition_sizes[part_id] += batches.iter().map(|b| b.num_rows()).sum::(); - writers[part_id].write_batches(batches.iter()).await?; + futs.push(writer.write_batches(batches.iter())); } } + try_join_all(futs).await?; } // finish all writers