From e65452f70ca35b79bf4037327a61dd3c6eabdcf1 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 12 Nov 2025 19:22:28 +0800 Subject: [PATCH 1/9] refactor: move blob version as a table level config Signed-off-by: Xuanwo --- rust/lance-arrow/src/lib.rs | 2 - rust/lance-core/src/datatypes.rs | 4 -- rust/lance-core/src/datatypes/field.rs | 76 ++++++----------------- rust/lance-core/src/datatypes/schema.rs | 69 +++++---------------- rust/lance/src/dataset.rs | 17 ++++-- rust/lance/src/dataset/blob.rs | 81 ++++++++++++++++++++++--- rust/lance/src/dataset/write.rs | 18 +++++- rust/lance/src/dataset/write/insert.rs | 65 ++++++++++---------- 8 files changed, 165 insertions(+), 167 deletions(-) diff --git a/rust/lance-arrow/src/lib.rs b/rust/lance-arrow/src/lib.rs index 0c4f50f6b7b..43d8fd2a5cd 100644 --- a/rust/lance-arrow/src/lib.rs +++ b/rust/lance-arrow/src/lib.rs @@ -45,8 +45,6 @@ pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata"; /// Key used by lance to mark a field as a blob /// TODO: Use Arrow extension mechanism instead? pub const BLOB_META_KEY: &str = "lance-encoding:blob"; -/// Key used by Lance to record the blob column format version. -pub const BLOB_VERSION_META_KEY: &str = "lance-encoding:blob-version"; type Result = std::result::Result; diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index e7f576d4a18..c9364b88752 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -62,10 +62,6 @@ pub static BLOB_V2_DESC_TYPE: LazyLock = pub static BLOB_V2_DESC_FIELD: LazyLock = LazyLock::new(|| { ArrowField::new("description", BLOB_V2_DESC_TYPE.clone(), true).with_metadata(HashMap::from([ (lance_arrow::BLOB_META_KEY.to_string(), "true".to_string()), - ( - lance_arrow::BLOB_VERSION_META_KEY.to_string(), - "2".to_string(), - ), ])) }); diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 43b8e143a79..225beb59fb9 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -86,17 +86,20 @@ pub enum BlobVersion { } impl BlobVersion { - pub fn from_metadata_value(value: Option<&str>) -> Self { + /// Convert a persisted string value (e.g. table config) into a blob version + pub fn from_config_value(value: &str) -> Option { match value { - Some("2") => Self::V2, - _ => Self::V1, + "1" => Some(Self::V1), + "2" => Some(Self::V2), + _ => None, } } - pub fn metadata_value(self) -> Option<&'static str> { + /// Persistable string representation for table config. + pub fn config_value(self) -> &'static str { match self { - Self::V1 => None, - Self::V2 => Some("2"), + Self::V1 => "1", + Self::V2 => "2", } } } @@ -261,7 +264,11 @@ impl Field { } else { let mut new_field = self.clone(); new_field.children = children; - Some(projection.blob_handling.unload_if_needed(new_field)) + Some( + projection + .blob_handling + .unload_if_needed(new_field, projection.blob_version), + ) } } @@ -488,36 +495,13 @@ impl Field { self.metadata.contains_key(BLOB_META_KEY) } - pub fn blob_version(&self) -> BlobVersion { - if !self.is_blob() { - return BlobVersion::V1; - } - let value = self.metadata.get(BLOB_VERSION_META_KEY).map(|s| s.as_str()); - BlobVersion::from_metadata_value(value) - } - - pub fn set_blob_version(&mut self, version: BlobVersion) { - if !self.is_blob() { - return; - } - match version.metadata_value() { - Some(value) => { - self.metadata - .insert(BLOB_VERSION_META_KEY.to_string(), value.to_string()); - } - None => { - self.metadata.remove(BLOB_VERSION_META_KEY); - } - } - } - /// If the field is a blob, return a new field with the same name and id /// but with the data type set to a struct of the blob description fields. /// /// If the field is not a blob, return the field itself. - pub fn into_unloaded(mut self) -> Self { + pub fn into_unloaded_with_version(mut self, version: BlobVersion) -> Self { if self.data_type().is_binary_like() && self.is_blob() { - match self.blob_version() { + match version { BlobVersion::V2 => { self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone(); self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone(); @@ -1529,38 +1513,14 @@ mod tests { assert!(f2.compare_with_options(&f1, &ignore_nullability)); } - #[test] - fn blob_version_detection_and_setting() { - let mut metadata = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]); - let field_v1: Field = ArrowField::new("blob", DataType::LargeBinary, true) - .with_metadata(metadata.clone()) - .try_into() - .unwrap(); - assert_eq!(field_v1.blob_version(), BlobVersion::V1); - - metadata.insert(BLOB_VERSION_META_KEY.to_string(), "2".to_string()); - let mut field_v2: Field = ArrowField::new("blob", DataType::LargeBinary, true) - .with_metadata(metadata) - .try_into() - .unwrap(); - assert_eq!(field_v2.blob_version(), BlobVersion::V2); - - field_v2.set_blob_version(BlobVersion::V1); - assert_eq!(field_v2.blob_version(), BlobVersion::V1); - assert!(!field_v2.metadata.contains_key(BLOB_VERSION_META_KEY)); - } - #[test] fn blob_into_unloaded_selects_v2_layout() { - let metadata = HashMap::from([ - (BLOB_META_KEY.to_string(), "true".to_string()), - (BLOB_VERSION_META_KEY.to_string(), "2".to_string()), - ]); + let metadata = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]); let field: Field = ArrowField::new("blob", DataType::LargeBinary, true) .with_metadata(metadata) .try_into() .unwrap(); - let unloaded = field.into_unloaded(); + let unloaded = field.into_unloaded_with_version(BlobVersion::V2); assert_eq!(unloaded.children.len(), 5); assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type); } diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index a19a93a15b1..05399ff9f4e 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -146,13 +146,6 @@ impl Schema { } } - pub fn apply_blob_version(&mut self, version: BlobVersion, allow_change: bool) -> Result<()> { - for field in self.fields.iter_mut() { - apply_blob_version_to_field(field, version, allow_change)?; - } - Ok(()) - } - pub fn has_dictionary_types(&self) -> bool { self.fields.iter().any(|f| f.has_dictionary_types()) } @@ -893,31 +886,6 @@ fn explain_metadata_difference( } } -fn apply_blob_version_to_field( - field: &mut Field, - version: BlobVersion, - allow_change: bool, -) -> Result<()> { - if field.is_blob() { - let current = field.blob_version(); - if current != version && !allow_change { - return Err(Error::InvalidInput { - source: format!( - "Blob column '{}' uses version {:?}, expected {:?}", - field.name, current, version - ) - .into(), - location: location!(), - }); - } - field.set_blob_version(version); - } - for child in field.children.iter_mut() { - apply_blob_version_to_field(child, version, allow_change)?; - } - Ok(()) -} - /// What to do when a column is missing in the schema #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum OnMissing { @@ -928,6 +896,10 @@ pub enum OnMissing { /// A trait for something that we can project fields from. pub trait Projectable: Debug + Send + Sync { fn schema(&self) -> &Schema; + + fn blob_version(&self) -> BlobVersion { + BlobVersion::V1 + } } impl Projectable for Schema { @@ -972,9 +944,9 @@ impl BlobHandling { } } - pub fn unload_if_needed(&self, field: Field) -> Field { + pub fn unload_if_needed(&self, field: Field, version: BlobVersion) -> Field { if self.should_unload(&field) { - field.into_unloaded() + field.into_unloaded_with_version(version) } else { field } @@ -994,6 +966,7 @@ pub struct Projection { pub with_row_last_updated_at_version: bool, pub with_row_created_at_version: bool, pub blob_handling: BlobHandling, + pub blob_version: BlobVersion, } impl Debug for Projection { @@ -1011,6 +984,7 @@ impl Debug for Projection { &self.with_row_created_at_version, ) .field("blob_handling", &self.blob_handling) + .field("blob_version", &self.blob_version) .finish() } } @@ -1018,6 +992,7 @@ impl Debug for Projection { impl Projection { /// Create a new empty projection pub fn empty(base: Arc) -> Self { + let blob_version = base.blob_version(); Self { base, field_ids: HashSet::new(), @@ -1026,6 +1001,7 @@ impl Projection { with_row_last_updated_at_version: false, with_row_created_at_version: false, blob_handling: BlobHandling::default(), + blob_version, } } @@ -1059,6 +1035,11 @@ impl Projection { self } + pub fn with_blob_version(mut self, blob_version: BlobVersion) -> Self { + self.blob_version = blob_version; + self + } + fn add_field_children(field_ids: &mut HashSet, field: &Field) { for child in &field.children { field_ids.insert(child.id); @@ -1516,7 +1497,6 @@ mod tests { use std::sync::Arc; use super::*; - use crate::datatypes::BlobVersion; #[test] fn test_resolve_with_quoted_fields() { @@ -2537,23 +2517,4 @@ mod tests { .contains(error_message_contains[idx])); } } - - #[test] - fn apply_blob_version_requires_consistent_metadata() { - let arrow_field = ArrowField::new("blob", ArrowDataType::LargeBinary, true).with_metadata( - HashMap::from([ - (BLOB_META_KEY.to_string(), "true".to_string()), - (BLOB_VERSION_META_KEY.to_string(), "2".to_string()), - ]), - ); - let mut schema = - Schema::try_from(&ArrowSchema::new(vec![arrow_field])).expect("schema creation"); - - assert!(schema.apply_blob_version(BlobVersion::V1, false).is_err()); - - schema - .apply_blob_version(BlobVersion::V1, true) - .expect("allow metadata change when permitted"); - assert_eq!(schema.fields[0].blob_version(), BlobVersion::V1); - } } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index ec0ac823f4d..3b5a2ec023b 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -13,12 +13,15 @@ use futures::future::BoxFuture; use futures::stream::{self, BoxStream, StreamExt, TryStreamExt}; use futures::{FutureExt, Stream}; +use crate::dataset::blob::blob_version_from_config; use crate::dataset::metadata::UpdateFieldMetadataBuilder; use crate::dataset::transaction::translate_schema_metadata_updates; use crate::session::caches::{DSMetadataCache, ManifestKey, TransactionKey}; use crate::session::index_caches::DSIndexCache; use itertools::Itertools; -use lance_core::datatypes::{Field, OnMissing, OnTypeMismatch, Projectable, Projection}; +use lance_core::datatypes::{ + BlobVersion, Field, OnMissing, OnTypeMismatch, Projectable, Projection, +}; use lance_core::traits::DatasetTakeRows; use lance_core::utils::address::RowAddress; use lance_core::utils::tracing::{ @@ -56,7 +59,7 @@ use std::sync::Arc; use take::row_offsets_to_row_addresses; use tracing::{info, instrument}; -mod blob; +pub(crate) mod blob; mod branch_location; pub mod builder; pub mod cleanup; @@ -2332,6 +2335,10 @@ impl Dataset { &self.manifest.config } + pub(crate) fn blob_version(&self) -> BlobVersion { + blob_version_from_config(&self.manifest.config) + } + /// Delete keys from the config. #[deprecated( note = "Use the new update_config(values, replace) method - pass None values to delete keys" @@ -2597,6 +2604,10 @@ impl Projectable for Dataset { fn schema(&self) -> &Schema { self.schema() } + + fn blob_version(&self) -> BlobVersion { + self.blob_version() + } } #[cfg(test)] @@ -8877,7 +8888,6 @@ mod tests { read_version, Operation::Append { fragments: vec![] }, None, - None, ) } @@ -8939,7 +8949,6 @@ mod tests { ds.load_indices().await.unwrap().as_ref().clone(), &tx_file, &ManifestWriteConfig::default(), - None, ) .unwrap(); let location = write_manifest_file( diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index ba19d1a5d89..2d0cfc2ec5e 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -1,19 +1,30 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{future::Future, ops::DerefMut, sync::Arc}; +use std::{collections::HashMap, future::Future, ops::DerefMut, sync::Arc}; use arrow::array::AsArray; -use arrow::datatypes::UInt64Type; +use arrow::datatypes::{UInt64Type, UInt8Type}; use arrow_schema::DataType; use object_store::path::Path; use snafu::location; use tokio::sync::Mutex; use super::Dataset; +use arrow_array::{Array, StructArray}; +use lance_core::datatypes::BlobVersion; use lance_core::{utils::address::RowAddress, Error, Result}; use lance_io::traits::Reader; +pub const BLOB_VERSION_CONFIG_KEY: &str = "lance.blob.version"; + +pub fn blob_version_from_config(config: &HashMap) -> BlobVersion { + config + .get(BLOB_VERSION_CONFIG_KEY) + .and_then(|value| BlobVersion::from_config_value(value)) + .unwrap_or(BlobVersion::V1) +} + /// Current state of the reader. Held in a mutex for easy sharing /// /// The u64 is the cursor in the file that the reader is currently at @@ -190,9 +201,25 @@ pub(super) async fn take_blobs( .execute() .await?; let descriptions = description_and_addr.column(0).as_struct(); + let row_addrs = description_and_addr.column(1).as_primitive::(); + let blob_field_id = blob_field_id as u32; + + match dataset.blob_version() { + BlobVersion::V1 => collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs), + BlobVersion::V2 => collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs), + } +} + +const INLINE_BLOB_KIND: u8 = 0; + +fn collect_blob_files_v1( + dataset: &Arc, + blob_field_id: u32, + descriptions: &StructArray, + row_addrs: &arrow::array::PrimitiveArray, +) -> Result> { let positions = descriptions.column(0).as_primitive::(); let sizes = descriptions.column(1).as_primitive::(); - let row_addrs = description_and_addr.column(1).as_primitive::(); Ok(row_addrs .values() @@ -205,17 +232,51 @@ pub(super) async fn take_blobs( Some((*row_addr, position, size)) }) .map(|(row_addr, position, size)| { - BlobFile::new( - dataset.clone(), - blob_field_id as u32, - row_addr, - position, - size, - ) + BlobFile::new(dataset.clone(), blob_field_id, row_addr, position, size) }) .collect()) } +fn collect_blob_files_v2( + dataset: &Arc, + blob_field_id: u32, + descriptions: &StructArray, + row_addrs: &arrow::array::PrimitiveArray, +) -> Result> { + let kinds = descriptions.column(0).as_primitive::(); + let positions = descriptions.column(1).as_primitive::(); + let sizes = descriptions.column(2).as_primitive::(); + + let mut files = Vec::with_capacity(row_addrs.len()); + for (idx, row_addr) in row_addrs.values().iter().enumerate() { + if positions.is_null(idx) || sizes.is_null(idx) { + continue; + } + + if !kinds.is_null(idx) { + let kind = kinds.value(idx); + if kind != INLINE_BLOB_KIND { + return Err(Error::NotSupported { + source: format!("Blob kind {} is not supported", kind).into(), + location: location!(), + }); + } + } + + let position = positions.value(idx); + let size = sizes.value(idx); + files.push(BlobFile::new( + dataset.clone(), + blob_field_id, + *row_addr, + position, + size, + )); + } + + Ok(files) +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 1bdf05e640a..1fe0138ec03 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -41,7 +41,7 @@ use super::transaction::Transaction; use super::utils::SchemaAdapter; use super::DATA_DIR; -fn blob_version_for(storage_version: LanceFileVersion) -> BlobVersion { +pub(super) fn blob_version_for(storage_version: LanceFileVersion) -> BlobVersion { if storage_version >= LanceFileVersion::V2_2 { BlobVersion::V2 } else { @@ -590,7 +590,7 @@ pub async fn write_fragments_internal( let allow_blob_version_change = dataset.is_none() || matches!(params.mode, WriteMode::Overwrite); - let (mut schema, storage_version) = if let Some(dataset) = dataset { + let (schema, storage_version) = if let Some(dataset) = dataset { match params.mode { WriteMode::Append | WriteMode::Create => { // Append mode, so we need to check compatibility @@ -638,7 +638,19 @@ pub async fn write_fragments_internal( }; let target_blob_version = blob_version_for(storage_version); - schema.apply_blob_version(target_blob_version, allow_blob_version_change)?; + if let Some(dataset) = dataset { + let existing_version = dataset.blob_version(); + if !allow_blob_version_change && existing_version != target_blob_version { + return Err(Error::InvalidInput { + source: format!( + "Blob column version mismatch. Dataset uses {:?} but write requires {:?}", + existing_version, target_blob_version + ) + .into(), + location: location!(), + }); + } + } let fragments = do_write_fragments( object_store, diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index b9c4ffc7334..58243a6cc11 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -19,6 +19,7 @@ use lance_table::io::commit::CommitHandler; use object_store::path::Path; use snafu::location; +use crate::dataset::blob::BLOB_VERSION_CONFIG_KEY; use crate::dataset::builder::DatasetBuilder; use crate::dataset::transaction::{Operation, Transaction, TransactionBuilder}; use crate::dataset::write::{validate_and_resolve_target_bases, write_fragments_internal}; @@ -27,6 +28,7 @@ use crate::Dataset; use crate::{Error, Result}; use tracing::info; +use super::blob_version_for; use super::commit::CommitBuilder; use super::resolve_commit_handler; use super::WriteDestination; @@ -214,43 +216,42 @@ impl<'a> InsertBuilder<'a> { fragments: Vec, context: &WriteContext<'_>, ) -> Result { + let blob_version = blob_version_for(context.storage_version); + let mut config_entries = HashMap::from([( + BLOB_VERSION_CONFIG_KEY.to_string(), + blob_version.config_value().to_string(), + )]); + + if let Some(auto_cleanup_params) = context.params.auto_cleanup.as_ref() { + config_entries.insert( + String::from("lance.auto_cleanup.interval"), + auto_cleanup_params.interval.to_string(), + ); + + match auto_cleanup_params.older_than.to_std() { + Ok(duration) => { + config_entries.insert( + String::from("lance.auto_cleanup.older_than"), + format_duration(duration).to_string(), + ); + } + Err(e) => { + return Err(Error::InvalidInput { + source: e.into(), + location: location!(), + }) + } + }; + } + + let mut config_payload = Some(config_entries); let operation = match context.params.mode { WriteMode::Create => { - // Fetch auto_cleanup params from context - let config_upsert_values = match context.params.auto_cleanup.as_ref() { - Some(auto_cleanup_params) => { - let mut upsert_values = HashMap::new(); - - upsert_values.insert( - String::from("lance.auto_cleanup.interval"), - auto_cleanup_params.interval.to_string(), - ); - - match auto_cleanup_params.older_than.to_std() { - Ok(d) => { - upsert_values.insert( - String::from("lance.auto_cleanup.older_than"), - format_duration(d).to_string(), - ); - } - Err(e) => { - return Err(Error::InvalidInput { - source: e.into(), - location: location!(), - }) - } - }; - - Some(upsert_values) - } - None => None, - }; - Operation::Overwrite { // Use the full schema, not the written schema schema, fragments, - config_upsert_values, + config_upsert_values: config_payload.take(), initial_bases: context.params.initial_bases.clone(), } } @@ -259,7 +260,7 @@ impl<'a> InsertBuilder<'a> { // Use the full schema, not the written schema schema, fragments, - config_upsert_values: None, + config_upsert_values: config_payload.take(), initial_bases: context.params.initial_bases.clone(), } } From 6a54d59cf014d0a0af6561efb502523c6b308314 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 12 Nov 2025 19:33:42 +0800 Subject: [PATCH 2/9] Format code Signed-off-by: Xuanwo --- rust/lance/src/dataset.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index ced4c813fe0..028e7c708e9 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -8884,11 +8884,7 @@ mod tests { } fn make_tx(read_version: u64) -> Transaction { - Transaction::new( - read_version, - Operation::Append { fragments: vec![] }, - None, - ) + Transaction::new(read_version, Operation::Append { fragments: vec![] }, None) } async fn delete_external_tx_file(ds: &Dataset) { From 6e2866a195be3796fc1be51cad40ef401f1b7942 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 13 Nov 2025 01:45:08 +0800 Subject: [PATCH 3/9] Fix tests Signed-off-by: Xuanwo --- rust/lance/src/dataset/transaction.rs | 19 ++++++--- rust/lance/src/dataset/write/insert.rs | 59 ++++++++++++-------------- 2 files changed, 41 insertions(+), 37 deletions(-) diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 6637b76d58f..364b5fe800b 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -45,12 +45,12 @@ //! the operation does not modify the region of the column being replaced. //! -use super::ManifestWriteConfig; +use super::{blob::BLOB_VERSION_CONFIG_KEY, ManifestWriteConfig}; use crate::dataset::transaction::UpdateMode::RewriteRows; use crate::index::mem_wal::update_mem_wal_index_in_indices_list; use crate::utils::temporal::timestamp_to_nanos; use deepsize::DeepSizeOf; -use lance_core::{datatypes::Schema, Error, Result}; +use lance_core::{datatypes::BlobVersion, datatypes::Schema, Error, Result}; use lance_file::{datatypes::Fields, version::LanceFileVersion}; use lance_index::mem_wal::MemWal; use lance_index::{frag_reuse::FRAG_REUSE_INDEX_NAME, is_system_index}; @@ -2184,12 +2184,19 @@ impl Transaction { } else { let data_storage_format = Self::data_storage_format_from_files(&final_fragments, user_requested_version)?; - Manifest::new( + let mut manifest = Manifest::new( schema, Arc::new(final_fragments), data_storage_format, reference_paths, - ) + ); + if manifest.data_storage_format.lance_file_version()? >= LanceFileVersion::V2_2 { + manifest.config_mut().insert( + BLOB_VERSION_CONFIG_KEY.to_string(), + BlobVersion::V2.config_value().to_string(), + ); + } + manifest }; manifest.tag.clone_from(&self.tag); @@ -2786,9 +2793,9 @@ impl TryFrom for Transaction { initial_bases, })) => { let config_upsert_option = if config_upsert_values.is_empty() { - Some(config_upsert_values) - } else { None + } else { + Some(config_upsert_values) }; Operation::Overwrite { diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index 58243a6cc11..47c8f1c6e16 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -19,7 +19,6 @@ use lance_table::io::commit::CommitHandler; use object_store::path::Path; use snafu::location; -use crate::dataset::blob::BLOB_VERSION_CONFIG_KEY; use crate::dataset::builder::DatasetBuilder; use crate::dataset::transaction::{Operation, Transaction, TransactionBuilder}; use crate::dataset::write::{validate_and_resolve_target_bases, write_fragments_internal}; @@ -28,7 +27,6 @@ use crate::Dataset; use crate::{Error, Result}; use tracing::info; -use super::blob_version_for; use super::commit::CommitBuilder; use super::resolve_commit_handler; use super::WriteDestination; @@ -216,42 +214,41 @@ impl<'a> InsertBuilder<'a> { fragments: Vec, context: &WriteContext<'_>, ) -> Result { - let blob_version = blob_version_for(context.storage_version); - let mut config_entries = HashMap::from([( - BLOB_VERSION_CONFIG_KEY.to_string(), - blob_version.config_value().to_string(), - )]); - - if let Some(auto_cleanup_params) = context.params.auto_cleanup.as_ref() { - config_entries.insert( - String::from("lance.auto_cleanup.interval"), - auto_cleanup_params.interval.to_string(), - ); - - match auto_cleanup_params.older_than.to_std() { - Ok(duration) => { - config_entries.insert( - String::from("lance.auto_cleanup.older_than"), - format_duration(duration).to_string(), - ); - } - Err(e) => { - return Err(Error::InvalidInput { - source: e.into(), - location: location!(), - }) - } - }; - } + let mut config_payload = + if let Some(auto_cleanup_params) = context.params.auto_cleanup.as_ref() { + let mut upsert_values = HashMap::new(); + upsert_values.insert( + String::from("lance.auto_cleanup.interval"), + auto_cleanup_params.interval.to_string(), + ); - let mut config_payload = Some(config_entries); + match auto_cleanup_params.older_than.to_std() { + Ok(d) => { + upsert_values.insert( + String::from("lance.auto_cleanup.older_than"), + format_duration(d).to_string(), + ); + } + Err(e) => { + return Err(Error::InvalidInput { + source: e.into(), + location: location!(), + }) + } + }; + + Some(upsert_values) + } else { + None + }; let operation = match context.params.mode { WriteMode::Create => { + let config_upsert_values = config_payload.take(); Operation::Overwrite { // Use the full schema, not the written schema schema, fragments, - config_upsert_values: config_payload.take(), + config_upsert_values, initial_bases: context.params.initial_bases.clone(), } } From ed12f93df4880bce3c5887a0fa81a6c18950d082 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 13 Nov 2025 02:29:38 +0800 Subject: [PATCH 4/9] Fix test Signed-off-by: Xuanwo --- rust/lance/src/dataset/write.rs | 2 +- rust/lance/src/dataset/write/insert.rs | 63 ++++++++++++-------------- 2 files changed, 29 insertions(+), 36 deletions(-) diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 1fe0138ec03..6eb48b8e1b9 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -268,7 +268,7 @@ impl Default for WriteParams { enable_stable_row_ids: false, enable_v2_manifest_paths: false, session: None, - auto_cleanup: Some(AutoCleanupParams::default()), + auto_cleanup: None, skip_auto_cleanup: false, transaction_properties: None, initial_bases: None, diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index 47c8f1c6e16..471e415c861 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -214,36 +214,32 @@ impl<'a> InsertBuilder<'a> { fragments: Vec, context: &WriteContext<'_>, ) -> Result { - let mut config_payload = - if let Some(auto_cleanup_params) = context.params.auto_cleanup.as_ref() { - let mut upsert_values = HashMap::new(); - upsert_values.insert( - String::from("lance.auto_cleanup.interval"), - auto_cleanup_params.interval.to_string(), - ); + let operation = match context.params.mode { + WriteMode::Create => { + let config_upsert_values = if let Some(auto_cleanup_params) = + context.params.auto_cleanup.as_ref() + { + let mut upsert_values = HashMap::new(); + upsert_values.insert( + String::from("lance.auto_cleanup.interval"), + auto_cleanup_params.interval.to_string(), + ); - match auto_cleanup_params.older_than.to_std() { - Ok(d) => { - upsert_values.insert( - String::from("lance.auto_cleanup.older_than"), - format_duration(d).to_string(), - ); - } - Err(e) => { - return Err(Error::InvalidInput { + let duration = auto_cleanup_params + .older_than + .to_std() + .map_err(|e| Error::InvalidInput { source: e.into(), location: location!(), - }) - } + })?; + upsert_values.insert( + String::from("lance.auto_cleanup.older_than"), + format_duration(duration).to_string(), + ); + Some(upsert_values) + } else { + None }; - - Some(upsert_values) - } else { - None - }; - let operation = match context.params.mode { - WriteMode::Create => { - let config_upsert_values = config_payload.take(); Operation::Overwrite { // Use the full schema, not the written schema schema, @@ -252,15 +248,12 @@ impl<'a> InsertBuilder<'a> { initial_bases: context.params.initial_bases.clone(), } } - WriteMode::Overwrite => { - Operation::Overwrite { - // Use the full schema, not the written schema - schema, - fragments, - config_upsert_values: config_payload.take(), - initial_bases: context.params.initial_bases.clone(), - } - } + WriteMode::Overwrite => Operation::Overwrite { + schema, + fragments, + config_upsert_values: None, + initial_bases: context.params.initial_bases.clone(), + }, WriteMode::Append => Operation::Append { fragments }, }; From cad47fd56e74af088bb1bc505168c6708facdfdb Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 13 Nov 2025 02:31:23 +0800 Subject: [PATCH 5/9] Format code Signed-off-by: Xuanwo --- rust/lance/src/dataset/write/insert.rs | 44 ++++++++++++-------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index 471e415c861..50520fb32f8 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -216,30 +216,28 @@ impl<'a> InsertBuilder<'a> { ) -> Result { let operation = match context.params.mode { WriteMode::Create => { - let config_upsert_values = if let Some(auto_cleanup_params) = - context.params.auto_cleanup.as_ref() - { - let mut upsert_values = HashMap::new(); - upsert_values.insert( - String::from("lance.auto_cleanup.interval"), - auto_cleanup_params.interval.to_string(), - ); - - let duration = auto_cleanup_params - .older_than - .to_std() - .map_err(|e| Error::InvalidInput { - source: e.into(), - location: location!(), + let config_upsert_values = + if let Some(auto_cleanup_params) = context.params.auto_cleanup.as_ref() { + let mut upsert_values = HashMap::new(); + upsert_values.insert( + String::from("lance.auto_cleanup.interval"), + auto_cleanup_params.interval.to_string(), + ); + + let duration = auto_cleanup_params.older_than.to_std().map_err(|e| { + Error::InvalidInput { + source: e.into(), + location: location!(), + } })?; - upsert_values.insert( - String::from("lance.auto_cleanup.older_than"), - format_duration(duration).to_string(), - ); - Some(upsert_values) - } else { - None - }; + upsert_values.insert( + String::from("lance.auto_cleanup.older_than"), + format_duration(duration).to_string(), + ); + Some(upsert_values) + } else { + None + }; Operation::Overwrite { // Use the full schema, not the written schema schema, From 795967c2069aafc856c4f5ffa870a21efa681815 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 13 Nov 2025 02:47:12 +0800 Subject: [PATCH 6/9] Don't touch projectable Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes/schema.rs | 7 +------ rust/lance/src/dataset.rs | 8 ++------ 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index 05399ff9f4e..176f7c82e79 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -896,10 +896,6 @@ pub enum OnMissing { /// A trait for something that we can project fields from. pub trait Projectable: Debug + Send + Sync { fn schema(&self) -> &Schema; - - fn blob_version(&self) -> BlobVersion { - BlobVersion::V1 - } } impl Projectable for Schema { @@ -992,7 +988,6 @@ impl Debug for Projection { impl Projection { /// Create a new empty projection pub fn empty(base: Arc) -> Self { - let blob_version = base.blob_version(); Self { base, field_ids: HashSet::new(), @@ -1001,7 +996,7 @@ impl Projection { with_row_last_updated_at_version: false, with_row_created_at_version: false, blob_handling: BlobHandling::default(), - blob_version, + blob_version: BlobVersion::V1, } } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 028e7c708e9..5c5a858caab 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1611,12 +1611,12 @@ impl Dataset { /// Similar to [Self::schema], but only returns fields that are not marked as blob columns /// Creates a new empty projection into the dataset schema pub fn empty_projection(self: &Arc) -> Projection { - Projection::empty(self.clone()) + Projection::empty(self.clone()).with_blob_version(self.blob_version()) } /// Creates a projection that includes all columns in the dataset pub fn full_projection(self: &Arc) -> Projection { - Projection::full(self.clone()) + Projection::full(self.clone()).with_blob_version(self.blob_version()) } /// Get fragments. @@ -2604,10 +2604,6 @@ impl Projectable for Dataset { fn schema(&self) -> &Schema { self.schema() } - - fn blob_version(&self) -> BlobVersion { - self.blob_version() - } } #[cfg(test)] From 7682df07803844b5199c08ce3552c5649bc91f82 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 13 Nov 2025 15:23:41 +0800 Subject: [PATCH 7/9] Fix tests Signed-off-by: Xuanwo --- rust/lance/src/dataset/write.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 6eb48b8e1b9..1fe0138ec03 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -268,7 +268,7 @@ impl Default for WriteParams { enable_stable_row_ids: false, enable_v2_manifest_paths: false, session: None, - auto_cleanup: None, + auto_cleanup: Some(AutoCleanupParams::default()), skip_auto_cleanup: false, transaction_properties: None, initial_bases: None, From e0e5fd86fa84392728140be8b094dd49628c340b Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 14 Nov 2025 19:38:32 +0800 Subject: [PATCH 8/9] feat: introduce blob arrow extension type Signed-off-by: Xuanwo --- rust/lance-arrow/src/lib.rs | 3 ++ rust/lance-arrow/src/schema.rs | 6 ++- rust/lance-core/src/datatypes.rs | 7 +++ rust/lance-core/src/datatypes/field.rs | 69 +++++++++++++++++++++++++- 4 files changed, 82 insertions(+), 3 deletions(-) diff --git a/rust/lance-arrow/src/lib.rs b/rust/lance-arrow/src/lib.rs index 43d8fd2a5cd..2610dd34e0f 100644 --- a/rust/lance-arrow/src/lib.rs +++ b/rust/lance-arrow/src/lib.rs @@ -46,6 +46,9 @@ pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata"; /// TODO: Use Arrow extension mechanism instead? pub const BLOB_META_KEY: &str = "lance-encoding:blob"; +/// Arrow extension type name for Lance blob v2 columns +pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2"; + type Result = std::result::Result; pub trait DataTypeExt { diff --git a/rust/lance-arrow/src/schema.rs b/rust/lance-arrow/src/schema.rs index 2c2e608a106..16840a7a451 100644 --- a/rust/lance-arrow/src/schema.rs +++ b/rust/lance-arrow/src/schema.rs @@ -5,7 +5,7 @@ use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema}; -use crate::BLOB_META_KEY; +use crate::{ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME}; pub enum Indentation { OneLine, @@ -103,6 +103,10 @@ impl FieldExt for Field { fn is_blob(&self) -> bool { let field_metadata = self.metadata(); field_metadata.get(BLOB_META_KEY).is_some() + || field_metadata + .get(ARROW_EXT_NAME_KEY) + .map(|value| value == BLOB_V2_EXT_NAME) + .unwrap_or(false) } } diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index c9364b88752..eea7b811a13 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -68,6 +68,8 @@ pub static BLOB_V2_DESC_FIELD: LazyLock = LazyLock::new(|| { pub static BLOB_V2_DESC_LANCE_FIELD: LazyLock = LazyLock::new(|| Field::try_from(&*BLOB_V2_DESC_FIELD).unwrap()); +pub const BLOB_LOGICAL_TYPE: &str = "blob"; + /// LogicalType is a string presentation of arrow type. /// to be serialized into protobuf. #[derive(Debug, Clone, PartialEq, DeepSizeOf)] @@ -91,6 +93,10 @@ impl LogicalType { fn is_struct(&self) -> bool { self.0 == "struct" } + + fn is_blob(&self) -> bool { + self.0 == BLOB_LOGICAL_TYPE + } } impl From<&str> for LogicalType { @@ -224,6 +230,7 @@ impl TryFrom<&LogicalType> for DataType { "binary" => Some(Binary), "large_string" => Some(LargeUtf8), "large_binary" => Some(LargeBinary), + BLOB_LOGICAL_TYPE => Some(LargeBinary), "json" => Some(LargeBinary), "date32:day" => Some(Date32), "date64:ms" => Some(Date64), diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 225beb59fb9..96e8165e61f 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -21,7 +21,7 @@ use arrow_schema::{DataType, Field as ArrowField}; use deepsize::DeepSizeOf; use lance_arrow::{ json::{is_arrow_json_field, is_json_field}, - ARROW_EXT_NAME_KEY, *, + DataTypeExt, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME, }; use snafu::location; @@ -42,6 +42,15 @@ use crate::{ /// (3) The field must not be within a list type. pub const LANCE_UNENFORCED_PRIMARY_KEY: &str = "lance-schema:unenforced-primary-key"; +fn has_blob_v2_extension(field: &ArrowField) -> bool { + field.data_type() == &DataType::LargeBinary + && field + .metadata() + .get(ARROW_EXT_NAME_KEY) + .map(|name| name == BLOB_V2_EXT_NAME) + .unwrap_or(false) +} + #[derive(Debug, Default)] pub enum NullabilityComparison { // If the nullabilities don't match then the fields don't match @@ -493,6 +502,11 @@ impl Field { /// Blob fields will load descriptions by default pub fn is_blob(&self) -> bool { self.metadata.contains_key(BLOB_META_KEY) + || self + .metadata + .get(ARROW_EXT_NAME_KEY) + .map(|name| name == BLOB_V2_EXT_NAME) + .unwrap_or(false) } /// If the field is a blob, return a new field with the same name and id @@ -975,15 +989,24 @@ impl TryFrom<&ArrowField> for Field { DataType::LargeList(item) => vec![Self::try_from(item.as_ref())?], _ => vec![], }; - let metadata = field.metadata().clone(); + let mut metadata = field.metadata().clone(); let unenforced_primary_key = metadata .get(LANCE_UNENFORCED_PRIMARY_KEY) .map(|s| matches!(s.to_lowercase().as_str(), "true" | "1" | "yes")) .unwrap_or(false); + let is_blob_v2 = has_blob_v2_extension(field); + + if is_blob_v2 { + metadata + .entry(BLOB_META_KEY.to_string()) + .or_insert_with(|| "true".to_string()); + } // Check for JSON extension types (both Arrow and Lance) let logical_type = if is_arrow_json_field(field) || is_json_field(field) { LogicalType::from("json") + } else if is_blob_v2 { + LogicalType::from(super::BLOB_LOGICAL_TYPE) } else { LogicalType::try_from(field.data_type())? }; @@ -1023,6 +1046,19 @@ impl From<&Field> for ArrowField { let out = Self::new(&field.name, field.data_type(), field.nullable); let mut metadata = field.metadata.clone(); + if field.logical_type.is_blob() { + metadata.insert( + ARROW_EXT_NAME_KEY.to_string(), + lance_arrow::BLOB_V2_EXT_NAME.to_string(), + ); + metadata + .entry(ARROW_EXT_META_KEY.to_string()) + .or_default(); + metadata + .entry(BLOB_META_KEY.to_string()) + .or_insert_with(|| "true".to_string()); + } + // Add JSON extension metadata if this is a JSON field if field.logical_type.0 == "json" { metadata.insert( @@ -1041,6 +1077,8 @@ mod tests { use arrow_array::{DictionaryArray, StringArray, UInt32Array}; use arrow_schema::{Fields, TimeUnit}; + use lance_arrow::{ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME}; + use std::collections::HashMap; #[test] fn arrow_field_to_field() { for (name, data_type) in [ @@ -1524,4 +1562,31 @@ mod tests { assert_eq!(unloaded.children.len(), 5); assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type); } + + #[test] + fn blob_extension_roundtrip() { + let metadata = HashMap::from([ + (ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string()), + (ARROW_EXT_META_KEY.to_string(), "".to_string()), + ]); + let arrow_field = + ArrowField::new("blob", DataType::LargeBinary, true).with_metadata(metadata); + let field = Field::try_from(&arrow_field).unwrap(); + assert_eq!( + field.logical_type, + LogicalType::from(crate::datatypes::BLOB_LOGICAL_TYPE) + ); + assert!(field.is_blob()); + assert_eq!(field.data_type(), DataType::LargeBinary); + + let roundtrip: ArrowField = ArrowField::from(&field); + assert_eq!( + roundtrip.metadata().get(ARROW_EXT_NAME_KEY), + Some(&BLOB_V2_EXT_NAME.to_string()) + ); + assert_eq!( + roundtrip.metadata().get(BLOB_META_KEY), + Some(&"true".to_string()) + ); + } } From ea3a0b379cf3c4908cfbed50a7fd0e7dc2145a94 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 14 Nov 2025 19:41:06 +0800 Subject: [PATCH 9/9] Format code Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes/field.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 96e8165e61f..6f4c8a5f8f1 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -1051,9 +1051,7 @@ impl From<&Field> for ArrowField { ARROW_EXT_NAME_KEY.to_string(), lance_arrow::BLOB_V2_EXT_NAME.to_string(), ); - metadata - .entry(ARROW_EXT_META_KEY.to_string()) - .or_default(); + metadata.entry(ARROW_EXT_META_KEY.to_string()).or_default(); metadata .entry(BLOB_META_KEY.to_string()) .or_insert_with(|| "true".to_string());