From 69cbd4138d4f84e63f3d7fd01a59721c7249af19 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 10 Dec 2025 21:18:52 +0800 Subject: [PATCH 1/9] Add blob API Signed-off-by: Xuanwo --- rust/lance/src/blob.rs | 238 +++++++++++++++++++++++++++++++++++++++++ rust/lance/src/lib.rs | 2 + 2 files changed, 240 insertions(+) create mode 100644 rust/lance/src/blob.rs diff --git a/rust/lance/src/blob.rs b/rust/lance/src/blob.rs new file mode 100644 index 00000000000..32a24f1b2e9 --- /dev/null +++ b/rust/lance/src/blob.rs @@ -0,0 +1,238 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Convenience builders for Lance blob v2 input columns. +//! +//! Blob v2 expects a column shaped as `Struct<data: LargeBinary?, uri: Utf8?>` and +//! tagged with `ARROW:extension:name = "lance.blob.v2"`. This module offers a +//! type-safe builder to construct that struct without manually wiring metadata + +use std::sync::Arc; + +use arrow_array::{ + builder::LargeBinaryBuilder, builder::StringBuilder, Array, ArrayRef, StructArray, +}; +use arrow_buffer::NullBufferBuilder; +use arrow_schema::{DataType, Field}; +use lance_arrow::{ARROW_EXT_NAME_KEY, BLOB_V2_EXT_NAME}; + +use crate::{Error, Result}; + +/// A typed wrapper around the blob v2 input struct column. +pub struct BlobArray { + inner: StructArray, +} + +impl BlobArray { + /// Construct the Arrow field for a blob v2 column. + pub fn field(name: &str, nullable: bool) -> Field { + let metadata = [(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string())] + .into_iter() + .collect(); + Field::new( + name, + DataType::Struct( + vec![ + Field::new("data", DataType::LargeBinary, true), + Field::new("uri", DataType::Utf8, true), + ] + .into(), + ), + nullable, + ) + .with_metadata(metadata) + } + + /// Borrow the underlying struct array. + pub fn as_struct(&self) -> &StructArray { + &self.inner + } + + /// Number of rows. 
+ pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + pub fn null_count(&self) -> usize { + self.inner.null_count() + } +} + +impl From<BlobArray> for ArrayRef { + fn from(value: BlobArray) -> Self { + Arc::new(value.inner) + } +} + +/// Builder for [`BlobArray`]. +/// +/// The builder enforces that each row contains exactly one of `data` or `uri` (or is null). +pub struct BlobArrayBuilder { + data_builder: LargeBinaryBuilder, + uri_builder: StringBuilder, + validity: NullBufferBuilder, + expected_len: usize, + len: usize, +} + +impl BlobArrayBuilder { + /// Create a new builder with the given row capacity. + pub fn new(capacity: usize) -> Self { + Self { + data_builder: LargeBinaryBuilder::with_capacity(capacity, 0), + uri_builder: StringBuilder::with_capacity(capacity, 0), + validity: NullBufferBuilder::new(capacity), + expected_len: capacity, + len: 0, + } + } + + /// Append a blob backed by raw bytes. + pub fn push_bytes(&mut self, bytes: impl AsRef<[u8]>) -> Result<()> { + self.ensure_capacity()?; + self.validity.append_non_null(); + self.data_builder.append_value(bytes); + self.uri_builder.append_null(); + self.len += 1; + Ok(()) + } + + /// Append a blob referenced by URI. + pub fn push_uri(&mut self, uri: impl Into<String>) -> Result<()> { + self.ensure_capacity()?; + let uri = uri.into(); + if uri.is_empty() { + return Err(Error::invalid_input( + "URI cannot be empty", + snafu::location!(), + )); + } + self.validity.append_non_null(); + self.data_builder.append_null(); + self.uri_builder.append_value(uri); + self.len += 1; + Ok(()) + } + + /// Append an empty blob (inline, zero-length payload). + pub fn push_empty(&mut self) -> Result<()> { + self.ensure_capacity()?; + self.validity.append_non_null(); + self.data_builder.append_value([]); + self.uri_builder.append_null(); + self.len += 1; + Ok(()) + } + + /// Append a null row. 
+ pub fn push_null(&mut self) -> Result<()> { + self.ensure_capacity()?; + self.validity.append_null(); + self.data_builder.append_null(); + self.uri_builder.append_null(); + self.len += 1; + Ok(()) + } + + /// Finish building and return the [`BlobArray`]. + pub fn finish(mut self) -> Result<BlobArray> { + if self.len != self.expected_len { + return Err(Error::invalid_input( + format!( + "Expected {} rows but received {}", + self.expected_len, self.len + ), + snafu::location!(), + )); + } + + let data = Arc::new(self.data_builder.finish()); + let uri = Arc::new(self.uri_builder.finish()); + let validity = self.validity.finish(); + + let struct_array = StructArray::try_new( + vec![ + Field::new("data", DataType::LargeBinary, true), + Field::new("uri", DataType::Utf8, true), + ] + .into(), + vec![data as ArrayRef, uri as ArrayRef], + validity, + )?; + + Ok(BlobArray { + inner: struct_array, + }) + } + + fn ensure_capacity(&self) -> Result<()> { + if self.len >= self.expected_len { + Err(Error::invalid_input( + "BlobArrayBuilder capacity exceeded", + snafu::location!(), + )) + } else { + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_array::cast::AsArray; + use arrow_array::Array; + + #[test] + fn test_field_metadata() { + let field = BlobArray::field("blob", true); + assert!(field.metadata().get(ARROW_EXT_NAME_KEY).is_some()); + assert_eq!( + field.metadata().get(ARROW_EXT_NAME_KEY).unwrap(), + BLOB_V2_EXT_NAME + ); + } + + #[test] + fn test_builder_basic() { + let mut b = BlobArrayBuilder::new(4); + b.push_bytes(b"hi").unwrap(); + b.push_uri("s3://bucket/key").unwrap(); + b.push_empty().unwrap(); + b.push_null().unwrap(); + + let arr = b.finish().unwrap(); + assert_eq!(arr.len(), 4); + assert_eq!(arr.null_count(), 1); + + let struct_arr = arr.as_struct(); + let data = struct_arr.column(0).as_binary::<i64>(); + let uri = struct_arr.column(1).as_string::<i32>(); + + assert_eq!(data.value(0), b"hi"); + assert!(uri.is_null(0)); + assert!(data.is_null(1)); + 
assert_eq!(uri.value(1), "s3://bucket/key"); + assert_eq!(data.value(2).len(), 0); + assert!(uri.is_null(2)); + } + + #[test] + fn test_capacity_error() { + let mut b = BlobArrayBuilder::new(1); + b.push_bytes(b"a").unwrap(); + let err = b.push_bytes(b"b").unwrap_err(); + assert!(err.to_string().contains("capacity exceeded")); + } + + #[test] + fn test_empty_uri_rejected() { + let mut b = BlobArrayBuilder::new(1); + let err = b.push_uri("").unwrap_err(); + assert!(err.to_string().contains("URI cannot be empty")); + } +} diff --git a/rust/lance/src/lib.rs b/rust/lance/src/lib.rs index 3f579994957..1df737156e5 100644 --- a/rust/lance/src/lib.rs +++ b/rust/lance/src/lib.rs @@ -76,6 +76,7 @@ pub use lance_core::{Error, Result}; use std::sync::LazyLock; pub mod arrow; +pub mod blob; pub mod datafusion; pub mod dataset; pub mod index; @@ -84,6 +85,7 @@ pub mod session; pub mod table; pub mod utils; +pub use blob::{BlobArray, BlobArrayBuilder}; pub use dataset::Dataset; use lance_index::vector::DIST_COL; From c1021429e86169b469690120d38c3dea4cb48fc6 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 10 Dec 2025 21:42:39 +0800 Subject: [PATCH 2/9] add tests Signed-off-by: Xuanwo --- rust/lance/src/dataset/blob.rs | 43 +++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index d56b9e2fb8a..5b4843b68cb 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -788,13 +788,20 @@ mod tests { use futures::TryStreamExt; use lance_arrow::DataTypeExt; use lance_io::stream::RecordBatchStream; + use arrow_array::{RecordBatchIterator, UInt32Array}; + use arrow_schema::{Field, Schema, DataType}; use lance_core::{utils::tempfile::TempStrDir, Error, Result}; use lance_datagen::{array, BatchCount, RowCount}; use lance_file::version::LanceFileVersion; use super::data_file_key_from_path; - use crate::{utils::test::TestDatasetGenerator, Dataset}; + use crate::{ 
+ blob::{BlobArray, BlobArrayBuilder}, + dataset::WriteParams, + utils::test::TestDatasetGenerator, + Dataset, + }; struct BlobTestFixture { _test_dir: TempStrDir, @@ -1054,4 +1061,38 @@ mod tests { assert_eq!(data_file_key_from_path("abc.lance"), "abc"); assert_eq!(data_file_key_from_path("nested/path/xyz"), "xyz"); } + + #[tokio::test] + async fn test_write_and_take_blobs_with_blob_array_builder() { + let test_dir = TempStrDir::default(); + + // Build a blob column with the new BlobArrayBuilder + let mut blob_builder = BlobArrayBuilder::new(2); + blob_builder.push_bytes(b"hello").unwrap(); + blob_builder.push_bytes(b"world").unwrap(); + let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap().into(); + + let id_array: arrow_array::ArrayRef = Arc::new(UInt32Array::from(vec![0, 1])); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false), + BlobArray::field("blob", true), + ])); + + let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + + let params = WriteParams::with_storage_version(LanceFileVersion::V2_2); + let dataset = Arc::new(Dataset::write(reader, &test_dir, Some(params)).await.unwrap()); + + let blobs = dataset + .take_blobs_by_indices(&[0, 1], "blob") + .await + .unwrap(); + + assert_eq!(blobs.len(), 2); + let first = blobs[0].read().await.unwrap(); + let second = blobs[1].read().await.unwrap(); + assert_eq!(first.as_ref(), b"hello"); + assert_eq!(second.as_ref(), b"world"); + } } From 82544b05d71c31e80949cabc37132d9dfdf59b34 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 11 Dec 2025 00:27:53 +0800 Subject: [PATCH 3/9] Save Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes/field.rs | 26 ++++++++++++++++++++++++- rust/lance-core/src/datatypes/schema.rs | 4 +++- rust/lance-file/src/reader.rs | 5 +++-- rust/lance-file/src/writer.rs | 9 +++++++++ 4 files changed, 40 insertions(+), 4 
deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index e96f29efa6a..59c61d3c8c5 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -516,6 +516,23 @@ impl Field { .unwrap_or(false) } + /// If the field is a blob, return a new field with the same name and id + /// but with the data type set to a struct of the blob description fields. + /// + /// If the field is not a blob, return the field itself. + pub fn unloaded_mut(&mut self) -> &mut Self { + if self.is_blob_v2() { + self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone(); + self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone(); + self.metadata = BLOB_V2_DESC_LANCE_FIELD.metadata.clone(); + } else if self.is_blob() { + self.logical_type = BLOB_DESC_LANCE_FIELD.logical_type.clone(); + self.children = BLOB_DESC_LANCE_FIELD.children.clone(); + self.metadata = BLOB_DESC_LANCE_FIELD.metadata.clone(); + } + self + } + /// If the field is a blob, return a new field with the same name and id /// but with the data type set to a struct of the blob description fields. /// @@ -744,6 +761,13 @@ impl Field { (&self_type, &other_type), (DataType::Struct(_), DataType::Struct(_)) | (DataType::List(_), DataType::List(_)) ) { + // Blob v2 uses a struct logical type for descriptors, which differs from the logical + // input struct (data/uri). When intersecting schemas for projection we want to keep + // the projected blob layout instead of intersecting by child names. + if ignore_types && self.is_blob() && other.is_blob() { + return Ok(self.clone()); + } + let children = self .children .iter() @@ -1015,7 +1039,7 @@ impl TryFrom<&ArrowField> for Field { let logical_type = if is_arrow_json_field(field) || is_json_field(field) { LogicalType::from("json") } else if is_blob_v2 { - LogicalType::from(super::BLOB_LOGICAL_TYPE) + LogicalType::from("struct") } else { LogicalType::try_from(field.data_type())? 
}; diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index 5a3fb60dfb0..cdcc3cef1e6 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -928,7 +928,9 @@ pub enum BlobHandling { impl BlobHandling { fn should_unload(&self, field: &Field) -> bool { - if !field.data_type().is_binary_like() { + // Blob v2 columns are Structs, so we need to treat any blob-marked field as unloadable + // even if the physical data type is not binary-like. + if !(field.data_type().is_binary_like() || field.is_blob()) { return false; } match self { diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index 01f0194d91f..62ae0bfb06c 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -251,10 +251,11 @@ impl ReaderProjection { field_id_to_column_index, &mut column_indices, )?; - Ok(Self { + let projection = Self { schema: Arc::new(schema.clone()), column_indices, - }) + }; + Ok(projection) } /// Creates a projection that reads the entire file diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index d32cd6712e8..2f25f1e8752 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -465,6 +465,15 @@ impl FileWriter { async fn write_global_buffers(&mut self) -> Result> { let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided. Schema is unknown and file cannot be created", location!()))?; schema.metadata = std::mem::take(&mut self.schema_metadata); + // Use descriptor layout for blob v2 in the footer to avoid exposing logical child fields. + // + // TODO(xuanwo): this doesn't work on nested struct, need better solution like fields_per_order_mut? 
+ schema.fields.iter_mut().for_each(|f| { + if f.is_blob_v2() { + let _ = f.unloaded_mut(); + } + }); + let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?; let file_descriptor_bytes = file_descriptor.encode_to_vec(); let file_descriptor_len = file_descriptor_bytes.len() as u64; From b41489acc7769a11cfffb14fc2775031522a3838 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 11 Dec 2025 00:52:51 +0800 Subject: [PATCH 4/9] cargo fmt Signed-off-by: Xuanwo --- rust/lance/src/dataset/blob.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 5b4843b68cb..ff7024f9396 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -785,11 +785,11 @@ mod tests { use arrow::{array::AsArray, datatypes::UInt64Type}; use arrow_array::RecordBatch; + use arrow_array::{RecordBatchIterator, UInt32Array}; + use arrow_schema::{DataType, Field, Schema}; use futures::TryStreamExt; use lance_arrow::DataTypeExt; use lance_io::stream::RecordBatchStream; - use arrow_array::{RecordBatchIterator, UInt32Array}; - use arrow_schema::{Field, Schema, DataType}; use lance_core::{utils::tempfile::TempStrDir, Error, Result}; use lance_datagen::{array, BatchCount, RowCount}; @@ -1082,7 +1082,11 @@ mod tests { let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); let params = WriteParams::with_storage_version(LanceFileVersion::V2_2); - let dataset = Arc::new(Dataset::write(reader, &test_dir, Some(params)).await.unwrap()); + let dataset = Arc::new( + Dataset::write(reader, &test_dir, Some(params)) + .await + .unwrap(), + ); let blobs = dataset .take_blobs_by_indices(&[0, 1], "blob") From 067eda68fa3e350a7b8374170d31a7437825e516 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 11 Dec 2025 18:01:26 +0800 Subject: [PATCH 5/9] Fix metadata handling during convert Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes/field.rs | 11 
+++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 59c61d3c8c5..f645ed04210 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -21,7 +21,7 @@ use arrow_schema::{DataType, Field as ArrowField}; use deepsize::DeepSizeOf; use lance_arrow::{ json::{is_arrow_json_field, is_json_field}, - DataTypeExt, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME, + DataTypeExt, ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME, }; use snafu::location; @@ -1031,8 +1031,8 @@ impl TryFrom<&ArrowField> for Field { if is_blob_v2 { metadata - .entry(BLOB_META_KEY.to_string()) - .or_insert_with(|| "true".to_string()); + .entry(ARROW_EXT_NAME_KEY.to_string()) + .or_insert_with(|| BLOB_V2_EXT_NAME.to_string()); } // Check for JSON extension types (both Arrow and Lance) @@ -1080,11 +1080,6 @@ impl From<&Field> for ArrowField { let mut metadata = field.metadata.clone(); if field.logical_type.is_blob() { - metadata.insert( - ARROW_EXT_NAME_KEY.to_string(), - lance_arrow::BLOB_V2_EXT_NAME.to_string(), - ); - metadata.entry(ARROW_EXT_META_KEY.to_string()).or_default(); metadata .entry(BLOB_META_KEY.to_string()) .or_insert_with(|| "true".to_string()); From 0be3f759a64ac0455fdca2aaed4a59be6faa3c57 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 11 Dec 2025 18:33:08 +0800 Subject: [PATCH 6/9] Remove incorrect tests Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes/field.rs | 42 +------------------------- 1 file changed, 1 insertion(+), 41 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index f645ed04210..87170739a00 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -1103,7 +1103,7 @@ mod tests { use arrow_array::{DictionaryArray, StringArray, UInt32Array}; use arrow_schema::{Fields, TimeUnit}; - use 
lance_arrow::{ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME}; + use lance_arrow::BLOB_META_KEY; use std::collections::HashMap; #[test] fn arrow_field_to_field() { @@ -1588,44 +1588,4 @@ mod tests { assert_eq!(unloaded.children.len(), 5); assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type); } - - #[test] - fn blob_v2_detection_by_extension() { - let metadata = HashMap::from([ - (ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string()), - (BLOB_META_KEY.to_string(), "true".to_string()), - ]); - let field: Field = ArrowField::new("blob", DataType::LargeBinary, true) - .with_metadata(metadata) - .try_into() - .unwrap(); - assert!(field.is_blob_v2()); - } - - #[test] - fn blob_extension_roundtrip() { - let metadata = HashMap::from([ - (ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string()), - (ARROW_EXT_META_KEY.to_string(), "".to_string()), - ]); - let arrow_field = - ArrowField::new("blob", DataType::LargeBinary, true).with_metadata(metadata); - let field = Field::try_from(&arrow_field).unwrap(); - assert_eq!( - field.logical_type, - LogicalType::from(crate::datatypes::BLOB_LOGICAL_TYPE) - ); - assert!(field.is_blob()); - assert_eq!(field.data_type(), DataType::LargeBinary); - - let roundtrip: ArrowField = ArrowField::from(&field); - assert_eq!( - roundtrip.metadata().get(ARROW_EXT_NAME_KEY), - Some(&BLOB_V2_EXT_NAME.to_string()) - ); - assert_eq!( - roundtrip.metadata().get(BLOB_META_KEY), - Some(&"true".to_string()) - ); - } } From e67d08ea5843ac42dd5bd468f5c96f3b16af40dd Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 12 Dec 2025 00:13:10 +0800 Subject: [PATCH 7/9] Address comments Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes/field.rs | 7 +++---- rust/lance-file/src/writer.rs | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 87170739a00..a3544999a5a 100644 --- 
a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -516,11 +516,11 @@ impl Field { .unwrap_or(false) } - /// If the field is a blob, return a new field with the same name and id + /// If the field is a blob, update this field with the same name and id /// but with the data type set to a struct of the blob description fields. /// /// If the field is not a blob, return the field itself. - pub fn unloaded_mut(&mut self) -> &mut Self { + pub fn unloaded_mut(&mut self) { if self.is_blob_v2() { self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone(); self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone(); @@ -530,7 +530,6 @@ impl Field { self.children = BLOB_DESC_LANCE_FIELD.children.clone(); self.metadata = BLOB_DESC_LANCE_FIELD.metadata.clone(); } - self } /// If the field is a blob, return a new field with the same name and id @@ -764,7 +763,7 @@ impl Field { // Blob v2 uses a struct logical type for descriptors, which differs from the logical // input struct (data/uri). When intersecting schemas for projection we want to keep // the projected blob layout instead of intersecting by child names. - if ignore_types && self.is_blob() && other.is_blob() { + if self.is_blob() && other.is_blob() { return Ok(self.clone()); } diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index 2f25f1e8752..22dfec02b5f 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -470,7 +470,7 @@ impl FileWriter { // TODO(xuanwo): this doesn't work on nested struct, need better solution like fields_per_order_mut? 
schema.fields.iter_mut().for_each(|f| { if f.is_blob_v2() { - let _ = f.unloaded_mut(); + f.unloaded_mut(); } }); From 90a8ba5c24dfcba607368f0d6ecdb1b61485affd Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 12 Dec 2025 13:48:16 +0800 Subject: [PATCH 8/9] Remove BlobArray API Signed-off-by: Xuanwo --- rust/lance/src/blob.rs | 84 +++++++++++----------------------- rust/lance/src/dataset/blob.rs | 6 +-- rust/lance/src/lib.rs | 2 +- 3 files changed, 30 insertions(+), 62 deletions(-) diff --git a/rust/lance/src/blob.rs b/rust/lance/src/blob.rs index 32a24f1b2e9..196fe3866ca 100644 --- a/rust/lance/src/blob.rs +++ b/rust/lance/src/blob.rs @@ -9,66 +9,36 @@ use std::sync::Arc; -use arrow_array::{ - builder::LargeBinaryBuilder, builder::StringBuilder, Array, ArrayRef, StructArray, -}; +use arrow_array::{builder::LargeBinaryBuilder, builder::StringBuilder, ArrayRef, StructArray}; use arrow_buffer::NullBufferBuilder; use arrow_schema::{DataType, Field}; use lance_arrow::{ARROW_EXT_NAME_KEY, BLOB_V2_EXT_NAME}; use crate::{Error, Result}; -/// A typed wrapper around the blob v2 input struct column. -pub struct BlobArray { - inner: StructArray, -} - -impl BlobArray { - /// Construct the Arrow field for a blob v2 column. - pub fn field(name: &str, nullable: bool) -> Field { - let metadata = [(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string())] - .into_iter() - .collect(); - Field::new( - name, - DataType::Struct( - vec![ - Field::new("data", DataType::LargeBinary, true), - Field::new("uri", DataType::Utf8, true), - ] - .into(), - ), - nullable, - ) - .with_metadata(metadata) - } - - /// Borrow the underlying struct array. - pub fn as_struct(&self) -> &StructArray { - &self.inner - } - - /// Number of rows. 
- pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - pub fn null_count(&self) -> usize { - self.inner.null_count() - } -} - -impl From<BlobArray> for ArrayRef { - fn from(value: BlobArray) -> Self { - Arc::new(value.inner) - } +/// Construct the Arrow field for a blob v2 column. +/// +/// Blob v2 expects a column shaped as `Struct<data: LargeBinary?, uri: Utf8?>` and +/// tagged with `ARROW:extension:name = "lance.blob.v2"`. +pub fn blob_field(name: &str, nullable: bool) -> Field { + let metadata = [(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string())] + .into_iter() + .collect(); + Field::new( + name, + DataType::Struct( + vec![ + Field::new("data", DataType::LargeBinary, true), + Field::new("uri", DataType::Utf8, true), + ] + .into(), + ), + nullable, + ) + .with_metadata(metadata) } -/// Builder for [`BlobArray`]. +/// Builder for blob v2 input struct columns. /// /// The builder enforces that each row contains exactly one of `data` or `uri` (or is null). pub struct BlobArrayBuilder { @@ -138,8 +108,8 @@ impl BlobArrayBuilder { Ok(()) } - /// Finish building and return the [`BlobArray`]. - pub fn finish(mut self) -> Result<BlobArray> { + /// Finish building and return an Arrow struct array. 
+ pub fn finish(mut self) -> Result<ArrayRef> { if self.len != self.expected_len { return Err(Error::invalid_input( format!( @@ -164,9 +134,7 @@ impl BlobArrayBuilder { validity, )?; - Ok(BlobArray { - inner: struct_array, - }) + Ok(Arc::new(struct_array)) } fn ensure_capacity(&self) -> Result<()> { @@ -189,7 +157,7 @@ mod tests { #[test] fn test_field_metadata() { - let field = BlobArray::field("blob", true); + let field = blob_field("blob", true); assert!(field.metadata().get(ARROW_EXT_NAME_KEY).is_some()); assert_eq!( field.metadata().get(ARROW_EXT_NAME_KEY).unwrap(), diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index ff7024f9396..a569417d812 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -797,7 +797,7 @@ mod tests { use super::data_file_key_from_path; use crate::{ - blob::{BlobArray, BlobArrayBuilder}, + blob::{blob_field, BlobArrayBuilder}, dataset::WriteParams, utils::test::TestDatasetGenerator, Dataset, @@ -1070,12 +1070,12 @@ mod tests { let mut blob_builder = BlobArrayBuilder::new(2); blob_builder.push_bytes(b"hello").unwrap(); blob_builder.push_bytes(b"world").unwrap(); - let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap().into(); + let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap(); let id_array: arrow_array::ArrayRef = Arc::new(UInt32Array::from(vec![0, 1])); let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::UInt32, false), - BlobArray::field("blob", true), + blob_field("blob", true), ])); let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); diff --git a/rust/lance/src/lib.rs b/rust/lance/src/lib.rs index 1df737156e5..19ea6e8aebd 100644 --- a/rust/lance/src/lib.rs +++ b/rust/lance/src/lib.rs @@ -85,7 +85,7 @@ pub mod session; pub mod table; pub mod utils; -pub use blob::{BlobArray, BlobArrayBuilder}; +pub use blob::{blob_field, BlobArrayBuilder}; pub use dataset::Dataset; use lance_index::vector::DIST_COL; 
From 8c86916b20b3b6ad96d86ee4896b4ed4382a45f5 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 12 Dec 2025 14:08:17 +0800 Subject: [PATCH 9/9] Add blob type check Signed-off-by: Xuanwo --- rust/lance-core/src/datatypes/field.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index a3544999a5a..4b42c3581ac 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -753,6 +753,17 @@ impl Field { location: location!(), }); } + + if self.is_blob() != other.is_blob() { + return Err(Error::Arrow { + message: format!( + "Attempt to intersect blob and non-blob field: {}", + self.name + ), + location: location!(), + }); + } + let self_type = self.data_type(); let other_type = other.data_type(); @@ -763,7 +774,7 @@ impl Field { // Blob v2 uses a struct logical type for descriptors, which differs from the logical // input struct (data/uri). When intersecting schemas for projection we want to keep // the projected blob layout instead of intersecting by child names. - if self.is_blob() && other.is_blob() { + if self.is_blob() { return Ok(self.clone()); }