From b903fb2408af2229f084fc8348f53c53c1737917 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 19 Jan 2026 23:12:45 +0800 Subject: [PATCH 01/21] fix: always use v2 encoder while blob v2 enabled --- rust/lance-encoding/src/encoder.rs | 35 ++- .../src/encodings/logical/blob.rs | 246 +++++++++++++++++- .../src/encodings/logical/struct.rs | 7 +- rust/lance-file/benches/reader.rs | 3 +- rust/lance/src/dataset/take.rs | 44 +++- 5 files changed, 313 insertions(+), 22 deletions(-) diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 203b3b99642..09bd70e2900 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -394,6 +394,29 @@ impl StructuralEncodingStrategy { // Check if field is marked as blob if field.is_blob() { + if self.version >= LanceFileVersion::V2_2 { + match data_type { + DataType::Binary | DataType::LargeBinary | DataType::Struct(_) => { + return Ok(Box::new(BlobV2StructuralEncoder::new( + field, + column_index.next_column_index(field.id as u32), + options, + self.compression_strategy.clone(), + )?)); + } + _ => { + return Err(Error::InvalidInput { + source: format!( + "Blob encoding only supports Binary/LargeBinary or Struct, got {}", + data_type + ) + .into(), + location: location!(), + }); + } + } + } + match data_type { DataType::Binary | DataType::LargeBinary => { return Ok(Box::new(BlobStructuralEncoder::new( @@ -403,24 +426,16 @@ impl StructuralEncodingStrategy { self.compression_strategy.clone(), )?)); } - DataType::Struct(_) if self.version >= LanceFileVersion::V2_2 => { - return Ok(Box::new(BlobV2StructuralEncoder::new( - field, - column_index.next_column_index(field.id as u32), - options, - self.compression_strategy.clone(), - )?)); - } DataType::Struct(_) => { return Err(Error::InvalidInput { - source: "Blob v2 struct input requires file version >= 2.2".into(), + source: "Blob struct input requires file version >= 2.2".into(), location: location!(), }); } _ => { return Err(Error::InvalidInput { source: format!( - "Blob encoding only supports Binary/LargeBinary or v2 Struct, got {}", + "Blob encoding only supports Binary/LargeBinary, got {}", data_type ) .into(), diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index 349d3d4b700..b733d2ae954 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, sync::Arc}; use arrow_array::{ - builder::{PrimitiveBuilder, StringBuilder}, + builder::{LargeBinaryBuilder, PrimitiveBuilder, StringBuilder}, cast::AsArray, types::{UInt32Type, UInt64Type, UInt8Type}, Array, ArrayRef, StructArray, UInt64Array, @@ -272,7 +272,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { row_number: u64, num_rows: u64, ) -> Result> { - let struct_arr = array.as_struct(); + let struct_arr = normalize_blob_v2_input(array)?; if let Some(validity) = struct_arr.nulls() { repdef.add_validity_bitmap(validity.clone()); } else { @@ -281,27 +281,45 @@ impl FieldEncoder for BlobV2StructuralEncoder { let kind_col = struct_arr .column_by_name("kind") - .expect("kind column must exist") + .ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct missing `kind` field".into(), + location: location!(), + })? .as_primitive::(); let data_col = struct_arr .column_by_name("data") - .expect("data column must exist") + .ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct missing `data` field".into(), + location: location!(), + })? .as_binary::(); let uri_col = struct_arr .column_by_name("uri") - .expect("uri column must exist") + .ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct missing `uri` field".into(), + location: location!(), + })? .as_string::(); let blob_id_col = struct_arr .column_by_name("blob_id") - .expect("blob_id column must exist") + .ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct missing `blob_id` field".into(), + location: location!(), + })? .as_primitive::(); let blob_size_col = struct_arr .column_by_name("blob_size") - .expect("blob_size column must exist") + .ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct missing `blob_size` field".into(), + location: location!(), + })? .as_primitive::(); let packed_position_col = struct_arr .column_by_name("position") - .expect("position column must exist") + .ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct missing `position` field".into(), + location: location!(), + })? .as_primitive::(); let row_count = struct_arr.len(); @@ -402,6 +420,218 @@ impl FieldEncoder for BlobV2StructuralEncoder { } } +fn normalize_blob_v2_input(array: ArrayRef) -> Result { + match array.data_type() { + DataType::Struct(_) => { + let struct_arr = array.as_struct(); + if struct_arr.fields().len() != struct_arr.columns().len() { + return Err(Error::InvalidInput { + source: format!( + "Invalid StructArray: {} fields but {} columns", + struct_arr.fields().len(), + struct_arr.columns().len() + ) + .into(), + location: location!(), + }); + } + + let is_normalized = ["kind", "data", "uri", "blob_id", "blob_size", "position"] + .iter() + .all(|name| struct_arr.column_by_name(name).is_some()); + if is_normalized { + return Ok(struct_arr.clone()); + } + + let data_col = + struct_arr + .column_by_name("data") + .ok_or_else(|| Error::InvalidInput { + source: "Blob struct missing `data` field".into(), + location: location!(), + })?; + let uri_col = struct_arr + .column_by_name("uri") + .ok_or_else(|| Error::InvalidInput { + source: "Blob struct missing `uri` field".into(), + location: location!(), + })?; + + if struct_arr.columns().len() != 2 { + return Err(Error::InvalidInput { + source: format!( + "Unsupported blob struct input: expected 2 or 6 fields, got {}", + struct_arr.columns().len() + ) + .into(), + location: location!(), + }); + } + + let data_col = data_col.as_binary::(); + let uri_col = uri_col.as_string::(); + let row_count = struct_arr.len(); + + let mut kind_builder = PrimitiveBuilder::::with_capacity(row_count); + let mut data_builder = LargeBinaryBuilder::with_capacity(row_count, 0); + let mut uri_builder = StringBuilder::with_capacity(row_count, 0); + let mut blob_id_builder = PrimitiveBuilder::::with_capacity(row_count); + let mut blob_size_builder = PrimitiveBuilder::::with_capacity(row_count); + let mut position_builder = PrimitiveBuilder::::with_capacity(row_count); + + for i in 0..row_count { + if struct_arr.is_null(i) { + kind_builder.append_null(); + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + position_builder.append_null(); + continue; + } + + let has_data = !data_col.is_null(i); + let has_uri = !uri_col.is_null(i); + if has_uri { + kind_builder.append_value(BlobKind::External as u8); + data_builder.append_null(); + uri_builder.append_value(uri_col.value(i)); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + position_builder.append_null(); + } else if has_data { + kind_builder.append_value(BlobKind::Inline as u8); + data_builder.append_value(data_col.value(i)); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + position_builder.append_null(); + } else { + kind_builder.append_null(); + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + position_builder.append_null(); + } + } + + let fields = Fields::from(vec![ + ArrowField::new("kind", DataType::UInt8, true), + ArrowField::new("data", DataType::LargeBinary, true), + ArrowField::new("uri", DataType::Utf8, true), + ArrowField::new("blob_id", DataType::UInt32, true), + ArrowField::new("blob_size", DataType::UInt64, true), + ArrowField::new("position", DataType::UInt64, true), + ]); + + StructArray::try_new( + fields, + vec![ + Arc::new(kind_builder.finish()) as ArrayRef, + Arc::new(data_builder.finish()) as ArrayRef, + Arc::new(uri_builder.finish()) as ArrayRef, + Arc::new(blob_id_builder.finish()) as ArrayRef, + Arc::new(blob_size_builder.finish()) as ArrayRef, + Arc::new(position_builder.finish()) as ArrayRef, + ], + struct_arr.nulls().cloned(), + ) + .map_err(|e| Error::InvalidInput { + source: e.to_string().into(), + location: location!(), + }) + } + DataType::Binary | DataType::LargeBinary => { + let row_count = array.len(); + let nulls = array.nulls().cloned(); + + let mut kind_builder = PrimitiveBuilder::::with_capacity(row_count); + let mut data_builder = LargeBinaryBuilder::with_capacity(row_count, 0); + let mut uri_builder = StringBuilder::with_capacity(row_count, 0); + let mut blob_id_builder = PrimitiveBuilder::::with_capacity(row_count); + let mut blob_size_builder = PrimitiveBuilder::::with_capacity(row_count); + let mut position_builder = PrimitiveBuilder::::with_capacity(row_count); + + if let Some(binary) = array.as_binary_opt::() { + for i in 0..row_count { + if binary.is_null(i) { + kind_builder.append_null(); + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + position_builder.append_null(); + continue; + } + + kind_builder.append_value(BlobKind::Inline as u8); + data_builder.append_value(binary.value(i)); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + position_builder.append_null(); + } + } else if let Some(binary) = array.as_binary_opt::() { + for i in 0..row_count { + if binary.is_null(i) { + kind_builder.append_null(); + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + position_builder.append_null(); + continue; + } + + kind_builder.append_value(BlobKind::Inline as u8); + data_builder.append_value(binary.value(i)); + uri_builder.append_null(); + blob_id_builder.append_null(); + blob_size_builder.append_null(); + position_builder.append_null(); + } + } else { + return Err(Error::InvalidInput { + source: format!("Expected (Large)Binary array, got {}", array.data_type()) + .into(), + location: location!(), + }); + } + + let fields = Fields::from(vec![ + ArrowField::new("kind", DataType::UInt8, true), + ArrowField::new("data", DataType::LargeBinary, true), + ArrowField::new("uri", DataType::Utf8, true), + ArrowField::new("blob_id", DataType::UInt32, true), + ArrowField::new("blob_size", DataType::UInt64, true), + ArrowField::new("position", DataType::UInt64, true), + ]); + + StructArray::try_new( + fields, + vec![ + Arc::new(kind_builder.finish()) as ArrayRef, + Arc::new(data_builder.finish()) as ArrayRef, + Arc::new(uri_builder.finish()) as ArrayRef, + Arc::new(blob_id_builder.finish()) as ArrayRef, + Arc::new(blob_size_builder.finish()) as ArrayRef, + Arc::new(position_builder.finish()) as ArrayRef, + ], + nulls, + ) + .map_err(|e| Error::InvalidInput { + source: e.to_string().into(), + location: location!(), + }) + } + _ => Err(Error::InvalidInput { + source: format!("Unsupported blob v2 input type {}", array.data_type()).into(), + location: location!(), + }), + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs index 793e8347360..3eb9a6bd250 100644 --- a/rust/lance-encoding/src/encodings/logical/struct.rs +++ b/rust/lance-encoding/src/encodings/logical/struct.rs @@ -389,7 +389,12 @@ impl StructuralDecodeArrayTask for RepDefStructDecodeTask { repdef.unravel_validity(length) }; - let array = StructArray::new(self.child_fields, children, validity); + let array = StructArray::try_new(self.child_fields, children, validity).map_err(|e| { + Error::InvalidInput { + source: e.to_string().into(), + location: location!(), + } + })?; Ok(DecodedArray { array: Arc::new(array), repdef, diff --git a/rust/lance-file/benches/reader.rs b/rust/lance-file/benches/reader.rs index 1c6f0fdb425..a00af5015fa 100644 --- a/rust/lance-file/benches/reader.rs +++ b/rust/lance-file/benches/reader.rs @@ -120,8 +120,7 @@ fn bench_reader(c: &mut Criterion) { } #[cfg(not(target_os = "linux"))] -pub fn drop_file_from_cache(path: impl AsRef) -> std::io::Result<()> { - println!("drop_file_from_cache: not implemented on this platform"); +pub fn drop_file_from_cache(_path: impl AsRef) -> std::io::Result<()> { Ok(()) } diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index f39531f9853..b5c21e02b87 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -541,12 +541,13 @@ fn take_struct_array(array: &StructArray, indices: &UInt64Array) -> Result Date: Tue, 20 Jan 2026 20:33:25 +0800 Subject: [PATCH 02/21] Allow both v1 and v2 during reading --- rust/lance-arrow/src/lib.rs | 7 +++++++ rust/lance/src/dataset/blob.rs | 33 ++++++++++++++++++++++++++++----- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/rust/lance-arrow/src/lib.rs b/rust/lance-arrow/src/lib.rs index 83b8b65d954..dead45858e3 100644 --- a/rust/lance-arrow/src/lib.rs +++ b/rust/lance-arrow/src/lib.rs @@ -839,6 +839,13 @@ fn project_array(array: &ArrayRef, target_field: &Field) -> Result { } fn project(struct_array: &StructArray, fields: &Fields) -> Result { + if struct_array.fields().len() != struct_array.columns().len() { + return Err(ArrowError::SchemaError(format!( + "Invalid StructArray: {} fields but {} columns", + struct_array.fields().len(), + struct_array.columns().len() + ))); + } if fields.is_empty() { return Ok(StructArray::new_empty_fields( struct_array.len(), diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 3debbd7298f..bf0451e3a75 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -598,11 +598,10 @@ pub(super) async fn take_blobs( let row_addrs = description_and_addr.column(1).as_primitive::(); let blob_field_id = blob_field_id as u32; - match dataset.blob_version() { + match blob_version_from_descriptions(descriptions)? { BlobVersion::V1 => collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs), - BlobVersion::V2 => { - collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs).await - } + BlobVersion::V2 => collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs) + .await, } } @@ -645,7 +644,7 @@ pub async fn take_blobs_by_addresses( let row_addrs_result = description_and_addr.column(1).as_primitive::(); let blob_field_id = blob_field_id as u32; - match dataset.blob_version() { + match blob_version_from_descriptions(descriptions)? { BlobVersion::V1 => { collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs_result) } @@ -655,6 +654,30 @@ pub async fn take_blobs_by_addresses( } } +fn blob_version_from_descriptions(descriptions: &StructArray) -> Result { + let fields = descriptions.fields(); + if fields.len() == 2 && fields[0].name() == "position" && fields[1].name() == "size" { + return Ok(BlobVersion::V1); + } + if fields.len() == 5 + && fields[0].name() == "kind" + && fields[1].name() == "position" + && fields[2].name() == "size" + && fields[3].name() == "blob_id" + && fields[4].name() == "blob_uri" + { + return Ok(BlobVersion::V2); + } + Err(Error::InvalidInput { + source: format!( + "Unrecognized blob descriptions schema: expected v1 (position,size) or v2 (kind,position,size,blob_id,blob_uri) but got {:?}", + fields.iter().map(|f| f.name().as_str()).collect::>(), + ) + .into(), + location: location!(), + }) +} + fn collect_blob_files_v1( dataset: &Arc, blob_field_id: u32, From 7afd37defa26fde1d760fe5613b035289eeded7c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 21 Jan 2026 14:00:41 +0800 Subject: [PATCH 03/21] Fix tests --- rust/lance-encoding/src/encoder.rs | 60 +++++++-- .../src/encodings/logical/blob.rs | 14 +- rust/lance-encoding/src/testing.rs | 33 ++++- rust/lance-file/src/writer.rs | 47 ++++++- rust/lance/src/dataset/blob.rs | 15 ++- rust/lance/src/dataset/cleanup.rs | 53 ++++---- rust/lance/src/dataset/fragment/write.rs | 16 +-- rust/lance/src/dataset/take.rs | 47 ++++++- rust/lance/src/dataset/transaction.rs | 15 +-- rust/lance/src/dataset/updater.rs | 1 + rust/lance/src/dataset/write.rs | 94 ++++++++++---- rust/lance/src/dataset/write/insert.rs | 120 ++++++++++++++---- rust/lance/src/dataset/write/merge_insert.rs | 1 + 13 files changed, 388 insertions(+), 128 deletions(-) diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 09bd70e2900..6f23694abde 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -19,7 +19,7 @@ use arrow_array::{Array, ArrayRef, RecordBatch}; use arrow_schema::DataType; use bytes::{Bytes, BytesMut}; use futures::future::BoxFuture; -use lance_core::datatypes::{Field, Schema}; +use lance_core::datatypes::{BlobVersion, Field, Schema}; use lance_core::error::LanceOptionExt; use lance_core::utils::bit::{is_pwr_two, pad_bytes_to}; use lance_core::{Error, Result}; @@ -302,6 +302,7 @@ pub fn default_encoding_strategy(version: LanceFileVersion) -> Box Result> { match version.resolve() { LanceFileVersion::Legacy | LanceFileVersion::V2_0 => Err(Error::invalid_input( @@ -309,11 +310,18 @@ pub fn default_encoding_strategy_with_params( location!(), )), _ => { + if blob_version != BlobVersion::V1 && version < LanceFileVersion::V2_2 { + return Err(Error::InvalidInput { + source: "Blob version v2 requires file version >= 2.2".into(), + location: location!(), + }); + } let compression_strategy = Arc::new(DefaultCompressionStrategy::with_params(params).with_version(version)); Ok(Box::new(StructuralEncodingStrategy { compression_strategy, version, + blob_version, })) } } @@ -324,6 +332,7 @@ pub fn default_encoding_strategy_with_params( pub struct StructuralEncodingStrategy { pub compression_strategy: Arc, pub version: LanceFileVersion, + pub blob_version: BlobVersion, } // For some reason, clippy thinks we can add Default to the above derive but @@ -334,6 +343,7 @@ impl Default for StructuralEncodingStrategy { Self { compression_strategy: Arc::new(DefaultCompressionStrategy::new()), version: LanceFileVersion::default(), + blob_version: BlobVersion::V1, } } } @@ -343,6 +353,18 @@ impl StructuralEncodingStrategy { Self { compression_strategy: Arc::new(DefaultCompressionStrategy::new().with_version(version)), version, + blob_version: BlobVersion::V1, + } + } + + pub fn with_version_and_blob_version( + version: LanceFileVersion, + blob_version: BlobVersion, + ) -> Self { + Self { + compression_strategy: Arc::new(DefaultCompressionStrategy::new().with_version(version)), + version, + blob_version, } } @@ -394,7 +416,14 @@ impl StructuralEncodingStrategy { // Check if field is marked as blob if field.is_blob() { - if self.version >= LanceFileVersion::V2_2 { + if self.blob_version == BlobVersion::V2 && self.version < LanceFileVersion::V2_2 { + return Err(Error::InvalidInput { + source: "Blob v2 requires file version >= 2.2".into(), + location: location!(), + }); + } + + if self.blob_version == BlobVersion::V2 { match data_type { DataType::Binary | DataType::LargeBinary | DataType::Struct(_) => { return Ok(Box::new(BlobV2StructuralEncoder::new( @@ -428,7 +457,7 @@ impl StructuralEncodingStrategy { } DataType::Struct(_) => { return Err(Error::InvalidInput { - source: "Blob struct input requires file version >= 2.2".into(), + source: "Blob struct input requires blob version v2".into(), location: location!(), }); } @@ -822,24 +851,35 @@ mod tests { ); // Test with V2.1 - should succeed - let strategy = - default_encoding_strategy_with_params(LanceFileVersion::V2_1, params.clone()) - .expect("Should succeed for V2.1"); + let strategy = default_encoding_strategy_with_params( + LanceFileVersion::V2_1, + params.clone(), + BlobVersion::V1, + ) + .expect("Should succeed for V2.1"); // Verify it's a StructuralEncodingStrategy assert!(format!("{:?}", strategy).contains("StructuralEncodingStrategy")); assert!(format!("{:?}", strategy).contains("DefaultCompressionStrategy")); // Test with V2.0 - should fail - let err = default_encoding_strategy_with_params(LanceFileVersion::V2_0, params.clone()) - .expect_err("Should fail for V2.0"); + let err = default_encoding_strategy_with_params( + LanceFileVersion::V2_0, + params.clone(), + BlobVersion::V1, + ) + .expect_err("Should fail for V2.0"); assert!(err .to_string() .contains("only supported in Lance file version 2.1")); // Test with Legacy - should fail - let err = default_encoding_strategy_with_params(LanceFileVersion::Legacy, params) - .expect_err("Should fail for Legacy"); + let err = default_encoding_strategy_with_params( + LanceFileVersion::Legacy, + params, + BlobVersion::V1, + ) + .expect_err("Should fail for Legacy"); assert!(err .to_string() .contains("only supported in Lance file version 2.1")); diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index b733d2ae954..d978e9ade0d 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -27,6 +27,8 @@ use crate::{ repdef::{DefinitionInterpretation, RepDefBuilder}, }; use lance_core::datatypes::BlobKind; +#[cfg(test)] +use lance_core::datatypes::BlobVersion; /// Blob structural encoder - stores large binary data in external buffers /// @@ -801,7 +803,9 @@ mod tests { check_round_trip_encoding_of_data_with_expected( vec![Arc::new(struct_array)], Some(Arc::new(expected_descriptor)), - &TestCases::default().with_min_file_version(LanceFileVersion::V2_2), + &TestCases::default() + .with_min_file_version(LanceFileVersion::V2_2) + .with_blob_version(BlobVersion::V2), blob_metadata, ) .await; @@ -864,7 +868,9 @@ mod tests { check_round_trip_encoding_of_data_with_expected( vec![Arc::new(struct_array)], Some(Arc::new(expected_descriptor)), - &TestCases::default().with_min_file_version(LanceFileVersion::V2_2), + &TestCases::default() + .with_min_file_version(LanceFileVersion::V2_2) + .with_blob_version(BlobVersion::V2), blob_metadata, ) .await; @@ -924,7 +930,9 @@ mod tests { check_round_trip_encoding_of_data_with_expected( vec![Arc::new(struct_array)], Some(Arc::new(expected_descriptor)), - &TestCases::default().with_min_file_version(LanceFileVersion::V2_2), + &TestCases::default() + .with_min_file_version(LanceFileVersion::V2_2) + .with_blob_version(BlobVersion::V2), blob_metadata, ) .await; diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index 9b4cadfd8c7..a89454d060a 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -21,7 +21,7 @@ use futures::{future::BoxFuture, FutureExt, StreamExt}; use log::{debug, info, trace}; use tokio::sync::mpsc::{self, UnboundedSender}; -use lance_core::{utils::bit::pad_bytes, Result}; +use lance_core::{datatypes::BlobVersion, utils::bit::pad_bytes, Result}; use lance_datagen::{array, gen_batch, ArrayGenerator, RowCount, Seed}; use crate::{ @@ -32,7 +32,8 @@ use crate::{ }, encoder::{ default_encoding_strategy, ColumnIndexSequence, EncodedColumn, EncodedPage, - EncodingOptions, FieldEncoder, OutOfLineBuffers, MIN_PAGE_BUFFER_ALIGNMENT, + EncodingOptions, FieldEncoder, OutOfLineBuffers, StructuralEncodingStrategy, + MIN_PAGE_BUFFER_ALIGNMENT, }, repdef::RepDefBuilder, version::LanceFileVersion, @@ -402,6 +403,7 @@ pub struct TestCases { max_file_version: Option, verify_encoding: Option>, expected_encoding: Option>, + blob_version: Option, } impl Default for TestCases { @@ -417,6 +419,7 @@ impl Default for TestCases { max_file_version: None, verify_encoding: None, expected_encoding: None, + blob_version: None, } } } @@ -462,6 +465,11 @@ impl TestCases { self } + pub fn with_blob_version(mut self, blob_version: BlobVersion) -> Self { + self.blob_version = Some(blob_version); + self + } + pub fn with_max_file_version(mut self, version: LanceFileVersion) -> Self { self.max_file_version = Some(version); self @@ -733,7 +741,26 @@ pub async fn check_round_trip_encoding_of_data_with_expected( let lance_field = lance_core::datatypes::Field::try_from(&field).unwrap(); for file_version in test_cases.get_versions() { for page_size in test_cases.page_sizes.iter() { - let encoding_strategy = default_encoding_strategy(file_version); + let encoding_strategy = if let Some(blob_version) = test_cases.blob_version { + if blob_version == BlobVersion::V1 { + default_encoding_strategy(file_version) + } else { + if file_version < LanceFileVersion::V2_2 { + panic!("Blob version v2 requires file version >= 2.2"); + } + match file_version.resolve() { + LanceFileVersion::Legacy | LanceFileVersion::V2_0 => { + panic!("Blob version v2 requires file version >= 2.2"); + } + _ => Box::new(StructuralEncodingStrategy::with_version_and_blob_version( + file_version, + blob_version, + )), + } + } + } else { + default_encoding_strategy(file_version) + }; let mut column_index_seq = ColumnIndexSequence::default(); let encoding_options = EncodingOptions { cache_bytes_per_column: *page_size, diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index 4952d9476c4..4627b218a5f 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -12,6 +12,7 @@ use arrow_data::ArrayData; use bytes::{BufMut, Bytes, BytesMut}; use futures::stream::FuturesOrdered; use futures::StreamExt; +use lance_core::datatypes::BlobVersion; use lance_core::datatypes::{Field, Schema as LanceSchema}; use lance_core::utils::bit::pad_bytes; use lance_core::{Error, Result}; @@ -19,6 +20,7 @@ use lance_encoding::decoder::PageEncoding; use lance_encoding::encoder::{ default_encoding_strategy, BatchEncoder, EncodeTask, EncodedBatch, EncodedPage, EncodingOptions, FieldEncoder, FieldEncodingStrategy, OutOfLineBuffers, + StructuralEncodingStrategy, }; use lance_encoding::repdef::RepDefBuilder; use lance_encoding::version::LanceFileVersion; @@ -91,6 +93,10 @@ pub struct FileWriterOptions { /// while) might keep a much larger record batch around in memory (even though most /// of that batch's data has been written to disk) pub keep_original_array: Option, + /// Controls how blob columns are encoded. + /// + /// When unset, blob columns default to blob v1 encoding. + pub blob_version: Option, pub encoding_strategy: Option>, /// The format version to use when writing the file /// @@ -309,10 +315,40 @@ impl FileWriter { schema.validate()?; let keep_original_array = self.options.keep_original_array.unwrap_or(false); - let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| { - let version = self.version(); - default_encoding_strategy(version).into() - }); + let encoding_strategy: Arc = + if let Some(encoding_strategy) = self.options.encoding_strategy.clone() { + encoding_strategy + } else { + let version = self.version(); + let blob_version = self.options.blob_version.unwrap_or(BlobVersion::V1); + if blob_version != BlobVersion::V1 && version < LanceFileVersion::V2_2 { + return Err(Error::invalid_input( + "Blob version v2 requires file version >= 2.2", + location!(), + )); + } + match version.resolve() { + LanceFileVersion::Legacy => { + return Err(Error::invalid_input( + "Cannot create encoding strategy for legacy file version", + location!(), + )); + } + LanceFileVersion::V2_0 => { + if blob_version != BlobVersion::V1 { + return Err(Error::invalid_input( + "Blob version v2 requires file version >= 2.2", + location!(), + )); + } + Arc::from(default_encoding_strategy(version)) + } + _ => Arc::new(StructuralEncodingStrategy::with_version_and_blob_version( + version, + blob_version, + )), + } + }; let encoding_options = EncodingOptions { cache_bytes_per_column, @@ -1064,6 +1100,7 @@ mod tests { let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params( LanceFileVersion::V2_1, params, + lance_core::datatypes::BlobVersion::V1, ) .unwrap(); @@ -1212,6 +1249,7 @@ mod tests { let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params( LanceFileVersion::V2_1, params, + lance_core::datatypes::BlobVersion::V1, ) .unwrap(); @@ -1314,6 +1352,7 @@ mod tests { let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params( LanceFileVersion::V2_1, params, + lance_core::datatypes::BlobVersion::V1, ) .unwrap(); diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index bf0451e3a75..e8240af9426 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -338,10 +338,6 @@ impl BlobPreprocessor { } } -pub fn schema_has_blob_v2(schema: &lance_core::datatypes::Schema) -> bool { - schema.fields.iter().any(|f| f.is_blob_v2()) -} - pub async fn preprocess_blob_batches( batches: &[RecordBatch], pre: &mut BlobPreprocessor, @@ -600,8 +596,9 @@ pub(super) async fn take_blobs( match blob_version_from_descriptions(descriptions)? { BlobVersion::V1 => collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs), - BlobVersion::V2 => collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs) - .await, + BlobVersion::V2 => { + collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs).await + } } } @@ -1104,7 +1101,11 @@ mod tests { let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); - let params = WriteParams::with_storage_version(LanceFileVersion::V2_2); + let params = WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + blob_version: Some(lance_core::datatypes::BlobVersion::V2), + ..Default::default() + }; let dataset = Arc::new( Dataset::write(reader, &test_dir, Some(params)) .await diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index dc948c251b8..0bd90a86782 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -755,6 +755,7 @@ mod tests { use super::*; use crate::blob::{blob_field, BlobArrayBuilder}; use crate::{ + dataset::write::InsertBuilder, dataset::{builder::DatasetBuilder, ReadParams, WriteMode, WriteParams}, index::vector::VectorIndexParams, }; @@ -1122,19 +1123,19 @@ mod tests { let fixture = MockDatasetFixture::try_new().unwrap(); // First version: write a packed blob (sidecar .blob file). - Dataset::write( - blob_v2_batch(100 * 1024), - &fixture.dataset_path, - Some(WriteParams { - store_params: Some(fixture.os_params()), - commit_handler: Some(Arc::new(RenameCommitHandler)), - mode: WriteMode::Create, - data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2), - ..Default::default() - }), - ) - .await - .unwrap(); + let write_params = WriteParams { + store_params: Some(fixture.os_params()), + commit_handler: Some(Arc::new(RenameCommitHandler)), + mode: WriteMode::Create, + data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2), + blob_version: Some(lance_core::datatypes::BlobVersion::V2), + ..Default::default() + }; + InsertBuilder::new(&fixture.dataset_path) + .with_params(&write_params) + .execute_stream(blob_v2_batch(100 * 1024)) + .await + .unwrap(); assert_gt!(fixture.count_blob_files().await.unwrap(), 0); // Second version: overwrite with an inline blob (no sidecar). @@ -1167,19 +1168,19 @@ mod tests { async fn cleanup_recent_blob_v2_sidecar_files_when_verified() { let fixture = MockDatasetFixture::try_new().unwrap(); - Dataset::write( - blob_v2_batch(100 * 1024), - &fixture.dataset_path, - Some(WriteParams { - store_params: Some(fixture.os_params()), - commit_handler: Some(Arc::new(RenameCommitHandler)), - mode: WriteMode::Create, - data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2), - ..Default::default() - }), - ) - .await - .unwrap(); + let write_params = WriteParams { + store_params: Some(fixture.os_params()), + commit_handler: Some(Arc::new(RenameCommitHandler)), + mode: WriteMode::Create, + data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2), + blob_version: Some(lance_core::datatypes::BlobVersion::V2), + ..Default::default() + }; + InsertBuilder::new(&fixture.dataset_path) + .with_params(&write_params) + .execute_stream(blob_v2_batch(100 * 1024)) + .await + .unwrap(); Dataset::write( blob_v2_batch(1024), diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index a9c87e44a05..b6e9361361f 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -4,7 +4,7 @@ use arrow_schema::Schema as ArrowSchema; use datafusion::execution::SendableRecordBatchStream; use futures::{StreamExt, TryStreamExt}; -use lance_core::datatypes::Schema; +use lance_core::datatypes::{BlobVersion, Schema}; use lance_core::Error; use lance_datafusion::chunker::{break_stream, chunk_stream}; use lance_datafusion::utils::StreamingWriteSource; @@ -18,7 +18,7 @@ use snafu::location; use std::borrow::Cow; use uuid::Uuid; -use crate::dataset::blob::{preprocess_blob_batches, schema_has_blob_v2, BlobPreprocessor}; +use crate::dataset::blob::{preprocess_blob_batches, BlobPreprocessor}; use crate::dataset::builder::DatasetBuilder; use crate::dataset::write::do_write_fragments; use crate::dataset::{WriteMode, WriteParams, DATA_DIR}; @@ -139,7 +139,6 @@ impl<'a> FragmentCreateBuilder<'a> { let filename = format!("{}.lance", data_file_key); let mut fragment = Fragment::new(id); let full_path = base_path.child(DATA_DIR).child(filename.clone()); - let has_blob_v2 = schema_has_blob_v2(&schema); let obj_writer = object_store.create(&full_path).await?; let mut writer = lance_file::writer::FileWriter::try_new( obj_writer, @@ -150,15 +149,7 @@ impl<'a> FragmentCreateBuilder<'a> { }, )?; - let mut preprocessor = if has_blob_v2 { - Some(BlobPreprocessor::new( - object_store.as_ref().clone(), - base_path.child(DATA_DIR), - data_file_key.clone(), - )) - } else { - None - }; + let mut preprocessor: Option = None; let (major, minor) = writer.version().to_numbers(); @@ -227,6 +218,7 @@ impl<'a> FragmentCreateBuilder<'a> { stream, params.into_owned(), version, + BlobVersion::V1, None, // Fragment creation doesn't use target_bases ) .await diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index b5c21e02b87..cfc49fe876a 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -726,7 +726,7 @@ mod test { } #[tokio::test] - async fn test_take_blob_v2_from_legacy_large_binary_on_v2_2() { + async fn test_take_blob_v1_from_legacy_large_binary_on_v2_2_by_default() { let mut metadata = HashMap::new(); metadata.insert(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string()); @@ -754,6 +754,51 @@ mod test { .await .unwrap(); + assert!(dataset + .config() + .get(crate::dataset::blob::BLOB_VERSION_CONFIG_KEY) + .is_none()); + + let proj = ProjectionRequest::from_columns(["blob"], dataset.schema()); + let values = dataset.take(&[0u64], proj).await.unwrap(); + + let struct_arr = values.column(0).as_struct(); + assert_eq!(struct_arr.fields().len(), 2); + assert_eq!(struct_arr.fields()[0].name(), "position"); + assert_eq!(struct_arr.fields()[1].name(), "size"); + } + + #[tokio::test] + async fn test_take_blob_v2_from_blob_v2_struct_on_v2_2() { + let schema = Arc::new(ArrowSchema::new(vec![crate::blob::blob_field( + "blob", true, + )])); + let mut builder = crate::blob::BlobArrayBuilder::new(1); + builder.push_bytes(b"hello").unwrap(); + let array = builder.finish().unwrap(); + + let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap(); + let write_params = WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), + blob_version: Some(lance_core::datatypes::BlobVersion::V2), + ..Default::default() + }; + let batches = RecordBatchIterator::new([Ok(batch)], schema); + let dataset = crate::dataset::write::InsertBuilder::new("memory://") + .with_params(&write_params) + .execute_stream(batches) + .await + .unwrap(); + + assert_eq!( + dataset + .config() + .get(crate::dataset::blob::BLOB_VERSION_CONFIG_KEY) + .unwrap() + .as_str(), + lance_core::datatypes::BlobVersion::V2.config_value() + ); + let proj = ProjectionRequest::from_columns(["blob"], dataset.schema()); let values = dataset.take(&[0u64], proj).await.unwrap(); diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 090ef465727..593b58106fc 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -46,12 +46,12 @@ //! use super::write::merge_insert::inserted_rows::KeyExistenceFilter; -use super::{blob::BLOB_VERSION_CONFIG_KEY, ManifestWriteConfig}; +use super::ManifestWriteConfig; use crate::dataset::transaction::UpdateMode::RewriteRows; use crate::index::mem_wal::update_mem_wal_index_merged_generations; use crate::utils::temporal::timestamp_to_nanos; use deepsize::DeepSizeOf; -use lance_core::{datatypes::BlobVersion, datatypes::Schema, Error, Result}; +use lance_core::{datatypes::Schema, Error, Result}; use lance_file::{datatypes::Fields, version::LanceFileVersion}; use lance_index::mem_wal::MergedGeneration; use lance_index::{frag_reuse::FRAG_REUSE_INDEX_NAME, is_system_index}; @@ -2175,19 +2175,12 @@ impl Transaction { } else { let data_storage_format = Self::data_storage_format_from_files(&final_fragments, user_requested_version)?; - let mut manifest = Manifest::new( + Manifest::new( schema, Arc::new(final_fragments), data_storage_format, reference_paths, - ); - if manifest.data_storage_format.lance_file_version()? >= LanceFileVersion::V2_2 { - manifest.config_mut().insert( - BLOB_VERSION_CONFIG_KEY.to_string(), - BlobVersion::V2.config_value().to_string(), - ); - } - manifest + ) }; manifest.tag.clone_from(&self.tag); diff --git a/rust/lance/src/dataset/updater.rs b/rust/lance/src/dataset/updater.rs index 7197f877943..57871761d96 100644 --- a/rust/lance/src/dataset/updater.rs +++ b/rust/lance/src/dataset/updater.rs @@ -148,6 +148,7 @@ impl Updater { &schema, &self.fragment.dataset().base, data_storage_version, + self.fragment.dataset().blob_version(), ) .await } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index d953ea4dd44..f0fd22b7375 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -33,7 +33,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use tracing::{info, instrument}; -use crate::dataset::blob::{preprocess_blob_batches, schema_has_blob_v2, BlobPreprocessor}; +use crate::dataset::blob::{preprocess_blob_batches, BlobPreprocessor}; use crate::session::Session; use crate::Dataset; @@ -43,14 +43,6 @@ use super::transaction::Transaction; use super::utils::SchemaAdapter; use super::DATA_DIR; -pub(super) fn blob_version_for(storage_version: LanceFileVersion) -> BlobVersion { - if storage_version >= LanceFileVersion::V2_2 { - BlobVersion::V2 - } else { - BlobVersion::V1 - } -} - mod commit; pub mod delete; mod insert; @@ -252,6 +244,13 @@ pub struct WriteParams { /// These will be resolved to IDs when the write operation executes. /// Resolution happens at builder execution time when dataset context is available. pub target_base_names_or_paths: Option>, + + /// If set and this write creates a new dataset, the blob encoding version to persist + /// into the dataset config as `lance.blob.version`. + /// + /// For existing datasets, the blob version is determined by the dataset config and + /// must not be changed by writes. + pub blob_version: Option, } impl Default for WriteParams { @@ -276,6 +275,7 @@ impl Default for WriteParams { initial_bases: None, target_bases: None, target_base_names_or_paths: None, + blob_version: None, } } } @@ -376,6 +376,7 @@ pub async fn write_fragments( .await } +#[allow(clippy::too_many_arguments)] pub async fn do_write_fragments( object_store: Arc, base_dir: &Path, @@ -383,6 +384,7 @@ pub async fn do_write_fragments( data: SendableRecordBatchStream, params: WriteParams, storage_version: LanceFileVersion, + blob_version: BlobVersion, target_bases_info: Option>, ) -> Result> { let adapter = SchemaAdapter::new(data.schema()); @@ -404,6 +406,7 @@ pub async fn do_write_fragments( base_dir, schema, storage_version, + blob_version, target_bases_info, ); let mut writer: Option> = None; @@ -571,9 +574,10 @@ pub async fn write_fragments_internal( base_dir: &Path, schema: Schema, data: SendableRecordBatchStream, - mut params: WriteParams, + params: WriteParams, target_bases_info: Option>, ) -> Result<(Vec, Schema)> { + let mut params = params; let adapter = SchemaAdapter::new(data.schema()); let (data, converted_schema) = if adapter.requires_physical_conversion() { @@ -637,14 +641,30 @@ pub async fn write_fragments_internal( (converted_schema, params.storage_version_or_default()) }; - let target_blob_version = blob_version_for(storage_version); - if let Some(dataset) = dataset { + let requested_blob_version = params.blob_version; + + let target_blob_version = requested_blob_version + .or_else(|| dataset.map(|d| d.blob_version())) + .unwrap_or(BlobVersion::V1); + + if storage_version < LanceFileVersion::V2_2 && target_blob_version == BlobVersion::V2 { + return Err(Error::InvalidInput { + source: format!( + "Blob version v2 requires file version >= 2.2 (got {:?})", + storage_version + ) + .into(), + location: location!(), + }); + } + + if let (Some(dataset), Some(requested_blob_version)) = (dataset, requested_blob_version) { let existing_version = dataset.blob_version(); - if existing_version != target_blob_version { + if existing_version != requested_blob_version { return Err(Error::InvalidInput { source: format!( "Blob column version mismatch. Existing dataset uses {:?} but requested write requires {:?}. Changing blob version is not allowed", - existing_version, target_blob_version + existing_version, requested_blob_version ) .into(), location: location!(), @@ -659,6 +679,7 @@ pub async fn write_fragments_internal( data, params, storage_version, + target_blob_version, target_bases_info, ) .await?; @@ -774,8 +795,18 @@ pub async fn open_writer( schema: &Schema, base_dir: &Path, storage_version: LanceFileVersion, + blob_version: BlobVersion, ) -> Result> { - open_writer_with_options(object_store, schema, base_dir, storage_version, true, None).await + open_writer_with_options( + object_store, + schema, + base_dir, + storage_version, + blob_version, + true, + None, + ) + .await } pub async fn open_writer_with_options( @@ -783,6 +814,7 @@ pub async fn open_writer_with_options( schema: &Schema, base_dir: &Path, storage_version: LanceFileVersion, + blob_version: BlobVersion, add_data_dir: bool, base_id: Option, ) -> Result> { @@ -815,19 +847,21 @@ pub async fn open_writer_with_options( writer, schema.clone(), FileWriterOptions { + blob_version: (blob_version == BlobVersion::V2).then_some(blob_version), format_version: Some(storage_version), ..Default::default() }, )?; - let preprocessor = if schema_has_blob_v2(schema) { - Some(BlobPreprocessor::new( - object_store.clone(), - data_dir.clone(), - data_file_key.clone(), - )) - } else { - None - }; + let preprocessor = + if storage_version >= LanceFileVersion::V2_2 && blob_version == BlobVersion::V2 { + Some(BlobPreprocessor::new( + object_store.clone(), + data_dir.clone(), + data_file_key.clone(), + )) + } else { + None + }; let writer_adapter = V2WriterAdapter { writer: file_writer, path: filename, @@ -859,6 +893,7 @@ struct WriterGenerator { base_dir: Path, schema: Schema, storage_version: LanceFileVersion, + blob_version: BlobVersion, /// Target base information (if writing to specific bases) target_bases_info: Option>, /// Counter for round-robin selection @@ -871,6 +906,7 @@ impl WriterGenerator { base_dir: &Path, schema: &Schema, storage_version: LanceFileVersion, + blob_version: BlobVersion, target_bases_info: Option>, ) -> Self { Self { @@ -878,6 +914,7 @@ impl WriterGenerator { base_dir: base_dir.clone(), schema: schema.clone(), storage_version, + blob_version, target_bases_info, next_base_index: AtomicUsize::new(0), } @@ -904,16 +941,20 @@ impl WriterGenerator { &self.schema, &base_info.base_dir, self.storage_version, + self.blob_version, base_info.is_dataset_root, Some(base_info.base_id), ) .await? } else { - open_writer( + open_writer_with_options( &self.object_store, &self.schema, &self.base_dir, self.storage_version, + self.blob_version, + true, + None, ) .await? }; @@ -1546,6 +1587,7 @@ mod tests { &base_dir, &schema, LanceFileVersion::Stable, + BlobVersion::V1, Some(target_bases), ); @@ -1590,6 +1632,7 @@ mod tests { &schema, &base_dir, LanceFileVersion::Stable, + BlobVersion::V1, false, // Don't add /data None, ) @@ -1656,6 +1699,7 @@ mod tests { &Path::from("default"), &schema, LanceFileVersion::Stable, + BlobVersion::V1, Some(target_bases), ); diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index 7763c5e8f7c..4d193cd28be 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use arrow_array::{RecordBatch, RecordBatchIterator}; use datafusion::execution::SendableRecordBatchStream; use humantime::format_duration; -use lance_core::datatypes::{NullabilityComparison, Schema, SchemaCompareOptions}; +use lance_core::datatypes::{BlobVersion, NullabilityComparison, Schema, SchemaCompareOptions}; use lance_core::utils::tracing::{DATASET_WRITING_EVENT, TRACE_DATASET_EVENTS}; use lance_core::{ROW_ADDR, ROW_ID, ROW_OFFSET}; use lance_datafusion::utils::StreamingWriteSource; @@ -19,6 +19,7 @@ use lance_table::io::commit::CommitHandler; use object_store::path::Path; use snafu::location; +use crate::dataset::blob::BLOB_VERSION_CONFIG_KEY; use crate::dataset::builder::DatasetBuilder; use crate::dataset::transaction::{Operation, Transaction, TransactionBuilder}; use crate::dataset::write::{validate_and_resolve_target_bases, write_fragments_internal}; @@ -216,28 +217,43 @@ impl<'a> InsertBuilder<'a> { ) -> Result { let operation = match context.params.mode { WriteMode::Create => { - let config_upsert_values = - if let Some(auto_cleanup_params) = context.params.auto_cleanup.as_ref() { - let mut upsert_values = HashMap::new(); - upsert_values.insert( - String::from("lance.auto_cleanup.interval"), - auto_cleanup_params.interval.to_string(), - ); - - let duration = auto_cleanup_params.older_than.to_std().map_err(|e| { - Error::InvalidInput { - source: e.into(), - location: location!(), - } - })?; - upsert_values.insert( - String::from("lance.auto_cleanup.older_than"), - format_duration(duration).to_string(), - ); - Some(upsert_values) - } else { - None - }; + let mut upsert_values = HashMap::new(); + if let Some(auto_cleanup_params) = context.params.auto_cleanup.as_ref() { + upsert_values.insert( + String::from("lance.auto_cleanup.interval"), + auto_cleanup_params.interval.to_string(), + ); + + let duration = auto_cleanup_params.older_than.to_std().map_err(|e| { + Error::InvalidInput { + source: e.into(), + location: location!(), + } + })?; + upsert_values.insert( + String::from("lance.auto_cleanup.older_than"), + format_duration(duration).to_string(), + ); + } + if let Some(blob_version) = context.params.blob_version { + if blob_version != BlobVersion::V1 + && context.storage_version < LanceFileVersion::V2_2 + { + return Err(Error::InvalidInput { + source: "Blob version v2 requires file version >= 2.2".into(), + location: location!(), + }); + } + upsert_values.insert( + BLOB_VERSION_CONFIG_KEY.to_string(), + blob_version.config_value().to_string(), + ); + } + let config_upsert_values = if upsert_values.is_empty() { + None + } else { + Some(upsert_values) + }; Operation::Overwrite { // Use the full schema, not the written schema schema, @@ -434,9 +450,14 @@ struct WriteContext<'a> { #[cfg(test)] mod test { - use arrow_array::{Int32Array, RecordBatchReader, StructArray}; + use std::collections::HashMap; + + use arrow_array::{BinaryArray, Int32Array, RecordBatchReader, StructArray}; use arrow_schema::{ArrowError, DataType, Field, Schema}; + use lance_arrow::BLOB_META_KEY; + use lance_core::datatypes::BlobVersion; + use crate::dataset::ProjectionRequest; use crate::session::Session; use super::*; @@ -488,7 +509,7 @@ mod test { } #[tokio::test] - async fn prevent_blob_version_upgrade_on_overwrite() { + async fn allow_overwrite_to_v2_2_without_blob_upgrade() { let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1]))]) .unwrap(); @@ -513,7 +534,54 @@ mod test { .execute_stream(RecordBatchIterator::new(vec![Ok(batch)], schema.clone())) .await; - assert!(matches!(result, Err(Error::InvalidInput { .. }))); + assert!(result.is_ok()); + } + + #[tokio::test] + async fn create_v2_2_dataset_with_forced_blob_v2() { + let schema = Arc::new(Schema::new(vec![Field::new( + "blob", + DataType::Binary, + false, + ) + .with_metadata(HashMap::from([( + BLOB_META_KEY.to_string(), + "true".to_string(), + )]))])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(BinaryArray::from(vec![Some(b"abc".as_slice())]))], + ) + .unwrap(); + + let dataset = InsertBuilder::new("memory://forced-blob-v2") + .with_params(&WriteParams { + mode: WriteMode::Create, + data_storage_version: Some(LanceFileVersion::V2_2), + blob_version: Some(BlobVersion::V2), + ..Default::default() + }) + .execute_stream(RecordBatchIterator::new(vec![Ok(batch)], schema.clone())) + .await + .unwrap(); + + assert_eq!( + dataset + .manifest + .config + .get(BLOB_VERSION_CONFIG_KEY) + .map(String::as_str), + Some("2") + ); + + let batch = dataset + .take( + &[0u64], + ProjectionRequest::from_columns(["blob"], dataset.schema()), + ) + .await + .unwrap(); + assert_eq!(batch.num_rows(), 1); } mod external_error { diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index ee03bf2aa62..629ca428c51 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -930,6 +930,7 @@ impl MergeInsertJob { &write_schema, &dataset.base, data_storage_version, + dataset.blob_version(), ) .await?; From 234f2c2c5192ae6dc1f34e917b292aea53a55e94 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 21 Jan 2026 16:50:40 +0800 Subject: [PATCH 04/21] Fix python --- python/python/lance/dataset.py | 4 ++++ python/python/tests/test_blob.py | 12 ++++++++++-- python/src/dataset.rs | 14 ++++++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 7b19ad72c80..457113fbdbe 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -5540,6 +5540,7 @@ def write_dataset( data_storage_version: Optional[ Literal["stable", "2.0", "2.1", "2.2", "next", "legacy", "0.1"] ] = None, + blob_version: Optional[Literal["v1", "v2"]] = None, use_legacy_format: Optional[bool] = None, enable_v2_manifest_paths: bool = True, enable_stable_row_ids: bool = False, @@ -5802,6 +5803,9 @@ def write_dataset( "target_bases": target_bases, } + if blob_version is not None: + params["blob_version"] = blob_version + # Add storage_options_provider if created from namespace if storage_options_provider is not None: params["storage_options_provider"] = storage_options_provider diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index 48d724fbf15..bbaeaae316a 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -259,7 +259,12 @@ def test_scan_blob(tmp_path, dataset_with_blobs): def test_blob_extension_write_inline(tmp_path): table = pa.table({"blob": lance.blob_array([b"foo", b"bar"])}) - ds = lance.write_dataset(table, tmp_path / "test_ds_v2", data_storage_version="2.2") + ds = lance.write_dataset( + table, + tmp_path / "test_ds_v2", + data_storage_version="2.2", + blob_version="v2", + ) desc = ds.to_table(columns=["blob"]).column("blob").chunk(0) assert pa.types.is_struct(desc.type) @@ -276,7 +281,10 @@ def test_blob_extension_write_external(tmp_path): table = pa.table({"blob": lance.blob_array([uri])}) ds = lance.write_dataset( - table, tmp_path / "test_ds_v2_external", data_storage_version="2.2" + table, + tmp_path / "test_ds_v2_external", + data_storage_version="2.2", + blob_version="v2", ) blob = ds.take_blobs("blob", indices=[0])[0] diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 09f28218241..f67d23906b2 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -60,6 +60,7 @@ use lance::index::{vector::VectorIndexParams, DatasetIndexInternalExt}; use lance::{dataset::builder::DatasetBuilder, index::vector::IndexFileVersion}; use lance_arrow::as_fixed_size_list_array; use lance_core::Error; +use lance_core::datatypes::BlobVersion; use lance_datafusion::utils::reader_to_stream; use lance_encoding::decoder::DecoderConfig; use lance_file::reader::FileReaderOptions; @@ -3133,6 +3134,9 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult(options, "blob_version")? { + p.blob_version = Some(parse_blob_version(&blob_version)?); + } if let Some(progress) = get_dict_opt::>(options, "progress")? { p.progress = Arc::new(PyWriteProgress::new(progress.into_py_any(options.py())?)); } @@ -3235,6 +3239,16 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult PyResult { + match value.to_lowercase().as_str() { + "v1" | "1" => Ok(BlobVersion::V1), + "v2" | "2" => Ok(BlobVersion::V2), + _ => Err(PyValueError::new_err(format!( + "Invalid blob_version: {value} (expected 'v1' or 'v2')" + ))), + } +} + fn prepare_vector_index_params( index_type: &str, column_type: &DataType, From e6259661f5ef32411e9b962bfa97439fc52ee06c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 21 Jan 2026 18:18:28 +0800 Subject: [PATCH 05/21] Fix ci --- rust/lance/src/dataset/blob.rs | 203 +++++++++++++++++--------------- rust/lance/src/dataset/write.rs | 1 + 2 files changed, 107 insertions(+), 97 deletions(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index d45cfe33133..1fd3b270509 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -130,15 +130,34 @@ pub struct BlobPreprocessor { data_file_key: String, local_counter: u32, pack_writer: PackWriter, + blob_v2_cols: Vec, + dedicated_thresholds: Vec, + writer_metadata: Vec>, } impl BlobPreprocessor { - pub(crate) fn new(object_store: ObjectStore, data_dir: Path, data_file_key: String) -> Self { + pub(crate) fn new( + object_store: ObjectStore, + data_dir: Path, + data_file_key: String, + schema: &lance_core::datatypes::Schema, + ) -> Self { let pack_writer = PackWriter::new( object_store.clone(), data_dir.clone(), data_file_key.clone(), ); + let arrow_schema = arrow_schema::Schema::from(schema); + let fields = arrow_schema.fields(); + let blob_v2_cols = fields.iter().map(|field| field.is_blob_v2()).collect(); + let dedicated_thresholds = fields + .iter() + .map(|field| dedicated_threshold_from_metadata(field.as_ref())) + .collect(); + let writer_metadata = fields + .iter() + .map(|field| field.metadata().clone()) + .collect(); Self { object_store, data_dir, @@ -146,6 +165,9 @@ impl BlobPreprocessor { // Start at 1 to avoid a potential all-zero blob_id value. local_counter: 1, pack_writer, + blob_v2_cols, + dedicated_thresholds, + writer_metadata, } } @@ -177,23 +199,33 @@ impl BlobPreprocessor { .await } pub(crate) async fn preprocess_batch(&mut self, batch: &RecordBatch) -> Result { + let expected_columns = self.blob_v2_cols.len(); + if batch.num_columns() != expected_columns { + return Err(Error::invalid_input( + format!( + "Unexpected number of columns: expected {}, got {}", + expected_columns, + batch.num_columns() + ), + location!(), + )); + } + + let batch_schema = batch.schema(); + let batch_fields = batch_schema.fields(); + let mut new_columns = Vec::with_capacity(batch.num_columns()); let mut new_fields = Vec::with_capacity(batch.num_columns()); - for (array, field) in batch.columns().iter().zip(batch.schema().fields()) { - if !field.is_blob_v2() { + for idx in 0..batch.num_columns() { + let array = batch.column(idx); + let field = &batch_fields[idx]; + if !self.blob_v2_cols[idx] { new_columns.push(array.clone()); new_fields.push(field.clone()); continue; } - let dedicated_threshold = field - .metadata() - .get(BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY) - .and_then(|value| value.parse::().ok()) - .filter(|&value| value > DEDICATED_THRESHOLD) - .unwrap_or(DEDICATED_THRESHOLD); - let struct_arr = array .as_any() .downcast_ref::() @@ -241,6 +273,7 @@ impl BlobPreprocessor { let has_uri = !uri_col.is_null(i); let data_len = if has_data { data_col.value(i).len() } else { 0 }; + let dedicated_threshold = self.dedicated_thresholds[idx]; if has_data && data_len > dedicated_threshold { let blob_id = self.next_blob_id(); self.write_dedicated(blob_id, data_col.value(i)).await?; @@ -324,7 +357,7 @@ impl BlobPreprocessor { ArrowDataType::Struct(child_fields.into()), field.is_nullable(), ) - .with_metadata(field.metadata().clone()), + .with_metadata(self.writer_metadata[idx].clone()), )); } @@ -333,7 +366,7 @@ impl BlobPreprocessor { .iter() .map(|f| f.as_ref().clone()) .collect::>(), - batch.schema().metadata().clone(), + batch_schema.metadata().clone(), )); RecordBatch::try_new(new_schema, new_columns) @@ -345,6 +378,16 @@ impl BlobPreprocessor { } } +fn dedicated_threshold_from_metadata(field: &arrow_schema::Field) -> usize { + field + .metadata() + .get(BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY) + .and_then(|value| value.parse::().ok()) + .filter(|value| *value > 0) + .and_then(|value| usize::try_from(value).ok()) + .unwrap_or(DEDICATED_THRESHOLD) +} + pub async fn preprocess_blob_batches( batches: &[RecordBatch], pre: &mut BlobPreprocessor, @@ -816,9 +859,9 @@ mod tests { use arrow_schema::{DataType, Field, Schema}; use futures::TryStreamExt; use lance_arrow::{DataTypeExt, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY}; + use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; use lance_io::stream::RecordBatchStream; - use lance_core::datatypes::BlobKind; use lance_core::{utils::tempfile::TempStrDir, Error, Result}; use lance_datagen::{array, BatchCount, RowCount}; use lance_file::version::LanceFileVersion; @@ -1132,108 +1175,74 @@ mod tests { assert_eq!(second.as_ref(), b"world"); } - fn build_schema_with_meta(threshold_opt: Option) -> Arc { - let mut blob_field_with_meta = blob_field("blob", true); - if let Some(threshold) = threshold_opt { - let mut metadata = blob_field_with_meta.metadata().clone(); - metadata.insert( - BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY.to_string(), - threshold.to_string(), - ); - blob_field_with_meta = blob_field_with_meta.with_metadata(metadata); - } + async fn preprocess_kind_with_schema_metadata(metadata_value: &str, data_len: usize) -> u8 { + let (object_store, base_path) = ObjectStore::from_uri_and_params( + Arc::new(ObjectStoreRegistry::default()), + "memory://blob_preprocessor", + &ObjectStoreParams::default(), + ) + .await + .unwrap(); + let object_store = object_store.as_ref().clone(); + let data_dir = base_path.child("data"); + + let mut field = blob_field("blob", true); + let mut metadata = field.metadata().clone(); + metadata.insert( + BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY.to_string(), + metadata_value.to_string(), + ); + field = field.with_metadata(metadata); - Arc::new(Schema::new(vec![ - Field::new("id", DataType::UInt32, false), - blob_field_with_meta, - ])) - } + let writer_arrow_schema = Schema::new(vec![field.clone()]); + let writer_schema = lance_core::datatypes::Schema::try_from(&writer_arrow_schema).unwrap(); - async fn write_then_get_blob_kinds( - blob_sizes: Vec, - threshold_opt: Option, - ) -> Vec { - let test_dir = TempStrDir::default(); + let mut preprocessor = super::BlobPreprocessor::new( + object_store.clone(), + data_dir, + "data_file_key".to_string(), + &writer_schema, + ); - let mut blob_builder = BlobArrayBuilder::new(blob_sizes.len()); - for size in &blob_sizes { - blob_builder.push_bytes(vec![0u8; *size]).unwrap(); - } + let mut blob_builder = BlobArrayBuilder::new(1); + blob_builder.push_bytes(vec![0u8; data_len]).unwrap(); let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap(); - let id_values: Vec = (0..blob_sizes.len() as u32).collect(); - let id_array: arrow_array::ArrayRef = Arc::new(UInt32Array::from(id_values)); - - let schema = build_schema_with_meta(threshold_opt); + let field_without_metadata = + Field::new("blob", field.data_type().clone(), field.is_nullable()); + let batch_schema = Arc::new(Schema::new(vec![field_without_metadata])); + let batch = RecordBatch::try_new(batch_schema, vec![blob_array]).unwrap(); - let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); - let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); - - let params = WriteParams::with_storage_version(LanceFileVersion::V2_2); - let dataset = Arc::new( - Dataset::write(reader, &test_dir, Some(params)) - .await - .unwrap(), - ); - - let indices: Vec = (0..blob_sizes.len() as u64).collect(); - let blobs = dataset - .take_blobs_by_indices(&indices, "blob") - .await + let out = preprocessor.preprocess_batch(&batch).await.unwrap(); + let struct_arr = out + .column(0) + .as_any() + .downcast_ref::() .unwrap(); - - assert_eq!(blobs.len(), blob_sizes.len()); - - blobs.into_iter().map(|b| b.kind()).collect() + struct_arr + .column_by_name("kind") + .unwrap() + .as_primitive::() + .value(0) } #[tokio::test] async fn test_blob_v2_dedicated_threshold_ignores_non_positive_metadata() { - let small_blob_len = super::DEDICATED_THRESHOLD / 2; - let large_blob_len = super::DEDICATED_THRESHOLD + 1; - - // Sanity check assumptions for this test - assert!(small_blob_len > super::INLINE_MAX); - - let cases = vec![(None, "no_metadata"), (Some(0), "zero_threshold")]; - - for (threshold_opt, label) in cases { - let kinds = - write_then_get_blob_kinds(vec![small_blob_len, large_blob_len], threshold_opt) - .await; - - assert_eq!(kinds.len(), 2, "case: {label}"); - assert_eq!(kinds[0], BlobKind::Packed, "case: {label}"); - assert_eq!(kinds[1], BlobKind::Dedicated, "case: {label}"); - } + let kind = preprocess_kind_with_schema_metadata("0", 256 * 1024).await; + assert_eq!(kind, lance_core::datatypes::BlobKind::Packed as u8); } #[tokio::test] async fn test_blob_v2_dedicated_threshold_respects_smaller_metadata() { - let blob_len = super::DEDICATED_THRESHOLD / 2; - let overridden_threshold = super::DEDICATED_THRESHOLD / 4; - - assert!(blob_len > super::INLINE_MAX); - assert!(overridden_threshold > 0); - assert!(blob_len > overridden_threshold); - - let kinds = write_then_get_blob_kinds(vec![blob_len], Some(overridden_threshold)).await; - - assert_eq!(kinds.len(), 1); - assert_eq!(kinds[0], BlobKind::Packed); + let kind = preprocess_kind_with_schema_metadata("131072", 256 * 1024).await; + assert_eq!(kind, lance_core::datatypes::BlobKind::Dedicated as u8); } #[tokio::test] async fn test_blob_v2_dedicated_threshold_respects_larger_metadata() { - let blob_len = super::DEDICATED_THRESHOLD + 1; - let overridden_threshold = super::DEDICATED_THRESHOLD * 2; - - assert!(blob_len > super::INLINE_MAX); - assert!(blob_len < overridden_threshold); - - let kinds = write_then_get_blob_kinds(vec![blob_len], Some(overridden_threshold)).await; - - assert_eq!(kinds.len(), 1); - assert_eq!(kinds[0], BlobKind::Packed); + let kind = + preprocess_kind_with_schema_metadata("8388608", super::DEDICATED_THRESHOLD + 1024) + .await; + assert_eq!(kind, lance_core::datatypes::BlobKind::Packed as u8); } } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index f0fd22b7375..3b88f5e8613 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -858,6 +858,7 @@ pub async fn open_writer_with_options( object_store.clone(), data_dir.clone(), data_file_key.clone(), + schema, )) } else { None From 31661a7f88a8173bed27881b4642155004421a52 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 21 Jan 2026 18:24:10 +0800 Subject: [PATCH 06/21] Format code --- python/src/dataset.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 743dac9ef47..078be227422 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -58,8 +58,8 @@ use lance::index::vector::utils::get_vector_type; use lance::index::{vector::VectorIndexParams, DatasetIndexInternalExt}; use lance::{dataset::builder::DatasetBuilder, index::vector::IndexFileVersion}; use lance_arrow::as_fixed_size_list_array; -use lance_core::Error; use lance_core::datatypes::BlobVersion; +use lance_core::Error; use lance_datafusion::utils::reader_to_stream; use lance_encoding::decoder::DecoderConfig; use lance_file::reader::FileReaderOptions; From 0086b1a46602a9653ff73e6e4c5d63ef01f80413 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 00:40:55 +0800 Subject: [PATCH 07/21] Use blob v2 by default for V2_2 writes --- python/python/lance/dataset.py | 4 - python/python/tests/test_blob.py | 2 - python/src/dataset.rs | 14 -- rust/lance-core/src/datatypes/field.rs | 77 +++------ rust/lance-core/src/datatypes/schema.rs | 30 +--- rust/lance-datafusion/src/projection.rs | 36 ++--- .../src/encodings/logical/blob.rs | 18 ++- rust/lance-encoding/src/testing.rs | 3 +- rust/lance/src/dataset.rs | 20 +-- rust/lance/src/dataset/blob.rs | 10 -- rust/lance/src/dataset/cleanup.rs | 2 - rust/lance/src/dataset/fragment/write.rs | 3 +- rust/lance/src/dataset/scanner.rs | 9 +- rust/lance/src/dataset/take.rs | 32 +--- rust/lance/src/dataset/updater.rs | 1 - rust/lance/src/dataset/write.rs | 151 +++++++++--------- rust/lance/src/dataset/write/insert.rs | 55 ++----- rust/lance/src/dataset/write/merge_insert.rs | 1 - 18 files changed, 156 insertions(+), 312 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 457113fbdbe..7b19ad72c80 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -5540,7 +5540,6 @@ def write_dataset( data_storage_version: Optional[ Literal["stable", "2.0", "2.1", "2.2", "next", "legacy", "0.1"] ] = None, - blob_version: Optional[Literal["v1", "v2"]] = None, use_legacy_format: Optional[bool] = None, enable_v2_manifest_paths: bool = True, enable_stable_row_ids: bool = False, @@ -5803,9 +5802,6 @@ def write_dataset( "target_bases": target_bases, } - if blob_version is not None: - params["blob_version"] = blob_version - # Add storage_options_provider if created from namespace if storage_options_provider is not None: params["storage_options_provider"] = storage_options_provider diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index bbaeaae316a..16d14779341 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -263,7 +263,6 @@ def test_blob_extension_write_inline(tmp_path): table, tmp_path / "test_ds_v2", data_storage_version="2.2", - blob_version="v2", ) desc = ds.to_table(columns=["blob"]).column("blob").chunk(0) @@ -284,7 +283,6 @@ def test_blob_extension_write_external(tmp_path): table, tmp_path / "test_ds_v2_external", data_storage_version="2.2", - blob_version="v2", ) blob = ds.take_blobs("blob", indices=[0])[0] diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 078be227422..2c4eb737bd6 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -58,7 +58,6 @@ use lance::index::vector::utils::get_vector_type; use lance::index::{vector::VectorIndexParams, DatasetIndexInternalExt}; use lance::{dataset::builder::DatasetBuilder, index::vector::IndexFileVersion}; use lance_arrow::as_fixed_size_list_array; -use lance_core::datatypes::BlobVersion; use lance_core::Error; use lance_datafusion::utils::reader_to_stream; use lance_encoding::decoder::DecoderConfig; @@ -3088,9 +3087,6 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult(options, "blob_version")? { - p.blob_version = Some(parse_blob_version(&blob_version)?); - } if let Some(progress) = get_dict_opt::>(options, "progress")? { p.progress = Arc::new(PyWriteProgress::new(progress.into_py_any(options.py())?)); } @@ -3193,16 +3189,6 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult PyResult { - match value.to_lowercase().as_str() { - "v1" | "1" => Ok(BlobVersion::V1), - "v2" | "2" => Ok(BlobVersion::V2), - _ => Err(PyValueError::new_err(format!( - "Invalid blob_version: {value} (expected 'v1' or 'v2')" - ))), - } -} - fn prepare_vector_index_params( index_type: &str, column_type: &DataType, diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index a091632c524..a2f21585b34 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -99,25 +99,6 @@ pub enum BlobVersion { /// Blob v2 struct format. V2, } - -impl BlobVersion { - /// Convert a persisted string value (e.g. table config) into a blob version - pub fn from_config_value(value: &str) -> Option { - match value { - "1" => Some(Self::V1), - "2" => Some(Self::V2), - _ => None, - } - } - - /// Persistable string representation for table config. - pub fn config_value(self) -> &'static str { - match self { - Self::V1 => "1", - Self::V2 => "2", - } - } -} /// Encoding enum. #[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)] pub enum Encoding { @@ -302,11 +283,7 @@ impl Field { } else { let mut new_field = self.clone(); new_field.children = children; - Some( - projection - .blob_handling - .unload_if_needed(new_field, projection.blob_version), - ) + Some(projection.blob_handling.unload_if_needed(new_field)) } } @@ -562,28 +539,6 @@ impl Field { } } - /// If the field is a blob, return a new field with the same name and id - /// but with the data type set to a struct of the blob description fields. - /// - /// If the field is not a blob, return the field itself. - pub fn into_unloaded_with_version(mut self, version: BlobVersion) -> Self { - if self.is_blob() { - match version { - BlobVersion::V2 => { - self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone(); - self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone(); - self.metadata = BLOB_V2_DESC_LANCE_FIELD.metadata.clone(); - } - BlobVersion::V1 => { - self.logical_type = BLOB_DESC_LANCE_FIELD.logical_type.clone(); - self.children = BLOB_DESC_LANCE_FIELD.children.clone(); - self.metadata = BLOB_DESC_LANCE_FIELD.metadata.clone(); - } - } - } - self - } - pub fn project(&self, path_components: &[&str]) -> Result { let mut f = Self { name: self.name.clone(), @@ -1806,14 +1761,34 @@ mod tests { } #[test] - fn blob_into_unloaded_selects_v2_layout() { + fn blob_unloaded_mut_selects_layout_from_metadata() { let metadata = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]); - let field: Field = ArrowField::new("blob", DataType::LargeBinary, true) + let mut field: Field = ArrowField::new("blob", DataType::LargeBinary, true) .with_metadata(metadata) .try_into() .unwrap(); - let unloaded = field.into_unloaded_with_version(BlobVersion::V2); - assert_eq!(unloaded.children.len(), 5); - assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type); + field.unloaded_mut(); + assert_eq!(field.children.len(), 2); + assert_eq!(field.logical_type, BLOB_DESC_LANCE_FIELD.logical_type); + + let metadata = + HashMap::from([(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string())]); + let mut field: Field = ArrowField::new( + "blob", + DataType::Struct( + vec![ + ArrowField::new("data", DataType::LargeBinary, true), + ArrowField::new("uri", DataType::Utf8, true), + ] + .into(), + ), + true, + ) + .with_metadata(metadata) + .try_into() + .unwrap(); + field.unloaded_mut(); + assert_eq!(field.children.len(), 5); + assert_eq!(field.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type); } } diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index 3e8340bd19b..88181a75b2e 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -15,7 +15,7 @@ use deepsize::DeepSizeOf; use lance_arrow::*; use snafu::location; -use super::field::{BlobVersion, Field, OnTypeMismatch, SchemaCompareOptions}; +use super::field::{Field, OnTypeMismatch, SchemaCompareOptions}; use crate::{Error, Result, ROW_ADDR, ROW_ADDR_FIELD, ROW_ID, ROW_ID_FIELD, WILDCARD}; /// Lance Schema. @@ -1024,12 +1024,11 @@ impl BlobHandling { } } - pub fn unload_if_needed(&self, field: Field, version: BlobVersion) -> Field { + pub fn unload_if_needed(&self, mut field: Field) -> Field { if self.should_unload(&field) { - field.into_unloaded_with_version(version) - } else { - field + field.unloaded_mut(); } + field } } @@ -1046,7 +1045,6 @@ pub struct Projection { pub with_row_last_updated_at_version: bool, pub with_row_created_at_version: bool, pub blob_handling: BlobHandling, - pub blob_version: BlobVersion, } impl Debug for Projection { @@ -1064,7 +1062,6 @@ impl Debug for Projection { &self.with_row_created_at_version, ) .field("blob_handling", &self.blob_handling) - .field("blob_version", &self.blob_version) .finish() } } @@ -1080,7 +1077,6 @@ impl Projection { with_row_last_updated_at_version: false, with_row_created_at_version: false, blob_handling: BlobHandling::default(), - blob_version: BlobVersion::V1, } } @@ -1114,11 +1110,6 @@ impl Projection { self } - pub fn with_blob_version(mut self, blob_version: BlobVersion) -> Self { - self.blob_version = blob_version; - self - } - fn add_field_children(field_ids: &mut HashSet, field: &Field) { for child in &field.children { field_ids.insert(child.id); @@ -1583,19 +1574,6 @@ mod tests { use super::*; - #[test] - fn projection_from_schema_defaults_to_v1() { - let field = Field::try_from(&ArrowField::new("a", ArrowDataType::Int32, true)).unwrap(); - let schema = Schema { - fields: vec![field], - metadata: HashMap::new(), - }; - - let projection = Projection::empty(Arc::new(schema)); - - assert_eq!(projection.blob_version, BlobVersion::V1); - } - #[test] fn test_resolve_with_quoted_fields() { // Create a schema with fields containing dots diff --git a/rust/lance-datafusion/src/projection.rs b/rust/lance-datafusion/src/projection.rs index f586ac4bb20..c036c1db6c5 100644 --- a/rust/lance-datafusion/src/projection.rs +++ b/rust/lance-datafusion/src/projection.rs @@ -15,7 +15,7 @@ use std::{ use tracing::instrument; use lance_core::{ - datatypes::{BlobVersion, OnMissing, Projectable, Projection, Schema}, + datatypes::{OnMissing, Projectable, Projection, Schema}, Error, Result, ROW_ADDR, ROW_CREATED_AT_VERSION, ROW_ID, ROW_LAST_UPDATED_AT_VERSION, ROW_OFFSET, WILDCARD, }; @@ -38,16 +38,11 @@ struct ProjectionBuilder { needs_row_created_at: bool, must_add_row_offset: bool, has_wildcard: bool, - blob_version: BlobVersion, } impl ProjectionBuilder { - fn new(base: Arc, blob_version: BlobVersion) -> Self { - let full_schema = Arc::new( - Projection::full(base.clone()) - .with_blob_version(blob_version) - .to_arrow_schema(), - ); + fn new(base: Arc) -> Self { + let full_schema = Arc::new(Projection::full(base.clone()).to_arrow_schema()); let full_schema = Arc::new(ProjectionPlan::add_system_columns(&full_schema)); let planner = Planner::new(full_schema); @@ -64,7 +59,6 @@ impl ProjectionBuilder { needs_row_last_updated_at: false, must_add_row_offset: false, has_wildcard: false, - blob_version, } } @@ -153,8 +147,6 @@ impl ProjectionBuilder { .union_columns(&self.physical_cols, OnMissing::Ignore)? }; - physical_projection = physical_projection.with_blob_version(self.blob_version); - physical_projection.with_row_id = self.needs_row_id; physical_projection.with_row_addr = self.needs_row_addr || self.must_add_row_offset; physical_projection.with_row_last_updated_at_version = self.needs_row_last_updated_at; @@ -211,9 +203,8 @@ impl ProjectionPlan { pub fn from_expressions( base: Arc, columns: &[(impl AsRef, impl AsRef)], - blob_version: BlobVersion, ) -> Result { - let mut builder = ProjectionBuilder::new(base, blob_version); + let mut builder = ProjectionBuilder::new(base); builder.add_columns(columns)?; builder.build() } @@ -252,11 +243,7 @@ impl ProjectionPlan { /// ``` /// /// This is something that cannot be done easily using expressions. - pub fn from_schema( - base: Arc, - projection: &Schema, - blob_version: BlobVersion, - ) -> Result { + pub fn from_schema(base: Arc, projection: &Schema) -> Result { // Separate data columns from system columns // System columns (_rowid, _rowaddr, etc.) are handled via flags in Projection, // not as fields in the Schema @@ -296,9 +283,7 @@ impl ProjectionPlan { }; // Calculate the physical projection from data columns only - let mut physical_projection = Projection::empty(base) - .union_schema(&data_schema) - .with_blob_version(blob_version); + let mut physical_projection = Projection::empty(base).union_schema(&data_schema); physical_projection.with_row_id = with_row_id; physical_projection.with_row_addr = with_row_addr; @@ -319,7 +304,7 @@ impl ProjectionPlan { }) } - pub fn full(base: Arc, blob_version: BlobVersion) -> Result { + pub fn full(base: Arc) -> Result { let physical_cols: Vec<&str> = base .schema() .fields @@ -327,9 +312,8 @@ impl ProjectionPlan { .map(|f| f.name.as_ref()) .collect::>(); - let physical_projection = Projection::empty(base.clone()) - .union_columns(&physical_cols, OnMissing::Ignore)? - .with_blob_version(blob_version); + let physical_projection = + Projection::empty(base.clone()).union_columns(&physical_cols, OnMissing::Ignore)?; let requested_output_expr = physical_cols .into_iter() @@ -468,7 +452,7 @@ mod tests { let base_schema = Schema::try_from(&arrow_schema).unwrap(); let base = Arc::new(base_schema.clone()); - let plan = ProjectionPlan::from_schema(base, &base_schema, BlobVersion::default()).unwrap(); + let plan = ProjectionPlan::from_schema(base, &base_schema).unwrap(); let physical = plan.physical_projection.to_arrow_schema(); assert!(is_json_field(physical.field_with_name("meta").unwrap())); diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index d978e9ade0d..a540cd1be21 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -735,8 +735,10 @@ mod tests { #[tokio::test] async fn test_blob_v2_external_round_trip() { - let blob_metadata = - HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]); + let blob_metadata = HashMap::from([( + lance_arrow::ARROW_EXT_NAME_KEY.to_string(), + lance_arrow::BLOB_V2_EXT_NAME.to_string(), + )]); let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true)); let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true)); @@ -813,8 +815,10 @@ mod tests { #[tokio::test] async fn test_blob_v2_dedicated_round_trip() { - let blob_metadata = - HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]); + let blob_metadata = HashMap::from([( + lance_arrow::ARROW_EXT_NAME_KEY.to_string(), + lance_arrow::BLOB_V2_EXT_NAME.to_string(), + )]); let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true)); let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true)); @@ -878,8 +882,10 @@ mod tests { #[tokio::test] async fn test_blob_v2_packed_round_trip() { - let blob_metadata = - HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]); + let blob_metadata = HashMap::from([( + lance_arrow::ARROW_EXT_NAME_KEY.to_string(), + lance_arrow::BLOB_V2_EXT_NAME.to_string(), + )]); let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true)); let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true)); diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index a89454d060a..a740cdbb160 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -993,8 +993,7 @@ async fn check_round_trip_encoding_inner( let decode_field = if is_structural_encoding { let mut lance_field = lance_core::datatypes::Field::try_from(field).unwrap(); if lance_field.is_blob() && matches!(lance_field.data_type(), DataType::Struct(_)) { - lance_field = - lance_field.into_unloaded_with_version(lance_core::datatypes::BlobVersion::V2); + lance_field.unloaded_mut(); let mut arrow_field = ArrowField::from(&lance_field); let mut metadata = arrow_field.metadata().clone(); metadata.insert("lance-encoding:packed".to_string(), "true".to_string()); diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index bf4bede7d50..1c32e3aff31 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -13,15 +13,12 @@ use futures::future::BoxFuture; use futures::stream::{self, BoxStream, StreamExt, TryStreamExt}; use futures::{FutureExt, Stream}; -use crate::dataset::blob::blob_version_from_config; use crate::dataset::metadata::UpdateFieldMetadataBuilder; use crate::dataset::transaction::translate_schema_metadata_updates; use crate::session::caches::{DSMetadataCache, ManifestKey, TransactionKey}; use crate::session::index_caches::DSIndexCache; use itertools::Itertools; -use lance_core::datatypes::{ - BlobVersion, Field, OnMissing, OnTypeMismatch, Projectable, Projection, -}; +use lance_core::datatypes::{Field, OnMissing, OnTypeMismatch, Projectable, Projection}; use lance_core::traits::DatasetTakeRows; use lance_core::utils::address::RowAddress; use lance_core::utils::tracing::{ @@ -402,7 +399,6 @@ impl ProjectionRequest { } pub fn into_projection_plan(self, dataset: Arc) -> Result { - let blob_version = dataset.blob_version(); match self { Self::Schema(schema) => { // The schema might contain system columns (_rowid, _rowaddr) which are not @@ -415,7 +411,7 @@ impl ProjectionRequest { if system_columns_present { // If system columns are present, we can't use project_by_schema directly // Just pass the schema to ProjectionPlan::from_schema which handles it - ProjectionPlan::from_schema(dataset, schema.as_ref(), blob_version) + ProjectionPlan::from_schema(dataset, schema.as_ref()) } else { // No system columns, use normal path with validation let projection = dataset.schema().project_by_schema( @@ -423,10 +419,10 @@ impl ProjectionRequest { OnMissing::Error, OnTypeMismatch::Error, )?; - ProjectionPlan::from_schema(dataset, &projection, blob_version) + ProjectionPlan::from_schema(dataset, &projection) } } - Self::Sql(columns) => ProjectionPlan::from_expressions(dataset, &columns, blob_version), + Self::Sql(columns) => ProjectionPlan::from_expressions(dataset, &columns), } } } @@ -1859,12 +1855,12 @@ impl Dataset { /// Similar to [Self::schema], but only returns fields that are not marked as blob columns /// Creates a new empty projection into the dataset schema pub fn empty_projection(self: &Arc) -> Projection { - Projection::empty(self.clone()).with_blob_version(self.blob_version()) + Projection::empty(self.clone()) } /// Creates a projection that includes all columns in the dataset pub fn full_projection(self: &Arc) -> Projection { - Projection::full(self.clone()).with_blob_version(self.blob_version()) + Projection::full(self.clone()) } /// Get fragments. @@ -2824,10 +2820,6 @@ impl Dataset { &self.manifest.config } - pub(crate) fn blob_version(&self) -> BlobVersion { - blob_version_from_config(&self.manifest.config) - } - /// Delete keys from the config. #[deprecated( note = "Use the new update_config(values, replace) method - pass None values to delete keys" diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 1fd3b270509..720edd33534 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -24,15 +24,6 @@ use lance_core::utils::blob::blob_path; use lance_core::{utils::address::RowAddress, Error, Result}; use lance_io::traits::Reader; -pub const BLOB_VERSION_CONFIG_KEY: &str = "lance.blob.version"; - -pub fn blob_version_from_config(config: &HashMap) -> BlobVersion { - config - .get(BLOB_VERSION_CONFIG_KEY) - .and_then(|value| BlobVersion::from_config_value(value)) - .unwrap_or(BlobVersion::V1) -} - const INLINE_MAX: usize = 64 * 1024; // 64KB inline cutoff const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; // 4MB dedicated cutoff const PACK_FILE_MAX_SIZE: usize = 1024 * 1024 * 1024; // 1GiB per .pack sidecar @@ -1154,7 +1145,6 @@ mod tests { let params = WriteParams { data_storage_version: Some(LanceFileVersion::V2_2), - blob_version: Some(lance_core::datatypes::BlobVersion::V2), ..Default::default() }; let dataset = Arc::new( diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 0bd90a86782..bbaf307c5e8 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -1128,7 +1128,6 @@ mod tests { commit_handler: Some(Arc::new(RenameCommitHandler)), mode: WriteMode::Create, data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2), - blob_version: Some(lance_core::datatypes::BlobVersion::V2), ..Default::default() }; InsertBuilder::new(&fixture.dataset_path) @@ -1173,7 +1172,6 @@ mod tests { commit_handler: Some(Arc::new(RenameCommitHandler)), mode: WriteMode::Create, data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2), - blob_version: Some(lance_core::datatypes::BlobVersion::V2), ..Default::default() }; InsertBuilder::new(&fixture.dataset_path) diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index b6e9361361f..e70ef68cb21 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -4,7 +4,7 @@ use arrow_schema::Schema as ArrowSchema; use datafusion::execution::SendableRecordBatchStream; use futures::{StreamExt, TryStreamExt}; -use lance_core::datatypes::{BlobVersion, Schema}; +use lance_core::datatypes::Schema; use lance_core::Error; use lance_datafusion::chunker::{break_stream, chunk_stream}; use lance_datafusion::utils::StreamingWriteSource; @@ -218,7 +218,6 @@ impl<'a> FragmentCreateBuilder<'a> { stream, params.into_owned(), version, - BlobVersion::V1, None, // Fragment creation doesn't use target_bases ) .await diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index a0812d6caf4..e4a456d11d0 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -760,8 +760,7 @@ impl TakeOperation { impl Scanner { pub fn new(dataset: Arc) -> Self { - let projection_plan = - ProjectionPlan::full(dataset.clone(), dataset.blob_version()).unwrap(); + let projection_plan = ProjectionPlan::full(dataset.clone()).unwrap(); let file_reader_options = dataset.file_reader_options.clone(); let mut scanner = Self { dataset, @@ -885,11 +884,7 @@ impl Scanner { columns: &[(impl AsRef, impl AsRef)], ) -> Result<&mut Self> { self.explicit_projection = true; - self.projection_plan = ProjectionPlan::from_expressions( - self.dataset.clone(), - columns, - self.dataset.blob_version(), - )?; + self.projection_plan = ProjectionPlan::from_expressions(self.dataset.clone(), columns)?; if self.legacy_with_row_id { self.projection_plan.include_row_id(); } diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index cfc49fe876a..08599f8c09f 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -726,7 +726,7 @@ mod test { } #[tokio::test] - async fn test_take_blob_v1_from_legacy_large_binary_on_v2_2_by_default() { + async fn test_reject_legacy_blob_schema_on_v2_2() { let mut metadata = HashMap::new(); metadata.insert(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string()); @@ -750,22 +750,12 @@ mod test { ..Default::default() }; let batches = RecordBatchIterator::new([Ok(batch)], schema); - let dataset = Dataset::write(batches, "memory://", Some(write_params)) + let err = Dataset::write(batches, "memory://", Some(write_params)) .await - .unwrap(); - - assert!(dataset - .config() - .get(crate::dataset::blob::BLOB_VERSION_CONFIG_KEY) - .is_none()); - - let proj = ProjectionRequest::from_columns(["blob"], dataset.schema()); - let values = dataset.take(&[0u64], proj).await.unwrap(); - - let struct_arr = values.column(0).as_struct(); - assert_eq!(struct_arr.fields().len(), 2); - assert_eq!(struct_arr.fields()[0].name(), "position"); - assert_eq!(struct_arr.fields()[1].name(), "size"); + .unwrap_err(); + let msg = err.to_string(); + assert!(msg.contains("Legacy blob columns")); + assert!(msg.contains("lance.blob.v2")); } #[tokio::test] @@ -780,7 +770,6 @@ mod test { let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap(); let write_params = WriteParams { data_storage_version: Some(LanceFileVersion::V2_2), - blob_version: Some(lance_core::datatypes::BlobVersion::V2), ..Default::default() }; let batches = RecordBatchIterator::new([Ok(batch)], schema); @@ -790,15 +779,6 @@ mod test { .await .unwrap(); - assert_eq!( - dataset - .config() - .get(crate::dataset::blob::BLOB_VERSION_CONFIG_KEY) - .unwrap() - .as_str(), - lance_core::datatypes::BlobVersion::V2.config_value() - ); - let proj = ProjectionRequest::from_columns(["blob"], dataset.schema()); let values = dataset.take(&[0u64], proj).await.unwrap(); diff --git a/rust/lance/src/dataset/updater.rs b/rust/lance/src/dataset/updater.rs index 57871761d96..7197f877943 100644 --- a/rust/lance/src/dataset/updater.rs +++ b/rust/lance/src/dataset/updater.rs @@ -148,7 +148,6 @@ impl Updater { &schema, &self.fragment.dataset().base, data_storage_version, - self.fragment.dataset().blob_version(), ) .await } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 3b88f5e8613..2856a08f889 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -6,8 +6,10 @@ use chrono::TimeDelta; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream; use futures::{Stream, StreamExt, TryStreamExt}; +use lance_arrow::BLOB_META_KEY; +use lance_core::datatypes::BlobVersion; use lance_core::datatypes::{ - BlobVersion, NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions, + NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions, }; use lance_core::error::LanceOptionExt; use lance_core::utils::tempfile::TempDir; @@ -244,13 +246,6 @@ pub struct WriteParams { /// These will be resolved to IDs when the write operation executes. /// Resolution happens at builder execution time when dataset context is available. pub target_base_names_or_paths: Option>, - - /// If set and this write creates a new dataset, the blob encoding version to persist - /// into the dataset config as `lance.blob.version`. - /// - /// For existing datasets, the blob version is determined by the dataset config and - /// must not be changed by writes. - pub blob_version: Option, } impl Default for WriteParams { @@ -275,7 +270,6 @@ impl Default for WriteParams { initial_bases: None, target_bases: None, target_base_names_or_paths: None, - blob_version: None, } } } @@ -384,7 +378,6 @@ pub async fn do_write_fragments( data: SendableRecordBatchStream, params: WriteParams, storage_version: LanceFileVersion, - blob_version: BlobVersion, target_bases_info: Option>, ) -> Result> { let adapter = SchemaAdapter::new(data.schema()); @@ -406,7 +399,6 @@ pub async fn do_write_fragments( base_dir, schema, storage_version, - blob_version, target_bases_info, ); let mut writer: Option> = None; @@ -594,6 +586,21 @@ pub async fn write_fragments_internal( // Make sure the max rows per group is not larger than the max rows per file params.max_rows_per_group = std::cmp::min(params.max_rows_per_group, params.max_rows_per_file); + let write_storage_version = if let Some(dataset) = dataset { + let dataset_storage_version = dataset + .manifest() + .data_storage_format + .lance_file_version()?; + match params.mode { + WriteMode::Append | WriteMode::Create => dataset_storage_version, + WriteMode::Overwrite => params + .data_storage_version + .unwrap_or(dataset_storage_version), + } + } else { + params.storage_version_or_default() + }; + let (schema, storage_version) = if let Some(dataset) = dataset { match params.mode { WriteMode::Append | WriteMode::Create => { @@ -616,41 +623,25 @@ pub async fn write_fragments_internal( OnTypeMismatch::Error, )?; // Use the storage version from the dataset, ignoring any version from the user. - let data_storage_version = dataset - .manifest() - .data_storage_format - .lance_file_version()?; - (write_schema, data_storage_version) + (write_schema, write_storage_version) } WriteMode::Overwrite => { // Overwrite, use the schema from the data. If the user specified // a storage version use that. Otherwise use the version from the // dataset. - let data_storage_version = params.data_storage_version.unwrap_or( - dataset - .manifest() - .data_storage_format - .lance_file_version()?, - ); - (converted_schema, data_storage_version) + (converted_schema, write_storage_version) } } } else { // Brand new dataset, use the schema from the data and the storage version // from the user or the default. - (converted_schema, params.storage_version_or_default()) + (converted_schema, write_storage_version) }; - let requested_blob_version = params.blob_version; - - let target_blob_version = requested_blob_version - .or_else(|| dataset.map(|d| d.blob_version())) - .unwrap_or(BlobVersion::V1); - - if storage_version < LanceFileVersion::V2_2 && target_blob_version == BlobVersion::V2 { + if storage_version < LanceFileVersion::V2_2 && schema.fields.iter().any(|f| f.is_blob_v2()) { return Err(Error::InvalidInput { source: format!( - "Blob version v2 requires file version >= 2.2 (got {:?})", + "Blob v2 requires file version >= 2.2 (got {:?})", storage_version ) .into(), @@ -658,18 +649,19 @@ pub async fn write_fragments_internal( }); } - if let (Some(dataset), Some(requested_blob_version)) = (dataset, requested_blob_version) { - let existing_version = dataset.blob_version(); - if existing_version != requested_blob_version { - return Err(Error::InvalidInput { - source: format!( - "Blob column version mismatch. Existing dataset uses {:?} but requested write requires {:?}. Changing blob version is not allowed", - existing_version, requested_blob_version - ) - .into(), - location: location!(), - }); - } + if storage_version >= LanceFileVersion::V2_2 + && schema + .fields + .iter() + .any(|f| f.metadata.contains_key(BLOB_META_KEY)) + { + return Err(Error::InvalidInput { + source: format!( + "Legacy blob columns (field metadata key {BLOB_META_KEY:?}) are not supported for file version >= 2.2. Use the blob v2 extension type (ARROW:extension:name = \"lance.blob.v2\") and the new blob APIs (e.g. lance::blob::blob_field / lance::blob::BlobArrayBuilder)." + ) + .into(), + location: location!(), + }); } let fragments = do_write_fragments( @@ -679,7 +671,6 @@ pub async fn write_fragments_internal( data, params, storage_version, - target_blob_version, target_bases_info, ) .await?; @@ -795,18 +786,8 @@ pub async fn open_writer( schema: &Schema, base_dir: &Path, storage_version: LanceFileVersion, - blob_version: BlobVersion, ) -> Result> { - open_writer_with_options( - object_store, - schema, - base_dir, - storage_version, - blob_version, - true, - None, - ) - .await + open_writer_with_options(object_store, schema, base_dir, storage_version, true, None).await } pub async fn open_writer_with_options( @@ -814,7 +795,6 @@ pub async fn open_writer_with_options( schema: &Schema, base_dir: &Path, storage_version: LanceFileVersion, - blob_version: BlobVersion, add_data_dir: bool, base_id: Option, ) -> Result> { @@ -843,26 +823,51 @@ pub async fn open_writer_with_options( }) } else { let writer = object_store.create(&full_path).await?; + let has_blob_v2 = schema.fields.iter().any(|f| f.is_blob_v2()); + if storage_version < LanceFileVersion::V2_2 && has_blob_v2 { + return Err(Error::InvalidInput { + source: format!( + "Blob v2 requires file version >= 2.2 (got {:?})", + storage_version + ) + .into(), + location: location!(), + }); + } + if storage_version >= LanceFileVersion::V2_2 + && schema + .fields + .iter() + .any(|f| f.metadata.contains_key(BLOB_META_KEY)) + { + return Err(Error::InvalidInput { + source: format!( + "Legacy blob columns (field metadata key {BLOB_META_KEY:?}) are not supported for file version >= 2.2. Use the blob v2 extension type (ARROW:extension:name = \"lance.blob.v2\") and the new blob APIs (e.g. lance::blob::blob_field / lance::blob::BlobArrayBuilder)." + ) + .into(), + location: location!(), + }); + } + let enable_blob_v2 = storage_version >= LanceFileVersion::V2_2; let file_writer = current_writer::FileWriter::try_new( writer, schema.clone(), FileWriterOptions { - blob_version: (blob_version == BlobVersion::V2).then_some(blob_version), + blob_version: enable_blob_v2.then_some(BlobVersion::V2), format_version: Some(storage_version), ..Default::default() }, )?; - let preprocessor = - if storage_version >= LanceFileVersion::V2_2 && blob_version == BlobVersion::V2 { - Some(BlobPreprocessor::new( - object_store.clone(), - data_dir.clone(), - data_file_key.clone(), - schema, - )) - } else { - None - }; + let preprocessor = if enable_blob_v2 { + Some(BlobPreprocessor::new( + object_store.clone(), + data_dir.clone(), + data_file_key.clone(), + schema, + )) + } else { + None + }; let writer_adapter = V2WriterAdapter { writer: file_writer, path: filename, @@ -894,7 +899,6 @@ struct WriterGenerator { base_dir: Path, schema: Schema, storage_version: LanceFileVersion, - blob_version: BlobVersion, /// Target base information (if writing to specific bases) target_bases_info: Option>, /// Counter for round-robin selection @@ -907,7 +911,6 @@ impl WriterGenerator { base_dir: &Path, schema: &Schema, storage_version: LanceFileVersion, - blob_version: BlobVersion, target_bases_info: Option>, ) -> Self { Self { @@ -915,7 +918,6 @@ impl WriterGenerator { base_dir: base_dir.clone(), schema: schema.clone(), storage_version, - blob_version, target_bases_info, next_base_index: AtomicUsize::new(0), } @@ -942,7 +944,6 @@ impl WriterGenerator { &self.schema, &base_info.base_dir, self.storage_version, - self.blob_version, base_info.is_dataset_root, Some(base_info.base_id), ) @@ -953,7 +954,6 @@ impl WriterGenerator { &self.schema, &self.base_dir, self.storage_version, - self.blob_version, true, None, ) @@ -1588,7 +1588,6 @@ mod tests { &base_dir, &schema, LanceFileVersion::Stable, - BlobVersion::V1, Some(target_bases), ); @@ -1633,7 +1632,6 @@ mod tests { &schema, &base_dir, LanceFileVersion::Stable, - BlobVersion::V1, false, // Don't add /data None, ) @@ -1700,7 +1698,6 @@ mod tests { &Path::from("default"), &schema, LanceFileVersion::Stable, - BlobVersion::V1, Some(target_bases), ); diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index 4d193cd28be..f2fb5aa0dbc 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use arrow_array::{RecordBatch, RecordBatchIterator}; use datafusion::execution::SendableRecordBatchStream; use humantime::format_duration; -use lance_core::datatypes::{BlobVersion, NullabilityComparison, Schema, SchemaCompareOptions}; +use lance_core::datatypes::{NullabilityComparison, Schema, SchemaCompareOptions}; use lance_core::utils::tracing::{DATASET_WRITING_EVENT, TRACE_DATASET_EVENTS}; use lance_core::{ROW_ADDR, ROW_ID, ROW_OFFSET}; use lance_datafusion::utils::StreamingWriteSource; @@ -19,7 +19,6 @@ use lance_table::io::commit::CommitHandler; use object_store::path::Path; use snafu::location; -use crate::dataset::blob::BLOB_VERSION_CONFIG_KEY; use crate::dataset::builder::DatasetBuilder; use crate::dataset::transaction::{Operation, Transaction, TransactionBuilder}; use crate::dataset::write::{validate_and_resolve_target_bases, write_fragments_internal}; @@ -194,7 +193,7 @@ impl<'a> InsertBuilder<'a> { let target_base_info = validate_and_resolve_target_bases(&mut context.params, existing_base_paths).await?; - let (written_fragments, _) = write_fragments_internal( + let (written_fragments, written_schema) = write_fragments_internal( context.dest.dataset(), context.object_store.clone(), &context.base_path, @@ -205,7 +204,7 @@ impl<'a> InsertBuilder<'a> { ) .await?; - let transaction = Self::build_transaction(schema, written_fragments, &context)?; + let transaction = Self::build_transaction(written_schema, written_fragments, &context)?; Ok((transaction, context)) } @@ -235,20 +234,6 @@ impl<'a> InsertBuilder<'a> { format_duration(duration).to_string(), ); } - if let Some(blob_version) = context.params.blob_version { - if blob_version != BlobVersion::V1 - && context.storage_version < LanceFileVersion::V2_2 - { - return Err(Error::InvalidInput { - source: "Blob version v2 requires file version >= 2.2".into(), - location: location!(), - }); - } - upsert_values.insert( - BLOB_VERSION_CONFIG_KEY.to_string(), - blob_version.config_value().to_string(), - ); - } let config_upsert_values = if upsert_values.is_empty() { None } else { @@ -455,9 +440,7 @@ mod test { use arrow_array::{BinaryArray, Int32Array, RecordBatchReader, StructArray}; use arrow_schema::{ArrowError, DataType, Field, Schema}; use lance_arrow::BLOB_META_KEY; - use lance_core::datatypes::BlobVersion; - use crate::dataset::ProjectionRequest; use crate::session::Session; use super::*; @@ -538,7 +521,7 @@ mod test { } #[tokio::test] - async fn create_v2_2_dataset_with_forced_blob_v2() { + async fn create_v2_2_dataset_rejects_legacy_blob_schema() { let schema = Arc::new(Schema::new(vec![Field::new( "blob", DataType::Binary, @@ -558,30 +541,20 @@ mod test { .with_params(&WriteParams { mode: WriteMode::Create, data_storage_version: Some(LanceFileVersion::V2_2), - blob_version: Some(BlobVersion::V2), ..Default::default() }) .execute_stream(RecordBatchIterator::new(vec![Ok(batch)], schema.clone())) - .await - .unwrap(); - - assert_eq!( - dataset - .manifest - .config - .get(BLOB_VERSION_CONFIG_KEY) - .map(String::as_str), - Some("2") - ); + .await; - let batch = dataset - .take( - &[0u64], - ProjectionRequest::from_columns(["blob"], dataset.schema()), - ) - .await - .unwrap(); - assert_eq!(batch.num_rows(), 1); + let err = dataset.unwrap_err(); + match err { + Error::InvalidInput { source, .. } => { + let message = source.to_string(); + assert!(message.contains("Legacy blob columns")); + assert!(message.contains("lance.blob.v2")); + } + other => panic!("unexpected error: {other:?}"), + } } mod external_error { diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index e6da2c3e478..e488a2f2439 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -930,7 +930,6 @@ impl MergeInsertJob { &write_schema, &dataset.base, data_storage_version, - dataset.blob_version(), ) .await?; From cda5817be0bc22aef8c0350d14fdc9499ee6010e Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 02:25:18 +0800 Subject: [PATCH 08/21] Remove dead blob preprocessor path --- rust/lance/src/dataset/fragment/write.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index e70ef68cb21..bc8f78871b4 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -18,7 +18,6 @@ use snafu::location; use std::borrow::Cow; use uuid::Uuid; -use crate::dataset::blob::{preprocess_blob_batches, BlobPreprocessor}; use crate::dataset::builder::DatasetBuilder; use crate::dataset::write::do_write_fragments; use crate::dataset::{WriteMode, WriteParams, DATA_DIR}; @@ -149,8 +148,6 @@ impl<'a> FragmentCreateBuilder<'a> { }, )?; - let mut preprocessor: Option = None; - let (major, minor) = writer.version().to_numbers(); let data_file = DataFile::new_unstarted(filename, major, minor); @@ -164,10 +161,7 @@ impl<'a> FragmentCreateBuilder<'a> { .map_ok(|batch| vec![batch]) .boxed(); while let Some(batched_chunk) = broken_stream.next().await { - let mut batch_chunk = batched_chunk?; - if let Some(pre) = preprocessor.as_mut() { - batch_chunk = preprocess_blob_batches(&batch_chunk, pre).await?; - } + let batch_chunk = batched_chunk?; writer.write_batches(batch_chunk.iter()).await?; } From 92413ce4dc0ae0a2d4c53195db86015eb0eb3a6b Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 17:45:56 +0800 Subject: [PATCH 09/21] remove not needed check --- rust/lance-arrow/src/lib.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/rust/lance-arrow/src/lib.rs b/rust/lance-arrow/src/lib.rs index 04b07743889..0a61a407395 100644 --- a/rust/lance-arrow/src/lib.rs +++ b/rust/lance-arrow/src/lib.rs @@ -843,13 +843,6 @@ fn project_array(array: &ArrayRef, target_field: &Field) -> Result { } fn project(struct_array: &StructArray, fields: &Fields) -> Result { - if struct_array.fields().len() != struct_array.columns().len() { - return Err(ArrowError::SchemaError(format!( - "Invalid StructArray: {} fields but {} columns", - struct_array.fields().len(), - struct_array.columns().len() - ))); - } if fields.is_empty() { return Ok(StructArray::new_empty_fields( struct_array.len(), From af281f57ed1bfc44d1970b21df680fc50ce839e1 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 17:55:56 +0800 Subject: [PATCH 10/21] clean up --- .../src/encodings/logical/blob.rs | 216 +----------------- 1 file changed, 2 insertions(+), 214 deletions(-) diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index a540cd1be21..387db744ceb 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, sync::Arc}; use arrow_array::{ - builder::{LargeBinaryBuilder, PrimitiveBuilder, StringBuilder}, + builder::{PrimitiveBuilder, StringBuilder}, cast::AsArray, types::{UInt32Type, UInt64Type, UInt8Type}, Array, ArrayRef, StructArray, UInt64Array, @@ -274,7 +274,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { row_number: u64, num_rows: u64, ) -> Result> { - let struct_arr = normalize_blob_v2_input(array)?; + let struct_arr = array.as_struct(); if let Some(validity) = struct_arr.nulls() { repdef.add_validity_bitmap(validity.clone()); } else { @@ -422,218 +422,6 @@ impl FieldEncoder for BlobV2StructuralEncoder { } } -fn normalize_blob_v2_input(array: ArrayRef) -> Result { - match array.data_type() { - DataType::Struct(_) => { - let struct_arr = array.as_struct(); - if struct_arr.fields().len() != struct_arr.columns().len() { - return Err(Error::InvalidInput { - source: format!( - "Invalid StructArray: {} fields but {} columns", - struct_arr.fields().len(), - struct_arr.columns().len() - ) - .into(), - location: location!(), - }); - } - - let is_normalized = ["kind", "data", "uri", "blob_id", "blob_size", "position"] - .iter() - .all(|name| struct_arr.column_by_name(name).is_some()); - if is_normalized { - return Ok(struct_arr.clone()); - } - - let data_col = - struct_arr - .column_by_name("data") - .ok_or_else(|| Error::InvalidInput { - source: "Blob struct missing `data` field".into(), - location: location!(), - })?; - let uri_col = struct_arr - .column_by_name("uri") - .ok_or_else(|| Error::InvalidInput { - source: "Blob struct missing `uri` field".into(), - location: location!(), - })?; - - if struct_arr.columns().len() != 2 { - return Err(Error::InvalidInput { - source: format!( - "Unsupported blob struct input: expected 2 or 6 fields, got {}", - struct_arr.columns().len() - ) - .into(), - location: location!(), - }); - } - - let data_col = data_col.as_binary::(); - let uri_col = uri_col.as_string::(); - let row_count = struct_arr.len(); - - let mut kind_builder = PrimitiveBuilder::::with_capacity(row_count); - let mut data_builder = LargeBinaryBuilder::with_capacity(row_count, 0); - let mut uri_builder = StringBuilder::with_capacity(row_count, 0); - let mut blob_id_builder = PrimitiveBuilder::::with_capacity(row_count); - let mut blob_size_builder = PrimitiveBuilder::::with_capacity(row_count); - let mut position_builder = PrimitiveBuilder::::with_capacity(row_count); - - for i in 0..row_count { - if struct_arr.is_null(i) { - kind_builder.append_null(); - data_builder.append_null(); - uri_builder.append_null(); - blob_id_builder.append_null(); - blob_size_builder.append_null(); - position_builder.append_null(); - continue; - } - - let has_data = !data_col.is_null(i); - let has_uri = !uri_col.is_null(i); - if has_uri { - kind_builder.append_value(BlobKind::External as u8); - data_builder.append_null(); - uri_builder.append_value(uri_col.value(i)); - blob_id_builder.append_null(); - blob_size_builder.append_null(); - position_builder.append_null(); - } else if has_data { - kind_builder.append_value(BlobKind::Inline as u8); - data_builder.append_value(data_col.value(i)); - uri_builder.append_null(); - blob_id_builder.append_null(); - blob_size_builder.append_null(); - position_builder.append_null(); - } else { - kind_builder.append_null(); - data_builder.append_null(); - uri_builder.append_null(); - blob_id_builder.append_null(); - blob_size_builder.append_null(); - position_builder.append_null(); - } - } - - let fields = Fields::from(vec![ - ArrowField::new("kind", DataType::UInt8, true), - ArrowField::new("data", DataType::LargeBinary, true), - ArrowField::new("uri", DataType::Utf8, true), - ArrowField::new("blob_id", DataType::UInt32, true), - ArrowField::new("blob_size", DataType::UInt64, true), - ArrowField::new("position", DataType::UInt64, true), - ]); - - StructArray::try_new( - fields, - vec![ - Arc::new(kind_builder.finish()) as ArrayRef, - Arc::new(data_builder.finish()) as ArrayRef, - Arc::new(uri_builder.finish()) as ArrayRef, - Arc::new(blob_id_builder.finish()) as ArrayRef, - Arc::new(blob_size_builder.finish()) as ArrayRef, - Arc::new(position_builder.finish()) as ArrayRef, - ], - struct_arr.nulls().cloned(), - ) - .map_err(|e| Error::InvalidInput { - source: e.to_string().into(), - location: location!(), - }) - } - DataType::Binary | DataType::LargeBinary => { - let row_count = array.len(); - let nulls = array.nulls().cloned(); - - let mut kind_builder = PrimitiveBuilder::::with_capacity(row_count); - let mut data_builder = LargeBinaryBuilder::with_capacity(row_count, 0); - let mut uri_builder = StringBuilder::with_capacity(row_count, 0); - let mut blob_id_builder = PrimitiveBuilder::::with_capacity(row_count); - let mut blob_size_builder = PrimitiveBuilder::::with_capacity(row_count); - let mut position_builder = PrimitiveBuilder::::with_capacity(row_count); - - if let Some(binary) = array.as_binary_opt::() { - for i in 0..row_count { - if binary.is_null(i) { - kind_builder.append_null(); - data_builder.append_null(); - uri_builder.append_null(); - blob_id_builder.append_null(); - blob_size_builder.append_null(); - position_builder.append_null(); - continue; - } - - kind_builder.append_value(BlobKind::Inline as u8); - data_builder.append_value(binary.value(i)); - uri_builder.append_null(); - blob_id_builder.append_null(); - blob_size_builder.append_null(); - position_builder.append_null(); - } - } else if let Some(binary) = array.as_binary_opt::() { - for i in 0..row_count { - if binary.is_null(i) { - kind_builder.append_null(); - data_builder.append_null(); - uri_builder.append_null(); - blob_id_builder.append_null(); - blob_size_builder.append_null(); - position_builder.append_null(); - continue; - } - - kind_builder.append_value(BlobKind::Inline as u8); - data_builder.append_value(binary.value(i)); - uri_builder.append_null(); - blob_id_builder.append_null(); - blob_size_builder.append_null(); - position_builder.append_null(); - } - } else { - return Err(Error::InvalidInput { - source: format!("Expected (Large)Binary array, got {}", array.data_type()) - .into(), - location: location!(), - }); - } - - let fields = Fields::from(vec![ - ArrowField::new("kind", DataType::UInt8, true), - ArrowField::new("data", DataType::LargeBinary, true), - ArrowField::new("uri", DataType::Utf8, true), - ArrowField::new("blob_id", DataType::UInt32, true), - ArrowField::new("blob_size", DataType::UInt64, true), - ArrowField::new("position", DataType::UInt64, true), - ]); - - StructArray::try_new( - fields, - vec![ - Arc::new(kind_builder.finish()) as ArrayRef, - Arc::new(data_builder.finish()) as ArrayRef, - Arc::new(uri_builder.finish()) as ArrayRef, - Arc::new(blob_id_builder.finish()) as ArrayRef, - Arc::new(blob_size_builder.finish()) as ArrayRef, - Arc::new(position_builder.finish()) as ArrayRef, - ], - nulls, - ) - .map_err(|e| Error::InvalidInput { - source: e.to_string().into(), - location: location!(), - }) - } - _ => Err(Error::InvalidInput { - source: format!("Unsupported blob v2 input type {}", array.data_type()).into(), - location: location!(), - }), - } -} - #[cfg(test)] mod tests { use super::*; From 8e6b9a2f43ddb06d70db98a683e425f6a88d7000 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 18:11:40 +0800 Subject: [PATCH 11/21] clean up apis --- rust/lance-encoding/src/encoder.rs | 7 +++- .../src/encodings/logical/blob.rs | 20 +++++------ .../src/previous/encodings/logical/blob.rs | 9 +++-- rust/lance-encoding/src/testing.rs | 33 ++----------------- 4 files changed, 25 insertions(+), 44 deletions(-) diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 6f23694abde..4e01d5b6d39 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -350,10 +350,15 @@ impl Default for StructuralEncodingStrategy { impl StructuralEncodingStrategy { pub fn with_version(version: LanceFileVersion) -> Self { + let blob_version = if version >= LanceFileVersion::V2_2 { + BlobVersion::V2 + } else { + BlobVersion::V1 + }; Self { compression_strategy: Arc::new(DefaultCompressionStrategy::new().with_version(version)), version, - blob_version: BlobVersion::V1, + blob_version, } } diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index 387db744ceb..9bf54e32ebb 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -28,7 +28,6 @@ use crate::{ }; use lance_core::datatypes::BlobKind; #[cfg(test)] -use lance_core::datatypes::BlobVersion; /// Blob structural encoder - stores large binary data in external buffers /// @@ -518,7 +517,12 @@ mod tests { ])); // Use the standard test harness - check_round_trip_encoding_of_data(vec![array], &TestCases::default(), blob_metadata).await; + check_round_trip_encoding_of_data( + vec![array], + &TestCases::default().with_max_file_version(LanceFileVersion::V2_1), + blob_metadata, + ) + .await; } #[tokio::test] @@ -593,9 +597,7 @@ mod tests { check_round_trip_encoding_of_data_with_expected( vec![Arc::new(struct_array)], Some(Arc::new(expected_descriptor)), - &TestCases::default() - .with_min_file_version(LanceFileVersion::V2_2) - .with_blob_version(BlobVersion::V2), + &TestCases::default().with_min_file_version(LanceFileVersion::V2_2), blob_metadata, ) .await; @@ -660,9 +662,7 @@ mod tests { check_round_trip_encoding_of_data_with_expected( vec![Arc::new(struct_array)], Some(Arc::new(expected_descriptor)), - &TestCases::default() - .with_min_file_version(LanceFileVersion::V2_2) - .with_blob_version(BlobVersion::V2), + &TestCases::default().with_min_file_version(LanceFileVersion::V2_2), blob_metadata, ) .await; @@ -724,9 +724,7 @@ mod tests { check_round_trip_encoding_of_data_with_expected( vec![Arc::new(struct_array)], Some(Arc::new(expected_descriptor)), - &TestCases::default() - .with_min_file_version(LanceFileVersion::V2_2) - .with_blob_version(BlobVersion::V2), + &TestCases::default().with_min_file_version(LanceFileVersion::V2_2), blob_metadata, ) .await; diff --git a/rust/lance-encoding/src/previous/encodings/logical/blob.rs b/rust/lance-encoding/src/previous/encodings/logical/blob.rs index e9719553124..3d79df2b03d 100644 --- a/rust/lance-encoding/src/previous/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/previous/encodings/logical/blob.rs @@ -400,7 +400,7 @@ pub mod tests { use crate::{ format::pb::column_encoding, - testing::{check_basic_random, check_round_trip_encoding_of_data, TestCases}, + testing::{check_round_trip_encoding_of_data, check_specific_random, TestCases}, version::LanceFileVersion, }; @@ -414,7 +414,11 @@ pub mod tests { #[test_log::test(tokio::test)] async fn test_basic_blob() { let field = Field::new("", DataType::LargeBinary, false).with_metadata(BLOB_META.clone()); - check_basic_random(field).await; + check_specific_random( + field, + TestCases::basic().with_max_file_version(LanceFileVersion::V2_1), + ) + .await; } #[test_log::test(tokio::test)] @@ -423,6 +427,7 @@ pub mod tests { let val2: &[u8] = &[7, 8, 9]; let array = Arc::new(LargeBinaryArray::from(vec![Some(val1), None, Some(val2)])); let test_cases = TestCases::default() + .with_max_file_version(LanceFileVersion::V2_1) .with_expected_encoding("packed_struct") .with_verify_encoding(Arc::new(|cols, version| { if version < &LanceFileVersion::V2_1 { diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index dd8bf889b39..37df889035f 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -21,7 +21,7 @@ use futures::{future::BoxFuture, FutureExt, StreamExt}; use log::{debug, info, trace}; use tokio::sync::mpsc::{self, UnboundedSender}; -use lance_core::{datatypes::BlobVersion, utils::bit::pad_bytes, Result}; +use lance_core::{utils::bit::pad_bytes, Result}; use lance_datagen::{array, gen_batch, ArrayGenerator, RowCount, Seed}; use crate::{ @@ -32,8 +32,7 @@ use crate::{ }, encoder::{ default_encoding_strategy, ColumnIndexSequence, EncodedColumn, EncodedPage, - EncodingOptions, FieldEncoder, OutOfLineBuffers, StructuralEncodingStrategy, - MIN_PAGE_BUFFER_ALIGNMENT, + EncodingOptions, FieldEncoder, OutOfLineBuffers, MIN_PAGE_BUFFER_ALIGNMENT, }, repdef::RepDefBuilder, version::LanceFileVersion, @@ -403,7 +402,6 @@ pub struct TestCases { max_file_version: Option, verify_encoding: Option>, expected_encoding: Option>, - blob_version: Option, } impl Default for TestCases { @@ -419,7 +417,6 @@ impl Default for TestCases { max_file_version: None, verify_encoding: None, expected_encoding: None, - blob_version: None, } } } @@ -465,11 +462,6 @@ impl TestCases { self } - pub fn with_blob_version(mut self, blob_version: BlobVersion) -> Self { - self.blob_version = Some(blob_version); - self - } - pub fn with_max_file_version(mut self, version: LanceFileVersion) -> Self { self.max_file_version = Some(version); self @@ -754,26 +746,7 @@ pub async fn check_round_trip_encoding_of_data_with_expected( let lance_field = lance_core::datatypes::Field::try_from(&field).unwrap(); for file_version in test_cases.get_versions() { for page_size in test_cases.page_sizes.iter() { - let encoding_strategy = if let Some(blob_version) = test_cases.blob_version { - if blob_version == BlobVersion::V1 { - default_encoding_strategy(file_version) - } else { - if file_version < LanceFileVersion::V2_2 { - panic!("Blob version v2 requires file version >= 2.2"); - } - match file_version.resolve() { - LanceFileVersion::Legacy | LanceFileVersion::V2_0 => { - panic!("Blob version v2 requires file version >= 2.2"); - } - _ => Box::new(StructuralEncodingStrategy::with_version_and_blob_version( - file_version, - blob_version, - )), - } - } - } else { - default_encoding_strategy(file_version) - }; + let encoding_strategy = default_encoding_strategy(file_version); let mut column_index_seq = ColumnIndexSequence::default(); let encoding_options = EncodingOptions { cache_bytes_per_column: *page_size, From 492822e7d914578952607a0dbf6e72779086350d Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 18:18:17 +0800 Subject: [PATCH 12/21] Cleanup --- rust/lance-encoding/src/encoder.rs | 37 ++++++------------- .../src/encodings/logical/blob.rs | 1 - rust/lance-file/src/writer.rs | 3 -- 3 files changed, 12 insertions(+), 29 deletions(-) diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 4e01d5b6d39..bc155711eaf 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -302,7 +302,6 @@ pub fn default_encoding_strategy(version: LanceFileVersion) -> Box Result> { match version.resolve() { LanceFileVersion::Legacy | LanceFileVersion::V2_0 => Err(Error::invalid_input( @@ -310,14 +309,13 @@ pub fn default_encoding_strategy_with_params( location!(), )), _ => { - if blob_version != BlobVersion::V1 && version < LanceFileVersion::V2_2 { - return Err(Error::InvalidInput { - source: "Blob version v2 requires file version >= 2.2".into(), - location: location!(), - }); - } let compression_strategy = Arc::new(DefaultCompressionStrategy::with_params(params).with_version(version)); + let blob_version = if version >= LanceFileVersion::V2_2 { + BlobVersion::V2 + } else { + BlobVersion::V1 + }; Ok(Box::new(StructuralEncodingStrategy { compression_strategy, version, @@ -856,35 +854,24 @@ mod tests { ); // Test with V2.1 - should succeed - let strategy = default_encoding_strategy_with_params( - LanceFileVersion::V2_1, - params.clone(), - BlobVersion::V1, - ) - .expect("Should succeed for V2.1"); + let strategy = + default_encoding_strategy_with_params(LanceFileVersion::V2_1, params.clone()) + .expect("Should succeed for V2.1"); // Verify it's a StructuralEncodingStrategy assert!(format!("{:?}", strategy).contains("StructuralEncodingStrategy")); assert!(format!("{:?}", strategy).contains("DefaultCompressionStrategy")); // Test with V2.0 - should fail - let err = default_encoding_strategy_with_params( - LanceFileVersion::V2_0, - params.clone(), - BlobVersion::V1, - ) - .expect_err("Should fail for V2.0"); + let err = default_encoding_strategy_with_params(LanceFileVersion::V2_0, params.clone()) + .expect_err("Should fail for V2.0"); assert!(err .to_string() .contains("only supported in Lance file version 2.1")); // Test with Legacy - should fail - let err = default_encoding_strategy_with_params( - LanceFileVersion::Legacy, - params, - BlobVersion::V1, - ) - .expect_err("Should fail for Legacy"); + let err = default_encoding_strategy_with_params(LanceFileVersion::Legacy, params) + .expect_err("Should fail for Legacy"); assert!(err .to_string() .contains("only supported in Lance file version 2.1")); diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index 9bf54e32ebb..bc3bc80db27 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -27,7 +27,6 @@ use crate::{ repdef::{DefinitionInterpretation, RepDefBuilder}, }; use lance_core::datatypes::BlobKind; -#[cfg(test)] /// Blob structural encoder - stores large binary data in external buffers /// diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index 4627b218a5f..f00dac25f4a 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -1100,7 +1100,6 @@ mod tests { let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params( LanceFileVersion::V2_1, params, - lance_core::datatypes::BlobVersion::V1, ) .unwrap(); @@ -1249,7 +1248,6 @@ mod tests { let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params( LanceFileVersion::V2_1, params, - lance_core::datatypes::BlobVersion::V1, ) .unwrap(); @@ -1352,7 +1350,6 @@ mod tests { let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params( LanceFileVersion::V2_1, params, - lance_core::datatypes::BlobVersion::V1, ) .unwrap(); From 5eaa92b7bc14033c054918f360a0319bf15778e0 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 18:19:34 +0800 Subject: [PATCH 13/21] cleanup --- rust/lance-encoding/src/encoder.rs | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index bc155711eaf..46163502d8a 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -419,36 +419,6 @@ impl StructuralEncodingStrategy { // Check if field is marked as blob if field.is_blob() { - if self.blob_version == BlobVersion::V2 && self.version < LanceFileVersion::V2_2 { - return Err(Error::InvalidInput { - source: "Blob v2 requires file version >= 2.2".into(), - location: location!(), - }); - } - - if self.blob_version == BlobVersion::V2 { - match data_type { - DataType::Binary | DataType::LargeBinary | DataType::Struct(_) => { - return Ok(Box::new(BlobV2StructuralEncoder::new( - field, - column_index.next_column_index(field.id as u32), - options, - self.compression_strategy.clone(), - )?)); - } - _ => { - return Err(Error::InvalidInput { - source: format!( - "Blob encoding only supports Binary/LargeBinary or Struct, got {}", - data_type - ) - .into(), - location: location!(), - }); - } - } - } - match data_type { DataType::Binary | DataType::LargeBinary => { return Ok(Box::new(BlobStructuralEncoder::new( From fddee9fa85a72fbaacad73869e394020826d625e Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 18:21:19 +0800 Subject: [PATCH 14/21] Fix --- rust/lance-encoding/src/encoder.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 46163502d8a..9e036286d51 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -428,6 +428,14 @@ impl StructuralEncodingStrategy { self.compression_strategy.clone(), )?)); } + DataType::Struct(_) if self.version >= LanceFileVersion::V2_2 => { + return Ok(Box::new(BlobV2StructuralEncoder::new( + field, + column_index.next_column_index(field.id as u32), + options, + self.compression_strategy.clone(), + )?)); + } DataType::Struct(_) => { return Err(Error::InvalidInput { source: "Blob struct input requires blob version v2".into(), From 48438603ca67d36d818653f43523550684407ee7 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 18:47:21 +0800 Subject: [PATCH 15/21] Fix --- rust/lance-file/src/writer.rs | 27 ++------------------------- rust/lance/src/dataset/write.rs | 2 -- 2 files changed, 2 insertions(+), 27 deletions(-) diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index f00dac25f4a..9f9fe457f3e 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -12,7 +12,6 @@ use arrow_data::ArrayData; use bytes::{BufMut, Bytes, BytesMut}; use futures::stream::FuturesOrdered; use futures::StreamExt; -use lance_core::datatypes::BlobVersion; use lance_core::datatypes::{Field, Schema as LanceSchema}; use lance_core::utils::bit::pad_bytes; use lance_core::{Error, Result}; @@ -93,10 +92,6 @@ pub struct FileWriterOptions { /// while) might keep a much larger record batch around in memory (even though most /// of that batch's data has been written to disk) pub keep_original_array: Option, - /// Controls how blob columns are encoded. - /// - /// When unset, blob columns default to blob v1 encoding. - pub blob_version: Option, pub encoding_strategy: Option>, /// The format version to use when writing the file /// @@ -320,13 +315,6 @@ impl FileWriter { encoding_strategy } else { let version = self.version(); - let blob_version = self.options.blob_version.unwrap_or(BlobVersion::V1); - if blob_version != BlobVersion::V1 && version < LanceFileVersion::V2_2 { - return Err(Error::invalid_input( - "Blob version v2 requires file version >= 2.2", - location!(), - )); - } match version.resolve() { LanceFileVersion::Legacy => { return Err(Error::invalid_input( @@ -334,19 +322,8 @@ impl FileWriter { location!(), )); } - LanceFileVersion::V2_0 => { - if blob_version != BlobVersion::V1 { - return Err(Error::invalid_input( - "Blob version v2 requires file version >= 2.2", - location!(), - )); - } - Arc::from(default_encoding_strategy(version)) - } - _ => Arc::new(StructuralEncodingStrategy::with_version_and_blob_version( - version, - blob_version, - )), + LanceFileVersion::V2_0 => Arc::from(default_encoding_strategy(version)), + _ => Arc::new(StructuralEncodingStrategy::with_version(version)), } }; diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 2856a08f889..2380f9e16bf 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -7,7 +7,6 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream; use futures::{Stream, StreamExt, TryStreamExt}; use lance_arrow::BLOB_META_KEY; -use lance_core::datatypes::BlobVersion; use lance_core::datatypes::{ NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions, }; @@ -853,7 +852,6 @@ pub async fn open_writer_with_options( writer, schema.clone(), FileWriterOptions { - blob_version: enable_blob_v2.then_some(BlobVersion::V2), format_version: Some(storage_version), ..Default::default() }, From b83f386a935c14789514568ca53ac7c4bc564453 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 18:51:54 +0800 Subject: [PATCH 16/21] revert --- rust/lance-file/src/writer.rs | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index 9f9fe457f3e..de19a2d89ed 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -310,22 +310,10 @@ impl FileWriter { schema.validate()?; let keep_original_array = self.options.keep_original_array.unwrap_or(false); - let encoding_strategy: Arc = - if let Some(encoding_strategy) = self.options.encoding_strategy.clone() { - encoding_strategy - } else { - let version = self.version(); - match version.resolve() { - LanceFileVersion::Legacy => { - return Err(Error::invalid_input( - "Cannot create encoding strategy for legacy file version", - location!(), - )); - } - LanceFileVersion::V2_0 => Arc::from(default_encoding_strategy(version)), - _ => Arc::new(StructuralEncodingStrategy::with_version(version)), - } - }; + let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| { + let version = self.version(); + default_encoding_strategy(version).into() + }); let encoding_options = EncodingOptions { cache_bytes_per_column, From 88954c3e1de791775e009b8038fae238b23879cb Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 19:09:58 +0800 Subject: [PATCH 17/21] revert not needed changes --- rust/lance/src/dataset/write.rs | 65 ++++++++++----------------------- 1 file changed, 19 insertions(+), 46 deletions(-) diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 2380f9e16bf..a165cd46725 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -193,6 +193,9 @@ pub struct WriteParams { /// If not specified then the latest stable version will be used. pub data_storage_version: Option, + #[cfg(test)] + pub blob_version: Option, + /// Experimental: if set to true, the writer will use stable row ids. /// These row ids are stable after compaction operations, but not after updates. /// This makes compaction more efficient, since with stable row ids no @@ -260,6 +263,8 @@ impl Default for WriteParams { progress: Arc::new(NoopFragmentWriteProgress::new()), commit_handler: None, data_storage_version: None, + #[cfg(test)] + blob_version: None, enable_stable_row_ids: false, enable_v2_manifest_paths: true, session: None, @@ -585,21 +590,6 @@ pub async fn write_fragments_internal( // Make sure the max rows per group is not larger than the max rows per file params.max_rows_per_group = std::cmp::min(params.max_rows_per_group, params.max_rows_per_file); - let write_storage_version = if let Some(dataset) = dataset { - let dataset_storage_version = dataset - .manifest() - .data_storage_format - .lance_file_version()?; - match params.mode { - WriteMode::Append | WriteMode::Create => dataset_storage_version, - WriteMode::Overwrite => params - .data_storage_version - .unwrap_or(dataset_storage_version), - } - } else { - params.storage_version_or_default() - }; - let (schema, storage_version) = if let Some(dataset) = dataset { match params.mode { WriteMode::Append | WriteMode::Create => { @@ -622,19 +612,29 @@ pub async fn write_fragments_internal( OnTypeMismatch::Error, )?; // Use the storage version from the dataset, ignoring any version from the user. - (write_schema, write_storage_version) + let data_storage_version = dataset + .manifest() + .data_storage_format + .lance_file_version()?; + (write_schema, data_storage_version) } WriteMode::Overwrite => { // Overwrite, use the schema from the data. If the user specified // a storage version use that. Otherwise use the version from the // dataset. - (converted_schema, write_storage_version) + let data_storage_version = params.data_storage_version.unwrap_or( + dataset + .manifest() + .data_storage_format + .lance_file_version()?, + ); + (converted_schema, data_storage_version) } } } else { // Brand new dataset, use the schema from the data and the storage version // from the user or the default. - (converted_schema, write_storage_version) + (converted_schema, params.storage_version_or_default()) }; if storage_version < LanceFileVersion::V2_2 && schema.fields.iter().any(|f| f.is_blob_v2()) { @@ -822,31 +822,6 @@ pub async fn open_writer_with_options( }) } else { let writer = object_store.create(&full_path).await?; - let has_blob_v2 = schema.fields.iter().any(|f| f.is_blob_v2()); - if storage_version < LanceFileVersion::V2_2 && has_blob_v2 { - return Err(Error::InvalidInput { - source: format!( - "Blob v2 requires file version >= 2.2 (got {:?})", - storage_version - ) - .into(), - location: location!(), - }); - } - if storage_version >= LanceFileVersion::V2_2 - && schema - .fields - .iter() - .any(|f| f.metadata.contains_key(BLOB_META_KEY)) - { - return Err(Error::InvalidInput { - source: format!( - "Legacy blob columns (field metadata key {BLOB_META_KEY:?}) are not supported for file version >= 2.2. Use the blob v2 extension type (ARROW:extension:name = \"lance.blob.v2\") and the new blob APIs (e.g. lance::blob::blob_field / lance::blob::BlobArrayBuilder)." - ) - .into(), - location: location!(), - }); - } let enable_blob_v2 = storage_version >= LanceFileVersion::V2_2; let file_writer = current_writer::FileWriter::try_new( writer, @@ -947,13 +922,11 @@ impl WriterGenerator { ) .await? } else { - open_writer_with_options( + open_writer( &self.object_store, &self.schema, &self.base_dir, self.storage_version, - true, - None, ) .await? }; From f8e3a4947e94b4036ed1d381acf5dedc09cc3423 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 19:12:36 +0800 Subject: [PATCH 18/21] cleanup --- rust/lance/src/dataset/write.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index a165cd46725..c1b36702408 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -193,9 +193,6 @@ pub struct WriteParams { /// If not specified then the latest stable version will be used. pub data_storage_version: Option, - #[cfg(test)] - pub blob_version: Option, - /// Experimental: if set to true, the writer will use stable row ids. /// These row ids are stable after compaction operations, but not after updates. /// This makes compaction more efficient, since with stable row ids no @@ -263,8 +260,6 @@ impl Default for WriteParams { progress: Arc::new(NoopFragmentWriteProgress::new()), commit_handler: None, data_storage_version: None, - #[cfg(test)] - blob_version: None, enable_stable_row_ids: false, enable_v2_manifest_paths: true, session: None, From 4734e8a7677459b61e03a84543d3c349148ee9a1 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 19:24:31 +0800 Subject: [PATCH 19/21] Remove blob_version plumbing --- rust/lance-encoding/src/encoder.rs | 27 +--------------- rust/lance-file/src/writer.rs | 1 - rust/lance/src/dataset/cleanup.rs | 52 +++++++++++++++--------------- 3 files changed, 27 insertions(+), 53 deletions(-) diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 9e036286d51..fa6b09d3587 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -19,7 +19,7 @@ use arrow_array::{Array, ArrayRef, RecordBatch}; use arrow_schema::DataType; use bytes::{Bytes, BytesMut}; use futures::future::BoxFuture; -use lance_core::datatypes::{BlobVersion, Field, Schema}; +use lance_core::datatypes::{Field, Schema}; use lance_core::error::LanceOptionExt; use lance_core::utils::bit::{is_pwr_two, pad_bytes_to}; use lance_core::{Error, Result}; @@ -311,15 +311,9 @@ pub fn default_encoding_strategy_with_params( _ => { let compression_strategy = Arc::new(DefaultCompressionStrategy::with_params(params).with_version(version)); - let blob_version = if version >= LanceFileVersion::V2_2 { - BlobVersion::V2 - } else { - BlobVersion::V1 - }; Ok(Box::new(StructuralEncodingStrategy { compression_strategy, version, - blob_version, })) } } @@ -330,7 +324,6 @@ pub fn default_encoding_strategy_with_params( pub struct StructuralEncodingStrategy { pub compression_strategy: Arc, pub version: LanceFileVersion, - pub blob_version: BlobVersion, } // For some reason, clippy thinks we can add Default to the above derive but @@ -341,33 +334,15 @@ impl Default for StructuralEncodingStrategy { Self { compression_strategy: Arc::new(DefaultCompressionStrategy::new()), version: LanceFileVersion::default(), - blob_version: BlobVersion::V1, } } } impl StructuralEncodingStrategy { pub fn with_version(version: LanceFileVersion) -> Self { - let blob_version = if version >= LanceFileVersion::V2_2 { - BlobVersion::V2 - } else { - BlobVersion::V1 - }; - Self { - compression_strategy: Arc::new(DefaultCompressionStrategy::new().with_version(version)), - version, - blob_version, - } - } - - pub fn with_version_and_blob_version( - version: LanceFileVersion, - blob_version: BlobVersion, - ) -> Self { Self { compression_strategy: Arc::new(DefaultCompressionStrategy::new().with_version(version)), version, - blob_version, } } diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index de19a2d89ed..4952d9476c4 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -19,7 +19,6 @@ use lance_encoding::decoder::PageEncoding; use lance_encoding::encoder::{ default_encoding_strategy, BatchEncoder, EncodeTask, EncodedBatch, EncodedPage, EncodingOptions, FieldEncoder, FieldEncodingStrategy, OutOfLineBuffers, - StructuralEncodingStrategy, }; use lance_encoding::repdef::RepDefBuilder; use lance_encoding::version::LanceFileVersion; diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 2b9c3c7f6fa..bec67575672 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -751,11 +751,9 @@ mod tests { use lance_testing::datagen::{some_batch, BatchGenerator, IncrementingInt32}; use mock_instant::thread_local::MockClock; use snafu::location; - use super::*; use crate::blob::{blob_field, BlobArrayBuilder}; use crate::{ - dataset::write::InsertBuilder, dataset::{builder::DatasetBuilder, ReadParams, WriteMode, WriteParams}, index::vector::VectorIndexParams, }; @@ -1123,18 +1121,19 @@ mod tests { let fixture = MockDatasetFixture::try_new().unwrap(); // First version: write a packed blob (sidecar .blob file). - let write_params = WriteParams { - store_params: Some(fixture.os_params()), - commit_handler: Some(Arc::new(RenameCommitHandler)), - mode: WriteMode::Create, - data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2), - ..Default::default() - }; - InsertBuilder::new(&fixture.dataset_path) - .with_params(&write_params) - .execute_stream(blob_v2_batch(100 * 1024)) - .await - .unwrap(); + Dataset::write( + blob_v2_batch(100 * 1024), + &fixture.dataset_path, + Some(WriteParams { + store_params: Some(fixture.os_params()), + commit_handler: Some(Arc::new(RenameCommitHandler)), + mode: WriteMode::Create, + data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2), + ..Default::default() + }), + ) + .await + .unwrap(); assert_gt!(fixture.count_blob_files().await.unwrap(), 0); // Second version: overwrite with an inline blob (no sidecar). @@ -1167,18 +1166,19 @@ mod tests { async fn cleanup_recent_blob_v2_sidecar_files_when_verified() { let fixture = MockDatasetFixture::try_new().unwrap(); - let write_params = WriteParams { - store_params: Some(fixture.os_params()), - commit_handler: Some(Arc::new(RenameCommitHandler)), - mode: WriteMode::Create, - data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2), - ..Default::default() - }; - InsertBuilder::new(&fixture.dataset_path) - .with_params(&write_params) - .execute_stream(blob_v2_batch(100 * 1024)) - .await - .unwrap(); + Dataset::write( + blob_v2_batch(100 * 1024), + &fixture.dataset_path, + Some(WriteParams { + store_params: Some(fixture.os_params()), + commit_handler: Some(Arc::new(RenameCommitHandler)), + mode: WriteMode::Create, + data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2), + ..Default::default() + }), + ) + .await + .unwrap(); Dataset::write( blob_v2_batch(1024), From 9a9cc477e14ab2da6fa39f396e4d17da308e786b Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 19:25:07 +0800 Subject: [PATCH 20/21] Restore encoder blob error messages --- rust/lance-encoding/src/encoder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index fa6b09d3587..203b3b99642 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -413,14 +413,14 @@ impl StructuralEncodingStrategy { } DataType::Struct(_) => { return Err(Error::InvalidInput { - source: "Blob struct input requires blob version v2".into(), + source: "Blob v2 struct input requires file version >= 2.2".into(), location: location!(), }); } _ => { return Err(Error::InvalidInput { source: format!( - "Blob encoding only supports Binary/LargeBinary, got {}", + "Blob encoding only supports Binary/LargeBinary or v2 Struct, got {}", data_type ) .into(), From 97c176520356e730a875edff80a8c07d47bbbefd Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 29 Jan 2026 19:28:46 +0800 Subject: [PATCH 21/21] Format code --- rust/lance/src/dataset/cleanup.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index bec67575672..68e760cc3e3 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -738,9 +738,17 @@ fn tagged_old_versions_cleanup_error( mod tests { use std::{collections::HashMap, sync::Arc}; + use super::*; + use crate::blob::{blob_field, BlobArrayBuilder}; + use crate::{ + dataset::{builder::DatasetBuilder, ReadParams, WriteMode, WriteParams}, + index::vector::VectorIndexParams, + }; + use all_asserts::{assert_gt, assert_lt}; use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader}; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use datafusion::common::assert_contains; + use lance_core::utils::tempfile::TempStrDir; use lance_core::utils::testing::{ProxyObjectStore, ProxyObjectStorePolicy}; use lance_index::{DatasetIndexExt, IndexType}; use lance_io::object_store::{ @@ -751,14 +759,6 @@ mod tests { use lance_testing::datagen::{some_batch, BatchGenerator, IncrementingInt32}; use mock_instant::thread_local::MockClock; use snafu::location; - use super::*; - use crate::blob::{blob_field, BlobArrayBuilder}; - use crate::{ - dataset::{builder::DatasetBuilder, ReadParams, WriteMode, WriteParams}, - index::vector::VectorIndexParams, - }; - use all_asserts::{assert_gt, assert_lt}; - use lance_core::utils::tempfile::TempStrDir; #[derive(Debug)] struct MockObjectStore {