From 482cd94129a61c5d43f39b7f6c3c9215e9b5a8c1 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 1 Dec 2025 20:07:01 +0800 Subject: [PATCH 01/12] refactor: align with blob v2 logical types change Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes/field.rs | 11 +- rust/lance-encoding/src/encoder.rs | 18 +- .../src/encodings/logical/blob.rs | 179 +++++++++++++++++- rust/lance/src/dataset/blob.rs | 43 +++-- 4 files changed, 222 insertions(+), 29 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 6f4c8a5f8f1..bc65000904e 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -43,12 +43,11 @@ use crate::{ pub const LANCE_UNENFORCED_PRIMARY_KEY: &str = "lance-schema:unenforced-primary-key"; fn has_blob_v2_extension(field: &ArrowField) -> bool { - field.data_type() == &DataType::LargeBinary - && field - .metadata() - .get(ARROW_EXT_NAME_KEY) - .map(|name| name == BLOB_V2_EXT_NAME) - .unwrap_or(false) + field + .metadata() + .get(ARROW_EXT_NAME_KEY) + .map(|name| name == BLOB_V2_EXT_NAME) + .unwrap_or(false) } #[derive(Debug, Default)] diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index ad5b8b2235f..683664a595c 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -28,7 +28,7 @@ use crate::buffer::LanceBuffer; use crate::compression::{CompressionStrategy, DefaultCompressionStrategy}; use crate::compression_config::CompressionParams; use crate::decoder::PageEncoding; -use crate::encodings::logical::blob::BlobStructuralEncoder; +use crate::encodings::logical::blob::{BlobStructuralEncoder, BlobV2StructuralEncoder}; use crate::encodings::logical::list::ListStructuralEncoder; use crate::encodings::logical::primitive::PrimitiveStructuralEncoder; use crate::encodings::logical::r#struct::StructStructuralEncoder; @@ -385,10 +385,24 @@ impl StructuralEncodingStrategy { self.compression_strategy.clone(), )?)); } + DataType::Struct(_) if self.version >= LanceFileVersion::V2_2 => { + return Ok(Box::new(BlobV2StructuralEncoder::new( + field, + column_index.next_column_index(field.id as u32), + options, + self.compression_strategy.clone(), + )?)); + } + DataType::Struct(_) => { + return Err(Error::InvalidInput { + source: "Blob v2 struct input requires file version >= 2.2".into(), + location: location!(), + }); + } _ => { return Err(Error::InvalidInput { source: format!( - "Blob encoding only supports Binary/LargeBinary, got {}", + "Blob encoding only supports Binary/LargeBinary or v2 Struct, got {}", data_type ) .into(), diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index a3379523f82..c389b367915 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -3,11 +3,18 @@ use std::{collections::HashMap, sync::Arc}; -use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray, UInt64Array}; +use arrow_array::{ + builder::{PrimitiveBuilder, StringBuilder}, + cast::AsArray, + types::{UInt32Type, UInt64Type, UInt8Type}, + Array, ArrayRef, StructArray, UInt64Array, +}; use arrow_buffer::Buffer; use arrow_schema::{DataType, Field as ArrowField, Fields}; use futures::{future::BoxFuture, FutureExt}; -use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result}; +use lance_core::{ + datatypes::Field, datatypes::BLOB_V2_DESC_FIELDS, error::LanceOptionExt, Error, Result, +}; use snafu::location; use crate::{ @@ -221,6 +228,174 @@ impl FieldEncoder for BlobStructuralEncoder { } } +/// Blob v2 structural encoder - emits RFC 2.2 descriptor struct +pub struct BlobV2StructuralEncoder { + descriptor_encoder: Box, +} + +impl BlobV2StructuralEncoder { + pub fn new( + field: &Field, + column_index: u32, + options: &crate::encoder::EncodingOptions, + compression_strategy: Arc, + ) -> Result { + let mut descriptor_metadata = HashMap::with_capacity(1); + descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string()); + + let descriptor_data_type = DataType::Struct(BLOB_V2_DESC_FIELDS.clone()); + + let descriptor_field = Field::try_from( + ArrowField::new(&field.name, descriptor_data_type, field.nullable) + .with_metadata(descriptor_metadata), + )?; + + let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new( + options, + compression_strategy, + column_index, + descriptor_field, + Arc::new(HashMap::new()), + )?); + + Ok(Self { descriptor_encoder }) + } +} + +impl FieldEncoder for BlobV2StructuralEncoder { + fn maybe_encode( + &mut self, + array: ArrayRef, + external_buffers: &mut OutOfLineBuffers, + _repdef: RepDefBuilder, + row_number: u64, + num_rows: u64, + ) -> Result> { + // Supported input: Struct + let DataType::Struct(fields) = array.data_type() else { + return Err(Error::InvalidInput { + source: "Blob v2 requires struct input".into(), + location: location!(), + }); + }; + + let struct_arr = array.as_struct(); + let mut data_idx = None; + let mut uri_idx = None; + for (idx, field) in fields.iter().enumerate() { + match field.name().as_str() { + "data" => data_idx = Some(idx), + "uri" => uri_idx = Some(idx), + _ => {} + } + } + let (data_idx, uri_idx) = data_idx.zip(uri_idx).ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct must contain 'data' and 'uri' fields".into(), + location: location!(), + })?; + + let data_col = struct_arr.column(data_idx).as_binary::(); + let uri_col = struct_arr.column(uri_idx).as_string::(); + + // Validate XOR(data, uri) + for i in 0..struct_arr.len() { + if struct_arr.is_null(i) { + continue; + } + let data_is_set = !data_col.is_null(i); + let uri_is_set = !uri_col.is_null(i); + if data_is_set == uri_is_set { + return Err(Error::InvalidInput { + source: "Each blob row must set exactly one of data or uri".into(), + location: location!(), + }); + } + if uri_is_set { + return Err(Error::NotSupported { + source: "External blob (uri) is not supported yet".into(), + location: location!(), + }); + } + } + + let binary_array = data_col; + + let mut kind_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); + let mut position_builder = + PrimitiveBuilder::::with_capacity(binary_array.len()); + let mut size_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); + let mut blob_id_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); + let mut uri_builder = StringBuilder::with_capacity(binary_array.len(), 0); + + for i in 0..binary_array.len() { + let is_null_row = match array.data_type() { + DataType::Struct(_) => array.is_null(i), + _ => binary_array.is_null(i), + }; + if is_null_row { + kind_builder.append_null(); + position_builder.append_null(); + size_builder.append_null(); + blob_id_builder.append_null(); + uri_builder.append_null(); + continue; + } + + let value = binary_array.value(i); + kind_builder.append_value(0); + + if value.is_empty() { + position_builder.append_value(0); + size_builder.append_value(0); + } else { + let position = external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value))); + position_builder.append_value(position); + size_builder.append_value(value.len() as u64); + } + + blob_id_builder.append_null(); + uri_builder.append_null(); + } + + let children: Vec = vec![ + Arc::new(kind_builder.finish()), + Arc::new(position_builder.finish()), + Arc::new(size_builder.finish()), + Arc::new(blob_id_builder.finish()), + Arc::new(uri_builder.finish()), + ]; + + let descriptor_array = Arc::new(StructArray::try_new( + BLOB_V2_DESC_FIELDS.clone(), + children, + None, + )?) as ArrayRef; + + self.descriptor_encoder.maybe_encode( + descriptor_array, + external_buffers, + RepDefBuilder::default(), + row_number, + num_rows, + ) + } + + fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result> { + self.descriptor_encoder.flush(external_buffers) + } + + fn finish( + &mut self, + external_buffers: &mut OutOfLineBuffers, + ) -> BoxFuture<'_, Result>> { + self.descriptor_encoder.finish(external_buffers) + } + + fn num_columns(&self) -> u32 { + self.descriptor_encoder.num_columns() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 2d0cfc2ec5e..34f644c5d22 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -4,8 +4,7 @@ use std::{collections::HashMap, future::Future, ops::DerefMut, sync::Arc}; use arrow::array::AsArray; -use arrow::datatypes::{UInt64Type, UInt8Type}; -use arrow_schema::DataType; +use arrow::datatypes::{UInt32Type, UInt64Type, UInt8Type}; use object_store::path::Path; use snafu::location; use tokio::sync::Mutex; @@ -189,7 +188,7 @@ pub(super) async fn take_blobs( let projection = dataset.schema().project(&[column])?; let blob_field = &projection.fields[0]; let blob_field_id = blob_field.id; - if blob_field.data_type() != DataType::LargeBinary || !projection.fields[0].is_blob() { + if !projection.fields[0].is_blob() { return Err(Error::InvalidInput { location: location!(), source: format!("the column '{}' is not a blob column", column).into(), @@ -246,32 +245,38 @@ fn collect_blob_files_v2( let kinds = descriptions.column(0).as_primitive::(); let positions = descriptions.column(1).as_primitive::(); let sizes = descriptions.column(2).as_primitive::(); + let _blob_ids = descriptions.column(3).as_primitive::(); + let _uris = descriptions.column(4).as_string::(); let mut files = Vec::with_capacity(row_addrs.len()); for (idx, row_addr) in row_addrs.values().iter().enumerate() { - if positions.is_null(idx) || sizes.is_null(idx) { + if kinds.is_null(idx) { + // Null row continue; } - - if !kinds.is_null(idx) { - let kind = kinds.value(idx); - if kind != INLINE_BLOB_KIND { + let kind = kinds.value(idx); + match kind { + INLINE_BLOB_KIND => { + if positions.is_null(idx) || sizes.is_null(idx) { + continue; + } + let position = positions.value(idx); + let size = sizes.value(idx); + files.push(BlobFile::new( + dataset.clone(), + blob_field_id, + *row_addr, + position, + size, + )); + } + other => { return Err(Error::NotSupported { - source: format!("Blob kind {} is not supported", kind).into(), + source: format!("Blob kind {} is not supported", other).into(), location: location!(), }); } } - - let position = positions.value(idx); - let size = sizes.value(idx); - files.push(BlobFile::new( - dataset.clone(), - blob_field_id, - *row_addr, - position, - size, - )); } Ok(files) From e8f7744cc373c7b9859e7c8e46d7b8553df97ee8 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 1 Dec 2025 20:09:06 +0800 Subject: [PATCH 02/12] Fix comments Signed-off-by: Xuanwo --- rust/lance-encoding/src/encodings/logical/blob.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index c389b367915..e2658e42827 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -228,7 +228,7 @@ impl FieldEncoder for BlobStructuralEncoder { } } -/// Blob v2 structural encoder - emits RFC 2.2 descriptor struct +/// Blob v2 structural encoder pub struct BlobV2StructuralEncoder { descriptor_encoder: Box, } From b9934b94e6c4c588dbddf70d6ae5e731106b8954 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 2 Dec 2025 00:42:15 +0800 Subject: [PATCH 03/12] feat: add external blob read support Signed-off-by: Xuanwo --- rust/lance/src/dataset/blob.rs | 97 ++++++++++++++++++++++++++++------ 1 file changed, 82 insertions(+), 15 deletions(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 34f644c5d22..b85483dd4e7 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -5,6 +5,7 @@ use std::{collections::HashMap, future::Future, ops::DerefMut, sync::Arc}; use arrow::array::AsArray; use arrow::datatypes::{UInt32Type, UInt64Type, UInt8Type}; +use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; use object_store::path::Path; use snafu::location; use tokio::sync::Mutex; @@ -16,6 +17,8 @@ use lance_core::{utils::address::RowAddress, Error, Result}; use lance_io::traits::Reader; pub const BLOB_VERSION_CONFIG_KEY: &str = "lance.blob.version"; +const INLINE_BLOB_KIND: u8 = 0; +const EXTERNAL_BLOB_KIND: u8 = 3; pub fn blob_version_from_config(config: &HashMap) -> BlobVersion { config @@ -35,21 +38,29 @@ enum ReaderState { Closed, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BlobKind { + Inline, + External, +} + /// A file-like object that represents a blob in a dataset #[derive(Debug)] pub struct BlobFile { - dataset: Arc, + object_store: Arc, + path: Path, reader: Arc>, - data_file: Path, position: u64, size: u64, + kind: BlobKind, + uri: Option, } impl BlobFile { /// Create a new BlobFile /// /// See [`crate::dataset::Dataset::take_blobs`] - pub fn new( + pub fn new_inline( dataset: Arc, field_id: u32, row_addr: u64, @@ -61,14 +72,39 @@ impl BlobFile { let data_file = frag.data_file_for_field(field_id).unwrap(); let data_file = dataset.data_dir().child(data_file.path.as_str()); Self { - dataset, - data_file, + object_store: dataset.object_store.clone(), + path: data_file, position, size, + kind: BlobKind::Inline, + uri: None, reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))), } } + pub async fn new_external( + uri: String, + size: Option, + registry: Arc, + params: Arc, + ) -> Result { + let (object_store, path) = + ObjectStore::from_uri_and_params(registry, &uri, ¶ms).await?; + let size = match size { + Some(sz) => sz, + None => object_store.size(&path).await?, + }; + Ok(Self { + object_store, + path, + position: 0, + size, + kind: BlobKind::External, + uri: Some(uri), + reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))), + }) + } + /// Close the blob file, releasing any associated resources pub async fn close(&self) -> Result<()> { let mut reader = self.reader.lock().await; @@ -91,7 +127,7 @@ impl BlobFile { ) -> Result { let mut reader = self.reader.lock().await; if let ReaderState::Uninitialized(cursor) = *reader { - let opened = self.dataset.object_store.open(&self.data_file).await?; + let opened = self.object_store.open(&self.path).await?; let opened = Arc::::from(opened); *reader = ReaderState::Open((cursor, opened.clone())); } @@ -178,6 +214,22 @@ impl BlobFile { pub fn size(&self) -> u64 { self.size } + + pub fn position(&self) -> u64 { + self.position + } + + pub fn data_path(&self) -> &Path { + &self.path + } + + pub fn kind(&self) -> BlobKind { + self.kind + } + + pub fn uri(&self) -> Option<&str> { + self.uri.as_deref() + } } pub(super) async fn take_blobs( @@ -205,12 +257,12 @@ pub(super) async fn take_blobs( match dataset.blob_version() { BlobVersion::V1 => collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs), - BlobVersion::V2 => collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs), + BlobVersion::V2 => { + collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs).await + } } } -const INLINE_BLOB_KIND: u8 = 0; - fn collect_blob_files_v1( dataset: &Arc, blob_field_id: u32, @@ -231,12 +283,12 @@ fn collect_blob_files_v1( Some((*row_addr, position, size)) }) .map(|(row_addr, position, size)| { - BlobFile::new(dataset.clone(), blob_field_id, row_addr, position, size) + BlobFile::new_inline(dataset.clone(), blob_field_id, row_addr, position, size) }) .collect()) } -fn collect_blob_files_v2( +async fn collect_blob_files_v2( dataset: &Arc, blob_field_id: u32, descriptions: &StructArray, @@ -262,7 +314,7 @@ fn collect_blob_files_v2( } let position = positions.value(idx); let size = sizes.value(idx); - files.push(BlobFile::new( + files.push(BlobFile::new_inline( dataset.clone(), blob_field_id, *row_addr, @@ -270,6 +322,21 @@ fn collect_blob_files_v2( size, )); } + EXTERNAL_BLOB_KIND => { + let uri = _uris.value(idx).to_string(); + let size = if sizes.is_null(idx) { + None + } else { + Some(sizes.value(idx)) + }; + let registry = dataset.session.store_registry(); + let params = dataset + .store_params + .as_ref() + .map(|p| Arc::new((**p).clone())) + .unwrap_or_else(|| Arc::new(ObjectStoreParams::default())); + files.push(BlobFile::new_external(uri, size, registry, params).await?); + } other => { return Err(Error::NotSupported { source: format!("Blob kind {} is not supported", other).into(), @@ -394,9 +461,9 @@ mod tests { let blobs2 = fixture.dataset.take_blobs(&row_ids, "blobs").await.unwrap(); for (blob1, blob2) in blobs.iter().zip(blobs2.iter()) { - assert_eq!(blob1.position, blob2.position); - assert_eq!(blob1.size, blob2.size); - assert_eq!(blob1.data_file, blob2.data_file); + assert_eq!(blob1.position(), blob2.position()); + assert_eq!(blob1.size(), blob2.size()); + assert_eq!(blob1.data_path(), blob2.data_path()); } } From 94110c518e007f3addadf8d8c6624b07aa5d6f4f Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 2 Dec 2025 00:53:54 +0800 Subject: [PATCH 04/12] Add blob encoding Signed-off-by: Xuanwo --- .../src/encodings/logical/blob.rs | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index e2658e42827..dcbbedb29b4 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -310,12 +310,6 @@ impl FieldEncoder for BlobV2StructuralEncoder { location: location!(), }); } - if uri_is_set { - return Err(Error::NotSupported { - source: "External blob (uri) is not supported yet".into(), - location: location!(), - }); - } } let binary_array = data_col; @@ -327,12 +321,8 @@ impl FieldEncoder for BlobV2StructuralEncoder { let mut blob_id_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); let mut uri_builder = StringBuilder::with_capacity(binary_array.len(), 0); - for i in 0..binary_array.len() { - let is_null_row = match array.data_type() { - DataType::Struct(_) => array.is_null(i), - _ => binary_array.is_null(i), - }; - if is_null_row { + for i in 0..struct_arr.len() { + if struct_arr.is_null(i) { kind_builder.append_null(); position_builder.append_null(); size_builder.append_null(); @@ -341,20 +331,30 @@ impl FieldEncoder for BlobV2StructuralEncoder { continue; } - let value = binary_array.value(i); - kind_builder.append_value(0); - - if value.is_empty() { - position_builder.append_value(0); - size_builder.append_value(0); + let data_is_set = !data_col.is_null(i); + if data_is_set { + let value = binary_array.value(i); + kind_builder.append_value(0); + if value.is_empty() { + position_builder.append_value(0); + size_builder.append_value(0); + } else { + let position = + external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value))); + position_builder.append_value(position); + size_builder.append_value(value.len() as u64); + } + blob_id_builder.append_null(); + uri_builder.append_null(); } else { - let position = external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value))); - position_builder.append_value(position); - size_builder.append_value(value.len() as u64); + // external uri + let uri = uri_col.value(i); + kind_builder.append_value(3); + position_builder.append_null(); + size_builder.append_null(); + blob_id_builder.append_null(); + uri_builder.append_value(uri); } - - blob_id_builder.append_null(); - uri_builder.append_null(); } let children: Vec = vec![ From fcab037971d87ea24c1a224796c0406a0d9b3db0 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 2 Dec 2025 13:51:58 +0800 Subject: [PATCH 05/12] Add test Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes/field.rs | 2 +- .../src/encodings/logical/blob.rs | 44 ++++++++++++++++--- rust/lance/src/dataset/blob.rs | 7 ++- 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index bc65000904e..c604dd8474e 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -1004,7 +1004,7 @@ impl TryFrom<&ArrowField> for Field { // Check for JSON extension types (both Arrow and Lance) let logical_type = if is_arrow_json_field(field) || is_json_field(field) { LogicalType::from("json") - } else if is_blob_v2 { + } else if is_blob_v2 && !matches!(field.data_type(), DataType::Struct(_)) { LogicalType::from(super::BLOB_LOGICAL_TYPE) } else { LogicalType::try_from(field.data_type())? diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index dcbbedb29b4..7dc24117645 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -344,15 +344,15 @@ impl FieldEncoder for BlobV2StructuralEncoder { position_builder.append_value(position); size_builder.append_value(value.len() as u64); } - blob_id_builder.append_null(); - uri_builder.append_null(); + blob_id_builder.append_value(0); + uri_builder.append_value(""); } else { // external uri let uri = uri_col.value(i); kind_builder.append_value(3); - position_builder.append_null(); - size_builder.append_null(); - blob_id_builder.append_null(); + position_builder.append_value(0); + size_builder.append_value(0); + blob_id_builder.append_value(0); uri_builder.append_value(uri); } } @@ -401,10 +401,14 @@ mod tests { use super::*; use crate::{ compression::DefaultCompressionStrategy, + constants::PACKED_STRUCT_META_KEY, encoder::{ColumnIndexSequence, EncodingOptions}, testing::{check_round_trip_encoding_of_data, TestCases}, + version::LanceFileVersion, }; - use arrow_array::LargeBinaryArray; + use arrow_array::{ArrayRef, LargeBinaryArray, StringArray, StructArray}; + use arrow_schema::{DataType, Field as ArrowField, Fields}; + use lance_arrow::{ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME}; #[test] fn test_blob_encoder_creation() { @@ -487,4 +491,32 @@ mod tests { // Use the standard test harness check_round_trip_encoding_of_data(vec![array], &TestCases::default(), blob_metadata).await; } + + #[tokio::test] + async fn test_blob_v2_external_round_trip() { + let blob_metadata = + HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]); + + let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true)); + let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true)); + + let data_array = LargeBinaryArray::from(vec![Some(b"inline".as_ref()), None, None]); + let uri_array = StringArray::from(vec![ + None, + Some("file:///tmp/external.bin"), + Some("s3://bucket/blob"), + ]); + + let struct_array = StructArray::from(vec![ + (data_field, Arc::new(data_array) as ArrayRef), + (uri_field, Arc::new(uri_array) as ArrayRef), + ]); + + check_round_trip_encoding_of_data( + vec![Arc::new(struct_array)], + &TestCases::default(), + blob_metadata, + ) + .await; + } } diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index b85483dd4e7..6a2431297d0 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -327,7 +327,12 @@ async fn collect_blob_files_v2( let size = if sizes.is_null(idx) { None } else { - Some(sizes.value(idx)) + let value = sizes.value(idx); + if value == 0 { + None + } else { + Some(value) + } }; let registry = dataset.session.store_registry(); let params = dataset From 22d4df2906f9c183498c14b155bbb218b3aab442 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 2 Dec 2025 20:26:10 +0800 Subject: [PATCH 06/12] Save work Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes.rs | 11 ++-- rust/lance-core/src/datatypes/field.rs | 4 +- rust/lance-encoding/src/decoder.rs | 21 +++++++ .../src/encodings/logical/blob.rs | 62 ++++++++++++++----- rust/lance-encoding/src/testing.rs | 59 +++++++++++++++--- 5 files changed, 128 insertions(+), 29 deletions(-) diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index 76e6924ff75..3255072105c 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -49,10 +49,10 @@ pub static BLOB_DESC_LANCE_FIELD: LazyLock = pub static BLOB_V2_DESC_FIELDS: LazyLock = LazyLock::new(|| { Fields::from(vec![ ArrowField::new("kind", DataType::UInt8, false), - ArrowField::new("position", DataType::UInt64, true), - ArrowField::new("size", DataType::UInt64, true), - ArrowField::new("blob_id", DataType::UInt32, true), - ArrowField::new("blob_uri", DataType::Utf8, true), + ArrowField::new("position", DataType::UInt64, false), + ArrowField::new("size", DataType::UInt64, false), + ArrowField::new("blob_id", DataType::UInt32, false), + ArrowField::new("blob_uri", DataType::Utf8, false), ]) }); @@ -60,8 +60,9 @@ pub static BLOB_V2_DESC_TYPE: LazyLock = LazyLock::new(|| DataType::Struct(BLOB_V2_DESC_FIELDS.clone())); pub static BLOB_V2_DESC_FIELD: LazyLock = LazyLock::new(|| { - ArrowField::new("description", BLOB_V2_DESC_TYPE.clone(), true).with_metadata(HashMap::from([ + ArrowField::new("description", BLOB_V2_DESC_TYPE.clone(), false).with_metadata(HashMap::from([ (lance_arrow::BLOB_META_KEY.to_string(), "true".to_string()), + ("lance-encoding:packed".to_string(), "true".to_string()), ])) }); diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index c604dd8474e..5084b659013 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -513,15 +513,17 @@ impl Field { /// /// If the field is not a blob, return the field itself. pub fn into_unloaded_with_version(mut self, version: BlobVersion) -> Self { - if self.data_type().is_binary_like() && self.is_blob() { + if self.is_blob() { match version { BlobVersion::V2 => { self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone(); self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone(); + self.metadata = BLOB_V2_DESC_LANCE_FIELD.metadata.clone(); } BlobVersion::V1 => { self.logical_type = BLOB_DESC_LANCE_FIELD.logical_type.clone(); self.children = BLOB_DESC_LANCE_FIELD.children.clone(); + self.metadata = BLOB_DESC_LANCE_FIELD.metadata.clone(); } } } diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 70730d21371..56ed576fd17 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -712,6 +712,27 @@ impl CoreFieldDecoderStrategy { } match &data_type { DataType::Struct(fields) => { + let is_blob_v2_desc = field.children.len() == 5 + && field.children[0].name == "kind" + && field.children[1].name == "position" + && field.children[2].name == "size" + && field.children[3].name == "blob_id" + && field.children[4].name == "blob_uri"; + + if field.is_blob() || is_blob_v2_desc { + let column_info = column_infos.expect_next()?; + let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new( + column_info.as_ref(), + self.decompressor_strategy.as_ref(), + self.cache_repetition_index, + &lance_core::datatypes::BLOB_V2_DESC_LANCE_FIELD, + )?); + + column_infos.next_top_level(); + + return Ok(scheduler); + } + if field.is_packed_struct() { // Packed struct let column_info = column_infos.expect_next()?; diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index 7dc24117645..fffb6340756 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -58,7 +58,7 @@ impl BlobStructuralEncoder { // Use the original field's name for the descriptor let descriptor_field = Field::try_from( - ArrowField::new(&field.name, descriptor_data_type, field.nullable) + ArrowField::new(&field.name, descriptor_data_type, false) .with_metadata(descriptor_metadata), )?; @@ -267,7 +267,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { &mut self, array: ArrayRef, external_buffers: &mut OutOfLineBuffers, - _repdef: RepDefBuilder, + mut repdef: RepDefBuilder, row_number: u64, num_rows: u64, ) -> Result> { @@ -280,6 +280,11 @@ impl FieldEncoder for BlobV2StructuralEncoder { }; let struct_arr = array.as_struct(); + if let Some(validity) = struct_arr.nulls() { + repdef.add_validity_bitmap(validity.clone()); + } else { + repdef.add_no_null(struct_arr.len()); + } let mut data_idx = None; let mut uri_idx = None; for (idx, field) in fields.iter().enumerate() { @@ -323,11 +328,12 @@ impl FieldEncoder for BlobV2StructuralEncoder { for i in 0..struct_arr.len() { if struct_arr.is_null(i) { - kind_builder.append_null(); - position_builder.append_null(); - size_builder.append_null(); - blob_id_builder.append_null(); - uri_builder.append_null(); + // Packed struct does not support nullable fields; use empty/default values and rely on rep/def. + kind_builder.append_value(0); + position_builder.append_value(0); + size_builder.append_value(0); + blob_id_builder.append_value(0); + uri_builder.append_value(""); continue; } @@ -374,7 +380,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { self.descriptor_encoder.maybe_encode( descriptor_array, external_buffers, - RepDefBuilder::default(), + repdef, row_number, num_rows, ) @@ -401,14 +407,15 @@ mod tests { use super::*; use crate::{ compression::DefaultCompressionStrategy, - constants::PACKED_STRUCT_META_KEY, encoder::{ColumnIndexSequence, EncodingOptions}, - testing::{check_round_trip_encoding_of_data, TestCases}, + testing::{ + check_round_trip_encoding_of_data, check_round_trip_encoding_of_data_with_expected, + TestCases, + }, version::LanceFileVersion, }; - use arrow_array::{ArrayRef, LargeBinaryArray, StringArray, StructArray}; - use arrow_schema::{DataType, Field as ArrowField, Fields}; - use lance_arrow::{ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME}; + use arrow_array::{ArrayRef, LargeBinaryArray, StringArray, StructArray, UInt32Array, UInt64Array, UInt8Array}; + use arrow_schema::{DataType, Field as ArrowField}; #[test] fn test_blob_encoder_creation() { @@ -512,9 +519,34 @@ mod tests { (uri_field, Arc::new(uri_array) as ArrayRef), ]); - check_round_trip_encoding_of_data( + let expected_descriptor = StructArray::from(vec![ + ( + Arc::new(ArrowField::new("kind", DataType::UInt8, false)), + Arc::new(UInt8Array::from(vec![0, 3, 3])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("position", DataType::UInt64, false)), + Arc::new(UInt64Array::from(vec![0, 0, 0])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("size", DataType::UInt64, false)), + Arc::new(UInt64Array::from(vec![6, 0, 0])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("blob_id", DataType::UInt32, false)), + Arc::new(UInt32Array::from(vec![0, 0, 0])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("blob_uri", DataType::Utf8, false)), + Arc::new(StringArray::from(vec!["", "file:///tmp/external.bin", "s3://bucket/blob"])) + as ArrayRef, + ), + ]); + + check_round_trip_encoding_of_data_with_expected( vec![Arc::new(struct_array)], - &TestCases::default(), + Some(Arc::new(expected_descriptor)), + &TestCases::default().with_min_file_version(LanceFileVersion::V2_2), blob_metadata, ) .await; diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index 8987f5a31b3..3b6c43a3d2a 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -14,7 +14,7 @@ use crate::{ use arrow_array::{make_array, Array, StructArray, UInt64Array}; use arrow_data::transform::{Capacities, MutableArrayData}; use arrow_ord::ord::make_comparator; -use arrow_schema::{DataType, Field, FieldRef, Schema, SortOptions}; +use arrow_schema::{DataType, Field, Field as ArrowField, FieldRef, Schema, SortOptions}; use arrow_select::concat::concat; use bytes::{Bytes, BytesMut}; use futures::{future::BoxFuture, FutureExt, StreamExt}; @@ -83,6 +83,12 @@ fn column_indices_from_schema_helper( // In the old style, every field except FSL gets its own column. In the new style only primitive // leaf fields get their own column. for field in fields { + if is_structural_encoding && field.metadata().contains_key("lance-encoding:packed") { + column_indices.push(*column_counter); + *column_counter += 1; + continue; + } + match field.data_type() { DataType::Struct(fields) => { if !is_structural_encoding { @@ -698,6 +704,15 @@ pub async fn check_round_trip_encoding_of_data( data: Vec>, test_cases: &TestCases, metadata: HashMap, +) { + check_round_trip_encoding_of_data_with_expected(data, None, test_cases, metadata).await +} + +pub async fn check_round_trip_encoding_of_data_with_expected( + data: Vec>, + expected_override: Option>, + test_cases: &TestCases, + metadata: HashMap, ) { let example_data = data.first().expect("Data must have at least one array"); let mut field = Field::new("", example_data.data_type().clone(), true); @@ -725,8 +740,15 @@ pub async fn check_round_trip_encoding_of_data( "Testing round trip encoding of data with file version {} and page size {}", file_version, page_size ); - check_round_trip_encoding_inner(encoder, &field, data.clone(), test_cases, file_version) - .await + check_round_trip_encoding_inner( + encoder, + &field, + data.clone(), + expected_override.clone(), + test_cases, + file_version, + ) + .await } } } @@ -795,6 +817,7 @@ async fn check_round_trip_encoding_inner( mut encoder: Box, field: &Field, data: Vec>, + expected_override: Option>, test_cases: &TestCases, file_version: LanceFileVersion, ) { @@ -902,8 +925,6 @@ async fn check_round_trip_encoding_inner( let scheduler = Arc::new(SimulatedScheduler::new(encoded_data)) as Arc; - let schema = Schema::new(vec![field.clone()]); - let num_rows = data.iter().map(|arr| arr.len() as u64).sum::(); let concat_data = if test_cases.skip_validation { None @@ -924,8 +945,29 @@ async fn check_round_trip_encoding_inner( Some(concat(&data.iter().map(|arr| arr.as_ref()).collect::>()).unwrap()) }; + let expected_data = expected_override.clone().or_else(|| concat_data.clone()); + let is_structural_encoding = file_version >= LanceFileVersion::V2_1; + let decode_field = if is_structural_encoding { + let mut lance_field = lance_core::datatypes::Field::try_from(field).unwrap(); + if lance_field.is_blob() && matches!(lance_field.data_type(), DataType::Struct(_)) { + lance_field = + lance_field.into_unloaded_with_version(lance_core::datatypes::BlobVersion::V2); + let mut arrow_field = ArrowField::from(&lance_field); + let mut metadata = arrow_field.metadata().clone(); + metadata.insert("lance-encoding:packed".to_string(), "true".to_string()); + arrow_field = arrow_field.with_metadata(metadata); + arrow_field + } else { + field.clone() + } + } else { + field.clone() + }; + + let schema = Schema::new(vec![decode_field]); + debug!("Testing full decode"); let scheduler_copy = scheduler.clone(); test_decode( @@ -933,7 +975,7 @@ async fn check_round_trip_encoding_inner( test_cases.batch_size, &schema, &column_infos, - concat_data.clone(), + expected_data.clone(), scheduler_copy.clone(), is_structural_encoding, |mut decode_scheduler, tx| { @@ -954,9 +996,9 @@ async fn check_round_trip_encoding_inner( for range in &test_cases.ranges { debug!("Testing decode of range {:?}", range); let num_rows = range.end - range.start; - let expected = concat_data + let expected = expected_data .as_ref() - .map(|concat_data| concat_data.slice(range.start as usize, num_rows as usize)); + .map(|arr| arr.slice(range.start as usize, num_rows as usize)); let scheduler = scheduler.clone(); let range = range.clone(); test_decode( @@ -1129,6 +1171,7 @@ async fn check_round_trip_random( encoder_factory(file_version), &field, data, + None, test_cases, file_version, ) From 6d38da9569ce5af8dc311d6a2a7e66faf0dbeb3a Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 2 Dec 2025 20:43:11 +0800 Subject: [PATCH 07/12] Fix build Signed-off-by: Xuanwo --- rust/lance-encoding/src/decoder.rs | 21 ------------------- .../src/encodings/logical/blob.rs | 11 +++++++--- 2 files changed, 8 insertions(+), 24 deletions(-) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 56ed576fd17..70730d21371 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -712,27 +712,6 @@ impl CoreFieldDecoderStrategy { } match &data_type { DataType::Struct(fields) => { - let is_blob_v2_desc = field.children.len() == 5 - && field.children[0].name == "kind" - && field.children[1].name == "position" - && field.children[2].name == "size" - && field.children[3].name == "blob_id" - && field.children[4].name == "blob_uri"; - - if field.is_blob() || is_blob_v2_desc { - let column_info = column_infos.expect_next()?; - let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new( - column_info.as_ref(), - self.decompressor_strategy.as_ref(), - self.cache_repetition_index, - &lance_core::datatypes::BLOB_V2_DESC_LANCE_FIELD, - )?); - - column_infos.next_top_level(); - - return Ok(scheduler); - } - if field.is_packed_struct() { // Packed struct let column_info = column_infos.expect_next()?; diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index fffb6340756..ca1fa5ace42 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -414,7 +414,9 @@ mod tests { }, version::LanceFileVersion, }; - use arrow_array::{ArrayRef, LargeBinaryArray, StringArray, StructArray, UInt32Array, UInt64Array, UInt8Array}; + use arrow_array::{ + ArrayRef, LargeBinaryArray, StringArray, StructArray, UInt32Array, UInt64Array, UInt8Array, + }; use arrow_schema::{DataType, Field as ArrowField}; #[test] @@ -538,8 +540,11 @@ mod tests { ), ( Arc::new(ArrowField::new("blob_uri", DataType::Utf8, false)), - Arc::new(StringArray::from(vec!["", "file:///tmp/external.bin", "s3://bucket/blob"])) - as ArrayRef, + Arc::new(StringArray::from(vec![ + "", + "file:///tmp/external.bin", + "s3://bucket/blob", + ])) as ArrayRef, ), ]); From 68ffcb1b41aa0f298b25e44e56a37f744c94072c Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 3 Dec 2025 13:41:04 +0800 Subject: [PATCH 08/12] Address comments Signed-off-by: Xuanwo --- rust/lance-encoding/src/encodings/logical/blob.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index ca1fa5ace42..97bbcd325c8 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -58,7 +58,7 @@ impl BlobStructuralEncoder { // Use the original field's name for the descriptor let descriptor_field = Field::try_from( - ArrowField::new(&field.name, descriptor_data_type, false) + ArrowField::new(&field.name, descriptor_data_type, field.nullable) .with_metadata(descriptor_metadata), )?; From 34af2ed979a8fb246b466a2d549f910af4f0c085 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 3 Dec 2025 13:43:34 +0800 Subject: [PATCH 09/12] polish Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes/field.rs | 2 +- rust/lance-encoding/src/encodings/logical/blob.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 5084b659013..cd6377098f0 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -1006,7 +1006,7 @@ impl TryFrom<&ArrowField> for Field { // Check for JSON extension types (both Arrow and Lance) let logical_type = if is_arrow_json_field(field) || is_json_field(field) { LogicalType::from("json") - } else if is_blob_v2 && !matches!(field.data_type(), DataType::Struct(_)) { + } else if is_blob_v2 { LogicalType::from(super::BLOB_LOGICAL_TYPE) } else { LogicalType::try_from(field.data_type())? diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index 97bbcd325c8..6c9dbfb609e 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -328,7 +328,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { for i in 0..struct_arr.len() { if struct_arr.is_null(i) { - // Packed struct does not support nullable fields; use empty/default values and rely on rep/def. + // Packed struct does not support nullable fields; use empty/default values instead. kind_builder.append_value(0); position_builder.append_value(0); size_builder.append_value(0); From 77398ea58f2d10fb16ed8c3435c527297108f930 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 3 Dec 2025 14:00:06 +0800 Subject: [PATCH 10/12] Refactor into using blob kind Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes.rs | 31 +++++++++++++++++++ .../src/encodings/logical/blob.rs | 11 +++++-- rust/lance/src/dataset/blob.rs | 18 +++-------- 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index 3255072105c..f6400c5a5ae 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -416,3 +416,34 @@ pub fn lance_supports_nulls(datatype: &DataType) -> bool { | DataType::FixedSizeList(_, _) ) } + +/// Physical storage mode for blob v2 descriptors (one byte, stored in the packed struct column). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum BlobKind { + /// Stored in the main data file’s out-of-line buffer; `position`/`size` point into that file. + Inline = 0, + /// Stored in a shared packed blob file; `position`/`size` locate the slice, `blob_id` selects the file. + Packed = 1, + /// Stored in a dedicated raw blob file; `blob_id` identifies the file, `size` is the full file length. + Dedicated = 2, + /// Not stored by Lance; `blob_uri` holds an absolute external URI, offsets are zero. + External = 3, +} + +impl TryFrom for BlobKind { + type Error = Error; + + fn try_from(value: u8) -> Result { + match value { + 0 => Ok(Self::Inline), + 1 => Ok(Self::Packed), + 2 => Ok(Self::Dedicated), + 3 => Ok(Self::External), + other => Err(Error::InvalidInput { + source: format!("Unknown blob kind {other:?}").into(), + location: location!(), + }), + } + } +} diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index 6c9dbfb609e..6d7b3a51e96 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -26,6 +26,7 @@ use crate::{ format::ProtobufUtils21, repdef::{DefinitionInterpretation, RepDefBuilder}, }; +use lance_core::datatypes::BlobKind; /// Blob structural encoder - stores large binary data in external buffers /// @@ -340,7 +341,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { let data_is_set = !data_col.is_null(i); if data_is_set { let value = binary_array.value(i); - kind_builder.append_value(0); + kind_builder.append_value(BlobKind::Inline as u8); if value.is_empty() { position_builder.append_value(0); size_builder.append_value(0); @@ -355,7 +356,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { } else { // external uri let uri = uri_col.value(i); - kind_builder.append_value(3); + kind_builder.append_value(BlobKind::External as u8); position_builder.append_value(0); size_builder.append_value(0); blob_id_builder.append_value(0); @@ -524,7 +525,11 @@ mod tests { let expected_descriptor = StructArray::from(vec![ ( Arc::new(ArrowField::new("kind", DataType::UInt8, false)), - Arc::new(UInt8Array::from(vec![0, 3, 3])) as ArrayRef, + Arc::new(UInt8Array::from(vec![ + BlobKind::Inline as u8, + BlobKind::External as u8, + BlobKind::External as u8, + ])) as ArrayRef, ), ( Arc::new(ArrowField::new("position", DataType::UInt64, false)), diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 6a2431297d0..8c6adf1db54 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -12,13 +12,11 @@ use tokio::sync::Mutex; use super::Dataset; use arrow_array::{Array, StructArray}; -use lance_core::datatypes::BlobVersion; +use lance_core::datatypes::{BlobKind, BlobVersion}; use lance_core::{utils::address::RowAddress, Error, Result}; use lance_io::traits::Reader; pub const BLOB_VERSION_CONFIG_KEY: &str = "lance.blob.version"; -const INLINE_BLOB_KIND: u8 = 0; -const EXTERNAL_BLOB_KIND: u8 = 3; pub fn blob_version_from_config(config: &HashMap) -> BlobVersion { config @@ -38,12 +36,6 @@ enum ReaderState { Closed, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum BlobKind { - Inline, - External, -} - /// A file-like object that represents a blob in a dataset #[derive(Debug)] pub struct BlobFile { @@ -306,9 +298,9 @@ async fn collect_blob_files_v2( // Null row continue; } - let kind = kinds.value(idx); + let kind = BlobKind::try_from(kinds.value(idx))?; match kind { - INLINE_BLOB_KIND => { + BlobKind::Inline => { if positions.is_null(idx) || sizes.is_null(idx) { continue; } @@ -322,7 +314,7 @@ async fn collect_blob_files_v2( size, )); } - EXTERNAL_BLOB_KIND => { + BlobKind::External => { let uri = _uris.value(idx).to_string(); let size = if sizes.is_null(idx) { None @@ -344,7 +336,7 @@ async fn collect_blob_files_v2( } other => { return Err(Error::NotSupported { - source: format!("Blob kind {} is not supported", other).into(), + source: format!("Blob kind {:?} is not supported", other).into(), location: location!(), }); } From fbcdb5061ef48e8939c37d5e767a3d0bf76b2fea Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 3 Dec 2025 14:05:34 +0800 Subject: [PATCH 11/12] Fix Signed-off-by: Xuanwo --- rust/lance-encoding/src/encodings/logical/blob.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index 6d7b3a51e96..e79e89921b1 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -330,7 +330,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { for i in 0..struct_arr.len() { if struct_arr.is_null(i) { // Packed struct does not support nullable fields; use empty/default values instead. - kind_builder.append_value(0); + kind_builder.append_value(BlobKind::Inline as u8); position_builder.append_value(0); size_builder.append_value(0); blob_id_builder.append_value(0); From 70bab99751a7620c6efd5b38fca3dd7e03047ebe Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 3 Dec 2025 14:16:24 +0800 Subject: [PATCH 12/12] Fix blob Signed-off-by: Xuanwo --- rust/lance/src/dataset/blob.rs | 37 +++++++++++++--------------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 8c6adf1db54..effb5cfe641 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -11,7 +11,7 @@ use snafu::location; use tokio::sync::Mutex; use super::Dataset; -use arrow_array::{Array, StructArray}; +use arrow_array::StructArray; use lance_core::datatypes::{BlobKind, BlobVersion}; use lance_core::{utils::address::RowAddress, Error, Result}; use lance_io::traits::Reader; @@ -76,15 +76,16 @@ impl BlobFile { pub async fn new_external( uri: String, - size: Option, + size: u64, registry: Arc, params: Arc, ) -> Result { let (object_store, path) = ObjectStore::from_uri_and_params(registry, &uri, ¶ms).await?; - let size = match size { - Some(sz) => sz, - None => object_store.size(&path).await?, + let size = if size > 0 { + size + } else { + object_store.size(&path).await? }; Ok(Self { object_store, @@ -290,20 +291,19 @@ async fn collect_blob_files_v2( let positions = descriptions.column(1).as_primitive::(); let sizes = descriptions.column(2).as_primitive::(); let _blob_ids = descriptions.column(3).as_primitive::(); - let _uris = descriptions.column(4).as_string::(); + let blob_uris = descriptions.column(4).as_string::(); let mut files = Vec::with_capacity(row_addrs.len()); for (idx, row_addr) in row_addrs.values().iter().enumerate() { - if kinds.is_null(idx) { - // Null row + let kind = BlobKind::try_from(kinds.value(idx))?; + + // Struct is non-nullable; null rows are encoded as inline with zero position/size and empty uri + if matches!(kind, BlobKind::Inline) && positions.value(idx) == 0 && sizes.value(idx) == 0 { continue; } - let kind = BlobKind::try_from(kinds.value(idx))?; + match kind { BlobKind::Inline => { - if positions.is_null(idx) || sizes.is_null(idx) { - continue; - } let position = positions.value(idx); let size = sizes.value(idx); files.push(BlobFile::new_inline( @@ -315,17 +315,8 @@ async fn collect_blob_files_v2( )); } BlobKind::External => { - let uri = _uris.value(idx).to_string(); - let size = if sizes.is_null(idx) { - None - } else { - let value = sizes.value(idx); - if value == 0 { - None - } else { - Some(value) - } - }; + let uri = blob_uris.value(idx).to_string(); + let size = sizes.value(idx); let registry = dataset.session.store_registry(); let params = dataset .store_params