From a28f15970901f0131ba4fd25ba5a1514310eb87a Mon Sep 17 00:00:00 2001 From: Haocheng Liu <30446009+HaochengLIU@users.noreply.github.com> Date: Wed, 5 Nov 2025 11:02:56 -0500 Subject: [PATCH 1/7] fix: Make zonemap handle deletion properly --- rust/lance-index/src/scalar/zonemap.rs | 62 +++-- rust/lance/src/index/scalar.rs | 315 +++++++++++++++++++++++++ rust/lance/src/utils.rs | 3 +- 3 files changed, 364 insertions(+), 16 deletions(-) diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 7b6e6078310..e8324d9ab80 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -61,10 +61,11 @@ struct ZoneMapStatistics { // only apply to float type nan_count: u32, fragment_id: u64, - // zone_start is the start row of the zone in the fragment, also known - // as local row offset + // zone_start is the actual first row address (local offset within fragment) zone_start: u64, - zone_length: usize, + // zone_length is the address span: (last_row_addr - first_row_addr + 1) + // AKA offset in the fragment, which allows handling non-contiguous addresses after deletions + zone_length: u64, } impl DeepSizeOf for ZoneMapStatistics { @@ -459,6 +460,7 @@ impl ZoneMapIndex { let max = ScalarValue::try_from_array(max_col, i)?; let null_count = null_count_col.value(i); let nan_count = nan_count_col.value(i); + zones.push(ZoneMapStatistics { min, max, @@ -466,7 +468,7 @@ impl ZoneMapIndex { nan_count, fragment_id: fragment_id_col.value(i), zone_start: zone_start_col.value(i), - zone_length: zone_length.value(i) as usize, + zone_length: zone_length.value(i), }); } @@ -543,9 +545,10 @@ impl ScalarIndex for ZoneMapIndex { // Check if this zone matches the query if self.evaluate_zone_against_query(zone, query)? 
{ // Calculate the range of row addresses for this zone - // Row addresses are: (fragment_id << 32) + zone_start + // zone_length is the address span (not row count), so we can directly use it + // This handles non-contiguous addresses from deletions correctly let zone_start_addr = (zone.fragment_id << 32) + zone.zone_start; - let zone_end_addr = zone_start_addr + (zone.zone_length as u64); + let zone_end_addr = zone_start_addr + zone.zone_length; // Add all row addresses in this zone to the result row_id_tree_map.insert_range(zone_start_addr..zone_end_addr); @@ -669,6 +672,10 @@ pub struct ZoneMapIndexBuilder { // The local offset within the current zone cur_zone_offset: usize, cur_fragment_id: u64, + // Track the actual first and last row addresses in the current zone + // This handles non-contiguous addresses after deletions + cur_zone_first_row_addr: Option, + cur_zone_last_row_addr: Option, min: MinAccumulator, max: MaxAccumulator, @@ -686,6 +693,8 @@ impl ZoneMapIndexBuilder { maps: Vec::new(), cur_zone_offset: 0, cur_fragment_id: 0, + cur_zone_first_row_addr: None, + cur_zone_last_row_addr: None, min, max, null_count: 0, @@ -729,13 +738,15 @@ impl ZoneMapIndexBuilder { } fn new_map(&mut self, fragment_id: u64) -> Result<()> { - // Calculate zone_start based on existing zones in the same fragment - let zone_start = self - .maps - .iter() - .filter(|zone| zone.fragment_id == fragment_id) - .map(|zone| zone.zone_length as u64) - .sum::(); + // Use the actual first and last row addresses we tracked + // zone_length is the address span (last - first + 1), not row count + // This correctly handles non-contiguous addresses after deletions + let zone_start = self.cur_zone_first_row_addr.unwrap_or(0); + let zone_length = self + .cur_zone_last_row_addr + .map(|last_addr| last_addr - zone_start + 1) + .unwrap_or(self.cur_zone_offset as u64); + let new_map = ZoneMapStatistics { min: self.min.evaluate()?, max: self.max.evaluate()?, @@ -743,12 +754,14 @@ impl 
ZoneMapIndexBuilder { nan_count: self.nan_count, fragment_id, zone_start, - zone_length: self.cur_zone_offset, + zone_length, }; self.maps.push(new_map); self.cur_zone_offset = 0; + self.cur_zone_first_row_addr = None; + self.cur_zone_last_row_addr = None; self.min = MinAccumulator::try_new(&self.items_type)?; self.max = MaxAccumulator::try_new(&self.items_type)?; self.null_count = 0; @@ -807,11 +820,30 @@ impl ZoneMapIndexBuilder { if desired > remaining { // Not enough data to fill a map, just increment counts self.update_stats(&data_array.slice(array_offset, remaining))?; + + // Track first and last row addresses (local offsets within fragment) + let first_addr = row_addrs_array.value(array_offset) & 0xFFFFFFFF; + let last_addr = + row_addrs_array.value(array_offset + remaining - 1) & 0xFFFFFFFF; + if self.cur_zone_first_row_addr.is_none() { + self.cur_zone_first_row_addr = Some(first_addr); + } + self.cur_zone_last_row_addr = Some(last_addr); + self.cur_zone_offset += remaining; break; } else if desired > 0 { // There is enough data, create a new zone map self.update_stats(&data_array.slice(array_offset, desired))?; + + // Track first and last row addresses (local offsets within fragment) + let first_addr = row_addrs_array.value(array_offset) & 0xFFFFFFFF; + let last_addr = row_addrs_array.value(array_offset + desired - 1) & 0xFFFFFFFF; + if self.cur_zone_first_row_addr.is_none() { + self.cur_zone_first_row_addr = Some(first_addr); + } + self.cur_zone_last_row_addr = Some(last_addr); + self.cur_zone_offset += desired; self.new_map(row_addrs_array.value(array_offset) >> 32)?; } else if desired == 0 { @@ -856,7 +888,7 @@ impl ZoneMapIndexBuilder { UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.fragment_id)); let zone_lengths = - UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.zone_length as u64)); + UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.zone_length)); let zone_starts = 
UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.zone_start)); diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index edc405bec43..24deb973b45 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -1184,4 +1184,319 @@ mod tests { let rows_per_zone = stats["rows_per_zone"].as_u64().unwrap(); assert_eq!(rows_per_zone, 200, "ZoneMap rows_per_zone should be 200"); } + + #[tokio::test] + async fn test_zonemap_deletion_then_index() { + use arrow::datatypes::UInt64Type; + use lance_datagen::array; + use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; + use lance_index::IndexType; + + // Create dataset with 10 rows: alternating boolean values + // Rows 0,2,4,6,8 have value=true, rows 1,3,5,7,9 have value=false + let mut ds = lance_datagen::gen_batch() + .col("id", array::step::()) + .col("value", array::cycle_bool(vec![true, false])) + .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(10)) + .await + .unwrap(); + + // Delete rows where value=false (rows 1, 3, 5, 7, 9) + ds.delete("NOT value").await.unwrap(); + + // Verify data before index creation: should have 5 rows with value=true + let before_index: Vec = ds + .scan() + .filter("value") + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + let before_ids: Vec = before_index[0] + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + .collect(); + + assert_eq!( + before_ids, + vec![0, 2, 4, 6, 8], + "Before index: should have 5 rows" + ); + + // Create zonemap index on "value" column + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); + ds.create_index(&["value"], IndexType::Scalar, None, ¶ms, false) + .await + .unwrap(); + + // Query after index creation - THIS IS WHERE THE BUG MANIFESTS + let after_index: Vec = ds + .scan() + .filter("value") + .unwrap() + .try_into_stream() + .await + .unwrap() + 
.try_collect() + .await + .unwrap(); + + let after_ids: Vec = after_index[0] + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + .collect(); + + println!("Before index: {:?}", before_ids); + println!("After index: {:?}", after_ids); + println!("BUG: Expected [0, 2, 4, 6, 8] but may get only [0, 2, 4]"); + + // This assertion will FAIL if bug #4758 is present + assert_eq!( + after_ids.len(), + 5, + "BUG #4758: Expected 5 rows after index creation, got {}. Only {:?} returned instead of [0, 2, 4, 6, 8]", + after_ids.len(), + after_ids + ); + assert_eq!( + after_ids, + vec![0, 2, 4, 6, 8], + "BUG #4758: Zonemap index with deletions returns wrong results" + ); + } + + #[tokio::test] + async fn test_zonemap_index_then_deletion() { + // Tests the opposite scenario: create index FIRST, then perform deletions + // Verifies that zonemap index properly handles deletions that occur after index creation + use arrow::datatypes::UInt64Type; + use lance_datagen::array; + use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; + use lance_index::IndexType; + + // Create dataset with 10 rows: alternating boolean values + // Rows 0,2,4,6,8 have value=true, rows 1,3,5,7,9 have value=false + let mut ds = lance_datagen::gen_batch() + .col("id", array::step::()) + .col("value", array::cycle_bool(vec![true, false])) + .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(10)) + .await + .unwrap(); + + // Verify initial data: should have 10 rows + let initial_data: Vec = ds + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + let initial_count: usize = initial_data.iter().map(|b| b.num_rows()).sum(); + assert_eq!(initial_count, 10, "Should start with 10 rows"); + + // CREATE INDEX FIRST (before deletion) + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); + ds.create_index(&["value"], IndexType::Scalar, None, ¶ms, false) + .await + .unwrap(); + 
+ println!("Created zonemap index on 'value' column"); + + // Query with index before deletion - should return all 5 rows with value=true + let before_deletion: Vec = ds + .scan() + .filter("value") + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + let before_deletion_ids: Vec = before_deletion[0] + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + .collect(); + + println!("Before deletion (with index): {:?}", before_deletion_ids); + assert_eq!( + before_deletion_ids, + vec![0, 2, 4, 6, 8], + "Before deletion: should return 5 rows with value=true" + ); + + // NOW DELETE rows where value=false (rows 1, 3, 5, 7, 9) + ds.delete("NOT value").await.unwrap(); + println!("Deleted rows where value=false"); + + // Query after deletion - should still return 5 rows with value=true + let after_deletion: Vec = ds + .scan() + .filter("value") + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + let after_deletion_ids: Vec = after_deletion[0] + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + .collect(); + + println!("After deletion (with index): {:?}", after_deletion_ids); + + // Verify we get the correct data after deletion + assert_eq!( + after_deletion_ids.len(), + 5, + "After deletion: Expected 5 rows, got {}", + after_deletion_ids.len() + ); + assert_eq!( + after_deletion_ids, + vec![0, 2, 4, 6, 8], + "After deletion: Should return rows [0, 2, 4, 6, 8] with value=true" + ); + + // Verify the actual values are correct + let after_deletion_values: Vec = after_deletion[0] + .column_by_name("value") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .collect(); + + assert_eq!( + after_deletion_values, + vec![true, true, true, true, true], + "All returned rows should have value=true" + ); + + // Check dataset statistics after 
deletion + let count_after_deletion = ds.count_rows(None).await.unwrap(); + println!("Dataset count after deletion: {}", count_after_deletion); + + // Count all rows by scanning (to see actual visible rows) + let count_by_scan_all: Vec = ds + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let scan_count: usize = count_by_scan_all.iter().map(|b| b.num_rows()).sum(); + println!("Rows visible in scan: {}", scan_count); + + // Debug: What IDs and values are in the unfiltered scan? + if !count_by_scan_all.is_empty() { + let scan_ids_debug: Vec = count_by_scan_all[0] + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + .collect(); + let scan_values_debug: Vec> = count_by_scan_all[0] + .column_by_name("value") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .collect(); + println!("Unfiltered scan IDs: {:?}", scan_ids_debug); + println!("Unfiltered scan values: {:?}", scan_values_debug); + } + + // Count rows matching "value = true" + let count_true: Vec = ds + .scan() + .filter("value") + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let count_true_rows: usize = count_true.iter().map(|b| b.num_rows()).sum(); + println!("Rows with value=true: {}", count_true_rows); + + // Count rows matching "value = false" (should be 0 after deletion) + let count_false: Vec = ds + .scan() + .filter("NOT value") + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let count_false_rows: usize = count_false.iter().map(|b| b.num_rows()).sum(); + println!("Rows with value=false: {}", count_false_rows); + + // The key assertions: filtered queries should return correct data + assert_eq!( + count_true_rows, 5, + "Should have exactly 5 rows with value=true" + ); + assert_eq!( + count_false_rows, 0, + "Should have 0 rows with value=false after deletion" + ); + + println!("\n=== Test 
Results ==="); + println!("✓ Zonemap index correctly filters for value=true: 5 rows"); + println!("✓ Zonemap index correctly filters for value=false: 0 rows (deleted)"); + println!("✓ Filtered query after deletion returns correct IDs: [0, 2, 4, 6, 8]"); + println!("✓ Filtered query after deletion returns correct values: all true"); + + // Note: Unfiltered scans may show different behavior with deletion vectors + // The critical functionality is that filtered queries work correctly + println!("\nNote: Unfiltered scan shows {} rows", scan_count); + println!("This is expected behavior - deletion vectors are applied during filtering"); + println!("✓ Test passed: Index-then-deletion scenario validates correctly"); + } } diff --git a/rust/lance/src/utils.rs b/rust/lance/src/utils.rs index 4ae847cfb9c..e2d162e49d9 100644 --- a/rust/lance/src/utils.rs +++ b/rust/lance/src/utils.rs @@ -5,7 +5,8 @@ pub(crate) mod future; pub(crate) mod temporal; +// Public test utilities module - only available during testing #[cfg(test)] -pub(crate) mod test; +pub mod test; #[cfg(feature = "tensorflow")] pub mod tfrecord; From ad300dea23fbef8b66a2222fd886aee0b86935f7 Mon Sep 17 00:00:00 2001 From: Haocheng Liu <30446009+HaochengLIU@users.noreply.github.com> Date: Wed, 5 Nov 2025 11:20:21 -0500 Subject: [PATCH 2/7] add a python test --- python/python/tests/test_scalar_index.py | 95 ++++++++++++++++-------- 1 file changed, 62 insertions(+), 33 deletions(-) diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index f0b7e76743e..433aaa27ffa 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -1689,6 +1689,35 @@ def scan_stats_callback(stats: lance.ScanStatistics): assert small_bytes_read < large_bytes_read +def test_zonemap_deletion_handling(tmp_path: Path): + """Test zonemap deletion handling""" + data = pa.table( + { + "id": range(10), + "value": [True, False] * 5, + } + ) + ds = 
lance.write_dataset(data, "memory://") + ds.delete("NOT value") + assert ds.to_table(filter="value = True").num_rows == 5 + assert ds.to_table(filter="value = False").num_rows == 0 + ids = ds.to_table(filter="value = True")["id"].to_pylist() + assert ids == [0, 2, 4, 6, 8] + + ds.create_scalar_index("value", index_type="zonemap") + ids = ds.to_table(filter="value = True")["id"].to_pylist() + assert ids == [0, 2, 4, 6, 8] + + # now create the index before deletion + ds = lance.write_dataset(data, "memory://") + ds.create_scalar_index("value", index_type="zonemap") + ds.delete("NOT value") + assert ds.to_table(filter="value = True").num_rows == 5 + assert ds.to_table(filter="value = False").num_rows == 0 + ids = ds.to_table(filter="value = True")["id"].to_pylist() + assert ids == [0, 2, 4, 6, 8] + + def test_bloomfilter_index(tmp_path: Path): """Test create bloomfilter index""" tbl = pa.Table.from_arrays([pa.array([i for i in range(10000)])], names=["values"]) @@ -2307,9 +2336,9 @@ def compare_fts_results( if "_rowid" in single_df.columns: single_rowids = set(single_df["_rowid"]) distributed_rowids = set(distributed_df["_rowid"]) - assert single_rowids == distributed_rowids, ( - f"Row ID mismatch: single={single_rowids}, distributed={distributed_rowids}" - ) + assert ( + single_rowids == distributed_rowids + ), f"Row ID mismatch: single={single_rowids}, distributed={distributed_rowids}" # Compare scores with tolerance if "_score" in single_df.columns: @@ -2337,9 +2366,9 @@ def compare_fts_results( ) if isinstance(single_values, set): - assert single_values == distributed_values, ( - f"Column {col} content mismatch" - ) + assert ( + single_values == distributed_values + ), f"Column {col} content mismatch" else: np.testing.assert_array_equal( single_values, @@ -2676,9 +2705,9 @@ def generate_coherent_text(): # Verify we have the expected number of fragments fragments = ds.get_fragments() - assert len(fragments) == num_fragments, ( - f"Expected {num_fragments} 
fragments, got {len(fragments)}" - ) + assert ( + len(fragments) == num_fragments + ), f"Expected {num_fragments} fragments, got {len(fragments)}" return ds @@ -2714,9 +2743,9 @@ def test_build_distributed_fts_index_basic(tmp_path): break assert distributed_index is not None, "Distributed index not found" - assert distributed_index["type"] == "Inverted", ( - f"Expected Inverted index, got {distributed_index['type']}" - ) + assert ( + distributed_index["type"] == "Inverted" + ), f"Expected Inverted index, got {distributed_index['type']}" # Test that the index works for searching results = distributed_ds.scanner( @@ -2823,9 +2852,9 @@ def test_validate_distributed_fts_basic_search(tmp_path): # Both should have the same number of rows single_rows = results["single_machine"].num_rows distributed_rows = results["distributed"].num_rows - assert single_rows == distributed_rows, ( - f"Row count mismatch: {single_rows} vs {distributed_rows}" - ) + assert ( + single_rows == distributed_rows + ), f"Row count mismatch: {single_rows} vs {distributed_rows}" # Should have found some results for 'frodo' assert single_rows > 0, "No results found for search term 'frodo'" @@ -2853,12 +2882,12 @@ def test_validate_distributed_fts_score_consistency(tmp_path): single_results = results["single_machine"] distributed_results = results["distributed"] - assert "_score" in single_results.column_names, ( - "Missing _score in single machine results" - ) - assert "_score" in distributed_results.column_names, ( - "Missing _score in distributed results" - ) + assert ( + "_score" in single_results.column_names + ), "Missing _score in single machine results" + assert ( + "_score" in distributed_results.column_names + ), "Missing _score in distributed results" # Scores should be very close (within 1e-6 tolerance) single_scores = single_results.column("_score").to_pylist() @@ -2884,9 +2913,9 @@ def test_validate_distributed_fts_empty_results(tmp_path): ) # Both should return empty results - assert 
results["single_machine"].num_rows == 0, ( - "Single machine should return 0 results" - ) + assert ( + results["single_machine"].num_rows == 0 + ), "Single machine should return 0 results" assert results["distributed"].num_rows == 0, "Distributed should return 0 results" @@ -2912,9 +2941,9 @@ def test_validate_distributed_fts_large_dataset(tmp_path): distributed_rows = results["distributed"].num_rows assert single_rows > 0, "Should find results for 'gandalf'" - assert single_rows == distributed_rows, ( - f"Row count mismatch: {single_rows} vs {distributed_rows}" - ) + assert ( + single_rows == distributed_rows + ), f"Row count mismatch: {single_rows} vs {distributed_rows}" # ============================================================================ @@ -3141,9 +3170,9 @@ def test_distribute_fts_index_build(tmp_path): our_index = idx break assert our_index is not None, f"Index '{index_name}' not found in indices list" - assert our_index["type"] == "Inverted", ( - f"Expected Inverted index, got {our_index['type']}" - ) + assert ( + our_index["type"] == "Inverted" + ), f"Expected Inverted index, got {our_index['type']}" # Test that the index works for searching # Get a sample text from the dataset to search for @@ -3333,9 +3362,9 @@ def test_distribute_btree_index_build(tmp_path): break assert our_index is not None, f"Index '{index_name}' not found in indices list" - assert our_index["type"] == "BTree", ( - f"Expected BTree index, got {our_index['type']}" - ) + assert ( + our_index["type"] == "BTree" + ), f"Expected BTree index, got {our_index['type']}" # Test that the index works for searching # Test exact equality queries From bac5a00bd635b6a2ef272af07f1efe375852971a Mon Sep 17 00:00:00 2001 From: Haocheng Liu <30446009+HaochengLIU@users.noreply.github.com> Date: Wed, 5 Nov 2025 11:49:59 -0500 Subject: [PATCH 3/7] fix bloom filter as well --- python/python/tests/test_scalar_index.py | 161 ++++++---- rust/lance-index/src/scalar/bloomfilter.rs | 60 +++- 
rust/lance/src/index/scalar.rs | 338 +++++++++++++++++---- rust/lance/src/utils/test.rs | 6 + 4 files changed, 419 insertions(+), 146 deletions(-) diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 433aaa27ffa..62bb0254514 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -1718,39 +1718,6 @@ def test_zonemap_deletion_handling(tmp_path: Path): assert ids == [0, 2, 4, 6, 8] -def test_bloomfilter_index(tmp_path: Path): - """Test create bloomfilter index""" - tbl = pa.Table.from_arrays([pa.array([i for i in range(10000)])], names=["values"]) - dataset = lance.write_dataset(tbl, tmp_path / "dataset") - dataset.create_scalar_index("values", index_type="BLOOMFILTER") - indices = dataset.list_indices() - assert len(indices) == 1 - - # Get detailed index statistics - index_stats = dataset.stats.index_stats("values_idx") - assert index_stats["index_type"] == "BloomFilter" - assert "indices" in index_stats - assert len(index_stats["indices"]) == 1 - - # Verify bloomfilter statistics - bloom_stats = index_stats["indices"][0] - assert "num_blocks" in bloom_stats - assert bloom_stats["num_blocks"] == 2 - assert bloom_stats["number_of_items"] == 8192 - assert "probability" in bloom_stats - assert bloom_stats["probability"] == 0.00057 # Default probability - - # Test that the bloomfilter index is being used in the query plan - scanner = dataset.scanner(filter="values == 1234", prefilter=True) - plan = scanner.explain_plan() - assert "ScalarIndexQuery" in plan - - # Verify the query returns correct results - result = scanner.to_table() - assert result.num_rows == 1 - assert result["values"][0].as_py() == 1234 - - def test_zonemap_index_remapping(tmp_path: Path): """Test zonemap index remapping after compaction and optimization""" # Create a dataset with 5 fragments by writing data in chunks @@ -1807,6 +1774,68 @@ def test_zonemap_index_remapping(tmp_path: Path): assert 
result.num_rows == 501 # 1000..1500 inclusive +def test_bloomfilter_index(tmp_path: Path): + """Test create bloomfilter index""" + tbl = pa.Table.from_arrays([pa.array([i for i in range(10000)])], names=["values"]) + dataset = lance.write_dataset(tbl, tmp_path / "dataset") + dataset.create_scalar_index("values", index_type="BLOOMFILTER") + indices = dataset.list_indices() + assert len(indices) == 1 + + # Get detailed index statistics + index_stats = dataset.stats.index_stats("values_idx") + assert index_stats["index_type"] == "BloomFilter" + assert "indices" in index_stats + assert len(index_stats["indices"]) == 1 + + # Verify bloomfilter statistics + bloom_stats = index_stats["indices"][0] + assert "num_blocks" in bloom_stats + assert bloom_stats["num_blocks"] == 2 + assert bloom_stats["number_of_items"] == 8192 + assert "probability" in bloom_stats + assert bloom_stats["probability"] == 0.00057 # Default probability + + # Test that the bloomfilter index is being used in the query plan + scanner = dataset.scanner(filter="values == 1234", prefilter=True) + plan = scanner.explain_plan() + assert "ScalarIndexQuery" in plan + + # Verify the query returns correct results + result = scanner.to_table() + assert result.num_rows == 1 + assert result["values"][0].as_py() == 1234 + + +def test_bloomfilter_deletion_handling(tmp_path: Path): + """Test bloomfilter deletion handling""" + data = pa.table( + { + "id": range(10), + "value": [1, 0] * 5, + } + ) + ds = lance.write_dataset(data, "memory://") + ds.delete("value = 0") + assert ds.to_table(filter="value = 1").num_rows == 5 + assert ds.to_table(filter="value = 0").num_rows == 0 + ids = ds.to_table(filter="value = 1")["id"].to_pylist() + assert ids == [0, 2, 4, 6, 8] + + ds.create_scalar_index("value", index_type="bloomfilter") + ids = ds.to_table(filter="value = 1")["id"].to_pylist() + assert ids == [0, 2, 4, 6, 8] + + # now create the index before deletion + ds = lance.write_dataset(data, "memory://") + 
ds.create_scalar_index("value", index_type="bloomfilter") + ds.delete("value = 0") + assert ds.to_table(filter="value = 1").num_rows == 5 + assert ds.to_table(filter="value = 0").num_rows == 0 + ids = ds.to_table(filter="value = 1")["id"].to_pylist() + assert ids == [0, 2, 4, 6, 8] + + def test_json_index(): vals = ['{"x": 7, "y": 10}', '{"x": 11, "y": 22}', '{"y": 0}', '{"x": 10}'] tbl = pa.table({"jsons": pa.array(vals, pa.json_())}) @@ -2336,9 +2365,9 @@ def compare_fts_results( if "_rowid" in single_df.columns: single_rowids = set(single_df["_rowid"]) distributed_rowids = set(distributed_df["_rowid"]) - assert ( - single_rowids == distributed_rowids - ), f"Row ID mismatch: single={single_rowids}, distributed={distributed_rowids}" + assert single_rowids == distributed_rowids, ( + f"Row ID mismatch: single={single_rowids}, distributed={distributed_rowids}" + ) # Compare scores with tolerance if "_score" in single_df.columns: @@ -2366,9 +2395,9 @@ def compare_fts_results( ) if isinstance(single_values, set): - assert ( - single_values == distributed_values - ), f"Column {col} content mismatch" + assert single_values == distributed_values, ( + f"Column {col} content mismatch" + ) else: np.testing.assert_array_equal( single_values, @@ -2705,9 +2734,9 @@ def generate_coherent_text(): # Verify we have the expected number of fragments fragments = ds.get_fragments() - assert ( - len(fragments) == num_fragments - ), f"Expected {num_fragments} fragments, got {len(fragments)}" + assert len(fragments) == num_fragments, ( + f"Expected {num_fragments} fragments, got {len(fragments)}" + ) return ds @@ -2743,9 +2772,9 @@ def test_build_distributed_fts_index_basic(tmp_path): break assert distributed_index is not None, "Distributed index not found" - assert ( - distributed_index["type"] == "Inverted" - ), f"Expected Inverted index, got {distributed_index['type']}" + assert distributed_index["type"] == "Inverted", ( + f"Expected Inverted index, got {distributed_index['type']}" + ) 
# Test that the index works for searching results = distributed_ds.scanner( @@ -2852,9 +2881,9 @@ def test_validate_distributed_fts_basic_search(tmp_path): # Both should have the same number of rows single_rows = results["single_machine"].num_rows distributed_rows = results["distributed"].num_rows - assert ( - single_rows == distributed_rows - ), f"Row count mismatch: {single_rows} vs {distributed_rows}" + assert single_rows == distributed_rows, ( + f"Row count mismatch: {single_rows} vs {distributed_rows}" + ) # Should have found some results for 'frodo' assert single_rows > 0, "No results found for search term 'frodo'" @@ -2882,12 +2911,12 @@ def test_validate_distributed_fts_score_consistency(tmp_path): single_results = results["single_machine"] distributed_results = results["distributed"] - assert ( - "_score" in single_results.column_names - ), "Missing _score in single machine results" - assert ( - "_score" in distributed_results.column_names - ), "Missing _score in distributed results" + assert "_score" in single_results.column_names, ( + "Missing _score in single machine results" + ) + assert "_score" in distributed_results.column_names, ( + "Missing _score in distributed results" + ) # Scores should be very close (within 1e-6 tolerance) single_scores = single_results.column("_score").to_pylist() @@ -2913,9 +2942,9 @@ def test_validate_distributed_fts_empty_results(tmp_path): ) # Both should return empty results - assert ( - results["single_machine"].num_rows == 0 - ), "Single machine should return 0 results" + assert results["single_machine"].num_rows == 0, ( + "Single machine should return 0 results" + ) assert results["distributed"].num_rows == 0, "Distributed should return 0 results" @@ -2941,9 +2970,9 @@ def test_validate_distributed_fts_large_dataset(tmp_path): distributed_rows = results["distributed"].num_rows assert single_rows > 0, "Should find results for 'gandalf'" - assert ( - single_rows == distributed_rows - ), f"Row count mismatch: 
{single_rows} vs {distributed_rows}" + assert single_rows == distributed_rows, ( + f"Row count mismatch: {single_rows} vs {distributed_rows}" + ) # ============================================================================ @@ -3170,9 +3199,9 @@ def test_distribute_fts_index_build(tmp_path): our_index = idx break assert our_index is not None, f"Index '{index_name}' not found in indices list" - assert ( - our_index["type"] == "Inverted" - ), f"Expected Inverted index, got {our_index['type']}" + assert our_index["type"] == "Inverted", ( + f"Expected Inverted index, got {our_index['type']}" + ) # Test that the index works for searching # Get a sample text from the dataset to search for @@ -3362,9 +3391,9 @@ def test_distribute_btree_index_build(tmp_path): break assert our_index is not None, f"Index '{index_name}' not found in indices list" - assert ( - our_index["type"] == "BTree" - ), f"Expected BTree index, got {our_index['type']}" + assert our_index["type"] == "BTree", ( + f"Expected BTree index, got {our_index['type']}" + ) # Test that the index works for searching # Test exact equality queries diff --git a/rust/lance-index/src/scalar/bloomfilter.rs b/rust/lance-index/src/scalar/bloomfilter.rs index 7fef76136e2..2a2e5a40ca4 100644 --- a/rust/lance-index/src/scalar/bloomfilter.rs +++ b/rust/lance-index/src/scalar/bloomfilter.rs @@ -52,10 +52,11 @@ const BLOOMFILTER_INDEX_VERSION: u32 = 0; #[derive(Debug, Clone)] struct BloomFilterStatistics { fragment_id: u64, - // zone_start is the start row of the zone in the fragment, also known - // as local row offset + // zone_start is the actual first row address (local offset within fragment) zone_start: u64, - zone_length: usize, + // zone_length is the address span: (last_row_addr - first_row_addr + 1) + // AKA offset in the fragment, which allows handling non-contiguous addresses after deletions + zone_length: u64, // Whether this zone contains any null values has_null: bool, // The actual bloom filter (SBBF) for 
efficient querying @@ -231,7 +232,7 @@ impl BloomFilterIndex { blocks.push(BloomFilterStatistics { fragment_id: fragment_id_col.value(i), zone_start: zone_start_col.value(i), - zone_length: zone_length_col.value(i) as usize, + zone_length: zone_length_col.value(i), has_null: has_null_col.value(i), bloom_filter, }); @@ -470,9 +471,10 @@ impl ScalarIndex for BloomFilterIndex { for block in self.zones.iter() { if self.evaluate_block_against_query(block, query)? { // Calculate the range of row addresses for this zone - // Row addresses are: (fragment_id << 32) + zone_start + // zone_length is the address span (not row count), so we can directly use it + // This handles non-contiguous addresses from deletions correctly let zone_start_addr = (block.fragment_id << 32) + block.zone_start; - let zone_end_addr = zone_start_addr + (block.zone_length as u64); + let zone_end_addr = zone_start_addr + block.zone_length; // Add all row addresses in this zone to the result row_id_tree_map.insert_range(zone_start_addr..zone_end_addr); @@ -619,6 +621,10 @@ pub struct BloomFilterIndexBuilder { // The local offset within the current zones cur_zone_offset: usize, cur_fragment_id: u64, + // Track the actual first and last row addresses in the current zone + // This handles non-contiguous addresses after deletions + cur_zone_first_row_addr: Option, + cur_zone_last_row_addr: Option, cur_zone_has_null: bool, sbbf: Option, } @@ -639,6 +645,8 @@ impl BloomFilterIndexBuilder { blocks: Vec::new(), cur_zone_offset: 0, cur_fragment_id: 0, + cur_zone_first_row_addr: None, + cur_zone_last_row_addr: None, cur_zone_has_null: false, sbbf: Some(sbbf), }) @@ -922,13 +930,14 @@ impl BloomFilterIndexBuilder { } fn new_block(&mut self, fragment_id: u64) -> Result<()> { - // Calculate zone_start based on existing zones in the same fragment - let zone_start = self - .blocks - .iter() - .filter(|block| block.fragment_id == fragment_id) - .map(|block| block.zone_length as u64) - .sum::(); + // Use the actual 
first and last row addresses we tracked + // zone_length is the address span (last - first + 1), not row count + // This correctly handles non-contiguous addresses after deletions + let zone_start = self.cur_zone_first_row_addr.unwrap_or(0); + let zone_length = self + .cur_zone_last_row_addr + .map(|last_addr| last_addr - zone_start + 1) + .unwrap_or(self.cur_zone_offset as u64); // Store the current bloom filter directly let bloom_filter = if let Some(ref sbbf) = self.sbbf { @@ -948,13 +957,15 @@ impl BloomFilterIndexBuilder { let new_block = BloomFilterStatistics { fragment_id, zone_start, - zone_length: self.cur_zone_offset, + zone_length, has_null: self.cur_zone_has_null, bloom_filter, }; self.blocks.push(new_block); self.cur_zone_offset = 0; + self.cur_zone_first_row_addr = None; + self.cur_zone_last_row_addr = None; self.cur_zone_has_null = false; // Reset sbbf for the next block @@ -1023,11 +1034,30 @@ impl BloomFilterIndexBuilder { if desired > remaining { // Not enough data to fill a map, just increment counts self.update_stats(&data_array.slice(array_offset, remaining))?; + + // Track first and last row addresses (local offsets within fragment) + let first_addr = row_addrs_array.value(array_offset) & 0xFFFFFFFF; + let last_addr = + row_addrs_array.value(array_offset + remaining - 1) & 0xFFFFFFFF; + if self.cur_zone_first_row_addr.is_none() { + self.cur_zone_first_row_addr = Some(first_addr); + } + self.cur_zone_last_row_addr = Some(last_addr); + self.cur_zone_offset += remaining; break; } else if desired > 0 { // There is enough data, create a new zone self.update_stats(&data_array.slice(array_offset, desired))?; + + // Track first and last row addresses (local offsets within fragment) + let first_addr = row_addrs_array.value(array_offset) & 0xFFFFFFFF; + let last_addr = row_addrs_array.value(array_offset + desired - 1) & 0xFFFFFFFF; + if self.cur_zone_first_row_addr.is_none() { + self.cur_zone_first_row_addr = Some(first_addr); + } + 
self.cur_zone_last_row_addr = Some(last_addr); + self.cur_zone_offset += desired; self.new_block(row_addrs_array.value(array_offset) >> 32)?; } else if desired == 0 { @@ -1059,7 +1089,7 @@ impl BloomFilterIndexBuilder { UInt64Array::from_iter_values(self.blocks.iter().map(|block| block.zone_start)); let zone_lengths = - UInt64Array::from_iter_values(self.blocks.iter().map(|block| block.zone_length as u64)); + UInt64Array::from_iter_values(self.blocks.iter().map(|block| block.zone_length)); let has_nulls = arrow_array::BooleanArray::from( self.blocks diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 24deb973b45..47c54feca6e 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -1262,22 +1262,18 @@ mod tests { .copied() .collect(); - println!("Before index: {:?}", before_ids); - println!("After index: {:?}", after_ids); - println!("BUG: Expected [0, 2, 4, 6, 8] but may get only [0, 2, 4]"); - // This assertion will FAIL if bug #4758 is present assert_eq!( after_ids.len(), 5, - "BUG #4758: Expected 5 rows after index creation, got {}. Only {:?} returned instead of [0, 2, 4, 6, 8]", + "Expected 5 rows after index creation, got {}. 
Only {:?} returned instead of [0, 2, 4, 6, 8]", after_ids.len(), after_ids ); assert_eq!( after_ids, vec![0, 2, 4, 6, 8], - "BUG #4758: Zonemap index with deletions returns wrong results" + "Zonemap index with deletions returns wrong results" ); } @@ -1318,8 +1314,6 @@ mod tests { .await .unwrap(); - println!("Created zonemap index on 'value' column"); - // Query with index before deletion - should return all 5 rows with value=true let before_deletion: Vec = ds .scan() @@ -1343,7 +1337,6 @@ mod tests { .copied() .collect(); - println!("Before deletion (with index): {:?}", before_deletion_ids); assert_eq!( before_deletion_ids, vec![0, 2, 4, 6, 8], @@ -1352,7 +1345,6 @@ mod tests { // NOW DELETE rows where value=false (rows 1, 3, 5, 7, 9) ds.delete("NOT value").await.unwrap(); - println!("Deleted rows where value=false"); // Query after deletion - should still return 5 rows with value=true let after_deletion: Vec = ds @@ -1377,8 +1369,6 @@ mod tests { .copied() .collect(); - println!("After deletion (with index): {:?}", after_deletion_ids); - // Verify we get the correct data after deletion assert_eq!( after_deletion_ids.len(), @@ -1409,46 +1399,6 @@ mod tests { "All returned rows should have value=true" ); - // Check dataset statistics after deletion - let count_after_deletion = ds.count_rows(None).await.unwrap(); - println!("Dataset count after deletion: {}", count_after_deletion); - - // Count all rows by scanning (to see actual visible rows) - let count_by_scan_all: Vec = ds - .scan() - .try_into_stream() - .await - .unwrap() - .try_collect() - .await - .unwrap(); - let scan_count: usize = count_by_scan_all.iter().map(|b| b.num_rows()).sum(); - println!("Rows visible in scan: {}", scan_count); - - // Debug: What IDs and values are in the unfiltered scan? 
- if !count_by_scan_all.is_empty() { - let scan_ids_debug: Vec = count_by_scan_all[0] - .column_by_name("id") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .values() - .iter() - .copied() - .collect(); - let scan_values_debug: Vec> = count_by_scan_all[0] - .column_by_name("value") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .collect(); - println!("Unfiltered scan IDs: {:?}", scan_ids_debug); - println!("Unfiltered scan values: {:?}", scan_values_debug); - } - // Count rows matching "value = true" let count_true: Vec = ds .scan() @@ -1461,7 +1411,6 @@ mod tests { .await .unwrap(); let count_true_rows: usize = count_true.iter().map(|b| b.num_rows()).sum(); - println!("Rows with value=true: {}", count_true_rows); // Count rows matching "value = false" (should be 0 after deletion) let count_false: Vec = ds @@ -1475,7 +1424,6 @@ mod tests { .await .unwrap(); let count_false_rows: usize = count_false.iter().map(|b| b.num_rows()).sum(); - println!("Rows with value=false: {}", count_false_rows); // The key assertions: filtered queries should return correct data assert_eq!( @@ -1486,17 +1434,277 @@ mod tests { count_false_rows, 0, "Should have 0 rows with value=false after deletion" ); + } + + #[tokio::test] + async fn test_bloomfilter_deletion_then_index() { + // Reproduces the same bug as #4758 but for bloom filter indexes + // After deleting rows and creating a bloom filter index, queries return fewer results than expected + use arrow::datatypes::UInt64Type; + use lance_datagen::array; + use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; + use lance_index::IndexType; + + // Create dataset with 10 rows: alternating string values "apple" and "banana" + // Rows 0,2,4,6,8 have value="apple", rows 1,3,5,7,9 have value="banana" + let mut ds = lance_datagen::gen_batch() + .col("id", array::step::()) + .col("value", array::cycle_utf8_literals(&["apple", "banana"])) + .into_ram_dataset(FragmentCount::from(1), 
FragmentRowCount::from(10)) + .await + .unwrap(); + + // Delete rows where value="banana" (rows 1, 3, 5, 7, 9) + ds.delete("value = 'banana'").await.unwrap(); + + // Verify data before index creation: should have 5 rows with value="apple" + let before_index: Vec = ds + .scan() + .filter("value = 'apple'") + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + let before_ids: Vec = before_index[0] + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + .collect(); + + assert_eq!( + before_ids, + vec![0, 2, 4, 6, 8], + "Before index: should have 5 rows" + ); + + // Create bloom filter index on "value" column with small zone size to ensure the bug is triggered + #[derive(serde::Serialize)] + struct BloomParams { + number_of_items: u64, + probability: f64, + } + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BloomFilter).with_params( + &BloomParams { + number_of_items: 5, // Small zone size to ensure multiple zones + probability: 0.01, + }, + ); + ds.create_index(&["value"], IndexType::Scalar, None, ¶ms, false) + .await + .unwrap(); + + let after_index: Vec = ds + .scan() + .filter("value = 'apple'") + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + let after_ids: Vec = after_index[0] + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + .collect(); + + // This assertion verifies the fix works + assert_eq!( + after_ids.len(), + 5, + "Expected 5 rows after index creation, got {}. 
Only {:?} returned instead of [0, 2, 4, 6, 8]", + after_ids.len(), + after_ids + ); + assert_eq!( + after_ids, + vec![0, 2, 4, 6, 8], + "Bloom filter index with deletions returns wrong results" + ); + } + + #[tokio::test] + async fn test_bloomfilter_index_then_deletion() { + // Tests the opposite scenario: create bloom filter index FIRST, then perform deletions + // Verifies that bloom filter index properly handles deletions that occur after index creation + use arrow::datatypes::UInt64Type; + use lance_datagen::array; + use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; + use lance_index::IndexType; - println!("\n=== Test Results ==="); - println!("✓ Zonemap index correctly filters for value=true: 5 rows"); - println!("✓ Zonemap index correctly filters for value=false: 0 rows (deleted)"); - println!("✓ Filtered query after deletion returns correct IDs: [0, 2, 4, 6, 8]"); - println!("✓ Filtered query after deletion returns correct values: all true"); - - // Note: Unfiltered scans may show different behavior with deletion vectors - // The critical functionality is that filtered queries work correctly - println!("\nNote: Unfiltered scan shows {} rows", scan_count); - println!("This is expected behavior - deletion vectors are applied during filtering"); - println!("✓ Test passed: Index-then-deletion scenario validates correctly"); + // Create dataset with 10 rows: alternating string values "apple" and "banana" + // Rows 0,2,4,6,8 have value="apple", rows 1,3,5,7,9 have value="banana" + let mut ds = lance_datagen::gen_batch() + .col("id", array::step::()) + .col("value", array::cycle_utf8_literals(&["apple", "banana"])) + .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(10)) + .await + .unwrap(); + + // Verify initial data: should have 10 rows + let initial_data: Vec = ds + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + let initial_count: usize = initial_data.iter().map(|b| b.num_rows()).sum(); 
+ assert_eq!(initial_count, 10, "Should start with 10 rows"); + + // CREATE INDEX FIRST (before deletion) + #[derive(serde::Serialize)] + struct BloomParams { + number_of_items: u64, + probability: f64, + } + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BloomFilter).with_params( + &BloomParams { + number_of_items: 5, // Small zone size to ensure multiple zones + probability: 0.01, + }, + ); + ds.create_index(&["value"], IndexType::Scalar, None, ¶ms, false) + .await + .unwrap(); + + // Query with index before deletion - should return all 5 rows with value="apple" + let before_deletion: Vec = ds + .scan() + .filter("value = 'apple'") + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + let before_deletion_ids: Vec = before_deletion[0] + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + .collect(); + + assert_eq!( + before_deletion_ids, + vec![0, 2, 4, 6, 8], + "Before deletion: should return 5 rows with value='apple'" + ); + + // NOW DELETE rows where value="banana" (rows 1, 3, 5, 7, 9) + ds.delete("value = 'banana'").await.unwrap(); + + // Query after deletion - should still return 5 rows with value="apple" + let after_deletion: Vec = ds + .scan() + .filter("value = 'apple'") + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + let after_deletion_ids: Vec = after_deletion[0] + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + .collect(); + + // Verify we get the correct data after deletion + assert_eq!( + after_deletion_ids.len(), + 5, + "After deletion: Expected 5 rows, got {}", + after_deletion_ids.len() + ); + assert_eq!( + after_deletion_ids, + vec![0, 2, 4, 6, 8], + "After deletion: Should return rows [0, 2, 4, 6, 8] with value='apple'" + ); + + // Verify the actual values are correct + let after_deletion_values: Vec<&str> = 
after_deletion[0] + .column_by_name("value") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .collect(); + + assert_eq!( + after_deletion_values, + vec!["apple", "apple", "apple", "apple", "apple"], + "All returned rows should have value='apple'" + ); + + // Count rows matching "value = 'apple'" + let count_apple: Vec = ds + .scan() + .filter("value = 'apple'") + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let count_apple_rows: usize = count_apple.iter().map(|b| b.num_rows()).sum(); + + // Count rows matching "value = 'banana'" (should be 0 after deletion) + let count_banana: Vec = ds + .scan() + .filter("value = 'banana'") + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let count_banana_rows: usize = count_banana.iter().map(|b| b.num_rows()).sum(); + + // The key assertions: filtered queries should return correct data + assert_eq!( + count_apple_rows, 5, + "Should have exactly 5 rows with value='apple'" + ); + assert_eq!( + count_banana_rows, 0, + "Should have 0 rows with value='banana' after deletion" + ); } } diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 6af1158fe26..6534f01d9f7 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -363,6 +363,12 @@ pub struct NoContextTestFixture { pub dataset: Dataset, } +impl Default for NoContextTestFixture { + fn default() -> Self { + Self::new() + } +} + impl NoContextTestFixture { pub fn new() -> Self { let runtime = tokio::runtime::Builder::new_current_thread() From d8b6d26e265130daebcf1e17a8a4352091cfee5e Mon Sep 17 00:00:00 2001 From: Haocheng Liu <30446009+HaochengLIU@users.noreply.github.com> Date: Mon, 10 Nov 2025 19:43:12 -0500 Subject: [PATCH 4/7] Address first round of PR review --- rust/lance-index/src/scalar/bloomfilter.rs | 139 ++++++++------ rust/lance-index/src/scalar/zonemap.rs | 142 ++++++++------ 
rust/lance/src/index/scalar.rs | 212 ++++++--------------- rust/lance/src/utils.rs | 3 +- 4 files changed, 223 insertions(+), 273 deletions(-) diff --git a/rust/lance-index/src/scalar/bloomfilter.rs b/rust/lance-index/src/scalar/bloomfilter.rs index 2a2e5a40ca4..3108eda4e0b 100644 --- a/rust/lance-index/src/scalar/bloomfilter.rs +++ b/rust/lance-index/src/scalar/bloomfilter.rs @@ -16,7 +16,8 @@ use crate::scalar::{ BloomFilterQuery, BuiltinIndexType, CreatedIndex, ScalarIndexParams, UpdateCriteria, }; use crate::{pb, Any}; -use arrow_array::{Array, UInt64Array}; +use arrow_array::{Array, UInt32Array}; +use lance_core::utils::address::RowAddress; use lance_core::utils::mask::RowIdTreeMap; use lance_core::ROW_ADDR; use lance_datafusion::chunker::chunk_concat_stream; @@ -49,14 +50,30 @@ const BLOOMFILTER_ITEM_META_KEY: &str = "bloomfilter_item"; const BLOOMFILTER_PROBABILITY_META_KEY: &str = "bloomfilter_probability"; const BLOOMFILTER_INDEX_VERSION: u32 = 0; +// +// Example: Suppose we have two fragments, each with 4 rows. Now building the bloom filter index: +// Fragment 0: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 0 +// The row addresses for fragment 0 are: 0, 1, 2, 3 +// Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 1 +// The row addresses for fragment 1 are: (1 << 32), (1 << 32) + 1, (1 << 32) + 2, (1 << 32) + 3 +// +// Example: Suppose we have two fragments, each with 4 rows. Deletion is 0 index based. 
+// We delete the 0th and 1st row in fragment 0, and the 1st and 2nd row in fragment 1, +// now building the bloom filter index: +// Fragment 0: zone_start = 2, zone_length = 2 // covers rows 2, 3 in fragment 0 +// The row addresses for fragment 0 are: 2, 3 +// Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 3 in fragment 1 +// The row addresses for fragment 1 are: (1 << 32), (1 << 32) + 3 #[derive(Debug, Clone)] struct BloomFilterStatistics { - fragment_id: u64, - // zone_start is the actual first row address (local offset within fragment) - zone_start: u64, - // zone_length is the address span: (last_row_addr - first_row_addr + 1) - // AKA offset in the fragment, which allows handling non-contiguous addresses after deletions - zone_length: u64, + fragment_id: u32, + // zone_start is start row of the zone in the fragment, also known + // as the local offset. To get the actual first row address, + // you can do `(fragment_id << 32) + zone_start` + zone_start: u32, + // zone_length is the `row address span` between the first and the last row in the current SBBF block + // calculated as: (last_row_addr - first_row_addr + 1) + zone_length: u32, // Whether this zone contains any null values has_null: bool, // The actual bloom filter (SBBF) for efficient querying @@ -141,10 +158,10 @@ impl BloomFilterIndex { ) })? .as_any() - .downcast_ref::() + .downcast_ref::() .ok_or_else(|| { Error::invalid_input( - "BloomFilterIndex: 'fragment_id' column is not UInt64", + "BloomFilterIndex: 'fragment_id' column is not UInt32", location!(), ) })?; @@ -155,10 +172,10 @@ impl BloomFilterIndex { Error::invalid_input("BloomFilterIndex: missing 'zone_start' column", location!()) })? .as_any() - .downcast_ref::() + .downcast_ref::() .ok_or_else(|| { Error::invalid_input( - "BloomFilterIndex: 'zone_start' column is not UInt64", + "BloomFilterIndex: 'zone_start' column is not UInt32", location!(), ) })?; @@ -172,10 +189,10 @@ impl BloomFilterIndex { Error::invalid_input( "BloomFilterIndex: missing 'zone_length' column", location!(), ) })? 
.as_any() - .downcast_ref::() + .downcast_ref::() .ok_or_else(|| { Error::invalid_input( - "BloomFilterIndex: 'zone_length' column is not UInt64", + "BloomFilterIndex: 'zone_length' column is not UInt32", location!(), ) })?; @@ -448,7 +465,7 @@ impl Index for BloomFilterIndex { // Loop through zones and add unique fragment IDs to the bitmap for block in &self.zones { - frag_ids.insert(block.fragment_id as u32); + frag_ids.insert(block.fragment_id); } Ok(frag_ids) @@ -473,8 +490,8 @@ impl ScalarIndex for BloomFilterIndex { // Calculate the range of row addresses for this zone // zone_length is the address span (not row count), so we can directly use it // This handles non-contiguous addresses from deletions correctly - let zone_start_addr = (block.fragment_id << 32) + block.zone_start; - let zone_end_addr = zone_start_addr + block.zone_length; + let zone_start_addr = ((block.fragment_id as u64) << 32) + block.zone_start as u64; + let zone_end_addr = zone_start_addr + block.zone_length as u64; // Add all row addresses in this zone to the result row_id_tree_map.insert_range(zone_start_addr..zone_end_addr); @@ -620,11 +637,11 @@ pub struct BloomFilterIndexBuilder { blocks: Vec, // The local offset within the current zones cur_zone_offset: usize, - cur_fragment_id: u64, - // Track the actual first and last row addresses in the current zone - // This handles non-contiguous addresses after deletions - cur_zone_first_row_addr: Option, - cur_zone_last_row_addr: Option, + cur_fragment_id: u32, + // Track the actual first and last row offsets in the current zone + // This handles non-contiguous offsets after deletions + cur_zone_first_row_offset: Option, + cur_zone_last_row_offset: Option, cur_zone_has_null: bool, sbbf: Option, } @@ -645,8 +662,8 @@ impl BloomFilterIndexBuilder { blocks: Vec::new(), cur_zone_offset: 0, cur_fragment_id: 0, - cur_zone_first_row_addr: None, - cur_zone_last_row_addr: None, + cur_zone_first_row_offset: None, + cur_zone_last_row_offset: None, 
cur_zone_has_null: false, sbbf: Some(sbbf), }) @@ -929,15 +946,15 @@ impl BloomFilterIndexBuilder { Ok(()) } - fn new_block(&mut self, fragment_id: u64) -> Result<()> { - // Use the actual first and last row addresses we tracked - // zone_length is the address span (last - first + 1), not row count - // This correctly handles non-contiguous addresses after deletions - let zone_start = self.cur_zone_first_row_addr.unwrap_or(0); + fn new_block(&mut self, fragment_id: u32) -> Result<()> { + // Use the actual first and last row offsets we tracked + // zone_length is the offset span (last - first + 1), not row count + // This correctly handles non-contiguous offsets after deletions + let zone_start = self.cur_zone_first_row_offset.unwrap_or(0); let zone_length = self - .cur_zone_last_row_addr - .map(|last_addr| last_addr - zone_start + 1) - .unwrap_or(self.cur_zone_offset as u64); + .cur_zone_last_row_offset + .map(|last_offset| last_offset - zone_start + 1) + .unwrap_or(self.cur_zone_offset as u32); // Store the current bloom filter directly let bloom_filter = if let Some(ref sbbf) = self.sbbf { @@ -964,8 +981,8 @@ impl BloomFilterIndexBuilder { self.blocks.push(new_block); self.cur_zone_offset = 0; - self.cur_zone_first_row_addr = None; - self.cur_zone_last_row_addr = None; + self.cur_zone_first_row_offset = None; + self.cur_zone_last_row_offset = None; self.cur_zone_has_null = false; // Reset sbbf for the next block @@ -1008,14 +1025,14 @@ impl BloomFilterIndexBuilder { // Initialize cur_fragment_id from the first row address if this is the first batch if self.blocks.is_empty() && self.cur_zone_offset == 0 { let first_row_addr = row_addrs_array.value(0); - self.cur_fragment_id = first_row_addr >> 32; + self.cur_fragment_id = (first_row_addr >> 32) as u32; } while remaining > 0 { // Find the next fragment boundary in this batch let next_fragment_index = (array_offset..row_addrs_array.len()).find(|&i| { let row_addr = row_addrs_array.value(i); - let fragment_id = 
row_addr >> 32; + let fragment_id = (row_addr >> 32) as u32; fragment_id == self.cur_fragment_id + 1 }); let empty_rows_left_in_cur_zone: usize = @@ -1023,7 +1040,7 @@ impl BloomFilterIndexBuilder { // Check if there is enough data from the current fragment to fill the current zone let desired = if let Some(idx) = next_fragment_index { - self.cur_fragment_id = row_addrs_array.value(idx) >> 32; + self.cur_fragment_id = (row_addrs_array.value(idx) >> 32) as u32; // Take the minimum between distance to boundary and space left in zone // to ensure we don't exceed the zone size limit std::cmp::min(idx - array_offset, empty_rows_left_in_cur_zone) @@ -1035,14 +1052,17 @@ impl BloomFilterIndexBuilder { // Not enough data to fill a map, just increment counts self.update_stats(&data_array.slice(array_offset, remaining))?; - // Track first and last row addresses (local offsets within fragment) - let first_addr = row_addrs_array.value(array_offset) & 0xFFFFFFFF; - let last_addr = - row_addrs_array.value(array_offset + remaining - 1) & 0xFFFFFFFF; - if self.cur_zone_first_row_addr.is_none() { - self.cur_zone_first_row_addr = Some(first_addr); + // Track first and last row offsets using RowAddress::new_from_u64 + let first_row_offset = + RowAddress::new_from_u64(row_addrs_array.value(array_offset)).row_offset(); + let last_offset = RowAddress::new_from_u64( + row_addrs_array.value(array_offset + remaining - 1), + ) + .row_offset(); + if self.cur_zone_first_row_offset.is_none() { + self.cur_zone_first_row_offset = Some(first_row_offset); } - self.cur_zone_last_row_addr = Some(last_addr); + self.cur_zone_last_row_offset = Some(last_offset); self.cur_zone_offset += remaining; break; @@ -1050,20 +1070,23 @@ impl BloomFilterIndexBuilder { // There is enough data, create a new zone self.update_stats(&data_array.slice(array_offset, desired))?; - // Track first and last row addresses (local offsets within fragment) - let first_addr = row_addrs_array.value(array_offset) & 0xFFFFFFFF; - 
let last_addr = row_addrs_array.value(array_offset + desired - 1) & 0xFFFFFFFF; - if self.cur_zone_first_row_addr.is_none() { - self.cur_zone_first_row_addr = Some(first_addr); + // Track first and last row offsets (local offsets within fragment) + let first_row_offset = + RowAddress::new_from_u64(row_addrs_array.value(array_offset)).row_offset(); + let last_offset = + RowAddress::new_from_u64(row_addrs_array.value(array_offset + desired - 1)) + .row_offset(); + if self.cur_zone_first_row_offset.is_none() { + self.cur_zone_first_row_offset = Some(first_row_offset); } - self.cur_zone_last_row_addr = Some(last_addr); + self.cur_zone_last_row_offset = Some(last_offset); self.cur_zone_offset += desired; - self.new_block(row_addrs_array.value(array_offset) >> 32)?; + self.new_block((row_addrs_array.value(array_offset) >> 32) as u32)?; } else if desired == 0 { // The new batch starts with a new fragment. Flush the current zone if it's not empty if self.cur_zone_offset > 0 { - self.new_block(self.cur_fragment_id - 1)?; + self.new_block(self.cur_fragment_id.wrapping_sub(1))?; } // Let the loop run again // to find the next fragment boundary @@ -1083,13 +1106,13 @@ impl BloomFilterIndexBuilder { fn bloomfilter_stats_as_batch(&self) -> Result { let fragment_ids = - UInt64Array::from_iter_values(self.blocks.iter().map(|block| block.fragment_id)); + UInt32Array::from_iter_values(self.blocks.iter().map(|block| block.fragment_id)); let zone_starts = - UInt64Array::from_iter_values(self.blocks.iter().map(|block| block.zone_start)); + UInt32Array::from_iter_values(self.blocks.iter().map(|block| block.zone_start)); let zone_lengths = - UInt64Array::from_iter_values(self.blocks.iter().map(|block| block.zone_length)); + UInt32Array::from_iter_values(self.blocks.iter().map(|block| block.zone_length)); let has_nulls = arrow_array::BooleanArray::from( self.blocks @@ -1115,9 +1138,9 @@ impl BloomFilterIndexBuilder { }; let schema = Arc::new(arrow_schema::Schema::new(vec![ - 
Field::new("fragment_id", DataType::UInt64, false), - Field::new("zone_start", DataType::UInt64, false), - Field::new("zone_length", DataType::UInt64, false), + Field::new("fragment_id", DataType::UInt32, false), + Field::new("zone_start", DataType::UInt32, false), + Field::new("zone_length", DataType::UInt32, false), Field::new("has_null", DataType::Boolean, false), Field::new("bloom_filter_data", DataType::Binary, false), ])); @@ -1709,7 +1732,7 @@ mod tests { // Verify zone structure for (i, block) in index.zones.iter().enumerate() { assert_eq!(block.fragment_id, 0); - assert_eq!(block.zone_start, (i * 1000) as u64); + assert_eq!(block.zone_start, (i * 1000) as u32); assert_eq!(block.zone_length, 1000); // Check that the bloom filter has some data (non-zero bytes when serialized) assert!(!block.bloom_filter.to_bytes().is_empty()); diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index e8324d9ab80..562cbf78c09 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -30,7 +30,7 @@ use lance_datafusion::chunker::chunk_concat_stream; use serde::{Deserialize, Serialize}; use std::sync::LazyLock; -use arrow_array::{new_empty_array, ArrayRef, RecordBatch, UInt32Array, UInt64Array}; +use arrow_array::{new_empty_array, ArrayRef, RecordBatch, UInt32Array}; use arrow_schema::{DataType, Field}; use datafusion::execution::SendableRecordBatchStream; use datafusion_common::ScalarValue; @@ -43,7 +43,7 @@ use crate::{Index, IndexType}; use async_trait::async_trait; use deepsize::DeepSizeOf; use lance_core::Result; -use lance_core::{utils::mask::RowIdTreeMap, Error}; +use lance_core::{utils::address::RowAddress, utils::mask::RowIdTreeMap, Error}; use roaring::RoaringBitmap; use snafu::location; const ROWS_PER_ZONE_DEFAULT: u64 = 8192; // 1 zone every two batches @@ -52,6 +52,20 @@ const ZONEMAP_FILENAME: &str = "zonemap.lance"; const ZONEMAP_SIZE_META_KEY: &str = "rows_per_zone"; const 
ZONEMAP_INDEX_VERSION: u32 = 0; +// +// Example: Suppose we have two fragments, each with 4 rows. Now building the zonemap index: +// Fragment 0: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 0 +// The row addresses for fragment 0 are: 0, 1, 2, 3 +// Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 1 +// The row addresses for fragment 1 are: (1 << 32), (1 << 32) + 1, (1 << 32) + 2, (1 << 32) + 3 +// +// Example: Suppose we have two fragments, each with 4 rows. Deletion is 0 index based. +// We delete the 0th and 1st row in fragment 0, and the 1st and 2nd row in fragment 1, +// now building the zonemap index: +// Fragment 0: zone_start = 2, zone_length = 2 // covers rows 2, 3 in fragment 0 +// The row addresses for fragment 0 are: 2, 3 +// Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 3 in fragment 1 +// The row addresses for fragment 1 are: (1 << 32), (1 << 32) + 3 /// Basic stats about zonemap index #[derive(Debug, PartialEq, Clone)] struct ZoneMapStatistics { @@ -60,12 +74,14 @@ struct ZoneMapStatistics { null_count: u32, // only apply to float type nan_count: u32, - fragment_id: u64, - // zone_start is the actual first row address (local offset within fragment) - zone_start: u64, - // zone_length is the address span: (last_row_addr - first_row_addr + 1) - // AKA offset in the fragment, which allows handling non-contiguous addresses after deletions - zone_length: u64, + fragment_id: u32, + // zone_start is start row of the zone in the fragment, also known + // as the local offset. To get the actual first row address, + // you can do `(fragment_id << 32) + zone_start` + zone_start: u32, + // zone_length is the `row address span` between the first and the last row in the zone + // calculated as: (last_row_addr - first_row_addr + 1) + zone_length: u32, } impl DeepSizeOf for ZoneMapStatistics { @@ -403,10 +419,10 @@ impl ZoneMapIndex { Error::invalid_input("ZoneMapIndex: missing 'zone_length' column", location!()) })? 
.as_any() - .downcast_ref::() + .downcast_ref::() .ok_or_else(|| { Error::invalid_input( - "ZoneMapIndex: 'zone_length' column is not Uint64", + "ZoneMapIndex: 'zone_length' column is not UInt32", location!(), ) })?; @@ -417,10 +433,10 @@ impl ZoneMapIndex { Error::invalid_input("ZoneMapIndex: missing 'fragment_id' column", location!()) })? .as_any() - .downcast_ref::() + .downcast_ref::() .ok_or_else(|| { Error::invalid_input( - "ZoneMapIndex: 'fragment_id' column is not UInt64", + "ZoneMapIndex: 'fragment_id' column is not UInt32", location!(), ) })?; @@ -431,10 +447,10 @@ impl ZoneMapIndex { Error::invalid_input("ZoneMapIndex: missing 'zone_start' column", location!()) })? .as_any() - .downcast_ref::() + .downcast_ref::() .ok_or_else(|| { Error::invalid_input( - "ZoneMapIndex: 'zone_start' column is not UInt64", + "ZoneMapIndex: 'zone_start' column is not UInt32", location!(), ) })?; @@ -521,7 +537,7 @@ impl Index for ZoneMapIndex { // Loop through zones and add unique fragment IDs to the bitmap for zone in &self.zones { - frag_ids.insert(zone.fragment_id as u32); + frag_ids.insert(zone.fragment_id); } Ok(frag_ids) @@ -545,10 +561,8 @@ impl ScalarIndex for ZoneMapIndex { // Check if this zone matches the query if self.evaluate_zone_against_query(zone, query)? 
{ // Calculate the range of row addresses for this zone - // zone_length is the address span (not row count), so we can directly use it - // This handles non-contiguous addresses from deletions correctly - let zone_start_addr = (zone.fragment_id << 32) + zone.zone_start; - let zone_end_addr = zone_start_addr + zone.zone_length; + let zone_start_addr = ((zone.fragment_id as u64) << 32) + zone.zone_start as u64; + let zone_end_addr = zone_start_addr + zone.zone_length as u64; // Add all row addresses in this zone to the result row_id_tree_map.insert_range(zone_start_addr..zone_end_addr); @@ -671,11 +685,11 @@ pub struct ZoneMapIndexBuilder { maps: Vec, // The local offset within the current zone cur_zone_offset: usize, - cur_fragment_id: u64, - // Track the actual first and last row addresses in the current zone - // This handles non-contiguous addresses after deletions - cur_zone_first_row_addr: Option, - cur_zone_last_row_addr: Option, + cur_fragment_id: u32, + // Track the actual first and last row offsets in the current zone + // This handles non-contiguous offsets after deletions + cur_zone_first_row_offset: Option, + cur_zone_last_row_offset: Option, min: MinAccumulator, max: MaxAccumulator, @@ -693,8 +707,8 @@ impl ZoneMapIndexBuilder { maps: Vec::new(), cur_zone_offset: 0, cur_fragment_id: 0, - cur_zone_first_row_addr: None, - cur_zone_last_row_addr: None, + cur_zone_first_row_offset: None, + cur_zone_last_row_offset: None, min, max, null_count: 0, @@ -737,15 +751,15 @@ impl ZoneMapIndexBuilder { Ok(()) } - fn new_map(&mut self, fragment_id: u64) -> Result<()> { - // Use the actual first and last row addresses we tracked - // zone_length is the address span (last - first + 1), not row count - // This correctly handles non-contiguous addresses after deletions - let zone_start = self.cur_zone_first_row_addr.unwrap_or(0); + fn new_map(&mut self, fragment_id: u32) -> Result<()> { + // Use the actual first and last row offsets we tracked + // zone_length is the 
offset span (last - first + 1), not row count + // This correctly handles non-contiguous offsets after deletions + let zone_start = self.cur_zone_first_row_offset.unwrap_or(0); let zone_length = self - .cur_zone_last_row_addr - .map(|last_addr| last_addr - zone_start + 1) - .unwrap_or(self.cur_zone_offset as u64); + .cur_zone_last_row_offset + .map(|last_row_offset| last_row_offset - zone_start + 1) + .unwrap_or(self.cur_zone_offset as u32); let new_map = ZoneMapStatistics { min: self.min.evaluate()?, @@ -760,8 +774,8 @@ impl ZoneMapIndexBuilder { self.maps.push(new_map); self.cur_zone_offset = 0; - self.cur_zone_first_row_addr = None; - self.cur_zone_last_row_addr = None; + self.cur_zone_first_row_offset = None; + self.cur_zone_last_row_offset = None; self.min = MinAccumulator::try_new(&self.items_type)?; self.max = MaxAccumulator::try_new(&self.items_type)?; self.null_count = 0; @@ -794,14 +808,14 @@ impl ZoneMapIndexBuilder { // Initialize cur_fragment_id from the first row address if this is the first batch if self.maps.is_empty() && self.cur_zone_offset == 0 { let first_row_addr = row_addrs_array.value(0); - self.cur_fragment_id = first_row_addr >> 32; + self.cur_fragment_id = (first_row_addr >> 32) as u32; } while remaining > 0 { // Find the next fragment boundary in this batch let next_fragment_index = (array_offset..row_addrs_array.len()).find(|&i| { let row_addr = row_addrs_array.value(i); - let fragment_id = row_addr >> 32; + let fragment_id = (row_addr >> 32) as u32; fragment_id == self.cur_fragment_id + 1 }); let empty_rows_left_in_cur_zone: usize = @@ -809,7 +823,7 @@ impl ZoneMapIndexBuilder { // Check if there is enough data from the current fragment to fill the current zone let desired = if let Some(idx) = next_fragment_index { - self.cur_fragment_id = row_addrs_array.value(idx) >> 32; + self.cur_fragment_id = (row_addrs_array.value(idx) >> 32) as u32; // Take the minimum between distance to boundary and space left in zone // to ensure we don't 
exceed the zone size limit std::cmp::min(idx - array_offset, empty_rows_left_in_cur_zone) @@ -821,14 +835,17 @@ impl ZoneMapIndexBuilder { // Not enough data to fill a map, just increment counts self.update_stats(&data_array.slice(array_offset, remaining))?; - // Track first and last row addresses (local offsets within fragment) - let first_addr = row_addrs_array.value(array_offset) & 0xFFFFFFFF; - let last_addr = - row_addrs_array.value(array_offset + remaining - 1) & 0xFFFFFFFF; - if self.cur_zone_first_row_addr.is_none() { - self.cur_zone_first_row_addr = Some(first_addr); + // Track first and last row offsets (local offsets within fragment) + let first_row_offset = + RowAddress::new_from_u64(row_addrs_array.value(array_offset)).row_offset(); + let last_row_offset = RowAddress::new_from_u64( + row_addrs_array.value(array_offset + remaining - 1), + ) + .row_offset(); + if self.cur_zone_first_row_offset.is_none() { + self.cur_zone_first_row_offset = Some(first_row_offset); } - self.cur_zone_last_row_addr = Some(last_addr); + self.cur_zone_last_row_offset = Some(last_row_offset); self.cur_zone_offset += remaining; break; @@ -836,20 +853,23 @@ impl ZoneMapIndexBuilder { // There is enough data, create a new zone map self.update_stats(&data_array.slice(array_offset, desired))?; - // Track first and last row addresses (local offsets within fragment) - let first_addr = row_addrs_array.value(array_offset) & 0xFFFFFFFF; - let last_addr = row_addrs_array.value(array_offset + desired - 1) & 0xFFFFFFFF; - if self.cur_zone_first_row_addr.is_none() { - self.cur_zone_first_row_addr = Some(first_addr); + // Track first and last row offsets + let first_row_offset = + RowAddress::new_from_u64(row_addrs_array.value(array_offset)).row_offset(); + let last_row_offset = + RowAddress::new_from_u64(row_addrs_array.value(array_offset + desired - 1)) + .row_offset(); + if self.cur_zone_first_row_offset.is_none() { + self.cur_zone_first_row_offset = Some(first_row_offset); } - 
self.cur_zone_last_row_addr = Some(last_addr); + self.cur_zone_last_row_offset = Some(last_row_offset); self.cur_zone_offset += desired; - self.new_map(row_addrs_array.value(array_offset) >> 32)?; + self.new_map((row_addrs_array.value(array_offset) >> 32) as u32)?; } else if desired == 0 { // The new batch starts with a new fragment. Flush the current zone if it's not empty if self.cur_zone_offset > 0 { - self.new_map(self.cur_fragment_id - 1)?; + self.new_map(self.cur_fragment_id.wrapping_sub(1))?; } // Let the loop run again // to find the next fragment boundary @@ -885,13 +905,13 @@ impl ZoneMapIndexBuilder { let nan_counts = UInt32Array::from_iter_values(self.maps.iter().map(|stat| stat.nan_count)); let fragment_ids = - UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.fragment_id)); + UInt32Array::from_iter_values(self.maps.iter().map(|stat| stat.fragment_id)); let zone_lengths = - UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.zone_length)); + UInt32Array::from_iter_values(self.maps.iter().map(|stat| stat.zone_length)); let zone_starts = - UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.zone_start)); + UInt32Array::from_iter_values(self.maps.iter().map(|stat| stat.zone_start)); let schema = Arc::new(arrow_schema::Schema::new(vec![ // min and max can be null if the entire batch is null values @@ -899,9 +919,9 @@ impl ZoneMapIndexBuilder { Field::new("max", self.items_type.clone(), true), Field::new("null_count", DataType::UInt32, false), Field::new("nan_count", DataType::UInt32, false), - Field::new("fragment_id", DataType::UInt64, false), - Field::new("zone_start", DataType::UInt64, false), - Field::new("zone_length", DataType::UInt64, false), + Field::new("fragment_id", DataType::UInt32, false), + Field::new("zone_start", DataType::UInt32, false), + Field::new("zone_length", DataType::UInt32, false), ])); let columns: Vec = vec![ @@ -1190,7 +1210,7 @@ mod tests { assert_eq!(zone.null_count, 1000); 
assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i); assert_eq!(zone.zone_length, 5000); - assert_eq!(zone.fragment_id, i as u64); + assert_eq!(zone.fragment_id, i as u32); } // Equals query: null (should match all zones since they contain null values) diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 47c54feca6e..6b596c99f73 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -1192,12 +1192,12 @@ mod tests { use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; use lance_index::IndexType; - // Create dataset with 10 rows: alternating boolean values + // Create dataset with 10 rows in two fragments: alternating boolean values // Rows 0,2,4,6,8 have value=true, rows 1,3,5,7,9 have value=false let mut ds = lance_datagen::gen_batch() .col("id", array::step::()) .col("value", array::cycle_bool(vec![true, false])) - .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(10)) + .into_ram_dataset(FragmentCount::from(2), FragmentRowCount::from(5)) .await .unwrap(); @@ -1205,31 +1205,23 @@ mod tests { ds.delete("NOT value").await.unwrap(); // Verify data before index creation: should have 5 rows with value=true - let before_index: Vec = ds + let before_index = ds .scan() .filter("value") .unwrap() - .try_into_stream() - .await - .unwrap() - .try_collect() + .try_into_batch() .await .unwrap(); - let before_ids: Vec = before_index[0] - .column_by_name("id") - .unwrap() + let before_ids = before_index["id"] .as_any() .downcast_ref::() .unwrap() - .values() - .iter() - .copied() - .collect(); + .values(); assert_eq!( before_ids, - vec![0, 2, 4, 6, 8], + &[0, 2, 4, 6, 8], "Before index: should have 5 rows" ); @@ -1239,21 +1231,15 @@ mod tests { .await .unwrap(); - // Query after index creation - THIS IS WHERE THE BUG MANIFESTS - let after_index: Vec = ds + let after_index = ds .scan() .filter("value") .unwrap() - .try_into_stream() - .await - .unwrap() - .try_collect() + 
.try_into_batch() .await .unwrap(); - let after_ids: Vec = after_index[0] - .column_by_name("id") - .unwrap() + let after_ids: Vec = after_index["id"] .as_any() .downcast_ref::() .unwrap() @@ -1291,21 +1277,14 @@ mod tests { let mut ds = lance_datagen::gen_batch() .col("id", array::step::()) .col("value", array::cycle_bool(vec![true, false])) - .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(10)) + .into_ram_dataset(FragmentCount::from(2), FragmentRowCount::from(5)) .await .unwrap(); // Verify initial data: should have 10 rows - let initial_data: Vec = ds - .scan() - .try_into_stream() - .await - .unwrap() - .try_collect() - .await - .unwrap(); + let initial_data = ds.scan().try_into_batch().await.unwrap(); - let initial_count: usize = initial_data.iter().map(|b| b.num_rows()).sum(); + let initial_count: usize = initial_data["id"].len(); assert_eq!(initial_count, 10, "Should start with 10 rows"); // CREATE INDEX FIRST (before deletion) @@ -1315,31 +1294,23 @@ mod tests { .unwrap(); // Query with index before deletion - should return all 5 rows with value=true - let before_deletion: Vec = ds + let before_deletion = ds .scan() .filter("value") .unwrap() - .try_into_stream() - .await - .unwrap() - .try_collect() + .try_into_batch() .await .unwrap(); - let before_deletion_ids: Vec = before_deletion[0] - .column_by_name("id") - .unwrap() + let before_deletion_ids = before_deletion["id"] .as_any() .downcast_ref::() .unwrap() - .values() - .iter() - .copied() - .collect(); + .values(); assert_eq!( before_deletion_ids, - vec![0, 2, 4, 6, 8], + &[0, 2, 4, 6, 8], "Before deletion: should return 5 rows with value=true" ); @@ -1347,27 +1318,19 @@ mod tests { ds.delete("NOT value").await.unwrap(); // Query after deletion - should still return 5 rows with value=true - let after_deletion: Vec = ds + let after_deletion = ds .scan() .filter("value") .unwrap() - .try_into_stream() - .await - .unwrap() - .try_collect() + .try_into_batch() .await .unwrap(); - let 
after_deletion_ids: Vec = after_deletion[0] - .column_by_name("id") - .unwrap() + let after_deletion_ids = after_deletion["id"] .as_any() .downcast_ref::() .unwrap() - .values() - .iter() - .copied() - .collect(); + .values(); // Verify we get the correct data after deletion assert_eq!( @@ -1378,14 +1341,12 @@ mod tests { ); assert_eq!( after_deletion_ids, - vec![0, 2, 4, 6, 8], + &[0, 2, 4, 6, 8], "After deletion: Should return rows [0, 2, 4, 6, 8] with value=true" ); // Verify the actual values are correct - let after_deletion_values: Vec = after_deletion[0] - .column_by_name("value") - .unwrap() + let after_deletion_values: Vec = after_deletion["value"] .as_any() .downcast_ref::() .unwrap() @@ -1400,30 +1361,24 @@ mod tests { ); // Count rows matching "value = true" - let count_true: Vec = ds + let count_true = ds .scan() .filter("value") .unwrap() - .try_into_stream() - .await - .unwrap() - .try_collect() + .try_into_batch() .await .unwrap(); - let count_true_rows: usize = count_true.iter().map(|b| b.num_rows()).sum(); + let count_true_rows: usize = count_true.num_rows(); // Count rows matching "value = false" (should be 0 after deletion) - let count_false: Vec = ds + let count_false = ds .scan() .filter("NOT value") .unwrap() - .try_into_stream() - .await - .unwrap() - .try_collect() + .try_into_batch() .await .unwrap(); - let count_false_rows: usize = count_false.iter().map(|b| b.num_rows()).sum(); + let count_false_rows: usize = count_false.num_rows(); // The key assertions: filtered queries should return correct data assert_eq!( @@ -1458,31 +1413,23 @@ mod tests { ds.delete("value = 'banana'").await.unwrap(); // Verify data before index creation: should have 5 rows with value="apple" - let before_index: Vec = ds + let before_index = ds .scan() .filter("value = 'apple'") .unwrap() - .try_into_stream() - .await - .unwrap() - .try_collect() + .try_into_batch() .await .unwrap(); - let before_ids: Vec = before_index[0] - .column_by_name("id") - .unwrap() + let 
before_ids = before_index["id"] .as_any() .downcast_ref::() .unwrap() - .values() - .iter() - .copied() - .collect(); + .values(); assert_eq!( before_ids, - vec![0, 2, 4, 6, 8], + &[0, 2, 4, 6, 8], "Before index: should have 5 rows" ); @@ -1502,27 +1449,19 @@ mod tests { .await .unwrap(); - let after_index: Vec = ds + let after_index = ds .scan() .filter("value = 'apple'") .unwrap() - .try_into_stream() - .await - .unwrap() - .try_collect() + .try_into_batch() .await .unwrap(); - let after_ids: Vec = after_index[0] - .column_by_name("id") - .unwrap() + let after_ids = after_index["id"] .as_any() .downcast_ref::() .unwrap() - .values() - .iter() - .copied() - .collect(); + .values(); // This assertion verifies the fix works assert_eq!( @@ -1534,7 +1473,7 @@ mod tests { ); assert_eq!( after_ids, - vec![0, 2, 4, 6, 8], + &[0, 2, 4, 6, 8], "Bloom filter index with deletions returns wrong results" ); } @@ -1553,21 +1492,14 @@ mod tests { let mut ds = lance_datagen::gen_batch() .col("id", array::step::()) .col("value", array::cycle_utf8_literals(&["apple", "banana"])) - .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(10)) + .into_ram_dataset(FragmentCount::from(2), FragmentRowCount::from(5)) .await .unwrap(); // Verify initial data: should have 10 rows - let initial_data: Vec = ds - .scan() - .try_into_stream() - .await - .unwrap() - .try_collect() - .await - .unwrap(); + let initial_data = ds.scan().try_into_batch().await.unwrap(); - let initial_count: usize = initial_data.iter().map(|b| b.num_rows()).sum(); + let initial_count: usize = initial_data.num_rows(); assert_eq!(initial_count, 10, "Should start with 10 rows"); // CREATE INDEX FIRST (before deletion) @@ -1587,31 +1519,23 @@ mod tests { .unwrap(); // Query with index before deletion - should return all 5 rows with value="apple" - let before_deletion: Vec = ds + let before_deletion = ds .scan() .filter("value = 'apple'") .unwrap() - .try_into_stream() - .await - .unwrap() - .try_collect() + 
.try_into_batch() .await .unwrap(); - let before_deletion_ids: Vec = before_deletion[0] - .column_by_name("id") - .unwrap() + let before_deletion_ids = before_deletion["id"] .as_any() .downcast_ref::() .unwrap() - .values() - .iter() - .copied() - .collect(); + .values(); assert_eq!( before_deletion_ids, - vec![0, 2, 4, 6, 8], + &[0, 2, 4, 6, 8], "Before deletion: should return 5 rows with value='apple'" ); @@ -1619,27 +1543,19 @@ mod tests { ds.delete("value = 'banana'").await.unwrap(); // Query after deletion - should still return 5 rows with value="apple" - let after_deletion: Vec = ds + let after_deletion = ds .scan() .filter("value = 'apple'") .unwrap() - .try_into_stream() - .await - .unwrap() - .try_collect() + .try_into_batch() .await .unwrap(); - let after_deletion_ids: Vec = after_deletion[0] - .column_by_name("id") - .unwrap() + let after_deletion_ids = after_deletion["id"] .as_any() .downcast_ref::() .unwrap() - .values() - .iter() - .copied() - .collect(); + .values(); // Verify we get the correct data after deletion assert_eq!( @@ -1650,14 +1566,12 @@ mod tests { ); assert_eq!( after_deletion_ids, - vec![0, 2, 4, 6, 8], + &[0, 2, 4, 6, 8], "After deletion: Should return rows [0, 2, 4, 6, 8] with value='apple'" ); // Verify the actual values are correct - let after_deletion_values: Vec<&str> = after_deletion[0] - .column_by_name("value") - .unwrap() + let after_deletion_values: Vec<&str> = after_deletion["value"] .as_any() .downcast_ref::() .unwrap() @@ -1672,30 +1586,24 @@ mod tests { ); // Count rows matching "value = 'apple'" - let count_apple: Vec = ds + let count_apple = ds .scan() .filter("value = 'apple'") .unwrap() - .try_into_stream() - .await - .unwrap() - .try_collect() + .try_into_batch() .await .unwrap(); - let count_apple_rows: usize = count_apple.iter().map(|b| b.num_rows()).sum(); + let count_apple_rows: usize = count_apple.num_rows(); // Count rows matching "value = 'banana'" (should be 0 after deletion) - let count_banana: Vec = ds + 
let count_banana = ds .scan() .filter("value = 'banana'") .unwrap() - .try_into_stream() - .await - .unwrap() - .try_collect() + .try_into_batch() .await .unwrap(); - let count_banana_rows: usize = count_banana.iter().map(|b| b.num_rows()).sum(); + let count_banana_rows: usize = count_banana.num_rows(); // The key assertions: filtered queries should return correct data assert_eq!( diff --git a/rust/lance/src/utils.rs b/rust/lance/src/utils.rs index e2d162e49d9..4ae847cfb9c 100644 --- a/rust/lance/src/utils.rs +++ b/rust/lance/src/utils.rs @@ -5,8 +5,7 @@ pub(crate) mod future; pub(crate) mod temporal; -// Public test utilities module - only available during testing #[cfg(test)] -pub mod test; +pub(crate) mod test; #[cfg(feature = "tensorflow")] pub mod tfrecord; From 88ecb640854fb17495609d71624479dc8bbdc7a8 Mon Sep 17 00:00:00 2001 From: Haocheng Liu <30446009+HaochengLIU@users.noreply.github.com> Date: Tue, 11 Nov 2025 20:36:20 -0500 Subject: [PATCH 5/7] revert the types for zonemap and bloomfilter stats --- python/python/tests/test_scalar_index.py | 8 +- rust/debug.output | 43 ++++++++++ rust/lance-index/src/scalar/bloomfilter.rs | 99 ++++++++++------------ rust/lance-index/src/scalar/zonemap.rs | 72 ++++++++-------- rust/lance/src/index/scalar.rs | 2 +- 5 files changed, 131 insertions(+), 93 deletions(-) create mode 100644 rust/debug.output diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 62bb0254514..baeee198f6e 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -1697,7 +1697,7 @@ def test_zonemap_deletion_handling(tmp_path: Path): "value": [True, False] * 5, } ) - ds = lance.write_dataset(data, "memory://") + ds = lance.write_dataset(data, "memory://", max_rows_per_group=5) ds.delete("NOT value") assert ds.to_table(filter="value = True").num_rows == 5 assert ds.to_table(filter="value = False").num_rows == 0 @@ -1709,7 +1709,7 @@ def 
test_zonemap_deletion_handling(tmp_path: Path): assert ids == [0, 2, 4, 6, 8] # now create the index before deletion - ds = lance.write_dataset(data, "memory://") + ds = lance.write_dataset(data, "memory://", max_rows_per_group=5) ds.create_scalar_index("value", index_type="zonemap") ds.delete("NOT value") assert ds.to_table(filter="value = True").num_rows == 5 @@ -1815,7 +1815,7 @@ def test_bloomfilter_deletion_handling(tmp_path: Path): "value": [1, 0] * 5, } ) - ds = lance.write_dataset(data, "memory://") + ds = lance.write_dataset(data, "memory://", max_rows_per_group=5) ds.delete("value = 0") assert ds.to_table(filter="value = 1").num_rows == 5 assert ds.to_table(filter="value = 0").num_rows == 0 @@ -1827,7 +1827,7 @@ def test_bloomfilter_deletion_handling(tmp_path: Path): assert ids == [0, 2, 4, 6, 8] # now create the index before deletion - ds = lance.write_dataset(data, "memory://") + ds = lance.write_dataset(data, "memory://", max_rows_per_group=5) ds.create_scalar_index("value", index_type="bloomfilter") ds.delete("value = 0") assert ds.to_table(filter="value = 1").num_rows == 5 diff --git a/rust/debug.output b/rust/debug.output new file mode 100644 index 00000000000..647ab86de93 --- /dev/null +++ b/rust/debug.output @@ -0,0 +1,43 @@ +diff --git a/Cargo.lock b/Cargo.lock +index 71561804..b050dca3 100644 +--- a/Cargo.lock ++++ b/Cargo.lock +@@ -4661,6 +4661,7 @@ dependencies = [ + "itertools 0.13.0", + "jieba-rs", + "jsonb", ++ "lance", + "lance-arrow", + "lance-core", + "lance-datafusion", +diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs +index 0a746d0e..2e8a3852 100644 +--- a/rust/lance-index/src/scalar/zonemap.rs ++++ b/rust/lance-index/src/scalar/zonemap.rs +@@ -546,6 +546,10 @@ impl ScalarIndex for ZoneMapIndex { + // Row addresses are: (fragment_id << 32) + zone_start + let zone_start_addr = (zone.fragment_id << 32) + zone.zone_start; + let zone_end_addr = zone_start_addr + (zone.zone_length as u64); ++ 
println!( ++ "found a match! zone_start_addr: {}, zone_end_addr: {}", ++ zone_start_addr, zone_end_addr ++ ); + + // Add all row addresses in this zone to the result + row_id_tree_map.insert_range(zone_start_addr..zone_end_addr); +@@ -763,6 +767,7 @@ impl ZoneMapIndexBuilder { + chunk_concat_stream(batches_source, self.options.rows_per_zone as usize); + + while let Some(batch) = batches_source.try_next().await? { ++ println!("incoming data is {:?}", batch); + if batch.num_rows() == 0 { + continue; + } +@@ -831,6 +836,7 @@ impl ZoneMapIndexBuilder { + if self.cur_zone_offset > 0 { + self.new_map(self.cur_fragment_id)?; + } ++ println!("after training, the zone_maps are {:?}", self.maps); + + Ok(()) + } diff --git a/rust/lance-index/src/scalar/bloomfilter.rs b/rust/lance-index/src/scalar/bloomfilter.rs index 3108eda4e0b..7430ad9a517 100644 --- a/rust/lance-index/src/scalar/bloomfilter.rs +++ b/rust/lance-index/src/scalar/bloomfilter.rs @@ -16,7 +16,7 @@ use crate::scalar::{ BloomFilterQuery, BuiltinIndexType, CreatedIndex, ScalarIndexParams, UpdateCriteria, }; use crate::{pb, Any}; -use arrow_array::{Array, UInt32Array}; +use arrow_array::{Array, UInt64Array}; use lance_core::utils::address::RowAddress; use lance_core::utils::mask::RowIdTreeMap; use lance_core::ROW_ADDR; @@ -51,29 +51,28 @@ const BLOOMFILTER_PROBABILITY_META_KEY: &str = "bloomfilter_probability"; const BLOOMFILTER_INDEX_VERSION: u32 = 0; // -// Example: Suppose we have two fragments, each with 4 rows. Now building the bloom filter index: +// Example: Suppose we have two fragments, each with 4 rows. // Fragment 0: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 0 // The row addresses for fragment 0 are: 0, 1, 2, 3 // Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 1 // The row addresses for fragment 1 are: 32>>1, 32>>1 + 1, 32>>1 + 2, 32>>1 + 3 // -// Example: Suppose we have two fragments, each with 4 rows. Deletion is 0 index based. 
-// We delete the 0th and 1st row in fragment 0, and the 1st and 2nd row in fragment 1, -// now building the bloom filter index: +// Deletion is 0 index based. We delete the 0th and 1st row in fragment 0, +// and the 1st and 2nd row in fragment 1, // Fragment 0: zone_start = 2, zone_length = 2 // covers rows 2, 3 in fragment 0 // The row addresses for fragment 0 are: 2, 3 // Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 3 in fragment 1 // The row addresses for fragment 1 are: 32>>1, 32>>1 + 3 #[derive(Debug, Clone)] struct BloomFilterStatistics { - fragment_id: u32, + fragment_id: u64, // zone_start is start row of the zone in the fragment, also known // as the local offset. To get the actual first row address, // you can do `fragment_id << 32 + zone_start` - zone_start: u32, - // zone_length is the `row address span` between the first and the last row in the current SBBF block - // calculated as: (last_row_addr - first_row_addr + 1) - zone_length: u32, + zone_start: u64, + // zone_length is the `row offset span` between the first and the last row in the current SBBF block + // calculated as: (last_row_offset - first_row_offset + 1) + zone_length: usize, // Whether this zone contains any null values has_null: bool, // The actual bloom filter (SBBF) for efficient querying @@ -158,10 +157,10 @@ impl BloomFilterIndex { ) })? .as_any() - .downcast_ref::() + .downcast_ref::() .ok_or_else(|| { Error::invalid_input( - "BloomFilterIndex: 'fragment_id' column is not UInt32", + "BloomFilterIndex: 'fragment_id' column is not UInt64", location!(), ) })?; @@ -172,10 +171,10 @@ impl BloomFilterIndex { Error::invalid_input("BloomFilterIndex: missing 'zone_start' column", location!()) })? .as_any() - .downcast_ref::() + .downcast_ref::() .ok_or_else(|| { Error::invalid_input( - "BloomFilterIndex: 'zone_start' column is not UInt32", + "BloomFilterIndex: 'zone_start' column is not UInt64", location!(), ) })?; @@ -189,10 +188,10 @@ impl BloomFilterIndex { ) })? 
.as_any() - .downcast_ref::() + .downcast_ref::() .ok_or_else(|| { Error::invalid_input( - "BloomFilterIndex: 'zone_length' column is not UInt32", + "BloomFilterIndex: 'zone_length' column is not UInt64", location!(), ) })?; @@ -249,7 +248,7 @@ impl BloomFilterIndex { blocks.push(BloomFilterStatistics { fragment_id: fragment_id_col.value(i), zone_start: zone_start_col.value(i), - zone_length: zone_length_col.value(i), + zone_length: zone_length_col.value(i) as usize, has_null: has_null_col.value(i), bloom_filter, }); @@ -465,7 +464,7 @@ impl Index for BloomFilterIndex { // Loop through zones and add unique fragment IDs to the bitmap for block in &self.zones { - frag_ids.insert(block.fragment_id); + frag_ids.insert(block.fragment_id as u32); } Ok(frag_ids) @@ -487,10 +486,7 @@ impl ScalarIndex for BloomFilterIndex { // For each zone, check if it might contain the queried value for block in self.zones.iter() { if self.evaluate_block_against_query(block, query)? { - // Calculate the range of row addresses for this zone - // zone_length is the address span (not row count), so we can directly use it - // This handles non-contiguous addresses from deletions correctly - let zone_start_addr = ((block.fragment_id as u64) << 32) + block.zone_start as u64; + let zone_start_addr = (block.fragment_id << 32) + block.zone_start; let zone_end_addr = zone_start_addr + block.zone_length as u64; // Add all row addresses in this zone to the result @@ -947,14 +943,13 @@ impl BloomFilterIndexBuilder { } fn new_block(&mut self, fragment_id: u32) -> Result<()> { - // Use the actual first and last row offsets we tracked - // zone_length is the offset span (last - first + 1), not row count - // This correctly handles non-contiguous offsets after deletions - let zone_start = self.cur_zone_first_row_offset.unwrap_or(0); + let zone_start = self.cur_zone_first_row_offset.unwrap_or(0) as u64; let zone_length = self .cur_zone_last_row_offset - .map(|last_offset| last_offset - zone_start + 1) - 
.unwrap_or(self.cur_zone_offset as u32); + .map(|last_row_offset| { + (last_row_offset - self.cur_zone_first_row_offset.unwrap_or(0) + 1) as usize + }) + .unwrap_or(self.cur_zone_offset); // Store the current bloom filter directly let bloom_filter = if let Some(ref sbbf) = self.sbbf { @@ -972,7 +967,7 @@ impl BloomFilterIndexBuilder { }; let new_block = BloomFilterStatistics { - fragment_id, + fragment_id: fragment_id as u64, zone_start, zone_length, has_null: self.cur_zone_has_null, @@ -1052,17 +1047,16 @@ impl BloomFilterIndexBuilder { // Not enough data to fill a map, just increment counts self.update_stats(&data_array.slice(array_offset, remaining))?; - // Track first and last row offsets using RowAddress::new_from_u64 let first_row_offset = RowAddress::new_from_u64(row_addrs_array.value(array_offset)).row_offset(); - let last_offset = RowAddress::new_from_u64( + let last_row_offset = RowAddress::new_from_u64( row_addrs_array.value(array_offset + remaining - 1), ) .row_offset(); if self.cur_zone_first_row_offset.is_none() { self.cur_zone_first_row_offset = Some(first_row_offset); } - self.cur_zone_last_row_offset = Some(last_offset); + self.cur_zone_last_row_offset = Some(last_row_offset); self.cur_zone_offset += remaining; break; @@ -1070,16 +1064,15 @@ impl BloomFilterIndexBuilder { // There is enough data, create a new zone self.update_stats(&data_array.slice(array_offset, desired))?; - // Track first and last row offsets (local offsets within fragment) let first_row_offset = RowAddress::new_from_u64(row_addrs_array.value(array_offset)).row_offset(); - let last_offset = + let last_row_offset = RowAddress::new_from_u64(row_addrs_array.value(array_offset + desired - 1)) .row_offset(); if self.cur_zone_first_row_offset.is_none() { self.cur_zone_first_row_offset = Some(first_row_offset); } - self.cur_zone_last_row_offset = Some(last_offset); + self.cur_zone_last_row_offset = Some(last_row_offset); self.cur_zone_offset += desired; 
self.new_block((row_addrs_array.value(array_offset) >> 32) as u32)?; @@ -1106,13 +1099,13 @@ impl BloomFilterIndexBuilder { fn bloomfilter_stats_as_batch(&self) -> Result { let fragment_ids = - UInt32Array::from_iter_values(self.blocks.iter().map(|block| block.fragment_id)); + UInt64Array::from_iter_values(self.blocks.iter().map(|block| block.fragment_id)); let zone_starts = - UInt32Array::from_iter_values(self.blocks.iter().map(|block| block.zone_start)); + UInt64Array::from_iter_values(self.blocks.iter().map(|block| block.zone_start)); let zone_lengths = - UInt32Array::from_iter_values(self.blocks.iter().map(|block| block.zone_length)); + UInt64Array::from_iter_values(self.blocks.iter().map(|block| block.zone_length as u64)); let has_nulls = arrow_array::BooleanArray::from( self.blocks @@ -1138,9 +1131,9 @@ impl BloomFilterIndexBuilder { }; let schema = Arc::new(arrow_schema::Schema::new(vec![ - Field::new("fragment_id", DataType::UInt32, false), - Field::new("zone_start", DataType::UInt32, false), - Field::new("zone_length", DataType::UInt32, false), + Field::new("fragment_id", DataType::UInt64, false), + Field::new("zone_start", DataType::UInt64, false), + Field::new("zone_length", DataType::UInt64, false), Field::new("has_null", DataType::Boolean, false), Field::new("bloom_filter_data", DataType::Binary, false), ])); @@ -1474,8 +1467,8 @@ mod tests { assert_eq!(index.probability, 0.01); // Check that we have one zone (since 100 items fit exactly in one zone of size 100) - assert_eq!(index.zones[0].fragment_id, 0); - assert_eq!(index.zones[0].zone_start, 0); + assert_eq!(index.zones[0].fragment_id, 0u64); + assert_eq!(index.zones[0].zone_start, 0u64); assert_eq!(index.zones[0].zone_length, 100); // Test search functionality @@ -1555,21 +1548,21 @@ mod tests { assert_eq!(index.zones.len(), 4); // Check fragment 0 zones - assert_eq!(index.zones[0].fragment_id, 0); - assert_eq!(index.zones[0].zone_start, 0); + assert_eq!(index.zones[0].fragment_id, 0u64); + 
assert_eq!(index.zones[0].zone_start, 0u64); assert_eq!(index.zones[0].zone_length, 50); - assert_eq!(index.zones[1].fragment_id, 0); - assert_eq!(index.zones[1].zone_start, 50); + assert_eq!(index.zones[1].fragment_id, 0u64); + assert_eq!(index.zones[1].zone_start, 50u64); assert_eq!(index.zones[1].zone_length, 50); // Check fragment 1 zones - assert_eq!(index.zones[2].fragment_id, 1); - assert_eq!(index.zones[2].zone_start, 0); + assert_eq!(index.zones[2].fragment_id, 1u64); + assert_eq!(index.zones[2].zone_start, 0u64); assert_eq!(index.zones[2].zone_length, 50); - assert_eq!(index.zones[3].fragment_id, 1); - assert_eq!(index.zones[3].zone_start, 50); + assert_eq!(index.zones[3].fragment_id, 1u64); + assert_eq!(index.zones[3].zone_start, 50u64); assert_eq!(index.zones[3].zone_length, 50); // Test search functionality @@ -1731,8 +1724,8 @@ mod tests { // Verify zone structure for (i, block) in index.zones.iter().enumerate() { - assert_eq!(block.fragment_id, 0); - assert_eq!(block.zone_start, (i * 1000) as u32); + assert_eq!(block.fragment_id, 0u64); + assert_eq!(block.zone_start, (i * 1000) as u64); assert_eq!(block.zone_length, 1000); // Check that the bloom filter has some data (non-zero bytes when serialized) assert!(!block.bloom_filter.to_bytes().is_empty()); diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 562cbf78c09..1b4320148b1 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -30,7 +30,7 @@ use lance_datafusion::chunker::chunk_concat_stream; use serde::{Deserialize, Serialize}; use std::sync::LazyLock; -use arrow_array::{new_empty_array, ArrayRef, RecordBatch, UInt32Array}; +use arrow_array::{new_empty_array, ArrayRef, RecordBatch, UInt32Array, UInt64Array}; use arrow_schema::{DataType, Field}; use datafusion::execution::SendableRecordBatchStream; use datafusion_common::ScalarValue; @@ -53,15 +53,14 @@ const ZONEMAP_SIZE_META_KEY: &str = "rows_per_zone"; 
const ZONEMAP_INDEX_VERSION: u32 = 0; // -// Example: Suppose we have two fragments, each with 4 rows. Now building the zonemap index: +// Example: Suppose we have two fragments, each with 4 rows. // Fragment 0: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 0 // The row addresses for fragment 0 are: 0, 1, 2, 3 // Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 1 // The row addresses for fragment 1 are: 32>>1, 32>>1 + 1, 32>>1 + 2, 32>>1 + 3 // -// Example: Suppose we have two fragments, each with 4 rows. Deletion is 0 index based. -// We delete the 0th and 1st row in fragment 0, and the 1st and 2nd row in fragment 1, -// now building the zonemap index: +// Deletion is 0 index based. We delete the 0th and 1st row in fragment 0, +// and the 1st and 2nd row in fragment 1, // Fragment 0: zone_start = 2, zone_length = 2 // covers rows 2, 3 in fragment 0 // The row addresses for fragment 0 are: 2, 3 // Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 3 in fragment 1 @@ -74,14 +73,14 @@ struct ZoneMapStatistics { null_count: u32, // only apply to float type nan_count: u32, - fragment_id: u32, + fragment_id: u64, // zone_start is start row of the zone in the fragment, also known // as the local offset. To get the actual first row address, // you can do `fragment_id << 32 + zone_start` - zone_start: u32, - // zone_length is the `row address span` between the first and the last row in the zone - // calculated as: (last_row_addr - first_row_addr + 1) - zone_length: u32, + zone_start: u64, + // zone_length is the `row offset span` between the first and the last row in the zone + // calculated as: (last_row_offset - first_row_offset + 1) + zone_length: usize, } impl DeepSizeOf for ZoneMapStatistics { @@ -419,10 +418,10 @@ impl ZoneMapIndex { Error::invalid_input("ZoneMapIndex: missing 'zone_length' column", location!()) })? 
.as_any() - .downcast_ref::() + .downcast_ref::() .ok_or_else(|| { Error::invalid_input( - "ZoneMapIndex: 'zone_length' column is not UInt32", + "ZoneMapIndex: 'zone_length' column is not UInt64", location!(), ) })?; @@ -433,10 +432,10 @@ impl ZoneMapIndex { Error::invalid_input("ZoneMapIndex: missing 'fragment_id' column", location!()) })? .as_any() - .downcast_ref::() + .downcast_ref::() .ok_or_else(|| { Error::invalid_input( - "ZoneMapIndex: 'fragment_id' column is not UInt32", + "ZoneMapIndex: 'fragment_id' column is not UInt64", location!(), ) })?; @@ -447,10 +446,10 @@ impl ZoneMapIndex { Error::invalid_input("ZoneMapIndex: missing 'zone_start' column", location!()) })? .as_any() - .downcast_ref::() + .downcast_ref::() .ok_or_else(|| { Error::invalid_input( - "ZoneMapIndex: 'zone_start' column is not UInt32", + "ZoneMapIndex: 'zone_start' column is not UInt64", location!(), ) })?; @@ -484,7 +483,7 @@ impl ZoneMapIndex { nan_count, fragment_id: fragment_id_col.value(i), zone_start: zone_start_col.value(i), - zone_length: zone_length.value(i), + zone_length: zone_length.value(i) as usize, }); } @@ -537,7 +536,7 @@ impl Index for ZoneMapIndex { // Loop through zones and add unique fragment IDs to the bitmap for zone in &self.zones { - frag_ids.insert(zone.fragment_id); + frag_ids.insert(zone.fragment_id as u32); } Ok(frag_ids) @@ -561,7 +560,7 @@ impl ScalarIndex for ZoneMapIndex { // Check if this zone matches the query if self.evaluate_zone_against_query(zone, query)? 
{ // Calculate the range of row addresses for this zone - let zone_start_addr = ((zone.fragment_id as u64) << 32) + zone.zone_start as u64; + let zone_start_addr = (zone.fragment_id << 32) + zone.zone_start; let zone_end_addr = zone_start_addr + zone.zone_length as u64; // Add all row addresses in this zone to the result @@ -752,21 +751,20 @@ impl ZoneMapIndexBuilder { } fn new_map(&mut self, fragment_id: u32) -> Result<()> { - // Use the actual first and last row offsets we tracked - // zone_length is the offset span (last - first + 1), not row count - // This correctly handles non-contiguous offsets after deletions - let zone_start = self.cur_zone_first_row_offset.unwrap_or(0); + let zone_start = self.cur_zone_first_row_offset.unwrap_or(0) as u64; let zone_length = self .cur_zone_last_row_offset - .map(|last_row_offset| last_row_offset - zone_start + 1) - .unwrap_or(self.cur_zone_offset as u32); + .map(|last_row_offset| { + (last_row_offset - self.cur_zone_first_row_offset.unwrap_or(0) + 1) as usize + }) + .unwrap_or(self.cur_zone_offset); let new_map = ZoneMapStatistics { min: self.min.evaluate()?, max: self.max.evaluate()?, null_count: self.null_count, nan_count: self.nan_count, - fragment_id, + fragment_id: fragment_id as u64, zone_start, zone_length, }; @@ -905,13 +903,13 @@ impl ZoneMapIndexBuilder { let nan_counts = UInt32Array::from_iter_values(self.maps.iter().map(|stat| stat.nan_count)); let fragment_ids = - UInt32Array::from_iter_values(self.maps.iter().map(|stat| stat.fragment_id)); + UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.fragment_id)); let zone_lengths = - UInt32Array::from_iter_values(self.maps.iter().map(|stat| stat.zone_length)); + UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.zone_length as u64)); let zone_starts = - UInt32Array::from_iter_values(self.maps.iter().map(|stat| stat.zone_start)); + UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.zone_start)); let schema = 
Arc::new(arrow_schema::Schema::new(vec![ // min and max can be null if the entire batch is null values @@ -919,9 +917,9 @@ impl ZoneMapIndexBuilder { Field::new("max", self.items_type.clone(), true), Field::new("null_count", DataType::UInt32, false), Field::new("nan_count", DataType::UInt32, false), - Field::new("fragment_id", DataType::UInt32, false), - Field::new("zone_start", DataType::UInt32, false), - Field::new("zone_length", DataType::UInt32, false), + Field::new("fragment_id", DataType::UInt64, false), + Field::new("zone_start", DataType::UInt64, false), + Field::new("zone_length", DataType::UInt64, false), ])); let columns: Vec = vec![ @@ -1210,7 +1208,7 @@ mod tests { assert_eq!(zone.null_count, 1000); assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i); assert_eq!(zone.zone_length, 5000); - assert_eq!(zone.fragment_id, i as u32); + assert_eq!(zone.fragment_id, i as u64); } // Equals query: null (should match all zones since they contain null values) @@ -1263,7 +1261,7 @@ mod tests { // Verify the new zone was added let new_zone = &updated_index.zones[10]; // Last zone should be the new one - assert_eq!(new_zone.fragment_id, 10); // New fragment ID + assert_eq!(new_zone.fragment_id, 10u64); // New fragment ID assert_eq!(new_zone.zone_length, 5000); assert_eq!(new_zone.null_count, 0); // New data has no nulls assert_eq!(new_zone.nan_count, 0); // New data has no NaN values @@ -1362,7 +1360,11 @@ mod tests { "Zone {} should have zone_length 100", i ); - assert_eq!(zone.fragment_id, 0, "Zone {} should have fragment_id 0", i); + assert_eq!( + zone.fragment_id, 0u64, + "Zone {} should have fragment_id 0", + i + ); } // Test search for NaN values using Equals with NaN diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 6b596c99f73..90e402bda2e 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -1405,7 +1405,7 @@ mod tests { let mut ds = lance_datagen::gen_batch() .col("id", 
array::step::()) .col("value", array::cycle_utf8_literals(&["apple", "banana"])) - .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(10)) + .into_ram_dataset(FragmentCount::from(2), FragmentRowCount::from(5)) .await .unwrap(); From edf263f2c2720d81b55a82151ddf973017cbe4f6 Mon Sep 17 00:00:00 2001 From: Haocheng Liu <30446009+HaochengLIU@users.noreply.github.com> Date: Wed, 12 Nov 2025 18:22:47 -0500 Subject: [PATCH 6/7] Delete rust/debug.output --- rust/debug.output | 43 ------------------------------------------- 1 file changed, 43 deletions(-) delete mode 100644 rust/debug.output diff --git a/rust/debug.output b/rust/debug.output deleted file mode 100644 index 647ab86de93..00000000000 --- a/rust/debug.output +++ /dev/null @@ -1,43 +0,0 @@ -diff --git a/Cargo.lock b/Cargo.lock -index 71561804..b050dca3 100644 ---- a/Cargo.lock -+++ b/Cargo.lock -@@ -4661,6 +4661,7 @@ dependencies = [ - "itertools 0.13.0", - "jieba-rs", - "jsonb", -+ "lance", - "lance-arrow", - "lance-core", - "lance-datafusion", -diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs -index 0a746d0e..2e8a3852 100644 ---- a/rust/lance-index/src/scalar/zonemap.rs -+++ b/rust/lance-index/src/scalar/zonemap.rs -@@ -546,6 +546,10 @@ impl ScalarIndex for ZoneMapIndex { - // Row addresses are: (fragment_id << 32) + zone_start - let zone_start_addr = (zone.fragment_id << 32) + zone.zone_start; - let zone_end_addr = zone_start_addr + (zone.zone_length as u64); -+ println!( -+ "found a match! zone_start_addr: {}, zone_end_addr: {}", -+ zone_start_addr, zone_end_addr -+ ); - - // Add all row addresses in this zone to the result - row_id_tree_map.insert_range(zone_start_addr..zone_end_addr); -@@ -763,6 +767,7 @@ impl ZoneMapIndexBuilder { - chunk_concat_stream(batches_source, self.options.rows_per_zone as usize); - - while let Some(batch) = batches_source.try_next().await? 
{ -+ println!("incoming data is {:?}", batch); - if batch.num_rows() == 0 { - continue; - } -@@ -831,6 +836,7 @@ impl ZoneMapIndexBuilder { - if self.cur_zone_offset > 0 { - self.new_map(self.cur_fragment_id)?; - } -+ println!("after training, the zone_maps are {:?}", self.maps); - - Ok(()) - } From b41d03e31444987a351eaa87a808308efff957a8 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 18 Nov 2025 16:08:34 -0800 Subject: [PATCH 7/7] add another test --- rust/lance/src/index/scalar.rs | 89 +++++++++++++++++++++++++++++++--- 1 file changed, 82 insertions(+), 7 deletions(-) diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 90e402bda2e..51b81323ed2 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -546,8 +546,8 @@ mod tests { use lance_core::utils::tempfile::TempStrDir; use lance_core::{datatypes::Field, utils::address::RowAddress}; use lance_datagen::array; - use lance_index::pbold::NGramIndexDetails; use lance_index::IndexType; + use lance_index::{pbold::NGramIndexDetails, scalar::BuiltinIndexType}; use lance_table::format::pb::VectorIndexDetails; fn make_index_metadata( @@ -1185,6 +1185,84 @@ mod tests { assert_eq!(rows_per_zone, 200, "ZoneMap rows_per_zone should be 200"); } + #[tokio::test] + async fn test_zonemap_with_deletions() { + let deletion_predicates = [ + "NOT value", // every other row + "id > 8191 or id < 10", // Second zone of each fragment + "id < 9190 ", // Most of first zone + ]; + let query_predicates = ["value", "id <= 8191", "id >= 1"]; + + async fn filter_query(ds: &Dataset, query_pred: &str) -> arrow_array::RecordBatch { + ds.scan() + .filter(query_pred) + .unwrap() + .try_into_batch() + .await + .unwrap() + } + + for del_pred in &deletion_predicates { + // We use 2 * 8192 so each fragment has two zones. 
+ let mut ds = lance_datagen::gen_batch() + .col("id", array::step::()) + .col("value", array::cycle_bool(vec![true, false])) + .into_ram_dataset(FragmentCount::from(2), FragmentRowCount::from(2 * 8192)) + .await + .unwrap(); + + // Create zonemap index on "value" column + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); + ds.create_index_builder(&["value"], IndexType::Scalar, ¶ms) + .name("value_zone_map".into()) + .await + .unwrap(); + ds.create_index_builder(&["id"], IndexType::Scalar, ¶ms) + .name("id_zone_map".into()) + .await + .unwrap(); + + ds.delete(del_pred).await.unwrap(); + let mut result_before = Vec::new(); + for query_pred in &query_predicates { + let batch = filter_query(&ds, query_pred).await; + result_before.push(batch); + } + ds.drop_index("value_zone_map").await.unwrap(); + ds.drop_index("id_zone_map").await.unwrap(); + + let mut expected = Vec::new(); + for query_pred in &query_predicates { + let batch = filter_query(&ds, query_pred).await; + expected.push(batch); + } + + for (before, expected) in result_before.iter().zip(expected.iter()) { + assert_eq!(before, expected, "Zonemap index with deletions returned wrong results for deletion predicate '{}'", del_pred); + } + + // Now recreate the indexes for the next iteration + ds.create_index_builder(&["value"], IndexType::Scalar, ¶ms) + .name("value_zone_map".into()) + .await + .unwrap(); + ds.create_index_builder(&["id"], IndexType::Scalar, ¶ms) + .name("id_zone_map".into()) + .await + .unwrap(); + let mut result_after = Vec::new(); + for query_pred in &query_predicates { + let batch = filter_query(&ds, query_pred).await; + result_after.push(batch); + } + + for (after, expected) in result_after.iter().zip(expected.iter()) { + assert_eq!(after, expected, "Zonemap index with deletions returned wrong results for deletion predicate '{}' after re-creating the index", del_pred); + } + } + } + #[tokio::test] async fn test_zonemap_deletion_then_index() { use 
arrow::datatypes::UInt64Type; @@ -1239,14 +1317,11 @@ mod tests { .await .unwrap(); - let after_ids: Vec = after_index["id"] + let after_ids = after_index["id"] .as_any() .downcast_ref::() .unwrap() - .values() - .iter() - .copied() - .collect(); + .values(); // This assertion will FAIL if bug #4758 is present assert_eq!( @@ -1258,7 +1333,7 @@ mod tests { ); assert_eq!( after_ids, - vec![0, 2, 4, 6, 8], + &[0, 2, 4, 6, 8], "Zonemap index with deletions returns wrong results" ); }