diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 6f4c8a5f8f1..bc65000904e 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -43,12 +43,11 @@ use crate::{ pub const LANCE_UNENFORCED_PRIMARY_KEY: &str = "lance-schema:unenforced-primary-key"; fn has_blob_v2_extension(field: &ArrowField) -> bool { - field.data_type() == &DataType::LargeBinary - && field - .metadata() - .get(ARROW_EXT_NAME_KEY) - .map(|name| name == BLOB_V2_EXT_NAME) - .unwrap_or(false) + field + .metadata() + .get(ARROW_EXT_NAME_KEY) + .map(|name| name == BLOB_V2_EXT_NAME) + .unwrap_or(false) } #[derive(Debug, Default)] diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index ad5b8b2235f..683664a595c 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -28,7 +28,7 @@ use crate::buffer::LanceBuffer; use crate::compression::{CompressionStrategy, DefaultCompressionStrategy}; use crate::compression_config::CompressionParams; use crate::decoder::PageEncoding; -use crate::encodings::logical::blob::BlobStructuralEncoder; +use crate::encodings::logical::blob::{BlobStructuralEncoder, BlobV2StructuralEncoder}; use crate::encodings::logical::list::ListStructuralEncoder; use crate::encodings::logical::primitive::PrimitiveStructuralEncoder; use crate::encodings::logical::r#struct::StructStructuralEncoder; @@ -385,10 +385,24 @@ impl StructuralEncodingStrategy { self.compression_strategy.clone(), )?)); } + DataType::Struct(_) if self.version >= LanceFileVersion::V2_2 => { + return Ok(Box::new(BlobV2StructuralEncoder::new( + field, + column_index.next_column_index(field.id as u32), + options, + self.compression_strategy.clone(), + )?)); + } + DataType::Struct(_) => { + return Err(Error::InvalidInput { + source: "Blob v2 struct input requires file version >= 2.2".into(), + location: location!(), + }); + } _ => { return Err(Error::InvalidInput { source: format!( - "Blob encoding only supports Binary/LargeBinary, got {}", + "Blob encoding only supports Binary/LargeBinary or v2 Struct, got {}", data_type ) .into(), diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index a3379523f82..e2658e42827 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -3,11 +3,18 @@ use std::{collections::HashMap, sync::Arc}; -use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray, UInt64Array}; +use arrow_array::{ + builder::{PrimitiveBuilder, StringBuilder}, + cast::AsArray, + types::{UInt32Type, UInt64Type, UInt8Type}, + Array, ArrayRef, StructArray, UInt64Array, +}; use arrow_buffer::Buffer; use arrow_schema::{DataType, Field as ArrowField, Fields}; use futures::{future::BoxFuture, FutureExt}; -use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result}; +use lance_core::{ + datatypes::Field, datatypes::BLOB_V2_DESC_FIELDS, error::LanceOptionExt, Error, Result, +}; use snafu::location; use crate::{ @@ -221,6 +228,174 @@ impl FieldEncoder for BlobStructuralEncoder { } } +/// Blob v2 structural encoder +pub struct BlobV2StructuralEncoder { + descriptor_encoder: Box, +} + +impl BlobV2StructuralEncoder { + pub fn new( + field: &Field, + column_index: u32, + options: &crate::encoder::EncodingOptions, + compression_strategy: Arc, + ) -> Result { + let mut descriptor_metadata = HashMap::with_capacity(1); + descriptor_metadata.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string()); + + let descriptor_data_type = DataType::Struct(BLOB_V2_DESC_FIELDS.clone()); + + let descriptor_field = Field::try_from( + ArrowField::new(&field.name, descriptor_data_type, field.nullable) + .with_metadata(descriptor_metadata), + )?; + + let descriptor_encoder = Box::new(PrimitiveStructuralEncoder::try_new( + options, + compression_strategy, + column_index, + descriptor_field, + Arc::new(HashMap::new()), + )?); + + Ok(Self { descriptor_encoder }) + } +} + +impl FieldEncoder for BlobV2StructuralEncoder { + fn maybe_encode( + &mut self, + array: ArrayRef, + external_buffers: &mut OutOfLineBuffers, + _repdef: RepDefBuilder, + row_number: u64, + num_rows: u64, + ) -> Result> { + // Supported input: Struct + let DataType::Struct(fields) = array.data_type() else { + return Err(Error::InvalidInput { + source: "Blob v2 requires struct input".into(), + location: location!(), + }); + }; + + let struct_arr = array.as_struct(); + let mut data_idx = None; + let mut uri_idx = None; + for (idx, field) in fields.iter().enumerate() { + match field.name().as_str() { + "data" => data_idx = Some(idx), + "uri" => uri_idx = Some(idx), + _ => {} + } + } + let (data_idx, uri_idx) = data_idx.zip(uri_idx).ok_or_else(|| Error::InvalidInput { + source: "Blob v2 struct must contain 'data' and 'uri' fields".into(), + location: location!(), + })?; + + let data_col = struct_arr.column(data_idx).as_binary::(); + let uri_col = struct_arr.column(uri_idx).as_string::(); + + // Validate XOR(data, uri) + for i in 0..struct_arr.len() { + if struct_arr.is_null(i) { + continue; + } + let data_is_set = !data_col.is_null(i); + let uri_is_set = !uri_col.is_null(i); + if data_is_set == uri_is_set { + return Err(Error::InvalidInput { + source: "Each blob row must set exactly one of data or uri".into(), + location: location!(), + }); + } + if uri_is_set { + return Err(Error::NotSupported { + source: "External blob (uri) is not supported yet".into(), + location: location!(), + }); + } + } + + let binary_array = data_col; + + let mut kind_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); + let mut position_builder = + PrimitiveBuilder::::with_capacity(binary_array.len()); + let mut size_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); + let mut blob_id_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); + let mut uri_builder = StringBuilder::with_capacity(binary_array.len(), 0); + + for i in 0..binary_array.len() { + let is_null_row = match array.data_type() { + DataType::Struct(_) => array.is_null(i), + _ => binary_array.is_null(i), + }; + if is_null_row { + kind_builder.append_null(); + position_builder.append_null(); + size_builder.append_null(); + blob_id_builder.append_null(); + uri_builder.append_null(); + continue; + } + + let value = binary_array.value(i); + kind_builder.append_value(0); + + if value.is_empty() { + position_builder.append_value(0); + size_builder.append_value(0); + } else { + let position = external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value))); + position_builder.append_value(position); + size_builder.append_value(value.len() as u64); + } + + blob_id_builder.append_null(); + uri_builder.append_null(); + } + + let children: Vec = vec![ + Arc::new(kind_builder.finish()), + Arc::new(position_builder.finish()), + Arc::new(size_builder.finish()), + Arc::new(blob_id_builder.finish()), + Arc::new(uri_builder.finish()), + ]; + + let descriptor_array = Arc::new(StructArray::try_new( + BLOB_V2_DESC_FIELDS.clone(), + children, + None, + )?) as ArrayRef; + + self.descriptor_encoder.maybe_encode( + descriptor_array, + external_buffers, + RepDefBuilder::default(), + row_number, + num_rows, + ) + } + + fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result> { + self.descriptor_encoder.flush(external_buffers) + } + + fn finish( + &mut self, + external_buffers: &mut OutOfLineBuffers, + ) -> BoxFuture<'_, Result>> { + self.descriptor_encoder.finish(external_buffers) + } + + fn num_columns(&self) -> u32 { + self.descriptor_encoder.num_columns() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 2d0cfc2ec5e..34f644c5d22 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -4,8 +4,7 @@ use std::{collections::HashMap, future::Future, ops::DerefMut, sync::Arc}; use arrow::array::AsArray; -use arrow::datatypes::{UInt64Type, UInt8Type}; -use arrow_schema::DataType; +use arrow::datatypes::{UInt32Type, UInt64Type, UInt8Type}; use object_store::path::Path; use snafu::location; use tokio::sync::Mutex; @@ -189,7 +188,7 @@ pub(super) async fn take_blobs( let projection = dataset.schema().project(&[column])?; let blob_field = &projection.fields[0]; let blob_field_id = blob_field.id; - if blob_field.data_type() != DataType::LargeBinary || !projection.fields[0].is_blob() { + if !projection.fields[0].is_blob() { return Err(Error::InvalidInput { location: location!(), source: format!("the column '{}' is not a blob column", column).into(), @@ -246,32 +245,38 @@ fn collect_blob_files_v2( let kinds = descriptions.column(0).as_primitive::(); let positions = descriptions.column(1).as_primitive::(); let sizes = descriptions.column(2).as_primitive::(); + let _blob_ids = descriptions.column(3).as_primitive::(); + let _uris = descriptions.column(4).as_string::(); let mut files = Vec::with_capacity(row_addrs.len()); for (idx, row_addr) in row_addrs.values().iter().enumerate() { - if positions.is_null(idx) || sizes.is_null(idx) { + if kinds.is_null(idx) { + // Null row continue; } - - if !kinds.is_null(idx) { - let kind = kinds.value(idx); - if kind != INLINE_BLOB_KIND { + let kind = kinds.value(idx); + match kind { + INLINE_BLOB_KIND => { + if positions.is_null(idx) || sizes.is_null(idx) { + continue; + } + let position = positions.value(idx); + let size = sizes.value(idx); + files.push(BlobFile::new( + dataset.clone(), + blob_field_id, + *row_addr, + position, + size, + )); + } + other => { return Err(Error::NotSupported { - source: format!("Blob kind {} is not supported", kind).into(), + source: format!("Blob kind {} is not supported", other).into(), location: location!(), }); } } - - let position = positions.value(idx); - let size = sizes.value(idx); - files.push(BlobFile::new( - dataset.clone(), - blob_field_id, - *row_addr, - position, - size, - )); } Ok(files)