diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index d4bdd996573..ea753f463f9 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use arrow_array::RecordBatch; use arrow_data::ArrayData; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use futures::stream::FuturesOrdered; use futures::StreamExt; use lance_core::datatypes::{Field, Schema as LanceSchema}; @@ -100,6 +100,109 @@ pub struct FileWriterOptions { pub format_version: Option, } +// Total in-memory budget for buffering serialized page metadata before flushing +// to the spill file. Divided evenly across columns (with a floor of 64 bytes). +const DEFAULT_SPILL_BUFFER_LIMIT: usize = 256 * 1024; + +/// Spills serialized page metadata to a temporary file to bound memory usage. +/// +/// The spill file is an unstructured sequence of "chunks". Each chunk is a +/// contiguous run of length-delimited protobuf `Page` messages belonging to a +/// single column. Chunks from different columns are interleaved in the order +/// they are flushed (i.e. whenever a column's in-memory buffer exceeds +/// `per_column_limit`). The `column_chunks` index records the (offset, length) +/// of every chunk so each column's pages can be read back and reassembled in +/// order. +struct PageMetadataSpill { + writer: ObjectWriter, + object_store: Arc, + path: Path, + /// Current write position in the spill file. + position: u64, + /// Per-column buffer of serialized (length-delimited protobuf) page metadata + /// that has not yet been flushed to the spill file. + column_buffers: Vec>, + /// Per-column list of chunks that have been flushed to the spill file. + /// Each entry is (offset, length) pointing into the spill file. + column_chunks: Vec>, + /// Maximum bytes to buffer per column before flushing to the spill file. + per_column_limit: usize, +} + +impl PageMetadataSpill { + async fn new(object_store: Arc, path: Path, num_columns: usize) -> Result { + let writer = object_store.create(&path).await?; + let per_column_limit = (DEFAULT_SPILL_BUFFER_LIMIT / num_columns.max(1)).max(64); + Ok(Self { + writer, + object_store, + path, + position: 0, + column_buffers: vec![Vec::new(); num_columns], + column_chunks: vec![Vec::new(); num_columns], + per_column_limit, + }) + } + + async fn append_page( + &mut self, + column_idx: usize, + page: &pbfile::column_metadata::Page, + ) -> Result<()> { + page.encode_length_delimited(&mut self.column_buffers[column_idx]) + .map_err(|e| Error::IO { + source: Box::new(std::io::Error::new(std::io::ErrorKind::InvalidData, e)), + location: location!(), + })?; + if self.column_buffers[column_idx].len() >= self.per_column_limit { + self.flush_column(column_idx).await?; + } + Ok(()) + } + + async fn flush_column(&mut self, column_idx: usize) -> Result<()> { + let buf = &self.column_buffers[column_idx]; + if buf.is_empty() { + return Ok(()); + } + let len = buf.len(); + self.writer.write_all(buf).await?; + self.column_chunks[column_idx].push((self.position, len as u32)); + self.position += len as u64; + self.column_buffers[column_idx].clear(); + Ok(()) + } + + async fn shutdown_writer(&mut self) -> Result<()> { + for col_idx in 0..self.column_buffers.len() { + self.flush_column(col_idx).await?; + } + self.writer.shutdown().await?; + Ok(()) + } +} + +fn decode_spilled_chunk(data: &Bytes) -> Result> { + let mut pages = Vec::new(); + let mut cursor = data.clone(); + while cursor.has_remaining() { + let page = + pbfile::column_metadata::Page::decode_length_delimited(&mut cursor).map_err(|e| { + Error::IO { + source: Box::new(std::io::Error::new(std::io::ErrorKind::InvalidData, e)), + location: location!(), + } + })?; + pages.push(page); + } + Ok(pages) +} + +enum PageSpillState { + Pending(Arc, Path), + Active(PageMetadataSpill), +} + pub struct FileWriter { writer: ObjectWriter, schema: Option, @@ -111,6 +214,7 @@ pub struct FileWriter { global_buffers: Vec<(u64, u64)>, schema_metadata: HashMap, options: FileWriterOptions, + page_spill: Option, } fn initial_column_metadata() -> pbfile::ColumnMetadata { @@ -165,10 +269,23 @@ impl FileWriter { field_id_to_column_indices: Vec::new(), global_buffers: Vec::new(), schema_metadata: HashMap::new(), + page_spill: None, options, } } + /// Spill page metadata to a sidecar file instead of accumulating in memory. + /// + /// This can dramatically reduce memory usage when many writers are open + /// concurrently (e.g. IVF shuffle with thousands of partition writers). + /// The sidecar file is created lazily on the first page write. The caller + /// is responsible for cleaning up `path` (e.g. by placing it in a temp + /// directory that is removed via RAII). + pub fn with_page_metadata_spill(mut self, object_store: Arc, path: Path) -> Self { + self.page_spill = Some(PageSpillState::Pending(object_store, path)); + self + } + /// Write a series of record batches to a new file /// /// Returns the number of rows written @@ -223,9 +340,20 @@ impl FileWriter { length: encoded_page.num_rows, priority: encoded_page.row_number, }; - self.column_metadata[encoded_page.column_idx as usize] - .pages - .push(page); + let col_idx = encoded_page.column_idx as usize; + if matches!(&self.page_spill, Some(PageSpillState::Pending(..))) { + let Some(PageSpillState::Pending(store, path)) = self.page_spill.take() else { + unreachable!() + }; + self.page_spill = Some(PageSpillState::Active( + PageMetadataSpill::new(store, path, self.num_columns as usize).await?, + )); + } + match &mut self.page_spill { + Some(PageSpillState::Active(spill)) => spill.append_page(col_idx, &page).await?, + None => self.column_metadata[col_idx].pages.push(page), + Some(PageSpillState::Pending(..)) => unreachable!(), + } Ok(()) } @@ -440,12 +568,41 @@ impl FileWriter { } async fn write_column_metadatas(&mut self) -> Result> { - let mut metadatas = Vec::new(); - std::mem::swap(&mut self.column_metadata, &mut metadatas); + let metadatas = std::mem::take(&mut self.column_metadata); + + // If spilling, finalize the spill writer and reopen for reading. + // The spill file itself is cleaned up by the caller (it lives in a + // temp directory managed by the caller's RAII guard). + let spill_state = self.page_spill.take(); + let (spill_chunks, spill_reader) = + if let Some(PageSpillState::Active(mut spill)) = spill_state { + spill.shutdown_writer().await?; + let reader = spill.object_store.open(&spill.path).await?; + let chunks = std::mem::take(&mut spill.column_chunks); + (chunks, Some(reader)) + } else { + (Vec::new(), None) + }; + let mut metadata_positions = Vec::with_capacity(metadatas.len()); - for metadata in metadatas { + for (col_idx, mut metadata) in metadatas.into_iter().enumerate() { + if let Some(reader) = &spill_reader { + let mut pages = Vec::new(); + for &(offset, len) in &spill_chunks[col_idx] { + let data = reader + .get_range(offset as usize..(offset as usize + len as usize)) + .await + .map_err(|e| Error::IO { + source: Box::new(e), + location: location!(), + })?; + pages.extend(decode_spilled_chunk(&data)?); + } + metadata.pages = pages; + } metadata_positions.push(self.write_column_metadata(metadata).await?); } + Ok(metadata_positions) } @@ -654,6 +811,7 @@ impl FileWriter { // 7. close the writer self.writer.shutdown().await?; + Ok(self.rows_written) } @@ -1472,4 +1630,84 @@ mod tests { // Verify first value matches what we wrote assert!(read_binary.value(0).iter().all(|&b| b == 42u8)); } + + fn spill_config() -> (TempObjFile, Arc) { + let spill_path = TempObjFile::default(); + (spill_path, Arc::new(ObjectStore::local())) + } + + fn make_batches(num_batches: i32, num_cols: usize, rows_per_batch: i32) -> Vec { + let fields: Vec<_> = (0..num_cols) + .map(|c| ArrowField::new(format!("c{c}"), DataType::Int32, false)) + .collect(); + let schema = Arc::new(ArrowSchema::new(fields)); + (0..num_batches) + .map(|i| { + let cols: Vec> = (0..num_cols) + .map(|c| { + let start = (i * rows_per_batch + c as i32) * 100; + Arc::new(Int32Array::from_iter_values(start..start + rows_per_batch)) + as Arc + }) + .collect(); + RecordBatch::try_new(schema.clone(), cols).unwrap() + }) + .collect() + } + + async fn write_and_read_batches( + batches: &[RecordBatch], + spill: Option<(Arc, object_store::path::Path)>, + ) -> Vec { + let fs = FsFixture::default(); + let lance_schema = LanceSchema::try_from(batches[0].schema().as_ref()).unwrap(); + let writer = fs.object_store.create(&fs.tmp_path).await.unwrap(); + let mut file_writer = + FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap(); + if let Some((store, path)) = spill { + file_writer = file_writer.with_page_metadata_spill(store, path); + } + for batch in batches { + file_writer.write_batch(batch).await.unwrap(); + } + file_writer.add_schema_metadata("foo", "bar"); + file_writer.finish().await.unwrap(); + + crate::testing::read_lance_file( + &fs, + Arc::::default(), + lance_encoding::decoder::FilterExpression::no_filter(), + ) + .await + } + + #[rstest::rstest] + #[case::multi_col(20, 2, 100)] + #[case::many_batches(50, 2, 100)] + #[tokio::test] + async fn test_page_metadata_spill_roundtrip( + #[case] num_batches: i32, + #[case] num_cols: usize, + #[case] rows_per_batch: i32, + ) { + let batches = make_batches(num_batches, num_cols, rows_per_batch); + let baseline = write_and_read_batches(&batches, None).await; + let (spill_path, spill_store) = spill_config(); + let spilled = + write_and_read_batches(&batches, Some((spill_store, spill_path.as_ref().clone()))) + .await; + assert_eq!(baseline, spilled); + } + + #[tokio::test] + async fn test_page_metadata_spill_many_columns() { + // Many columns forces small per-column buffer limits, exercising mid-write flushing. + let batches = make_batches(10, 500, 100); + let baseline = write_and_read_batches(&batches, None).await; + let (spill_path, spill_store) = spill_config(); + let spilled = + write_and_read_batches(&batches, Some((spill_store, spill_path.as_ref().clone()))) + .await; + assert_eq!(baseline, spilled); + } } diff --git a/rust/lance-index/src/vector/v3/shuffler.rs b/rust/lance-index/src/vector/v3/shuffler.rs index c1d74812b85..fc28a277b4a 100644 --- a/rust/lance-index/src/vector/v3/shuffler.rs +++ b/rust/lance-index/src/vector/v3/shuffler.rs @@ -110,15 +110,18 @@ impl Shuffler for IvfShuffler { let mut writers = stream::iter(0..num_partitions) .map(|partition_id| { let part_path = self.output_dir.child(format!("ivf_{}.lance", partition_id)); + let spill_path = self.output_dir.child(format!("ivf_{}.spill", partition_id)); let object_store = self.object_store.clone(); let schema = schema.clone(); async move { let writer = object_store.create(&part_path).await?; - FileWriter::try_new( + let file_writer = FileWriter::try_new( writer, lance_core::datatypes::Schema::try_from(&schema)?, Default::default(), - ) + )? + .with_page_metadata_spill(object_store.clone(), spill_path); + Result::Ok(file_writer) } }) .buffered(self.object_store.io_parallelism())