diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index 9de986ce022..722ba7c97e1 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -1492,15 +1492,29 @@ impl Dataset {
         blob::take_blobs(self, row_ids, column.as_ref()).await
     }
 
-    /// Take [BlobFile] by row indices.
-    ///
+    /// Take [BlobFile] by row addresses.
+    ///
+    /// Row addresses are `u64` values encoding `(fragment_id << 32) | row_offset`.
+    /// Use this method when you already have row addresses, for example from
+    /// a scan with `with_row_address()`. For row IDs (stable identifiers), use
+    /// [`Self::take_blobs`]. For row indices (offsets), use
+    /// [`Self::take_blobs_by_indices`].
+    pub async fn take_blobs_by_addresses(
+        self: &Arc<Self>,
+        row_addrs: &[u64],
+        column: impl AsRef<str>,
+    ) -> Result<Vec<BlobFile>> {
+        blob::take_blobs_by_addresses(self, row_addrs, column.as_ref()).await
+    }
+
+    /// Take [BlobFile] by row indices (offsets in the dataset).
     pub async fn take_blobs_by_indices(
         self: &Arc<Self>,
         row_indices: &[u64],
         column: impl AsRef<str>,
     ) -> Result<Vec<BlobFile>> {
         let row_addrs = row_offsets_to_row_addresses(self, row_indices).await?;
-        blob::take_blobs(self, &row_addrs, column.as_ref()).await
+        blob::take_blobs_by_addresses(self, &row_addrs, column.as_ref()).await
     }
 
     /// Get a stream of batches based on iterator of ranges of row numbers.
diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs
index effb5cfe641..e34b82954fc 100644
--- a/rust/lance/src/dataset/blob.rs
+++ b/rust/lance/src/dataset/blob.rs
@@ -10,7 +10,8 @@ use object_store::path::Path;
 use snafu::location;
 use tokio::sync::Mutex;
 
-use super::Dataset;
+use super::take::TakeBuilder;
+use super::{Dataset, ProjectionRequest};
 use arrow_array::StructArray;
 use lance_core::datatypes::{BlobKind, BlobVersion};
 use lance_core::{utils::address::RowAddress, Error, Result};
@@ -256,6 +257,62 @@ pub(super) async fn take_blobs(
     }
 }
 
+/// Take [BlobFile] by row addresses.
+///
+/// Row addresses are `u64` values encoding `(fragment_id << 32) | row_offset`.
+/// Use this method when you already have row addresses, for example from
+/// a scan with `with_row_address()`. For row IDs (stable identifiers), use
+/// [`Dataset::take_blobs`]. For row indices (offsets), use
+/// [`Dataset::take_blobs_by_indices`].
+///
+/// # Errors
+///
+/// Returns [`Error::InvalidInput`] if `column` is not a blob column. Errors from
+/// the schema projection and from executing the take are propagated.
+pub async fn take_blobs_by_addresses(
+    dataset: &Arc<Dataset>,
+    row_addrs: &[u64],
+    column: &str,
+) -> Result<Vec<BlobFile>> {
+    let projection = dataset.schema().project(&[column])?;
+    let blob_field = &projection.fields[0];
+    let blob_field_id = blob_field.id;
+    if !blob_field.is_blob() {
+        return Err(Error::InvalidInput {
+            location: location!(),
+            source: format!("the column '{}' is not a blob column", column).into(),
+        });
+    }
+
+    // Convert Schema to ProjectionPlan
+    let projection_request = ProjectionRequest::from(projection);
+    let projection_plan = Arc::new(projection_request.into_projection_plan(dataset.clone())?);
+
+    // Use try_new_from_addresses to bypass row ID index lookup.
+    // This is critical when enable_stable_row_ids=true because row addresses
+    // (fragment_id << 32 | row_offset) are different from row IDs (sequential integers).
+    let description_and_addr =
+        TakeBuilder::try_new_from_addresses(dataset.clone(), row_addrs.to_vec(), projection_plan)?
+            .with_row_address(true)
+            .execute()
+            .await?;
+
+    // Only the blob column was projected, so the description struct is read from
+    // column 0 and the row addresses requested above from column 1.
+    let descriptions = description_and_addr.column(0).as_struct();
+    let row_addrs_result = description_and_addr.column(1).as_primitive::<UInt64Type>();
+    let blob_field_id = blob_field_id as u32;
+
+    match dataset.blob_version() {
+        BlobVersion::V1 => {
+            collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs_result)
+        }
+        BlobVersion::V2 => {
+            collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs_result).await
+        }
+    }
+}
+
 fn collect_blob_files_v1(
     dataset: &Arc<Dataset>,
     blob_field_id: u32,
@@ -524,4 +581,84 @@ mod tests {
             assert!(batch.column(0).data_type().is_struct());
         }
     }
