Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1492,15 +1492,29 @@ impl Dataset {
blob::take_blobs(self, row_ids, column.as_ref()).await
}

/// Take [BlobFile]s by row addresses.
///
/// A row address is a `u64` encoding `(fragment_id << 32) | row_offset`.
/// Reach for this method when the addresses are already known, for example
/// when they come from a scan configured with `with_row_address()`.
///
/// Related entry points:
/// * [`Self::take_blobs`] takes row IDs (stable identifiers).
/// * [`Self::take_blobs_by_indices`] takes row indices (offsets).
pub async fn take_blobs_by_addresses(
    self: &Arc<Self>,
    row_addrs: &[u64],
    column: impl AsRef<str>,
) -> Result<Vec<BlobFile>> {
    // Resolve the column name once, then delegate to the blob module.
    let column_name = column.as_ref();
    blob::take_blobs_by_addresses(self, row_addrs, column_name).await
}

/// Take [BlobFile] by row indices (offsets in the dataset).
///
/// The indices are first translated into row addresses with
/// `row_offsets_to_row_addresses`; the blobs are then fetched by address.
///
/// # Errors
///
/// Propagates any error from the offset-to-address translation or from the
/// address-based blob lookup.
pub async fn take_blobs_by_indices(
    self: &Arc<Self>,
    row_indices: &[u64],
    column: impl AsRef<str>,
) -> Result<Vec<BlobFile>> {
    let row_addrs = row_offsets_to_row_addresses(self, row_indices).await?;
    // The translated values are row addresses, not row IDs, so they must go
    // through the address-based path. Handing them to `blob::take_blobs`
    // (which expects row IDs) breaks for fragment 1+ when stable row IDs are
    // enabled, because addresses and IDs then differ.
    blob::take_blobs_by_addresses(self, &row_addrs, column.as_ref()).await
}

/// Get a stream of batches based on iterator of ranges of row numbers.
Expand Down
132 changes: 131 additions & 1 deletion rust/lance/src/dataset/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use object_store::path::Path;
use snafu::location;
use tokio::sync::Mutex;

use super::Dataset;
use super::take::TakeBuilder;
use super::{Dataset, ProjectionRequest};
use arrow_array::StructArray;
use lance_core::datatypes::{BlobKind, BlobVersion};
use lance_core::{utils::address::RowAddress, Error, Result};
Expand Down Expand Up @@ -256,6 +257,55 @@ pub(super) async fn take_blobs(
}
}

/// Take [BlobFile]s by row addresses.
///
/// Row addresses are `u64` values encoding `(fragment_id << 32) | row_offset`.
/// Use this function when you already have row addresses, for example from
/// a scan with `with_row_address()`. For row IDs (stable identifiers), use
/// [`Dataset::take_blobs`]. For row indices (offsets), use
/// [`Dataset::take_blobs_by_indices`].
///
/// An empty `row_addrs` slice yields an empty vector without running a take
/// (the column is still validated first).
///
/// # Errors
///
/// * [`Error::InvalidInput`] if `column` is not a blob column.
/// * Any error raised while projecting the schema, building the projection
///   plan, constructing the take, or executing it.
pub async fn take_blobs_by_addresses(
    dataset: &Arc<Dataset>,
    row_addrs: &[u64],
    column: &str,
) -> Result<Vec<BlobFile>> {
    let projection = dataset.schema().project(&[column])?;

    // Look the projected field up once; both the blob check and the field id
    // are derived from it.
    let blob_field = &projection.fields[0];
    if !blob_field.is_blob() {
        return Err(Error::InvalidInput {
            location: location!(),
            source: format!("the column '{}' is not a blob column", column).into(),
        });
    }
    let blob_field_id = blob_field.id as u32;

    // Nothing to fetch: skip the take entirely so the code below never has to
    // rely on the shape of an empty result batch.
    if row_addrs.is_empty() {
        return Ok(Vec::new());
    }

    // Convert Schema to ProjectionPlan
    let projection_request = ProjectionRequest::from(projection);
    let projection_plan = Arc::new(projection_request.into_projection_plan(dataset.clone())?);

    // Use try_new_from_addresses to bypass row ID index lookup.
    // This is critical when enable_stable_row_ids=true because row addresses
    // (fragment_id << 32 | row_offset) are different from row IDs (sequential integers).
    let description_and_addr =
        TakeBuilder::try_new_from_addresses(dataset.clone(), row_addrs.to_vec(), projection_plan)?
            .with_row_address(true)
            .execute()
            .await?;

    // Column 0 holds the blob descriptions; column 1 is the row address column
    // requested via `with_row_address(true)`.
    let descriptions = description_and_addr.column(0).as_struct();
    let row_addrs_result = description_and_addr.column(1).as_primitive::<UInt64Type>();

    match dataset.blob_version() {
        BlobVersion::V1 => {
            collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs_result)
        }
        BlobVersion::V2 => {
            collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs_result).await
        }
    }
}

