Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions rust/lance-core/src/utils/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,33 @@ use object_store::path::Path;
/// Layout: `<base>/<data_file_key>/<blob_id>.raw`
/// - `base` is typically the dataset's data directory.
/// - `data_file_key` is the stem of the data file (without extension).
pub fn blob_path(base: &Path, data_file_key: &str, blob_id: u32) -> Path {
pub fn dedicated_blob_path(base: &Path, data_file_key: &str, blob_id: u32) -> Path {
let file_name = format!("{:08x}.raw", blob_id);
base.child(data_file_key).child(file_name.as_str())
}

/// Format a packed blob sidecar path for a data file.
///
/// Layout: `<base>/<data_file_key>/<blob_id>.pack`
pub fn pack_blob_path(base: &Path, data_file_key: &str, blob_id: u32) -> Path {
let file_name = format!("{:08x}.pack", blob_id);
base.child(data_file_key).child(file_name.as_str())
}
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_blob_path_formatting() {
fn test_dedicated_blob_path_formatting() {
let base = Path::from("base");
let path = blob_path(&base, "deadbeef", 2);
let path = dedicated_blob_path(&base, "deadbeef", 2);
assert_eq!(path.to_string(), "base/deadbeef/00000002.raw");
}

#[test]
fn test_pack_blob_path_formatting() {
let base = Path::from("base");
let path = pack_blob_path(&base, "cafebabe", 3);
assert_eq!(path.to_string(), "base/cafebabe/00000003.pack");
}
}
83 changes: 77 additions & 6 deletions rust/lance-encoding/src/encodings/logical/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ impl FieldEncoder for BlobV2StructuralEncoder {
.column_by_name("blob_size")
.expect("blob_size column must exist")
.as_primitive::<UInt64Type>();
let packed_position_col = struct_arr
.column_by_name("position")
.expect("position column must exist")
.as_primitive::<UInt64Type>();

let row_count = struct_arr.len();

Expand Down Expand Up @@ -329,6 +333,13 @@ impl FieldEncoder for BlobV2StructuralEncoder {
0,
uri_col.value(i).to_string(),
),
BlobKind::Packed => (
BlobKind::Packed as u8,
packed_position_col.value(i),
blob_size_col.value(i),
blob_id_col.value(i),
"".to_string(),
),
BlobKind::Inline => {
let data_val = data_col.value(i);
let blob_len = data_val.len() as u64;
Expand All @@ -343,12 +354,6 @@ impl FieldEncoder for BlobV2StructuralEncoder {
"".to_string(),
)
}
BlobKind::Packed => {
return Err(Error::InvalidInput {
source: "Packed blob kind is not supported for v2 encoder".into(),
location: location!(),
});
}
}
};

Expand Down Expand Up @@ -506,6 +511,7 @@ mod tests {
let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true));
let blob_id_field = Arc::new(ArrowField::new("blob_id", DataType::UInt32, true));
let blob_size_field = Arc::new(ArrowField::new("blob_size", DataType::UInt64, true));
let position_field = Arc::new(ArrowField::new("position", DataType::UInt64, true));

