diff --git a/rust/lance-io/src/object_writer.rs b/rust/lance-io/src/object_writer.rs index f2ad57f56f6..a9f494b4726 100644 --- a/rust/lance-io/src/object_writer.rs +++ b/rust/lance-io/src/object_writer.rs @@ -81,6 +81,8 @@ pub struct ObjectWriter { buffer: Vec, // TODO: use constant size to support R2 use_constant_size_upload_parts: bool, + /// Local filesystem writes don't need 5MB multipart buffers. + is_local: bool, } #[derive(Debug, Clone, Default)] @@ -157,20 +159,34 @@ impl UploadState { impl ObjectWriter { pub async fn new(object_store: &LanceObjectStore, path: &Path) -> Result { + let is_local = object_store.is_local(); + let initial_capacity = if is_local { + 8 * 1024 // 8 KB - no S3/GCS minimum part size for local filesystem + } else { + initial_upload_size() // 5 MB for cloud (S3/GCS minimum part size) + }; Ok(Self { state: UploadState::Started(object_store.inner.clone()), cursor: 0, path: Arc::new(path.clone()), connection_resets: 0, - buffer: Vec::with_capacity(initial_upload_size()), + buffer: Vec::with_capacity(initial_capacity), use_constant_size_upload_parts: object_store.use_constant_size_upload_parts, + is_local, }) } /// Returns the contents of `buffer` as a `Bytes` object and resets `buffer`. /// The new capacity of `buffer` is determined by the current part index. - fn next_part_buffer(buffer: &mut Vec, part_idx: u16, constant_upload_size: bool) -> Bytes { - let new_capacity = if constant_upload_size { + fn next_part_buffer( + buffer: &mut Vec, + part_idx: u16, + constant_upload_size: bool, + is_local: bool, + ) -> Bytes { + let new_capacity = if is_local { + 8 * 1024 // 8 KB for local filesystem + } else if constant_upload_size { // The store does not support variable part sizes, so use the initial size. initial_upload_size() } else { @@ -222,6 +238,7 @@ impl ObjectWriter { &mut mut_self.buffer, 0, mut_self.use_constant_size_upload_parts, + mut_self.is_local, ); futures.spawn(Self::put_part(upload.as_mut(), data, 0, None)); @@ -401,6 +418,7 @@ impl AsyncWrite for ObjectWriter { &mut mut_self.buffer, *part_idx, mut_self.use_constant_size_upload_parts, + mut_self.is_local, ); futures.spawn( Self::put_part(upload.as_mut(), data, *part_idx, None)