Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 245 additions & 7 deletions rust/lance-file/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;
use arrow_array::RecordBatch;

use arrow_data::ArrayData;
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use lance_core::datatypes::{Field, Schema as LanceSchema};
Expand Down Expand Up @@ -100,6 +100,109 @@ pub struct FileWriterOptions {
pub format_version: Option<LanceFileVersion>,
}

// Total in-memory budget (256 KiB) for buffering serialized page metadata
// before it is flushed to the spill file. `PageMetadataSpill::new` divides
// this budget evenly across columns, with a floor of 64 bytes per column.
const DEFAULT_SPILL_BUFFER_LIMIT: usize = 256 * 1024;

/// Spills serialized page metadata to a temporary file to bound memory usage.
///
/// The spill file is an unstructured sequence of "chunks". Each chunk is a
/// contiguous run of length-delimited protobuf `Page` messages belonging to a
/// single column. Chunks from different columns are interleaved in the order
/// they are flushed (i.e. whenever a column's in-memory buffer exceeds
/// `per_column_limit`). The `column_chunks` index records the (offset, length)
/// of every chunk so each column's pages can be read back and reassembled in
/// order.
struct PageMetadataSpill {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you document the structure of the spill file at a high level? It looks like a series of column chunks where each chunk is a series of page messages for a single column?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the comments, thanks

/// Writer for the spill file; flushed column buffers are appended through it.
writer: ObjectWriter,
/// Store the spill file was created in; also used to reopen the file for
/// reading once writing has finished.
object_store: Arc<ObjectStore>,
/// Location of the spill file inside `object_store`.
path: Path,
/// Current write position in the spill file.
position: u64,
/// Per-column buffer of serialized (length-delimited protobuf) page metadata
/// that has not yet been flushed to the spill file.
column_buffers: Vec<Vec<u8>>,
/// Per-column list of chunks that have been flushed to the spill file.
/// Each entry is (offset, length) pointing into the spill file.
column_chunks: Vec<Vec<(u64, u32)>>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you document a little what these fields are holding?

/// Maximum bytes to buffer per column before flushing to the spill file.
per_column_limit: usize,
}

