From 9943de272774567c2b9c246fe151aec908b786ec Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 4 Feb 2026 13:08:10 -0800 Subject: [PATCH 01/10] feat: create local writer for efficient local writes This creates a new LocalWriter that wraps tokio::fs::File in a BufWriter for local file writes. ObjectStore::create() now returns one of these when working against local storage, and an ObjectWriter for remote storage. Prior to this commit, local writes (e.g for shuffling) went through a local object writer implementation that required a 5MB buffer per writer and also simulated multipart upload machinery. For local writing, this is slower than necessary and uses a lot of memory in situations where many writers are open at once. This change results in a substantial memory reduction and incremental speedup for IVF shuffle. --- rust/lance-file/src/previous/page_table.rs | 2 +- rust/lance-file/src/previous/writer/mod.rs | 48 +++--- rust/lance-file/src/writer.rs | 19 +- rust/lance-io/Cargo.toml | 4 + rust/lance-io/benches/write.rs | 64 +++++++ rust/lance-io/src/encodings/binary.rs | 6 +- rust/lance-io/src/encodings/dictionary.rs | 2 +- rust/lance-io/src/encodings/plain.rs | 2 +- rust/lance-io/src/local.rs | 7 + rust/lance-io/src/object_store.rs | 29 +++- rust/lance-io/src/object_writer.rs | 162 ++++++++++++++++-- rust/lance-io/src/traits.rs | 17 ++ rust/lance-io/src/utils/tracking_store.rs | 21 +++ rust/lance-table/src/io/commit.rs | 36 +--- rust/lance-table/src/io/manifest.rs | 7 +- rust/lance/src/dataset/blob.rs | 8 +- .../lance/src/dataset/optimize/binary_copy.rs | 17 +- rust/lance/src/index/vector/ivf.rs | 31 ++-- rust/lance/src/index/vector/ivf/builder.rs | 3 +- 19 files changed, 356 insertions(+), 129 deletions(-) create mode 100644 rust/lance-io/benches/write.rs diff --git a/rust/lance-file/src/previous/page_table.rs b/rust/lance-file/src/previous/page_table.rs index 3089a400790..c0a77af1fb3 100644 --- a/rust/lance-file/src/previous/page_table.rs +++ b/rust/lance-file/src/previous/page_table.rs @@ -214,7 +214,7 @@ mod tests { .write(&mut writer, starting_field_id) .await .unwrap(); - writer.shutdown().await.unwrap(); + AsyncWriteExt::shutdown(&mut writer).await.unwrap(); let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await diff --git a/rust/lance-file/src/previous/writer/mod.rs b/rust/lance-file/src/previous/writer/mod.rs index 7006442e08c..aa9370d4a09 100644 --- a/rust/lance-file/src/previous/writer/mod.rs +++ b/rust/lance-file/src/previous/writer/mod.rs @@ -22,7 +22,6 @@ use lance_io::encodings::{ binary::BinaryEncoder, dictionary::DictionaryEncoder, plain::PlainEncoder, Encoder, }; use lance_io::object_store::ObjectStore; -use lance_io::object_writer::ObjectWriter; use lance_io::traits::{WriteExt, Writer}; use object_store::path::Path; use snafu::location; @@ -47,10 +46,8 @@ pub trait ManifestProvider { /// /// Note: the dictionaries have already been written by this point and the schema should /// be populated with the dictionary lengths/offsets - async fn store_schema( - object_writer: &mut ObjectWriter, - schema: &Schema, - ) -> Result>; + async fn store_schema(object_writer: &mut dyn Writer, schema: &Schema) + -> Result>; } /// Implementation of ManifestProvider that does not store the schema @@ -60,7 +57,7 @@ pub(crate) struct NotSelfDescribing {} #[cfg(test)] #[async_trait] impl ManifestProvider for NotSelfDescribing { - async fn store_schema(_: &mut ObjectWriter, _: &Schema) -> Result> { + async fn store_schema(_: &mut dyn Writer, _: &Schema) -> Result> { Ok(None) } } @@ -79,7 +76,7 @@ impl ManifestProvider for NotSelfDescribing { /// file_writer.shutdown(); /// ``` pub struct FileWriter { - pub object_writer: ObjectWriter, + pub object_writer: Box, schema: Schema, batch_id: i32, page_table: PageTable, @@ -109,7 +106,7 @@ impl FileWriter { } pub fn with_object_writer( - object_writer: ObjectWriter, + object_writer: Box, schema: Schema, options: &FileWriterOptions, ) -> Result { @@ -213,7 +210,7 @@ impl FileWriter { .collect::>>()?; Self::write_array( - &mut self.object_writer, + self.object_writer.as_mut(), field, &arrs, self.batch_id, @@ -253,7 +250,7 @@ impl FileWriter { pub async fn finish(&mut self) -> Result { self.write_footer().await?; - self.object_writer.shutdown().await?; + Writer::shutdown(self.object_writer.as_mut()).await?; let num_rows = self .metadata .batch_offsets @@ -284,7 +281,7 @@ impl FileWriter { #[async_recursion] async fn write_array( - object_writer: &mut ObjectWriter, + object_writer: &mut dyn Writer, field: &Field, arrs: &[&ArrayRef], batch_id: i32, @@ -385,7 +382,7 @@ impl FileWriter { } async fn write_null_array( - object_writer: &mut ObjectWriter, + object_writer: &mut dyn Writer, field: &Field, arrs: &[&dyn Array], batch_id: i32, @@ -399,7 +396,7 @@ impl FileWriter { /// Write fixed size array, including, primtiives, fixed size binary, and fixed size list. async fn write_fixed_stride_array( - object_writer: &mut ObjectWriter, + object_writer: &mut dyn Writer, field: &Field, arrs: &[&dyn Array], batch_id: i32, @@ -419,7 +416,7 @@ impl FileWriter { /// Write var-length binary arrays. async fn write_binary_array( - object_writer: &mut ObjectWriter, + object_writer: &mut dyn Writer, field: &Field, arrs: &[&dyn Array], batch_id: i32, @@ -435,7 +432,7 @@ impl FileWriter { } async fn write_dictionary_arr( - object_writer: &mut ObjectWriter, + object_writer: &mut dyn Writer, field: &Field, arrs: &[&dyn Array], key_type: &DataType, @@ -455,7 +452,7 @@ impl FileWriter { #[async_recursion] async fn write_struct_array( - object_writer: &mut ObjectWriter, + object_writer: &mut dyn Writer, field: &Field, arrays: &[&StructArray], batch_id: i32, @@ -486,7 +483,7 @@ impl FileWriter { } async fn write_list_array( - object_writer: &mut ObjectWriter, + object_writer: &mut dyn Writer, field: &Field, arrs: &[&dyn Array], batch_id: i32, @@ -534,7 +531,7 @@ impl FileWriter { } async fn write_large_list_array( - object_writer: &mut ObjectWriter, + object_writer: &mut dyn Writer, field: &Field, arrs: &[&dyn Array], batch_id: i32, @@ -597,7 +594,7 @@ impl FileWriter { let mut stats_page_table = PageTable::default(); for (i, field) in schema.fields.iter().enumerate() { Self::write_array( - &mut self.object_writer, + self.object_writer.as_mut(), field, &[stats_batch.column(i)], 0, // Only one batch for statistics. @@ -606,8 +603,9 @@ impl FileWriter { .await?; } - let page_table_position = - stats_page_table.write(&mut self.object_writer, 0).await?; + let page_table_position = stats_page_table + .write(self.object_writer.as_mut(), 0) + .await?; Ok(Some(StatisticsMetadata { schema, @@ -624,7 +622,7 @@ impl FileWriter { /// /// The offsets and lengths of the written buffers are stored in the given /// schema so that the dictionaries can be loaded in the future. - async fn write_dictionaries(writer: &mut ObjectWriter, schema: &mut Schema) -> Result<()> { + async fn write_dictionaries(writer: &mut dyn Writer, schema: &mut Schema) -> Result<()> { // Write dictionary values. let max_field_id = schema.max_field_id().unwrap_or(-1); for field_id in 0..max_field_id + 1 { @@ -680,7 +678,7 @@ impl FileWriter { let field_id_offset = *self.schema.field_ids().iter().min().unwrap(); let pos = self .page_table - .write(&mut self.object_writer, field_id_offset) + .write(self.object_writer.as_mut(), field_id_offset) .await?; self.metadata.page_table_position = pos; @@ -688,8 +686,8 @@ impl FileWriter { self.metadata.stats_metadata = self.write_statistics().await?; // Step 3. Write manifest and dictionary values. - Self::write_dictionaries(&mut self.object_writer, &mut self.schema).await?; - let pos = M::store_schema(&mut self.object_writer, &self.schema).await?; + Self::write_dictionaries(self.object_writer.as_mut(), &mut self.schema).await?; + let pos = M::store_schema(self.object_writer.as_mut(), &self.schema).await?; // Step 4. Write metadata. self.metadata.manifest_position = pos; diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index ea753f463f9..0f00ee45b6a 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -23,13 +23,13 @@ use lance_encoding::encoder::{ use lance_encoding::repdef::RepDefBuilder; use lance_encoding::version::LanceFileVersion; use lance_io::object_store::ObjectStore; -use lance_io::object_writer::ObjectWriter; use lance_io::traits::Writer; use log::{debug, warn}; use object_store::path::Path; use prost::Message; use prost_types::Any; use snafu::location; +use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tracing::instrument; @@ -114,7 +114,7 @@ const DEFAULT_SPILL_BUFFER_LIMIT: usize = 256 * 1024; /// of every chunk so each column's pages can be read back and reassembled in /// order. struct PageMetadataSpill { - writer: ObjectWriter, + writer: Box, object_store: Arc, path: Path, /// Current write position in the spill file. @@ -177,7 +177,7 @@ impl PageMetadataSpill { for col_idx in 0..self.column_buffers.len() { self.flush_column(col_idx).await?; } - self.writer.shutdown().await?; + Writer::shutdown(self.writer.as_mut()).await?; Ok(()) } } @@ -204,7 +204,7 @@ enum PageSpillState { } pub struct FileWriter { - writer: ObjectWriter, + writer: Box, schema: Option, column_writers: Vec>, column_metadata: Vec, @@ -231,7 +231,7 @@ static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false); impl FileWriter { /// Create a new FileWriter with a desired output schema pub fn try_new( - object_writer: ObjectWriter, + object_writer: Box, schema: LanceSchema, options: FileWriterOptions, ) -> Result { @@ -244,7 +244,7 @@ impl FileWriter { /// /// The output schema will be set based on the first batch of data to arrive. /// If no data arrives and the writer is finished then the write will fail. - pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self { + pub fn new_lazy(object_writer: Box, options: FileWriterOptions) -> Self { if let Some(format_version) = options.format_version { if format_version.is_unstable() && WARNED_ON_UNSTABLE_API @@ -304,7 +304,7 @@ impl FileWriter { Ok(writer.finish().await? as usize) } - async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> { + async fn do_write_buffer(writer: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> Result<()> { writer.write_all(buf).await?; let pad_bytes = pad_bytes::(buf.len()); writer.write_all(&PAD_BUFFER[..pad_bytes]).await?; @@ -810,13 +810,14 @@ impl FileWriter { self.writer.write_all(MAGIC).await?; // 7. close the writer - self.writer.shutdown().await?; + Writer::shutdown(self.writer.as_mut()).await?; Ok(self.rows_written) } pub async fn abort(&mut self) { - self.writer.abort().await; + // For multipart uploads, ObjectWriter's Drop impl will abort + // the upload when the writer is dropped. } pub async fn tell(&mut self) -> Result { diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml index 953c9c39e6c..fa98982e1cc 100644 --- a/rust/lance-io/Cargo.toml +++ b/rust/lance-io/Cargo.toml @@ -62,6 +62,10 @@ pprof.workspace = true name = "scheduler" harness = false +[[bench]] +name = "write" +harness = false + [features] default = ["aws", "azure", "gcp"] gcs-test = [] diff --git a/rust/lance-io/benches/write.rs b/rust/lance-io/benches/write.rs new file mode 100644 index 00000000000..4a0f631bd1d --- /dev/null +++ b/rust/lance-io/benches/write.rs @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors +#![allow(clippy::print_stdout)] + +use lance_io::object_store::ObjectStore; +use object_store::path::Path; +use rand::RngCore; +use std::fs::File; +use tokio::{io::AsyncWriteExt, runtime::Runtime}; + +use criterion::{criterion_group, criterion_main, Criterion}; + +fn generate_data(num_bytes: u64) -> Vec { + let mut data = vec![0; num_bytes as usize]; + rand::rng().fill_bytes(&mut data); + data +} + +fn write_basic(data: &[u8], path: &std::path::Path) { + let mut f = File::create(path).unwrap(); + let writer = std::io::BufWriter::new(&mut f); + std::io::Write::write_all(&mut writer.into_inner().unwrap(), data).unwrap(); +} + +async fn write_lance(data: &[u8], obj_store: &ObjectStore, path: &Path) { + let mut writer = obj_store.create(path).await.unwrap(); + writer.write_all(data).await.unwrap(); +} + +const DATA_SIZE: u64 = 128 * 1024 * 1024; + +fn bench_basic_write(c: &mut Criterion) { + let mut group = c.benchmark_group("write"); + + group.throughput(criterion::Throughput::Bytes(DATA_SIZE)); + + let runtime = Runtime::new().unwrap(); + let temp_path = std::env::temp_dir().join("lance_io_bench_write"); + std::fs::create_dir_all(&temp_path).unwrap(); + + let data = generate_data(DATA_SIZE); + + group.bench_function("basic_write", |b| { + let path = temp_path.join("basic_write.file"); + b.iter(|| { + write_basic(&data, &path); + }); + }); + + let obj_store = ObjectStore::local(); + group.bench_function("lance_write", |b| { + let path = Path::from_absolute_path(temp_path.join("lance_write.file")).unwrap(); + b.iter(|| { + runtime.block_on(write_lance(&data, &obj_store, &path)); + }); + }); +} + +criterion_group!( + name=benches; + config = Criterion::default().significance_level(0.1).sample_size(10); + targets = bench_basic_write); + +criterion_main!(benches); diff --git a/rust/lance-io/src/encodings/binary.rs b/rust/lance-io/src/encodings/binary.rs index 1d2c23030fe..e4827d1b63b 100644 --- a/rust/lance-io/src/encodings/binary.rs +++ b/rust/lance-io/src/encodings/binary.rs @@ -488,7 +488,7 @@ mod tests { let arrs = arr.iter().map(|a| a as &dyn Array).collect::>(); let pos = encoder.encode(arrs.as_slice()).await.unwrap(); - writer.shutdown().await.unwrap(); + AsyncWriteExt::shutdown(&mut writer).await.unwrap(); Ok(pos) } @@ -562,7 +562,7 @@ mod tests { object_writer.write_all(b"1234").await.unwrap(); let mut encoder = BinaryEncoder::new(&mut object_writer); let pos = encoder.encode(&[&data]).await.unwrap(); - object_writer.shutdown().await.unwrap(); + AsyncWriteExt::shutdown(&mut object_writer).await.unwrap(); let reader = LocalObjectReader::open_local_path(&path, 1024, None) .await @@ -731,7 +731,7 @@ mod tests { // let arrs = arr.iter().map(|a| a as &dyn Array).collect::>(); let pos = encoder.encode(&[&data]).await.unwrap(); - object_writer.shutdown().await.unwrap(); + AsyncWriteExt::shutdown(&mut object_writer).await.unwrap(); pos }; diff --git a/rust/lance-io/src/encodings/dictionary.rs b/rust/lance-io/src/encodings/dictionary.rs index ecc2bce1aec..2352939da27 100644 --- a/rust/lance-io/src/encodings/dictionary.rs +++ b/rust/lance-io/src/encodings/dictionary.rs @@ -243,7 +243,7 @@ mod tests { let mut object_writer = tokio::fs::File::create(&path).await.unwrap(); let mut encoder = PlainEncoder::new(&mut object_writer, arr1.keys().data_type()); pos = encoder.encode(arrs.as_slice()).await.unwrap(); - object_writer.shutdown().await.unwrap(); + AsyncWriteExt::shutdown(&mut object_writer).await.unwrap(); } let reader = LocalObjectReader::open_local_path(&path, 2048, None) diff --git a/rust/lance-io/src/encodings/plain.rs b/rust/lance-io/src/encodings/plain.rs index 9966d8b33ec..b1d2ac225f6 100644 --- a/rust/lance-io/src/encodings/plain.rs +++ b/rust/lance-io/src/encodings/plain.rs @@ -756,7 +756,7 @@ mod tests { let mut writer = tokio::fs::File::create(&path).await.unwrap(); let mut encoder = PlainEncoder::new(&mut writer, array.data_type()); assert_eq!(encoder.encode(&[&array]).await.unwrap(), 0); - writer.shutdown().await.unwrap(); + AsyncWriteExt::shutdown(&mut writer).await.unwrap(); } let reader = LocalObjectReader::open_local_path(&path, 2048, None) diff --git a/rust/lance-io/src/local.rs b/rust/lance-io/src/local.rs index a08f81460e2..cb71d0ebeab 100644 --- a/rust/lance-io/src/local.rs +++ b/rust/lance-io/src/local.rs @@ -26,6 +26,7 @@ use tokio::sync::OnceCell; use tracing::instrument; use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM; +use crate::object_writer::WriteResult; use crate::traits::{Reader, Writer}; use crate::utils::tracking_store::IOTracker; @@ -284,4 +285,10 @@ impl Writer for tokio::fs::File { async fn tell(&mut self) -> Result { Ok(self.seek(SeekFrom::Current(0)).await? as usize) } + + async fn shutdown(&mut self) -> Result { + let size = self.seek(SeekFrom::Current(0)).await? as usize; + tokio::io::AsyncWriteExt::shutdown(self).await?; + Ok(WriteResult { size, e_tag: None }) + } } diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 2e0921542b2..05c3cfc85c8 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -37,7 +37,8 @@ pub mod providers; pub mod storage_options; mod tracing; use crate::object_reader::SmallReader; -use crate::object_writer::WriteResult; +use crate::object_writer::{LocalWriter, WriteResult}; +use crate::traits::Writer; use crate::utils::tracking_store::{IOTracker, IoStats}; use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader}; use lance_core::{Error, Result}; @@ -633,7 +634,7 @@ impl ObjectStore { let object_store = Self::local(); let absolute_path = expand_path(path.to_string_lossy())?; let os_path = Path::from_absolute_path(absolute_path)?; - object_store.create(&os_path).await + ObjectWriter::new(&object_store, &os_path).await } /// Open an [Reader] from local [std::path::Path] @@ -645,15 +646,29 @@ impl ObjectStore { } /// Create a new file. - pub async fn create(&self, path: &Path) -> Result { - ObjectWriter::new(self, path).await + pub async fn create(&self, path: &Path) -> Result> { + match self.scheme.as_str() { + "file" => { + let local_path = super::local::to_local_path(path); + if let Some(parent) = std::path::Path::new(&local_path).parent() { + tokio::fs::create_dir_all(parent).await?; + } + let file = tokio::fs::File::create(&local_path).await?; + Ok(Box::new(LocalWriter::new( + file, + path.clone(), + Arc::new(self.io_tracker.clone()), + ))) + } + _ => Ok(Box::new(ObjectWriter::new(self, path).await?)), + } } /// A helper function to create a file and write content to it. pub async fn put(&self, path: &Path, content: &[u8]) -> Result { let mut writer = self.create(path).await?; writer.write_all(content).await?; - writer.shutdown().await + Writer::shutdown(writer.as_mut()).await } pub async fn delete(&self, path: &Path) -> Result<()> { @@ -1206,7 +1221,7 @@ mod tests { let file_path = TempStdFile::default(); let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap(); writer.write_all(b"LOCAL").await.unwrap(); - writer.shutdown().await.unwrap(); + Writer::shutdown(&mut writer).await.unwrap(); let reader = ObjectStore::open_local(&file_path).await.unwrap(); let buf = reader.get_range(0..5).await.unwrap(); @@ -1218,7 +1233,7 @@ mod tests { let file_path = TempStdFile::default(); let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap(); writer.write_all(b"LOCAL").await.unwrap(); - writer.shutdown().await.unwrap(); + Writer::shutdown(&mut writer).await.unwrap(); let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap(); let obj_store = ObjectStore::local(); diff --git a/rust/lance-io/src/object_writer.rs b/rust/lance-io/src/object_writer.rs index f2ad57f56f6..70deca0263d 100644 --- a/rust/lance-io/src/object_writer.rs +++ b/rust/lance-io/src/object_writer.rs @@ -21,6 +21,7 @@ use lance_core::{Error, Result}; use tracing::Instrument; use crate::traits::Writer; +use crate::utils::tracking_store::IOTracker; use snafu::location; use tokio::runtime::Handle; @@ -298,21 +299,6 @@ impl ObjectWriter { Ok(()) } - pub async fn shutdown(&mut self) -> Result { - AsyncWriteExt::shutdown(self).await.map_err(|e| { - Error::io( - format!("failed to shutdown object writer for {}: {}", self.path, e), - // and wrap it in here. - location!(), - ) - })?; - if let UploadState::Done(result) = &self.state { - Ok(result.clone()) - } else { - unreachable!() - } - } - pub async fn abort(&mut self) { let state = std::mem::replace(&mut self.state, UploadState::Done(WriteResult::default())); if let UploadState::InProgress { mut upload, .. } = state { @@ -498,6 +484,125 @@ impl Writer for ObjectWriter { async fn tell(&mut self) -> Result { Ok(self.cursor) } + + async fn shutdown(&mut self) -> Result { + AsyncWriteExt::shutdown(self).await.map_err(|e| { + Error::io( + format!("failed to shutdown object writer for {}: {}", self.path, e), + location!(), + ) + })?; + if let UploadState::Done(result) = &self.state { + Ok(result.clone()) + } else { + unreachable!() + } + } +} + +pub struct LocalWriter { + inner: tokio::io::BufWriter, + cursor: usize, + path: Path, + io_tracker: Arc, +} + +impl LocalWriter { + pub fn new(file: tokio::fs::File, path: Path, io_tracker: Arc) -> Self { + Self { + inner: tokio::io::BufWriter::new(file), + cursor: 0, + path, + io_tracker, + } + } +} + +impl AsyncWrite for LocalWriter { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + let poll = Pin::new(&mut self.inner).poll_write(cx, buf); + if let Poll::Ready(Ok(n)) = &poll { + self.cursor += *n; + } + poll + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } +} + +#[async_trait] +impl Writer for LocalWriter { + async fn tell(&mut self) -> Result { + Ok(self.cursor) + } + + async fn shutdown(&mut self) -> Result { + AsyncWriteExt::shutdown(self).await.map_err(|e| { + Error::io( + format!("failed to shutdown local writer for {}: {}", self.path, e), + location!(), + ) + })?; + + let local_path = crate::local::to_local_path(&self.path); + let metadata = std::fs::metadata(&local_path).map_err(|e| { + Error::io( + format!("failed to read metadata for {}: {}", self.path, e), + location!(), + ) + })?; + let e_tag = get_etag(&metadata); + + self.io_tracker + .record_write("put", self.path.clone(), self.cursor as u64); + + Ok(WriteResult { + size: self.cursor, + e_tag: Some(e_tag), + }) + } +} + +// Based on object store's implementation. +pub fn get_etag(metadata: &std::fs::Metadata) -> String { + let inode = get_inode(metadata); + let size = metadata.len(); + let mtime = metadata + .modified() + .ok() + .and_then(|mtime| mtime.duration_since(std::time::SystemTime::UNIX_EPOCH).ok()) + .unwrap_or_default() + .as_micros(); + + // Use an ETag scheme based on that used by many popular HTTP servers + // + format!("{inode:x}-{mtime:x}-{size:x}") +} + +#[cfg(unix)] +fn get_inode(metadata: &std::fs::Metadata) -> u64 { + std::os::unix::fs::MetadataExt::ino(metadata) +} + +#[cfg(not(unix))] +fn get_inode(_metadata: &std::fs::Metadata) -> u64 { + 0 } #[cfg(test)] @@ -525,7 +630,7 @@ mod tests { assert_eq!(object_writer.write(buf.as_slice()).await.unwrap(), 256); assert_eq!(object_writer.tell().await.unwrap(), 256 * 3); - let res = object_writer.shutdown().await.unwrap(); + let res = Writer::shutdown(&mut object_writer).await.unwrap(); assert_eq!(res.size, 256 * 3); // Trigger multi part upload @@ -540,7 +645,7 @@ mod tests { // Check the cursor assert_eq!(object_writer.tell().await.unwrap(), (i + 1) * buf.len()); } - let res = object_writer.shutdown().await.unwrap(); + let res = Writer::shutdown(&mut object_writer).await.unwrap(); assert_eq!(res.size, buf.len() * 5); } @@ -553,4 +658,27 @@ mod tests { .unwrap(); object_writer.abort().await; } + + #[tokio::test] + async fn test_local_writer_shutdown() { + let tmp = lance_core::utils::tempfile::TempStdDir::default(); + let file_path = tmp.join("test_local_writer.bin"); + let os_path = Path::from_absolute_path(&file_path).unwrap(); + let io_tracker = Arc::new(IOTracker::default()); + + let file = tokio::fs::File::create(&file_path).await.unwrap(); + let mut writer = LocalWriter::new(file, os_path, io_tracker.clone()); + + let data = b"hello local writer"; + writer.write_all(data).await.unwrap(); + + let result = Writer::shutdown(&mut writer).await.unwrap(); + assert_eq!(result.size, data.len()); + assert!(result.e_tag.is_some()); + assert!(!result.e_tag.as_ref().unwrap().is_empty()); + + let stats = io_tracker.stats(); + assert_eq!(stats.write_iops, 1); + assert_eq!(stats.written_bytes, data.len() as u64); + } } diff --git a/rust/lance-io/src/traits.rs b/rust/lance-io/src/traits.rs index 4a6631e6ba8..9ad8d86c00c 100644 --- a/rust/lance-io/src/traits.rs +++ b/rust/lance-io/src/traits.rs @@ -13,6 +13,8 @@ use tokio::io::{AsyncWrite, AsyncWriteExt}; use lance_core::Result; +use crate::object_writer::WriteResult; + pub trait ProtoStruct { type Proto: Message; } @@ -22,6 +24,21 @@ pub trait ProtoStruct { pub trait Writer: AsyncWrite + Unpin + Send { /// Tell the current offset. async fn tell(&mut self) -> Result; + + /// Flush all buffered data and finalize the write, returning metadata about + /// the written object. + async fn shutdown(&mut self) -> Result; +} + +#[async_trait] +impl Writer for Box { + async fn tell(&mut self) -> Result { + self.as_mut().tell().await + } + + async fn shutdown(&mut self) -> Result { + self.as_mut().shutdown().await + } } /// Lance Write Extension. diff --git a/rust/lance-io/src/utils/tracking_store.rs b/rust/lance-io/src/utils/tracking_store.rs index f1afb77990b..dd8474b5683 100644 --- a/rust/lance-io/src/utils/tracking_store.rs +++ b/rust/lance-io/src/utils/tracking_store.rs @@ -65,6 +65,27 @@ impl IOTracker { range, }); } + + /// Record a write operation for tracking. + /// + /// This is used by writers that bypass the ObjectStore layer (like LocalWriter) + /// to ensure their IO operations are still tracked. + pub fn record_write( + &self, + #[allow(unused_variables)] method: &'static str, + #[allow(unused_variables)] path: Path, + num_bytes: u64, + ) { + let mut stats = self.0.lock().unwrap(); + stats.write_iops += 1; + stats.written_bytes += num_bytes; + #[cfg(feature = "test-util")] + stats.requests.push(IoRequestRecord { + method, + path, + range: None, + }); + } } impl WrappingObjectStore for IOTracker { diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index bf18cce4907..3d5dfb34f07 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -37,7 +37,7 @@ use futures::{ StreamExt, TryStreamExt, }; use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION}; -use lance_io::object_writer::{ObjectWriter, WriteResult}; +use lance_io::object_writer::{get_etag, ObjectWriter, WriteResult}; use log::warn; use object_store::PutOptions; use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore}; @@ -51,7 +51,7 @@ pub mod external_manifest; use lance_core::{Error, Result}; use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams}; -use lance_io::traits::WriteExt; +use lance_io::traits::{WriteExt, Writer}; use crate::format::{is_detached_version, IndexMetadata, Manifest, Transaction}; use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT}; @@ -204,7 +204,7 @@ pub fn write_manifest_file_to_path<'a>( object_writer .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC) .await?; - let res = object_writer.shutdown().await?; + let res = Writer::shutdown(&mut object_writer).await?; info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string()); Ok(res) }) @@ -426,36 +426,6 @@ fn current_manifest_local(base: &Path) -> std::io::Result String { - let inode = get_inode(metadata); - let size = metadata.len(); - let mtime = metadata - .modified() - .ok() - .and_then(|mtime| mtime.duration_since(std::time::SystemTime::UNIX_EPOCH).ok()) - .unwrap_or_default() - .as_micros(); - - // Use an ETag scheme based on that used by many popular HTTP servers - // - // - format!("{inode:x}-{mtime:x}-{size:x}") -} - -#[cfg(unix)] -/// We include the inode when available to yield an ETag more resistant to collisions -/// and as used by popular web servers such as [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag) -fn get_inode(metadata: &std::fs::Metadata) -> u64 { - std::os::unix::fs::MetadataExt::ino(metadata) -} - -#[cfg(not(unix))] -/// On platforms where an inode isn't available, fallback to just relying on size and mtime -fn get_inode(_metadata: &std::fs::Metadata) -> u64 { - 0 -} - fn list_manifests<'a>( base_path: &Path, object_store: &'a dyn OSObjectStore, diff --git a/rust/lance-table/src/io/manifest.rs b/rust/lance-table/src/io/manifest.rs index 2f938d19ece..12e5cc0a09c 100644 --- a/rust/lance-table/src/io/manifest.rs +++ b/rust/lance-table/src/io/manifest.rs @@ -19,7 +19,6 @@ use lance_core::{datatypes::Schema, Error, Result}; use lance_io::{ encodings::{binary::BinaryEncoder, plain::PlainEncoder, Encoder}, object_store::ObjectStore, - object_writer::ObjectWriter, traits::{WriteExt, Writer}, utils::read_message, }; @@ -233,7 +232,7 @@ pub struct ManifestDescribing {} #[async_trait] impl PreviousManifestProvider for ManifestDescribing { async fn store_schema( - object_writer: &mut ObjectWriter, + object_writer: &mut dyn Writer, schema: &Schema, ) -> Result> { let mut manifest = Manifest::new( @@ -295,14 +294,14 @@ mod test { DataStorageFormat::default(), HashMap::new(), ); - let pos = write_manifest(&mut writer, &mut manifest, None, None) + let pos = write_manifest(writer.as_mut(), &mut manifest, None, None) .await .unwrap(); writer .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC) .await .unwrap(); - writer.shutdown().await.unwrap(); + Writer::shutdown(writer.as_mut()).await.unwrap(); let roundtripped_manifest = read_manifest(&store, &path, None).await.unwrap(); diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 1ea9f7c4189..b50247925f1 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -22,7 +22,7 @@ use arrow_array::StructArray; use lance_core::datatypes::{BlobKind, BlobVersion}; use lance_core::utils::blob::blob_path; use lance_core::{utils::address::RowAddress, Error, Result}; -use lance_io::traits::Reader; +use lance_io::traits::{Reader, Writer}; const INLINE_MAX: usize = 64 * 1024; // 64KB inline cutoff const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; // 4MB dedicated cutoff @@ -40,7 +40,7 @@ struct PackWriter { data_file_key: String, max_pack_size: usize, current_blob_id: Option, - writer: Option, + writer: Option>, current_size: usize, } @@ -102,7 +102,7 @@ impl PackWriter { async fn finish(&mut self) -> Result<()> { if let Some(mut writer) = self.writer.take() { - writer.shutdown().await?; + Writer::shutdown(writer.as_mut()).await?; } self.current_blob_id = None; self.current_size = 0; @@ -172,7 +172,7 @@ impl BlobPreprocessor { let path = blob_path(&self.data_dir, &self.data_file_key, blob_id); let mut writer = self.object_store.create(&path).await?; writer.write_all(data).await?; - writer.shutdown().await?; + Writer::shutdown(&mut writer).await?; Ok(path) } diff --git a/rust/lance/src/dataset/optimize/binary_copy.rs b/rust/lance/src/dataset/optimize/binary_copy.rs index 2a51e8aca9b..3a350aede82 100644 --- a/rust/lance/src/dataset/optimize/binary_copy.rs +++ b/rust/lance/src/dataset/optimize/binary_copy.rs @@ -14,7 +14,6 @@ use lance_encoding::version::LanceFileVersion; use lance_file::format::pbfile; use lance_file::reader::FileReader as LFReader; use lance_file::writer::{FileWriter, FileWriterOptions}; -use lance_io::object_writer::ObjectWriter; use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; use lance_io::traits::Writer; use lance_table::format::{DataFile, Fragment}; @@ -34,7 +33,7 @@ const ALIGN: usize = 64; /// /// Returns the new position after padding (if any). async fn apply_alignment_padding( - writer: &mut ObjectWriter, + writer: &mut dyn Writer, current_pos: u64, version: LanceFileVersion, ) -> Result { @@ -53,7 +52,7 @@ async fn apply_alignment_padding( async fn init_writer_if_necessary( dataset: &Dataset, - current_writer: &mut Option, + current_writer: &mut Option>, current_filename: &mut Option, ) -> Result { if current_writer.is_none() { @@ -102,7 +101,7 @@ fn compute_field_column_indices( async fn finalize_current_output_file( schema: &Schema, full_field_ids: &[i32], - current_writer: &mut Option, + current_writer: &mut Option>, current_filename: &mut Option, current_page_table: &[ColumnInfo], col_pages: &mut [Vec], @@ -232,7 +231,7 @@ pub async fn rewrite_files_binary_copy( } let mut out: Vec = Vec::new(); - let mut current_writer: Option = None; + let mut current_writer: Option> = None; let mut current_filename: Option = None; let mut current_pos: u64 = 0; let mut current_page_table: Vec = Vec::new(); @@ -367,7 +366,7 @@ pub async fn rewrite_files_binary_copy( let mut new_offsets = Vec::with_capacity(*buffer_count); for _ in 0..*buffer_count { if let Some(bytes) = bytes_iter.next() { - let writer = current_writer.as_mut().unwrap(); + let writer = current_writer.as_mut().unwrap().as_mut(); current_pos = apply_alignment_padding(writer, current_pos, version).await?; let start = current_pos; @@ -419,7 +418,7 @@ pub async fn rewrite_files_binary_copy( .collect(); let bytes_vec = file_scheduler.submit_request(ranges, 0).await?; for bytes in bytes_vec.into_iter() { - let writer = current_writer.as_mut().unwrap(); + let writer = current_writer.as_mut().unwrap().as_mut(); current_pos = apply_alignment_padding(writer, current_pos, version).await?; let start = current_pos; writer.write_all(&bytes).await?; @@ -501,14 +500,14 @@ pub async fn rewrite_files_binary_copy( /// - v2_0 structural single‑page enforcement is handled when building `final_cols`; this function /// only performs consistent finalization. async fn flush_footer( - mut writer: ObjectWriter, + mut writer: Box, schema: &Schema, final_cols: &[Arc], total_rows_in_current: u64, version: LanceFileVersion, ) -> Result<()> { let pos = writer.tell().await? as u64; - let _new_pos = apply_alignment_padding(&mut writer, pos, version).await?; + let _new_pos = apply_alignment_padding(writer.as_mut(), pos, version).await?; let mut col_metadatas = Vec::with_capacity(final_cols.len()); for col in final_cols { diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 7d6280c468c..84b3fec1173 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -82,7 +82,6 @@ use lance_io::{ encodings::plain::PlainEncoder, local::to_local_path, object_store::ObjectStore, - object_writer::ObjectWriter, stream::RecordBatchStream, traits::{Reader, WriteExt, Writer}, }; @@ -575,7 +574,7 @@ async fn optimize_ivf_pq_indices( unindexed: Option, existing_indices: &[Arc], options: &OptimizeOptions, - mut writer: ObjectWriter, + mut writer: Box, dataset_version: u64, ) -> Result { let metric_type = first_idx.metric_type; @@ -622,7 +621,13 @@ async fn optimize_ivf_pq_indices( }) }) .collect::>>()?; - write_pq_partitions(&mut writer, &mut ivf_mut, shuffled, Some(&indices_to_merge)).await?; + write_pq_partitions( + writer.as_mut(), + &mut ivf_mut, + shuffled, + Some(&indices_to_merge), + ) + .await?; let metadata = IvfPQIndexMetadata { name: format!("_{}_idx", vector_column), column: vector_column.to_string(), @@ -639,7 +644,7 @@ async fn optimize_ivf_pq_indices( // TODO: for now the IVF_PQ index file format hasn't been updated, so keep the old version, // change it to latest version value after refactoring the IVF_PQ writer.write_magics(pos, 0, 1, MAGIC).await?; - writer.shutdown().await?; + Writer::shutdown(writer.as_mut()).await?; Ok(existing_indices.len() - start_pos) } @@ -653,8 +658,8 @@ async fn optimize_ivf_hnsw_indices( unindexed: Option, existing_indices: &[Arc], options: &OptimizeOptions, - writer: ObjectWriter, - aux_writer: ObjectWriter, + writer: Box, + aux_writer: Box, ) -> Result { let distance_type = first_idx.metric_type; let quantizer = hnsw_index.quantizer().clone(); @@ -1468,7 +1473,7 @@ impl RemapPageTask { Ok(self) } - async fn write(self, writer: &mut ObjectWriter, ivf: &mut IvfModel) -> Result<()> { + async fn write(self, writer: &mut dyn Writer, ivf: &mut IvfModel) -> Result<()> { let page = self.page.as_ref().expect("Load was not called"); let page: &PQIndex = page .as_any() @@ -1616,7 +1621,7 @@ pub(crate) async fn remap_index_file( loss: index.ivf.loss, }; while let Some(write_task) = task_stream.try_next().await? { - write_task.write(&mut writer, &mut ivf).await?; + write_task.write(writer.as_mut(), &mut ivf).await?; } let pq_sub_index = index @@ -1644,7 +1649,7 @@ pub(crate) async fn remap_index_file( // TODO: for now the IVF_PQ index file format hasn't been updated, so keep the old version, // change it to latest version value after refactoring the IVF_PQ writer.write_magics(pos, 0, 1, MAGIC).await?; - writer.shutdown().await?; + Writer::shutdown(writer.as_mut()).await?; Ok(()) } @@ -1674,7 +1679,7 @@ async fn write_ivf_pq_file( let start = std::time::Instant::now(); let num_partitions = ivf.num_partitions() as u32; builder::build_partitions( - &mut writer, + writer.as_mut(), stream, column, &mut ivf, @@ -1705,7 +1710,7 @@ async fn write_ivf_pq_file( // TODO: for now the IVF_PQ index file format hasn't been updated, so keep the old version, // change it to latest version value after refactoring the IVF_PQ writer.write_magics(pos, 0, 1, MAGIC).await?; - writer.shutdown().await?; + Writer::shutdown(writer.as_mut()).await?; Ok(()) } @@ -1725,7 +1730,7 @@ pub async fn write_ivf_pq_file_from_existing_index( .child(index_id.to_string()) .child("index.idx"); let mut writer = obj_store.create(&path).await?; - write_pq_partitions(&mut writer, &mut ivf, Some(streams), None).await?; + write_pq_partitions(writer.as_mut(), &mut ivf, Some(streams), None).await?; let metadata = IvfPQIndexMetadata::new( index_name.to_string(), @@ -1740,7 +1745,7 @@ pub async fn write_ivf_pq_file_from_existing_index( let metadata = pb::Index::try_from(&metadata)?; let pos = writer.write_protobuf(&metadata).await?; writer.write_magics(pos, 0, 1, MAGIC).await?; - writer.shutdown().await?; + Writer::shutdown(writer.as_mut()).await?; Ok(()) } diff --git a/rust/lance/src/index/vector/ivf/builder.rs b/rust/lance/src/index/vector/ivf/builder.rs index 42cd5569a77..f19f5bfe48d 100644 --- a/rust/lance/src/index/vector/ivf/builder.rs +++ b/rust/lance/src/index/vector/ivf/builder.rs @@ -19,7 +19,6 @@ use lance_index::vector::pq::ProductQuantizer; use lance_index::vector::quantizer::Quantizer; use lance_index::vector::PART_ID_COLUMN; use lance_index::vector::{ivf::storage::IvfModel, transform::Transformer}; -use lance_io::object_writer::ObjectWriter; use lance_io::stream::RecordBatchStreamAdapter; use lance_table::io::manifest::ManifestDescribing; use log::info; @@ -201,7 +200,7 @@ pub async fn write_vector_storage( pq: ProductQuantizer, distance_type: DistanceType, column: &str, - writer: ObjectWriter, + writer: Box, precomputed_partitions_ds_uri: Option<&str>, ) -> Result<()> { info!("Transforming {} vectors for storage", num_rows); From a692a0c7c6479839ea1cbbd8abc286801ff3952d Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Thu, 12 Feb 2026 08:36:22 -0800 Subject: [PATCH 02/10] Use file-object-store for tests --- rust/lance/src/dataset/cleanup.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 1c4d0c90cca..0f4ef9ab52b 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -1131,7 +1131,9 @@ mod tests { fn try_new() -> Result { let tmpdir = TempStrDir::default(); let tmpdir_path = tmpdir.as_str(); - let dataset_path = format!("{}/my_db", tmpdir_path); + // Use file-object-store:// scheme so that writes go through the ObjectStore + // wrapper chain (MockObjectStore) instead of the optimized local writer path. + let dataset_path = format!("file-object-store://{}/my_db", tmpdir_path); Ok(Self { _tmpdir: tmpdir, dataset_path, From 3948da91f1f76f7f04c2837422798963d2a49288 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 16 Feb 2026 15:54:47 +0000 Subject: [PATCH 03/10] use atomic rename for local file writes --- Cargo.lock | 1 + python/Cargo.lock | 1 + rust/lance-io/Cargo.toml | 1 + rust/lance-io/src/object_store.rs | 9 +++- rust/lance-io/src/object_writer.rs | 68 +++++++++++++++++++++++++++--- 5 files changed, 73 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cfcc4899c96..6742f06bc1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5161,6 +5161,7 @@ dependencies = [ "serde", "shellexpand", "snafu", + "tempfile", "test-log", "tokio", "tracing", diff --git a/python/Cargo.lock b/python/Cargo.lock index 090400fd4b3..65da20f6840 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -4259,6 +4259,7 @@ dependencies = [ "serde", "shellexpand", "snafu", + "tempfile", "tokio", "tracing", "url", diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml index fa98982e1cc..e1a3a7bfda3 100644 --- a/rust/lance-io/Cargo.toml +++ b/rust/lance-io/Cargo.toml @@ -47,6 +47,7 @@ tracing.workspace = true url.workspace = true path_abs.workspace = true rand.workspace = true +tempfile.workspace = true [dev-dependencies] criterion.workspace = true diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 05c3cfc85c8..aa8447a8993 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -650,13 +650,18 @@ impl ObjectStore { match self.scheme.as_str() { "file" => { let local_path = super::local::to_local_path(path); - if let Some(parent) = std::path::Path::new(&local_path).parent() { + let local_path = std::path::PathBuf::from(&local_path); + if let Some(parent) = local_path.parent() { tokio::fs::create_dir_all(parent).await?; } - let file = tokio::fs::File::create(&local_path).await?; + let parent = local_path.parent().expect("file path must have parent"); + let named_temp = tempfile::NamedTempFile::new_in(parent)?; + let (std_file, temp_path) = named_temp.into_parts(); + let file = tokio::fs::File::from_std(std_file); Ok(Box::new(LocalWriter::new( file, path.clone(), + temp_path, Arc::new(self.io_tracker.clone()), ))) } diff --git a/rust/lance-io/src/object_writer.rs b/rust/lance-io/src/object_writer.rs index 70deca0263d..c6bc44ecb0d 100644 --- a/rust/lance-io/src/object_writer.rs +++ b/rust/lance-io/src/object_writer.rs @@ -504,15 +504,23 @@ pub struct LocalWriter { inner: tokio::io::BufWriter, cursor: usize, path: Path, + /// Temp path that auto-deletes on drop. Set to `None` after `persist()`. + temp_path: Option, io_tracker: Arc, } impl LocalWriter { - pub fn new(file: tokio::fs::File, path: Path, io_tracker: Arc) -> Self { + pub fn new( + file: tokio::fs::File, + path: Path, + temp_path: tempfile::TempPath, + io_tracker: Arc, + ) -> Self { Self { inner: tokio::io::BufWriter::new(file), cursor: 0, path, + temp_path: Some(temp_path), io_tracker, } } @@ -560,8 +568,24 @@ impl Writer for LocalWriter { ) })?; - let local_path = crate::local::to_local_path(&self.path); - let metadata = std::fs::metadata(&local_path).map_err(|e| { + let final_path = crate::local::to_local_path(&self.path); + let temp_path = self.temp_path.take().ok_or_else(|| { + Error::io( + format!("local writer for {} already shut down", self.path), + location!(), + ) + })?; + temp_path.persist(&final_path).map_err(|e| { + Error::io( + format!( + "failed to persist temp file to {}: {}", + final_path, e.error + ), + location!(), + ) + })?; + + let metadata = std::fs::metadata(&final_path).map_err(|e| { Error::io( format!("failed to read metadata for {}: {}", self.path, e), location!(), @@ -666,19 +690,53 @@ mod tests { let os_path = Path::from_absolute_path(&file_path).unwrap(); let io_tracker = Arc::new(IOTracker::default()); - let file = tokio::fs::File::create(&file_path).await.unwrap(); - let mut writer = LocalWriter::new(file, os_path, io_tracker.clone()); + let named_temp = tempfile::NamedTempFile::new_in(&*tmp).unwrap(); + let temp_file_path = named_temp.path().to_owned(); + let (std_file, temp_path) = named_temp.into_parts(); + let file = tokio::fs::File::from_std(std_file); + let mut writer = LocalWriter::new(file, os_path, temp_path, io_tracker.clone()); let data = b"hello local writer"; writer.write_all(data).await.unwrap(); + // Before shutdown, the final path should not exist + assert!(!file_path.exists()); + // But the temp file should exist + assert!(temp_file_path.exists()); + let result = Writer::shutdown(&mut writer).await.unwrap(); assert_eq!(result.size, data.len()); assert!(result.e_tag.is_some()); assert!(!result.e_tag.as_ref().unwrap().is_empty()); + // After shutdown, the final path should exist and temp should be gone + assert!(file_path.exists()); + assert!(!temp_file_path.exists()); + let stats = io_tracker.stats(); assert_eq!(stats.write_iops, 1); assert_eq!(stats.written_bytes, data.len() as u64); } + + #[tokio::test] + async fn test_local_writer_drop_cleans_up() { + let tmp = lance_core::utils::tempfile::TempStdDir::default(); + let file_path = tmp.join("test_drop.bin"); + let os_path = Path::from_absolute_path(&file_path).unwrap(); + let io_tracker = Arc::new(IOTracker::default()); + + let named_temp = tempfile::NamedTempFile::new_in(&*tmp).unwrap(); + let temp_file_path = named_temp.path().to_owned(); + let (std_file, temp_path) = named_temp.into_parts(); + let file = tokio::fs::File::from_std(std_file); + let mut writer = LocalWriter::new(file, os_path, temp_path, io_tracker); + + writer.write_all(b"some data").await.unwrap(); + assert!(temp_file_path.exists()); + + // Drop without shutdown should clean up the temp file + drop(writer); + assert!(!temp_file_path.exists()); + assert!(!file_path.exists()); + } } From a19fd9601638a4e21dcb57ed0f22d324aec985d3 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 16 Feb 2026 16:09:03 +0000 Subject: [PATCH 04/10] format --- rust/lance-io/src/object_writer.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rust/lance-io/src/object_writer.rs b/rust/lance-io/src/object_writer.rs index c6bc44ecb0d..3b33e7aa762 100644 --- a/rust/lance-io/src/object_writer.rs +++ b/rust/lance-io/src/object_writer.rs @@ -577,10 +577,7 @@ impl Writer for LocalWriter { })?; temp_path.persist(&final_path).map_err(|e| { Error::io( - format!( - "failed to persist temp file to {}: {}", - final_path, e.error - ), + format!("failed to persist temp file to {}: {}", final_path, e.error), location!(), ) })?; From 70fa379a3a4e0912ba2b7a4180e7548efbfa1e0e Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 16 Feb 2026 18:25:34 +0000 Subject: [PATCH 05/10] windows fix --- rust/lance-io/src/object_store/providers/local.rs | 4 ++++ rust/lance/src/dataset/cleanup.rs | 6 +++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/rust/lance-io/src/object_store/providers/local.rs b/rust/lance-io/src/object_store/providers/local.rs index 78c8c9632c4..f2cb5a67144 100644 --- a/rust/lance-io/src/object_store/providers/local.rs +++ b/rust/lance-io/src/object_store/providers/local.rs @@ -124,6 +124,10 @@ mod tests { "C:\\Users\\ADMINI~1\\AppData\\Local\\..\\", "C:/Users/ADMINI~1/AppData", ), + ( + "file-object-store:///C:/Users/ADMINI~1/AppData/Local", + "C:/Users/ADMINI~1/AppData/Local", + ), ]; for (uri, expected_path) in cases { diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 0f4ef9ab52b..351eb22dd37 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -1133,7 +1133,11 @@ mod tests { let tmpdir_path = tmpdir.as_str(); // Use file-object-store:// scheme so that writes go through the ObjectStore // wrapper chain (MockObjectStore) instead of the optimized local writer path. - let dataset_path = format!("file-object-store://{}/my_db", tmpdir_path); + // The path must always start with "/" (three slashes after the scheme) so that + // on Windows, a drive letter like "C:" isn't parsed as the URL authority. + let path_prefix = if tmpdir_path.starts_with('/') { "" } else { "/" }; + let dataset_path = + format!("file-object-store://{path_prefix}{tmpdir_path}/my_db"); Ok(Self { _tmpdir: tmpdir, dataset_path, From a5ced5739540a5fb9fe0bde266768d15ce8354ff Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 16 Feb 2026 18:29:27 +0000 Subject: [PATCH 06/10] format --- rust/lance/src/dataset/cleanup.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 351eb22dd37..802658c4567 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -1135,9 +1135,12 @@ mod tests { // wrapper chain (MockObjectStore) instead of the optimized local writer path. // The path must always start with "/" (three slashes after the scheme) so that // on Windows, a drive letter like "C:" isn't parsed as the URL authority. - let path_prefix = if tmpdir_path.starts_with('/') { "" } else { "/" }; - let dataset_path = - format!("file-object-store://{path_prefix}{tmpdir_path}/my_db"); + let path_prefix = if tmpdir_path.starts_with('/') { + "" + } else { + "/" + }; + let dataset_path = format!("file-object-store://{path_prefix}{tmpdir_path}/my_db"); Ok(Self { _tmpdir: tmpdir, dataset_path, From c9c57d02cefd190864d5a8e092f7a1345b33e488 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 16 Feb 2026 19:09:53 +0000 Subject: [PATCH 07/10] use mock writer for disk full test --- rust/lance/src/dataset/write.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index c1b36702408..2262ee20abf 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -2593,7 +2593,9 @@ mod tests { let object_store = Arc::new(lance_io::object_store::ObjectStore::new( Arc::new(DiskFullObjectStore) as Arc, - url::Url::parse("file:///test").unwrap(), + // Use a non-"file" scheme so writes go through ObjectWriter (which + // uses the DiskFullObjectStore) instead of the optimized LocalWriter. + url::Url::parse("mock:///test").unwrap(), None, None, false, From af19db610a2c1975a7c2f24dda26478e2971305f Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 16 Feb 2026 20:27:41 +0000 Subject: [PATCH 08/10] fix TempObjFile atomic rename for windows --- rust/lance-core/src/utils/tempfile.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/rust/lance-core/src/utils/tempfile.rs b/rust/lance-core/src/utils/tempfile.rs index f4b804c5c70..a5a13ba26f1 100644 --- a/rust/lance-core/src/utils/tempfile.rs +++ b/rust/lance-core/src/utils/tempfile.rs @@ -268,12 +268,14 @@ impl Deref for TempStdFile { } } -/// A temporary file that is exposed as an object store path +/// A unique path to a temporary file, exposed as an object store path /// -/// This is a wrapper around [`TempFile`] that exposes the path as an object store path. -/// It is useful when you need to create a temporary file that is only used as an object store path. +/// Unlike [`TempFile`], this does not create an empty file. We create a +/// temporary directory and then construct a path inside it, following the +/// same pattern as [`TempStdPath`]. This avoids holding an open file handle, +/// which on Windows would prevent atomic renames to the same path. pub struct TempObjFile { - _tempfile: TempFile, + _tempdir: TempDir, path: ObjPath, } @@ -293,10 +295,10 @@ impl std::ops::Deref for TempObjFile { impl Default for TempObjFile { fn default() -> Self { - let tempfile = TempFile::default(); - let path = tempfile.obj_path(); + let tempdir = TempDir::default(); + let path = ObjPath::parse(format!("{}/some_file", tempdir.path_str())).unwrap(); Self { - _tempfile: tempfile, + _tempdir: tempdir, path, } } From 778e2ee695fcbfdfb7f9e6de3e9dbb7e2f428ce8 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 16 Feb 2026 21:38:52 +0000 Subject: [PATCH 09/10] remove write benchmark --- rust/lance-io/Cargo.toml | 4 --- rust/lance-io/benches/write.rs | 64 ---------------------------------- 2 files changed, 68 deletions(-) delete mode 100644 rust/lance-io/benches/write.rs diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml index e1a3a7bfda3..ca21af66610 100644 --- a/rust/lance-io/Cargo.toml +++ b/rust/lance-io/Cargo.toml @@ -63,10 +63,6 @@ pprof.workspace = true name = "scheduler" harness = false -[[bench]] -name = "write" -harness = false - [features] default = ["aws", "azure", "gcp"] gcs-test = [] diff --git a/rust/lance-io/benches/write.rs b/rust/lance-io/benches/write.rs deleted file mode 100644 index 4a0f631bd1d..00000000000 --- a/rust/lance-io/benches/write.rs +++ /dev/null @@ -1,64 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright The Lance Authors -#![allow(clippy::print_stdout)] - -use lance_io::object_store::ObjectStore; -use object_store::path::Path; -use rand::RngCore; -use std::fs::File; -use tokio::{io::AsyncWriteExt, runtime::Runtime}; - -use criterion::{criterion_group, criterion_main, Criterion}; - -fn generate_data(num_bytes: u64) -> Vec { - let mut data = vec![0; num_bytes as usize]; - rand::rng().fill_bytes(&mut data); - data -} - -fn write_basic(data: &[u8], path: &std::path::Path) { - let mut f = File::create(path).unwrap(); - let writer = std::io::BufWriter::new(&mut f); - std::io::Write::write_all(&mut writer.into_inner().unwrap(), data).unwrap(); -} - -async fn write_lance(data: &[u8], obj_store: &ObjectStore, path: &Path) { - let mut writer = obj_store.create(path).await.unwrap(); - writer.write_all(data).await.unwrap(); -} - -const DATA_SIZE: u64 = 128 * 1024 * 1024; - -fn bench_basic_write(c: &mut Criterion) { - let mut group = c.benchmark_group("write"); - - group.throughput(criterion::Throughput::Bytes(DATA_SIZE)); - - let runtime = Runtime::new().unwrap(); - let temp_path = std::env::temp_dir().join("lance_io_bench_write"); - std::fs::create_dir_all(&temp_path).unwrap(); - - let data = generate_data(DATA_SIZE); - - group.bench_function("basic_write", |b| { - let path = temp_path.join("basic_write.file"); - b.iter(|| { - write_basic(&data, &path); - }); - }); - - let obj_store = ObjectStore::local(); - group.bench_function("lance_write", |b| { - let path = Path::from_absolute_path(temp_path.join("lance_write.file")).unwrap(); - b.iter(|| { - runtime.block_on(write_lance(&data, &obj_store, &path)); - }); - }); -} - -criterion_group!( - name=benches; - config = Criterion::default().significance_level(0.1).sample_size(10); - targets = bench_basic_write); - -criterion_main!(benches); From f47a40e3977c2ec722a1adea4094c7abaf1da544 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 16 Feb 2026 21:42:37 +0000 Subject: [PATCH 10/10] use spawn_blocking for rename/metadata operations --- rust/lance-io/src/object_store.rs | 12 ++++++++++-- rust/lance-io/src/object_writer.rs | 31 +++++++++++++++++------------- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index aa8447a8993..e908599c500 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -654,8 +654,16 @@ impl ObjectStore { if let Some(parent) = local_path.parent() { tokio::fs::create_dir_all(parent).await?; } - let parent = local_path.parent().expect("file path must have parent"); - let named_temp = tempfile::NamedTempFile::new_in(parent)?; + let parent = local_path + .parent() + .expect("file path must have parent") + .to_owned(); + let named_temp = + tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent)) + .await + .map_err(|e| { + Error::io(format!("spawn_blocking failed: {}", e), location!()) + })??; let (std_file, temp_path) = named_temp.into_parts(); let file = tokio::fs::File::from_std(std_file); Ok(Box::new(LocalWriter::new( diff --git a/rust/lance-io/src/object_writer.rs b/rust/lance-io/src/object_writer.rs index 3b33e7aa762..a762436211c 100644 --- a/rust/lance-io/src/object_writer.rs +++ b/rust/lance-io/src/object_writer.rs @@ -575,20 +575,25 @@ impl Writer for LocalWriter { location!(), ) })?; - temp_path.persist(&final_path).map_err(|e| { - Error::io( - format!("failed to persist temp file to {}: {}", final_path, e.error), - location!(), - ) - })?; + let path_clone = self.path.clone(); + let e_tag = tokio::task::spawn_blocking(move || -> Result { + temp_path.persist(&final_path).map_err(|e| { + Error::io( + format!("failed to persist temp file to {}: {}", final_path, e.error), + location!(), + ) + })?; - let metadata = std::fs::metadata(&final_path).map_err(|e| { - Error::io( - format!("failed to read metadata for {}: {}", self.path, e), - location!(), - ) - })?; - let e_tag = get_etag(&metadata); + let metadata = std::fs::metadata(&final_path).map_err(|e| { + Error::io( + format!("failed to read metadata for {}: {}", path_clone, e), + location!(), + ) + })?; + Ok(get_etag(&metadata)) + }) + .await + .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e), location!()))??; self.io_tracker .record_write("put", self.path.clone(), self.cursor as u64);