diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs
index e96f29efa6a..4b42c3581ac 100644
--- a/rust/lance-core/src/datatypes/field.rs
+++ b/rust/lance-core/src/datatypes/field.rs
@@ -21,7 +21,7 @@
 use arrow_schema::{DataType, Field as ArrowField};
 use deepsize::DeepSizeOf;
 use lance_arrow::{
     json::{is_arrow_json_field, is_json_field},
-    DataTypeExt, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME,
+    DataTypeExt, ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME,
 };
 use snafu::location;
@@ -516,6 +516,22 @@ impl Field {
         .unwrap_or(false)
     }
 
+    /// If the field is a blob, update this field in place, setting the data
+    /// type to a struct of the blob description fields (name and id are kept).
+    ///
+    /// If the field is not a blob, this is a no-op.
+    pub fn unloaded_mut(&mut self) {
+        if self.is_blob_v2() {
+            self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone();
+            self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone();
+            self.metadata = BLOB_V2_DESC_LANCE_FIELD.metadata.clone();
+        } else if self.is_blob() {
+            self.logical_type = BLOB_DESC_LANCE_FIELD.logical_type.clone();
+            self.children = BLOB_DESC_LANCE_FIELD.children.clone();
+            self.metadata = BLOB_DESC_LANCE_FIELD.metadata.clone();
+        }
+    }
+
     /// If the field is a blob, return a new field with the same name and id
     /// but with the data type set to a struct of the blob description fields.
