Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions rust/lance-io/src/object_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub struct ObjectWriter {
buffer: Vec<u8>,
// TODO: use constant size to support R2
use_constant_size_upload_parts: bool,
/// Local filesystem writes don't need 5MB multipart buffers.
is_local: bool,
}

#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -157,20 +159,34 @@ impl UploadState {

impl ObjectWriter {
pub async fn new(object_store: &LanceObjectStore, path: &Path) -> Result<Self> {
let is_local = object_store.is_local();
let initial_capacity = if is_local {
8 * 1024 // 8 KB - no S3/GCS minimum part size for local filesystem
} else {
initial_upload_size() // 5 MB for cloud (S3/GCS minimum part size)
};
Ok(Self {
state: UploadState::Started(object_store.inner.clone()),
cursor: 0,
path: Arc::new(path.clone()),
connection_resets: 0,
buffer: Vec::with_capacity(initial_upload_size()),
buffer: Vec::with_capacity(initial_capacity),
use_constant_size_upload_parts: object_store.use_constant_size_upload_parts,
is_local,
})
}

/// Returns the contents of `buffer` as a `Bytes` object and resets `buffer`.
/// The new capacity of `buffer` is determined by the current part index.
fn next_part_buffer(buffer: &mut Vec<u8>, part_idx: u16, constant_upload_size: bool) -> Bytes {
let new_capacity = if constant_upload_size {
fn next_part_buffer(
buffer: &mut Vec<u8>,
part_idx: u16,
constant_upload_size: bool,
is_local: bool,
) -> Bytes {
let new_capacity = if is_local {
8 * 1024 // 8 KB for local filesystem
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this going to chop up large writes into tiny 8KiB writes? From a syscall perspective that maybe isn't the best approach. We should probably just send the entire buffer to the OS?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, actually this is way worse than I thought. It is going to do a simulated multipart write on the local FS.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prototyped a specialized local writer here that doesn't do the multipart simulation. I didn't see an improvement in write throughput, so left it aside, but feel free to play around with it. wjones127@7d7e30a

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this works perfectly 👍

} else if constant_upload_size {
// The store does not support variable part sizes, so use the initial size.
initial_upload_size()
} else {
Expand Down Expand Up @@ -222,6 +238,7 @@ impl ObjectWriter {
&mut mut_self.buffer,
0,
mut_self.use_constant_size_upload_parts,
mut_self.is_local,
);
futures.spawn(Self::put_part(upload.as_mut(), data, 0, None));

Expand Down Expand Up @@ -401,6 +418,7 @@ impl AsyncWrite for ObjectWriter {
&mut mut_self.buffer,
*part_idx,
mut_self.use_constant_size_upload_parts,
mut_self.is_local,
);
futures.spawn(
Self::put_part(upload.as_mut(), data, *part_idx, None)
Expand Down
Loading