diff --git a/rust/lance-arrow/src/schema.rs b/rust/lance-arrow/src/schema.rs index 16840a7a451..8ce9442b4e5 100644 --- a/rust/lance-arrow/src/schema.rs +++ b/rust/lance-arrow/src/schema.rs @@ -40,6 +40,9 @@ pub trait FieldExt { /// Check if the field is marked as a blob fn is_blob(&self) -> bool; + + /// Check if the field is marked as a blob v2 + fn is_blob_v2(&self) -> bool; } impl FieldExt for Field { @@ -108,6 +111,14 @@ impl FieldExt for Field { .map(|value| value == BLOB_V2_EXT_NAME) .unwrap_or(false) } + + fn is_blob_v2(&self) -> bool { + let field_metadata = self.metadata(); + field_metadata + .get(ARROW_EXT_NAME_KEY) + .map(|value| value == BLOB_V2_EXT_NAME) + .unwrap_or(false) + } } /// Extends the functionality of [arrow_schema::Schema]. diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index cd6377098f0..e96f29efa6a 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -508,6 +508,14 @@ impl Field { .unwrap_or(false) } + /// Returns true if the field is explicitly marked as blob v2 extension. + pub fn is_blob_v2(&self) -> bool { + self.metadata + .get(ARROW_EXT_NAME_KEY) + .map(|name| name == BLOB_V2_EXT_NAME) + .unwrap_or(false) + } + /// If the field is a blob, return a new field with the same name and id /// but with the data type set to a struct of the blob description fields. 
/// @@ -1562,6 +1570,19 @@ mod tests { assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type); } + #[test] + fn blob_v2_detection_by_extension() { + let metadata = HashMap::from([ + (ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string()), + (BLOB_META_KEY.to_string(), "true".to_string()), + ]); + let field: Field = ArrowField::new("blob", DataType::LargeBinary, true) + .with_metadata(metadata) + .try_into() + .unwrap(); + assert!(field.is_blob_v2()); + } + #[test] fn blob_extension_roundtrip() { let metadata = HashMap::from([ diff --git a/rust/lance-core/src/utils.rs b/rust/lance-core/src/utils.rs index cc0fdf086ec..663454e001b 100644 --- a/rust/lance-core/src/utils.rs +++ b/rust/lance-core/src/utils.rs @@ -5,6 +5,7 @@ pub mod address; pub mod assume; pub mod backoff; pub mod bit; +pub mod blob; pub mod cpu; pub mod deletion; pub mod futures; diff --git a/rust/lance-core/src/utils/blob.rs b/rust/lance-core/src/utils/blob.rs new file mode 100644 index 00000000000..06cbeb43960 --- /dev/null +++ b/rust/lance-core/src/utils/blob.rs @@ -0,0 +1,26 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use object_store::path::Path; + +/// Format a dedicated blob sidecar path for a data file. +/// +/// Layout: `<base>/<data_file_key>/<blob_id:08x>.raw` +/// - `base` is typically the dataset's data directory. +/// - `data_file_key` is the stem of the data file (without extension). 
+pub fn blob_path(base: &Path, data_file_key: &str, blob_id: u32) -> Path { + let file_name = format!("{:08x}.raw", blob_id); + base.child(data_file_key).child(file_name.as_str()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_blob_path_formatting() { + let base = Path::from("base"); + let path = blob_path(&base, "deadbeef", 2); + assert_eq!(path.to_string(), "base/deadbeef/00000002.raw"); + } +} diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index e79e89921b1..dded1044267 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -272,98 +272,92 @@ impl FieldEncoder for BlobV2StructuralEncoder { row_number: u64, num_rows: u64, ) -> Result> { - // Supported input: Struct - let DataType::Struct(fields) = array.data_type() else { - return Err(Error::InvalidInput { - source: "Blob v2 requires struct input".into(), - location: location!(), - }); - }; - let struct_arr = array.as_struct(); if let Some(validity) = struct_arr.nulls() { repdef.add_validity_bitmap(validity.clone()); } else { repdef.add_no_null(struct_arr.len()); } - let mut data_idx = None; - let mut uri_idx = None; - for (idx, field) in fields.iter().enumerate() { - match field.name().as_str() { - "data" => data_idx = Some(idx), - "uri" => uri_idx = Some(idx), - _ => {} - } - } - let (data_idx, uri_idx) = data_idx.zip(uri_idx).ok_or_else(|| Error::InvalidInput { - source: "Blob v2 struct must contain 'data' and 'uri' fields".into(), - location: location!(), - })?; - - let data_col = struct_arr.column(data_idx).as_binary::(); - let uri_col = struct_arr.column(uri_idx).as_string::(); - - // Validate XOR(data, uri) - for i in 0..struct_arr.len() { - if struct_arr.is_null(i) { - continue; - } - let data_is_set = !data_col.is_null(i); - let uri_is_set = !uri_col.is_null(i); - if data_is_set == uri_is_set { - return Err(Error::InvalidInput { - source: "Each blob 
row must set exactly one of data or uri".into(), - location: location!(), - }); - } - } - - let binary_array = data_col; - - let mut kind_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); - let mut position_builder = - PrimitiveBuilder::::with_capacity(binary_array.len()); - let mut size_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); - let mut blob_id_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); - let mut uri_builder = StringBuilder::with_capacity(binary_array.len(), 0); - - for i in 0..struct_arr.len() { - if struct_arr.is_null(i) { - // Packed struct does not support nullable fields; use empty/default values instead. - kind_builder.append_value(BlobKind::Inline as u8); - position_builder.append_value(0); - size_builder.append_value(0); - blob_id_builder.append_value(0); - uri_builder.append_value(""); - continue; - } - let data_is_set = !data_col.is_null(i); - if data_is_set { - let value = binary_array.value(i); - kind_builder.append_value(BlobKind::Inline as u8); - if value.is_empty() { - position_builder.append_value(0); - size_builder.append_value(0); + let kind_col = struct_arr + .column_by_name("kind") + .expect("kind column must exist") + .as_primitive::(); + let data_col = struct_arr + .column_by_name("data") + .expect("data column must exist") + .as_binary::(); + let uri_col = struct_arr + .column_by_name("uri") + .expect("uri column must exist") + .as_string::(); + let blob_id_col = struct_arr + .column_by_name("blob_id") + .expect("blob_id column must exist") + .as_primitive::(); + let blob_size_col = struct_arr + .column_by_name("blob_size") + .expect("blob_size column must exist") + .as_primitive::(); + + let row_count = struct_arr.len(); + + let mut kind_builder = PrimitiveBuilder::::with_capacity(row_count); + let mut position_builder = PrimitiveBuilder::::with_capacity(row_count); + let mut size_builder = PrimitiveBuilder::::with_capacity(row_count); + let mut blob_id_builder = 
PrimitiveBuilder::::with_capacity(row_count); + let mut uri_builder = StringBuilder::with_capacity(row_count, row_count * 16); + + for i in 0..row_count { + let (kind_value, position_value, size_value, blob_id_value, uri_value) = + if struct_arr.is_null(i) || kind_col.is_null(i) { + (BlobKind::Inline as u8, 0, 0, 0, "".to_string()) } else { - let position = - external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value))); - position_builder.append_value(position); - size_builder.append_value(value.len() as u64); - } - blob_id_builder.append_value(0); - uri_builder.append_value(""); - } else { - // external uri - let uri = uri_col.value(i); - kind_builder.append_value(BlobKind::External as u8); - position_builder.append_value(0); - size_builder.append_value(0); - blob_id_builder.append_value(0); - uri_builder.append_value(uri); - } + let kind_val = BlobKind::try_from(kind_col.value(i))?; + match kind_val { + BlobKind::Dedicated => ( + BlobKind::Dedicated as u8, + 0, + blob_size_col.value(i), + blob_id_col.value(i), + "".to_string(), + ), + BlobKind::External => ( + BlobKind::External as u8, + 0, + 0, + 0, + uri_col.value(i).to_string(), + ), + BlobKind::Inline => { + let data_val = data_col.value(i); + let blob_len = data_val.len() as u64; + let position = external_buffers + .add_buffer(LanceBuffer::from(Buffer::from(data_val))); + + ( + BlobKind::Inline as u8, + position, + blob_len, + 0, + "".to_string(), + ) + } + BlobKind::Packed => { + return Err(Error::InvalidInput { + source: "Packed blob kind is not supported for v2 encoder".into(), + location: location!(), + }); + } + } + }; + + kind_builder.append_value(kind_value); + position_builder.append_value(position_value); + size_builder.append_value(size_value); + blob_id_builder.append_value(blob_id_value); + uri_builder.append_value(uri_value); } - let children: Vec = vec![ Arc::new(kind_builder.finish()), Arc::new(position_builder.finish()), @@ -507,19 +501,32 @@ mod tests { let blob_metadata = 
HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]); + let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true)); let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true)); let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true)); + let blob_id_field = Arc::new(ArrowField::new("blob_id", DataType::UInt32, true)); + let blob_size_field = Arc::new(ArrowField::new("blob_size", DataType::UInt64, true)); + let kind_array = UInt8Array::from(vec![ + BlobKind::Inline as u8, + BlobKind::External as u8, + BlobKind::External as u8, + ]); let data_array = LargeBinaryArray::from(vec![Some(b"inline".as_ref()), None, None]); let uri_array = StringArray::from(vec![ None, Some("file:///tmp/external.bin"), Some("s3://bucket/blob"), ]); + let blob_id_array = UInt32Array::from(vec![0, 0, 0]); + let blob_size_array = UInt64Array::from(vec![0, 0, 0]); let struct_array = StructArray::from(vec![ + (kind_field, Arc::new(kind_array) as ArrayRef), (data_field, Arc::new(data_array) as ArrayRef), (uri_field, Arc::new(uri_array) as ArrayRef), + (blob_id_field, Arc::new(blob_id_array) as ArrayRef), + (blob_size_field, Arc::new(blob_size_array) as ArrayRef), ]); let expected_descriptor = StructArray::from(vec![ @@ -561,4 +568,64 @@ mod tests { ) .await; } + + #[tokio::test] + async fn test_blob_v2_dedicated_round_trip() { + let blob_metadata = + HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]); + + let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true)); + let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true)); + let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true)); + let blob_id_field = Arc::new(ArrowField::new("blob_id", DataType::UInt32, true)); + let blob_size_field = Arc::new(ArrowField::new("blob_size", DataType::UInt64, true)); + + let kind_array = UInt8Array::from(vec![BlobKind::Dedicated as u8, BlobKind::Inline as u8]); + 
let data_array = LargeBinaryArray::from(vec![None, Some(b"abc".as_ref())]); + let uri_array = StringArray::from(vec![Option::<&str>::None, None]); + let blob_id_array = UInt32Array::from(vec![42, 0]); + let blob_size_array = UInt64Array::from(vec![12, 0]); + + let struct_array = StructArray::from(vec![ + (kind_field, Arc::new(kind_array) as ArrayRef), + (data_field, Arc::new(data_array) as ArrayRef), + (uri_field, Arc::new(uri_array) as ArrayRef), + (blob_id_field, Arc::new(blob_id_array) as ArrayRef), + (blob_size_field, Arc::new(blob_size_array) as ArrayRef), + ]); + + let expected_descriptor = StructArray::from(vec![ + ( + Arc::new(ArrowField::new("kind", DataType::UInt8, false)), + Arc::new(UInt8Array::from(vec![ + BlobKind::Dedicated as u8, + BlobKind::Inline as u8, + ])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("position", DataType::UInt64, false)), + Arc::new(UInt64Array::from(vec![0, 0])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("size", DataType::UInt64, false)), + Arc::new(UInt64Array::from(vec![12, 3])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("blob_id", DataType::UInt32, false)), + Arc::new(UInt32Array::from(vec![42, 0])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("blob_uri", DataType::Utf8, false)), + Arc::new(StringArray::from(vec!["", ""])) as ArrayRef, + ), + ]); + + check_round_trip_encoding_of_data_with_expected( + vec![Arc::new(struct_array)], + Some(Arc::new(expected_descriptor)), + &TestCases::default().with_min_file_version(LanceFileVersion::V2_2), + blob_metadata, + ) + .await; + } } diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index e34b82954fc..55f3dad8a04 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -5,15 +5,22 @@ use std::{collections::HashMap, future::Future, ops::DerefMut, sync::Arc}; use arrow::array::AsArray; use arrow::datatypes::{UInt32Type, UInt64Type, UInt8Type}; +use arrow_array::builder::{LargeBinaryBuilder, PrimitiveBuilder, 
StringBuilder}; +use arrow_array::Array; +use arrow_array::RecordBatch; +use arrow_schema::DataType as ArrowDataType; +use lance_arrow::FieldExt; use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; use object_store::path::Path; use snafu::location; +use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; use super::take::TakeBuilder; use super::{Dataset, ProjectionRequest}; use arrow_array::StructArray; use lance_core::datatypes::{BlobKind, BlobVersion}; +use lance_core::utils::blob::blob_path; use lance_core::{utils::address::RowAddress, Error, Result}; use lance_io::traits::Reader; @@ -26,6 +33,196 @@ pub fn blob_version_from_config(config: &HashMap) -> BlobVersion .unwrap_or(BlobVersion::V1) } +const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; + +/// Preprocesses blob v2 columns on the write path so the encoder only sees lightweight descriptors: +/// +/// - Spills large blobs to sidecar files before encoding, reducing memory/CPU and avoiding copying huge payloads through page builders. +/// - Emits `blob_id/blob_size` tied to the data file stem, giving readers a stable path independent of temporary fragment IDs assigned during write. +/// - Leaves small inline blobs and URI rows unchanged for compatibility. +pub struct BlobPreprocessor { + object_store: ObjectStore, + data_dir: Path, + data_file_key: String, + local_counter: u32, +} + +impl BlobPreprocessor { + pub(crate) fn new(object_store: ObjectStore, data_dir: Path, data_file_key: String) -> Self { + Self { + object_store, + data_dir, + data_file_key, + // Start at 1 to avoid a potential all-zero blob_id value. 
+ local_counter: 1, + } + } + + fn next_blob_id(&mut self) -> u32 { + let id = self.local_counter; + self.local_counter += 1; + id + } + + async fn write_blob(&self, blob_id: u32, data: &[u8]) -> Result { + let path = blob_path(&self.data_dir, &self.data_file_key, blob_id); + let mut writer = self.object_store.create(&path).await?; + writer.write_all(data).await?; + writer.shutdown().await?; + Ok(path) + } + + pub(crate) async fn preprocess_batch(&mut self, batch: &RecordBatch) -> Result { + let mut new_columns = Vec::with_capacity(batch.num_columns()); + let mut new_fields = Vec::with_capacity(batch.num_columns()); + + for (array, field) in batch.columns().iter().zip(batch.schema().fields()) { + if !field.is_blob_v2() { + new_columns.push(array.clone()); + new_fields.push(field.clone()); + continue; + } + + let struct_arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::invalid_input("Blob column was not a struct array", location!()) + })?; + + let data_col = struct_arr + .column_by_name("data") + .ok_or_else(|| { + Error::invalid_input("Blob struct missing `data` field", location!()) + })? + .as_binary::(); + let uri_col = struct_arr + .column_by_name("uri") + .ok_or_else(|| { + Error::invalid_input("Blob struct missing `uri` field", location!()) + })? 
+ .as_string::(); + + let mut data_builder = LargeBinaryBuilder::with_capacity(struct_arr.len(), 0); + let mut uri_builder = StringBuilder::with_capacity(struct_arr.len(), 0); + let mut blob_id_builder = + PrimitiveBuilder::::with_capacity(struct_arr.len()); + let mut blob_size_builder = + PrimitiveBuilder::::with_capacity(struct_arr.len()); + let mut kind_builder = PrimitiveBuilder::::with_capacity(struct_arr.len()); + + let struct_nulls = struct_arr.nulls(); + + for i in 0..struct_arr.len() { + if struct_arr.is_null(i) { + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + kind_builder.append_null(); + continue; + } + + let has_data = !data_col.is_null(i); + let has_uri = !uri_col.is_null(i); + + if has_data && data_col.value(i).len() > DEDICATED_THRESHOLD { + let blob_id = self.next_blob_id(); + self.write_blob(blob_id, data_col.value(i)).await?; + + kind_builder.append_value(BlobKind::Dedicated as u8); + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_value(blob_id); + blob_size_builder.append_value(data_col.value(i).len() as u64); + continue; + } + + if has_uri { + let uri_val = uri_col.value(i); + kind_builder.append_value(BlobKind::External as u8); + data_builder.append_null(); + uri_builder.append_value(uri_val); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + continue; + } + + if has_data { + kind_builder.append_value(BlobKind::Inline as u8); + let value = data_col.value(i); + data_builder.append_value(value); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + } else { + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + kind_builder.append_null(); + } + } + + let child_fields = vec![ + arrow_schema::Field::new("kind", ArrowDataType::UInt8, true), + arrow_schema::Field::new("data", 
ArrowDataType::LargeBinary, true), + arrow_schema::Field::new("uri", ArrowDataType::Utf8, true), + arrow_schema::Field::new("blob_id", ArrowDataType::UInt32, true), + arrow_schema::Field::new("blob_size", ArrowDataType::UInt64, true), + ]; + + let struct_array = arrow_array::StructArray::try_new( + child_fields.clone().into(), + vec![ + Arc::new(kind_builder.finish()), + Arc::new(data_builder.finish()), + Arc::new(uri_builder.finish()), + Arc::new(blob_id_builder.finish()), + Arc::new(blob_size_builder.finish()), + ], + struct_nulls.cloned(), + )?; + + new_columns.push(Arc::new(struct_array)); + new_fields.push(Arc::new( + arrow_schema::Field::new( + field.name(), + ArrowDataType::Struct(child_fields.into()), + field.is_nullable(), + ) + .with_metadata(field.metadata().clone()), + )); + } + + let new_schema = Arc::new(arrow_schema::Schema::new_with_metadata( + new_fields + .iter() + .map(|f| f.as_ref().clone()) + .collect::>(), + batch.schema().metadata().clone(), + )); + + RecordBatch::try_new(new_schema, new_columns) + .map_err(|e| Error::invalid_input(e.to_string(), location!())) + } +} + +pub fn schema_has_blob_v2(schema: &lance_core::datatypes::Schema) -> bool { + schema.fields.iter().any(|f| f.is_blob_v2()) +} + +pub async fn preprocess_blob_batches( + batches: &[RecordBatch], + pre: &mut BlobPreprocessor, +) -> Result> { + let mut out = Vec::with_capacity(batches.len()); + for batch in batches { + out.push(pre.preprocess_batch(batch).await?); + } + Ok(out) +} + /// Current state of the reader. 
Held in a mutex for easy sharing /// /// The u64 is the cursor in the file that the reader is currently at @@ -75,6 +272,18 @@ impl BlobFile { } } + pub fn new_dedicated(dataset: Arc, path: Path, size: u64) -> Self { + Self { + object_store: dataset.object_store.clone(), + path, + position: 0, + size, + kind: BlobKind::Dedicated, + uri: None, + reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))), + } + } + pub async fn new_external( uri: String, size: u64, @@ -340,7 +549,7 @@ async fn collect_blob_files_v2( let kinds = descriptions.column(0).as_primitive::(); let positions = descriptions.column(1).as_primitive::(); let sizes = descriptions.column(2).as_primitive::(); - let _blob_ids = descriptions.column(3).as_primitive::(); + let blob_ids = descriptions.column(3).as_primitive::(); let blob_uris = descriptions.column(4).as_string::(); let mut files = Vec::with_capacity(row_addrs.len()); @@ -364,6 +573,28 @@ async fn collect_blob_files_v2( size, )); } + BlobKind::Dedicated => { + let blob_id = blob_ids.value(idx); + let size = sizes.value(idx); + let frag_id = RowAddress::from(*row_addr).fragment_id(); + let frag = + dataset + .get_fragment(frag_id as usize) + .ok_or_else(|| Error::Internal { + message: "Fragment not found".to_string(), + location: location!(), + })?; + let data_file = + frag.data_file_for_field(blob_field_id) + .ok_or_else(|| Error::Internal { + message: "Data file not found for blob field".to_string(), + location: location!(), + })?; + + let data_file_key = data_file_key_from_path(data_file.path.as_str()); + let path = blob_path(&dataset.data_dir(), data_file_key, blob_id); + files.push(BlobFile::new_dedicated(dataset.clone(), path, size)); + } BlobKind::External => { let uri = blob_uris.value(idx).to_string(); let size = sizes.value(idx); @@ -387,6 +618,11 @@ async fn collect_blob_files_v2( Ok(files) } +fn data_file_key_from_path(path: &str) -> &str { + let filename = path.rsplit('/').next().unwrap_or(path); + 
filename.strip_suffix(".lance").unwrap_or(filename) +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -401,6 +637,7 @@ mod tests { use lance_datagen::{array, BatchCount, RowCount}; use lance_file::version::LanceFileVersion; + use super::data_file_key_from_path; use crate::{utils::test::TestDatasetGenerator, Dataset}; struct BlobTestFixture { @@ -654,4 +891,11 @@ mod tests { .unwrap(); assert_eq!(blobs.len(), 2, "Mixed fragment blobs should have 2 items"); } + + #[test] + fn test_data_file_key_from_path() { + assert_eq!(data_file_key_from_path("data/abc.lance"), "abc"); + assert_eq!(data_file_key_from_path("abc.lance"), "abc"); + assert_eq!(data_file_key_from_path("nested/path/xyz"), "xyz"); + } } diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index b4e96ccbe27..0ab7dd15d21 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -18,6 +18,7 @@ use snafu::location; use std::borrow::Cow; use uuid::Uuid; +use crate::dataset::blob::{preprocess_blob_batches, schema_has_blob_v2, BlobPreprocessor}; use crate::dataset::builder::DatasetBuilder; use crate::dataset::write::do_write_fragments; use crate::dataset::{WriteMode, WriteParams, DATA_DIR}; @@ -134,9 +135,11 @@ impl<'a> FragmentCreateBuilder<'a> { ¶ms.store_params.clone().unwrap_or_default(), ) .await?; - let filename = format!("{}.lance", generate_random_filename()); + let data_file_key = generate_random_filename(); + let filename = format!("{}.lance", data_file_key); let mut fragment = Fragment::new(id); let full_path = base_path.child(DATA_DIR).child(filename.clone()); + let has_blob_v2 = schema_has_blob_v2(&schema); let obj_writer = object_store.create(&full_path).await?; let mut writer = lance_file::writer::FileWriter::try_new( obj_writer, @@ -147,6 +150,16 @@ impl<'a> FragmentCreateBuilder<'a> { }, )?; + let mut preprocessor = if has_blob_v2 { + Some(BlobPreprocessor::new( + object_store.as_ref().clone(), + 
base_path.child(DATA_DIR), + data_file_key.clone(), + )) + } else { + None + }; + let (major, minor) = writer.version().to_numbers(); let data_file = DataFile::new_unstarted(filename, major, minor); @@ -160,7 +173,10 @@ impl<'a> FragmentCreateBuilder<'a> { .map_ok(|batch| vec![batch]) .boxed(); while let Some(batched_chunk) = broken_stream.next().await { - let batch_chunk = batched_chunk?; + let mut batch_chunk = batched_chunk?; + if let Some(pre) = preprocessor.as_mut() { + batch_chunk = preprocess_blob_batches(&batch_chunk, pre).await?; + } writer.write_batches(batch_chunk.iter()).await?; } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index dcd8fa6f774..6f675dbcb6e 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -33,6 +33,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use tracing::{info, instrument}; +use crate::dataset::blob::{preprocess_blob_batches, schema_has_blob_v2, BlobPreprocessor}; use crate::session::Session; use crate::Dataset; @@ -716,13 +717,21 @@ struct V2WriterAdapter { writer: current_writer::FileWriter, path: String, base_id: Option, + preprocessor: Option, } #[async_trait::async_trait] impl GenericWriter for V2WriterAdapter { async fn write(&mut self, batches: &[RecordBatch]) -> Result<()> { - for batch in batches { - self.writer.write_batch(batch).await?; + if let Some(pre) = self.preprocessor.as_mut() { + let processed = preprocess_blob_batches(batches, pre).await?; + for batch in processed { + self.writer.write_batch(&batch).await?; + } + } else { + for batch in batches { + self.writer.write_batch(batch).await?; + } } Ok(()) } @@ -774,14 +783,17 @@ pub async fn open_writer_with_options( add_data_dir: bool, base_id: Option, ) -> Result> { - let filename = format!("{}.lance", generate_random_filename()); + let data_file_key = generate_random_filename(); + let filename = format!("{}.lance", data_file_key); - let full_path = if add_data_dir { - 
base_dir.child(DATA_DIR).child(filename.as_str()) + let data_dir = if add_data_dir { + base_dir.child(DATA_DIR) } else { - base_dir.child(filename.as_str()) + base_dir.clone() }; + let full_path = data_dir.child(filename.as_str()); + let writer = if storage_version == LanceFileVersion::Legacy { Box::new(V1WriterAdapter { writer: PreviousFileWriter::::try_new( @@ -804,10 +816,20 @@ pub async fn open_writer_with_options( ..Default::default() }, )?; + let preprocessor = if schema_has_blob_v2(schema) { + Some(BlobPreprocessor::new( + object_store.clone(), + data_dir.clone(), + data_file_key.clone(), + )) + } else { + None + }; let writer_adapter = V2WriterAdapter { writer: file_writer, path: filename, base_id, + preprocessor, }; Box::new(writer_adapter) as Box }; @@ -875,7 +897,7 @@ impl WriterGenerator { let writer = if let Some(base_info) = self.select_target_base() { open_writer_with_options( - base_info.object_store.as_ref(), + &base_info.object_store, &self.schema, &base_info.base_dir, self.storage_version, @@ -885,7 +907,7 @@ impl WriterGenerator { .await? } else { open_writer( - self.object_store.as_ref(), + &self.object_store, &self.schema, &self.base_dir, self.storage_version, @@ -1403,7 +1425,7 @@ mod tests { let base_dir = Path::from("test/bucket2"); let mut inner_writer = open_writer_with_options( - object_store.as_ref(), + &object_store, &schema, &base_dir, LanceFileVersion::Stable, diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index bc315450e21..cd50f01226b 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -887,7 +887,7 @@ impl MergeInsertJob { .data_storage_format .lance_file_version()?; let mut writer = open_writer( - dataset.object_store(), + &dataset.object_store, &write_schema, &dataset.base, data_storage_version,