From a554093fb92a6f16af439f3875321d5ea0369931 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Sun, 8 Feb 2026 09:49:28 -0800 Subject: [PATCH 1/3] feat: spill page metadata to disk during IVF shuffle During IVF shuffle, we have a FileWriter per partition and each accumulates page metadata in memory over the course of the shuffle. With large datasets and large numbers of partitions, this memory grows over time to dominate the memory cost of IVF shuffle. This patch adds optional functionality to the FileWriter that serializes page metadata to a spill file and enables it by default in the IVF shuffler. --- rust/lance-file/src/writer.rs | 238 ++++++++++++++++++++- rust/lance-index/src/vector/v3/shuffler.rs | 7 +- 2 files changed, 236 insertions(+), 9 deletions(-) diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index d4bdd996573..53565788d02 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use arrow_array::RecordBatch; use arrow_data::ArrayData; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use futures::stream::FuturesOrdered; use futures::StreamExt; use lance_core::datatypes::{Field, Schema as LanceSchema}; @@ -100,6 +100,95 @@ pub struct FileWriterOptions { pub format_version: Option, } +// Total in-memory budget for buffering serialized page metadata before flushing +// to the spill file. Divided evenly across columns (with a floor of 64 bytes). +const DEFAULT_SPILL_BUFFER_LIMIT: usize = 256 * 1024; + +struct PageMetadataSpill { + writer: ObjectWriter, + object_store: Arc, + path: Path, + position: u64, + column_buffers: Vec>, + column_chunks: Vec>, + per_column_limit: usize, +} + +impl PageMetadataSpill { + async fn new(object_store: Arc, path: Path, num_columns: usize) -> Result { + let writer = object_store.create(&path).await?; + let per_column_limit = (DEFAULT_SPILL_BUFFER_LIMIT / num_columns.max(1)).max(64); + Ok(Self { + writer, + object_store, + path, + position: 0, + column_buffers: vec![Vec::new(); num_columns], + column_chunks: vec![Vec::new(); num_columns], + per_column_limit, + }) + } + + async fn append_page( + &mut self, + column_idx: usize, + page: &pbfile::column_metadata::Page, + ) -> Result<()> { + page.encode_length_delimited(&mut self.column_buffers[column_idx]) + .map_err(|e| Error::IO { + source: Box::new(std::io::Error::new(std::io::ErrorKind::InvalidData, e)), + location: location!(), + })?; + if self.column_buffers[column_idx].len() >= self.per_column_limit { + self.flush_column(column_idx).await?; + } + Ok(()) + } + + async fn flush_column(&mut self, column_idx: usize) -> Result<()> { + let buf = &self.column_buffers[column_idx]; + if buf.is_empty() { + return Ok(()); + } + let len = buf.len(); + self.writer.write_all(buf).await?; + self.column_chunks[column_idx].push((self.position, len as u32)); + self.position += len as u64; + self.column_buffers[column_idx].clear(); + Ok(()) + } + + async fn shutdown_writer(&mut self) -> Result<()> { + for col_idx in 0..self.column_buffers.len() { + self.flush_column(col_idx).await?; + } + self.writer.shutdown().await?; + Ok(()) + } + +} + +fn decode_spilled_chunk(data: &Bytes) -> Result> { + let mut pages = Vec::new(); + let mut cursor = data.clone(); + while cursor.has_remaining() { + let page = + pbfile::column_metadata::Page::decode_length_delimited(&mut cursor).map_err(|e| { + Error::IO { + source: Box::new(std::io::Error::new(std::io::ErrorKind::InvalidData, e)), + location: location!(), + } + })?; + pages.push(page); + } + Ok(pages) +} + +enum PageSpillState { + Pending(Arc, Path), + Active(PageMetadataSpill), +} + pub struct FileWriter { writer: ObjectWriter, schema: Option, @@ -111,6 +200,7 @@ pub struct FileWriter { global_buffers: Vec<(u64, u64)>, schema_metadata: HashMap, options: FileWriterOptions, + page_spill: Option, } fn initial_column_metadata() -> pbfile::ColumnMetadata { @@ -165,10 +255,23 @@ impl FileWriter { field_id_to_column_indices: Vec::new(), global_buffers: Vec::new(), schema_metadata: HashMap::new(), + page_spill: None, options, } } + /// Spill page metadata to a sidecar file instead of accumulating in memory. + /// + /// This can dramatically reduce memory usage when many writers are open + /// concurrently (e.g. IVF shuffle with thousands of partition writers). + /// The sidecar file is created lazily on the first page write. The caller + /// is responsible for cleaning up `path` (e.g. by placing it in a temp + /// directory that is removed via RAII). + pub fn with_page_metadata_spill(mut self, object_store: Arc, path: Path) -> Self { + self.page_spill = Some(PageSpillState::Pending(object_store, path)); + self + } + /// Write a series of record batches to a new file /// /// Returns the number of rows written @@ -223,9 +326,20 @@ impl FileWriter { length: encoded_page.num_rows, priority: encoded_page.row_number, }; - self.column_metadata[encoded_page.column_idx as usize] - .pages - .push(page); + let col_idx = encoded_page.column_idx as usize; + if matches!(&self.page_spill, Some(PageSpillState::Pending(..))) { + let Some(PageSpillState::Pending(store, path)) = self.page_spill.take() else { + unreachable!() + }; + self.page_spill = Some(PageSpillState::Active( + PageMetadataSpill::new(store, path, self.num_columns as usize).await?, + )); + } + match &mut self.page_spill { + Some(PageSpillState::Active(spill)) => spill.append_page(col_idx, &page).await?, + None => self.column_metadata[col_idx].pages.push(page), + Some(PageSpillState::Pending(..)) => unreachable!(), + } Ok(()) } @@ -440,12 +554,41 @@ impl FileWriter { } async fn write_column_metadatas(&mut self) -> Result> { - let mut metadatas = Vec::new(); - std::mem::swap(&mut self.column_metadata, &mut metadatas); + let metadatas = std::mem::take(&mut self.column_metadata); + + // If spilling, finalize the spill writer and reopen for reading. + // The spill file itself is cleaned up by the caller (it lives in a + // temp directory managed by the caller's RAII guard). + let spill_state = self.page_spill.take(); + let (spill_chunks, spill_reader) = + if let Some(PageSpillState::Active(mut spill)) = spill_state { + spill.shutdown_writer().await?; + let reader = spill.object_store.open(&spill.path).await?; + let chunks = std::mem::take(&mut spill.column_chunks); + (chunks, Some(reader)) + } else { + (Vec::new(), None) + }; + let mut metadata_positions = Vec::with_capacity(metadatas.len()); - for metadata in metadatas { + for (col_idx, mut metadata) in metadatas.into_iter().enumerate() { + if let Some(reader) = &spill_reader { + let mut pages = Vec::new(); + for &(offset, len) in &spill_chunks[col_idx] { + let data = reader + .get_range(offset as usize..(offset as usize + len as usize)) + .await + .map_err(|e| Error::IO { + source: Box::new(e), + location: location!(), + })?; + pages.extend(decode_spilled_chunk(&data)?); + } + metadata.pages = pages; + } metadata_positions.push(self.write_column_metadata(metadata).await?); } + Ok(metadata_positions) } @@ -654,6 +797,7 @@ impl FileWriter { // 7. close the writer self.writer.shutdown().await?; + Ok(self.rows_written) } @@ -1472,4 +1616,84 @@ mod tests { // Verify first value matches what we wrote assert!(read_binary.value(0).iter().all(|&b| b == 42u8)); } + + fn spill_config() -> (TempObjFile, Arc) { + let spill_path = TempObjFile::default(); + (spill_path, Arc::new(ObjectStore::local())) + } + + fn make_batches(num_batches: i32, num_cols: usize, rows_per_batch: i32) -> Vec { + let fields: Vec<_> = (0..num_cols) + .map(|c| ArrowField::new(format!("c{c}"), DataType::Int32, false)) + .collect(); + let schema = Arc::new(ArrowSchema::new(fields)); + (0..num_batches) + .map(|i| { + let cols: Vec> = (0..num_cols) + .map(|c| { + let start = (i * rows_per_batch + c as i32) * 100; + Arc::new(Int32Array::from_iter_values(start..start + rows_per_batch)) + as Arc + }) + .collect(); + RecordBatch::try_new(schema.clone(), cols).unwrap() + }) + .collect() + } + + async fn write_and_read_batches( + batches: &[RecordBatch], + spill: Option<(Arc, object_store::path::Path)>, + ) -> Vec { + let fs = FsFixture::default(); + let lance_schema = LanceSchema::try_from(batches[0].schema().as_ref()).unwrap(); + let writer = fs.object_store.create(&fs.tmp_path).await.unwrap(); + let mut file_writer = + FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap(); + if let Some((store, path)) = spill { + file_writer = file_writer.with_page_metadata_spill(store, path); + } + for batch in batches { + file_writer.write_batch(batch).await.unwrap(); + } + file_writer.add_schema_metadata("foo", "bar"); + file_writer.finish().await.unwrap(); + + crate::testing::read_lance_file( + &fs, + Arc::::default(), + lance_encoding::decoder::FilterExpression::no_filter(), + ) + .await + } + + #[rstest::rstest] + #[case::multi_col(20, 2, 100)] + #[case::many_batches(50, 2, 100)] + #[tokio::test] + async fn test_page_metadata_spill_roundtrip( + #[case] num_batches: i32, + #[case] num_cols: usize, + #[case] rows_per_batch: i32, + ) { + let batches = make_batches(num_batches, num_cols, rows_per_batch); + let baseline = write_and_read_batches(&batches, None).await; + let (spill_path, spill_store) = spill_config(); + let spilled = + write_and_read_batches(&batches, Some((spill_store, spill_path.as_ref().clone()))) + .await; + assert_eq!(baseline, spilled); + } + + #[tokio::test] + async fn test_page_metadata_spill_many_columns() { + // Many columns forces small per-column buffer limits, exercising mid-write flushing. + let batches = make_batches(10, 500, 100); + let baseline = write_and_read_batches(&batches, None).await; + let (spill_path, spill_store) = spill_config(); + let spilled = + write_and_read_batches(&batches, Some((spill_store, spill_path.as_ref().clone()))) + .await; + assert_eq!(baseline, spilled); + } } diff --git a/rust/lance-index/src/vector/v3/shuffler.rs b/rust/lance-index/src/vector/v3/shuffler.rs index c1d74812b85..fc28a277b4a 100644 --- a/rust/lance-index/src/vector/v3/shuffler.rs +++ b/rust/lance-index/src/vector/v3/shuffler.rs @@ -110,15 +110,18 @@ impl Shuffler for IvfShuffler { let mut writers = stream::iter(0..num_partitions) .map(|partition_id| { let part_path = self.output_dir.child(format!("ivf_{}.lance", partition_id)); + let spill_path = self.output_dir.child(format!("ivf_{}.spill", partition_id)); let object_store = self.object_store.clone(); let schema = schema.clone(); async move { let writer = object_store.create(&part_path).await?; - FileWriter::try_new( + let file_writer = FileWriter::try_new( writer, lance_core::datatypes::Schema::try_from(&schema)?, Default::default(), - ) + )? + .with_page_metadata_spill(object_store.clone(), spill_path); + Result::Ok(file_writer) } }) .buffered(self.object_store.io_parallelism()) From 81dacd3453f6a0d8b1a4fdeff9dacc0d49d0f3da Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 10 Feb 2026 06:21:34 -0800 Subject: [PATCH 2/3] improve commenting --- rust/lance-file/src/writer.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index 53565788d02..cb0416f1f19 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -104,13 +104,28 @@ pub struct FileWriterOptions { // to the spill file. Divided evenly across columns (with a floor of 64 bytes). const DEFAULT_SPILL_BUFFER_LIMIT: usize = 256 * 1024; +/// Spills serialized page metadata to a temporary file to bound memory usage. +/// +/// The spill file is an unstructured sequence of "chunks". Each chunk is a +/// contiguous run of length-delimited protobuf `Page` messages belonging to a +/// single column. Chunks from different columns are interleaved in the order +/// they are flushed (i.e. whenever a column's in-memory buffer exceeds +/// `per_column_limit`). The `column_chunks` index records the (offset, length) +/// of every chunk so each column's pages can be read back and reassembled in +/// order. struct PageMetadataSpill { writer: ObjectWriter, object_store: Arc, path: Path, + /// Current write position in the spill file. position: u64, + /// Per-column buffer of serialized (length-delimited protobuf) page metadata + /// that has not yet been flushed to the spill file. column_buffers: Vec>, + /// Per-column list of chunks that have been flushed to the spill file. + /// Each entry is (offset, length) pointing into the spill file. column_chunks: Vec>, + /// Maximum bytes to buffer per column before flushing to the spill file. per_column_limit: usize, } From 73defa7a76e0463ec2f68ee069827ac2d6d83422 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 10 Feb 2026 06:23:21 -0800 Subject: [PATCH 3/3] format --- rust/lance-file/src/writer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index cb0416f1f19..ea753f463f9 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -180,7 +180,6 @@ impl PageMetadataSpill { self.writer.shutdown().await?; Ok(()) } - } fn decode_spilled_chunk(data: &Bytes) -> Result> {