/// @@ -737,6 +753,17 @@ impl Field { location: location!(), }); } + + if self.is_blob() != other.is_blob() { + return Err(Error::Arrow { + message: format!( + "Attempt to intersect blob and non-blob field: {}", + self.name + ), + location: location!(), + }); + } + let self_type = self.data_type(); let other_type = other.data_type(); @@ -744,6 +771,13 @@ impl Field { (&self_type, &other_type), (DataType::Struct(_), DataType::Struct(_)) | (DataType::List(_), DataType::List(_)) ) { + // Blob v2 uses a struct logical type for descriptors, which differs from the logical + // input struct (data/uri). When intersecting schemas for projection we want to keep + // the projected blob layout instead of intersecting by child names. + if self.is_blob() { + return Ok(self.clone()); + } + let children = self .children .iter() @@ -1007,15 +1041,15 @@ impl TryFrom<&ArrowField> for Field { if is_blob_v2 { metadata - .entry(BLOB_META_KEY.to_string()) - .or_insert_with(|| "true".to_string()); + .entry(ARROW_EXT_NAME_KEY.to_string()) + .or_insert_with(|| BLOB_V2_EXT_NAME.to_string()); } // Check for JSON extension types (both Arrow and Lance) let logical_type = if is_arrow_json_field(field) || is_json_field(field) { LogicalType::from("json") } else if is_blob_v2 { - LogicalType::from(super::BLOB_LOGICAL_TYPE) + LogicalType::from("struct") } else { LogicalType::try_from(field.data_type())? 
}; @@ -1056,11 +1090,6 @@ impl From<&Field> for ArrowField { let mut metadata = field.metadata.clone(); if field.logical_type.is_blob() { - metadata.insert( - ARROW_EXT_NAME_KEY.to_string(), - lance_arrow::BLOB_V2_EXT_NAME.to_string(), - ); - metadata.entry(ARROW_EXT_META_KEY.to_string()).or_default(); metadata .entry(BLOB_META_KEY.to_string()) .or_insert_with(|| "true".to_string()); @@ -1084,7 +1113,7 @@ mod tests { use arrow_array::{DictionaryArray, StringArray, UInt32Array}; use arrow_schema::{Fields, TimeUnit}; - use lance_arrow::{ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME}; + use lance_arrow::BLOB_META_KEY; use std::collections::HashMap; #[test] fn arrow_field_to_field() { @@ -1569,44 +1598,4 @@ mod tests { assert_eq!(unloaded.children.len(), 5); assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type); } - - #[test] - fn blob_v2_detection_by_extension() { - let metadata = HashMap::from([ - (ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string()), - (BLOB_META_KEY.to_string(), "true".to_string()), - ]); - let field: Field = ArrowField::new("blob", DataType::LargeBinary, true) - .with_metadata(metadata) - .try_into() - .unwrap(); - assert!(field.is_blob_v2()); - } - - #[test] - fn blob_extension_roundtrip() { - let metadata = HashMap::from([ - (ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string()), - (ARROW_EXT_META_KEY.to_string(), "".to_string()), - ]); - let arrow_field = - ArrowField::new("blob", DataType::LargeBinary, true).with_metadata(metadata); - let field = Field::try_from(&arrow_field).unwrap(); - assert_eq!( - field.logical_type, - LogicalType::from(crate::datatypes::BLOB_LOGICAL_TYPE) - ); - assert!(field.is_blob()); - assert_eq!(field.data_type(), DataType::LargeBinary); - - let roundtrip: ArrowField = ArrowField::from(&field); - assert_eq!( - roundtrip.metadata().get(ARROW_EXT_NAME_KEY), - Some(&BLOB_V2_EXT_NAME.to_string()) - ); - assert_eq!( - roundtrip.metadata().get(BLOB_META_KEY), - 
Some(&"true".to_string()) - ); - } } diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index 5a3fb60dfb0..cdcc3cef1e6 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -928,7 +928,9 @@ pub enum BlobHandling { impl BlobHandling { fn should_unload(&self, field: &Field) -> bool { - if !field.data_type().is_binary_like() { + // Blob v2 columns are Structs, so we need to treat any blob-marked field as unloadable + // even if the physical data type is not binary-like. + if !(field.data_type().is_binary_like() || field.is_blob()) { return false; } match self { diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index 01f0194d91f..62ae0bfb06c 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -251,10 +251,11 @@ impl ReaderProjection { field_id_to_column_index, &mut column_indices, )?; - Ok(Self { + let projection = Self { schema: Arc::new(schema.clone()), column_indices, - }) + }; + Ok(projection) } /// Creates a projection that reads the entire file diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index d32cd6712e8..22dfec02b5f 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -465,6 +465,15 @@ impl FileWriter { async fn write_global_buffers(&mut self) -> Result> { let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided. Schema is unknown and file cannot be created", location!()))?; schema.metadata = std::mem::take(&mut self.schema_metadata); + // Use descriptor layout for blob v2 in the footer to avoid exposing logical child fields. + // + // TODO(xuanwo): this doesn't work on nested struct, need better solution like fields_per_order_mut? 
+        schema.fields.iter_mut().for_each(|f| {
+            if f.is_blob_v2() {
+                f.unloaded_mut();
+            }
+        });
+
         let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
         let file_descriptor_bytes = file_descriptor.encode_to_vec();
         let file_descriptor_len = file_descriptor_bytes.len() as u64;
diff --git a/rust/lance/src/blob.rs b/rust/lance/src/blob.rs
new file mode 100644
index 00000000000..196fe3866ca
--- /dev/null
+++ b/rust/lance/src/blob.rs
@@ -0,0 +1,206 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+//! Convenience builders for Lance blob v2 input columns.
+//!
+//! Blob v2 expects a column shaped as `Struct<data: LargeBinary, uri: Utf8>` and
+//! tagged with `ARROW:extension:name = "lance.blob.v2"`. This module offers a
+//! type-safe builder to construct that struct without manually wiring metadata.
+
+use std::sync::Arc;
+
+use arrow_array::{builder::LargeBinaryBuilder, builder::StringBuilder, ArrayRef, StructArray};
+use arrow_buffer::NullBufferBuilder;
+use arrow_schema::{DataType, Field};
+use lance_arrow::{ARROW_EXT_NAME_KEY, BLOB_V2_EXT_NAME};
+
+use crate::{Error, Result};
+
+/// Construct the Arrow field for a blob v2 column.
+///
+/// Blob v2 expects a column shaped as `Struct<data: LargeBinary, uri: Utf8>` and
+/// tagged with `ARROW:extension:name = "lance.blob.v2"`.
+pub fn blob_field(name: &str, nullable: bool) -> Field {
+    let metadata = [(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string())]
+        .into_iter()
+        .collect();
+    Field::new(
+        name,
+        DataType::Struct(
+            vec![
+                Field::new("data", DataType::LargeBinary, true),
+                Field::new("uri", DataType::Utf8, true),
+            ]
+            .into(),
+        ),
+        nullable,
+    )
+    .with_metadata(metadata)
+}
+
+/// Builder for blob v2 input struct columns.
+///
+/// The builder enforces that each row contains exactly one of `data` or `uri` (or is null).
+pub struct BlobArrayBuilder {
+    data_builder: LargeBinaryBuilder,
+    uri_builder: StringBuilder,
+    validity: NullBufferBuilder,
+    expected_len: usize,
+    len: usize,
+}
+
+impl BlobArrayBuilder {
+    /// Create a new builder with the given row capacity.
+    pub fn new(capacity: usize) -> Self {
+        Self {
+            data_builder: LargeBinaryBuilder::with_capacity(capacity, 0),
+            uri_builder: StringBuilder::with_capacity(capacity, 0),
+            validity: NullBufferBuilder::new(capacity),
+            expected_len: capacity,
+            len: 0,
+        }
+    }
+
+    /// Append a blob backed by raw bytes.
+    pub fn push_bytes(&mut self, bytes: impl AsRef<[u8]>) -> Result<()> {
+        self.ensure_capacity()?;
+        self.validity.append_non_null();
+        self.data_builder.append_value(bytes);
+        self.uri_builder.append_null();
+        self.len += 1;
+        Ok(())
+    }
+
+    /// Append a blob referenced by URI.
+    pub fn push_uri(&mut self, uri: impl Into<String>) -> Result<()> {
+        self.ensure_capacity()?;
+        let uri = uri.into();
+        if uri.is_empty() {
+            return Err(Error::invalid_input(
+                "URI cannot be empty",
+                snafu::location!(),
+            ));
+        }
+        self.validity.append_non_null();
+        self.data_builder.append_null();
+        self.uri_builder.append_value(uri);
+        self.len += 1;
+        Ok(())
+    }
+
+    /// Append an empty blob (inline, zero-length payload).
+    pub fn push_empty(&mut self) -> Result<()> {
+        self.ensure_capacity()?;
+        self.validity.append_non_null();
+        self.data_builder.append_value([]);
+        self.uri_builder.append_null();
+        self.len += 1;
+        Ok(())
+    }
+
+    /// Append a null row.
+    pub fn push_null(&mut self) -> Result<()> {
+        self.ensure_capacity()?;
+        self.validity.append_null();
+        self.data_builder.append_null();
+        self.uri_builder.append_null();
+        self.len += 1;
+        Ok(())
+    }
+
+    /// Finish building and return an Arrow struct array.
+    pub fn finish(mut self) -> Result<ArrayRef> {
+        if self.len != self.expected_len {
+            return Err(Error::invalid_input(
+                format!(
+                    "Expected {} rows but received {}",
+                    self.expected_len, self.len
+                ),
+                snafu::location!(),
+            ));
+        }
+
+        let data = Arc::new(self.data_builder.finish());
+        let uri = Arc::new(self.uri_builder.finish());
+        let validity = self.validity.finish();
+
+        let struct_array = StructArray::try_new(
+            vec![
+                Field::new("data", DataType::LargeBinary, true),
+                Field::new("uri", DataType::Utf8, true),
+            ]
+            .into(),
+            vec![data as ArrayRef, uri as ArrayRef],
+            validity,
+        )?;
+
+        Ok(Arc::new(struct_array))
+    }
+
+    fn ensure_capacity(&self) -> Result<()> {
+        if self.len >= self.expected_len {
+            Err(Error::invalid_input(
+                "BlobArrayBuilder capacity exceeded",
+                snafu::location!(),
+            ))
+        } else {
+            Ok(())
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_array::cast::AsArray;
+    use arrow_array::Array;
+
+    #[test]
+    fn test_field_metadata() {
+        let field = blob_field("blob", true);
+        assert!(field.metadata().get(ARROW_EXT_NAME_KEY).is_some());
+        assert_eq!(
+            field.metadata().get(ARROW_EXT_NAME_KEY).unwrap(),
+            BLOB_V2_EXT_NAME
+        );
+    }
+
+    #[test]
+    fn test_builder_basic() {
+        let mut b = BlobArrayBuilder::new(4);
+        b.push_bytes(b"hi").unwrap();
+        b.push_uri("s3://bucket/key").unwrap();
+        b.push_empty().unwrap();
+        b.push_null().unwrap();
+
+        let arr = b.finish().unwrap();
+        assert_eq!(arr.len(), 4);
+        assert_eq!(arr.null_count(), 1);
+
+        let struct_arr = arr.as_struct();
+        let data = struct_arr.column(0).as_binary::<i64>();
+        let uri = struct_arr.column(1).as_string::<i32>();
+
+        assert_eq!(data.value(0), b"hi");
+        assert!(uri.is_null(0));
+        assert!(data.is_null(1));
+        assert_eq!(uri.value(1), "s3://bucket/key");
+        assert_eq!(data.value(2).len(), 0);
+        assert!(uri.is_null(2));
+    }
+
+    #[test]
+    fn test_capacity_error() {
+        let mut b = BlobArrayBuilder::new(1);
+        b.push_bytes(b"a").unwrap();
+        let err = b.push_bytes(b"b").unwrap_err();
+
assert!(err.to_string().contains("capacity exceeded")); + } + + #[test] + fn test_empty_uri_rejected() { + let mut b = BlobArrayBuilder::new(1); + let err = b.push_uri("").unwrap_err(); + assert!(err.to_string().contains("URI cannot be empty")); + } +} diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 7abf9c7d5f1..7238cad866f 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -785,6 +785,8 @@ mod tests { use arrow::{array::AsArray, datatypes::UInt64Type}; use arrow_array::RecordBatch; + use arrow_array::{RecordBatchIterator, UInt32Array}; + use arrow_schema::{DataType, Field, Schema}; use futures::TryStreamExt; use lance_arrow::DataTypeExt; use lance_io::stream::RecordBatchStream; @@ -794,7 +796,12 @@ mod tests { use lance_file::version::LanceFileVersion; use super::data_file_key_from_path; - use crate::{utils::test::TestDatasetGenerator, Dataset}; + use crate::{ + blob::{blob_field, BlobArrayBuilder}, + dataset::WriteParams, + utils::test::TestDatasetGenerator, + Dataset, + }; struct BlobTestFixture { _test_dir: TempStrDir, @@ -1054,4 +1061,42 @@ mod tests { assert_eq!(data_file_key_from_path("abc.lance"), "abc"); assert_eq!(data_file_key_from_path("nested/path/xyz"), "xyz"); } + + #[tokio::test] + async fn test_write_and_take_blobs_with_blob_array_builder() { + let test_dir = TempStrDir::default(); + + // Build a blob column with the new BlobArrayBuilder + let mut blob_builder = BlobArrayBuilder::new(2); + blob_builder.push_bytes(b"hello").unwrap(); + blob_builder.push_bytes(b"world").unwrap(); + let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap(); + + let id_array: arrow_array::ArrayRef = Arc::new(UInt32Array::from(vec![0, 1])); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false), + blob_field("blob", true), + ])); + + let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap(); + let reader = 
RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + + let params = WriteParams::with_storage_version(LanceFileVersion::V2_2); + let dataset = Arc::new( + Dataset::write(reader, &test_dir, Some(params)) + .await + .unwrap(), + ); + + let blobs = dataset + .take_blobs_by_indices(&[0, 1], "blob") + .await + .unwrap(); + + assert_eq!(blobs.len(), 2); + let first = blobs[0].read().await.unwrap(); + let second = blobs[1].read().await.unwrap(); + assert_eq!(first.as_ref(), b"hello"); + assert_eq!(second.as_ref(), b"world"); + } } diff --git a/rust/lance/src/lib.rs b/rust/lance/src/lib.rs index 3f579994957..19ea6e8aebd 100644 --- a/rust/lance/src/lib.rs +++ b/rust/lance/src/lib.rs @@ -76,6 +76,7 @@ pub use lance_core::{Error, Result}; use std::sync::LazyLock; pub mod arrow; +pub mod blob; pub mod datafusion; pub mod dataset; pub mod index; @@ -84,6 +85,7 @@ pub mod session; pub mod table; pub mod utils; +pub use blob::{blob_field, BlobArrayBuilder}; pub use dataset::Dataset; use lance_index::vector::DIST_COL;