Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 193 additions & 24 deletions rust/lance-table/src/rowids/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ fn decompose_sequence(
) -> Vec<(RangeInclusive<u64>, (U64Segment, U64Segment))> {
let mut start_address: u64 = RowAddress::first_row(frag_index.fragment_id).into();
let mut current_offset = 0u32;
let no_deletions = frag_index.deletion_vector.is_empty();

frag_index
.row_id_sequence
Expand All @@ -105,38 +106,80 @@ fn decompose_sequence(
.filter_map(|segment| {
let segment_len = segment.len();

let active_pairs: Vec<(u64, u64)> = segment
.iter()
.enumerate()
.filter_map(|(i, row_id)| {
let row_offset = current_offset + i as u32;
if !frag_index.deletion_vector.contains(row_offset) {
let address = start_address + i as u64;
Some((row_id, address))
} else {
None
}
})
.collect();
let result = if no_deletions {
decompose_segment_no_deletions(segment, start_address)
} else {
decompose_segment_with_deletions(
segment,
start_address,
current_offset,
&frag_index.deletion_vector,
)
};

current_offset += segment_len as u32;
start_address += segment_len as u64;

if active_pairs.is_empty() {
return None;
}

let row_ids: Vec<u64> = active_pairs.iter().map(|(rid, _)| *rid).collect();
let addresses: Vec<u64> = active_pairs.iter().map(|(_, addr)| *addr).collect();

let row_id_segment = U64Segment::from_iter(row_ids.iter().copied());
let address_segment = U64Segment::from_iter(addresses.iter().copied());
result
})
.collect()
}

let coverage = row_id_segment.range()?;
/// Assemble an `IndexChunk` from `(row_id, address)` pairs.
///
/// Returns `None` when the pair list is empty or when the resulting row-id
/// segment reports no coverage range.
fn build_chunk_from_pairs(pairs: Vec<(u64, u64)>) -> Option<IndexChunk> {
    if pairs.is_empty() {
        return None;
    }
    // Split the pairs into parallel id/address columns, then compress each
    // column into its own segment.
    let (ids, addrs): (Vec<u64>, Vec<u64>) = pairs.into_iter().unzip();
    let id_segment = U64Segment::from_iter(ids);
    let addr_segment = U64Segment::from_iter(addrs);
    let coverage = id_segment.range()?;
    Some((coverage, (id_segment, addr_segment)))
}

/// Fast path: no deletions. O(1) for Range segments.
///
/// `start_address` is the absolute address of the segment's first row.
/// Returns `None` for an empty segment.
fn decompose_segment_no_deletions(segment: &U64Segment, start_address: u64) -> Option<IndexChunk> {
    match segment {
        U64Segment::Range(range) if !range.is_empty() => {
            // Contiguous ids map to contiguous addresses, so both sides of the
            // chunk can be expressed as ranges without touching any row.
            let row_count = range.end - range.start;
            let addresses = start_address..start_address + row_count;
            Some((
                range.start..=range.end - 1,
                (U64Segment::Range(range.clone()), U64Segment::Range(addresses)),
            ))
        }
        _ if segment.is_empty() => None,
        _ => {
            // Non-Range segments: must iterate to build address mapping.
            build_chunk_from_pairs(
                segment
                    .iter()
                    .enumerate()
                    .map(|(offset, row_id)| (row_id, start_address + offset as u64))
                    .collect(),
            )
        }
    }
}

/// Slow path: has deletions, must check each row.
///
/// `current_offset` is the row offset of the segment's first row within the
/// fragment; `start_address` is the absolute address of that same row. Rows
/// whose offset is present in `deletion_vector` are excluded. Returns `None`
/// when the segment is empty or every row in it is deleted.
fn decompose_segment_with_deletions(
    segment: &U64Segment,
    start_address: u64,
    current_offset: u32,
    deletion_vector: &DeletionVector,
) -> Option<IndexChunk> {
    // Fix: the original text carried a duplicated terminal `.collect()` line
    // (a stray diff artifact), which does not compile; exactly one collect
    // terminates the chain here.
    let pairs: Vec<(u64, u64)> = segment
        .iter()
        .enumerate()
        .filter_map(|(i, row_id)| {
            let row_offset = current_offset + i as u32;
            if deletion_vector.contains(row_offset) {
                None
            } else {
                Some((row_id, start_address + i as u64))
            }
        })
        .collect();
    build_chunk_from_pairs(pairs)
}

type IndexChunk = (RangeInclusive<u64>, (U64Segment, U64Segment));
Expand Down Expand Up @@ -547,6 +590,132 @@ mod tests {
})
}

