-
Notifications
You must be signed in to change notification settings - Fork 638
feat(blob_v2): add external blob support #5385
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
482cd94
e8f7744
b9934b9
94110c5
fcab037
22d4df2
6d38da9
c72e94e
68ffcb1
34af2ed
77398ea
fbcdb50
70bab99
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ use crate::{ | |
| format::ProtobufUtils21, | ||
| repdef::{DefinitionInterpretation, RepDefBuilder}, | ||
| }; | ||
| use lance_core::datatypes::BlobKind; | ||
|
|
||
| /// Blob structural encoder - stores large binary data in external buffers | ||
| /// | ||
|
|
@@ -267,7 +268,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { | |
| &mut self, | ||
| array: ArrayRef, | ||
| external_buffers: &mut OutOfLineBuffers, | ||
| _repdef: RepDefBuilder, | ||
| mut repdef: RepDefBuilder, | ||
| row_number: u64, | ||
| num_rows: u64, | ||
| ) -> Result<Vec<EncodeTask>> { | ||
|
|
@@ -280,6 +281,11 @@ impl FieldEncoder for BlobV2StructuralEncoder { | |
| }; | ||
|
|
||
| let struct_arr = array.as_struct(); | ||
| if let Some(validity) = struct_arr.nulls() { | ||
| repdef.add_validity_bitmap(validity.clone()); | ||
| } else { | ||
| repdef.add_no_null(struct_arr.len()); | ||
| } | ||
| let mut data_idx = None; | ||
| let mut uri_idx = None; | ||
| for (idx, field) in fields.iter().enumerate() { | ||
|
|
@@ -310,12 +316,6 @@ impl FieldEncoder for BlobV2StructuralEncoder { | |
| location: location!(), | ||
| }); | ||
| } | ||
| if uri_is_set { | ||
| return Err(Error::NotSupported { | ||
| source: "External blob (uri) is not supported yet".into(), | ||
| location: location!(), | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| let binary_array = data_col; | ||
|
|
@@ -327,34 +327,41 @@ impl FieldEncoder for BlobV2StructuralEncoder { | |
| let mut blob_id_builder = PrimitiveBuilder::<UInt32Type>::with_capacity(binary_array.len()); | ||
| let mut uri_builder = StringBuilder::with_capacity(binary_array.len(), 0); | ||
|
|
||
| for i in 0..binary_array.len() { | ||
| let is_null_row = match array.data_type() { | ||
| DataType::Struct(_) => array.is_null(i), | ||
| _ => binary_array.is_null(i), | ||
| }; | ||
| if is_null_row { | ||
| kind_builder.append_null(); | ||
| position_builder.append_null(); | ||
| size_builder.append_null(); | ||
| blob_id_builder.append_null(); | ||
| uri_builder.append_null(); | ||
| for i in 0..struct_arr.len() { | ||
| if struct_arr.is_null(i) { | ||
| // Packed struct does not support nullable fields; use empty/default values instead. | ||
| kind_builder.append_value(BlobKind::Inline as u8); | ||
| position_builder.append_value(0); | ||
| size_builder.append_value(0); | ||
| blob_id_builder.append_value(0); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like blob_id is always set to zero. Can you remind me what this is for again?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
And blob_id represents the ID used by packed & dedicated blobs, neither of which is implemented yet.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, I see the enum now. So the |
||
| uri_builder.append_value(""); | ||
| continue; | ||
| } | ||
|
|
||
| let value = binary_array.value(i); | ||
| kind_builder.append_value(0); | ||
|
|
||
| if value.is_empty() { | ||
| let data_is_set = !data_col.is_null(i); | ||
| if data_is_set { | ||
| let value = binary_array.value(i); | ||
| kind_builder.append_value(BlobKind::Inline as u8); | ||
| if value.is_empty() { | ||
| position_builder.append_value(0); | ||
| size_builder.append_value(0); | ||
| } else { | ||
| let position = | ||
| external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value))); | ||
| position_builder.append_value(position); | ||
| size_builder.append_value(value.len() as u64); | ||
| } | ||
| blob_id_builder.append_value(0); | ||
| uri_builder.append_value(""); | ||
| } else { | ||
| // external uri | ||
| let uri = uri_col.value(i); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to check if the URI is empty?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea, we can check before writing. |
||
| kind_builder.append_value(BlobKind::External as u8); | ||
| position_builder.append_value(0); | ||
| size_builder.append_value(0); | ||
| } else { | ||
| let position = external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value))); | ||
| position_builder.append_value(position); | ||
| size_builder.append_value(value.len() as u64); | ||
| blob_id_builder.append_value(0); | ||
| uri_builder.append_value(uri); | ||
| } | ||
|
|
||
| blob_id_builder.append_null(); | ||
| uri_builder.append_null(); | ||
| } | ||
|
|
||
| let children: Vec<ArrayRef> = vec![ | ||
|
|
@@ -374,7 +381,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { | |
| self.descriptor_encoder.maybe_encode( | ||
| descriptor_array, | ||
| external_buffers, | ||
| RepDefBuilder::default(), | ||
| repdef, | ||
| row_number, | ||
| num_rows, | ||
| ) | ||
|
|
@@ -402,9 +409,16 @@ mod tests { | |
| use crate::{ | ||
| compression::DefaultCompressionStrategy, | ||
| encoder::{ColumnIndexSequence, EncodingOptions}, | ||
| testing::{check_round_trip_encoding_of_data, TestCases}, | ||
| testing::{ | ||
| check_round_trip_encoding_of_data, check_round_trip_encoding_of_data_with_expected, | ||
| TestCases, | ||
| }, | ||
| version::LanceFileVersion, | ||
| }; | ||
| use arrow_array::{ | ||
| ArrayRef, LargeBinaryArray, StringArray, StructArray, UInt32Array, UInt64Array, UInt8Array, | ||
| }; | ||
| use arrow_array::LargeBinaryArray; | ||
| use arrow_schema::{DataType, Field as ArrowField}; | ||
|
|
||
| #[test] | ||
| fn test_blob_encoder_creation() { | ||
|
|
@@ -487,4 +501,64 @@ mod tests { | |
| // Use the standard test harness | ||
| check_round_trip_encoding_of_data(vec![array], &TestCases::default(), blob_metadata).await; | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_blob_v2_external_round_trip() { | ||
| let blob_metadata = | ||
| HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]); | ||
|
|
||
| let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true)); | ||
| let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true)); | ||
|
|
||
| let data_array = LargeBinaryArray::from(vec![Some(b"inline".as_ref()), None, None]); | ||
| let uri_array = StringArray::from(vec![ | ||
| None, | ||
| Some("file:///tmp/external.bin"), | ||
| Some("s3://bucket/blob"), | ||
| ]); | ||
|
|
||
| let struct_array = StructArray::from(vec![ | ||
| (data_field, Arc::new(data_array) as ArrayRef), | ||
| (uri_field, Arc::new(uri_array) as ArrayRef), | ||
| ]); | ||
|
|
||
| let expected_descriptor = StructArray::from(vec![ | ||
| ( | ||
| Arc::new(ArrowField::new("kind", DataType::UInt8, false)), | ||
| Arc::new(UInt8Array::from(vec![ | ||
| BlobKind::Inline as u8, | ||
| BlobKind::External as u8, | ||
| BlobKind::External as u8, | ||
| ])) as ArrayRef, | ||
| ), | ||
| ( | ||
| Arc::new(ArrowField::new("position", DataType::UInt64, false)), | ||
| Arc::new(UInt64Array::from(vec![0, 0, 0])) as ArrayRef, | ||
| ), | ||
| ( | ||
| Arc::new(ArrowField::new("size", DataType::UInt64, false)), | ||
| Arc::new(UInt64Array::from(vec![6, 0, 0])) as ArrayRef, | ||
| ), | ||
| ( | ||
| Arc::new(ArrowField::new("blob_id", DataType::UInt32, false)), | ||
| Arc::new(UInt32Array::from(vec![0, 0, 0])) as ArrayRef, | ||
| ), | ||
| ( | ||
| Arc::new(ArrowField::new("blob_uri", DataType::Utf8, false)), | ||
| Arc::new(StringArray::from(vec![ | ||
| "", | ||
| "file:///tmp/external.bin", | ||
| "s3://bucket/blob", | ||
| ])) as ArrayRef, | ||
| ), | ||
| ]); | ||
|
|
||
| check_round_trip_encoding_of_data_with_expected( | ||
| vec![Arc::new(struct_array)], | ||
| Some(Arc::new(expected_descriptor)), | ||
| &TestCases::default().with_min_file_version(LanceFileVersion::V2_2), | ||
| blob_metadata, | ||
| ) | ||
| .await; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these changes forwards / backwards compatible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I believe so. No blob v2 files have been written so far. It's part of file format 2.2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I had forgotten we added a dedicated encoder / decoder for v2.