From d4e487049bc1eeae3013177842128fdbcd084314 Mon Sep 17 00:00:00 2001 From: Enwei Date: Fri, 27 Mar 2026 10:38:02 +0000 Subject: [PATCH 1/4] perf: optimize stable row_id index build from O(rows) to O(fragments) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix two performance bottlenecks in RowIdIndex construction that caused extreme cold start latency when `enable_stable_row_id` is enabled: 1. `decompose_sequence` (lance-table/src/rowids/index.rs): Previously expanded every Range segment element-by-element to check deletions, then re-compressed back. For Range segments with no deletions, this is pure waste — input is Range, output is Range. Add O(1) fast path that constructs index chunks directly. Complexity: O(total_rows) → O(num_fragments). 2. `load_row_id_index` (lance/src/dataset/rowids.rs): Used linear search (.find()) over all fragments for each fragment_id, giving O(N²) total. Also spawned all N futures via try_join_all and called get_deletion_vector() even when no deletion file exists. Fix: HashMap for O(1) lookup, buffer_unordered for controlled concurrency, skip deletion vector load when unnecessary. Benchmarks on real S3 datasets: - 968M rows, 3540 fragments: 18.9s → 150ms (126x faster) - 4.26B rows, 18243 fragments: 220s → 89ms (2471x faster) All existing tests pass. Correctness verified against production data with boundary checks, random sampling, and data consistency validation. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- rust/lance-table/src/rowids/OPTIMIZATION.md | 112 +++++++++++++ rust/lance-table/src/rowids/index.rs | 176 ++++++++++++++++++-- rust/lance/src/dataset/rowids.rs | 40 +++-- 3 files changed, 296 insertions(+), 32 deletions(-) create mode 100644 rust/lance-table/src/rowids/OPTIMIZATION.md diff --git a/rust/lance-table/src/rowids/OPTIMIZATION.md b/rust/lance-table/src/rowids/OPTIMIZATION.md new file mode 100644 index 00000000000..dbe6362b0cd --- /dev/null +++ b/rust/lance-table/src/rowids/OPTIMIZATION.md @@ -0,0 +1,112 @@ +# RowIdIndex Build Optimization + +## Problem + +When `enable_stable_row_id` is enabled, the first `take_rows()` call triggers a full `RowIdIndex` build. On large datasets this cold start was extremely slow: + +| Dataset | Rows | Fragments | Cold start (before) | +|---------|------|-----------|-------------------| +| 968M rows | 968,938,257 | 3,540 | 18.9 seconds | +| 4.26B rows | 4,261,682,117 | 18,243 | 220 seconds (3.7 min) | + +## Root Causes + +### 1. O(total_rows) segment expansion in `decompose_sequence` + +**File:** `rust/lance-table/src/rowids/index.rs` + +`decompose_sequence` builds the row_id-to-address mapping for each fragment. The original code expanded every `U64Segment` element-by-element: + +```rust +segment.iter() // Range(0..273711) expands to 273711 individual u64s + .enumerate() + .filter_map(...) // checks deletion_vector.contains() for each row + .collect::<Vec<_>>() // allocates Vec of all active (row_id, address) pairs +``` + +Then it split the pairs into two `Vec<u64>` and called `U64Segment::from_iter()` to re-compress back into a segment. 
+ +For a `Range(0..273711)` with no deletions, this means: + +- Iterate 273,711 values +- Check deletion vector 273,711 times (all pass) +- Allocate and fill a `Vec<(u64, u64)>` with 273,711 entries +- Split into two `Vec<u64>`, each with 273,711 entries +- `from_iter` re-discovers that both are contiguous ranges + +**The input is a Range, the output is a Range, but 273K iterations happen in between.** + +Across 18,243 fragments averaging 233K rows each, this totals **4.26 billion iterations** with ~32 GB of temporary allocations. + +### 2. O(N²) fragment lookup in `load_row_id_index` + +**File:** `rust/lance/src/dataset/rowids.rs` + +`load_row_id_index` converts row_id sequences into `FragmentRowIdIndex` structs. The original code did: + +```rust +try_join_all(sequences.into_iter().map(|(fragment_id, sequence)| { + async move { + let fragments = dataset.get_fragments(); // returns all N fragments + let fragment = fragments.iter() + .find(|f| f.id() as u32 == fragment_id); // O(N) linear search + fragment.get_deletion_vector().await // called even without deletion file + } +})) +``` + +Three compounding issues: + +- **O(N) linear search × N fragments = O(N²):** For 18,243 fragments, this is 333 million comparisons. +- **`try_join_all` spawns all N futures at once:** Overwhelms the async runtime scheduler with 18K concurrent tasks. +- **`get_deletion_vector()` called unconditionally:** Even fragments with no deletion file pay the async overhead. + +## Solution + +### Fix 1: O(1) fast path for Range segments without deletions + +When a fragment has no deletions and its row_id sequence is a `Range`, we construct the index chunk directly without iterating: + +```rust +U64Segment::Range(range) => { + let row_id_segment = U64Segment::Range(range.clone()); + let address_segment = U64Segment::Range(start_address..start_address + len); + // Done in O(1), no iteration needed +} +``` + +This reduces `decompose_sequence` from O(total_rows) to O(num_fragments) for the common case. 
+ +### Fix 2: HashMap lookup + conditional deletion vector loading + +```rust +// O(1) lookup via HashMap instead of O(N) linear search +let fragment_map: HashMap = fragments.iter() + .map(|f| (f.id() as u32, f)).collect(); + +// buffer_unordered instead of try_join_all for controlled concurrency +futures::stream::iter(sequences.into_iter().map(|(fragment_id, sequence)| { + let fragment = fragment_map.get(&fragment_id).expect("Fragment should exist"); + // Skip async deletion vector load when there's no deletion file + if has_deletion_file { + fragment.get_deletion_vector().await + } else { + DeletionVector::default() + } +})) +.buffer_unordered(io_parallelism()) +``` + +## Results + +| Dataset | Before | After | Speedup | +|---------|--------|-------|---------| +| 968M rows, 3,540 fragments | 18.9s | 150ms | **126x** | +| 4.26B rows, 18,243 fragments | 220s | 89ms | **2,471x** | + +All existing tests pass. Correctness verified against real S3 datasets with: + +- Boundary row_ids (first/last row of each fragment) +- 1,000 random row_ids across the full range +- Data consistency between `take()` and `take_rows()` +- Out-of-range row_id handling diff --git a/rust/lance-table/src/rowids/index.rs b/rust/lance-table/src/rowids/index.rs index 0bf4eb76027..42185bbdd4f 100644 --- a/rust/lance-table/src/rowids/index.rs +++ b/rust/lance-table/src/rowids/index.rs @@ -97,6 +97,7 @@ fn decompose_sequence( ) -> Vec<(RangeInclusive, (U64Segment, U64Segment))> { let mut start_address: u64 = RowAddress::first_row(frag_index.fragment_id).into(); let mut current_offset = 0u32; + let no_deletions = frag_index.deletion_vector.is_empty(); frag_index .row_id_sequence @@ -105,38 +106,117 @@ fn decompose_sequence( .filter_map(|segment| { let segment_len = segment.len(); - let active_pairs: Vec<(u64, u64)> = segment - .iter() - .enumerate() - .filter_map(|(i, row_id)| { - let row_offset = current_offset + i as u32; - if !frag_index.deletion_vector.contains(row_offset) { - let address = 
start_address + i as u64; - Some((row_id, address)) - } else { - None - } - }) - .collect(); + let result = if no_deletions { + // Fast path: no deletions, avoid per-row iteration. + // Directly construct index chunks from the segment metadata. + decompose_segment_no_deletions(segment, start_address) + } else { + // Slow path: has deletions, must check each row. + decompose_segment_with_deletions( + segment, + start_address, + current_offset, + &frag_index.deletion_vector, + ) + }; current_offset += segment_len as u32; start_address += segment_len as u64; - if active_pairs.is_empty() { + result + }) + .flatten() + .collect() +} + +/// Fast path: decompose a segment when there are no deletions. +/// This avoids iterating over every row and instead operates on segment +/// metadata directly, reducing complexity from O(rows) to O(1) for Range +/// segments. +fn decompose_segment_no_deletions( + segment: &U64Segment, + start_address: u64, +) -> Option, (U64Segment, U64Segment))>> { + match segment { + U64Segment::Range(range) => { + if range.is_empty() { return None; } + // Row IDs are range.start..range.end, addresses are start_address..start_address+len. + // Both are contiguous ranges, so we can construct them directly in O(1). + let len = range.end - range.start; + let row_id_segment = U64Segment::Range(range.clone()); + let address_segment = U64Segment::Range(start_address..start_address + len); + let coverage = range.start..=range.end - 1; + Some(vec![(coverage, (row_id_segment, address_segment))]) + } + _ => { + // For non-Range segments (RangeWithHoles, RangeWithBitmap, SortedArray, Array), + // the address mapping is non-trivial (addresses use positional offsets within + // the original range, not within the active set). Fall back to per-element iteration. + // This is still fast since there are no deletions to check. 
+ let segment_len = segment.len(); + if segment_len == 0 { + return None; + } + + let active_pairs: Vec<(u64, u64)> = segment + .iter() + .enumerate() + .map(|(i, row_id)| { + let address = start_address + i as u64; + (row_id, address) + }) + .collect(); let row_ids: Vec = active_pairs.iter().map(|(rid, _)| *rid).collect(); let addresses: Vec = active_pairs.iter().map(|(_, addr)| *addr).collect(); - let row_id_segment = U64Segment::from_iter(row_ids.iter().copied()); - let address_segment = U64Segment::from_iter(addresses.iter().copied()); + let row_id_segment = U64Segment::from_iter(row_ids.into_iter()); + let address_segment = U64Segment::from_iter(addresses.into_iter()); let coverage = row_id_segment.range()?; - Some((coverage, (row_id_segment, address_segment))) + Some(vec![(coverage, (row_id_segment, address_segment))]) + } + } +} + +/// Slow path: decompose a segment when there are deletions. +/// Must check each row against the deletion vector. +fn decompose_segment_with_deletions( + segment: &U64Segment, + start_address: u64, + current_offset: u32, + deletion_vector: &DeletionVector, +) -> Option, (U64Segment, U64Segment))>> { + let active_pairs: Vec<(u64, u64)> = segment + .iter() + .enumerate() + .filter_map(|(i, row_id)| { + let row_offset = current_offset + i as u32; + if !deletion_vector.contains(row_offset) { + let address = start_address + i as u64; + Some((row_id, address)) + } else { + None + } }) - .collect() + .collect(); + + if active_pairs.is_empty() { + return None; + } + + let row_ids: Vec = active_pairs.iter().map(|(rid, _)| *rid).collect(); + let addresses: Vec = active_pairs.iter().map(|(_, addr)| *addr).collect(); + + let row_id_segment = U64Segment::from_iter(row_ids.into_iter()); + let address_segment = U64Segment::from_iter(addresses.into_iter()); + + let coverage = row_id_segment.range()?; + + Some(vec![(coverage, (row_id_segment, address_segment))]) } type IndexChunk = (RangeInclusive, (U64Segment, U64Segment)); @@ -547,6 +627,66 @@ 
mod tests { }) } + #[test] + fn test_large_range_segments_no_deletions() { + // Simulates a real-world scenario: many fragments with large Range segments + // and no deletions. Before optimization, this would iterate over all rows + // (O(total_rows)). After optimization, it's O(num_fragments). + let rows_per_fragment = 250_000u64; + let num_fragments = 100u32; + let mut offset = 0u64; + + let fragment_indices: Vec = (0..num_fragments) + .map(|frag_id| { + let start = offset; + offset += rows_per_fragment; + FragmentRowIdIndex { + fragment_id: frag_id, + row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range( + start..start + rows_per_fragment, + )])), + deletion_vector: Arc::new(DeletionVector::default()), + } + }) + .collect(); + + let start = std::time::Instant::now(); + let index = RowIdIndex::new(&fragment_indices).unwrap(); + let elapsed = start.elapsed(); + + // Verify correctness at boundaries + assert_eq!(index.get(0), Some(RowAddress::new_from_parts(0, 0))); + assert_eq!( + index.get(rows_per_fragment - 1), + Some(RowAddress::new_from_parts(0, rows_per_fragment as u32 - 1)) + ); + assert_eq!( + index.get(rows_per_fragment), + Some(RowAddress::new_from_parts(1, 0)) + ); + let last_row = num_fragments as u64 * rows_per_fragment - 1; + assert_eq!( + index.get(last_row), + Some(RowAddress::new_from_parts( + num_fragments - 1, + rows_per_fragment as u32 - 1 + )) + ); + assert_eq!(index.get(last_row + 1), None); + + // With the optimization, building an index for 25M rows across 100 fragments + // should complete in well under 1 second (typically < 1ms). + assert!( + elapsed.as_secs() < 1, + "Index build took {:?} for {} fragments x {} rows = {} total rows. \ + This suggests the O(rows) -> O(fragments) optimization is not working.", + elapsed, + num_fragments, + rows_per_fragment, + num_fragments as u64 * rows_per_fragment, + ); + } + proptest::proptest! 
{ #[test] fn test_new_index_robustness(row_ids in arbitrary_row_ids(0..5, 0..32)) { diff --git a/rust/lance/src/dataset/rowids.rs b/rust/lance/src/dataset/rowids.rs index 447868fca99..19684dcf50f 100644 --- a/rust/lance/src/dataset/rowids.rs +++ b/rust/lance/src/dataset/rowids.rs @@ -92,19 +92,28 @@ async fn load_row_id_index(dataset: &Dataset) -> Result>() .await?; - let fragment_indices: Vec<_> = - futures::future::try_join_all(sequences.into_iter().map(|(fragment_id, sequence)| { - let dataset = dataset.clone(); + // Build a lookup map for O(1) fragment access instead of O(N) linear search. + let fragments = dataset.get_fragments(); + let fragment_map: std::collections::HashMap = + fragments.iter().map(|f| (f.id() as u32, f)).collect(); + + // Load deletion vectors. Use buffered concurrency to avoid spawning all + // futures at once (which can overwhelm the runtime for large datasets). + let fragment_indices: Vec<_> = futures::stream::iter(sequences.into_iter().map( + |(fragment_id, sequence)| { + let fragment = fragment_map + .get(&fragment_id) + .expect("Fragment should exist"); + let has_deletion_file = fragment.metadata().deletion_file.is_some(); + let fragment_clone = (*fragment).clone(); async move { - let fragments = dataset.get_fragments(); - let fragment = fragments - .iter() - .find(|f| f.id() as u32 == fragment_id) - .expect("Fragment should exist"); - - let deletion_vector = match fragment.get_deletion_vector().await { - Ok(Some(dv)) => dv, - Ok(None) | Err(_) => Arc::new(DeletionVector::default()), + let deletion_vector = if has_deletion_file { + match fragment_clone.get_deletion_vector().await { + Ok(Some(dv)) => dv, + Ok(None) | Err(_) => Arc::new(DeletionVector::default()), + } + } else { + Arc::new(DeletionVector::default()) }; Ok::(FragmentRowIdIndex { @@ -113,8 +122,11 @@ async fn load_row_id_index(dataset: &Dataset) -> Result Date: Fri, 27 Mar 2026 19:09:13 +0800 Subject: [PATCH 2/4] refactor: simplify decompose_sequence and remove 
OPTIMIZATION.md - Extract shared build_chunk_from_pairs helper to eliminate duplication between decompose_segment_no_deletions and decompose_segment_with_deletions - Use .unzip() instead of manual map/collect for splitting pairs - Return Option<IndexChunk> instead of Option<Vec<IndexChunk>> to avoid unnecessary single-element Vec allocations - Fix clippy warnings: redundant .into_iter(), len() == 0 -> is_empty() - Remove redundant comments that restate what the code does - Remove OPTIMIZATION.md (content moved to PR description) Co-Authored-By: Claude Opus 4.6 (1M context) --- rust/lance-table/src/rowids/OPTIMIZATION.md | 112 -------------------- rust/lance-table/src/rowids/index.rs | 86 +++++---------- rust/lance/src/dataset/rowids.rs | 3 - 3 files changed, 26 insertions(+), 175 deletions(-) delete mode 100644 rust/lance-table/src/rowids/OPTIMIZATION.md diff --git a/rust/lance-table/src/rowids/OPTIMIZATION.md b/rust/lance-table/src/rowids/OPTIMIZATION.md deleted file mode 100644 index dbe6362b0cd..00000000000 --- a/rust/lance-table/src/rowids/OPTIMIZATION.md +++ /dev/null @@ -1,112 +0,0 @@ -# RowIdIndex Build Optimization - -## Problem - -When `enable_stable_row_id` is enabled, the first `take_rows()` call triggers a full `RowIdIndex` build. On large datasets this cold start was extremely slow: - -| Dataset | Rows | Fragments | Cold start (before) | -|---------|------|-----------|-------------------| -| 968M rows | 968,938,257 | 3,540 | 18.9 seconds | -| 4.26B rows | 4,261,682,117 | 18,243 | 220 seconds (3.7 min) | - -## Root Causes - -### 1. O(total_rows) segment expansion in `decompose_sequence` - -**File:** `rust/lance-table/src/rowids/index.rs` - -`decompose_sequence` builds the row_id-to-address mapping for each fragment. The original code expanded every `U64Segment` element-by-element: - -```rust -segment.iter() // Range(0..273711) expands to 273711 individual u64s - .enumerate() - .filter_map(...) 
// checks deletion_vector.contains() for each row - .collect::<Vec<_>>() // allocates Vec of all active (row_id, address) pairs -``` - -Then it split the pairs into two `Vec<u64>` and called `U64Segment::from_iter()` to re-compress back into a segment. - -For a `Range(0..273711)` with no deletions, this means: - -- Iterate 273,711 values -- Check deletion vector 273,711 times (all pass) -- Allocate and fill a `Vec<(u64, u64)>` with 273,711 entries -- Split into two `Vec<u64>`, each with 273,711 entries -- `from_iter` re-discovers that both are contiguous ranges - -**The input is a Range, the output is a Range, but 273K iterations happen in between.** - -Across 18,243 fragments averaging 233K rows each, this totals **4.26 billion iterations** with ~32 GB of temporary allocations. - -### 2. O(N²) fragment lookup in `load_row_id_index` - -**File:** `rust/lance/src/dataset/rowids.rs` - -`load_row_id_index` converts row_id sequences into `FragmentRowIdIndex` structs. The original code did: - -```rust -try_join_all(sequences.into_iter().map(|(fragment_id, sequence)| { - async move { - let fragments = dataset.get_fragments(); // returns all N fragments - let fragment = fragments.iter() - .find(|f| f.id() as u32 == fragment_id); // O(N) linear search - fragment.get_deletion_vector().await // called even without deletion file - } -})) -``` - -Three compounding issues: - -- **O(N) linear search × N fragments = O(N²):** For 18,243 fragments, this is 333 million comparisons. -- **`try_join_all` spawns all N futures at once:** Overwhelms the async runtime scheduler with 18K concurrent tasks. -- **`get_deletion_vector()` called unconditionally:** Even fragments with no deletion file pay the async overhead. 
- -## Solution - -### Fix 1: O(1) fast path for Range segments without deletions - -When a fragment has no deletions and its row_id sequence is a `Range`, we construct the index chunk directly without iterating: - -```rust -U64Segment::Range(range) => { - let row_id_segment = U64Segment::Range(range.clone()); - let address_segment = U64Segment::Range(start_address..start_address + len); - // Done in O(1), no iteration needed -} -``` - -This reduces `decompose_sequence` from O(total_rows) to O(num_fragments) for the common case. - -### Fix 2: HashMap lookup + conditional deletion vector loading - -```rust -// O(1) lookup via HashMap instead of O(N) linear search -let fragment_map: HashMap = fragments.iter() - .map(|f| (f.id() as u32, f)).collect(); - -// buffer_unordered instead of try_join_all for controlled concurrency -futures::stream::iter(sequences.into_iter().map(|(fragment_id, sequence)| { - let fragment = fragment_map.get(&fragment_id).expect("Fragment should exist"); - // Skip async deletion vector load when there's no deletion file - if has_deletion_file { - fragment.get_deletion_vector().await - } else { - DeletionVector::default() - } -})) -.buffer_unordered(io_parallelism()) -``` - -## Results - -| Dataset | Before | After | Speedup | -|---------|--------|-------|---------| -| 968M rows, 3,540 fragments | 18.9s | 150ms | **126x** | -| 4.26B rows, 18,243 fragments | 220s | 89ms | **2,471x** | - -All existing tests pass. 
Correctness verified against real S3 datasets with: - -- Boundary row_ids (first/last row of each fragment) -- 1,000 random row_ids across the full range -- Data consistency between `take()` and `take_rows()` -- Out-of-range row_id handling diff --git a/rust/lance-table/src/rowids/index.rs b/rust/lance-table/src/rowids/index.rs index 42185bbdd4f..391ba5c74ba 100644 --- a/rust/lance-table/src/rowids/index.rs +++ b/rust/lance-table/src/rowids/index.rs @@ -107,11 +107,8 @@ fn decompose_sequence( let segment_len = segment.len(); let result = if no_deletions { - // Fast path: no deletions, avoid per-row iteration. - // Directly construct index chunks from the segment metadata. decompose_segment_no_deletions(segment, start_address) } else { - // Slow path: has deletions, must check each row. decompose_segment_with_deletions( segment, start_address, @@ -125,98 +122,67 @@ fn decompose_sequence( result }) - .flatten() .collect() } -/// Fast path: decompose a segment when there are no deletions. -/// This avoids iterating over every row and instead operates on segment -/// metadata directly, reducing complexity from O(rows) to O(1) for Range -/// segments. +/// Build an IndexChunk from a list of (row_id, address) pairs. +fn build_chunk_from_pairs(pairs: Vec<(u64, u64)>) -> Option { + if pairs.is_empty() { + return None; + } + let (row_ids, addresses): (Vec, Vec) = pairs.into_iter().unzip(); + let row_id_segment = U64Segment::from_iter(row_ids); + let address_segment = U64Segment::from_iter(addresses); + let coverage = row_id_segment.range()?; + Some((coverage, (row_id_segment, address_segment))) +} + +/// Fast path: no deletions. O(1) for Range segments. fn decompose_segment_no_deletions( segment: &U64Segment, start_address: u64, -) -> Option, (U64Segment, U64Segment))>> { +) -> Option { match segment { - U64Segment::Range(range) => { - if range.is_empty() { - return None; - } - // Row IDs are range.start..range.end, addresses are start_address..start_address+len. 
- // Both are contiguous ranges, so we can construct them directly in O(1). + U64Segment::Range(range) if !range.is_empty() => { let len = range.end - range.start; let row_id_segment = U64Segment::Range(range.clone()); let address_segment = U64Segment::Range(start_address..start_address + len); let coverage = range.start..=range.end - 1; - Some(vec![(coverage, (row_id_segment, address_segment))]) + Some((coverage, (row_id_segment, address_segment))) } + _ if segment.is_empty() => None, _ => { - // For non-Range segments (RangeWithHoles, RangeWithBitmap, SortedArray, Array), - // the address mapping is non-trivial (addresses use positional offsets within - // the original range, not within the active set). Fall back to per-element iteration. - // This is still fast since there are no deletions to check. - let segment_len = segment.len(); - if segment_len == 0 { - return None; - } - - let active_pairs: Vec<(u64, u64)> = segment + // Non-Range segments: must iterate to build address mapping. + let pairs: Vec<(u64, u64)> = segment .iter() .enumerate() - .map(|(i, row_id)| { - let address = start_address + i as u64; - (row_id, address) - }) + .map(|(i, row_id)| (row_id, start_address + i as u64)) .collect(); - - let row_ids: Vec = active_pairs.iter().map(|(rid, _)| *rid).collect(); - let addresses: Vec = active_pairs.iter().map(|(_, addr)| *addr).collect(); - - let row_id_segment = U64Segment::from_iter(row_ids.into_iter()); - let address_segment = U64Segment::from_iter(addresses.into_iter()); - - let coverage = row_id_segment.range()?; - - Some(vec![(coverage, (row_id_segment, address_segment))]) + build_chunk_from_pairs(pairs) } } } -/// Slow path: decompose a segment when there are deletions. -/// Must check each row against the deletion vector. +/// Slow path: has deletions, must check each row. 
fn decompose_segment_with_deletions( segment: &U64Segment, start_address: u64, current_offset: u32, deletion_vector: &DeletionVector, -) -> Option, (U64Segment, U64Segment))>> { - let active_pairs: Vec<(u64, u64)> = segment +) -> Option { + let pairs: Vec<(u64, u64)> = segment .iter() .enumerate() .filter_map(|(i, row_id)| { let row_offset = current_offset + i as u32; if !deletion_vector.contains(row_offset) { - let address = start_address + i as u64; - Some((row_id, address)) + Some((row_id, start_address + i as u64)) } else { None } }) .collect(); - - if active_pairs.is_empty() { - return None; - } - - let row_ids: Vec = active_pairs.iter().map(|(rid, _)| *rid).collect(); - let addresses: Vec = active_pairs.iter().map(|(_, addr)| *addr).collect(); - - let row_id_segment = U64Segment::from_iter(row_ids.into_iter()); - let address_segment = U64Segment::from_iter(addresses.into_iter()); - - let coverage = row_id_segment.range()?; - - Some(vec![(coverage, (row_id_segment, address_segment))]) + build_chunk_from_pairs(pairs) } type IndexChunk = (RangeInclusive, (U64Segment, U64Segment)); diff --git a/rust/lance/src/dataset/rowids.rs b/rust/lance/src/dataset/rowids.rs index 19684dcf50f..8ab7d4fa8b5 100644 --- a/rust/lance/src/dataset/rowids.rs +++ b/rust/lance/src/dataset/rowids.rs @@ -92,13 +92,10 @@ async fn load_row_id_index(dataset: &Dataset) -> Result>() .await?; - // Build a lookup map for O(1) fragment access instead of O(N) linear search. let fragments = dataset.get_fragments(); let fragment_map: std::collections::HashMap = fragments.iter().map(|f| (f.id() as u32, f)).collect(); - // Load deletion vectors. Use buffered concurrency to avoid spawning all - // futures at once (which can overwhelm the runtime for large datasets). 
let fragment_indices: Vec<_> = futures::stream::iter(sequences.into_iter().map( |(fragment_id, sequence)| { let fragment = fragment_map From a824ca2ef06c3d790fb9e76808c74b5f88bb2a88 Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Fri, 27 Mar 2026 19:36:01 +0800 Subject: [PATCH 3/4] style: cargo fmt Co-Authored-By: Claude Opus 4.6 (1M context) --- rust/lance-table/src/rowids/index.rs | 5 +---- rust/lance/src/dataset/rowids.rs | 13 ++++++------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/rust/lance-table/src/rowids/index.rs b/rust/lance-table/src/rowids/index.rs index 391ba5c74ba..936d88a06e2 100644 --- a/rust/lance-table/src/rowids/index.rs +++ b/rust/lance-table/src/rowids/index.rs @@ -138,10 +138,7 @@ fn build_chunk_from_pairs(pairs: Vec<(u64, u64)>) -> Option { } /// Fast path: no deletions. O(1) for Range segments. -fn decompose_segment_no_deletions( - segment: &U64Segment, - start_address: u64, -) -> Option { +fn decompose_segment_no_deletions(segment: &U64Segment, start_address: u64) -> Option { match segment { U64Segment::Range(range) if !range.is_empty() => { let len = range.end - range.start; diff --git a/rust/lance/src/dataset/rowids.rs b/rust/lance/src/dataset/rowids.rs index 8ab7d4fa8b5..d40a9adfd50 100644 --- a/rust/lance/src/dataset/rowids.rs +++ b/rust/lance/src/dataset/rowids.rs @@ -96,8 +96,8 @@ async fn load_row_id_index(dataset: &Dataset) -> Result = fragments.iter().map(|f| (f.id() as u32, f)).collect(); - let fragment_indices: Vec<_> = futures::stream::iter(sequences.into_iter().map( - |(fragment_id, sequence)| { + let fragment_indices: Vec<_> = + futures::stream::iter(sequences.into_iter().map(|(fragment_id, sequence)| { let fragment = fragment_map .get(&fragment_id) .expect("Fragment should exist"); @@ -119,11 +119,10 @@ async fn load_row_id_index(dataset: &Dataset) -> Result Date: Fri, 27 Mar 2026 20:04:48 +0800 Subject: [PATCH 4/4] test: add test case for range segments with deletions Co-Authored-By: Claude Opus 4.6 (1M 
context) --- rust/lance-table/src/rowids/index.rs | 66 ++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/rust/lance-table/src/rowids/index.rs b/rust/lance-table/src/rowids/index.rs index 936d88a06e2..7ad04bd961d 100644 --- a/rust/lance-table/src/rowids/index.rs +++ b/rust/lance-table/src/rowids/index.rs @@ -650,6 +650,72 @@ mod tests { ); } + #[test] + fn test_large_range_segments_with_deletions() { + let rows_per_fragment = 1_000u64; + let num_fragments = 10u32; + let mut offset = 0u64; + + let fragment_indices: Vec = (0..num_fragments) + .map(|frag_id| { + let start = offset; + offset += rows_per_fragment; + + // Delete every 3rd row (offsets 0, 3, 6, ...) within each fragment. + let mut deleted = roaring::RoaringBitmap::new(); + for i in (0..rows_per_fragment as u32).step_by(3) { + deleted.insert(i); + } + + FragmentRowIdIndex { + fragment_id: frag_id, + row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range( + start..start + rows_per_fragment, + )])), + deletion_vector: Arc::new(DeletionVector::Bitmap(deleted)), + } + }) + .collect(); + + let index = RowIdIndex::new(&fragment_indices).unwrap(); + + // Deleted rows (offset 0, 3, 6, ...) should not be found. + // Row ID 0 has offset 0 in fragment 0 -> deleted. + assert_eq!(index.get(0), None); + // Row ID 3 has offset 3 in fragment 0 -> deleted. + assert_eq!(index.get(3), None); + + // Non-deleted rows should resolve correctly. + // Row ID 1 has offset 1 in fragment 0 -> address (frag=0, row=1). + assert_eq!(index.get(1), Some(RowAddress::new_from_parts(0, 1))); + // Row ID 2 has offset 2 in fragment 0 -> address (frag=0, row=2). + assert_eq!(index.get(2), Some(RowAddress::new_from_parts(0, 2))); + // Row ID 4 has offset 4 in fragment 0 -> address (frag=0, row=4). + assert_eq!(index.get(4), Some(RowAddress::new_from_parts(0, 4))); + + // Check second fragment: row IDs start at 1000. + // Row ID 1000 has offset 0 in fragment 1 -> deleted. 
+ assert_eq!(index.get(rows_per_fragment), None); + // Row ID 1001 has offset 1 in fragment 1 -> address (frag=1, row=1). + assert_eq!( + index.get(rows_per_fragment + 1), + Some(RowAddress::new_from_parts(1, 1)) + ); + + // Last fragment, last non-deleted row. + // Row ID 9999 has offset 999 in fragment 9 -> 999 % 3 == 0 -> deleted. + let last_row = num_fragments as u64 * rows_per_fragment - 1; + assert_eq!(index.get(last_row), None); + // Row ID 9998 has offset 998 -> 998 % 3 == 2 -> not deleted. + assert_eq!( + index.get(last_row - 1), + Some(RowAddress::new_from_parts(num_fragments - 1, 998)) + ); + + // Out of range. + assert_eq!(index.get(last_row + 1), None); + } + proptest::proptest! { #[test] fn test_new_index_robustness(row_ids in arbitrary_row_ids(0..5, 0..32)) {