+
+    /// Test that take_blobs_by_indices works correctly with enable_stable_row_ids=true.
+    ///
+    /// This is a regression test for a bug where take_blobs_by_indices would fail
+    /// with "index out of bounds" for fragment 1+ when stable row IDs are enabled.
+    /// The bug was caused by passing row addresses (from row_offsets_to_row_addresses)
+    /// to blob::take_blobs which expected row IDs. When stable row IDs are enabled,
+    /// row addresses (fragment_id << 32 | offset) are different from row IDs
+    /// (sequential integers), causing the row ID index lookup to fail for fragment 1+.
+    #[tokio::test]
+    pub async fn test_take_blobs_by_indices_with_stable_row_ids() {
+        use crate::dataset::WriteParams;
+        use arrow_array::RecordBatchIterator;
+
+        let test_dir = TempStrDir::default();
+
+        // Create test data with blob column
+        let data = lance_datagen::gen_batch()
+            .col("filterme", array::step::<UInt64Type>())
+            .col("blobs", array::blob())
+            .into_reader_rows(RowCount::from(6), BatchCount::from(1))
+            .map(|batch| Ok(batch.unwrap()))
+            .collect::<Result<Vec<_>>>()
+            .unwrap();
+
+        // Write with enable_stable_row_ids=true and force multiple fragments
+        let write_params = WriteParams {
+            enable_stable_row_ids: true,
+            max_rows_per_file: 3, // Force 2 fragments with 3 rows each
+            ..Default::default()
+        };
+
+        let reader = RecordBatchIterator::new(data.clone().into_iter().map(Ok), data[0].schema());
+        let dataset = Arc::new(
+            Dataset::write(reader, &test_dir, Some(write_params))
+                .await
+                .unwrap(),
+        );
+
+        // Verify we have multiple fragments
+        let fragments = dataset.fragments();
+        assert!(
+            fragments.len() >= 2,
+            "Expected at least 2 fragments, got {}",
+            fragments.len()
+        );
+
+        // Test first fragment (indices 0, 1, 2) - this always worked
+        let blobs = dataset
+            .take_blobs_by_indices(&[0, 1, 2], "blobs")
+            .await
+            .unwrap();
+        assert_eq!(blobs.len(), 3, "First fragment blobs should have 3 items");
+
+        // Verify we can read the blob content
+        for blob in &blobs {
+            let content = blob.read().await.unwrap();
+            assert!(!content.is_empty(), "Blob content should not be empty");
+        }
+
+        // Test second fragment (indices 3, 4, 5) - this was failing before the fix
+        let blobs = dataset
+            .take_blobs_by_indices(&[3, 4, 5], "blobs")
+            .await
+            .unwrap();
+        assert_eq!(blobs.len(), 3, "Second fragment blobs should have 3 items");
+
+        // Verify we can read the blob content from second fragment
+        for blob in &blobs {
+            let content = blob.read().await.unwrap();
+            assert!(!content.is_empty(), "Blob content should not be empty");
+        }
+
+        // Test mixed indices from both fragments
+        let blobs = dataset
+            .take_blobs_by_indices(&[1, 4], "blobs")
+            .await
+            .unwrap();
+        assert_eq!(blobs.len(), 2, "Mixed fragment blobs should have 2 items");
+    }
 }
diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs
index 11114818e69..f39531f9853 100644
--- a/rust/lance/src/dataset/take.rs
+++ b/rust/lance/src/dataset/take.rs
@@ -136,9 +136,16 @@ async fn do_take_rows(
 
     if row_addrs.is_empty() {
         // It is possible that `row_id_index` returns None when a fragment has been wholly deleted
-        return Ok(RecordBatch::new_empty(Arc::new(
-            builder.projection.output_schema()?,
-        )));
+        let empty_batch = RecordBatch::new_empty(Arc::new(builder.projection.output_schema()?));
+        // If row addresses were requested, add an empty row address column.
+        // This ensures callers that expect the _rowaddr column don't panic.
+        if builder.with_row_address {
+            let row_addr_col = Arc::new(UInt64Array::from(Vec::<u64>::new()));
+            let row_addr_field =
+                ArrowField::new(ROW_ADDR, arrow::datatypes::DataType::UInt64, false);
+            return Ok(empty_batch.try_with_column(row_addr_field, row_addr_col)?);
+        }
+        return Ok(empty_batch);
     }
 
     let row_addr_stats = check_row_addrs(&row_addrs);