From 5b5569b9e756aff4f0eff761e88d6b8d080d84c6 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Sat, 7 Feb 2026 08:38:38 -0800 Subject: [PATCH] perf: use 8KB buffer for local ObjectWriter We pick the buffer size for object writers according to caller configuration, or by defaulting to 5MB in order to guarantee a multipart write in object storage. For local storage, the 5MB buffer is not applicable and can be wasteful if many writers are open simultaneously. We encounter that situation during the shuffle stage of an IVF-PQ index build. Change the object writer to use an 8KB buffer when the object store in use is local. --- rust/lance-io/src/object_writer.rs | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/rust/lance-io/src/object_writer.rs b/rust/lance-io/src/object_writer.rs index f2ad57f56f6..a9f494b4726 100644 --- a/rust/lance-io/src/object_writer.rs +++ b/rust/lance-io/src/object_writer.rs @@ -81,6 +81,8 @@ pub struct ObjectWriter { buffer: Vec, // TODO: use constant size to support R2 use_constant_size_upload_parts: bool, + /// Local filesystem writes don't need 5MB multipart buffers. + is_local: bool, } #[derive(Debug, Clone, Default)] @@ -157,20 +159,34 @@ impl UploadState { impl ObjectWriter { pub async fn new(object_store: &LanceObjectStore, path: &Path) -> Result { + let is_local = object_store.is_local(); + let initial_capacity = if is_local { + 8 * 1024 // 8 KB - no S3/GCS minimum part size for local filesystem + } else { + initial_upload_size() // 5 MB for cloud (S3/GCS minimum part size) + }; Ok(Self { state: UploadState::Started(object_store.inner.clone()), cursor: 0, path: Arc::new(path.clone()), connection_resets: 0, - buffer: Vec::with_capacity(initial_upload_size()), + buffer: Vec::with_capacity(initial_capacity), use_constant_size_upload_parts: object_store.use_constant_size_upload_parts, + is_local, }) } /// Returns the contents of `buffer` as a `Bytes` object and resets `buffer`. 
/// The new capacity of `buffer` is determined by the current part index. - fn next_part_buffer(buffer: &mut Vec, part_idx: u16, constant_upload_size: bool) -> Bytes { - let new_capacity = if constant_upload_size { + fn next_part_buffer( + buffer: &mut Vec, + part_idx: u16, + constant_upload_size: bool, + is_local: bool, + ) -> Bytes { + let new_capacity = if is_local { + 8 * 1024 // 8 KB for local filesystem + } else if constant_upload_size { // The store does not support variable part sizes, so use the initial size. initial_upload_size() } else { @@ -222,6 +238,7 @@ impl ObjectWriter { &mut mut_self.buffer, 0, mut_self.use_constant_size_upload_parts, + mut_self.is_local, ); futures.spawn(Self::put_part(upload.as_mut(), data, 0, None)); @@ -401,6 +418,7 @@ impl AsyncWrite for ObjectWriter { &mut mut_self.buffer, *part_idx, mut_self.use_constant_size_upload_parts, + mut_self.is_local, ); futures.spawn( Self::put_part(upload.as_mut(), data, *part_idx, None)