diff --git a/rust/lance-table/src/rowids/index.rs b/rust/lance-table/src/rowids/index.rs index 0bf4eb76027..7ad04bd961d 100644 --- a/rust/lance-table/src/rowids/index.rs +++ b/rust/lance-table/src/rowids/index.rs @@ -97,6 +97,7 @@ fn decompose_sequence( ) -> Vec<(RangeInclusive, (U64Segment, U64Segment))> { let mut start_address: u64 = RowAddress::first_row(frag_index.fragment_id).into(); let mut current_offset = 0u32; + let no_deletions = frag_index.deletion_vector.is_empty(); frag_index .row_id_sequence @@ -105,38 +106,80 @@ fn decompose_sequence( .filter_map(|segment| { let segment_len = segment.len(); - let active_pairs: Vec<(u64, u64)> = segment - .iter() - .enumerate() - .filter_map(|(i, row_id)| { - let row_offset = current_offset + i as u32; - if !frag_index.deletion_vector.contains(row_offset) { - let address = start_address + i as u64; - Some((row_id, address)) - } else { - None - } - }) - .collect(); + let result = if no_deletions { + decompose_segment_no_deletions(segment, start_address) + } else { + decompose_segment_with_deletions( + segment, + start_address, + current_offset, + &frag_index.deletion_vector, + ) + }; current_offset += segment_len as u32; start_address += segment_len as u64; - if active_pairs.is_empty() { - return None; - } - - let row_ids: Vec = active_pairs.iter().map(|(rid, _)| *rid).collect(); - let addresses: Vec = active_pairs.iter().map(|(_, addr)| *addr).collect(); - - let row_id_segment = U64Segment::from_iter(row_ids.iter().copied()); - let address_segment = U64Segment::from_iter(addresses.iter().copied()); + result + }) + .collect() +} - let coverage = row_id_segment.range()?; +/// Build an IndexChunk from a list of (row_id, address) pairs. +fn build_chunk_from_pairs(pairs: Vec<(u64, u64)>) -> Option { + if pairs.is_empty() { + return None; + } + let (row_ids, addresses): (Vec, Vec) = pairs.into_iter().unzip(); + let row_id_segment = U64Segment::from_iter(row_ids); + let address_segment = U64Segment::from_iter(addresses); + let coverage = row_id_segment.range()?; + Some((coverage, (row_id_segment, address_segment))) +} +/// Fast path: no deletions. O(1) for Range segments. +fn decompose_segment_no_deletions(segment: &U64Segment, start_address: u64) -> Option { + match segment { + U64Segment::Range(range) if !range.is_empty() => { + let len = range.end - range.start; + let row_id_segment = U64Segment::Range(range.clone()); + let address_segment = U64Segment::Range(start_address..start_address + len); + let coverage = range.start..=range.end - 1; Some((coverage, (row_id_segment, address_segment))) + } + _ if segment.is_empty() => None, + _ => { + // Non-Range segments: must iterate to build address mapping. + let pairs: Vec<(u64, u64)> = segment + .iter() + .enumerate() + .map(|(i, row_id)| (row_id, start_address + i as u64)) + .collect(); + build_chunk_from_pairs(pairs) + } + } +} + +/// Slow path: has deletions, must check each row. +fn decompose_segment_with_deletions( + segment: &U64Segment, + start_address: u64, + current_offset: u32, + deletion_vector: &DeletionVector, +) -> Option { + let pairs: Vec<(u64, u64)> = segment + .iter() + .enumerate() + .filter_map(|(i, row_id)| { + let row_offset = current_offset + i as u32; + if !deletion_vector.contains(row_offset) { + Some((row_id, start_address + i as u64)) + } else { + None + } }) - .collect() + .collect(); + build_chunk_from_pairs(pairs) } type IndexChunk = (RangeInclusive, (U64Segment, U64Segment)); @@ -547,6 +590,132 @@ mod tests { }) } + #[test] + fn test_large_range_segments_no_deletions() { + // Simulates a real-world scenario: many fragments with large Range segments + // and no deletions. Before optimization, this would iterate over all rows + // (O(total_rows)). After optimization, it's O(num_fragments). + let rows_per_fragment = 250_000u64; + let num_fragments = 100u32; + let mut offset = 0u64; + + let fragment_indices: Vec = (0..num_fragments) + .map(|frag_id| { + let start = offset; + offset += rows_per_fragment; + FragmentRowIdIndex { + fragment_id: frag_id, + row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range( + start..start + rows_per_fragment, + )])), + deletion_vector: Arc::new(DeletionVector::default()), + } + }) + .collect(); + + let start = std::time::Instant::now(); + let index = RowIdIndex::new(&fragment_indices).unwrap(); + let elapsed = start.elapsed(); + + // Verify correctness at boundaries + assert_eq!(index.get(0), Some(RowAddress::new_from_parts(0, 0))); + assert_eq!( + index.get(rows_per_fragment - 1), + Some(RowAddress::new_from_parts(0, rows_per_fragment as u32 - 1)) + ); + assert_eq!( + index.get(rows_per_fragment), + Some(RowAddress::new_from_parts(1, 0)) + ); + let last_row = num_fragments as u64 * rows_per_fragment - 1; + assert_eq!( + index.get(last_row), + Some(RowAddress::new_from_parts( + num_fragments - 1, + rows_per_fragment as u32 - 1 + )) + ); + assert_eq!(index.get(last_row + 1), None); + + // With the optimization, building an index for 25M rows across 100 fragments + // should complete in well under 1 second (typically < 1ms). + assert!( + elapsed.as_secs() < 1, + "Index build took {:?} for {} fragments x {} rows = {} total rows. \ + This suggests the O(rows) -> O(fragments) optimization is not working.", + elapsed, + num_fragments, + rows_per_fragment, + num_fragments as u64 * rows_per_fragment, + ); + } + + #[test] + fn test_large_range_segments_with_deletions() { + let rows_per_fragment = 1_000u64; + let num_fragments = 10u32; + let mut offset = 0u64; + + let fragment_indices: Vec = (0..num_fragments) + .map(|frag_id| { + let start = offset; + offset += rows_per_fragment; + + // Delete every 3rd row (offsets 0, 3, 6, ...) within each fragment. + let mut deleted = roaring::RoaringBitmap::new(); + for i in (0..rows_per_fragment as u32).step_by(3) { + deleted.insert(i); + } + + FragmentRowIdIndex { + fragment_id: frag_id, + row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range( + start..start + rows_per_fragment, + )])), + deletion_vector: Arc::new(DeletionVector::Bitmap(deleted)), + } + }) + .collect(); + + let index = RowIdIndex::new(&fragment_indices).unwrap(); + + // Deleted rows (offset 0, 3, 6, ...) should not be found. + // Row ID 0 has offset 0 in fragment 0 -> deleted. + assert_eq!(index.get(0), None); + // Row ID 3 has offset 3 in fragment 0 -> deleted. + assert_eq!(index.get(3), None); + + // Non-deleted rows should resolve correctly. + // Row ID 1 has offset 1 in fragment 0 -> address (frag=0, row=1). + assert_eq!(index.get(1), Some(RowAddress::new_from_parts(0, 1))); + // Row ID 2 has offset 2 in fragment 0 -> address (frag=0, row=2). + assert_eq!(index.get(2), Some(RowAddress::new_from_parts(0, 2))); + // Row ID 4 has offset 4 in fragment 0 -> address (frag=0, row=4). + assert_eq!(index.get(4), Some(RowAddress::new_from_parts(0, 4))); + + // Check second fragment: row IDs start at 1000. + // Row ID 1000 has offset 0 in fragment 1 -> deleted. + assert_eq!(index.get(rows_per_fragment), None); + // Row ID 1001 has offset 1 in fragment 1 -> address (frag=1, row=1). + assert_eq!( + index.get(rows_per_fragment + 1), + Some(RowAddress::new_from_parts(1, 1)) + ); + + // Last fragment, last non-deleted row. + // Row ID 9999 has offset 999 in fragment 9 -> 999 % 3 == 0 -> deleted. + let last_row = num_fragments as u64 * rows_per_fragment - 1; + assert_eq!(index.get(last_row), None); + // Row ID 9998 has offset 998 -> 998 % 3 == 2 -> not deleted. + assert_eq!( + index.get(last_row - 1), + Some(RowAddress::new_from_parts(num_fragments - 1, 998)) + ); + + // Out of range. + assert_eq!(index.get(last_row + 1), None); + } + proptest::proptest! { #[test] fn test_new_index_robustness(row_ids in arbitrary_row_ids(0..5, 0..32)) { diff --git a/rust/lance/src/dataset/rowids.rs b/rust/lance/src/dataset/rowids.rs index 447868fca99..d40a9adfd50 100644 --- a/rust/lance/src/dataset/rowids.rs +++ b/rust/lance/src/dataset/rowids.rs @@ -92,19 +92,25 @@ async fn load_row_id_index(dataset: &Dataset) -> Result>() .await?; + let fragments = dataset.get_fragments(); + let fragment_map: std::collections::HashMap = + fragments.iter().map(|f| (f.id() as u32, f)).collect(); + let fragment_indices: Vec<_> = - futures::future::try_join_all(sequences.into_iter().map(|(fragment_id, sequence)| { - let dataset = dataset.clone(); + futures::stream::iter(sequences.into_iter().map(|(fragment_id, sequence)| { + let fragment = fragment_map + .get(&fragment_id) + .expect("Fragment should exist"); + let has_deletion_file = fragment.metadata().deletion_file.is_some(); + let fragment_clone = (*fragment).clone(); async move { - let fragments = dataset.get_fragments(); - let fragment = fragments - .iter() - .find(|f| f.id() as u32 == fragment_id) - .expect("Fragment should exist"); - - let deletion_vector = match fragment.get_deletion_vector().await { - Ok(Some(dv)) => dv, - Ok(None) | Err(_) => Arc::new(DeletionVector::default()), + let deletion_vector = if has_deletion_file { + match fragment_clone.get_deletion_vector().await { + Ok(Some(dv)) => dv, + Ok(None) | Err(_) => Arc::new(DeletionVector::default()), + } + } else { + Arc::new(DeletionVector::default()) }; Ok::(FragmentRowIdIndex { @@ -114,6 +120,8 @@ async fn load_row_id_index(dataset: &Dataset) -> Result