diff --git a/Cargo.lock b/Cargo.lock
index cfcc4899c96..6742f06bc1e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5161,6 +5161,7 @@ dependencies = [
"serde",
"shellexpand",
"snafu",
+ "tempfile",
"test-log",
"tokio",
"tracing",
diff --git a/python/Cargo.lock b/python/Cargo.lock
index 090400fd4b3..65da20f6840 100644
--- a/python/Cargo.lock
+++ b/python/Cargo.lock
@@ -4259,6 +4259,7 @@ dependencies = [
"serde",
"shellexpand",
"snafu",
+ "tempfile",
"tokio",
"tracing",
"url",
diff --git a/rust/lance-core/src/utils/tempfile.rs b/rust/lance-core/src/utils/tempfile.rs
index f4b804c5c70..a5a13ba26f1 100644
--- a/rust/lance-core/src/utils/tempfile.rs
+++ b/rust/lance-core/src/utils/tempfile.rs
@@ -268,12 +268,14 @@ impl Deref for TempStdFile {
}
}
-/// A temporary file that is exposed as an object store path
+/// A unique path to a temporary file, exposed as an object store path
///
-/// This is a wrapper around [`TempFile`] that exposes the path as an object store path.
-/// It is useful when you need to create a temporary file that is only used as an object store path.
+/// Unlike [`TempFile`], this does not create an empty file. We create a
+/// temporary directory and then construct a path inside it, following the
+/// same pattern as [`TempStdPath`]. This avoids holding an open file handle,
+/// which on Windows would prevent atomic renames to the same path.
pub struct TempObjFile {
- _tempfile: TempFile,
+ _tempdir: TempDir,
path: ObjPath,
}
@@ -293,10 +295,10 @@ impl std::ops::Deref for TempObjFile {
impl Default for TempObjFile {
fn default() -> Self {
- let tempfile = TempFile::default();
- let path = tempfile.obj_path();
+ let tempdir = TempDir::default();
+ let path = ObjPath::parse(format!("{}/some_file", tempdir.path_str())).unwrap();
Self {
- _tempfile: tempfile,
+ _tempdir: tempdir,
path,
}
}
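
For context, a minimal sketch of the same pattern using only the `tempfile` and `object_store` crates directly (the helper name `unique_obj_path` is illustrative, not part of this crate): creating just a directory means no handle to the target file is ever held open, so a later atomic rename onto the path cannot be blocked on Windows.

use object_store::path::Path as ObjPath;
use tempfile::TempDir;

fn unique_obj_path() -> (TempDir, ObjPath) {
    // Only a directory is created; the file itself does not exist yet and no
    // handle to it is held, so an atomic rename onto the path works on Windows.
    let dir = TempDir::new().expect("create temp dir");
    let path = ObjPath::from_filesystem_path(dir.path())
        .expect("directory path is valid")
        .child("some_file");
    // The TempDir must outlive every use of `path`, mirroring the `_tempdir`
    // field kept in TempObjFile above.
    (dir, path)
}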
diff --git a/rust/lance-file/src/previous/page_table.rs b/rust/lance-file/src/previous/page_table.rs
index 3089a400790..c0a77af1fb3 100644
--- a/rust/lance-file/src/previous/page_table.rs
+++ b/rust/lance-file/src/previous/page_table.rs
@@ -214,7 +214,7 @@ mod tests {
.write(&mut writer, starting_field_id)
.await
.unwrap();
- writer.shutdown().await.unwrap();
+ AsyncWriteExt::shutdown(&mut writer).await.unwrap();
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
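
The switch from `writer.shutdown()` to the fully qualified `AsyncWriteExt::shutdown(&mut writer)` is needed because, with this PR, `tokio::fs::File` also implements the crate's `Writer` trait, which now declares its own `shutdown`. A standalone sketch (plain traits, not the crate's types) of the ambiguity that forces the qualified call:

trait A { fn shutdown(&mut self) -> &'static str; }
trait B { fn shutdown(&mut self) -> &'static str; }

struct W;
impl A for W { fn shutdown(&mut self) -> &'static str { "A" } }
impl B for W { fn shutdown(&mut self) -> &'static str { "B" } }

fn main() {
    let mut w = W;
    // `w.shutdown()` fails to compile with "multiple applicable items in scope";
    // naming the trait picks the intended method, as the test changes above do.
    assert_eq!(A::shutdown(&mut w), "A");
    assert_eq!(B::shutdown(&mut w), "B");
}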
diff --git a/rust/lance-file/src/previous/writer/mod.rs b/rust/lance-file/src/previous/writer/mod.rs
index 7006442e08c..aa9370d4a09 100644
--- a/rust/lance-file/src/previous/writer/mod.rs
+++ b/rust/lance-file/src/previous/writer/mod.rs
@@ -22,7 +22,6 @@ use lance_io::encodings::{
binary::BinaryEncoder, dictionary::DictionaryEncoder, plain::PlainEncoder, Encoder,
};
use lance_io::object_store::ObjectStore;
-use lance_io::object_writer::ObjectWriter;
use lance_io::traits::{WriteExt, Writer};
use object_store::path::Path;
use snafu::location;
@@ -47,10 +46,8 @@ pub trait ManifestProvider {
///
/// Note: the dictionaries have already been written by this point and the schema should
/// be populated with the dictionary lengths/offsets
- async fn store_schema(
- object_writer: &mut ObjectWriter,
- schema: &Schema,
-    ) -> Result<Option<usize>>;
+ async fn store_schema(object_writer: &mut dyn Writer, schema: &Schema)
+        -> Result<Option<usize>>;
}
/// Implementation of ManifestProvider that does not store the schema
@@ -60,7 +57,7 @@ pub(crate) struct NotSelfDescribing {}
#[cfg(test)]
#[async_trait]
impl ManifestProvider for NotSelfDescribing {
-    async fn store_schema(_: &mut ObjectWriter, _: &Schema) -> Result<Option<usize>> {
+    async fn store_schema(_: &mut dyn Writer, _: &Schema) -> Result<Option<usize>> {
Ok(None)
}
}
@@ -79,7 +76,7 @@ impl ManifestProvider for NotSelfDescribing {
/// file_writer.shutdown();
/// ```
pub struct FileWriter {
- pub object_writer: ObjectWriter,
+    pub object_writer: Box<dyn Writer>,
schema: Schema,
batch_id: i32,
page_table: PageTable,
@@ -109,7 +106,7 @@ impl FileWriter {
}
pub fn with_object_writer(
- object_writer: ObjectWriter,
+        object_writer: Box<dyn Writer>,
schema: Schema,
options: &FileWriterOptions,
    ) -> Result<Self> {
@@ -213,7 +210,7 @@ impl FileWriter {
            .collect::<Result<Vec<_>>>()?;
Self::write_array(
- &mut self.object_writer,
+ self.object_writer.as_mut(),
field,
&arrs,
self.batch_id,
@@ -253,7 +250,7 @@ impl FileWriter {
    pub async fn finish(&mut self) -> Result<usize> {
self.write_footer().await?;
- self.object_writer.shutdown().await?;
+ Writer::shutdown(self.object_writer.as_mut()).await?;
let num_rows = self
.metadata
.batch_offsets
@@ -284,7 +281,7 @@ impl FileWriter {
#[async_recursion]
async fn write_array(
- object_writer: &mut ObjectWriter,
+ object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&ArrayRef],
batch_id: i32,
@@ -385,7 +382,7 @@ impl FileWriter {
}
async fn write_null_array(
- object_writer: &mut ObjectWriter,
+ object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&dyn Array],
batch_id: i32,
@@ -399,7 +396,7 @@ impl FileWriter {
/// Write fixed size array, including, primtiives, fixed size binary, and fixed size list.
async fn write_fixed_stride_array(
- object_writer: &mut ObjectWriter,
+ object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&dyn Array],
batch_id: i32,
@@ -419,7 +416,7 @@ impl FileWriter {
/// Write var-length binary arrays.
async fn write_binary_array(
- object_writer: &mut ObjectWriter,
+ object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&dyn Array],
batch_id: i32,
@@ -435,7 +432,7 @@ impl FileWriter {
}
async fn write_dictionary_arr(
- object_writer: &mut ObjectWriter,
+ object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&dyn Array],
key_type: &DataType,
@@ -455,7 +452,7 @@ impl FileWriter {
#[async_recursion]
async fn write_struct_array(
- object_writer: &mut ObjectWriter,
+ object_writer: &mut dyn Writer,
field: &Field,
arrays: &[&StructArray],
batch_id: i32,
@@ -486,7 +483,7 @@ impl FileWriter {
}
async fn write_list_array(
- object_writer: &mut ObjectWriter,
+ object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&dyn Array],
batch_id: i32,
@@ -534,7 +531,7 @@ impl FileWriter {
}
async fn write_large_list_array(
- object_writer: &mut ObjectWriter,
+ object_writer: &mut dyn Writer,
field: &Field,
arrs: &[&dyn Array],
batch_id: i32,
@@ -597,7 +594,7 @@ impl FileWriter {
let mut stats_page_table = PageTable::default();
for (i, field) in schema.fields.iter().enumerate() {
Self::write_array(
- &mut self.object_writer,
+ self.object_writer.as_mut(),
field,
&[stats_batch.column(i)],
0, // Only one batch for statistics.
@@ -606,8 +603,9 @@ impl FileWriter {
.await?;
}
- let page_table_position =
- stats_page_table.write(&mut self.object_writer, 0).await?;
+ let page_table_position = stats_page_table
+ .write(self.object_writer.as_mut(), 0)
+ .await?;
Ok(Some(StatisticsMetadata {
schema,
@@ -624,7 +622,7 @@ impl FileWriter {
///
/// The offsets and lengths of the written buffers are stored in the given
/// schema so that the dictionaries can be loaded in the future.
- async fn write_dictionaries(writer: &mut ObjectWriter, schema: &mut Schema) -> Result<()> {
+ async fn write_dictionaries(writer: &mut dyn Writer, schema: &mut Schema) -> Result<()> {
// Write dictionary values.
let max_field_id = schema.max_field_id().unwrap_or(-1);
for field_id in 0..max_field_id + 1 {
@@ -680,7 +678,7 @@ impl FileWriter {
let field_id_offset = *self.schema.field_ids().iter().min().unwrap();
let pos = self
.page_table
- .write(&mut self.object_writer, field_id_offset)
+ .write(self.object_writer.as_mut(), field_id_offset)
.await?;
self.metadata.page_table_position = pos;
@@ -688,8 +686,8 @@ impl FileWriter {
self.metadata.stats_metadata = self.write_statistics().await?;
// Step 3. Write manifest and dictionary values.
- Self::write_dictionaries(&mut self.object_writer, &mut self.schema).await?;
- let pos = M::store_schema(&mut self.object_writer, &self.schema).await?;
+ Self::write_dictionaries(self.object_writer.as_mut(), &mut self.schema).await?;
+ let pos = M::store_schema(self.object_writer.as_mut(), &self.schema).await?;
// Step 4. Write metadata.
self.metadata.manifest_position = pos;
diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs
index ea753f463f9..0f00ee45b6a 100644
--- a/rust/lance-file/src/writer.rs
+++ b/rust/lance-file/src/writer.rs
@@ -23,13 +23,13 @@ use lance_encoding::encoder::{
use lance_encoding::repdef::RepDefBuilder;
use lance_encoding::version::LanceFileVersion;
use lance_io::object_store::ObjectStore;
-use lance_io::object_writer::ObjectWriter;
use lance_io::traits::Writer;
use log::{debug, warn};
use object_store::path::Path;
use prost::Message;
use prost_types::Any;
use snafu::location;
+use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tracing::instrument;
@@ -114,7 +114,7 @@ const DEFAULT_SPILL_BUFFER_LIMIT: usize = 256 * 1024;
/// of every chunk so each column's pages can be read back and reassembled in
/// order.
struct PageMetadataSpill {
- writer: ObjectWriter,
+    writer: Box<dyn Writer>,
    object_store: Arc<ObjectStore>,
path: Path,
/// Current write position in the spill file.
@@ -177,7 +177,7 @@ impl PageMetadataSpill {
for col_idx in 0..self.column_buffers.len() {
self.flush_column(col_idx).await?;
}
- self.writer.shutdown().await?;
+ Writer::shutdown(self.writer.as_mut()).await?;
Ok(())
}
}
@@ -204,7 +204,7 @@ enum PageSpillState {
}
pub struct FileWriter {
- writer: ObjectWriter,
+    writer: Box<dyn Writer>,
    schema: Option<LanceSchema>,
column_writers: Vec>,
column_metadata: Vec,
@@ -231,7 +231,7 @@ static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);
impl FileWriter {
/// Create a new FileWriter with a desired output schema
pub fn try_new(
- object_writer: ObjectWriter,
+        object_writer: Box<dyn Writer>,
schema: LanceSchema,
options: FileWriterOptions,
    ) -> Result<Self> {
@@ -244,7 +244,7 @@ impl FileWriter {
///
/// The output schema will be set based on the first batch of data to arrive.
/// If no data arrives and the writer is finished then the write will fail.
- pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self {
+    pub fn new_lazy(object_writer: Box<dyn Writer>, options: FileWriterOptions) -> Self {
if let Some(format_version) = options.format_version {
if format_version.is_unstable()
&& WARNED_ON_UNSTABLE_API
@@ -304,7 +304,7 @@ impl FileWriter {
Ok(writer.finish().await? as usize)
}
- async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
+ async fn do_write_buffer(writer: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> Result<()> {
writer.write_all(buf).await?;
let pad_bytes = pad_bytes::(buf.len());
writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
@@ -810,13 +810,14 @@ impl FileWriter {
self.writer.write_all(MAGIC).await?;
// 7. close the writer
- self.writer.shutdown().await?;
+ Writer::shutdown(self.writer.as_mut()).await?;
Ok(self.rows_written)
}
pub async fn abort(&mut self) {
- self.writer.abort().await;
+        // Aborting is handled on drop: ObjectWriter's Drop impl aborts any
+        // in-progress multipart upload, and LocalWriter's temp file is removed
+        // when its TempPath is dropped.
}
pub async fn tell(&mut self) -> Result {
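
With `abort` reduced to relying on `Drop`, here is a hedged sketch (not the crate's code; `Upload` and `SketchWriter` are illustrative names) of the Drop-based cleanup the comment above refers to:

struct Upload;

impl Upload {
    fn abort(self) {
        // Tell the object store to discard any parts uploaded so far.
    }
}

enum UploadState {
    InProgress(Upload),
    Done,
}

struct SketchWriter {
    state: UploadState,
}

impl Drop for SketchWriter {
    fn drop(&mut self) {
        // If shutdown never completed, abort the in-progress upload here so a
        // dropped writer does not leak a half-finished multipart upload.
        if let UploadState::InProgress(upload) =
            std::mem::replace(&mut self.state, UploadState::Done)
        {
            upload.abort();
        }
    }
}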
diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml
index 953c9c39e6c..ca21af66610 100644
--- a/rust/lance-io/Cargo.toml
+++ b/rust/lance-io/Cargo.toml
@@ -47,6 +47,7 @@ tracing.workspace = true
url.workspace = true
path_abs.workspace = true
rand.workspace = true
+tempfile.workspace = true
[dev-dependencies]
criterion.workspace = true
diff --git a/rust/lance-io/src/encodings/binary.rs b/rust/lance-io/src/encodings/binary.rs
index 1d2c23030fe..e4827d1b63b 100644
--- a/rust/lance-io/src/encodings/binary.rs
+++ b/rust/lance-io/src/encodings/binary.rs
@@ -488,7 +488,7 @@ mod tests {
        let arrs = arr.iter().map(|a| a as &dyn Array).collect::<Vec<_>>();
let pos = encoder.encode(arrs.as_slice()).await.unwrap();
- writer.shutdown().await.unwrap();
+ AsyncWriteExt::shutdown(&mut writer).await.unwrap();
Ok(pos)
}
@@ -562,7 +562,7 @@ mod tests {
object_writer.write_all(b"1234").await.unwrap();
let mut encoder = BinaryEncoder::new(&mut object_writer);
let pos = encoder.encode(&[&data]).await.unwrap();
- object_writer.shutdown().await.unwrap();
+ AsyncWriteExt::shutdown(&mut object_writer).await.unwrap();
let reader = LocalObjectReader::open_local_path(&path, 1024, None)
.await
@@ -731,7 +731,7 @@ mod tests {
// let arrs = arr.iter().map(|a| a as &dyn Array).collect::>();
let pos = encoder.encode(&[&data]).await.unwrap();
- object_writer.shutdown().await.unwrap();
+ AsyncWriteExt::shutdown(&mut object_writer).await.unwrap();
pos
};
diff --git a/rust/lance-io/src/encodings/dictionary.rs b/rust/lance-io/src/encodings/dictionary.rs
index ecc2bce1aec..2352939da27 100644
--- a/rust/lance-io/src/encodings/dictionary.rs
+++ b/rust/lance-io/src/encodings/dictionary.rs
@@ -243,7 +243,7 @@ mod tests {
let mut object_writer = tokio::fs::File::create(&path).await.unwrap();
let mut encoder = PlainEncoder::new(&mut object_writer, arr1.keys().data_type());
pos = encoder.encode(arrs.as_slice()).await.unwrap();
- object_writer.shutdown().await.unwrap();
+ AsyncWriteExt::shutdown(&mut object_writer).await.unwrap();
}
let reader = LocalObjectReader::open_local_path(&path, 2048, None)
diff --git a/rust/lance-io/src/encodings/plain.rs b/rust/lance-io/src/encodings/plain.rs
index 9966d8b33ec..b1d2ac225f6 100644
--- a/rust/lance-io/src/encodings/plain.rs
+++ b/rust/lance-io/src/encodings/plain.rs
@@ -756,7 +756,7 @@ mod tests {
let mut writer = tokio::fs::File::create(&path).await.unwrap();
let mut encoder = PlainEncoder::new(&mut writer, array.data_type());
assert_eq!(encoder.encode(&[&array]).await.unwrap(), 0);
- writer.shutdown().await.unwrap();
+ AsyncWriteExt::shutdown(&mut writer).await.unwrap();
}
let reader = LocalObjectReader::open_local_path(&path, 2048, None)
diff --git a/rust/lance-io/src/local.rs b/rust/lance-io/src/local.rs
index a08f81460e2..cb71d0ebeab 100644
--- a/rust/lance-io/src/local.rs
+++ b/rust/lance-io/src/local.rs
@@ -26,6 +26,7 @@ use tokio::sync::OnceCell;
use tracing::instrument;
use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM;
+use crate::object_writer::WriteResult;
use crate::traits::{Reader, Writer};
use crate::utils::tracking_store::IOTracker;
@@ -284,4 +285,10 @@ impl Writer for tokio::fs::File {
    async fn tell(&mut self) -> Result<usize> {
Ok(self.seek(SeekFrom::Current(0)).await? as usize)
}
+
+    async fn shutdown(&mut self) -> Result<WriteResult> {
+ let size = self.seek(SeekFrom::Current(0)).await? as usize;
+ tokio::io::AsyncWriteExt::shutdown(self).await?;
+ Ok(WriteResult { size, e_tag: None })
+ }
}
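
The new `shutdown` impl above determines the file size by seeking zero bytes from the current position, which simply reports the current offset. A standalone sketch of that trick with plain tokio, assuming an append-only writer whose offset equals the bytes written:

use tokio::fs::File;
use tokio::io::{AsyncSeekExt, AsyncWriteExt, SeekFrom};

async fn write_and_measure(mut file: File) -> std::io::Result<u64> {
    file.write_all(b"hello").await?;
    // Seek by 0 from the current position: a no-op move that returns where the
    // cursor is, i.e. how many bytes have been written so far.
    let size = file.seek(SeekFrom::Current(0)).await?;
    file.shutdown().await?;
    Ok(size)
}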
diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs
index 2e0921542b2..e908599c500 100644
--- a/rust/lance-io/src/object_store.rs
+++ b/rust/lance-io/src/object_store.rs
@@ -37,7 +37,8 @@ pub mod providers;
pub mod storage_options;
mod tracing;
use crate::object_reader::SmallReader;
-use crate::object_writer::WriteResult;
+use crate::object_writer::{LocalWriter, WriteResult};
+use crate::traits::Writer;
use crate::utils::tracking_store::{IOTracker, IoStats};
use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
use lance_core::{Error, Result};
@@ -633,7 +634,7 @@ impl ObjectStore {
let object_store = Self::local();
let absolute_path = expand_path(path.to_string_lossy())?;
let os_path = Path::from_absolute_path(absolute_path)?;
- object_store.create(&os_path).await
+ ObjectWriter::new(&object_store, &os_path).await
}
/// Open an [Reader] from local [std::path::Path]
@@ -645,15 +646,42 @@ impl ObjectStore {
}
/// Create a new file.
-    pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
- ObjectWriter::new(self, path).await
+    pub async fn create(&self, path: &Path) -> Result<Box<dyn Writer>> {
+ match self.scheme.as_str() {
+ "file" => {
+ let local_path = super::local::to_local_path(path);
+ let local_path = std::path::PathBuf::from(&local_path);
+ if let Some(parent) = local_path.parent() {
+ tokio::fs::create_dir_all(parent).await?;
+ }
+ let parent = local_path
+ .parent()
+ .expect("file path must have parent")
+ .to_owned();
+ let named_temp =
+ tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent))
+ .await
+ .map_err(|e| {
+ Error::io(format!("spawn_blocking failed: {}", e), location!())
+ })??;
+ let (std_file, temp_path) = named_temp.into_parts();
+ let file = tokio::fs::File::from_std(std_file);
+ Ok(Box::new(LocalWriter::new(
+ file,
+ path.clone(),
+ temp_path,
+ Arc::new(self.io_tracker.clone()),
+ )))
+ }
+ _ => Ok(Box::new(ObjectWriter::new(self, path).await?)),
+ }
}
/// A helper function to create a file and write content to it.
    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
let mut writer = self.create(path).await?;
writer.write_all(content).await?;
- writer.shutdown().await
+ Writer::shutdown(writer.as_mut()).await
}
pub async fn delete(&self, path: &Path) -> Result<()> {
@@ -1206,7 +1234,7 @@ mod tests {
let file_path = TempStdFile::default();
let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
writer.write_all(b"LOCAL").await.unwrap();
- writer.shutdown().await.unwrap();
+ Writer::shutdown(&mut writer).await.unwrap();
let reader = ObjectStore::open_local(&file_path).await.unwrap();
let buf = reader.get_range(0..5).await.unwrap();
@@ -1218,7 +1246,7 @@ mod tests {
let file_path = TempStdFile::default();
let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
writer.write_all(b"LOCAL").await.unwrap();
- writer.shutdown().await.unwrap();
+ Writer::shutdown(&mut writer).await.unwrap();
let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
let obj_store = ObjectStore::local();
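
An end-to-end usage sketch of the new local path in `create`, written against the types this diff shows (the concrete path string is illustrative): bytes first land in a NamedTempFile next to the target and only become visible at `path` once `Writer::shutdown` persists it.

use lance_io::object_store::ObjectStore;
use lance_io::traits::Writer;
use object_store::path::Path;
use tokio::io::AsyncWriteExt;

async fn write_locally() -> lance_core::Result<()> {
    let store = ObjectStore::local();
    let path = Path::from_absolute_path("/tmp/example/data.bin")?;
    // For the "file" scheme this is a LocalWriter wrapping a NamedTempFile.
    let mut writer = store.create(&path).await?;
    writer.write_all(b"some bytes").await?;
    // Persists (renames) the temp file onto `path` and reports size + ETag.
    let result = Writer::shutdown(writer.as_mut()).await?;
    assert_eq!(result.size, 10);
    Ok(())
}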
diff --git a/rust/lance-io/src/object_store/providers/local.rs b/rust/lance-io/src/object_store/providers/local.rs
index 78c8c9632c4..f2cb5a67144 100644
--- a/rust/lance-io/src/object_store/providers/local.rs
+++ b/rust/lance-io/src/object_store/providers/local.rs
@@ -124,6 +124,10 @@ mod tests {
"C:\\Users\\ADMINI~1\\AppData\\Local\\..\\",
"C:/Users/ADMINI~1/AppData",
),
+ (
+ "file-object-store:///C:/Users/ADMINI~1/AppData/Local",
+ "C:/Users/ADMINI~1/AppData/Local",
+ ),
];
for (uri, expected_path) in cases {
diff --git a/rust/lance-io/src/object_writer.rs b/rust/lance-io/src/object_writer.rs
index f2ad57f56f6..a762436211c 100644
--- a/rust/lance-io/src/object_writer.rs
+++ b/rust/lance-io/src/object_writer.rs
@@ -21,6 +21,7 @@ use lance_core::{Error, Result};
use tracing::Instrument;
use crate::traits::Writer;
+use crate::utils::tracking_store::IOTracker;
use snafu::location;
use tokio::runtime::Handle;
@@ -298,21 +299,6 @@ impl ObjectWriter {
Ok(())
}
-    pub async fn shutdown(&mut self) -> Result<WriteResult> {
- AsyncWriteExt::shutdown(self).await.map_err(|e| {
- Error::io(
- format!("failed to shutdown object writer for {}: {}", self.path, e),
- // and wrap it in here.
- location!(),
- )
- })?;
- if let UploadState::Done(result) = &self.state {
- Ok(result.clone())
- } else {
- unreachable!()
- }
- }
-
pub async fn abort(&mut self) {
let state = std::mem::replace(&mut self.state, UploadState::Done(WriteResult::default()));
if let UploadState::InProgress { mut upload, .. } = state {
@@ -498,6 +484,151 @@ impl Writer for ObjectWriter {
    async fn tell(&mut self) -> Result<usize> {
Ok(self.cursor)
}
+
+    async fn shutdown(&mut self) -> Result<WriteResult> {
+ AsyncWriteExt::shutdown(self).await.map_err(|e| {
+ Error::io(
+ format!("failed to shutdown object writer for {}: {}", self.path, e),
+ location!(),
+ )
+ })?;
+ if let UploadState::Done(result) = &self.state {
+ Ok(result.clone())
+ } else {
+ unreachable!()
+ }
+ }
+}
+
+pub struct LocalWriter {
+    inner: tokio::io::BufWriter<tokio::fs::File>,
+ cursor: usize,
+ path: Path,
+ /// Temp path that auto-deletes on drop. Set to `None` after `persist()`.
+    temp_path: Option<tempfile::TempPath>,
+    io_tracker: Arc<IOTracker>,
+}
+
+impl LocalWriter {
+ pub fn new(
+ file: tokio::fs::File,
+ path: Path,
+ temp_path: tempfile::TempPath,
+ io_tracker: Arc,
+ ) -> Self {
+ Self {
+ inner: tokio::io::BufWriter::new(file),
+ cursor: 0,
+ path,
+ temp_path: Some(temp_path),
+ io_tracker,
+ }
+ }
+}
+
+impl AsyncWrite for LocalWriter {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+    ) -> Poll<std::result::Result<usize, std::io::Error>> {
+ let poll = Pin::new(&mut self.inner).poll_write(cx, buf);
+ if let Poll::Ready(Ok(n)) = &poll {
+ self.cursor += *n;
+ }
+ poll
+ }
+
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+    ) -> Poll<std::result::Result<(), std::io::Error>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+    ) -> Poll<std::result::Result<(), std::io::Error>> {
+ Pin::new(&mut self.inner).poll_shutdown(cx)
+ }
+}
+
+#[async_trait]
+impl Writer for LocalWriter {
+    async fn tell(&mut self) -> Result<usize> {
+ Ok(self.cursor)
+ }
+
+    async fn shutdown(&mut self) -> Result<WriteResult> {
+ AsyncWriteExt::shutdown(self).await.map_err(|e| {
+ Error::io(
+ format!("failed to shutdown local writer for {}: {}", self.path, e),
+ location!(),
+ )
+ })?;
+
+ let final_path = crate::local::to_local_path(&self.path);
+ let temp_path = self.temp_path.take().ok_or_else(|| {
+ Error::io(
+ format!("local writer for {} already shut down", self.path),
+ location!(),
+ )
+ })?;
+ let path_clone = self.path.clone();
+        let e_tag = tokio::task::spawn_blocking(move || -> Result<String> {
+ temp_path.persist(&final_path).map_err(|e| {
+ Error::io(
+ format!("failed to persist temp file to {}: {}", final_path, e.error),
+ location!(),
+ )
+ })?;
+
+ let metadata = std::fs::metadata(&final_path).map_err(|e| {
+ Error::io(
+ format!("failed to read metadata for {}: {}", path_clone, e),
+ location!(),
+ )
+ })?;
+ Ok(get_etag(&metadata))
+ })
+ .await
+ .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e), location!()))??;
+
+ self.io_tracker
+ .record_write("put", self.path.clone(), self.cursor as u64);
+
+ Ok(WriteResult {
+ size: self.cursor,
+ e_tag: Some(e_tag),
+ })
+ }
+}
+
+// Based on object store's implementation.
+pub fn get_etag(metadata: &std::fs::Metadata) -> String {
+ let inode = get_inode(metadata);
+ let size = metadata.len();
+ let mtime = metadata
+ .modified()
+ .ok()
+ .and_then(|mtime| mtime.duration_since(std::time::SystemTime::UNIX_EPOCH).ok())
+ .unwrap_or_default()
+ .as_micros();
+
+ // Use an ETag scheme based on that used by many popular HTTP servers
+ //
+ format!("{inode:x}-{mtime:x}-{size:x}")
+}
+
+#[cfg(unix)]
+fn get_inode(metadata: &std::fs::Metadata) -> u64 {
+ std::os::unix::fs::MetadataExt::ino(metadata)
+}
+
+#[cfg(not(unix))]
+fn get_inode(_metadata: &std::fs::Metadata) -> u64 {
+ 0
}
#[cfg(test)]
@@ -525,7 +656,7 @@ mod tests {
assert_eq!(object_writer.write(buf.as_slice()).await.unwrap(), 256);
assert_eq!(object_writer.tell().await.unwrap(), 256 * 3);
- let res = object_writer.shutdown().await.unwrap();
+ let res = Writer::shutdown(&mut object_writer).await.unwrap();
assert_eq!(res.size, 256 * 3);
// Trigger multi part upload
@@ -540,7 +671,7 @@ mod tests {
// Check the cursor
assert_eq!(object_writer.tell().await.unwrap(), (i + 1) * buf.len());
}
- let res = object_writer.shutdown().await.unwrap();
+ let res = Writer::shutdown(&mut object_writer).await.unwrap();
assert_eq!(res.size, buf.len() * 5);
}
@@ -553,4 +684,61 @@ mod tests {
.unwrap();
object_writer.abort().await;
}
+
+ #[tokio::test]
+ async fn test_local_writer_shutdown() {
+ let tmp = lance_core::utils::tempfile::TempStdDir::default();
+ let file_path = tmp.join("test_local_writer.bin");
+ let os_path = Path::from_absolute_path(&file_path).unwrap();
+ let io_tracker = Arc::new(IOTracker::default());
+
+ let named_temp = tempfile::NamedTempFile::new_in(&*tmp).unwrap();
+ let temp_file_path = named_temp.path().to_owned();
+ let (std_file, temp_path) = named_temp.into_parts();
+ let file = tokio::fs::File::from_std(std_file);
+ let mut writer = LocalWriter::new(file, os_path, temp_path, io_tracker.clone());
+
+ let data = b"hello local writer";
+ writer.write_all(data).await.unwrap();
+
+ // Before shutdown, the final path should not exist
+ assert!(!file_path.exists());
+ // But the temp file should exist
+ assert!(temp_file_path.exists());
+
+ let result = Writer::shutdown(&mut writer).await.unwrap();
+ assert_eq!(result.size, data.len());
+ assert!(result.e_tag.is_some());
+ assert!(!result.e_tag.as_ref().unwrap().is_empty());
+
+ // After shutdown, the final path should exist and temp should be gone
+ assert!(file_path.exists());
+ assert!(!temp_file_path.exists());
+
+ let stats = io_tracker.stats();
+ assert_eq!(stats.write_iops, 1);
+ assert_eq!(stats.written_bytes, data.len() as u64);
+ }
+
+ #[tokio::test]
+ async fn test_local_writer_drop_cleans_up() {
+ let tmp = lance_core::utils::tempfile::TempStdDir::default();
+ let file_path = tmp.join("test_drop.bin");
+ let os_path = Path::from_absolute_path(&file_path).unwrap();
+ let io_tracker = Arc::new(IOTracker::default());
+
+ let named_temp = tempfile::NamedTempFile::new_in(&*tmp).unwrap();
+ let temp_file_path = named_temp.path().to_owned();
+ let (std_file, temp_path) = named_temp.into_parts();
+ let file = tokio::fs::File::from_std(std_file);
+ let mut writer = LocalWriter::new(file, os_path, temp_path, io_tracker);
+
+ writer.write_all(b"some data").await.unwrap();
+ assert!(temp_file_path.exists());
+
+ // Drop without shutdown should clean up the temp file
+ drop(writer);
+ assert!(!temp_file_path.exists());
+ assert!(!file_path.exists());
+ }
}
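
The core of `LocalWriter::shutdown` is the temp-file-then-persist idiom. A standalone sketch of that idiom using only the `tempfile` crate (synchronous for brevity; `atomic_write` is an illustrative name, not part of this PR):

use std::io::Write;
use std::path::Path;

fn atomic_write(dir: &Path, final_name: &str, data: &[u8]) -> std::io::Result<()> {
    // The temp file is created in the same directory as the target so that
    // `persist` is a same-filesystem rename, which is atomic.
    let mut tmp = tempfile::NamedTempFile::new_in(dir)?;
    tmp.write_all(data)?;
    let final_path = dir.join(final_name);
    // On success the data appears at `final_path`; on an early return the
    // dropped NamedTempFile deletes the partial file instead.
    tmp.persist(&final_path).map_err(|e| e.error)?;
    Ok(())
}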
diff --git a/rust/lance-io/src/traits.rs b/rust/lance-io/src/traits.rs
index 4a6631e6ba8..9ad8d86c00c 100644
--- a/rust/lance-io/src/traits.rs
+++ b/rust/lance-io/src/traits.rs
@@ -13,6 +13,8 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
use lance_core::Result;
+use crate::object_writer::WriteResult;
+
pub trait ProtoStruct {
type Proto: Message;
}
@@ -22,6 +24,21 @@ pub trait ProtoStruct {
pub trait Writer: AsyncWrite + Unpin + Send {
/// Tell the current offset.
    async fn tell(&mut self) -> Result<usize>;
+
+ /// Flush all buffered data and finalize the write, returning metadata about
+ /// the written object.
+    async fn shutdown(&mut self) -> Result<WriteResult>;
+}
+
+#[async_trait]
+impl Writer for Box<dyn Writer> {
+    async fn tell(&mut self) -> Result<usize> {
+ self.as_mut().tell().await
+ }
+
+    async fn shutdown(&mut self) -> Result<WriteResult> {
+ self.as_mut().shutdown().await
+ }
}
/// Lance Write Extension.
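
The `Box<dyn Writer>` delegation exists so code that is generic over `W: Writer`, or that calls methods on a boxed writer directly, keeps working without an explicit `as_mut()`. A simplified, non-async sketch of the same shape (the `Mem` type is illustrative):

trait Writer {
    fn tell(&mut self) -> usize;
}

impl Writer for Box<dyn Writer> {
    fn tell(&mut self) -> usize {
        // Delegate to the inner trait object; dynamic dispatch reaches the
        // concrete implementation, not this impl again.
        self.as_mut().tell()
    }
}

struct Mem(Vec<u8>);

impl Writer for Mem {
    fn tell(&mut self) -> usize {
        self.0.len()
    }
}

fn position<W: Writer>(w: &mut W) -> usize {
    w.tell()
}

fn main() {
    let mut boxed: Box<dyn Writer> = Box::new(Mem(vec![1, 2, 3]));
    assert_eq!(position(&mut boxed), 3); // compiles thanks to the Box impl
}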
diff --git a/rust/lance-io/src/utils/tracking_store.rs b/rust/lance-io/src/utils/tracking_store.rs
index f1afb77990b..dd8474b5683 100644
--- a/rust/lance-io/src/utils/tracking_store.rs
+++ b/rust/lance-io/src/utils/tracking_store.rs
@@ -65,6 +65,27 @@ impl IOTracker {
range,
});
}
+
+ /// Record a write operation for tracking.
+ ///
+ /// This is used by writers that bypass the ObjectStore layer (like LocalWriter)
+ /// to ensure their IO operations are still tracked.
+ pub fn record_write(
+ &self,
+ #[allow(unused_variables)] method: &'static str,
+ #[allow(unused_variables)] path: Path,
+ num_bytes: u64,
+ ) {
+ let mut stats = self.0.lock().unwrap();
+ stats.write_iops += 1;
+ stats.written_bytes += num_bytes;
+ #[cfg(feature = "test-util")]
+ stats.requests.push(IoRequestRecord {
+ method,
+ path,
+ range: None,
+ });
+ }
}
impl WrappingObjectStore for IOTracker {
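
A condensed sketch of what `record_write` amounts to, stripped of the crate's `#[cfg(feature = "test-util")]` request recording (field and type names here are illustrative): a writer that bypasses the wrapped store still bumps the shared counters once it knows how many bytes it wrote.

use std::sync::{Arc, Mutex};

#[derive(Default)]
struct IoStats {
    write_iops: u64,
    written_bytes: u64,
}

#[derive(Default, Clone)]
struct Tracker(Arc<Mutex<IoStats>>);

impl Tracker {
    fn record_write(&self, num_bytes: u64) {
        let mut stats = self.0.lock().unwrap();
        stats.write_iops += 1;
        stats.written_bytes += num_bytes;
    }
}

fn main() {
    let tracker = Tracker::default();
    tracker.record_write(128);
    assert_eq!(tracker.0.lock().unwrap().written_bytes, 128);
}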
diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs
index bf18cce4907..3d5dfb34f07 100644
--- a/rust/lance-table/src/io/commit.rs
+++ b/rust/lance-table/src/io/commit.rs
@@ -37,7 +37,7 @@ use futures::{
StreamExt, TryStreamExt,
};
use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
-use lance_io::object_writer::{ObjectWriter, WriteResult};
+use lance_io::object_writer::{get_etag, ObjectWriter, WriteResult};
use log::warn;
use object_store::PutOptions;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
@@ -51,7 +51,7 @@ pub mod external_manifest;
use lance_core::{Error, Result};
use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
-use lance_io::traits::WriteExt;
+use lance_io::traits::{WriteExt, Writer};
use crate::format::{is_detached_version, IndexMetadata, Manifest, Transaction};
use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
@@ -204,7 +204,7 @@ pub fn write_manifest_file_to_path<'a>(
object_writer
.write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
.await?;
- let res = object_writer.shutdown().await?;
+ let res = Writer::shutdown(&mut object_writer).await?;
info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
Ok(res)
})
@@ -426,36 +426,6 @@ fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
-fn get_etag(metadata: &std::fs::Metadata) -> String {
- let inode = get_inode(metadata);
- let size = metadata.len();
- let mtime = metadata
- .modified()
- .ok()
- .and_then(|mtime| mtime.duration_since(std::time::SystemTime::UNIX_EPOCH).ok())
- .unwrap_or_default()
- .as_micros();
-
- // Use an ETag scheme based on that used by many popular HTTP servers
- //
- //
- format!("{inode:x}-{mtime:x}-{size:x}")
-}
-
-#[cfg(unix)]
-/// We include the inode when available to yield an ETag more resistant to collisions
-/// and as used by popular web servers such as [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag)
-fn get_inode(metadata: &std::fs::Metadata) -> u64 {
- std::os::unix::fs::MetadataExt::ino(metadata)
-}
-
-#[cfg(not(unix))]
-/// On platforms where an inode isn't available, fallback to just relying on size and mtime
-fn get_inode(_metadata: &std::fs::Metadata) -> u64 {
- 0
-}
-
fn list_manifests<'a>(
base_path: &Path,
object_store: &'a dyn OSObjectStore,
diff --git a/rust/lance-table/src/io/manifest.rs b/rust/lance-table/src/io/manifest.rs
index 2f938d19ece..12e5cc0a09c 100644
--- a/rust/lance-table/src/io/manifest.rs
+++ b/rust/lance-table/src/io/manifest.rs
@@ -19,7 +19,6 @@ use lance_core::{datatypes::Schema, Error, Result};
use lance_io::{
encodings::{binary::BinaryEncoder, plain::PlainEncoder, Encoder},
object_store::ObjectStore,
- object_writer::ObjectWriter,
traits::{WriteExt, Writer},
utils::read_message,
};
@@ -233,7 +232,7 @@ pub struct ManifestDescribing {}
#[async_trait]
impl PreviousManifestProvider for ManifestDescribing {
async fn store_schema(
- object_writer: &mut ObjectWriter,
+ object_writer: &mut dyn Writer,
schema: &Schema,
    ) -> Result<Option<usize>> {
let mut manifest = Manifest::new(
@@ -295,14 +294,14 @@ mod test {
DataStorageFormat::default(),
HashMap::new(),
);
- let pos = write_manifest(&mut writer, &mut manifest, None, None)
+ let pos = write_manifest(writer.as_mut(), &mut manifest, None, None)
.await
.unwrap();
writer
.write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
.await
.unwrap();
- writer.shutdown().await.unwrap();
+ Writer::shutdown(writer.as_mut()).await.unwrap();
let roundtripped_manifest = read_manifest(&store, &path, None).await.unwrap();
diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs
index 1ea9f7c4189..b50247925f1 100644
--- a/rust/lance/src/dataset/blob.rs
+++ b/rust/lance/src/dataset/blob.rs
@@ -22,7 +22,7 @@ use arrow_array::StructArray;
use lance_core::datatypes::{BlobKind, BlobVersion};
use lance_core::utils::blob::blob_path;
use lance_core::{utils::address::RowAddress, Error, Result};
-use lance_io::traits::Reader;
+use lance_io::traits::{Reader, Writer};
const INLINE_MAX: usize = 64 * 1024; // 64KB inline cutoff
const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; // 4MB dedicated cutoff
@@ -40,7 +40,7 @@ struct PackWriter {
data_file_key: String,
max_pack_size: usize,
current_blob_id: Option,
- writer: Option,
+    writer: Option<Box<dyn Writer>>,
current_size: usize,
}
@@ -102,7 +102,7 @@ impl PackWriter {
async fn finish(&mut self) -> Result<()> {
if let Some(mut writer) = self.writer.take() {
- writer.shutdown().await?;
+ Writer::shutdown(writer.as_mut()).await?;
}
self.current_blob_id = None;
self.current_size = 0;
@@ -172,7 +172,7 @@ impl BlobPreprocessor {
let path = blob_path(&self.data_dir, &self.data_file_key, blob_id);
let mut writer = self.object_store.create(&path).await?;
writer.write_all(data).await?;
- writer.shutdown().await?;
+ Writer::shutdown(&mut writer).await?;
Ok(path)
}
diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs
index 1c4d0c90cca..802658c4567 100644
--- a/rust/lance/src/dataset/cleanup.rs
+++ b/rust/lance/src/dataset/cleanup.rs
@@ -1131,7 +1131,16 @@ mod tests {
fn try_new() -> Result {
let tmpdir = TempStrDir::default();
let tmpdir_path = tmpdir.as_str();
- let dataset_path = format!("{}/my_db", tmpdir_path);
+ // Use file-object-store:// scheme so that writes go through the ObjectStore
+ // wrapper chain (MockObjectStore) instead of the optimized local writer path.
+ // The path must always start with "/" (three slashes after the scheme) so that
+ // on Windows, a drive letter like "C:" isn't parsed as the URL authority.
+ let path_prefix = if tmpdir_path.starts_with('/') {
+ ""
+ } else {
+ "/"
+ };
+ let dataset_path = format!("file-object-store://{path_prefix}{tmpdir_path}/my_db");
Ok(Self {
_tmpdir: tmpdir,
dataset_path,
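
A standalone sketch of the URI rule described in the comment above (the function name is illustrative): Unix absolute paths already begin with '/', while Windows paths need one prepended so the drive letter is not read as the URL authority.

fn to_test_dataset_uri(tmpdir_path: &str) -> String {
    let path_prefix = if tmpdir_path.starts_with('/') { "" } else { "/" };
    format!("file-object-store://{path_prefix}{tmpdir_path}/my_db")
}

fn main() {
    // Unix temp dir: already absolute, three slashes after the scheme.
    assert_eq!(
        to_test_dataset_uri("/tmp/xyz"),
        "file-object-store:///tmp/xyz/my_db"
    );
    // Windows temp dir: the extra '/' keeps "C:" out of the authority position.
    assert_eq!(
        to_test_dataset_uri("C:/Users/me/tmp"),
        "file-object-store:///C:/Users/me/tmp/my_db"
    );
}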
diff --git a/rust/lance/src/dataset/optimize/binary_copy.rs b/rust/lance/src/dataset/optimize/binary_copy.rs
index 2a51e8aca9b..3a350aede82 100644
--- a/rust/lance/src/dataset/optimize/binary_copy.rs
+++ b/rust/lance/src/dataset/optimize/binary_copy.rs
@@ -14,7 +14,6 @@ use lance_encoding::version::LanceFileVersion;
use lance_file::format::pbfile;
use lance_file::reader::FileReader as LFReader;
use lance_file::writer::{FileWriter, FileWriterOptions};
-use lance_io::object_writer::ObjectWriter;
use lance_io::scheduler::{ScanScheduler, SchedulerConfig};
use lance_io::traits::Writer;
use lance_table::format::{DataFile, Fragment};
@@ -34,7 +33,7 @@ const ALIGN: usize = 64;
///
/// Returns the new position after padding (if any).
async fn apply_alignment_padding(
- writer: &mut ObjectWriter,
+ writer: &mut dyn Writer,
current_pos: u64,
version: LanceFileVersion,
) -> Result {
@@ -53,7 +52,7 @@ async fn apply_alignment_padding(
async fn init_writer_if_necessary(
dataset: &Dataset,
-    current_writer: &mut Option<ObjectWriter>,
+    current_writer: &mut Option<Box<dyn Writer>>,
current_filename: &mut Option,
) -> Result {
if current_writer.is_none() {
@@ -102,7 +101,7 @@ fn compute_field_column_indices(
async fn finalize_current_output_file(
schema: &Schema,
full_field_ids: &[i32],
-    current_writer: &mut Option<ObjectWriter>,
+    current_writer: &mut Option<Box<dyn Writer>>,
current_filename: &mut Option,
current_page_table: &[ColumnInfo],
col_pages: &mut [Vec],
@@ -232,7 +231,7 @@ pub async fn rewrite_files_binary_copy(
}
let mut out: Vec = Vec::new();
-    let mut current_writer: Option<ObjectWriter> = None;
+    let mut current_writer: Option<Box<dyn Writer>> = None;
let mut current_filename: Option = None;
let mut current_pos: u64 = 0;
let mut current_page_table: Vec = Vec::new();
@@ -367,7 +366,7 @@ pub async fn rewrite_files_binary_copy(
let mut new_offsets = Vec::with_capacity(*buffer_count);
for _ in 0..*buffer_count {
if let Some(bytes) = bytes_iter.next() {
- let writer = current_writer.as_mut().unwrap();
+ let writer = current_writer.as_mut().unwrap().as_mut();
current_pos =
apply_alignment_padding(writer, current_pos, version).await?;
let start = current_pos;
@@ -419,7 +418,7 @@ pub async fn rewrite_files_binary_copy(
.collect();
let bytes_vec = file_scheduler.submit_request(ranges, 0).await?;
for bytes in bytes_vec.into_iter() {
- let writer = current_writer.as_mut().unwrap();
+ let writer = current_writer.as_mut().unwrap().as_mut();
current_pos = apply_alignment_padding(writer, current_pos, version).await?;
let start = current_pos;
writer.write_all(&bytes).await?;
@@ -501,14 +500,14 @@ pub async fn rewrite_files_binary_copy(
/// - v2_0 structural single‑page enforcement is handled when building `final_cols`; this function
/// only performs consistent finalization.
async fn flush_footer(
- mut writer: ObjectWriter,
+    mut writer: Box<dyn Writer>,
schema: &Schema,
final_cols: &[Arc],
total_rows_in_current: u64,
version: LanceFileVersion,
) -> Result<()> {
let pos = writer.tell().await? as u64;
- let _new_pos = apply_alignment_padding(&mut writer, pos, version).await?;
+ let _new_pos = apply_alignment_padding(writer.as_mut(), pos, version).await?;
let mut col_metadatas = Vec::with_capacity(final_cols.len());
for col in final_cols {
diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs
index c1b36702408..2262ee20abf 100644
--- a/rust/lance/src/dataset/write.rs
+++ b/rust/lance/src/dataset/write.rs
@@ -2593,7 +2593,9 @@ mod tests {
let object_store = Arc::new(lance_io::object_store::ObjectStore::new(
Arc::new(DiskFullObjectStore) as Arc,
- url::Url::parse("file:///test").unwrap(),
+ // Use a non-"file" scheme so writes go through ObjectWriter (which
+ // uses the DiskFullObjectStore) instead of the optimized LocalWriter.
+ url::Url::parse("mock:///test").unwrap(),
None,
None,
false,
diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs
index 7d6280c468c..84b3fec1173 100644
--- a/rust/lance/src/index/vector/ivf.rs
+++ b/rust/lance/src/index/vector/ivf.rs
@@ -82,7 +82,6 @@ use lance_io::{
encodings::plain::PlainEncoder,
local::to_local_path,
object_store::ObjectStore,
- object_writer::ObjectWriter,
stream::RecordBatchStream,
traits::{Reader, WriteExt, Writer},
};
@@ -575,7 +574,7 @@ async fn optimize_ivf_pq_indices(
unindexed: Option,
existing_indices: &[Arc],
options: &OptimizeOptions,
- mut writer: ObjectWriter,
+    mut writer: Box<dyn Writer>,
dataset_version: u64,
) -> Result<usize> {
let metric_type = first_idx.metric_type;
@@ -622,7 +621,13 @@ async fn optimize_ivf_pq_indices(
})
})
        .collect::<Result<Vec<_>>>()?;
- write_pq_partitions(&mut writer, &mut ivf_mut, shuffled, Some(&indices_to_merge)).await?;
+ write_pq_partitions(
+ writer.as_mut(),
+ &mut ivf_mut,
+ shuffled,
+ Some(&indices_to_merge),
+ )
+ .await?;
let metadata = IvfPQIndexMetadata {
name: format!("_{}_idx", vector_column),
column: vector_column.to_string(),
@@ -639,7 +644,7 @@ async fn optimize_ivf_pq_indices(
// TODO: for now the IVF_PQ index file format hasn't been updated, so keep the old version,
// change it to latest version value after refactoring the IVF_PQ
writer.write_magics(pos, 0, 1, MAGIC).await?;
- writer.shutdown().await?;
+ Writer::shutdown(writer.as_mut()).await?;
Ok(existing_indices.len() - start_pos)
}
@@ -653,8 +658,8 @@ async fn optimize_ivf_hnsw_indices(
unindexed: Option,
existing_indices: &[Arc],
options: &OptimizeOptions,
- writer: ObjectWriter,
- aux_writer: ObjectWriter,
+    writer: Box<dyn Writer>,
+    aux_writer: Box<dyn Writer>,
) -> Result {
let distance_type = first_idx.metric_type;
let quantizer = hnsw_index.quantizer().clone();
@@ -1468,7 +1473,7 @@ impl RemapPageTask {
Ok(self)
}
- async fn write(self, writer: &mut ObjectWriter, ivf: &mut IvfModel) -> Result<()> {
+ async fn write(self, writer: &mut dyn Writer, ivf: &mut IvfModel) -> Result<()> {
let page = self.page.as_ref().expect("Load was not called");
let page: &PQIndex = page
.as_any()
@@ -1616,7 +1621,7 @@ pub(crate) async fn remap_index_file(
loss: index.ivf.loss,
};
while let Some(write_task) = task_stream.try_next().await? {
- write_task.write(&mut writer, &mut ivf).await?;
+ write_task.write(writer.as_mut(), &mut ivf).await?;
}
let pq_sub_index = index
@@ -1644,7 +1649,7 @@ pub(crate) async fn remap_index_file(
// TODO: for now the IVF_PQ index file format hasn't been updated, so keep the old version,
// change it to latest version value after refactoring the IVF_PQ
writer.write_magics(pos, 0, 1, MAGIC).await?;
- writer.shutdown().await?;
+ Writer::shutdown(writer.as_mut()).await?;
Ok(())
}
@@ -1674,7 +1679,7 @@ async fn write_ivf_pq_file(
let start = std::time::Instant::now();
let num_partitions = ivf.num_partitions() as u32;
builder::build_partitions(
- &mut writer,
+ writer.as_mut(),
stream,
column,
&mut ivf,
@@ -1705,7 +1710,7 @@ async fn write_ivf_pq_file(
// TODO: for now the IVF_PQ index file format hasn't been updated, so keep the old version,
// change it to latest version value after refactoring the IVF_PQ
writer.write_magics(pos, 0, 1, MAGIC).await?;
- writer.shutdown().await?;
+ Writer::shutdown(writer.as_mut()).await?;
Ok(())
}
@@ -1725,7 +1730,7 @@ pub async fn write_ivf_pq_file_from_existing_index(
.child(index_id.to_string())
.child("index.idx");
let mut writer = obj_store.create(&path).await?;
- write_pq_partitions(&mut writer, &mut ivf, Some(streams), None).await?;
+ write_pq_partitions(writer.as_mut(), &mut ivf, Some(streams), None).await?;
let metadata = IvfPQIndexMetadata::new(
index_name.to_string(),
@@ -1740,7 +1745,7 @@ pub async fn write_ivf_pq_file_from_existing_index(
let metadata = pb::Index::try_from(&metadata)?;
let pos = writer.write_protobuf(&metadata).await?;
writer.write_magics(pos, 0, 1, MAGIC).await?;
- writer.shutdown().await?;
+ Writer::shutdown(writer.as_mut()).await?;
Ok(())
}
diff --git a/rust/lance/src/index/vector/ivf/builder.rs b/rust/lance/src/index/vector/ivf/builder.rs
index 42cd5569a77..f19f5bfe48d 100644
--- a/rust/lance/src/index/vector/ivf/builder.rs
+++ b/rust/lance/src/index/vector/ivf/builder.rs
@@ -19,7 +19,6 @@ use lance_index::vector::pq::ProductQuantizer;
use lance_index::vector::quantizer::Quantizer;
use lance_index::vector::PART_ID_COLUMN;
use lance_index::vector::{ivf::storage::IvfModel, transform::Transformer};
-use lance_io::object_writer::ObjectWriter;
use lance_io::stream::RecordBatchStreamAdapter;
use lance_table::io::manifest::ManifestDescribing;
use log::info;
@@ -201,7 +200,7 @@ pub async fn write_vector_storage(
pq: ProductQuantizer,
distance_type: DistanceType,
column: &str,
- writer: ObjectWriter,
+    writer: Box<dyn Writer>,
precomputed_partitions_ds_uri: Option<&str>,
) -> Result<()> {
info!("Transforming {} vectors for storage", num_rows);