diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index 76e6924ff75..f6400c5a5ae 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -49,10 +49,10 @@ pub static BLOB_DESC_LANCE_FIELD: LazyLock = pub static BLOB_V2_DESC_FIELDS: LazyLock = LazyLock::new(|| { Fields::from(vec![ ArrowField::new("kind", DataType::UInt8, false), - ArrowField::new("position", DataType::UInt64, true), - ArrowField::new("size", DataType::UInt64, true), - ArrowField::new("blob_id", DataType::UInt32, true), - ArrowField::new("blob_uri", DataType::Utf8, true), + ArrowField::new("position", DataType::UInt64, false), + ArrowField::new("size", DataType::UInt64, false), + ArrowField::new("blob_id", DataType::UInt32, false), + ArrowField::new("blob_uri", DataType::Utf8, false), ]) }); @@ -60,8 +60,9 @@ pub static BLOB_V2_DESC_TYPE: LazyLock = LazyLock::new(|| DataType::Struct(BLOB_V2_DESC_FIELDS.clone())); pub static BLOB_V2_DESC_FIELD: LazyLock = LazyLock::new(|| { - ArrowField::new("description", BLOB_V2_DESC_TYPE.clone(), true).with_metadata(HashMap::from([ + ArrowField::new("description", BLOB_V2_DESC_TYPE.clone(), false).with_metadata(HashMap::from([ (lance_arrow::BLOB_META_KEY.to_string(), "true".to_string()), + ("lance-encoding:packed".to_string(), "true".to_string()), ])) }); @@ -415,3 +416,34 @@ pub fn lance_supports_nulls(datatype: &DataType) -> bool { | DataType::FixedSizeList(_, _) ) } + +/// Physical storage mode for blob v2 descriptors (one byte, stored in the packed struct column). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum BlobKind { + /// Stored in the main data file’s out-of-line buffer; `position`/`size` point into that file. + Inline = 0, + /// Stored in a shared packed blob file; `position`/`size` locate the slice, `blob_id` selects the file. + Packed = 1, + /// Stored in a dedicated raw blob file; `blob_id` identifies the file, `size` is the full file length. + Dedicated = 2, + /// Not stored by Lance; `blob_uri` holds an absolute external URI, offsets are zero. + External = 3, +} + +impl TryFrom for BlobKind { + type Error = Error; + + fn try_from(value: u8) -> Result { + match value { + 0 => Ok(Self::Inline), + 1 => Ok(Self::Packed), + 2 => Ok(Self::Dedicated), + 3 => Ok(Self::External), + other => Err(Error::InvalidInput { + source: format!("Unknown blob kind {other:?}").into(), + location: location!(), + }), + } + } +} diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index bc65000904e..cd6377098f0 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -513,15 +513,17 @@ impl Field { /// /// If the field is not a blob, return the field itself. pub fn into_unloaded_with_version(mut self, version: BlobVersion) -> Self { - if self.data_type().is_binary_like() && self.is_blob() { + if self.is_blob() { match version { BlobVersion::V2 => { self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone(); self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone(); + self.metadata = BLOB_V2_DESC_LANCE_FIELD.metadata.clone(); } BlobVersion::V1 => { self.logical_type = BLOB_DESC_LANCE_FIELD.logical_type.clone(); self.children = BLOB_DESC_LANCE_FIELD.children.clone(); + self.metadata = BLOB_DESC_LANCE_FIELD.metadata.clone(); } } } diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index e2658e42827..e79e89921b1 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -26,6 +26,7 @@ use crate::{ format::ProtobufUtils21, repdef::{DefinitionInterpretation, RepDefBuilder}, }; +use lance_core::datatypes::BlobKind; /// Blob structural encoder - stores large binary data in external buffers /// @@ -267,7 +268,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { &mut self, array: ArrayRef, external_buffers: &mut OutOfLineBuffers, - _repdef: RepDefBuilder, + mut repdef: RepDefBuilder, row_number: u64, num_rows: u64, ) -> Result> { @@ -280,6 +281,11 @@ impl FieldEncoder for BlobV2StructuralEncoder { }; let struct_arr = array.as_struct(); + if let Some(validity) = struct_arr.nulls() { + repdef.add_validity_bitmap(validity.clone()); + } else { + repdef.add_no_null(struct_arr.len()); + } let mut data_idx = None; let mut uri_idx = None; for (idx, field) in fields.iter().enumerate() { @@ -310,12 +316,6 @@ impl FieldEncoder for BlobV2StructuralEncoder { location: location!(), }); } - if uri_is_set { - return Err(Error::NotSupported { - source: "External blob (uri) is not supported yet".into(), - location: location!(), - }); - } } let binary_array = data_col; @@ -327,34 +327,41 @@ impl FieldEncoder for BlobV2StructuralEncoder { let mut blob_id_builder = PrimitiveBuilder::::with_capacity(binary_array.len()); let mut uri_builder = StringBuilder::with_capacity(binary_array.len(), 0); - for i in 0..binary_array.len() { - let is_null_row = match array.data_type() { - DataType::Struct(_) => array.is_null(i), - _ => binary_array.is_null(i), - }; - if is_null_row { - kind_builder.append_null(); - position_builder.append_null(); - size_builder.append_null(); - blob_id_builder.append_null(); - uri_builder.append_null(); + for i in 0..struct_arr.len() { + if struct_arr.is_null(i) { + // Packed struct does not support nullable fields; use empty/default values instead. + kind_builder.append_value(BlobKind::Inline as u8); + position_builder.append_value(0); + size_builder.append_value(0); + blob_id_builder.append_value(0); + uri_builder.append_value(""); continue; } - let value = binary_array.value(i); - kind_builder.append_value(0); - - if value.is_empty() { + let data_is_set = !data_col.is_null(i); + if data_is_set { + let value = binary_array.value(i); + kind_builder.append_value(BlobKind::Inline as u8); + if value.is_empty() { + position_builder.append_value(0); + size_builder.append_value(0); + } else { + let position = + external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value))); + position_builder.append_value(position); + size_builder.append_value(value.len() as u64); + } + blob_id_builder.append_value(0); + uri_builder.append_value(""); + } else { + // external uri + let uri = uri_col.value(i); + kind_builder.append_value(BlobKind::External as u8); position_builder.append_value(0); size_builder.append_value(0); - } else { - let position = external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value))); - position_builder.append_value(position); - size_builder.append_value(value.len() as u64); + blob_id_builder.append_value(0); + uri_builder.append_value(uri); } - - blob_id_builder.append_null(); - uri_builder.append_null(); } let children: Vec = vec![ @@ -374,7 +381,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { self.descriptor_encoder.maybe_encode( descriptor_array, external_buffers, - RepDefBuilder::default(), + repdef, row_number, num_rows, ) @@ -402,9 +409,16 @@ mod tests { use crate::{ compression::DefaultCompressionStrategy, encoder::{ColumnIndexSequence, EncodingOptions}, - testing::{check_round_trip_encoding_of_data, TestCases}, + testing::{ + check_round_trip_encoding_of_data, check_round_trip_encoding_of_data_with_expected, + TestCases, + }, + version::LanceFileVersion, + }; + use arrow_array::{ + ArrayRef, LargeBinaryArray, StringArray, StructArray, UInt32Array, UInt64Array, UInt8Array, }; - use arrow_array::LargeBinaryArray; + use arrow_schema::{DataType, Field as ArrowField}; #[test] fn test_blob_encoder_creation() { @@ -487,4 +501,64 @@ mod tests { // Use the standard test harness check_round_trip_encoding_of_data(vec![array], &TestCases::default(), blob_metadata).await; } + + #[tokio::test] + async fn test_blob_v2_external_round_trip() { + let blob_metadata = + HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]); + + let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true)); + let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true)); + + let data_array = LargeBinaryArray::from(vec![Some(b"inline".as_ref()), None, None]); + let uri_array = StringArray::from(vec![ + None, + Some("file:///tmp/external.bin"), + Some("s3://bucket/blob"), + ]); + + let struct_array = StructArray::from(vec![ + (data_field, Arc::new(data_array) as ArrayRef), + (uri_field, Arc::new(uri_array) as ArrayRef), + ]); + + let expected_descriptor = StructArray::from(vec![ + ( + Arc::new(ArrowField::new("kind", DataType::UInt8, false)), + Arc::new(UInt8Array::from(vec![ + BlobKind::Inline as u8, + BlobKind::External as u8, + BlobKind::External as u8, + ])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("position", DataType::UInt64, false)), + Arc::new(UInt64Array::from(vec![0, 0, 0])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("size", DataType::UInt64, false)), + Arc::new(UInt64Array::from(vec![6, 0, 0])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("blob_id", DataType::UInt32, false)), + Arc::new(UInt32Array::from(vec![0, 0, 0])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("blob_uri", DataType::Utf8, false)), + Arc::new(StringArray::from(vec![ + "", + "file:///tmp/external.bin", + "s3://bucket/blob", + ])) as ArrayRef, + ), + ]); + + check_round_trip_encoding_of_data_with_expected( + vec![Arc::new(struct_array)], + Some(Arc::new(expected_descriptor)), + &TestCases::default().with_min_file_version(LanceFileVersion::V2_2), + blob_metadata, + ) + .await; + } } diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index 8987f5a31b3..3b6c43a3d2a 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -14,7 +14,7 @@ use crate::{ use arrow_array::{make_array, Array, StructArray, UInt64Array}; use arrow_data::transform::{Capacities, MutableArrayData}; use arrow_ord::ord::make_comparator; -use arrow_schema::{DataType, Field, FieldRef, Schema, SortOptions}; +use arrow_schema::{DataType, Field, Field as ArrowField, FieldRef, Schema, SortOptions}; use arrow_select::concat::concat; use bytes::{Bytes, BytesMut}; use futures::{future::BoxFuture, FutureExt, StreamExt}; @@ -83,6 +83,12 @@ fn column_indices_from_schema_helper( // In the old style, every field except FSL gets its own column. In the new style only primitive // leaf fields get their own column. for field in fields { + if is_structural_encoding && field.metadata().contains_key("lance-encoding:packed") { + column_indices.push(*column_counter); + *column_counter += 1; + continue; + } + match field.data_type() { DataType::Struct(fields) => { if !is_structural_encoding { @@ -698,6 +704,15 @@ pub async fn check_round_trip_encoding_of_data( data: Vec>, test_cases: &TestCases, metadata: HashMap, +) { + check_round_trip_encoding_of_data_with_expected(data, None, test_cases, metadata).await +} + +pub async fn check_round_trip_encoding_of_data_with_expected( + data: Vec>, + expected_override: Option>, + test_cases: &TestCases, + metadata: HashMap, ) { let example_data = data.first().expect("Data must have at least one array"); let mut field = Field::new("", example_data.data_type().clone(), true); @@ -725,8 +740,15 @@ pub async fn check_round_trip_encoding_of_data( "Testing round trip encoding of data with file version {} and page size {}", file_version, page_size ); - check_round_trip_encoding_inner(encoder, &field, data.clone(), test_cases, file_version) - .await + check_round_trip_encoding_inner( + encoder, + &field, + data.clone(), + expected_override.clone(), + test_cases, + file_version, + ) + .await } } } @@ -795,6 +817,7 @@ async fn check_round_trip_encoding_inner( mut encoder: Box, field: &Field, data: Vec>, + expected_override: Option>, test_cases: &TestCases, file_version: LanceFileVersion, ) { @@ -902,8 +925,6 @@ async fn check_round_trip_encoding_inner( let scheduler = Arc::new(SimulatedScheduler::new(encoded_data)) as Arc; - let schema = Schema::new(vec![field.clone()]); - let num_rows = data.iter().map(|arr| arr.len() as u64).sum::(); let concat_data = if test_cases.skip_validation { None @@ -924,8 +945,29 @@ async fn check_round_trip_encoding_inner( Some(concat(&data.iter().map(|arr| arr.as_ref()).collect::>()).unwrap()) }; + let expected_data = expected_override.clone().or_else(|| concat_data.clone()); + let is_structural_encoding = file_version >= LanceFileVersion::V2_1; + let decode_field = if is_structural_encoding { + let mut lance_field = lance_core::datatypes::Field::try_from(field).unwrap(); + if lance_field.is_blob() && matches!(lance_field.data_type(), DataType::Struct(_)) { + lance_field = + lance_field.into_unloaded_with_version(lance_core::datatypes::BlobVersion::V2); + let mut arrow_field = ArrowField::from(&lance_field); + let mut metadata = arrow_field.metadata().clone(); + metadata.insert("lance-encoding:packed".to_string(), "true".to_string()); + arrow_field = arrow_field.with_metadata(metadata); + arrow_field + } else { + field.clone() + } + } else { + field.clone() + }; + + let schema = Schema::new(vec![decode_field]); + debug!("Testing full decode"); let scheduler_copy = scheduler.clone(); test_decode( @@ -933,7 +975,7 @@ async fn check_round_trip_encoding_inner( test_cases.batch_size, &schema, &column_infos, - concat_data.clone(), + expected_data.clone(), scheduler_copy.clone(), is_structural_encoding, |mut decode_scheduler, tx| { @@ -954,9 +996,9 @@ async fn check_round_trip_encoding_inner( for range in &test_cases.ranges { debug!("Testing decode of range {:?}", range); let num_rows = range.end - range.start; - let expected = concat_data + let expected = expected_data .as_ref() - .map(|concat_data| concat_data.slice(range.start as usize, num_rows as usize)); + .map(|arr| arr.slice(range.start as usize, num_rows as usize)); let scheduler = scheduler.clone(); let range = range.clone(); test_decode( @@ -1129,6 +1171,7 @@ async fn check_round_trip_random( encoder_factory(file_version), &field, data, + None, test_cases, file_version, ) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 34f644c5d22..effb5cfe641 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -5,13 +5,14 @@ use std::{collections::HashMap, future::Future, ops::DerefMut, sync::Arc}; use arrow::array::AsArray; use arrow::datatypes::{UInt32Type, UInt64Type, UInt8Type}; +use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; use object_store::path::Path; use snafu::location; use tokio::sync::Mutex; use super::Dataset; -use arrow_array::{Array, StructArray}; -use lance_core::datatypes::BlobVersion; +use arrow_array::StructArray; +use lance_core::datatypes::{BlobKind, BlobVersion}; use lance_core::{utils::address::RowAddress, Error, Result}; use lance_io::traits::Reader; @@ -38,18 +39,20 @@ enum ReaderState { /// A file-like object that represents a blob in a dataset #[derive(Debug)] pub struct BlobFile { - dataset: Arc, + object_store: Arc, + path: Path, reader: Arc>, - data_file: Path, position: u64, size: u64, + kind: BlobKind, + uri: Option, } impl BlobFile { /// Create a new BlobFile /// /// See [`crate::dataset::Dataset::take_blobs`] - pub fn new( + pub fn new_inline( dataset: Arc, field_id: u32, row_addr: u64, @@ -61,14 +64,40 @@ impl BlobFile { let data_file = frag.data_file_for_field(field_id).unwrap(); let data_file = dataset.data_dir().child(data_file.path.as_str()); Self { - dataset, - data_file, + object_store: dataset.object_store.clone(), + path: data_file, position, size, + kind: BlobKind::Inline, + uri: None, reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))), } } + pub async fn new_external( + uri: String, + size: u64, + registry: Arc, + params: Arc, + ) -> Result { + let (object_store, path) = + ObjectStore::from_uri_and_params(registry, &uri, ¶ms).await?; + let size = if size > 0 { + size + } else { + object_store.size(&path).await? + }; + Ok(Self { + object_store, + path, + position: 0, + size, + kind: BlobKind::External, + uri: Some(uri), + reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))), + }) + } + /// Close the blob file, releasing any associated resources pub async fn close(&self) -> Result<()> { let mut reader = self.reader.lock().await; @@ -91,7 +120,7 @@ impl BlobFile { ) -> Result { let mut reader = self.reader.lock().await; if let ReaderState::Uninitialized(cursor) = *reader { - let opened = self.dataset.object_store.open(&self.data_file).await?; + let opened = self.object_store.open(&self.path).await?; let opened = Arc::::from(opened); *reader = ReaderState::Open((cursor, opened.clone())); } @@ -178,6 +207,22 @@ impl BlobFile { pub fn size(&self) -> u64 { self.size } + + pub fn position(&self) -> u64 { + self.position + } + + pub fn data_path(&self) -> &Path { + &self.path + } + + pub fn kind(&self) -> BlobKind { + self.kind + } + + pub fn uri(&self) -> Option<&str> { + self.uri.as_deref() + } } pub(super) async fn take_blobs( @@ -205,12 +250,12 @@ pub(super) async fn take_blobs( match dataset.blob_version() { BlobVersion::V1 => collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs), - BlobVersion::V2 => collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs), + BlobVersion::V2 => { + collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs).await + } } } -const INLINE_BLOB_KIND: u8 = 0; - fn collect_blob_files_v1( dataset: &Arc, blob_field_id: u32, @@ -231,12 +276,12 @@ fn collect_blob_files_v1( Some((*row_addr, position, size)) }) .map(|(row_addr, position, size)| { - BlobFile::new(dataset.clone(), blob_field_id, row_addr, position, size) + BlobFile::new_inline(dataset.clone(), blob_field_id, row_addr, position, size) }) .collect()) } -fn collect_blob_files_v2( +async fn collect_blob_files_v2( dataset: &Arc, blob_field_id: u32, descriptions: &StructArray, @@ -246,23 +291,22 @@ fn collect_blob_files_v2( let positions = descriptions.column(1).as_primitive::(); let sizes = descriptions.column(2).as_primitive::(); let _blob_ids = descriptions.column(3).as_primitive::(); - let _uris = descriptions.column(4).as_string::(); + let blob_uris = descriptions.column(4).as_string::(); let mut files = Vec::with_capacity(row_addrs.len()); for (idx, row_addr) in row_addrs.values().iter().enumerate() { - if kinds.is_null(idx) { - // Null row + let kind = BlobKind::try_from(kinds.value(idx))?; + + // Struct is non-nullable; null rows are encoded as inline with zero position/size and empty uri + if matches!(kind, BlobKind::Inline) && positions.value(idx) == 0 && sizes.value(idx) == 0 { continue; } - let kind = kinds.value(idx); + match kind { - INLINE_BLOB_KIND => { - if positions.is_null(idx) || sizes.is_null(idx) { - continue; - } + BlobKind::Inline => { let position = positions.value(idx); let size = sizes.value(idx); - files.push(BlobFile::new( + files.push(BlobFile::new_inline( dataset.clone(), blob_field_id, *row_addr, @@ -270,9 +314,20 @@ fn collect_blob_files_v2( size, )); } + BlobKind::External => { + let uri = blob_uris.value(idx).to_string(); + let size = sizes.value(idx); + let registry = dataset.session.store_registry(); + let params = dataset + .store_params + .as_ref() + .map(|p| Arc::new((**p).clone())) + .unwrap_or_else(|| Arc::new(ObjectStoreParams::default())); + files.push(BlobFile::new_external(uri, size, registry, params).await?); + } other => { return Err(Error::NotSupported { - source: format!("Blob kind {} is not supported", other).into(), + source: format!("Blob kind {:?} is not supported", other).into(), location: location!(), }); } @@ -394,9 +449,9 @@ mod tests { let blobs2 = fixture.dataset.take_blobs(&row_ids, "blobs").await.unwrap(); for (blob1, blob2) in blobs.iter().zip(blobs2.iter()) { - assert_eq!(blob1.position, blob2.position); - assert_eq!(blob1.size, blob2.size); - assert_eq!(blob1.data_file, blob2.data_file); + assert_eq!(blob1.position(), blob2.position()); + assert_eq!(blob1.size(), blob2.size()); + assert_eq!(blob1.data_path(), blob2.data_path()); } }