fn collect_blob_files_v1(
dataset: &Arc<Dataset>,
blob_field_id: u32,
Expand Down Expand Up @@ -524,4 +574,84 @@ mod tests {
assert!(batch.column(0).data_type().is_struct());
}
}

/// Regression test: `take_blobs_by_indices` must work when
/// `enable_stable_row_ids=true` and the dataset spans several fragments.
///
/// Previously the row addresses produced by `row_offsets_to_row_addresses`
/// were handed to `blob::take_blobs`, which expects row IDs. With stable row
/// IDs enabled, addresses (`fragment_id << 32 | offset`) and IDs (sequential
/// integers) differ, so the row ID index lookup failed with
/// "index out of bounds" for fragment 1 and beyond.
#[tokio::test]
pub async fn test_take_blobs_by_indices_with_stable_row_ids() {
    use crate::dataset::WriteParams;
    use arrow_array::RecordBatchIterator;

    let test_dir = TempStrDir::default();

    // Six rows of generated data, one of the columns being a blob column.
    let batches: Vec<_> = lance_datagen::gen_batch()
        .col("filterme", array::step::<UInt64Type>())
        .col("blobs", array::blob())
        .into_reader_rows(RowCount::from(6), BatchCount::from(1))
        .map(|batch| batch.unwrap())
        .collect();
    let schema = batches[0].schema();
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);

    // Stable row IDs on, and at most 3 rows per file so the 6 rows are split
    // across 2 fragments.
    let write_params = WriteParams {
        enable_stable_row_ids: true,
        max_rows_per_file: 3,
        ..Default::default()
    };
    let dataset = Arc::new(
        Dataset::write(reader, &test_dir, Some(write_params))
            .await
            .unwrap(),
    );

    // The scenario only makes sense with more than one fragment.
    let fragment_count = dataset.fragments().len();
    assert!(
        fragment_count >= 2,
        "Expected at least 2 fragments, got {}",
        fragment_count
    );

    // Indices 0..=2 live in the first fragment (always worked); indices 3..=5
    // live in the second fragment (failed before the fix).
    let per_fragment_cases: [(&[u64], &str); 2] = [
        (&[0, 1, 2], "First fragment blobs should have 3 items"),
        (&[3, 4, 5], "Second fragment blobs should have 3 items"),
    ];
    for &(indices, size_message) in &per_fragment_cases {
        let blobs = dataset
            .take_blobs_by_indices(indices, "blobs")
            .await
            .unwrap();
        assert_eq!(blobs.len(), 3, "{}", size_message);

        // Every returned blob must be readable and non-empty.
        for blob in &blobs {
            let content = blob.read().await.unwrap();
            assert!(!content.is_empty(), "Blob content should not be empty");
        }
    }

    // A single request that mixes indices from both fragments.
    let mixed = dataset
        .take_blobs_by_indices(&[1, 4], "blobs")
        .await
        .unwrap();
    assert_eq!(mixed.len(), 2, "Mixed fragment blobs should have 2 items");
}
}
13 changes: 10 additions & 3 deletions rust/lance/src/dataset/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,16 @@ async fn do_take_rows(

if row_addrs.is_empty() {
// It is possible that `row_id_index` returns None when a fragment has been wholly deleted
return Ok(RecordBatch::new_empty(Arc::new(
builder.projection.output_schema()?,
)));
let empty_batch = RecordBatch::new_empty(Arc::new(builder.projection.output_schema()?));
// If row addresses were requested, add an empty row address column.
// This ensures callers that expect the _rowaddr column don't panic.
if builder.with_row_address {
let row_addr_col = Arc::new(UInt64Array::from(Vec::<u64>::new()));
let row_addr_field =
ArrowField::new(ROW_ADDR, arrow::datatypes::DataType::UInt64, false);
return Ok(empty_batch.try_with_column(row_addr_field, row_addr_col)?);
}
return Ok(empty_batch);
}

let row_addr_stats = check_row_addrs(&row_addrs);
Expand Down