let kind_array = UInt8Array::from(vec![
BlobKind::Inline as u8,
Expand All @@ -520,13 +526,15 @@ mod tests {
]);
let blob_id_array = UInt32Array::from(vec![0, 0, 0]);
let blob_size_array = UInt64Array::from(vec![0, 0, 0]);
let position_array = UInt64Array::from(vec![0, 0, 0]);

let struct_array = StructArray::from(vec![
(kind_field, Arc::new(kind_array) as ArrayRef),
(data_field, Arc::new(data_array) as ArrayRef),
(uri_field, Arc::new(uri_array) as ArrayRef),
(blob_id_field, Arc::new(blob_id_array) as ArrayRef),
(blob_size_field, Arc::new(blob_size_array) as ArrayRef),
(position_field, Arc::new(position_array) as ArrayRef),
]);

let expected_descriptor = StructArray::from(vec![
Expand Down Expand Up @@ -579,19 +587,22 @@ mod tests {
let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true));
let blob_id_field = Arc::new(ArrowField::new("blob_id", DataType::UInt32, true));
let blob_size_field = Arc::new(ArrowField::new("blob_size", DataType::UInt64, true));
let position_field = Arc::new(ArrowField::new("position", DataType::UInt64, true));

let kind_array = UInt8Array::from(vec![BlobKind::Dedicated as u8, BlobKind::Inline as u8]);
let data_array = LargeBinaryArray::from(vec![None, Some(b"abc".as_ref())]);
let uri_array = StringArray::from(vec![Option::<&str>::None, None]);
let blob_id_array = UInt32Array::from(vec![42, 0]);
let blob_size_array = UInt64Array::from(vec![12, 0]);
let position_array = UInt64Array::from(vec![0, 0]);

let struct_array = StructArray::from(vec![
(kind_field, Arc::new(kind_array) as ArrayRef),
(data_field, Arc::new(data_array) as ArrayRef),
(uri_field, Arc::new(uri_array) as ArrayRef),
(blob_id_field, Arc::new(blob_id_array) as ArrayRef),
(blob_size_field, Arc::new(blob_size_array) as ArrayRef),
(position_field, Arc::new(position_array) as ArrayRef),
]);

let expected_descriptor = StructArray::from(vec![
Expand Down Expand Up @@ -628,4 +639,64 @@ mod tests {
)
.await;
}

#[tokio::test]
async fn test_blob_v2_packed_round_trip() {
let blob_metadata =
HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]);

let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true));
let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true));
let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true));
let blob_id_field = Arc::new(ArrowField::new("blob_id", DataType::UInt32, true));
let blob_size_field = Arc::new(ArrowField::new("blob_size", DataType::UInt64, true));
let position_field = Arc::new(ArrowField::new("position", DataType::UInt64, true));

let kind_array = UInt8Array::from(vec![BlobKind::Packed as u8]);
let data_array = LargeBinaryArray::from(vec![None::<&[u8]>]);
let uri_array = StringArray::from(vec![None::<&str>]);
let blob_id_array = UInt32Array::from(vec![7]);
let blob_size_array = UInt64Array::from(vec![5]);
let position_array = UInt64Array::from(vec![10]);

let struct_array = StructArray::from(vec![
(kind_field, Arc::new(kind_array) as ArrayRef),
(data_field, Arc::new(data_array) as ArrayRef),
(uri_field, Arc::new(uri_array) as ArrayRef),
(blob_id_field, Arc::new(blob_id_array) as ArrayRef),
(blob_size_field, Arc::new(blob_size_array) as ArrayRef),
(position_field, Arc::new(position_array) as ArrayRef),
]);

let expected_descriptor = StructArray::from(vec![
(
Arc::new(ArrowField::new("kind", DataType::UInt8, false)),
Arc::new(UInt8Array::from(vec![BlobKind::Packed as u8])) as ArrayRef,
),
(
Arc::new(ArrowField::new("position", DataType::UInt64, false)),
Arc::new(UInt64Array::from(vec![10])) as ArrayRef,
),
(
Arc::new(ArrowField::new("size", DataType::UInt64, false)),
Arc::new(UInt64Array::from(vec![5])) as ArrayRef,
),
(
Arc::new(ArrowField::new("blob_id", DataType::UInt32, false)),
Arc::new(UInt32Array::from(vec![7])) as ArrayRef,
),
(
Arc::new(ArrowField::new("blob_uri", DataType::Utf8, false)),
Arc::new(StringArray::from(vec![""])) as ArrayRef,
),
]);

check_round_trip_encoding_of_data_with_expected(
vec![Arc::new(struct_array)],
Some(Arc::new(expected_descriptor)),
&TestCases::default().with_min_file_version(LanceFileVersion::V2_2),
blob_metadata,
)
.await;
}
}
Loading