impl PageMetadataSpill {
async fn new(object_store: Arc<ObjectStore>, path: Path, num_columns: usize) -> Result<Self> {
let writer = object_store.create(&path).await?;
let per_column_limit = (DEFAULT_SPILL_BUFFER_LIMIT / num_columns.max(1)).max(64);
Ok(Self {
writer,
object_store,
path,
position: 0,
column_buffers: vec![Vec::new(); num_columns],
column_chunks: vec![Vec::new(); num_columns],
per_column_limit,
})
}

async fn append_page(
&mut self,
column_idx: usize,
page: &pbfile::column_metadata::Page,
) -> Result<()> {
page.encode_length_delimited(&mut self.column_buffers[column_idx])
.map_err(|e| Error::IO {
source: Box::new(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
location: location!(),
})?;
if self.column_buffers[column_idx].len() >= self.per_column_limit {
self.flush_column(column_idx).await?;
}
Ok(())
}

async fn flush_column(&mut self, column_idx: usize) -> Result<()> {
let buf = &self.column_buffers[column_idx];
if buf.is_empty() {
return Ok(());
}
let len = buf.len();
self.writer.write_all(buf).await?;
self.column_chunks[column_idx].push((self.position, len as u32));
self.position += len as u64;
self.column_buffers[column_idx].clear();
Ok(())
}

async fn shutdown_writer(&mut self) -> Result<()> {
for col_idx in 0..self.column_buffers.len() {
self.flush_column(col_idx).await?;
}
self.writer.shutdown().await?;
Ok(())
}
}

fn decode_spilled_chunk(data: &Bytes) -> Result<Vec<pbfile::column_metadata::Page>> {
let mut pages = Vec::new();
let mut cursor = data.clone();
while cursor.has_remaining() {
let page =
pbfile::column_metadata::Page::decode_length_delimited(&mut cursor).map_err(|e| {
Error::IO {
source: Box::new(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
location: location!(),
}
})?;
pages.push(page);
}
Ok(pages)
}

/// Lifecycle of the optional page-metadata spill attached to a `FileWriter`.
enum PageSpillState {
/// Spilling was requested but no spill file exists yet; holds the object
/// store and path the spill file will be created at.
Pending(Arc<ObjectStore>, Path),
/// The spill file has been created and page metadata is being written to it.
Active(PageMetadataSpill),
}

pub struct FileWriter {
writer: ObjectWriter,
schema: Option<LanceSchema>,
Expand All @@ -111,6 +214,7 @@ pub struct FileWriter {
global_buffers: Vec<(u64, u64)>,
schema_metadata: HashMap<String, String>,
options: FileWriterOptions,
page_spill: Option<PageSpillState>,
}

fn initial_column_metadata() -> pbfile::ColumnMetadata {
Expand Down Expand Up @@ -165,10 +269,23 @@ impl FileWriter {
field_id_to_column_indices: Vec::new(),
global_buffers: Vec::new(),
schema_metadata: HashMap::new(),
page_spill: None,
options,
}
}

/// Route page metadata to a sidecar spill file rather than keeping it all
/// in memory.
///
/// Useful when a large number of writers are open at the same time (for
/// example an IVF shuffle with thousands of partition writers), where the
/// accumulated page metadata would otherwise dominate memory usage.
///
/// Only the request is recorded here; the sidecar file at `path` is created
/// lazily on the first page write. Removing `path` afterwards is the
/// caller's job (e.g. by placing it in a temp directory that is removed via
/// RAII).
pub fn with_page_metadata_spill(mut self, object_store: Arc<ObjectStore>, path: Path) -> Self {
    let pending = PageSpillState::Pending(object_store, path);
    self.page_spill = Some(pending);
    self
}

/// Write a series of record batches to a new file
///
/// Returns the number of rows written
Expand Down Expand Up @@ -223,9 +340,20 @@ impl FileWriter {
length: encoded_page.num_rows,
priority: encoded_page.row_number,
};
self.column_metadata[encoded_page.column_idx as usize]
.pages
.push(page);
let col_idx = encoded_page.column_idx as usize;
if matches!(&self.page_spill, Some(PageSpillState::Pending(..))) {
let Some(PageSpillState::Pending(store, path)) = self.page_spill.take() else {
unreachable!()
};
self.page_spill = Some(PageSpillState::Active(
PageMetadataSpill::new(store, path, self.num_columns as usize).await?,
));
}
match &mut self.page_spill {
Some(PageSpillState::Active(spill)) => spill.append_page(col_idx, &page).await?,
None => self.column_metadata[col_idx].pages.push(page),
Some(PageSpillState::Pending(..)) => unreachable!(),
}
Ok(())
}

Expand Down Expand Up @@ -440,12 +568,41 @@ impl FileWriter {
}

async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
let mut metadatas = Vec::new();
std::mem::swap(&mut self.column_metadata, &mut metadatas);
let metadatas = std::mem::take(&mut self.column_metadata);

// If spilling, finalize the spill writer and reopen for reading.
// The spill file itself is cleaned up by the caller (it lives in a
// temp directory managed by the caller's RAII guard).
let spill_state = self.page_spill.take();
let (spill_chunks, spill_reader) =
if let Some(PageSpillState::Active(mut spill)) = spill_state {
spill.shutdown_writer().await?;
let reader = spill.object_store.open(&spill.path).await?;
let chunks = std::mem::take(&mut spill.column_chunks);
(chunks, Some(reader))
} else {
(Vec::new(), None)
};

let mut metadata_positions = Vec::with_capacity(metadatas.len());
for metadata in metadatas {
for (col_idx, mut metadata) in metadatas.into_iter().enumerate() {
if let Some(reader) = &spill_reader {
let mut pages = Vec::new();
for &(offset, len) in &spill_chunks[col_idx] {
let data = reader
.get_range(offset as usize..(offset as usize + len as usize))
.await
.map_err(|e| Error::IO {
source: Box::new(e),
location: location!(),
})?;
pages.extend(decode_spilled_chunk(&data)?);
}
metadata.pages = pages;
}
metadata_positions.push(self.write_column_metadata(metadata).await?);
}

Ok(metadata_positions)
}

Expand Down Expand Up @@ -654,6 +811,7 @@ impl FileWriter {

// 7. close the writer
self.writer.shutdown().await?;

Ok(self.rows_written)
}

Expand Down Expand Up @@ -1472,4 +1630,84 @@ mod tests {
// Verify first value matches what we wrote
assert!(read_binary.value(0).iter().all(|&b| b == 42u8));
}

/// Builds the pieces a test needs to enable spilling: a default
/// `TempObjFile` for the spill location and a local object store.
fn spill_config() -> (TempObjFile, Arc<ObjectStore>) {
    let spill_file = TempObjFile::default();
    let local_store = Arc::new(ObjectStore::local());
    (spill_file, local_store)
}

/// Generates `num_batches` record batches of `num_cols` non-nullable `Int32`
/// columns named `c0`, `c1`, ...
///
/// Each column holds `rows_per_batch` consecutive integers starting at
/// `(batch_index * rows_per_batch + column_index) * 100`.
fn make_batches(num_batches: i32, num_cols: usize, rows_per_batch: i32) -> Vec<RecordBatch> {
    let mut fields = Vec::new();
    for col_idx in 0..num_cols {
        fields.push(ArrowField::new(format!("c{col_idx}"), DataType::Int32, false));
    }
    let schema = Arc::new(ArrowSchema::new(fields));

    let mut batches = Vec::new();
    for batch_idx in 0..num_batches {
        let mut columns: Vec<Arc<dyn arrow_array::Array>> = Vec::new();
        for col_idx in 0..num_cols {
            let first = (batch_idx * rows_per_batch + col_idx as i32) * 100;
            let values = Int32Array::from_iter_values(first..first + rows_per_batch);
            columns.push(Arc::new(values));
        }
        batches.push(RecordBatch::try_new(schema.clone(), columns).unwrap());
    }
    batches
}

/// Writes `batches` to a fresh Lance file and reads the file back.
///
/// When `spill` is `Some((store, path))` the writer is configured to spill
/// page metadata to `path`; otherwise page metadata stays in memory. A
/// `foo = bar` schema metadata entry is added before finishing the file.
/// Panics on any write failure.
async fn write_and_read_batches(
    batches: &[RecordBatch],
    spill: Option<(Arc<ObjectStore>, object_store::path::Path)>,
) -> Vec<RecordBatch> {
    let fs = FsFixture::default();
    let arrow_schema = batches[0].schema();
    let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();
    let object_writer = fs.object_store.create(&fs.tmp_path).await.unwrap();
    let plain_writer =
        FileWriter::try_new(object_writer, lance_schema, FileWriterOptions::default()).unwrap();
    let mut file_writer = match spill {
        Some((store, path)) => plain_writer.with_page_metadata_spill(store, path),
        None => plain_writer,
    };

    for batch in batches.iter() {
        file_writer.write_batch(batch).await.unwrap();
    }
    file_writer.add_schema_metadata("foo", "bar");
    file_writer.finish().await.unwrap();

    let plugins = Arc::<DecoderPlugins>::default();
    let filter = lance_encoding::decoder::FilterExpression::no_filter();
    crate::testing::read_lance_file(&fs, plugins, filter).await
}

// Data written with page-metadata spilling enabled must read back identical
// to data written without it.
#[rstest::rstest]
#[case::multi_col(20, 2, 100)]
#[case::many_batches(50, 2, 100)]
#[tokio::test]
async fn test_page_metadata_spill_roundtrip(
    #[case] num_batches: i32,
    #[case] num_cols: usize,
    #[case] rows_per_batch: i32,
) {
    let input = make_batches(num_batches, num_cols, rows_per_batch);

    // Reference result: page metadata kept in memory.
    let in_memory = write_and_read_batches(&input, None).await;

    // Same data, page metadata routed through the spill file.
    let (spill_file, store) = spill_config();
    let via_spill =
        write_and_read_batches(&input, Some((store, spill_file.as_ref().clone()))).await;

    assert_eq!(in_memory, via_spill);
}

#[tokio::test]
async fn test_page_metadata_spill_many_columns() {
    // With 500 columns the per-column buffer limit becomes small, so buffers
    // are flushed to the spill file in the middle of the write.
    let input = make_batches(10, 500, 100);

    let in_memory = write_and_read_batches(&input, None).await;

    let (spill_file, store) = spill_config();
    let via_spill =
        write_and_read_batches(&input, Some((store, spill_file.as_ref().clone()))).await;

    assert_eq!(in_memory, via_spill);
}
}
7 changes: 5 additions & 2 deletions rust/lance-index/src/vector/v3/shuffler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,18 @@ impl Shuffler for IvfShuffler {
let mut writers = stream::iter(0..num_partitions)
.map(|partition_id| {
let part_path = self.output_dir.child(format!("ivf_{}.lance", partition_id));
let spill_path = self.output_dir.child(format!("ivf_{}.spill", partition_id));
let object_store = self.object_store.clone();
let schema = schema.clone();
async move {
let writer = object_store.create(&part_path).await?;
FileWriter::try_new(
let file_writer = FileWriter::try_new(
writer,
lance_core::datatypes::Schema::try_from(&schema)?,
Default::default(),
)
)?
.with_page_metadata_spill(object_store.clone(), spill_path);
Result::Ok(file_writer)
}
})
.buffered(self.object_store.io_parallelism())
Expand Down
Loading