Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 8 additions & 21 deletions rust/lance-core/src/utils/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,25 @@

use object_store::path::Path;

/// Format a dedicated blob sidecar path for a data file.
/// Format a blob sidecar path for a data file.
///
/// Layout: `<base>/<data_file_key>/<blob_id>.raw`
/// Layout: `<base>/<data_file_key>/<blob_id>.blob`
/// - `base` is typically the dataset's data directory.
/// - `data_file_key` is the stem of the data file (without extension).
pub fn dedicated_blob_path(base: &Path, data_file_key: &str, blob_id: u32) -> Path {
let file_name = format!("{:08x}.raw", blob_id);
/// - `blob_id` is the hex-encoded identifier assigned during write.
pub fn blob_path(base: &Path, data_file_key: &str, blob_id: u32) -> Path {
let file_name = format!("{:08x}.blob", blob_id);
base.child(data_file_key).child(file_name.as_str())
Comment thread
Xuanwo marked this conversation as resolved.
}

/// Format a packed blob sidecar path for a data file.
///
/// Layout: `<base>/<data_file_key>/<blob_id>.pack`
pub fn pack_blob_path(base: &Path, data_file_key: &str, blob_id: u32) -> Path {
let file_name = format!("{:08x}.pack", blob_id);
base.child(data_file_key).child(file_name.as_str())
}
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_dedicated_blob_path_formatting() {
let base = Path::from("base");
let path = dedicated_blob_path(&base, "deadbeef", 2);
assert_eq!(path.to_string(), "base/deadbeef/00000002.raw");
}

#[test]
fn test_pack_blob_path_formatting() {
fn test_blob_path_formatting() {
let base = Path::from("base");
let path = pack_blob_path(&base, "cafebabe", 3);
assert_eq!(path.to_string(), "base/cafebabe/00000003.pack");
let path = blob_path(&base, "deadbeef", 2);
assert_eq!(path.to_string(), "base/deadbeef/00000002.blob");
}
}
16 changes: 8 additions & 8 deletions rust/lance/src/dataset/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use super::take::TakeBuilder;
use super::{Dataset, ProjectionRequest};
use arrow_array::StructArray;
use lance_core::datatypes::{BlobKind, BlobVersion};
use lance_core::utils::blob::{dedicated_blob_path, pack_blob_path};
use lance_core::utils::blob::blob_path;
use lance_core::{utils::address::RowAddress, Error, Result};
use lance_io::traits::Reader;

Expand All @@ -37,8 +37,8 @@ const INLINE_MAX: usize = 64 * 1024; // 64KB inline cutoff
const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; // 4MB dedicated cutoff
const PACK_FILE_MAX_SIZE: usize = 1024 * 1024 * 1024; // 1GiB per .pack sidecar

// Maintains rolling `.pack` sidecar files for packed blobs.
// Layout: data/{data_file_key}/{blob_id:08x}.pack where each file is an
// Maintains rolling `.blob` sidecar files for packed blobs.
// Layout: data/{data_file_key}/{blob_id:08x}.blob where each file is an
// unframed concatenation of blob payloads; descriptors store (blob_id,
// position, size) to locate each slice. A dedicated struct keeps path state
// and rolling size separate from the per-batch preprocessor logic, so we can
Expand Down Expand Up @@ -67,15 +67,15 @@ impl PackWriter {
}

async fn start_new_pack(&mut self, blob_id: u32) -> Result<()> {
let path = pack_blob_path(&self.data_dir, &self.data_file_key, blob_id);
let path = blob_path(&self.data_dir, &self.data_file_key, blob_id);
let writer = self.object_store.create(&path).await?;
self.writer = Some(writer);
self.current_blob_id = Some(blob_id);
self.current_size = 0;
Ok(())
}

/// Append `data` to the current `.pack` file, rolling to a new file when
/// Append `data` to the current `.blob` file, rolling to a new file when
/// `max_pack_size` would be exceeded.
///
/// alloc_blob_id: called only when a new pack file is opened; returns the
Expand Down Expand Up @@ -156,7 +156,7 @@ impl BlobPreprocessor {
}

async fn write_dedicated(&mut self, blob_id: u32, data: &[u8]) -> Result<Path> {
let path = dedicated_blob_path(&self.data_dir, &self.data_file_key, blob_id);
let path = blob_path(&self.data_dir, &self.data_file_key, blob_id);
let mut writer = self.object_store.create(&path).await?;
writer.write_all(data).await?;
writer.shutdown().await?;
Expand Down Expand Up @@ -732,7 +732,7 @@ async fn collect_blob_files_v2(
})?;

let data_file_key = data_file_key_from_path(data_file.path.as_str());
let path = dedicated_blob_path(&dataset.data_dir(), data_file_key, blob_id);
let path = blob_path(&dataset.data_dir(), data_file_key, blob_id);
files.push(BlobFile::new_dedicated(dataset.clone(), path, size));
}
BlobKind::Packed => {
Expand All @@ -754,7 +754,7 @@ async fn collect_blob_files_v2(
location: location!(),
})?;
let data_file_key = data_file_key_from_path(data_file.path.as_str());
let path = pack_blob_path(&dataset.data_dir(), data_file_key, blob_id);
let path = blob_path(&dataset.data_dir(), data_file_key, blob_id);
files.push(BlobFile::new_packed(dataset.clone(), path, position, size));
}
BlobKind::External => {
Expand Down