#[test]
fn test_large_range_segments_no_deletions() {
    // Fix: stray review-UI text ("Comment thread" / "marked this conversation
    // as resolved") was embedded in the function body; it is not Rust and has
    // been removed.
    //
    // Simulates a real-world scenario: many fragments with large Range segments
    // and no deletions. Before optimization, this would iterate over all rows
    // (O(total_rows)). After optimization, it's O(num_fragments).
    let rows_per_fragment = 250_000u64;
    let num_fragments = 100u32;
    let mut offset = 0u64;

    let fragment_indices: Vec<FragmentRowIdIndex> = (0..num_fragments)
        .map(|frag_id| {
            let start = offset;
            offset += rows_per_fragment;
            FragmentRowIdIndex {
                fragment_id: frag_id,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(
                    start..start + rows_per_fragment,
                )])),
                deletion_vector: Arc::new(DeletionVector::default()),
            }
        })
        .collect();

    let start = std::time::Instant::now();
    let index = RowIdIndex::new(&fragment_indices).unwrap();
    let elapsed = start.elapsed();

    // Verify correctness at boundaries
    assert_eq!(index.get(0), Some(RowAddress::new_from_parts(0, 0)));
    assert_eq!(
        index.get(rows_per_fragment - 1),
        Some(RowAddress::new_from_parts(0, rows_per_fragment as u32 - 1))
    );
    assert_eq!(
        index.get(rows_per_fragment),
        Some(RowAddress::new_from_parts(1, 0))
    );
    let last_row = num_fragments as u64 * rows_per_fragment - 1;
    assert_eq!(
        index.get(last_row),
        Some(RowAddress::new_from_parts(
            num_fragments - 1,
            rows_per_fragment as u32 - 1
        ))
    );
    assert_eq!(index.get(last_row + 1), None);

    // With the optimization, building an index for 25M rows across 100 fragments
    // should complete in well under 1 second (typically < 1ms).
    assert!(
        elapsed.as_secs() < 1,
        "Index build took {:?} for {} fragments x {} rows = {} total rows. \
         This suggests the O(rows) -> O(fragments) optimization is not working.",
        elapsed,
        num_fragments,
        rows_per_fragment,
        num_fragments as u64 * rows_per_fragment,
    );
}

#[test]
fn test_large_range_segments_with_deletions() {
    let rows_per_fragment = 1_000u64;
    let num_fragments = 10u32;

    // Each fragment owns a contiguous run of row ids and deletes every 3rd
    // row (offsets 0, 3, 6, ...) within the fragment.
    let fragment_indices: Vec<FragmentRowIdIndex> = (0..num_fragments)
        .map(|frag_id| {
            let start = frag_id as u64 * rows_per_fragment;

            let mut deleted = roaring::RoaringBitmap::new();
            for row_offset in (0..rows_per_fragment as u32).step_by(3) {
                deleted.insert(row_offset);
            }

            FragmentRowIdIndex {
                fragment_id: frag_id,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(
                    start..start + rows_per_fragment,
                )])),
                deletion_vector: Arc::new(DeletionVector::Bitmap(deleted)),
            }
        })
        .collect();

    let index = RowIdIndex::new(&fragment_indices).unwrap();

    // Deleted rows must not resolve: row ids 0 and 3 sit at offsets 0 and 3
    // of fragment 0, both multiples of 3.
    assert_eq!(index.get(0), None);
    assert_eq!(index.get(3), None);

    // Surviving rows resolve to their original in-fragment offsets.
    assert_eq!(index.get(1), Some(RowAddress::new_from_parts(0, 1)));
    assert_eq!(index.get(2), Some(RowAddress::new_from_parts(0, 2)));
    assert_eq!(index.get(4), Some(RowAddress::new_from_parts(0, 4)));

    // Second fragment: its row ids start at `rows_per_fragment`; offset 0 is
    // deleted there, offset 1 survives.
    assert_eq!(index.get(rows_per_fragment), None);
    assert_eq!(
        index.get(rows_per_fragment + 1),
        Some(RowAddress::new_from_parts(1, 1))
    );

    // Last fragment: offset 999 is a multiple of 3 (deleted); 998 is not.
    let last_row = num_fragments as u64 * rows_per_fragment - 1;
    assert_eq!(index.get(last_row), None);
    assert_eq!(
        index.get(last_row - 1),
        Some(RowAddress::new_from_parts(num_fragments - 1, 998))
    );

    // Row ids past the last fragment resolve to nothing.
    assert_eq!(index.get(last_row + 1), None);
}

proptest::proptest! {
#[test]
fn test_new_index_robustness(row_ids in arbitrary_row_ids(0..5, 0..32)) {
Expand Down
30 changes: 19 additions & 11 deletions rust/lance/src/dataset/rowids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,25 @@ async fn load_row_id_index(dataset: &Dataset) -> Result<lance_table::rowids::Row
.try_collect::<Vec<_>>()
.await?;

let fragments = dataset.get_fragments();
let fragment_map: std::collections::HashMap<u32, &crate::dataset::fragment::FileFragment> =
fragments.iter().map(|f| (f.id() as u32, f)).collect();

let fragment_indices: Vec<_> =
futures::future::try_join_all(sequences.into_iter().map(|(fragment_id, sequence)| {
let dataset = dataset.clone();
futures::stream::iter(sequences.into_iter().map(|(fragment_id, sequence)| {
let fragment = fragment_map
.get(&fragment_id)
.expect("Fragment should exist");
let has_deletion_file = fragment.metadata().deletion_file.is_some();
let fragment_clone = (*fragment).clone();
async move {
let fragments = dataset.get_fragments();
let fragment = fragments
.iter()
.find(|f| f.id() as u32 == fragment_id)
.expect("Fragment should exist");

let deletion_vector = match fragment.get_deletion_vector().await {
Ok(Some(dv)) => dv,
Ok(None) | Err(_) => Arc::new(DeletionVector::default()),
let deletion_vector = if has_deletion_file {
match fragment_clone.get_deletion_vector().await {
Ok(Some(dv)) => dv,
Ok(None) | Err(_) => Arc::new(DeletionVector::default()),
}
} else {
Arc::new(DeletionVector::default())
};

Ok::<FragmentRowIdIndex, Error>(FragmentRowIdIndex {
Expand All @@ -114,6 +120,8 @@ async fn load_row_id_index(dataset: &Dataset) -> Result<lance_table::rowids::Row
})
}
}))
.buffer_unordered(dataset.object_store.io_parallelism())
.try_collect()
.await?;

let index = RowIdIndex::new(&fragment_indices)?;
Expand Down
Loading