From deb1db4945f6955ebf65287c722de52cfff290d2 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 3 Dec 2025 12:54:03 +0800 Subject: [PATCH 01/17] Save work Signed-off-by: Xuanwo --- rust/lance-core/src/utils.rs | 1 + rust/lance-core/src/utils/blob.rs | 57 ++++ .../src/encodings/logical/blob.rs | 91 ++++-- rust/lance/src/dataset.rs | 4 + rust/lance/src/dataset/blob.rs | 52 +++- rust/lance/src/dataset/fragment/write.rs | 18 +- rust/lance/src/dataset/write.rs | 274 +++++++++++++++++- rust/lance/src/dataset/write/merge_insert.rs | 2 +- 8 files changed, 452 insertions(+), 47 deletions(-) create mode 100644 rust/lance-core/src/utils/blob.rs diff --git a/rust/lance-core/src/utils.rs b/rust/lance-core/src/utils.rs index cc0fdf086ec..663454e001b 100644 --- a/rust/lance-core/src/utils.rs +++ b/rust/lance-core/src/utils.rs @@ -5,6 +5,7 @@ pub mod address; pub mod assume; pub mod backoff; pub mod bit; +pub mod blob; pub mod cpu; pub mod deletion; pub mod futures; diff --git a/rust/lance-core/src/utils/blob.rs b/rust/lance-core/src/utils/blob.rs new file mode 100644 index 00000000000..84bb7bbe75e --- /dev/null +++ b/rust/lance-core/src/utils/blob.rs @@ -0,0 +1,57 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use object_store::path::Path; +use rand::RngCore; + +/// Directory name for blob sidecar files. +pub const BLOB_SIDECAR_DIR: &str = "_blob"; + +/// Format a dedicated blob sidecar path. +/// +/// Layout: `_blob//--.raw` +pub fn blob_path(base: &Path, stem: &str, field_id: u32, blob_id: u32, prefix: &str) -> Path { + let file_name = format!("{}-{:08x}-{:08x}.raw", prefix, field_id, blob_id); + base.child(BLOB_SIDECAR_DIR) + .child(stem) + .child(file_name.as_str()) +} + +/// Generate a high-entropy prefix using the same pattern as data file names. +/// +/// Pattern: first 24 bits as binary, remaining 13 bytes as hex (26 chars). 
+pub fn generate_random_prefix() -> String { + let mut bytes = [0u8; 16]; + rand::rng().fill_bytes(&mut bytes); + + let mut out = String::with_capacity(50); + + for &b in &bytes[..3] { + for i in (0..8).rev() { + out.push(if (b >> i) & 1 == 1 { '1' } else { '0' }); + } + } + + const HEX: &[u8; 16] = b"0123456789abcdef"; + for &b in &bytes[3..] { + out.push(HEX[(b >> 4) as usize] as char); + out.push(HEX[(b & 0xf) as usize] as char); + } + + out +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_blob_path_formatting() { + let base = Path::from("base"); + let path = blob_path(&base, "stem", 1, 2, "pfx"); + assert_eq!( + path.to_string(), + "base/_blob/stem/pfx-00000001-00000002.raw" + ); + } +} diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index e2658e42827..53ce79b8667 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -282,10 +282,14 @@ impl FieldEncoder for BlobV2StructuralEncoder { let struct_arr = array.as_struct(); let mut data_idx = None; let mut uri_idx = None; + let mut blob_id_idx = None; + let mut blob_size_idx = None; for (idx, field) in fields.iter().enumerate() { match field.name().as_str() { "data" => data_idx = Some(idx), "uri" => uri_idx = Some(idx), + "blob_id" => blob_id_idx = Some(idx), + "blob_size" => blob_size_idx = Some(idx), _ => {} } } @@ -297,40 +301,20 @@ impl FieldEncoder for BlobV2StructuralEncoder { let data_col = struct_arr.column(data_idx).as_binary::(); let uri_col = struct_arr.column(uri_idx).as_string::(); - // Validate XOR(data, uri) - for i in 0..struct_arr.len() { - if struct_arr.is_null(i) { - continue; - } - let data_is_set = !data_col.is_null(i); - let uri_is_set = !uri_col.is_null(i); - if data_is_set == uri_is_set { - return Err(Error::InvalidInput { - source: "Each blob row must set exactly one of data or uri".into(), - location: location!(), - }); - } - if 
uri_is_set { - return Err(Error::NotSupported { - source: "External blob (uri) is not supported yet".into(), - location: location!(), - }); - } - } + let blob_id_col = blob_id_idx.map(|i| struct_arr.column(i).as_primitive::()); + let blob_size_col = + blob_size_idx.map(|i| struct_arr.column(i).as_primitive::()); - let binary_array = data_col; + let mut kind_builder = PrimitiveBuilder::::with_capacity(data_col.len()); + let mut position_builder = PrimitiveBuilder::::with_capacity(data_col.len()); + let mut size_builder = PrimitiveBuilder::::with_capacity(data_col.len()); + let mut blob_id_builder = PrimitiveBuilder::::with_capacity(data_col.len()); + let mut uri_builder = StringBuilder::with_capacity(data_col.len(), 0); - let mut kind_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); - let mut position_builder = - PrimitiveBuilder::::with_capacity(binary_array.len()); - let mut size_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); - let mut blob_id_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); - let mut uri_builder = StringBuilder::with_capacity(binary_array.len(), 0); - - for i in 0..binary_array.len() { + for i in 0..data_col.len() { let is_null_row = match array.data_type() { DataType::Struct(_) => array.is_null(i), - _ => binary_array.is_null(i), + _ => data_col.is_null(i), }; if is_null_row { kind_builder.append_null(); @@ -341,7 +325,52 @@ impl FieldEncoder for BlobV2StructuralEncoder { continue; } - let value = binary_array.value(i); + let has_blob_id = blob_id_col + .as_ref() + .map(|col| col.is_valid(i)) + .unwrap_or(false); + let has_blob_size = blob_size_col + .as_ref() + .map(|col| col.is_valid(i)) + .unwrap_or(false); + + if has_blob_id || has_blob_size { + if !(has_blob_id && has_blob_size) { + return Err(Error::InvalidInput { + source: "blob_id and blob_size must both be set for dedicated blobs".into(), + location: location!(), + }); + } + let blob_id = blob_id_col.as_ref().unwrap().value(i); + let 
blob_size = blob_size_col.as_ref().unwrap().value(i); + kind_builder.append_value(2); + position_builder.append_value(0); + size_builder.append_value(blob_size); + blob_id_builder.append_value(blob_id); + uri_builder.append_null(); + continue; + } + + let data_is_set = !data_col.is_null(i); + let uri_is_set = !uri_col.is_null(i); + if data_is_set == uri_is_set { + return Err(Error::InvalidInput { + source: "Each blob row must set exactly one of data or uri".into(), + location: location!(), + }); + } + + if uri_is_set { + let uri_val = uri_col.value(i); + kind_builder.append_value(3); + position_builder.append_value(0); + size_builder.append_value(0); + blob_id_builder.append_value(0); + uri_builder.append_value(uri_val); + continue; + } + + let value = data_col.value(i); kind_builder.append_value(0); if value.is_empty() { diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 4fecf55b25e..aac3a9c74d8 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1574,6 +1574,10 @@ impl Dataset { &self.object_store } + pub(crate) fn object_store_arc(&self) -> Arc { + self.object_store.clone() + } + /// Returns the storage options used when opening this dataset, if any. 
pub fn storage_options(&self) -> Option<&HashMap> { self.store_params diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 34f644c5d22..071308d4ad0 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -12,6 +12,7 @@ use tokio::sync::Mutex; use super::Dataset; use arrow_array::{Array, StructArray}; use lance_core::datatypes::BlobVersion; +use lance_core::utils::blob::blob_path; use lance_core::{utils::address::RowAddress, Error, Result}; use lance_io::traits::Reader; @@ -69,6 +70,16 @@ impl BlobFile { } } + pub fn new_dedicated(dataset: Arc, path: Path, size: u64) -> Self { + Self { + dataset, + data_file: path, + position: 0, + size, + reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))), + } + } + /// Close the blob file, releasing any associated resources pub async fn close(&self) -> Result<()> { let mut reader = self.reader.lock().await; @@ -210,6 +221,8 @@ pub(super) async fn take_blobs( } const INLINE_BLOB_KIND: u8 = 0; +const DEDICATED_BLOB_KIND: u8 = 2; +const EXTERNAL_BLOB_KIND: u8 = 3; fn collect_blob_files_v1( dataset: &Arc, @@ -245,8 +258,8 @@ fn collect_blob_files_v2( let kinds = descriptions.column(0).as_primitive::(); let positions = descriptions.column(1).as_primitive::(); let sizes = descriptions.column(2).as_primitive::(); - let _blob_ids = descriptions.column(3).as_primitive::(); - let _uris = descriptions.column(4).as_string::(); + let blob_ids = descriptions.column(3).as_primitive::(); + let uris = descriptions.column(4).as_string::(); let mut files = Vec::with_capacity(row_addrs.len()); for (idx, row_addr) in row_addrs.values().iter().enumerate() { @@ -270,6 +283,41 @@ fn collect_blob_files_v2( size, )); } + DEDICATED_BLOB_KIND => { + if blob_ids.is_null(idx) || sizes.is_null(idx) { + return Err(Error::corrupt_file( + dataset.data_dir(), + "Missing blob_id or size for dedicated blob", + location!(), + )); + } + let blob_id = blob_ids.value(idx); + let size = sizes.value(idx); + 
let frag_id = RowAddress::from(*row_addr).fragment_id(); + let frag = + dataset + .get_fragment(frag_id as usize) + .ok_or_else(|| Error::Internal { + message: "Fragment not found".to_string(), + location: location!(), + })?; + let data_file = + frag.data_file_for_field(blob_field_id) + .ok_or_else(|| Error::Internal { + message: "Data file not found for blob field".to_string(), + location: location!(), + })?; + let stem = data_file + .path + .strip_suffix(".lance") + .unwrap_or(&data_file.path); + let path = blob_path(&dataset.data_dir(), stem, blob_field_id, blob_id, stem); + files.push(BlobFile::new_dedicated(dataset.clone(), path, size)); + } + EXTERNAL_BLOB_KIND => { + let _ = uris.value(idx); + // External blobs are not fetched; skip emitting BlobFile + } other => { return Err(Error::NotSupported { source: format!("Blob kind {} is not supported", other).into(), diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index b4e96ccbe27..b6ef0b0ab89 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -19,7 +19,7 @@ use std::borrow::Cow; use uuid::Uuid; use crate::dataset::builder::DatasetBuilder; -use crate::dataset::write::do_write_fragments; +use crate::dataset::write::{do_write_fragments, preprocess_blob_batches, BlobPreprocessor}; use crate::dataset::{WriteMode, WriteParams, DATA_DIR}; use crate::Result; @@ -147,6 +147,19 @@ impl<'a> FragmentCreateBuilder<'a> { }, )?; + let field_ids = writer + .field_id_to_column_indices() + .iter() + .map(|(id, _)| *id) + .collect::>(); + let mut preprocessor = BlobPreprocessor::new( + object_store.clone(), + base_path.child(DATA_DIR), + filename.trim_end_matches(".lance").to_string(), + id as u32, + field_ids, + ); + let (major, minor) = writer.version().to_numbers(); let data_file = DataFile::new_unstarted(filename, major, minor); @@ -160,7 +173,8 @@ impl<'a> FragmentCreateBuilder<'a> { .map_ok(|batch| vec![batch]) .boxed(); 
while let Some(batched_chunk) = broken_stream.next().await { - let batch_chunk = batched_chunk?; + let mut batch_chunk = batched_chunk?; + batch_chunk = preprocess_blob_batches(&batch_chunk, &mut preprocessor).await?; writer.write_batches(batch_chunk.iter()).await?; } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 3726f6e1a03..fdbc42a86a0 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -1,7 +1,10 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use arrow_array::RecordBatch; +use arrow_array::{ + builder::LargeBinaryBuilder, builder::LargeStringBuilder, cast::AsArray, Array, RecordBatch, +}; +use arrow_schema::DataType as ArrowDataType; use chrono::TimeDelta; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream; @@ -10,6 +13,7 @@ use lance_core::datatypes::{ BlobVersion, NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions, }; use lance_core::error::LanceOptionExt; +use lance_core::utils::blob::blob_path; use lance_core::utils::tempfile::TempDir; use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_DATA, TRACE_FILE_AUDIT}; use lance_core::{datatypes::Schema, Error, Result}; @@ -31,6 +35,7 @@ use std::collections::HashMap; use std::num::NonZero; use std::sync::atomic::AtomicUsize; use std::sync::Arc; +use tokio::io::AsyncWriteExt; use tracing::{info, instrument}; use crate::session::Session; @@ -70,6 +75,230 @@ pub enum WriteDestination<'a> { Uri(&'a str), } +const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; + +pub(crate) struct BlobPreprocessor { + object_store: Arc, + data_dir: Path, + data_file_stem: String, + fragment_id: u32, + local_counter: u32, + field_ids: Vec, +} + +impl BlobPreprocessor { + pub(crate) fn new( + object_store: Arc, + data_dir: Path, + data_file_stem: String, + fragment_id: u32, + field_ids: Vec, + ) -> Self { + Self { + 
object_store, + data_dir, + data_file_stem, + fragment_id, + local_counter: 0, + field_ids, + } + } + + fn next_blob_id(&mut self) -> u32 { + let id = (self.fragment_id << 16) | self.local_counter; + self.local_counter = self.local_counter.wrapping_add(1); + id + } + + async fn write_blob(&self, field_id: u32, blob_id: u32, data: &[u8]) -> Result { + let path = blob_path( + &self.data_dir, + &self.data_file_stem, + field_id, + blob_id, + &self.data_file_stem, + ); + let mut writer = self.object_store.create(&path).await?; + writer.write_all(data).await?; + writer.shutdown().await?; + Ok(path) + } + + async fn delete_blob(&self, path: &Path) { + let _ = self.object_store.delete(path).await; + } + + async fn preprocess_batch(&mut self, batch: &RecordBatch) -> Result { + let mut new_columns = Vec::with_capacity(batch.num_columns()); + let mut new_fields = Vec::with_capacity(batch.num_columns()); + let mut written_paths = Vec::new(); + + for (col_idx, (array, field)) in batch + .columns() + .iter() + .zip(batch.schema().fields()) + .enumerate() + { + let is_blob_struct = matches!(field.data_type(), ArrowDataType::Struct(_)) + && field.metadata().get(lance_arrow::BLOB_META_KEY).is_some(); + + if !is_blob_struct { + new_columns.push(array.clone()); + new_fields.push(field.clone()); + continue; + } + + let struct_arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::invalid_input("Blob column was not a struct array", location!()) + })?; + + let fields = match field.data_type() { + ArrowDataType::Struct(f) => f, + _ => unreachable!(), + }; + + let mut data_idx = None; + let mut uri_idx = None; + for (idx, child) in fields.iter().enumerate() { + match child.name().as_str() { + "data" => data_idx = Some(idx), + "uri" => uri_idx = Some(idx), + _ => {} + } + } + + let data_idx = data_idx.ok_or_else(|| { + Error::invalid_input("Blob struct missing `data` field", location!()) + })?; + let uri_idx = uri_idx.ok_or_else(|| { + Error::invalid_input("Blob struct 
missing `uri` field", location!()) + })?; + + let data_col = struct_arr.column(data_idx).as_binary::(); + let uri_col = struct_arr.column(uri_idx).as_string::(); + + let mut data_builder = LargeBinaryBuilder::with_capacity(struct_arr.len(), 0); + let mut uri_builder = LargeStringBuilder::with_capacity(struct_arr.len(), 0); + let mut blob_id_builder = arrow_array::builder::PrimitiveBuilder::< + arrow_array::types::UInt32Type, + >::with_capacity(struct_arr.len()); + let mut blob_size_builder = arrow_array::builder::PrimitiveBuilder::< + arrow_array::types::UInt64Type, + >::with_capacity(struct_arr.len()); + + let struct_nulls = struct_arr.nulls(); + + for i in 0..struct_arr.len() { + if struct_arr.is_null(i) { + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + continue; + } + + let has_data = !data_col.is_null(i); + let has_uri = !uri_col.is_null(i); + + if has_data && data_col.value(i).len() > DEDICATED_THRESHOLD { + let blob_id = self.next_blob_id(); + let field_id = *self.field_ids.get(col_idx).unwrap_or(&(col_idx as u32)); + match self.write_blob(field_id, blob_id, data_col.value(i)).await { + Ok(path) => written_paths.push(path), + Err(err) => { + for path in &written_paths { + self.delete_blob(path).await; + } + return Err(err); + } + } + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_value(blob_id); + blob_size_builder.append_value(data_col.value(i).len() as u64); + continue; + } + + if has_uri { + let uri_val = uri_col.value(i); + data_builder.append_null(); + uri_builder.append_value(uri_val); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + continue; + } + + if has_data { + let value = data_col.value(i); + data_builder.append_value(value); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + } else { + // row null or missing + data_builder.append_null(); + 
uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + } + } + + let child_fields = vec![ + arrow_schema::Field::new("data", ArrowDataType::LargeBinary, true), + arrow_schema::Field::new("uri", ArrowDataType::Utf8, true), + arrow_schema::Field::new("blob_id", ArrowDataType::UInt32, true), + arrow_schema::Field::new("blob_size", ArrowDataType::UInt64, true), + ]; + + let struct_array = arrow_array::StructArray::try_new( + child_fields.clone().into(), + vec![ + Arc::new(data_builder.finish()), + Arc::new(uri_builder.finish()), + Arc::new(blob_id_builder.finish()), + Arc::new(blob_size_builder.finish()), + ], + struct_nulls.cloned(), + )?; + + new_columns.push(Arc::new(struct_array)); + new_fields.push(Arc::new( + arrow_schema::Field::new( + field.name(), + ArrowDataType::Struct(child_fields.into()), + field.is_nullable(), + ) + .with_metadata(field.metadata().clone()), + )); + } + + let new_schema = Arc::new(arrow_schema::Schema::new_with_metadata( + new_fields + .iter() + .map(|f| f.as_ref().clone()) + .collect::>(), + batch.schema().metadata().clone(), + )); + + RecordBatch::try_new(new_schema, new_columns) + .map_err(|e| Error::invalid_input(e.to_string(), location!())) + } +} + +pub(crate) async fn preprocess_blob_batches( + batches: &[RecordBatch], + pre: &mut BlobPreprocessor, +) -> Result> { + let mut out = Vec::with_capacity(batches.len()); + for batch in batches { + out.push(pre.preprocess_batch(batch).await?); + } + Ok(out) +} + impl WriteDestination<'_> { pub fn dataset(&self) -> Option<&Dataset> { match self { @@ -718,13 +947,21 @@ struct V2WriterAdapter { writer: current_writer::FileWriter, path: String, base_id: Option, + preprocessor: Option, } #[async_trait::async_trait] impl GenericWriter for V2WriterAdapter { async fn write(&mut self, batches: &[RecordBatch]) -> Result<()> { - for batch in batches { - self.writer.write_batch(batch).await?; + if let Some(pre) = self.preprocessor.as_mut() { + let processed = 
preprocess_blob_batches(batches, pre).await?; + for batch in processed { + self.writer.write_batch(&batch).await?; + } + } else { + for batch in batches { + self.writer.write_batch(batch).await?; + } } Ok(()) } @@ -760,7 +997,7 @@ impl GenericWriter for V2WriterAdapter { } pub async fn open_writer( - object_store: &ObjectStore, + object_store: &Arc, schema: &Schema, base_dir: &Path, storage_version: LanceFileVersion, @@ -769,7 +1006,7 @@ pub async fn open_writer( } pub async fn open_writer_with_options( - object_store: &ObjectStore, + object_store: &Arc, schema: &Schema, base_dir: &Path, storage_version: LanceFileVersion, @@ -778,12 +1015,14 @@ pub async fn open_writer_with_options( ) -> Result> { let filename = format!("{}.lance", generate_random_filename()); - let full_path = if add_data_dir { - base_dir.child(DATA_DIR).child(filename.as_str()) + let data_dir = if add_data_dir { + base_dir.child(DATA_DIR) } else { - base_dir.child(filename.as_str()) + base_dir.clone() }; + let full_path = data_dir.child(filename.as_str()); + let writer = if storage_version == LanceFileVersion::Legacy { Box::new(V1WriterAdapter { writer: PreviousFileWriter::::try_new( @@ -806,10 +1045,23 @@ pub async fn open_writer_with_options( ..Default::default() }, )?; + let field_ids = schema + .fields + .iter() + .map(|f| f.id as u32) + .collect::>(); + let preprocessor = Some(BlobPreprocessor::new( + object_store.clone(), + data_dir.clone(), + filename.trim_end_matches(".lance").to_string(), + 0, + field_ids, + )); let writer_adapter = V2WriterAdapter { writer: file_writer, path: filename, base_id, + preprocessor, }; Box::new(writer_adapter) as Box }; @@ -877,7 +1129,7 @@ impl WriterGenerator { let writer = if let Some(base_info) = self.select_target_base() { open_writer_with_options( - base_info.object_store.as_ref(), + &base_info.object_store, &self.schema, &base_info.base_dir, self.storage_version, @@ -887,7 +1139,7 @@ impl WriterGenerator { .await? 
} else { open_writer( - self.object_store.as_ref(), + &self.object_store, &self.schema, &self.base_dir, self.storage_version, @@ -1405,7 +1657,7 @@ mod tests { let base_dir = Path::from("test/bucket2"); let mut inner_writer = open_writer_with_options( - object_store.as_ref(), + &object_store, &schema, &base_dir, LanceFileVersion::Stable, diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 69222e90903..daf1b6fbf4b 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -887,7 +887,7 @@ impl MergeInsertJob { .data_storage_format .lance_file_version()?; let mut writer = open_writer( - dataset.object_store(), + &dataset.object_store_arc(), &write_schema, &dataset.base, data_storage_version, From f4ac526fe33da540b05c840671fd93366f85ad75 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 3 Dec 2025 16:08:07 +0800 Subject: [PATCH 02/17] save work Signed-off-by: Xuanwo --- rust/lance-core/src/utils/blob.rs | 13 ++++---- rust/lance/src/dataset.rs | 4 --- rust/lance/src/dataset/blob.rs | 17 ++++------ rust/lance/src/dataset/fragment/write.rs | 26 ++++++++++----- rust/lance/src/dataset/write.rs | 35 ++++++++++---------- rust/lance/src/dataset/write/merge_insert.rs | 2 +- 6 files changed, 49 insertions(+), 48 deletions(-) diff --git a/rust/lance-core/src/utils/blob.rs b/rust/lance-core/src/utils/blob.rs index 84bb7bbe75e..3b66c08d241 100644 --- a/rust/lance-core/src/utils/blob.rs +++ b/rust/lance-core/src/utils/blob.rs @@ -9,11 +9,12 @@ pub const BLOB_SIDECAR_DIR: &str = "_blob"; /// Format a dedicated blob sidecar path. 
/// -/// Layout: `_blob//--.raw` -pub fn blob_path(base: &Path, stem: &str, field_id: u32, blob_id: u32, prefix: &str) -> Path { - let file_name = format!("{}-{:08x}-{:08x}.raw", prefix, field_id, blob_id); +/// Layout: `_blob///.raw` +pub fn blob_path(base: &Path, fragment_id: u32, field_id: u32, blob_id: u32) -> Path { + let file_name = format!("{:08x}.raw", blob_id); base.child(BLOB_SIDECAR_DIR) - .child(stem) + .child(format!("{:08x}", fragment_id)) + .child(format!("{:08x}", field_id)) .child(file_name.as_str()) } @@ -48,10 +49,10 @@ mod tests { #[test] fn test_blob_path_formatting() { let base = Path::from("base"); - let path = blob_path(&base, "stem", 1, 2, "pfx"); + let path = blob_path(&base, 0x10, 1, 2); assert_eq!( path.to_string(), - "base/_blob/stem/pfx-00000001-00000002.raw" + "base/_blob/00000010/00000001/00000002.raw" ); } } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index aac3a9c74d8..4fecf55b25e 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1574,10 +1574,6 @@ impl Dataset { &self.object_store } - pub(crate) fn object_store_arc(&self) -> Arc { - self.object_store.clone() - } - /// Returns the storage options used when opening this dataset, if any. 
pub fn storage_options(&self) -> Option<&HashMap> { self.store_params diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 071308d4ad0..61c400e5fcc 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -301,17 +301,12 @@ fn collect_blob_files_v2( message: "Fragment not found".to_string(), location: location!(), })?; - let data_file = - frag.data_file_for_field(blob_field_id) - .ok_or_else(|| Error::Internal { - message: "Data file not found for blob field".to_string(), - location: location!(), - })?; - let stem = data_file - .path - .strip_suffix(".lance") - .unwrap_or(&data_file.path); - let path = blob_path(&dataset.data_dir(), stem, blob_field_id, blob_id, stem); + frag.data_file_for_field(blob_field_id) + .ok_or_else(|| Error::Internal { + message: "Data file not found for blob field".to_string(), + location: location!(), + })?; + let path = blob_path(&dataset.data_dir(), frag_id, blob_field_id, blob_id); files.push(BlobFile::new_dedicated(dataset.clone(), path, size)); } EXTERNAL_BLOB_KIND => { diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index b6ef0b0ab89..0254ed243f7 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -19,7 +19,9 @@ use std::borrow::Cow; use uuid::Uuid; use crate::dataset::builder::DatasetBuilder; -use crate::dataset::write::{do_write_fragments, preprocess_blob_batches, BlobPreprocessor}; +use crate::dataset::write::{ + do_write_fragments, preprocess_blob_batches, schema_has_blob_v2, BlobPreprocessor, +}; use crate::dataset::{WriteMode, WriteParams, DATA_DIR}; use crate::Result; @@ -137,6 +139,7 @@ impl<'a> FragmentCreateBuilder<'a> { let filename = format!("{}.lance", generate_random_filename()); let mut fragment = Fragment::new(id); let full_path = base_path.child(DATA_DIR).child(filename.clone()); + let has_blob_v2 = schema_has_blob_v2(&schema); let obj_writer = 
object_store.create(&full_path).await?; let mut writer = lance_file::writer::FileWriter::try_new( obj_writer, @@ -152,13 +155,16 @@ impl<'a> FragmentCreateBuilder<'a> { .iter() .map(|(id, _)| *id) .collect::>(); - let mut preprocessor = BlobPreprocessor::new( - object_store.clone(), - base_path.child(DATA_DIR), - filename.trim_end_matches(".lance").to_string(), - id as u32, - field_ids, - ); + let mut preprocessor = if has_blob_v2 { + Some(BlobPreprocessor::new( + object_store.clone(), + base_path.child(DATA_DIR), + id as u32, + field_ids, + )) + } else { + None + }; let (major, minor) = writer.version().to_numbers(); @@ -174,7 +180,9 @@ impl<'a> FragmentCreateBuilder<'a> { .boxed(); while let Some(batched_chunk) = broken_stream.next().await { let mut batch_chunk = batched_chunk?; - batch_chunk = preprocess_blob_batches(&batch_chunk, &mut preprocessor).await?; + if let Some(pre) = preprocessor.as_mut() { + batch_chunk = preprocess_blob_batches(&batch_chunk, pre).await?; + } writer.write_batches(batch_chunk.iter()).await?; } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index fdbc42a86a0..ac3fe02d7d8 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -80,7 +80,6 @@ const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; pub(crate) struct BlobPreprocessor { object_store: Arc, data_dir: Path, - data_file_stem: String, fragment_id: u32, local_counter: u32, field_ids: Vec, @@ -90,14 +89,12 @@ impl BlobPreprocessor { pub(crate) fn new( object_store: Arc, data_dir: Path, - data_file_stem: String, fragment_id: u32, field_ids: Vec, ) -> Self { Self { object_store, data_dir, - data_file_stem, fragment_id, local_counter: 0, field_ids, @@ -111,13 +108,7 @@ impl BlobPreprocessor { } async fn write_blob(&self, field_id: u32, blob_id: u32, data: &[u8]) -> Result { - let path = blob_path( - &self.data_dir, - &self.data_file_stem, - field_id, - blob_id, - &self.data_file_stem, - ); + let path = 
blob_path(&self.data_dir, self.fragment_id, field_id, blob_id); let mut writer = self.object_store.create(&path).await?; writer.write_all(data).await?; writer.shutdown().await?; @@ -288,6 +279,13 @@ impl BlobPreprocessor { } } +pub(crate) fn schema_has_blob_v2(schema: &Schema) -> bool { + schema + .fields + .iter() + .any(|f| f.is_blob() && matches!(f.data_type(), ArrowDataType::Struct(_))) +} + pub(crate) async fn preprocess_blob_batches( batches: &[RecordBatch], pre: &mut BlobPreprocessor, @@ -1050,13 +1048,16 @@ pub async fn open_writer_with_options( .iter() .map(|f| f.id as u32) .collect::>(); - let preprocessor = Some(BlobPreprocessor::new( - object_store.clone(), - data_dir.clone(), - filename.trim_end_matches(".lance").to_string(), - 0, - field_ids, - )); + let preprocessor = if schema_has_blob_v2(schema) { + Some(BlobPreprocessor::new( + object_store.clone(), + data_dir.clone(), + 0, + field_ids, + )) + } else { + None + }; let writer_adapter = V2WriterAdapter { writer: file_writer, path: filename, diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index daf1b6fbf4b..44a8bcdcf7d 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -887,7 +887,7 @@ impl MergeInsertJob { .data_storage_format .lance_file_version()?; let mut writer = open_writer( - &dataset.object_store_arc(), + &dataset.object_store, &write_schema, &dataset.base, data_storage_version, From 24f391dcd604ff97adef1a99b135d575f2ac2f9f Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 4 Dec 2025 18:37:03 +0800 Subject: [PATCH 03/17] Fix build Signed-off-by: Xuanwo --- rust/lance-core/src/utils/blob.rs | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/rust/lance-core/src/utils/blob.rs b/rust/lance-core/src/utils/blob.rs index 3b66c08d241..8dbdc9a68c3 100644 --- a/rust/lance-core/src/utils/blob.rs +++ b/rust/lance-core/src/utils/blob.rs @@ -2,7 +2,6 @@ // 
SPDX-FileCopyrightText: Copyright The Lance Authors use object_store::path::Path; -use rand::RngCore; /// Directory name for blob sidecar files. pub const BLOB_SIDECAR_DIR: &str = "_blob"; @@ -18,30 +17,6 @@ pub fn blob_path(base: &Path, fragment_id: u32, field_id: u32, blob_id: u32) -> .child(file_name.as_str()) } -/// Generate a high-entropy prefix using the same pattern as data file names. -/// -/// Pattern: first 24 bits as binary, remaining 13 bytes as hex (26 chars). -pub fn generate_random_prefix() -> String { - let mut bytes = [0u8; 16]; - rand::rng().fill_bytes(&mut bytes); - - let mut out = String::with_capacity(50); - - for &b in &bytes[..3] { - for i in (0..8).rev() { - out.push(if (b >> i) & 1 == 1 { '1' } else { '0' }); - } - } - - const HEX: &[u8; 16] = b"0123456789abcdef"; - for &b in &bytes[3..] { - out.push(HEX[(b >> 4) as usize] as char); - out.push(HEX[(b & 0xf) as usize] as char); - } - - out -} - #[cfg(test)] mod tests { use super::*; From b3a6b1712a69ccaef6de8b4af8e84a380d2dcfd6 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 4 Dec 2025 18:40:46 +0800 Subject: [PATCH 04/17] remove not needed checks Signed-off-by: Xuanwo --- rust/lance/src/dataset/blob.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 49fd837b4e2..89f38dd2fab 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -12,7 +12,7 @@ use tokio::sync::Mutex; use super::take::TakeBuilder; use super::{Dataset, ProjectionRequest}; -use arrow_array::{Array, StructArray}; +use arrow_array::StructArray; use lance_core::datatypes::{BlobKind, BlobVersion}; use lance_core::utils::blob::blob_path; use lance_core::{utils::address::RowAddress, Error, Result}; @@ -378,13 +378,6 @@ async fn collect_blob_files_v2( )); } BlobKind::Dedicated => { - if blob_ids.is_null(idx) || sizes.is_null(idx) { - return Err(Error::corrupt_file( - dataset.data_dir(), - "Missing blob_id or 
size for dedicated blob", - location!(), - )); - } let blob_id = blob_ids.value(idx); let size = sizes.value(idx); let frag_id = RowAddress::from(*row_addr).fragment_id(); From 438d3d442879d59a76edf1ff3b0e590e335c1c42 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 4 Dec 2025 18:49:35 +0800 Subject: [PATCH 05/17] refactor Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes/field.rs | 21 +++++++++++++++++++++ rust/lance/src/dataset/write.rs | 5 +---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index cd6377098f0..e96f29efa6a 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -508,6 +508,14 @@ impl Field { .unwrap_or(false) } + /// Returns true if the field is explicitly marked as blob v2 extension. + pub fn is_blob_v2(&self) -> bool { + self.metadata + .get(ARROW_EXT_NAME_KEY) + .map(|name| name == BLOB_V2_EXT_NAME) + .unwrap_or(false) + } + /// If the field is a blob, return a new field with the same name and id /// but with the data type set to a struct of the blob description fields. 
/// @@ -1562,6 +1570,19 @@ mod tests { assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type); } + #[test] + fn blob_v2_detection_by_extension() { + let metadata = HashMap::from([ + (ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string()), + (BLOB_META_KEY.to_string(), "true".to_string()), + ]); + let field: Field = ArrowField::new("blob", DataType::LargeBinary, true) + .with_metadata(metadata) + .try_into() + .unwrap(); + assert!(field.is_blob_v2()); + } + #[test] fn blob_extension_roundtrip() { let metadata = HashMap::from([ diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index c6a4f60cfa4..dac57ce0f54 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -279,10 +279,7 @@ impl BlobPreprocessor { } pub(super) fn schema_has_blob_v2(schema: &Schema) -> bool { - schema - .fields - .iter() - .any(|f| f.is_blob() && matches!(f.data_type(), ArrowDataType::Struct(_))) + schema.fields.iter().any(|f| f.is_blob_v2()) } pub(super) async fn preprocess_blob_batches( From d949ed9e46252b7c889ed4e7467a21a66d2e12bf Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 4 Dec 2025 19:04:44 +0800 Subject: [PATCH 06/17] refactor blob logic Signed-off-by: Xuanwo --- rust/lance/src/dataset/blob.rs | 220 ++++++++++++++++++++++ rust/lance/src/dataset/fragment/write.rs | 5 +- rust/lance/src/dataset/write.rs | 226 +---------------------- 3 files changed, 224 insertions(+), 227 deletions(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 89f38dd2fab..76b0ba6526d 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -5,9 +5,14 @@ use std::{collections::HashMap, future::Future, ops::DerefMut, sync::Arc}; use arrow::array::AsArray; use arrow::datatypes::{UInt32Type, UInt64Type, UInt8Type}; +use arrow_array::builder::{LargeBinaryBuilder, LargeStringBuilder, PrimitiveBuilder}; +use arrow_array::Array; +use arrow_array::RecordBatch; +use 
arrow_schema::DataType as ArrowDataType; use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; use object_store::path::Path; use snafu::location; +use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; use super::take::TakeBuilder; @@ -27,6 +32,221 @@ pub fn blob_version_from_config(config: &HashMap) -> BlobVersion .unwrap_or(BlobVersion::V1) } +const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; + +pub struct BlobPreprocessor { + object_store: Arc, + data_dir: Path, + fragment_id: u32, + local_counter: u32, + field_ids: Vec, +} + +impl BlobPreprocessor { + pub(crate) fn new( + object_store: Arc, + data_dir: Path, + fragment_id: u32, + field_ids: Vec, + ) -> Self { + Self { + object_store, + data_dir, + fragment_id, + local_counter: 0, + field_ids, + } + } + + fn next_blob_id(&mut self) -> u32 { + let id = (self.fragment_id << 16) | self.local_counter; + self.local_counter = self.local_counter.wrapping_add(1); + id + } + + async fn write_blob(&self, field_id: u32, blob_id: u32, data: &[u8]) -> Result { + let path = blob_path(&self.data_dir, self.fragment_id, field_id, blob_id); + let mut writer = self.object_store.create(&path).await?; + writer.write_all(data).await?; + writer.shutdown().await?; + Ok(path) + } + + async fn delete_blob(&self, path: &Path) { + let _ = self.object_store.delete(path).await; + } + + pub(crate) async fn preprocess_batch(&mut self, batch: &RecordBatch) -> Result { + let mut new_columns = Vec::with_capacity(batch.num_columns()); + let mut new_fields = Vec::with_capacity(batch.num_columns()); + let mut written_paths = Vec::new(); + + for (col_idx, (array, field)) in batch + .columns() + .iter() + .zip(batch.schema().fields()) + .enumerate() + { + let is_blob_struct = matches!(field.data_type(), ArrowDataType::Struct(_)) + && field.metadata().get(lance_arrow::BLOB_META_KEY).is_some(); + + if !is_blob_struct { + new_columns.push(array.clone()); + new_fields.push(field.clone()); + continue; + } + + let struct_arr = 
array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::invalid_input("Blob column was not a struct array", location!()) + })?; + + let ArrowDataType::Struct(fields) = field.data_type() else { + unreachable!(); + }; + + let mut data_idx = None; + let mut uri_idx = None; + for (idx, child) in fields.iter().enumerate() { + match child.name().as_str() { + "data" => data_idx = Some(idx), + "uri" => uri_idx = Some(idx), + _ => {} + } + } + + let data_idx = data_idx.ok_or_else(|| { + Error::invalid_input("Blob struct missing `data` field", location!()) + })?; + let uri_idx = uri_idx.ok_or_else(|| { + Error::invalid_input("Blob struct missing `uri` field", location!()) + })?; + + let data_col = struct_arr.column(data_idx).as_binary::(); + let uri_col = struct_arr.column(uri_idx).as_string::(); + + let mut data_builder = LargeBinaryBuilder::with_capacity(struct_arr.len(), 0); + let mut uri_builder = LargeStringBuilder::with_capacity(struct_arr.len(), 0); + let mut blob_id_builder = + PrimitiveBuilder::::with_capacity(struct_arr.len()); + let mut blob_size_builder = + PrimitiveBuilder::::with_capacity(struct_arr.len()); + + let struct_nulls = struct_arr.nulls(); + + for i in 0..struct_arr.len() { + if struct_arr.is_null(i) { + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + continue; + } + + let has_data = !data_col.is_null(i); + let has_uri = !uri_col.is_null(i); + + if has_data && data_col.value(i).len() > DEDICATED_THRESHOLD { + let blob_id = self.next_blob_id(); + let field_id = *self.field_ids.get(col_idx).unwrap_or(&(col_idx as u32)); + match self.write_blob(field_id, blob_id, data_col.value(i)).await { + Ok(path) => written_paths.push(path), + Err(err) => { + for path in &written_paths { + self.delete_blob(path).await; + } + return Err(err); + } + } + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_value(blob_id); + 
blob_size_builder.append_value(data_col.value(i).len() as u64); + continue; + } + + if has_uri { + let uri_val = uri_col.value(i); + data_builder.append_null(); + uri_builder.append_value(uri_val); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + continue; + } + + if has_data { + let value = data_col.value(i); + data_builder.append_value(value); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + } else { + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + } + } + + let child_fields = vec![ + arrow_schema::Field::new("data", ArrowDataType::LargeBinary, true), + arrow_schema::Field::new("uri", ArrowDataType::Utf8, true), + arrow_schema::Field::new("blob_id", ArrowDataType::UInt32, true), + arrow_schema::Field::new("blob_size", ArrowDataType::UInt64, true), + ]; + + let struct_array = arrow_array::StructArray::try_new( + child_fields.clone().into(), + vec![ + Arc::new(data_builder.finish()), + Arc::new(uri_builder.finish()), + Arc::new(blob_id_builder.finish()), + Arc::new(blob_size_builder.finish()), + ], + struct_nulls.cloned(), + )?; + + new_columns.push(Arc::new(struct_array)); + new_fields.push(Arc::new( + arrow_schema::Field::new( + field.name(), + ArrowDataType::Struct(child_fields.into()), + field.is_nullable(), + ) + .with_metadata(field.metadata().clone()), + )); + } + + let new_schema = Arc::new(arrow_schema::Schema::new_with_metadata( + new_fields + .iter() + .map(|f| f.as_ref().clone()) + .collect::>(), + batch.schema().metadata().clone(), + )); + + RecordBatch::try_new(new_schema, new_columns) + .map_err(|e| Error::invalid_input(e.to_string(), location!())) + } +} + +pub fn schema_has_blob_v2(schema: &lance_core::datatypes::Schema) -> bool { + schema.fields.iter().any(|f| f.is_blob_v2()) +} + +pub async fn preprocess_blob_batches( + batches: &[RecordBatch], + pre: &mut BlobPreprocessor, +) -> Result> { + let 
mut out = Vec::with_capacity(batches.len()); + for batch in batches { + out.push(pre.preprocess_batch(batch).await?); + } + Ok(out) +} + /// Current state of the reader. Held in a mutex for easy sharing /// /// The u64 is the cursor in the file that the reader is currently at diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index 0254ed243f7..afb3005edeb 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -18,10 +18,9 @@ use snafu::location; use std::borrow::Cow; use uuid::Uuid; +use crate::dataset::blob::{preprocess_blob_batches, schema_has_blob_v2, BlobPreprocessor}; use crate::dataset::builder::DatasetBuilder; -use crate::dataset::write::{ - do_write_fragments, preprocess_blob_batches, schema_has_blob_v2, BlobPreprocessor, -}; +use crate::dataset::write::do_write_fragments; use crate::dataset::{WriteMode, WriteParams, DATA_DIR}; use crate::Result; diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index dac57ce0f54..2872da6f578 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -1,10 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use arrow_array::{ - builder::LargeBinaryBuilder, builder::LargeStringBuilder, cast::AsArray, Array, RecordBatch, -}; -use arrow_schema::DataType as ArrowDataType; +use arrow_array::RecordBatch; use chrono::TimeDelta; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream; @@ -13,7 +10,6 @@ use lance_core::datatypes::{ BlobVersion, NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions, }; use lance_core::error::LanceOptionExt; -use lance_core::utils::blob::blob_path; use lance_core::utils::tempfile::TempDir; use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_DATA, TRACE_FILE_AUDIT}; use lance_core::{datatypes::Schema, Error, 
Result}; @@ -35,9 +31,9 @@ use std::collections::HashMap; use std::num::NonZero; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use tokio::io::AsyncWriteExt; use tracing::{info, instrument}; +use crate::dataset::blob::{preprocess_blob_batches, schema_has_blob_v2, BlobPreprocessor}; use crate::session::Session; use crate::Dataset; @@ -75,224 +71,6 @@ pub enum WriteDestination<'a> { Uri(&'a str), } -const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; - -pub(super) struct BlobPreprocessor { - object_store: Arc, - data_dir: Path, - fragment_id: u32, - local_counter: u32, - field_ids: Vec, -} - -impl BlobPreprocessor { - pub(super) fn new( - object_store: Arc, - data_dir: Path, - fragment_id: u32, - field_ids: Vec, - ) -> Self { - Self { - object_store, - data_dir, - fragment_id, - local_counter: 0, - field_ids, - } - } - - fn next_blob_id(&mut self) -> u32 { - let id = (self.fragment_id << 16) | self.local_counter; - self.local_counter = self.local_counter.wrapping_add(1); - id - } - - async fn write_blob(&self, field_id: u32, blob_id: u32, data: &[u8]) -> Result { - let path = blob_path(&self.data_dir, self.fragment_id, field_id, blob_id); - let mut writer = self.object_store.create(&path).await?; - writer.write_all(data).await?; - writer.shutdown().await?; - Ok(path) - } - - async fn delete_blob(&self, path: &Path) { - let _ = self.object_store.delete(path).await; - } - - async fn preprocess_batch(&mut self, batch: &RecordBatch) -> Result { - let mut new_columns = Vec::with_capacity(batch.num_columns()); - let mut new_fields = Vec::with_capacity(batch.num_columns()); - let mut written_paths = Vec::new(); - - for (col_idx, (array, field)) in batch - .columns() - .iter() - .zip(batch.schema().fields()) - .enumerate() - { - let is_blob_struct = matches!(field.data_type(), ArrowDataType::Struct(_)) - && field.metadata().get(lance_arrow::BLOB_META_KEY).is_some(); - - if !is_blob_struct { - new_columns.push(array.clone()); - new_fields.push(field.clone()); - 
continue; - } - - let struct_arr = array - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::invalid_input("Blob column was not a struct array", location!()) - })?; - - let ArrowDataType::Struct(fields) = field.data_type() else { - unreachable!(); - }; - - let mut data_idx = None; - let mut uri_idx = None; - for (idx, child) in fields.iter().enumerate() { - match child.name().as_str() { - "data" => data_idx = Some(idx), - "uri" => uri_idx = Some(idx), - _ => {} - } - } - - let data_idx = data_idx.ok_or_else(|| { - Error::invalid_input("Blob struct missing `data` field", location!()) - })?; - let uri_idx = uri_idx.ok_or_else(|| { - Error::invalid_input("Blob struct missing `uri` field", location!()) - })?; - - let data_col = struct_arr.column(data_idx).as_binary::(); - let uri_col = struct_arr.column(uri_idx).as_string::(); - - let mut data_builder = LargeBinaryBuilder::with_capacity(struct_arr.len(), 0); - let mut uri_builder = LargeStringBuilder::with_capacity(struct_arr.len(), 0); - let mut blob_id_builder = arrow_array::builder::PrimitiveBuilder::< - arrow_array::types::UInt32Type, - >::with_capacity(struct_arr.len()); - let mut blob_size_builder = arrow_array::builder::PrimitiveBuilder::< - arrow_array::types::UInt64Type, - >::with_capacity(struct_arr.len()); - - let struct_nulls = struct_arr.nulls(); - - for i in 0..struct_arr.len() { - if struct_arr.is_null(i) { - data_builder.append_null(); - uri_builder.append_null(); - blob_id_builder.append_null(); - blob_size_builder.append_null(); - continue; - } - - let has_data = !data_col.is_null(i); - let has_uri = !uri_col.is_null(i); - - if has_data && data_col.value(i).len() > DEDICATED_THRESHOLD { - let blob_id = self.next_blob_id(); - let field_id = *self.field_ids.get(col_idx).unwrap_or(&(col_idx as u32)); - match self.write_blob(field_id, blob_id, data_col.value(i)).await { - Ok(path) => written_paths.push(path), - Err(err) => { - for path in &written_paths { - self.delete_blob(path).await; - } - 
return Err(err); - } - } - data_builder.append_null(); - uri_builder.append_null(); - blob_id_builder.append_value(blob_id); - blob_size_builder.append_value(data_col.value(i).len() as u64); - continue; - } - - if has_uri { - let uri_val = uri_col.value(i); - data_builder.append_null(); - uri_builder.append_value(uri_val); - blob_id_builder.append_null(); - blob_size_builder.append_null(); - continue; - } - - if has_data { - let value = data_col.value(i); - data_builder.append_value(value); - uri_builder.append_null(); - blob_id_builder.append_null(); - blob_size_builder.append_null(); - } else { - // row null or missing - data_builder.append_null(); - uri_builder.append_null(); - blob_id_builder.append_null(); - blob_size_builder.append_null(); - } - } - - let child_fields = vec![ - arrow_schema::Field::new("data", ArrowDataType::LargeBinary, true), - arrow_schema::Field::new("uri", ArrowDataType::Utf8, true), - arrow_schema::Field::new("blob_id", ArrowDataType::UInt32, true), - arrow_schema::Field::new("blob_size", ArrowDataType::UInt64, true), - ]; - - let struct_array = arrow_array::StructArray::try_new( - child_fields.clone().into(), - vec![ - Arc::new(data_builder.finish()), - Arc::new(uri_builder.finish()), - Arc::new(blob_id_builder.finish()), - Arc::new(blob_size_builder.finish()), - ], - struct_nulls.cloned(), - )?; - - new_columns.push(Arc::new(struct_array)); - new_fields.push(Arc::new( - arrow_schema::Field::new( - field.name(), - ArrowDataType::Struct(child_fields.into()), - field.is_nullable(), - ) - .with_metadata(field.metadata().clone()), - )); - } - - let new_schema = Arc::new(arrow_schema::Schema::new_with_metadata( - new_fields - .iter() - .map(|f| f.as_ref().clone()) - .collect::>(), - batch.schema().metadata().clone(), - )); - - RecordBatch::try_new(new_schema, new_columns) - .map_err(|e| Error::invalid_input(e.to_string(), location!())) - } -} - -pub(super) fn schema_has_blob_v2(schema: &Schema) -> bool { - schema.fields.iter().any(|f| 
f.is_blob_v2()) -} - -pub(super) async fn preprocess_blob_batches( - batches: &[RecordBatch], - pre: &mut BlobPreprocessor, -) -> Result> { - let mut out = Vec::with_capacity(batches.len()); - for batch in batches { - out.push(pre.preprocess_batch(batch).await?); - } - Ok(out) -} - impl WriteDestination<'_> { pub fn dataset(&self) -> Option<&Dataset> { match self { From e0d03d84dadc683fb78aa0e926679a9c19d1c01f Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 4 Dec 2025 19:10:12 +0800 Subject: [PATCH 07/17] refactor Signed-off-by: Xuanwo --- rust/lance/src/dataset/blob.rs | 4 ++-- rust/lance/src/dataset/fragment/write.rs | 2 +- rust/lance/src/dataset/write.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 76b0ba6526d..cb97913b36d 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -35,7 +35,7 @@ pub fn blob_version_from_config(config: &HashMap) -> BlobVersion const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; pub struct BlobPreprocessor { - object_store: Arc, + object_store: ObjectStore, data_dir: Path, fragment_id: u32, local_counter: u32, @@ -44,7 +44,7 @@ pub struct BlobPreprocessor { impl BlobPreprocessor { pub(crate) fn new( - object_store: Arc, + object_store: ObjectStore, data_dir: Path, fragment_id: u32, field_ids: Vec, diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index afb3005edeb..64d5065a457 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -156,7 +156,7 @@ impl<'a> FragmentCreateBuilder<'a> { .collect::>(); let mut preprocessor = if has_blob_v2 { Some(BlobPreprocessor::new( - object_store.clone(), + object_store.as_ref().clone(), base_path.child(DATA_DIR), id as u32, field_ids, diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 2872da6f578..5929b1e4336 100644 --- 
a/rust/lance/src/dataset/write.rs
+++ b/rust/lance/src/dataset/write.rs
@@ -822,7 +822,7 @@ pub async fn open_writer_with_options(
         .collect::>();
     let preprocessor = if schema_has_blob_v2(schema) {
         Some(BlobPreprocessor::new(
-            object_store.as_ref().clone(),
+            object_store.clone(),
             data_dir.clone(),
             0,
             field_ids,

From b73d3b3fc6939f58cccc2be4a900ec5251d667ef Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Thu, 4 Dec 2025 19:12:16 +0800
Subject: [PATCH 08/17] Remove unneeded comment

Signed-off-by: Xuanwo
---
 rust/lance-encoding/src/encodings/logical/blob.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs
index 567bd025f3e..5f91c3b278d 100644
--- a/rust/lance-encoding/src/encodings/logical/blob.rs
+++ b/rust/lance-encoding/src/encodings/logical/blob.rs
@@ -317,7 +317,6 @@ impl FieldEncoder for BlobV2StructuralEncoder {
         let mut uri_builder = StringBuilder::with_capacity(struct_arr.len(), 0);
 
         for i in 0..struct_arr.len() {
-            // Schema is expected to be non-nullable; still handle null defensively.
if struct_arr.is_null(i) { kind_builder.append_value(BlobKind::Inline as u8); position_builder.append_value(0); From 151e4de7070c6c15827f9fcfb46ee872477dea44 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 4 Dec 2025 19:35:39 +0800 Subject: [PATCH 09/17] Fix build Signed-off-by: Xuanwo --- rust/lance/src/dataset/write.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 5929b1e4336..0c5b1ec9281 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -767,7 +767,7 @@ impl GenericWriter for V2WriterAdapter { } pub async fn open_writer( - object_store: &Arc, + object_store: &ObjectStore, schema: &Schema, base_dir: &Path, storage_version: LanceFileVersion, @@ -776,7 +776,7 @@ pub async fn open_writer( } pub async fn open_writer_with_options( - object_store: &Arc, + object_store: &ObjectStore, schema: &Schema, base_dir: &Path, storage_version: LanceFileVersion, @@ -822,7 +822,7 @@ pub async fn open_writer_with_options( .collect::>(); let preprocessor = if schema_has_blob_v2(schema) { Some(BlobPreprocessor::new( - object_store.as_ref().clone(), + object_store.clone(), data_dir.clone(), 0, field_ids, From b8a07d385f0c52e751a09c340ba3245d3547cd29 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 4 Dec 2025 22:25:28 +0800 Subject: [PATCH 10/17] Fix build Signed-off-by: Xuanwo --- .../src/encodings/logical/blob.rs | 183 ++++++++---------- rust/lance/src/dataset/blob.rs | 26 +-- 2 files changed, 91 insertions(+), 118 deletions(-) diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index 5f91c3b278d..7e881998be3 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -272,118 +272,91 @@ impl FieldEncoder for BlobV2StructuralEncoder { row_number: u64, num_rows: u64, ) -> Result> { - // Supported input: Struct - let 
DataType::Struct(fields) = array.data_type() else { - return Err(Error::InvalidInput { - source: "Blob v2 requires struct input".into(), - location: location!(), - }); - }; - let struct_arr = array.as_struct(); if let Some(validity) = struct_arr.nulls() { repdef.add_validity_bitmap(validity.clone()); } else { repdef.add_no_null(struct_arr.len()); } - let mut data_idx = None; - let mut uri_idx = None; - let mut blob_id_idx = None; - let mut blob_size_idx = None; - for (idx, field) in fields.iter().enumerate() { - match field.name().as_str() { - "data" => data_idx = Some(idx), - "uri" => uri_idx = Some(idx), - "blob_id" => blob_id_idx = Some(idx), - "blob_size" => blob_size_idx = Some(idx), - _ => {} - } - } - let (data_idx, uri_idx) = data_idx.zip(uri_idx).ok_or_else(|| Error::InvalidInput { - source: "Blob v2 struct must contain 'data' and 'uri' fields".into(), - location: location!(), - })?; - - let data_col = struct_arr.column(data_idx).as_binary::(); - let uri_col = struct_arr.column(uri_idx).as_string::(); - let blob_id_col = blob_id_idx.map(|i| struct_arr.column(i).as_primitive::()); - let blob_size_col = - blob_size_idx.map(|i| struct_arr.column(i).as_primitive::()); - - let mut kind_builder = PrimitiveBuilder::::with_capacity(struct_arr.len()); - let mut position_builder = PrimitiveBuilder::::with_capacity(struct_arr.len()); - let mut size_builder = PrimitiveBuilder::::with_capacity(struct_arr.len()); - let mut blob_id_builder = PrimitiveBuilder::::with_capacity(struct_arr.len()); - let mut uri_builder = StringBuilder::with_capacity(struct_arr.len(), 0); - - for i in 0..struct_arr.len() { - if struct_arr.is_null(i) { - kind_builder.append_value(BlobKind::Inline as u8); - position_builder.append_value(0); - size_builder.append_value(0); - blob_id_builder.append_value(0); - uri_builder.append_value(""); - continue; - } - - let has_blob_id = blob_id_col - .as_ref() - .map(|col| col.is_valid(i)) - .unwrap_or(false); - let has_blob_size = blob_size_col - 
.as_ref() - .map(|col| col.is_valid(i)) - .unwrap_or(false); - - if has_blob_id || has_blob_size { - if !(has_blob_id && has_blob_size) { - return Err(Error::InvalidInput { - source: "blob_id and blob_size must both be set for dedicated blobs".into(), - location: location!(), - }); - } - let blob_id = blob_id_col.as_ref().unwrap().value(i); - let blob_size = blob_size_col.as_ref().unwrap().value(i); - kind_builder.append_value(BlobKind::Dedicated as u8); - position_builder.append_value(0); - size_builder.append_value(blob_size); - blob_id_builder.append_value(blob_id); - uri_builder.append_value(""); - continue; - } - let data_is_set = !data_col.is_null(i); - let uri_is_set = !uri_col.is_null(i); - if data_is_set == uri_is_set { - return Err(Error::InvalidInput { - source: "Each blob row must set exactly one of data or uri".into(), - location: location!(), - }); - } - - if uri_is_set { - let uri_val = uri_col.value(i); - kind_builder.append_value(BlobKind::External as u8); - position_builder.append_value(0); - size_builder.append_value(0); - blob_id_builder.append_value(0); - uri_builder.append_value(uri_val); - continue; - } - - let value = data_col.value(i); - kind_builder.append_value(BlobKind::Inline as u8); - - if value.is_empty() { - position_builder.append_value(0); - size_builder.append_value(0); - } else { - let position = external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value))); - position_builder.append_value(position); - size_builder.append_value(value.len() as u64); - } - blob_id_builder.append_value(0); - uri_builder.append_value(""); + let kind_col = struct_arr + .column_by_name("kind") + .expect("kind column must exist") + .as_primitive::(); + let data_col = struct_arr + .column_by_name("data") + .expect("data column must exist") + .as_binary::(); + let uri_col = struct_arr + .column_by_name("uri") + .expect("uri column must exist") + .as_string::(); + let blob_id_col = struct_arr + .column_by_name("blob_id") + .expect("blob_id column 
must exist") + .as_primitive::(); + let blob_size_col = struct_arr + .column_by_name("blob_size") + .expect("blob_size column must exist") + .as_primitive::(); + + let row_count = struct_arr.len(); + + let mut kind_builder = PrimitiveBuilder::::with_capacity(row_count); + let mut position_builder = PrimitiveBuilder::::with_capacity(row_count); + let mut size_builder = PrimitiveBuilder::::with_capacity(row_count); + let mut blob_id_builder = PrimitiveBuilder::::with_capacity(row_count); + let mut uri_builder = StringBuilder::with_capacity(row_count, row_count * 16); + + for i in 0..row_count { + let (kind_value, position_value, size_value, blob_id_value, uri_value) = + if struct_arr.is_null(i) || kind_col.is_null(i) { + (BlobKind::Inline as u8, 0, 0, 0, "".to_string()) + } else { + let kind_val = BlobKind::try_from(kind_col.value(i))?; + match kind_val { + BlobKind::Dedicated => ( + BlobKind::Dedicated as u8, + 0, + blob_size_col.value(i), + blob_id_col.value(i), + "".to_string(), + ), + BlobKind::External => ( + BlobKind::External as u8, + 0, + 0, + 0, + uri_col.value(i).to_string(), + ), + BlobKind::Inline => { + let data_val = data_col.value(i); + let blob_len = data_val.len() as u64; + let position = external_buffers + .add_buffer(LanceBuffer::from(Buffer::from(data_val))); + + ( + BlobKind::Inline as u8, + position, + blob_len, + 0, + "".to_string(), + ) + } + BlobKind::Packed => { + return Err(Error::InvalidInput { + source: "Packed blob kind is not supported for v2 encoder".into(), + location: location!(), + }); + } + } + }; + + kind_builder.append_value(kind_value); + position_builder.append_value(position_value); + size_builder.append_value(size_value); + blob_id_builder.append_value(blob_id_value); + uri_builder.append_value(uri_value); } let children: Vec = vec![ Arc::new(kind_builder.finish()), diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index cb97913b36d..b0c2b115643 100644 --- a/rust/lance/src/dataset/blob.rs +++ 
b/rust/lance/src/dataset/blob.rs @@ -72,10 +72,6 @@ impl BlobPreprocessor { Ok(path) } - async fn delete_blob(&self, path: &Path) { - let _ = self.object_store.delete(path).await; - } - pub(crate) async fn preprocess_batch(&mut self, batch: &RecordBatch) -> Result { let mut new_columns = Vec::with_capacity(batch.num_columns()); let mut new_fields = Vec::with_capacity(batch.num_columns()); @@ -133,6 +129,7 @@ impl BlobPreprocessor { PrimitiveBuilder::::with_capacity(struct_arr.len()); let mut blob_size_builder = PrimitiveBuilder::::with_capacity(struct_arr.len()); + let mut kind_builder = PrimitiveBuilder::::with_capacity(struct_arr.len()); let struct_nulls = struct_arr.nulls(); @@ -142,6 +139,7 @@ impl BlobPreprocessor { uri_builder.append_null(); blob_id_builder.append_null(); blob_size_builder.append_null(); + kind_builder.append_null(); continue; } @@ -151,15 +149,12 @@ impl BlobPreprocessor { if has_data && data_col.value(i).len() > DEDICATED_THRESHOLD { let blob_id = self.next_blob_id(); let field_id = *self.field_ids.get(col_idx).unwrap_or(&(col_idx as u32)); - match self.write_blob(field_id, blob_id, data_col.value(i)).await { - Ok(path) => written_paths.push(path), - Err(err) => { - for path in &written_paths { - self.delete_blob(path).await; - } - return Err(err); - } - } + let path = self + .write_blob(field_id, blob_id, data_col.value(i)) + .await?; + written_paths.push(path); + + kind_builder.append_value(BlobKind::Dedicated as u8); data_builder.append_null(); uri_builder.append_null(); blob_id_builder.append_value(blob_id); @@ -169,6 +164,7 @@ impl BlobPreprocessor { if has_uri { let uri_val = uri_col.value(i); + kind_builder.append_value(BlobKind::External as u8); data_builder.append_null(); uri_builder.append_value(uri_val); blob_id_builder.append_null(); @@ -177,6 +173,7 @@ impl BlobPreprocessor { } if has_data { + kind_builder.append_value(BlobKind::Inline as u8); let value = data_col.value(i); data_builder.append_value(value); 
                     uri_builder.append_null();
@@ -187,10 +184,12 @@ impl BlobPreprocessor {
                     uri_builder.append_null();
                     blob_id_builder.append_null();
                     blob_size_builder.append_null();
+                    kind_builder.append_null();
                 }
             }
 
             let child_fields = vec![
+                arrow_schema::Field::new("kind", ArrowDataType::UInt8, true),
                 arrow_schema::Field::new("data", ArrowDataType::LargeBinary, true),
                 arrow_schema::Field::new("uri", ArrowDataType::Utf8, true),
                 arrow_schema::Field::new("blob_id", ArrowDataType::UInt32, true),
@@ -200,6 +199,7 @@ impl BlobPreprocessor {
             let struct_array = arrow_array::StructArray::try_new(
                 child_fields.clone().into(),
                 vec![
+                    Arc::new(kind_builder.finish()),
                     Arc::new(data_builder.finish()),
                     Arc::new(uri_builder.finish()),
                     Arc::new(blob_id_builder.finish()),

From 51816ed2381282964607c101dd987a30c46af212 Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Thu, 4 Dec 2025 23:19:14 +0800
Subject: [PATCH 11/17] Fix existing tests

Signed-off-by: Xuanwo
---
 rust/lance-encoding/src/encodings/logical/blob.rs | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs
index 7e881998be3..86d8e1b748c 100644
--- a/rust/lance-encoding/src/encodings/logical/blob.rs
+++ b/rust/lance-encoding/src/encodings/logical/blob.rs
@@ -501,19 +501,32 @@ mod tests {
         let blob_metadata =
             HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]);
 
+        let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true));
         let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true));
         let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true));
+        let blob_id_field = Arc::new(ArrowField::new("blob_id", DataType::UInt32, true));
+        let blob_size_field = Arc::new(ArrowField::new("blob_size", DataType::UInt64, true));
 
+        let kind_array = UInt8Array::from(vec![
+            BlobKind::Inline as u8,
+            BlobKind::External as u8,
+            BlobKind::External as u8,
+        ]);
         let data_array =
LargeBinaryArray::from(vec![Some(b"inline".as_ref()), None, None]); let uri_array = StringArray::from(vec![ None, Some("file:///tmp/external.bin"), Some("s3://bucket/blob"), ]); + let blob_id_array = UInt32Array::from(vec![0, 0, 0]); + let blob_size_array = UInt64Array::from(vec![0, 0, 0]); let struct_array = StructArray::from(vec![ + (kind_field, Arc::new(kind_array) as ArrayRef), (data_field, Arc::new(data_array) as ArrayRef), (uri_field, Arc::new(uri_array) as ArrayRef), + (blob_id_field, Arc::new(blob_id_array) as ArrayRef), + (blob_size_field, Arc::new(blob_size_array) as ArrayRef), ]); let expected_descriptor = StructArray::from(vec![ From 6f522ba8509ecadd34580924975b4171ed16082c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 4 Dec 2025 23:43:10 +0800 Subject: [PATCH 12/17] Add tests Signed-off-by: Xuanwo --- .../src/encodings/logical/blob.rs | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index 86d8e1b748c..dded1044267 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -568,4 +568,64 @@ mod tests { ) .await; } + + #[tokio::test] + async fn test_blob_v2_dedicated_round_trip() { + let blob_metadata = + HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]); + + let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true)); + let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true)); + let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true)); + let blob_id_field = Arc::new(ArrowField::new("blob_id", DataType::UInt32, true)); + let blob_size_field = Arc::new(ArrowField::new("blob_size", DataType::UInt64, true)); + + let kind_array = UInt8Array::from(vec![BlobKind::Dedicated as u8, BlobKind::Inline as u8]); + let data_array = LargeBinaryArray::from(vec![None, Some(b"abc".as_ref())]); + let uri_array 
= StringArray::from(vec![Option::<&str>::None, None]); + let blob_id_array = UInt32Array::from(vec![42, 0]); + let blob_size_array = UInt64Array::from(vec![12, 0]); + + let struct_array = StructArray::from(vec![ + (kind_field, Arc::new(kind_array) as ArrayRef), + (data_field, Arc::new(data_array) as ArrayRef), + (uri_field, Arc::new(uri_array) as ArrayRef), + (blob_id_field, Arc::new(blob_id_array) as ArrayRef), + (blob_size_field, Arc::new(blob_size_array) as ArrayRef), + ]); + + let expected_descriptor = StructArray::from(vec![ + ( + Arc::new(ArrowField::new("kind", DataType::UInt8, false)), + Arc::new(UInt8Array::from(vec![ + BlobKind::Dedicated as u8, + BlobKind::Inline as u8, + ])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("position", DataType::UInt64, false)), + Arc::new(UInt64Array::from(vec![0, 0])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("size", DataType::UInt64, false)), + Arc::new(UInt64Array::from(vec![12, 3])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("blob_id", DataType::UInt32, false)), + Arc::new(UInt32Array::from(vec![42, 0])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("blob_uri", DataType::Utf8, false)), + Arc::new(StringArray::from(vec!["", ""])) as ArrayRef, + ), + ]); + + check_round_trip_encoding_of_data_with_expected( + vec![Arc::new(struct_array)], + Some(Arc::new(expected_descriptor)), + &TestCases::default().with_min_file_version(LanceFileVersion::V2_2), + blob_metadata, + ) + .await; + } } From 55d645ef9defb6b091e1501d5c173f765ff50ddf Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 5 Dec 2025 00:01:20 +0800 Subject: [PATCH 13/17] Polish blob id Signed-off-by: Xuanwo --- rust/lance/src/dataset/blob.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index b0c2b115643..fe949ba25d1 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -53,13 +53,14 @@ impl BlobPreprocessor { object_store, data_dir, 
fragment_id, - local_counter: 0, + // Start at 1 to avoid a potential all-zero blob_id value. + local_counter: 1, field_ids, } } fn next_blob_id(&mut self) -> u32 { - let id = (self.fragment_id << 16) | self.local_counter; + let id = self.local_counter; self.local_counter = self.local_counter.wrapping_add(1); id } From f24739ba0b207c5f3c56cb86d07894407534771c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 5 Dec 2025 00:08:38 +0800 Subject: [PATCH 14/17] polish Signed-off-by: Xuanwo --- rust/lance/src/dataset/blob.rs | 42 ++++++++++++++-------------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index fe949ba25d1..507a2bde63a 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -84,8 +84,11 @@ impl BlobPreprocessor { .zip(batch.schema().fields()) .enumerate() { - let is_blob_struct = matches!(field.data_type(), ArrowDataType::Struct(_)) - && field.metadata().get(lance_arrow::BLOB_META_KEY).is_some(); + let is_blob_struct = field + .metadata() + .get(lance_arrow::ARROW_EXT_NAME_KEY) + .map(|v| v == lance_arrow::BLOB_V2_EXT_NAME) + .unwrap_or(false); if !is_blob_struct { new_columns.push(array.clone()); @@ -100,29 +103,18 @@ impl BlobPreprocessor { Error::invalid_input("Blob column was not a struct array", location!()) })?; - let ArrowDataType::Struct(fields) = field.data_type() else { - unreachable!(); - }; - - let mut data_idx = None; - let mut uri_idx = None; - for (idx, child) in fields.iter().enumerate() { - match child.name().as_str() { - "data" => data_idx = Some(idx), - "uri" => uri_idx = Some(idx), - _ => {} - } - } - - let data_idx = data_idx.ok_or_else(|| { - Error::invalid_input("Blob struct missing `data` field", location!()) - })?; - let uri_idx = uri_idx.ok_or_else(|| { - Error::invalid_input("Blob struct missing `uri` field", location!()) - })?; - - let data_col = struct_arr.column(data_idx).as_binary::(); - let uri_col = 
struct_arr.column(uri_idx).as_string::(); + let data_col = struct_arr + .column_by_name("data") + .ok_or_else(|| { + Error::invalid_input("Blob struct missing `data` field", location!()) + })? + .as_binary::(); + let uri_col = struct_arr + .column_by_name("uri") + .ok_or_else(|| { + Error::invalid_input("Blob struct missing `uri` field", location!()) + })? + .as_string::(); let mut data_builder = LargeBinaryBuilder::with_capacity(struct_arr.len(), 0); let mut uri_builder = LargeStringBuilder::with_capacity(struct_arr.len(), 0); From 0289d8df7ccccd008e02fa37097c57288f43d140 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 5 Dec 2025 13:59:24 +0800 Subject: [PATCH 15/17] use better logic for blob path Signed-off-by: Xuanwo --- rust/lance-core/src/utils/blob.rs | 23 ++++----- rust/lance/src/dataset/blob.rs | 59 ++++++++++++------------ rust/lance/src/dataset/fragment/write.rs | 11 ++--- rust/lance/src/dataset/write.rs | 11 ++--- 4 files changed, 43 insertions(+), 61 deletions(-) diff --git a/rust/lance-core/src/utils/blob.rs b/rust/lance-core/src/utils/blob.rs index 8dbdc9a68c3..06cbeb43960 100644 --- a/rust/lance-core/src/utils/blob.rs +++ b/rust/lance-core/src/utils/blob.rs @@ -3,18 +3,14 @@ use object_store::path::Path; -/// Directory name for blob sidecar files. -pub const BLOB_SIDECAR_DIR: &str = "_blob"; - -/// Format a dedicated blob sidecar path. +/// Format a dedicated blob sidecar path for a data file. /// -/// Layout: `_blob///.raw` -pub fn blob_path(base: &Path, fragment_id: u32, field_id: u32, blob_id: u32) -> Path { +/// Layout: `//.raw` +/// - `base` is typically the dataset's data directory. +/// - `data_file_key` is the stem of the data file (without extension). 
+pub fn blob_path(base: &Path, data_file_key: &str, blob_id: u32) -> Path { let file_name = format!("{:08x}.raw", blob_id); - base.child(BLOB_SIDECAR_DIR) - .child(format!("{:08x}", fragment_id)) - .child(format!("{:08x}", field_id)) - .child(file_name.as_str()) + base.child(data_file_key).child(file_name.as_str()) } #[cfg(test)] @@ -24,10 +20,7 @@ mod tests { #[test] fn test_blob_path_formatting() { let base = Path::from("base"); - let path = blob_path(&base, 0x10, 1, 2); - assert_eq!( - path.to_string(), - "base/_blob/00000010/00000001/00000002.raw" - ); + let path = blob_path(&base, "deadbeef", 2); + assert_eq!(path.to_string(), "base/deadbeef/00000002.raw"); } } diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 507a2bde63a..cbdc63709bd 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -37,25 +37,18 @@ const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; pub struct BlobPreprocessor { object_store: ObjectStore, data_dir: Path, - fragment_id: u32, + data_file_key: String, local_counter: u32, - field_ids: Vec, } impl BlobPreprocessor { - pub(crate) fn new( - object_store: ObjectStore, - data_dir: Path, - fragment_id: u32, - field_ids: Vec, - ) -> Self { + pub(crate) fn new(object_store: ObjectStore, data_dir: Path, data_file_key: String) -> Self { Self { object_store, data_dir, - fragment_id, + data_file_key, // Start at 1 to avoid a potential all-zero blob_id value. 
local_counter: 1, - field_ids, } } @@ -65,8 +58,8 @@ impl BlobPreprocessor { id } - async fn write_blob(&self, field_id: u32, blob_id: u32, data: &[u8]) -> Result { - let path = blob_path(&self.data_dir, self.fragment_id, field_id, blob_id); + async fn write_blob(&self, blob_id: u32, data: &[u8]) -> Result { + let path = blob_path(&self.data_dir, &self.data_file_key, blob_id); let mut writer = self.object_store.create(&path).await?; writer.write_all(data).await?; writer.shutdown().await?; @@ -76,14 +69,8 @@ impl BlobPreprocessor { pub(crate) async fn preprocess_batch(&mut self, batch: &RecordBatch) -> Result { let mut new_columns = Vec::with_capacity(batch.num_columns()); let mut new_fields = Vec::with_capacity(batch.num_columns()); - let mut written_paths = Vec::new(); - for (col_idx, (array, field)) in batch - .columns() - .iter() - .zip(batch.schema().fields()) - .enumerate() - { + for (array, field) in batch.columns().iter().zip(batch.schema().fields()) { let is_blob_struct = field .metadata() .get(lance_arrow::ARROW_EXT_NAME_KEY) @@ -141,11 +128,7 @@ impl BlobPreprocessor { if has_data && data_col.value(i).len() > DEDICATED_THRESHOLD { let blob_id = self.next_blob_id(); - let field_id = *self.field_ids.get(col_idx).unwrap_or(&(col_idx as u32)); - let path = self - .write_blob(field_id, blob_id, data_col.value(i)) - .await?; - written_paths.push(path); + self.write_blob(blob_id, data_col.value(i)).await?; kind_builder.append_value(BlobKind::Dedicated as u8); data_builder.append_null(); @@ -601,12 +584,15 @@ async fn collect_blob_files_v2( message: "Fragment not found".to_string(), location: location!(), })?; - frag.data_file_for_field(blob_field_id) - .ok_or_else(|| Error::Internal { - message: "Data file not found for blob field".to_string(), - location: location!(), - })?; - let path = blob_path(&dataset.data_dir(), frag_id, blob_field_id, blob_id); + let data_file = + frag.data_file_for_field(blob_field_id) + .ok_or_else(|| Error::Internal { + message: "Data 
file not found for blob field".to_string(), + location: location!(), + })?; + + let data_file_key = data_file_key_from_path(data_file.path.as_str()); + let path = blob_path(&dataset.data_dir(), data_file_key, blob_id); files.push(BlobFile::new_dedicated(dataset.clone(), path, size)); } BlobKind::External => { @@ -632,6 +618,11 @@ async fn collect_blob_files_v2( Ok(files) } +fn data_file_key_from_path(path: &str) -> &str { + let filename = path.rsplit('/').next().unwrap_or(path); + filename.strip_suffix(".lance").unwrap_or(filename) +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -646,6 +637,7 @@ mod tests { use lance_datagen::{array, BatchCount, RowCount}; use lance_file::version::LanceFileVersion; + use super::data_file_key_from_path; use crate::{utils::test::TestDatasetGenerator, Dataset}; struct BlobTestFixture { @@ -899,4 +891,11 @@ mod tests { .unwrap(); assert_eq!(blobs.len(), 2, "Mixed fragment blobs should have 2 items"); } + + #[test] + fn test_data_file_key_from_path() { + assert_eq!(data_file_key_from_path("data/abc.lance"), "abc"); + assert_eq!(data_file_key_from_path("abc.lance"), "abc"); + assert_eq!(data_file_key_from_path("nested/path/xyz"), "xyz"); + } } diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index 64d5065a457..0ab7dd15d21 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -135,7 +135,8 @@ impl<'a> FragmentCreateBuilder<'a> { ¶ms.store_params.clone().unwrap_or_default(), ) .await?; - let filename = format!("{}.lance", generate_random_filename()); + let data_file_key = generate_random_filename(); + let filename = format!("{}.lance", data_file_key); let mut fragment = Fragment::new(id); let full_path = base_path.child(DATA_DIR).child(filename.clone()); let has_blob_v2 = schema_has_blob_v2(&schema); @@ -149,17 +150,11 @@ impl<'a> FragmentCreateBuilder<'a> { }, )?; - let field_ids = writer - .field_id_to_column_indices() - .iter() - 
.map(|(id, _)| *id) - .collect::>(); let mut preprocessor = if has_blob_v2 { Some(BlobPreprocessor::new( object_store.as_ref().clone(), base_path.child(DATA_DIR), - id as u32, - field_ids, + data_file_key.clone(), )) } else { None diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 0c5b1ec9281..6f675dbcb6e 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -783,7 +783,8 @@ pub async fn open_writer_with_options( add_data_dir: bool, base_id: Option, ) -> Result> { - let filename = format!("{}.lance", generate_random_filename()); + let data_file_key = generate_random_filename(); + let filename = format!("{}.lance", data_file_key); let data_dir = if add_data_dir { base_dir.child(DATA_DIR) @@ -815,17 +816,11 @@ pub async fn open_writer_with_options( ..Default::default() }, )?; - let field_ids = schema - .fields - .iter() - .map(|f| f.id as u32) - .collect::>(); let preprocessor = if schema_has_blob_v2(schema) { Some(BlobPreprocessor::new( object_store.clone(), data_dir.clone(), - 0, - field_ids, + data_file_key.clone(), )) } else { None From 026a12cc5f9ddaa840b6e099984a554a1f7ba966 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 5 Dec 2025 14:57:58 +0800 Subject: [PATCH 16/17] Fix uri should use utf-8 instead of large-utf8 Signed-off-by: Xuanwo --- rust/lance/src/dataset/blob.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index cbdc63709bd..7c7bd0aec5e 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -5,7 +5,7 @@ use std::{collections::HashMap, future::Future, ops::DerefMut, sync::Arc}; use arrow::array::AsArray; use arrow::datatypes::{UInt32Type, UInt64Type, UInt8Type}; -use arrow_array::builder::{LargeBinaryBuilder, LargeStringBuilder, PrimitiveBuilder}; +use arrow_array::builder::{LargeBinaryBuilder, PrimitiveBuilder, StringBuilder}; use arrow_array::Array; use 
arrow_array::RecordBatch; use arrow_schema::DataType as ArrowDataType; @@ -104,7 +104,7 @@ impl BlobPreprocessor { .as_string::(); let mut data_builder = LargeBinaryBuilder::with_capacity(struct_arr.len(), 0); - let mut uri_builder = LargeStringBuilder::with_capacity(struct_arr.len(), 0); + let mut uri_builder = StringBuilder::with_capacity(struct_arr.len(), 0); let mut blob_id_builder = PrimitiveBuilder::::with_capacity(struct_arr.len()); let mut blob_size_builder = From 682fee50abcf45276ead0a76563290d26d780e8 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 6 Dec 2025 00:02:22 +0800 Subject: [PATCH 17/17] Address comments Signed-off-by: Xuanwo --- rust/lance-arrow/src/schema.rs | 11 +++++++++++ rust/lance/src/dataset/blob.rs | 16 ++++++++-------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/rust/lance-arrow/src/schema.rs b/rust/lance-arrow/src/schema.rs index 16840a7a451..8ce9442b4e5 100644 --- a/rust/lance-arrow/src/schema.rs +++ b/rust/lance-arrow/src/schema.rs @@ -40,6 +40,9 @@ pub trait FieldExt { /// Check if the field is marked as a blob fn is_blob(&self) -> bool; + + /// Check if the field is marked as a blob v2 + fn is_blob_v2(&self) -> bool; } impl FieldExt for Field { @@ -108,6 +111,14 @@ impl FieldExt for Field { .map(|value| value == BLOB_V2_EXT_NAME) .unwrap_or(false) } + + fn is_blob_v2(&self) -> bool { + let field_metadata = self.metadata(); + field_metadata + .get(ARROW_EXT_NAME_KEY) + .map(|value| value == BLOB_V2_EXT_NAME) + .unwrap_or(false) + } } /// Extends the functionality of [arrow_schema::Schema].
diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 7c7bd0aec5e..55f3dad8a04 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -9,6 +9,7 @@ use arrow_array::builder::{LargeBinaryBuilder, PrimitiveBuilder, StringBuilder}; use arrow_array::Array; use arrow_array::RecordBatch; use arrow_schema::DataType as ArrowDataType; +use lance_arrow::FieldExt; use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; use object_store::path::Path; use snafu::location; @@ -34,6 +35,11 @@ pub fn blob_version_from_config(config: &HashMap) -> BlobVersion const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; +/// Preprocesses blob v2 columns on the write path so the encoder only sees lightweight descriptors: +/// +/// - Spills large blobs to sidecar files before encoding, reducing memory/CPU and avoiding copying huge payloads through page builders. +/// - Emits `blob_id/blob_size` tied to the data file stem, giving readers a stable path independent of temporary fragment IDs assigned during write. +/// - Leaves small inline blobs and URI rows unchanged for compatibility. pub struct BlobPreprocessor { object_store: ObjectStore, data_dir: Path, @@ -54,7 +60,7 @@ impl BlobPreprocessor { fn next_blob_id(&mut self) -> u32 { let id = self.local_counter; - self.local_counter = self.local_counter.wrapping_add(1); + self.local_counter += 1; id } @@ -71,13 +77,7 @@ impl BlobPreprocessor { let mut new_fields = Vec::with_capacity(batch.num_columns()); for (array, field) in batch.columns().iter().zip(batch.schema().fields()) { - let is_blob_struct = field - .metadata() - .get(lance_arrow::ARROW_EXT_NAME_KEY) - .map(|v| v == lance_arrow::BLOB_V2_EXT_NAME) - .unwrap_or(false); - - if !is_blob_struct { + if !field.is_blob_v2() { new_columns.push(array.clone()); new_fields.push(field.clone()); continue;