From 98f0c6a380077233b4e55f8395fc9d35a0dc5f28 Mon Sep 17 00:00:00 2001 From: Jonathan M Hsieh Date: Tue, 2 Dec 2025 13:06:36 -0800 Subject: [PATCH 1/4] fix: take_blobs_by_indices fails with stable row IDs on fragment 1+ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When `enable_stable_row_ids=true`, `take_blobs_by_indices` was failing with "index out of bounds" for rows in fragment 1+. The bug was caused by passing row addresses to `blob::take_blobs` which expected row IDs. Root cause: - `take_blobs_by_indices` converts indices to row addresses - It passed these addresses to `take_blobs` which calls `take_builder` - `TakeBuilder.get_row_addrs()` looked up the values in the row ID index - For fragment 0: addresses (0,1,2) matched row IDs (0,1,2) by accident - For fragment 1+: addresses (4294967296+) didn't match any row IDs - This caused empty results and missing `_rowaddr` column → panic Fix: - Add `take_blobs_by_addresses()` that uses `TakeBuilder::try_new_from_addresses` to bypass the row ID index lookup - Update `take_blobs_by_indices` to call the new function - Add defensive fix in `do_take_rows` to include `_rowaddr` column in empty batches 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- rust/lance/src/dataset.rs | 6 +- rust/lance/src/dataset/blob.rs | 137 ++++++++++++++++++++++++++++++++- rust/lance/src/dataset/take.rs | 13 +++- 3 files changed, 151 insertions(+), 5 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 4fecf55b25e..63be5de8955 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1494,13 +1494,17 @@ impl Dataset { /// Take [BlobFile] by row indices. /// + /// This method converts row indices (offsets in the dataset) to row addresses, + /// then retrieves the blob data. Unlike `take_blobs` which expects row IDs, + /// this method works correctly with `enable_stable_row_ids=true` because it + /// uses row addresses directly instead of looking them up via the row ID index. pub async fn take_blobs_by_indices( self: &Arc, row_indices: &[u64], column: impl AsRef, ) -> Result> { let row_addrs = row_offsets_to_row_addresses(self, row_indices).await?; - blob::take_blobs(self, &row_addrs, column.as_ref()).await + blob::take_blobs_by_addresses(self, &row_addrs, column.as_ref()).await } /// Get a stream of batches based on iterator of ranges of row numbers. diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 34f644c5d22..b745e54325e 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -9,7 +9,8 @@ use object_store::path::Path; use snafu::location; use tokio::sync::Mutex; -use super::Dataset; +use super::take::TakeBuilder; +use super::{Dataset, ProjectionRequest}; use arrow_array::{Array, StructArray}; use lance_core::datatypes::BlobVersion; use lance_core::{utils::address::RowAddress, Error, Result}; @@ -209,6 +210,57 @@ pub(super) async fn take_blobs( } } +/// Internal function for taking blobs by row addresses (not row IDs). +/// +/// This is used by `take_blobs_by_indices` which already has row addresses +/// computed from row indices. Using this function bypasses the row ID index +/// lookup, which is necessary when `enable_stable_row_ids=true` because +/// row addresses are not the same as row IDs in that case. +pub(super) async fn take_blobs_by_addresses( + dataset: &Arc, + row_addrs: &[u64], + column: &str, +) -> Result> { + let projection = dataset.schema().project(&[column])?; + let blob_field = &projection.fields[0]; + let blob_field_id = blob_field.id; + if !projection.fields[0].is_blob() { + return Err(Error::InvalidInput { + location: location!(), + source: format!("the column '{}' is not a blob column", column).into(), + }); + } + + // Convert Schema to ProjectionPlan + let projection_request = ProjectionRequest::from(projection); + let projection_plan = Arc::new(projection_request.into_projection_plan(dataset.clone())?); + + // Use try_new_from_addresses to bypass row ID index lookup. + // This is critical when enable_stable_row_ids=true because row addresses + // (fragment_id << 32 | row_offset) are different from row IDs (sequential integers). + let description_and_addr = TakeBuilder::try_new_from_addresses( + dataset.clone(), + row_addrs.to_vec(), + projection_plan, + )? + .with_row_address(true) + .execute() + .await?; + + let descriptions = description_and_addr.column(0).as_struct(); + let row_addrs_result = description_and_addr.column(1).as_primitive::(); + let blob_field_id = blob_field_id as u32; + + match dataset.blob_version() { + BlobVersion::V1 => { + collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs_result) + } + BlobVersion::V2 => { + collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs_result) + } + } +} + const INLINE_BLOB_KIND: u8 = 0; fn collect_blob_files_v1( @@ -469,4 +521,87 @@ mod tests { assert!(batch.column(0).data_type().is_struct()); } } + + /// Test that take_blobs_by_indices works correctly with enable_stable_row_ids=true. + /// + /// This is a regression test for a bug where take_blobs_by_indices would fail + /// with "index out of bounds" for fragment 1+ when stable row IDs are enabled. + /// The bug was caused by passing row addresses (from row_offsets_to_row_addresses) + /// to blob::take_blobs which expected row IDs. When stable row IDs are enabled, + /// row addresses (fragment_id << 32 | offset) are different from row IDs + /// (sequential integers), causing the row ID index lookup to fail for fragment 1+. + #[tokio::test] + pub async fn test_take_blobs_by_indices_with_stable_row_ids() { + use arrow_array::RecordBatchIterator; + use crate::dataset::WriteParams; + + let test_dir = TempStrDir::default(); + + // Create test data with blob column + let data = lance_datagen::gen_batch() + .col("filterme", array::step::()) + .col("blobs", array::blob()) + .into_reader_rows(RowCount::from(6), BatchCount::from(1)) + .map(|batch| Ok(batch.unwrap())) + .collect::>>() + .unwrap(); + + // Write with enable_stable_row_ids=true and force multiple fragments + let write_params = WriteParams { + enable_stable_row_ids: true, + max_rows_per_file: 3, // Force 2 fragments with 3 rows each + ..Default::default() + }; + + let reader = RecordBatchIterator::new( + data.clone().into_iter().map(Ok), + data[0].schema(), + ); + let dataset = Arc::new( + Dataset::write(reader, &test_dir, Some(write_params)) + .await + .unwrap(), + ); + + // Verify we have multiple fragments + let fragments = dataset.fragments(); + assert!( + fragments.len() >= 2, + "Expected at least 2 fragments, got {}", + fragments.len() + ); + + // Test first fragment (indices 0, 1, 2) - this always worked + let blobs = dataset + .take_blobs_by_indices(&[0, 1, 2], "blobs") + .await + .unwrap(); + assert_eq!(blobs.len(), 3, "First fragment blobs should have 3 items"); + + // Verify we can read the blob content + for blob in &blobs { + let content = blob.read().await.unwrap(); + assert!(!content.is_empty(), "Blob content should not be empty"); + } + + // Test second fragment (indices 3, 4, 5) - this was failing before the fix + let blobs = dataset + .take_blobs_by_indices(&[3, 4, 5], "blobs") + .await + .unwrap(); + assert_eq!(blobs.len(), 3, "Second fragment blobs should have 3 items"); + + // Verify we can read the blob content from second fragment + for blob in &blobs { + let content = blob.read().await.unwrap(); + assert!(!content.is_empty(), "Blob content should not be empty"); + } + + // Test mixed indices from both fragments + let blobs = dataset + .take_blobs_by_indices(&[1, 4], "blobs") + .await + .unwrap(); + assert_eq!(blobs.len(), 2, "Mixed fragment blobs should have 2 items"); + } } diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index 11114818e69..f39531f9853 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -136,9 +136,16 @@ async fn do_take_rows( if row_addrs.is_empty() { // It is possible that `row_id_index` returns None when a fragment has been wholly deleted - return Ok(RecordBatch::new_empty(Arc::new( - builder.projection.output_schema()?, - ))); + let empty_batch = RecordBatch::new_empty(Arc::new(builder.projection.output_schema()?)); + // If row addresses were requested, add an empty row address column. + // This ensures callers that expect the _rowaddr column don't panic. + if builder.with_row_address { + let row_addr_col = Arc::new(UInt64Array::from(Vec::::new())); + let row_addr_field = + ArrowField::new(ROW_ADDR, arrow::datatypes::DataType::UInt64, false); + return Ok(empty_batch.try_with_column(row_addr_field, row_addr_col)?); + } + return Ok(empty_batch); } let row_addr_stats = check_row_addrs(&row_addrs); From a8a6705b0f84523dc821854f9633e6c19d5432ad Mon Sep 17 00:00:00 2001 From: Jonathan M Hsieh Date: Tue, 2 Dec 2025 14:11:47 -0800 Subject: [PATCH 2/4] style: apply cargo fmt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- rust/lance/src/dataset/blob.rs | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index b745e54325e..d72320d15f1 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -238,14 +238,11 @@ pub(super) async fn take_blobs_by_addresses( // Use try_new_from_addresses to bypass row ID index lookup. // This is critical when enable_stable_row_ids=true because row addresses // (fragment_id << 32 | row_offset) are different from row IDs (sequential integers). - let description_and_addr = TakeBuilder::try_new_from_addresses( - dataset.clone(), - row_addrs.to_vec(), - projection_plan, - )? - .with_row_address(true) - .execute() - .await?; + let description_and_addr = + TakeBuilder::try_new_from_addresses(dataset.clone(), row_addrs.to_vec(), projection_plan)? + .with_row_address(true) + .execute() + .await?; let descriptions = description_and_addr.column(0).as_struct(); let row_addrs_result = description_and_addr.column(1).as_primitive::(); @@ -532,8 +529,8 @@ mod tests { /// (sequential integers), causing the row ID index lookup to fail for fragment 1+. #[tokio::test] pub async fn test_take_blobs_by_indices_with_stable_row_ids() { - use arrow_array::RecordBatchIterator; use crate::dataset::WriteParams; + use arrow_array::RecordBatchIterator; let test_dir = TempStrDir::default(); @@ -553,10 +550,7 @@ mod tests { ..Default::default() }; - let reader = RecordBatchIterator::new( - data.clone().into_iter().map(Ok), - data[0].schema(), - ); + let reader = RecordBatchIterator::new(data.clone().into_iter().map(Ok), data[0].schema()); let dataset = Arc::new( Dataset::write(reader, &test_dir, Some(write_params)) .await From 7300fa85fd5fc10cfea1c21ad892bc559295f031 Mon Sep 17 00:00:00 2001 From: Jonathan M Hsieh Date: Tue, 2 Dec 2025 19:08:11 -0800 Subject: [PATCH 3/4] refactor: make take_blobs_by_addresses public API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address PR feedback: - Make take_blobs_by_addresses public (was pub(super)) - Add Dataset::take_blobs_by_addresses public method - Simplify take_blobs_by_indices doc to remove internal details - Add proper public documentation for take_blobs_by_addresses This allows callers to use row addresses directly when they already have them, providing flexibility alongside take_blobs (for row IDs) and take_blobs_by_indices (for row indices/offsets). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- rust/lance/src/dataset.rs | 20 +++++++++++++++----- rust/lance/src/dataset/blob.rs | 13 +++++++------ 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 63be5de8955..66fc5d061a6 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1492,12 +1492,22 @@ impl Dataset { blob::take_blobs(self, row_ids, column.as_ref()).await } - /// Take [BlobFile] by row indices. + /// Take [BlobFile] by row addresses. /// - /// This method converts row indices (offsets in the dataset) to row addresses, - /// then retrieves the blob data. Unlike `take_blobs` which expects row IDs, - /// this method works correctly with `enable_stable_row_ids=true` because it - /// uses row addresses directly instead of looking them up via the row ID index. + /// Row addresses are `u64` values encoding `(fragment_id << 32) | row_offset`. + /// Use this method when you already have row addresses, for example from + /// a scan with `with_row_address()`. For row IDs (stable identifiers), use + /// [`Self::take_blobs`]. For row indices (offsets), use + /// [`Self::take_blobs_by_indices`]. + pub async fn take_blobs_by_addresses( + self: &Arc, + row_addrs: &[u64], + column: impl AsRef, + ) -> Result> { + blob::take_blobs_by_addresses(self, row_addrs, column.as_ref()).await + } + + /// Take [BlobFile] by row indices (offsets in the dataset). pub async fn take_blobs_by_indices( self: &Arc, row_indices: &[u64], diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index d72320d15f1..08a952a1441 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -210,13 +210,14 @@ pub(super) async fn take_blobs( } } -/// Internal function for taking blobs by row addresses (not row IDs). +/// Take [BlobFile] by row addresses. /// -/// This is used by `take_blobs_by_indices` which already has row addresses -/// computed from row indices. Using this function bypasses the row ID index -/// lookup, which is necessary when `enable_stable_row_ids=true` because -/// row addresses are not the same as row IDs in that case. -pub(super) async fn take_blobs_by_addresses( +/// Row addresses are `u64` values encoding `(fragment_id << 32) | row_offset`. +/// Use this method when you already have row addresses, for example from +/// a scan with `with_row_address()`. For row IDs (stable identifiers), use +/// [`Dataset::take_blobs`]. For row indices (offsets), use +/// [`Dataset::take_blobs_by_indices`]. +pub async fn take_blobs_by_addresses( dataset: &Arc, row_addrs: &[u64], column: &str, From 81c1530e7808bc607de0eb65d35e18fa96557505 Mon Sep 17 00:00:00 2001 From: Jonathan M Hsieh Date: Wed, 3 Dec 2025 11:45:04 -0800 Subject: [PATCH 4/4] style: apply cargo fmt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- rust/lance/src/dataset/blob.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 7e26a7c0c6d..e34b82954fc 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -306,7 +306,6 @@ pub async fn take_blobs_by_addresses( } } - fn collect_blob_files_v1( dataset: &Arc, blob_field_id: u32,