Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions rust/lance-arrow/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub trait FieldExt {

/// Check if the field is marked as a blob
fn is_blob(&self) -> bool;

    /// Check if the field is marked as a blob v2
fn is_blob_v2(&self) -> bool;
}

impl FieldExt for Field {
Expand Down Expand Up @@ -108,6 +111,14 @@ impl FieldExt for Field {
.map(|value| value == BLOB_V2_EXT_NAME)
.unwrap_or(false)
}

fn is_blob_v2(&self) -> bool {
let field_metadata = self.metadata();
field_metadata
.get(ARROW_EXT_NAME_KEY)
.map(|value| value == BLOB_V2_EXT_NAME)
.unwrap_or(false)
}
}

/// Extends the functionality of [arrow_schema::Schema].
Expand Down
21 changes: 21 additions & 0 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,14 @@ impl Field {
.unwrap_or(false)
}

/// Returns true if the field is explicitly marked as blob v2 extension.
pub fn is_blob_v2(&self) -> bool {
self.metadata
.get(ARROW_EXT_NAME_KEY)
.map(|name| name == BLOB_V2_EXT_NAME)
.unwrap_or(false)
}

/// If the field is a blob, return a new field with the same name and id
/// but with the data type set to a struct of the blob description fields.
///
Expand Down Expand Up @@ -1562,6 +1570,19 @@ mod tests {
assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type);
}

#[test]
fn blob_v2_detection_by_extension() {
let metadata = HashMap::from([
(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string()),
(BLOB_META_KEY.to_string(), "true".to_string()),
]);
let field: Field = ArrowField::new("blob", DataType::LargeBinary, true)
.with_metadata(metadata)
.try_into()
.unwrap();
assert!(field.is_blob_v2());
}

