diff --git a/rust/lance-core/src/utils/blob.rs b/rust/lance-core/src/utils/blob.rs
index 06cbeb43960..2e38c95ee93 100644
--- a/rust/lance-core/src/utils/blob.rs
+++ b/rust/lance-core/src/utils/blob.rs
@@ -8,19 +8,33 @@ use object_store::path::Path;
 /// Layout: `{base}/{data_file_key}/{blob_id:08x}.raw`
 /// - `base` is typically the dataset's data directory.
 /// - `data_file_key` is the stem of the data file (without extension).
-pub fn blob_path(base: &Path, data_file_key: &str, blob_id: u32) -> Path {
+pub fn dedicated_blob_path(base: &Path, data_file_key: &str, blob_id: u32) -> Path {
     let file_name = format!("{:08x}.raw", blob_id);
     base.child(data_file_key).child(file_name.as_str())
 }
 
+/// Format a packed blob sidecar path for a data file.
+///
+/// Layout: `{base}/{data_file_key}/{blob_id:08x}.pack`
+pub fn pack_blob_path(base: &Path, data_file_key: &str, blob_id: u32) -> Path {
+    let file_name = format!("{:08x}.pack", blob_id);
+    base.child(data_file_key).child(file_name.as_str())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
 
     #[test]
-    fn test_blob_path_formatting() {
+    fn test_dedicated_blob_path_formatting() {
         let base = Path::from("base");
-        let path = blob_path(&base, "deadbeef", 2);
+        let path = dedicated_blob_path(&base, "deadbeef", 2);
         assert_eq!(path.to_string(), "base/deadbeef/00000002.raw");
     }
+
+    #[test]
+    fn test_pack_blob_path_formatting() {
+        let base = Path::from("base");
+        let path = pack_blob_path(&base, "cafebabe", 3);
+        assert_eq!(path.to_string(), "base/cafebabe/00000003.pack");
+    }
 }
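
[Annotation — not part of the patch] The two helpers above differ only in file extension; both zero-pad the blob id to eight hex digits. A dependency-free sketch of the same formatting with plain strings (the name `sidecar_path` is illustrative, not from the crate):

// Mirrors dedicated_blob_path / pack_blob_path without object_store::path::Path.
fn sidecar_path(base: &str, data_file_key: &str, blob_id: u32, ext: &str) -> String {
    // `{:08x}` zero-pads to eight lowercase hex digits, e.g. 2 -> "00000002".
    format!("{}/{}/{:08x}.{}", base, data_file_key, blob_id, ext)
}

fn main() {
    assert_eq!(sidecar_path("base", "deadbeef", 2, "raw"), "base/deadbeef/00000002.raw");
    assert_eq!(sidecar_path("base", "cafebabe", 3, "pack"), "base/cafebabe/00000003.pack");
}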
diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs
index dded1044267..349d3d4b700 100644
--- a/rust/lance-encoding/src/encodings/logical/blob.rs
+++ b/rust/lance-encoding/src/encodings/logical/blob.rs
@@ -299,6 +299,10 @@ impl FieldEncoder for BlobV2StructuralEncoder {
             .column_by_name("blob_size")
             .expect("blob_size column must exist")
             .as_primitive::<UInt64Type>();
+        let packed_position_col = struct_arr
+            .column_by_name("position")
+            .expect("position column must exist")
+            .as_primitive::<UInt64Type>();
 
         let row_count = struct_arr.len();
 
@@ -329,6 +333,13 @@
                     0,
                     uri_col.value(i).to_string(),
                 ),
+                BlobKind::Packed => (
+                    BlobKind::Packed as u8,
+                    packed_position_col.value(i),
+                    blob_size_col.value(i),
+                    blob_id_col.value(i),
+                    "".to_string(),
+                ),
                 BlobKind::Inline => {
                     let data_val = data_col.value(i);
                     let blob_len = data_val.len() as u64;
@@ -343,12 +354,6 @@
                         "".to_string(),
                     )
                 }
-                BlobKind::Packed => {
-                    return Err(Error::InvalidInput {
-                        source: "Packed blob kind is not supported for v2 encoder".into(),
-                        location: location!(),
-                    });
-                }
             }
         };
 
@@ -506,6 +511,7 @@ mod tests {
         let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true));
         let blob_id_field = Arc::new(ArrowField::new("blob_id", DataType::UInt32, true));
         let blob_size_field = Arc::new(ArrowField::new("blob_size", DataType::UInt64, true));
+        let position_field = Arc::new(ArrowField::new("position", DataType::UInt64, true));
 
         let kind_array = UInt8Array::from(vec![
             BlobKind::Inline as u8,
@@ -520,6 +526,7 @@
         ]);
         let blob_id_array = UInt32Array::from(vec![0, 0, 0]);
         let blob_size_array = UInt64Array::from(vec![0, 0, 0]);
+        let position_array = UInt64Array::from(vec![0, 0, 0]);
 
         let struct_array = StructArray::from(vec![
             (kind_field, Arc::new(kind_array) as ArrayRef),
@@ -527,6 +534,7 @@
             (uri_field, Arc::new(uri_array) as ArrayRef),
             (blob_id_field, Arc::new(blob_id_array) as ArrayRef),
             (blob_size_field, Arc::new(blob_size_array) as ArrayRef),
+            (position_field, Arc::new(position_array) as ArrayRef),
         ]);
 
         let expected_descriptor = StructArray::from(vec![
@@ -579,12 +587,14 @@
         let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true));
         let blob_id_field = Arc::new(ArrowField::new("blob_id", DataType::UInt32, true));
         let blob_size_field = Arc::new(ArrowField::new("blob_size", DataType::UInt64, true));
+        let position_field = Arc::new(ArrowField::new("position", DataType::UInt64, true));
 
         let kind_array = UInt8Array::from(vec![BlobKind::Dedicated as u8, BlobKind::Inline as u8]);
         let data_array = LargeBinaryArray::from(vec![None, Some(b"abc".as_ref())]);
         let uri_array = StringArray::from(vec![Option::<&str>::None, None]);
         let blob_id_array = UInt32Array::from(vec![42, 0]);
         let blob_size_array = UInt64Array::from(vec![12, 0]);
+        let position_array = UInt64Array::from(vec![0, 0]);
 
         let struct_array = StructArray::from(vec![
             (kind_field, Arc::new(kind_array) as ArrayRef),
@@ -592,6 +602,7 @@
             (uri_field, Arc::new(uri_array) as ArrayRef),
             (blob_id_field, Arc::new(blob_id_array) as ArrayRef),
             (blob_size_field, Arc::new(blob_size_array) as ArrayRef),
+            (position_field, Arc::new(position_array) as ArrayRef),
         ]);
 
         let expected_descriptor = StructArray::from(vec![
@@ -628,4 +639,64 @@
         )
         .await;
     }
+
+    #[tokio::test]
+    async fn test_blob_v2_packed_round_trip() {
+        let blob_metadata =
+            HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]);
+
+        let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true));
+        let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true));
+        let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true));
+        let blob_id_field = Arc::new(ArrowField::new("blob_id", DataType::UInt32, true));
+        let blob_size_field = Arc::new(ArrowField::new("blob_size", DataType::UInt64, true));
+        let position_field = Arc::new(ArrowField::new("position", DataType::UInt64, true));
+
+        let kind_array = UInt8Array::from(vec![BlobKind::Packed as u8]);
+        let data_array = LargeBinaryArray::from(vec![None::<&[u8]>]);
+        let uri_array = StringArray::from(vec![None::<&str>]);
+        let blob_id_array = UInt32Array::from(vec![7]);
+        let blob_size_array = UInt64Array::from(vec![5]);
+        let position_array = UInt64Array::from(vec![10]);
+
+        let struct_array = StructArray::from(vec![
+            (kind_field, Arc::new(kind_array) as ArrayRef),
+            (data_field, Arc::new(data_array) as ArrayRef),
+            (uri_field, Arc::new(uri_array) as ArrayRef),
+            (blob_id_field, Arc::new(blob_id_array) as ArrayRef),
+            (blob_size_field, Arc::new(blob_size_array) as ArrayRef),
+            (position_field, Arc::new(position_array) as ArrayRef),
+        ]);
+
+        let expected_descriptor = StructArray::from(vec![
+            (
+                Arc::new(ArrowField::new("kind", DataType::UInt8, false)),
+                Arc::new(UInt8Array::from(vec![BlobKind::Packed as u8])) as ArrayRef,
+            ),
+            (
+                Arc::new(ArrowField::new("position", DataType::UInt64, false)),
+                Arc::new(UInt64Array::from(vec![10])) as ArrayRef,
+            ),
+            (
+                Arc::new(ArrowField::new("size", DataType::UInt64, false)),
+                Arc::new(UInt64Array::from(vec![5])) as ArrayRef,
+            ),
+            (
+                Arc::new(ArrowField::new("blob_id", DataType::UInt32, false)),
+                Arc::new(UInt32Array::from(vec![7])) as ArrayRef,
+            ),
+            (
+                Arc::new(ArrowField::new("blob_uri", DataType::Utf8, false)),
+                Arc::new(StringArray::from(vec![""])) as ArrayRef,
+            ),
+        ]);
+
+        check_round_trip_encoding_of_data_with_expected(
+            vec![Arc::new(struct_array)],
+            Some(Arc::new(expected_descriptor)),
+            &TestCases::default().with_min_file_version(LanceFileVersion::V2_2),
+            blob_metadata,
+        )
+        .await;
+    }
 }
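
[Annotation — not part of the patch] The encoder change above lowers every row to the same five-field descriptor (kind, position, size, blob_id, uri); for `BlobKind::Packed` the position/size/blob_id fields are set and the uri stays empty, since the pack path is derived from blob_id. A standalone sketch of that mapping, using illustrative types rather than the crate's own:

#[derive(Debug, PartialEq)]
enum Kind { Inline, Packed, Dedicated, External }

struct Descriptor { kind: Kind, position: u64, size: u64, blob_id: u32, uri: String }

// Mirrors the new `BlobKind::Packed` match arm in the encoder.
fn describe_packed(position: u64, size: u64, blob_id: u32) -> Descriptor {
    Descriptor { kind: Kind::Packed, position, size, blob_id, uri: String::new() }
}

fn main() {
    // Same values the new round-trip test uses: position 10, size 5, blob_id 7.
    let d = describe_packed(10, 5, 7);
    assert_eq!(d.kind, Kind::Packed);
    assert_eq!((d.position, d.size, d.blob_id, d.uri.as_str()), (10, 5, 7, ""));
}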
diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs
index 55f3dad8a04..d56b9e2fb8a 100644
--- a/rust/lance/src/dataset/blob.rs
+++ b/rust/lance/src/dataset/blob.rs
@@ -20,7 +20,7 @@ use super::take::TakeBuilder;
 use super::{Dataset, ProjectionRequest};
 use arrow_array::StructArray;
 use lance_core::datatypes::{BlobKind, BlobVersion};
-use lance_core::utils::blob::blob_path;
+use lance_core::utils::blob::{dedicated_blob_path, pack_blob_path};
 use lance_core::{utils::address::RowAddress, Error, Result};
 use lance_io::traits::Reader;
 
@@ -33,7 +33,91 @@ pub fn blob_version_from_config(config: &HashMap<String, String>) -> BlobVersion
         .unwrap_or(BlobVersion::V1)
 }
 
-const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024;
+const INLINE_MAX: usize = 64 * 1024; // 64 KiB inline cutoff
+const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; // 4 MiB dedicated cutoff
+const PACK_FILE_MAX_SIZE: usize = 1024 * 1024 * 1024; // 1 GiB per .pack sidecar
+
+// Maintains rolling `.pack` sidecar files for packed blobs.
+// Layout: data/{data_file_key}/{blob_id:08x}.pack, where each file is an
+// unframed concatenation of blob payloads; descriptors store (blob_id,
+// position, size) to locate each slice. A dedicated struct keeps path state
+// and rolling size separate from the per-batch preprocessor logic, so we can
+// reuse the same writer across rows and close/roll files cleanly on finish.
+struct PackWriter {
+    object_store: ObjectStore,
+    data_dir: Path,
+    data_file_key: String,
+    max_pack_size: usize,
+    current_blob_id: Option<u32>,
+    writer: Option<ObjectWriter>,
+    current_size: usize,
+}
+
+impl PackWriter {
+    fn new(object_store: ObjectStore, data_dir: Path, data_file_key: String) -> Self {
+        Self {
+            object_store,
+            data_dir,
+            data_file_key,
+            max_pack_size: PACK_FILE_MAX_SIZE,
+            current_blob_id: None,
+            writer: None,
+            current_size: 0,
+        }
+    }
+
+    async fn start_new_pack(&mut self, blob_id: u32) -> Result<()> {
+        let path = pack_blob_path(&self.data_dir, &self.data_file_key, blob_id);
+        let writer = self.object_store.create(&path).await?;
+        self.writer = Some(writer);
+        self.current_blob_id = Some(blob_id);
+        self.current_size = 0;
+        Ok(())
+    }
+
+    /// Append `data` to the current `.pack` file, rolling to a new file when
+    /// `max_pack_size` would be exceeded.
+    ///
+    /// `alloc_blob_id`: called only when a new pack file is opened; returns the
+    /// blob_id used as the file name.
+    ///
+    /// Returns `(blob_id, position)` where
+    /// `position` is the start offset of this payload in that pack file.
+    async fn write_with_allocator<F>(
+        &mut self,
+        alloc_blob_id: &mut F,
+        data: &[u8],
+    ) -> Result<(u32, u64)>
+    where
+        F: FnMut() -> u32,
+    {
+        let len = data.len();
+        if self
+            .current_blob_id
+            .map(|_| self.current_size + len > self.max_pack_size)
+            .unwrap_or(true)
+        {
+            let blob_id = alloc_blob_id();
+            self.finish().await?;
+            self.start_new_pack(blob_id).await?;
+        }
+
+        let writer = self.writer.as_mut().expect("pack writer is initialized");
+        let position = self.current_size as u64;
+        writer.write_all(data).await?;
+        self.current_size += len;
+        Ok((self.current_blob_id.expect("pack blob id"), position))
+    }
+
+    async fn finish(&mut self) -> Result<()> {
+        if let Some(mut writer) = self.writer.take() {
+            writer.shutdown().await?;
+        }
+        self.current_blob_id = None;
+        self.current_size = 0;
+        Ok(())
+    }
+}
 
 /// Preprocesses blob v2 columns on the write path so the encoder only sees lightweight descriptors:
 ///
@@ -45,16 +129,23 @@ pub struct BlobPreprocessor {
     data_dir: Path,
     data_file_key: String,
     local_counter: u32,
+    pack_writer: PackWriter,
 }
 
 impl BlobPreprocessor {
     pub(crate) fn new(object_store: ObjectStore, data_dir: Path, data_file_key: String) -> Self {
+        let pack_writer = PackWriter::new(
+            object_store.clone(),
+            data_dir.clone(),
+            data_file_key.clone(),
+        );
         Self {
             object_store,
             data_dir,
             data_file_key,
             // Start at 1 to avoid a potential all-zero blob_id value.
             local_counter: 1,
+            pack_writer,
         }
     }
 
@@ -64,14 +155,27 @@ impl BlobPreprocessor {
         id
     }
 
-    async fn write_blob(&self, blob_id: u32, data: &[u8]) -> Result<Path> {
-        let path = blob_path(&self.data_dir, &self.data_file_key, blob_id);
+    async fn write_dedicated(&mut self, blob_id: u32, data: &[u8]) -> Result<Path> {
+        let path = dedicated_blob_path(&self.data_dir, &self.data_file_key, blob_id);
         let mut writer = self.object_store.create(&path).await?;
         writer.write_all(data).await?;
         writer.shutdown().await?;
         Ok(path)
     }
+    async fn write_packed(&mut self, data: &[u8]) -> Result<(u32, u64)> {
+        let (counter, pack_writer) = (&mut self.local_counter, &mut self.pack_writer);
+        pack_writer
+            .write_with_allocator(
+                &mut || {
+                    let id = *counter;
+                    *counter += 1;
+                    id
+                },
+                data,
+            )
+            .await
+    }
 
     pub(crate) async fn preprocess_batch(&mut self, batch: &RecordBatch) -> Result<RecordBatch> {
         let mut new_columns = Vec::with_capacity(batch.num_columns());
         let mut new_fields = Vec::with_capacity(batch.num_columns());
@@ -110,6 +214,8 @@
                 let mut blob_size_builder =
                     PrimitiveBuilder::<UInt64Type>::with_capacity(struct_arr.len());
                 let mut kind_builder = PrimitiveBuilder::<UInt8Type>::with_capacity(struct_arr.len());
+                let mut position_builder =
+                    PrimitiveBuilder::<UInt64Type>::with_capacity(struct_arr.len());
 
                 let struct_nulls = struct_arr.nulls();
 
@@ -120,21 +226,36 @@
                         blob_id_builder.append_null();
                         blob_size_builder.append_null();
                         kind_builder.append_null();
+                        position_builder.append_null();
                         continue;
                     }
 
                     let has_data = !data_col.is_null(i);
                     let has_uri = !uri_col.is_null(i);
+                    let data_len = if has_data { data_col.value(i).len() } else { 0 };
 
-                    if has_data && data_col.value(i).len() > DEDICATED_THRESHOLD {
+                    if has_data && data_len > DEDICATED_THRESHOLD {
                         let blob_id = self.next_blob_id();
-                        self.write_blob(blob_id, data_col.value(i)).await?;
+                        self.write_dedicated(blob_id, data_col.value(i)).await?;
                         kind_builder.append_value(BlobKind::Dedicated as u8);
                         data_builder.append_null();
                         uri_builder.append_null();
                         blob_id_builder.append_value(blob_id);
-                        blob_size_builder.append_value(data_col.value(i).len() as u64);
+                        blob_size_builder.append_value(data_len as u64);
+                        position_builder.append_null();
+                        continue;
+                    }
+
+                    if has_data && data_len > INLINE_MAX {
+                        let (pack_blob_id, position) = self.write_packed(data_col.value(i)).await?;
+
+                        kind_builder.append_value(BlobKind::Packed as u8);
+                        data_builder.append_null();
+                        uri_builder.append_null();
+                        blob_id_builder.append_value(pack_blob_id);
+                        blob_size_builder.append_value(data_len as u64);
+                        position_builder.append_value(position);
                         continue;
                     }
 
@@ -145,6 +266,7 @@
                         uri_builder.append_value(uri_val);
                         blob_id_builder.append_null();
                         blob_size_builder.append_null();
+                        position_builder.append_null();
                         continue;
                     }
 
@@ -155,12 +277,14 @@
                         uri_builder.append_null();
                         blob_id_builder.append_null();
                         blob_size_builder.append_null();
+                        position_builder.append_null();
                     } else {
                         data_builder.append_null();
                         uri_builder.append_null();
                         blob_id_builder.append_null();
                         blob_size_builder.append_null();
                         kind_builder.append_null();
+                        position_builder.append_null();
                     }
                 }
 
@@ -170,6 +294,7 @@
                     arrow_schema::Field::new("uri", ArrowDataType::Utf8, true),
                     arrow_schema::Field::new("blob_id", ArrowDataType::UInt32, true),
                     arrow_schema::Field::new("blob_size", ArrowDataType::UInt64, true),
+                    arrow_schema::Field::new("position", ArrowDataType::UInt64, true),
                 ];
 
                 let struct_array = arrow_array::StructArray::try_new(
@@ -180,6 +305,7 @@
                         Arc::new(uri_builder.finish()),
                         Arc::new(blob_id_builder.finish()),
                         Arc::new(blob_size_builder.finish()),
+                        Arc::new(position_builder.finish()),
                     ],
                     struct_nulls.cloned(),
                 )?;
@@ -206,6 +332,10 @@
         RecordBatch::try_new(new_schema, new_columns)
             .map_err(|e| Error::invalid_input(e.to_string(), location!()))
     }
+
+    pub(crate) async fn finish(&mut self) -> Result<()> {
+        self.pack_writer.finish().await
+    }
 }
 
 pub fn schema_has_blob_v2(schema: &lance_core::datatypes::Schema) -> bool {
@@ -283,7 +413,17 @@ impl BlobFile {
             reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))),
         }
     }
-
+    pub fn new_packed(dataset: Arc<Dataset>, path: Path, position: u64, size: u64) -> Self {
+        Self {
+            object_store: dataset.object_store.clone(),
+            path,
+            position,
+            size,
+            kind: BlobKind::Packed,
+            uri: None,
+            reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))),
+        }
+    }
     pub async fn new_external(
         uri: String,
         size: u64,
@@ -592,9 +732,31 @@ async fn collect_blob_files_v2(
                        })?;
 
                 let data_file_key = data_file_key_from_path(data_file.path.as_str());
-                let path = blob_path(&dataset.data_dir(), data_file_key, blob_id);
+                let path = dedicated_blob_path(&dataset.data_dir(), data_file_key, blob_id);
                 files.push(BlobFile::new_dedicated(dataset.clone(), path, size));
             }
+            BlobKind::Packed => {
+                let blob_id = blob_ids.value(idx);
+                let size = sizes.value(idx);
+                let position = positions.value(idx);
+                let frag_id = RowAddress::from(*row_addr).fragment_id();
+                let frag =
+                    dataset
+                        .get_fragment(frag_id as usize)
+                        .ok_or_else(|| Error::Internal {
+                            message: "Fragment not found".to_string(),
+                            location: location!(),
+                        })?;
+                let data_file =
+                    frag.data_file_for_field(blob_field_id)
+                        .ok_or_else(|| Error::Internal {
+                            message: "Data file not found for blob field".to_string(),
+                            location: location!(),
+                        })?;
+                let data_file_key = data_file_key_from_path(data_file.path.as_str());
+                let path = pack_blob_path(&dataset.data_dir(), data_file_key, blob_id);
+                files.push(BlobFile::new_packed(dataset.clone(), path, position, size));
+            }
             BlobKind::External => {
                 let uri = blob_uris.value(idx).to_string();
                 let size = sizes.value(idx);
@@ -606,12 +768,6 @@ async fn collect_blob_files_v2(
                    .unwrap_or_else(|| Arc::new(ObjectStoreParams::default()));
                 files.push(BlobFile::new_external(uri, size, registry, params).await?);
             }
-            other => {
-                return Err(Error::NotSupported {
-                    source: format!("Blob kind {:?} is not supported", other).into(),
-                    location: location!(),
-                });
-            }
         }
     }
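
[Annotation — not part of the patch] The roll-over rule in `write_with_allocator`, isolated: a new pack file is opened when none is open or when appending would exceed the cap, and a payload larger than the cap is still written, alone, to its own pack file. A minimal sketch of just that bookkeeping (no I/O; the names are illustrative):

struct RollState {
    current: Option<(u32, usize)>, // (pack file's blob_id, bytes written so far)
    max: usize,
}

impl RollState {
    // Returns (blob_id of the pack file, byte offset of this payload within it).
    fn append(&mut self, len: usize, mut alloc_id: impl FnMut() -> u32) -> (u32, u64) {
        let must_roll = self
            .current
            .map(|(_, size)| size + len > self.max)
            .unwrap_or(true);
        if must_roll {
            self.current = Some((alloc_id(), 0));
        }
        let (id, size) = self.current.as_mut().unwrap();
        let position = *size as u64;
        *size += len;
        (*id, position)
    }
}

fn main() {
    let mut state = RollState { current: None, max: 100 };
    let mut next = 1u32..;
    let mut alloc = || next.next().unwrap();
    assert_eq!(state.append(60, &mut alloc), (1, 0)); // opens pack 1
    assert_eq!(state.append(30, &mut alloc), (1, 60)); // still fits: 60 + 30 <= 100
    assert_eq!(state.append(30, &mut alloc), (2, 0)); // 90 + 30 > 100 -> rolls to pack 2
}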
diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs
index 6f675dbcb6e..585596e513e 100644
--- a/rust/lance/src/dataset/write.rs
+++ b/rust/lance/src/dataset/write.rs
@@ -739,6 +739,9 @@ impl GenericWriter for V2WriterAdapter {
         Ok(self.writer.tell().await?)
     }
     async fn finish(&mut self) -> Result<(u32, DataFile)> {
+        if let Some(pre) = self.preprocessor.as_mut() {
+            pre.finish().await?;
+        }
         let field_ids = self
             .writer
             .field_id_to_column_indices()
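
[Annotation — not part of the patch] On the read path, a packed blob is just the `position..position + size` byte range of its `.pack` sidecar: `collect_blob_files_v2` resolves the descriptor to a path via `pack_blob_path` and hands the offsets to `BlobFile::new_packed`, which serves reads through the existing reader machinery. A dependency-free sketch of that slice arithmetic against an in-memory stand-in for the object store:

use std::collections::HashMap;

// path -> pack file bytes; a toy stand-in for an object-store range read.
fn read_packed<'a>(
    store: &'a HashMap<String, Vec<u8>>,
    path: &str,
    position: u64,
    size: u64,
) -> Option<&'a [u8]> {
    let pack = store.get(path)?;
    pack.get(position as usize..(position + size) as usize)
}

fn main() {
    let mut store = HashMap::new();
    // One pack file holding two concatenated payloads ("hello" then "packed").
    store.insert("data/key/00000001.pack".to_string(), b"hellopacked".to_vec());
    assert_eq!(read_packed(&store, "data/key/00000001.pack", 0, 5).unwrap(), b"hello");
    assert_eq!(read_packed(&store, "data/key/00000001.pack", 5, 6).unwrap(), b"packed");
}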