#[test]
fn blob_extension_roundtrip() {
let metadata = HashMap::from([
Expand Down
1 change: 1 addition & 0 deletions rust/lance-core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod address;
pub mod assume;
pub mod backoff;
pub mod bit;
pub mod blob;
pub mod cpu;
pub mod deletion;
pub mod futures;
Expand Down
26 changes: 26 additions & 0 deletions rust/lance-core/src/utils/blob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use object_store::path::Path;

/// Format a dedicated blob sidecar path for a data file.
///
/// Layout: `<base>/<data_file_key>/<blob_id>.raw`
/// - `base` is typically the dataset's data directory.
/// - `data_file_key` is the stem of the data file (without extension).
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I hadn't expected path to include data_file_key but I don't see any harm in it and it could maybe be useful for things like cleanup?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! I picked up this design to make the GC much easier: we can just remove all blob files under the same data_file_key once we decide to remove that data file.

pub fn blob_path(base: &Path, data_file_key: &str, blob_id: u32) -> Path {
    // Fixed-width (8-digit) lowercase hex keeps sidecar file names uniform
    // and lexicographically sorted by blob id within a data-file directory.
    let sidecar_name = format!("{:08x}.raw", blob_id);
    base.child(data_file_key).child(sidecar_name.as_str())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_blob_path_formatting() {
        // Expected layout: `<base>/<data_file_key>/<8-digit-hex-id>.raw`.
        let formatted = blob_path(&Path::from("base"), "deadbeef", 2).to_string();
        assert_eq!(formatted, "base/deadbeef/00000002.raw");
    }
}
233 changes: 150 additions & 83 deletions rust/lance-encoding/src/encodings/logical/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,98 +272,92 @@ impl FieldEncoder for BlobV2StructuralEncoder {
row_number: u64,
num_rows: u64,
) -> Result<Vec<EncodeTask>> {
// Supported input: Struct<data:LargeBinary?, uri:Utf8?>
let DataType::Struct(fields) = array.data_type() else {
return Err(Error::InvalidInput {
source: "Blob v2 requires struct<data, uri> input".into(),
location: location!(),
});
};

let struct_arr = array.as_struct();
if let Some(validity) = struct_arr.nulls() {
repdef.add_validity_bitmap(validity.clone());
} else {
repdef.add_no_null(struct_arr.len());
}
let mut data_idx = None;
let mut uri_idx = None;
for (idx, field) in fields.iter().enumerate() {
match field.name().as_str() {
"data" => data_idx = Some(idx),
"uri" => uri_idx = Some(idx),
_ => {}
}
}
let (data_idx, uri_idx) = data_idx.zip(uri_idx).ok_or_else(|| Error::InvalidInput {
source: "Blob v2 struct must contain 'data' and 'uri' fields".into(),
location: location!(),
})?;

let data_col = struct_arr.column(data_idx).as_binary::<i64>();
let uri_col = struct_arr.column(uri_idx).as_string::<i32>();

// Validate XOR(data, uri)
for i in 0..struct_arr.len() {
if struct_arr.is_null(i) {
continue;
}
let data_is_set = !data_col.is_null(i);
let uri_is_set = !uri_col.is_null(i);
if data_is_set == uri_is_set {
return Err(Error::InvalidInput {
source: "Each blob row must set exactly one of data or uri".into(),
location: location!(),
});
}
}

let binary_array = data_col;

let mut kind_builder = PrimitiveBuilder::<UInt8Type>::with_capacity(binary_array.len());
let mut position_builder =
PrimitiveBuilder::<UInt64Type>::with_capacity(binary_array.len());
let mut size_builder = PrimitiveBuilder::<UInt64Type>::with_capacity(binary_array.len());
let mut blob_id_builder = PrimitiveBuilder::<UInt32Type>::with_capacity(binary_array.len());
let mut uri_builder = StringBuilder::with_capacity(binary_array.len(), 0);

for i in 0..struct_arr.len() {
if struct_arr.is_null(i) {
// Packed struct does not support nullable fields; use empty/default values instead.
kind_builder.append_value(BlobKind::Inline as u8);
position_builder.append_value(0);
size_builder.append_value(0);
blob_id_builder.append_value(0);
uri_builder.append_value("");
continue;
}

let data_is_set = !data_col.is_null(i);
if data_is_set {
let value = binary_array.value(i);
kind_builder.append_value(BlobKind::Inline as u8);
if value.is_empty() {
position_builder.append_value(0);
size_builder.append_value(0);
Comment thread
Xuanwo marked this conversation as resolved.
let kind_col = struct_arr
.column_by_name("kind")
.expect("kind column must exist")
.as_primitive::<UInt8Type>();
let data_col = struct_arr
.column_by_name("data")
.expect("data column must exist")
.as_binary::<i64>();
let uri_col = struct_arr
.column_by_name("uri")
.expect("uri column must exist")
.as_string::<i32>();
let blob_id_col = struct_arr
.column_by_name("blob_id")
.expect("blob_id column must exist")
.as_primitive::<UInt32Type>();
let blob_size_col = struct_arr
.column_by_name("blob_size")
.expect("blob_size column must exist")
.as_primitive::<UInt64Type>();

let row_count = struct_arr.len();

let mut kind_builder = PrimitiveBuilder::<UInt8Type>::with_capacity(row_count);
let mut position_builder = PrimitiveBuilder::<UInt64Type>::with_capacity(row_count);
let mut size_builder = PrimitiveBuilder::<UInt64Type>::with_capacity(row_count);
let mut blob_id_builder = PrimitiveBuilder::<UInt32Type>::with_capacity(row_count);
let mut uri_builder = StringBuilder::with_capacity(row_count, row_count * 16);

for i in 0..row_count {
let (kind_value, position_value, size_value, blob_id_value, uri_value) =
if struct_arr.is_null(i) || kind_col.is_null(i) {
(BlobKind::Inline as u8, 0, 0, 0, "".to_string())
} else {
let position =
external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
position_builder.append_value(position);
size_builder.append_value(value.len() as u64);
}
blob_id_builder.append_value(0);
uri_builder.append_value("");
} else {
// external uri
let uri = uri_col.value(i);
kind_builder.append_value(BlobKind::External as u8);
position_builder.append_value(0);
size_builder.append_value(0);
blob_id_builder.append_value(0);
uri_builder.append_value(uri);
}
let kind_val = BlobKind::try_from(kind_col.value(i))?;
match kind_val {
BlobKind::Dedicated => (
BlobKind::Dedicated as u8,
0,
blob_size_col.value(i),
blob_id_col.value(i),
"".to_string(),
),
BlobKind::External => (
BlobKind::External as u8,
0,
0,
0,
uri_col.value(i).to_string(),
),
BlobKind::Inline => {
let data_val = data_col.value(i);
let blob_len = data_val.len() as u64;
let position = external_buffers
.add_buffer(LanceBuffer::from(Buffer::from(data_val)));

(
BlobKind::Inline as u8,
position,
blob_len,
0,
"".to_string(),
)
}
Comment thread
Xuanwo marked this conversation as resolved.
BlobKind::Packed => {
return Err(Error::InvalidInput {
source: "Packed blob kind is not supported for v2 encoder".into(),
location: location!(),
});
}
}
};

kind_builder.append_value(kind_value);
position_builder.append_value(position_value);
size_builder.append_value(size_value);
blob_id_builder.append_value(blob_id_value);
uri_builder.append_value(uri_value);
}

let children: Vec<ArrayRef> = vec![
Arc::new(kind_builder.finish()),
Arc::new(position_builder.finish()),
Expand Down Expand Up @@ -507,19 +501,32 @@ mod tests {
let blob_metadata =
HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]);

let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true));
let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true));
let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true));
let blob_id_field = Arc::new(ArrowField::new("blob_id", DataType::UInt32, true));
let blob_size_field = Arc::new(ArrowField::new("blob_size", DataType::UInt64, true));

let kind_array = UInt8Array::from(vec![
BlobKind::Inline as u8,
BlobKind::External as u8,
BlobKind::External as u8,
]);
let data_array = LargeBinaryArray::from(vec![Some(b"inline".as_ref()), None, None]);
let uri_array = StringArray::from(vec![
None,
Some("file:///tmp/external.bin"),
Some("s3://bucket/blob"),
]);
let blob_id_array = UInt32Array::from(vec![0, 0, 0]);
let blob_size_array = UInt64Array::from(vec![0, 0, 0]);

let struct_array = StructArray::from(vec![
(kind_field, Arc::new(kind_array) as ArrayRef),
(data_field, Arc::new(data_array) as ArrayRef),
(uri_field, Arc::new(uri_array) as ArrayRef),
(blob_id_field, Arc::new(blob_id_array) as ArrayRef),
(blob_size_field, Arc::new(blob_size_array) as ArrayRef),
]);

let expected_descriptor = StructArray::from(vec![
Expand Down Expand Up @@ -561,4 +568,64 @@ mod tests {
)
.await;
}

    #[tokio::test]
    async fn test_blob_v2_dedicated_round_trip() {
        // Round-trips a blob struct column with one "dedicated" row and one
        // "inline" row, and checks the descriptor produced by the encoder.
        let blob_metadata =
            HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]);

        // Input schema: struct<kind, data, uri, blob_id, blob_size>.
        let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true));
        let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true));
        let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true));
        let blob_id_field = Arc::new(ArrowField::new("blob_id", DataType::UInt32, true));
        let blob_size_field = Arc::new(ArrowField::new("blob_size", DataType::UInt64, true));

        // Row 0: dedicated blob (blob_id 42, blob_size 12, no inline data/uri).
        // Row 1: inline blob carrying the payload b"abc".
        let kind_array = UInt8Array::from(vec![BlobKind::Dedicated as u8, BlobKind::Inline as u8]);
        let data_array = LargeBinaryArray::from(vec![None, Some(b"abc".as_ref())]);
        let uri_array = StringArray::from(vec![Option::<&str>::None, None]);
        let blob_id_array = UInt32Array::from(vec![42, 0]);
        let blob_size_array = UInt64Array::from(vec![12, 0]);

        let struct_array = StructArray::from(vec![
            (kind_field, Arc::new(kind_array) as ArrayRef),
            (data_field, Arc::new(data_array) as ArrayRef),
            (uri_field, Arc::new(uri_array) as ArrayRef),
            (blob_id_field, Arc::new(blob_id_array) as ArrayRef),
            (blob_size_field, Arc::new(blob_size_array) as ArrayRef),
        ]);

        // Expected descriptor: the dedicated row keeps its blob_id (42) and
        // size (12); the inline row records its payload length (3) with
        // default blob_id/uri. NOTE(review): both positions are expected to be
        // 0 — presumably the inline payload lands at offset 0 of the external
        // buffers; confirm against the encoder's buffer layout.
        let expected_descriptor = StructArray::from(vec![
            (
                Arc::new(ArrowField::new("kind", DataType::UInt8, false)),
                Arc::new(UInt8Array::from(vec![
                    BlobKind::Dedicated as u8,
                    BlobKind::Inline as u8,
                ])) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new("position", DataType::UInt64, false)),
                Arc::new(UInt64Array::from(vec![0, 0])) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new("size", DataType::UInt64, false)),
                Arc::new(UInt64Array::from(vec![12, 3])) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new("blob_id", DataType::UInt32, false)),
                Arc::new(UInt32Array::from(vec![42, 0])) as ArrayRef,
            ),
            (
                Arc::new(ArrowField::new("blob_uri", DataType::Utf8, false)),
                Arc::new(StringArray::from(vec!["", ""])) as ArrayRef,
            ),
        ]);

        // Restrict the round trip to file versions that support this encoding.
        check_round_trip_encoding_of_data_with_expected(
            vec![Arc::new(struct_array)],
            Some(Arc::new(expected_descriptor)),
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_2),
            blob_metadata,
        )
        .await;
    }
}
Loading