diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index f0b7e76743e..baeee198f6e 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -1689,37 +1689,33 @@ def scan_stats_callback(stats: lance.ScanStatistics): assert small_bytes_read < large_bytes_read -def test_bloomfilter_index(tmp_path: Path): - """Test create bloomfilter index""" - tbl = pa.Table.from_arrays([pa.array([i for i in range(10000)])], names=["values"]) - dataset = lance.write_dataset(tbl, tmp_path / "dataset") - dataset.create_scalar_index("values", index_type="BLOOMFILTER") - indices = dataset.list_indices() - assert len(indices) == 1 - - # Get detailed index statistics - index_stats = dataset.stats.index_stats("values_idx") - assert index_stats["index_type"] == "BloomFilter" - assert "indices" in index_stats - assert len(index_stats["indices"]) == 1 - - # Verify bloomfilter statistics - bloom_stats = index_stats["indices"][0] - assert "num_blocks" in bloom_stats - assert bloom_stats["num_blocks"] == 2 - assert bloom_stats["number_of_items"] == 8192 - assert "probability" in bloom_stats - assert bloom_stats["probability"] == 0.00057 # Default probability +def test_zonemap_deletion_handling(tmp_path: Path): + """Test zonemap deletion handling""" + data = pa.table( + { + "id": range(10), + "value": [True, False] * 5, + } + ) + ds = lance.write_dataset(data, "memory://", max_rows_per_group=5) + ds.delete("NOT value") + assert ds.to_table(filter="value = True").num_rows == 5 + assert ds.to_table(filter="value = False").num_rows == 0 + ids = ds.to_table(filter="value = True")["id"].to_pylist() + assert ids == [0, 2, 4, 6, 8] - # Test that the bloomfilter index is being used in the query plan - scanner = dataset.scanner(filter="values == 1234", prefilter=True) - plan = scanner.explain_plan() - assert "ScalarIndexQuery" in plan + ds.create_scalar_index("value", index_type="zonemap") + ids = ds.to_table(filter="value = True")["id"].to_pylist() + assert ids == [0, 2, 4, 6, 8] - # Verify the query returns correct results - result = scanner.to_table() - assert result.num_rows == 1 - assert result["values"][0].as_py() == 1234 + # now create the index before deletion + ds = lance.write_dataset(data, "memory://", max_rows_per_group=5) + ds.create_scalar_index("value", index_type="zonemap") + ds.delete("NOT value") + assert ds.to_table(filter="value = True").num_rows == 5 + assert ds.to_table(filter="value = False").num_rows == 0 + ids = ds.to_table(filter="value = True")["id"].to_pylist() + assert ids == [0, 2, 4, 6, 8] def test_zonemap_index_remapping(tmp_path: Path): @@ -1778,6 +1774,68 @@ def test_zonemap_index_remapping(tmp_path: Path): assert result.num_rows == 501 # 1000..1500 inclusive +def test_bloomfilter_index(tmp_path: Path): + """Test create bloomfilter index""" + tbl = pa.Table.from_arrays([pa.array([i for i in range(10000)])], names=["values"]) + dataset = lance.write_dataset(tbl, tmp_path / "dataset") + dataset.create_scalar_index("values", index_type="BLOOMFILTER") + indices = dataset.list_indices() + assert len(indices) == 1 + + # Get detailed index statistics + index_stats = dataset.stats.index_stats("values_idx") + assert index_stats["index_type"] == "BloomFilter" + assert "indices" in index_stats + assert len(index_stats["indices"]) == 1 + + # Verify bloomfilter statistics + bloom_stats = index_stats["indices"][0] + assert "num_blocks" in bloom_stats + assert bloom_stats["num_blocks"] == 2 + assert bloom_stats["number_of_items"] == 8192 + assert "probability" in bloom_stats + assert bloom_stats["probability"] == 0.00057 # Default probability + + # Test that the bloomfilter index is being used in the query plan + scanner = dataset.scanner(filter="values == 1234", prefilter=True) + plan = scanner.explain_plan() + assert "ScalarIndexQuery" in plan + + # Verify the query returns correct results + result = scanner.to_table() + assert result.num_rows == 1 + assert result["values"][0].as_py() == 1234 + + +def test_bloomfilter_deletion_handling(tmp_path: Path): + """Test bloomfilter deletion handling""" + data = pa.table( + { + "id": range(10), + "value": [1, 0] * 5, + } + ) + ds = lance.write_dataset(data, "memory://", max_rows_per_group=5) + ds.delete("value = 0") + assert ds.to_table(filter="value = 1").num_rows == 5 + assert ds.to_table(filter="value = 0").num_rows == 0 + ids = ds.to_table(filter="value = 1")["id"].to_pylist() + assert ids == [0, 2, 4, 6, 8] + + ds.create_scalar_index("value", index_type="bloomfilter") + ids = ds.to_table(filter="value = 1")["id"].to_pylist() + assert ids == [0, 2, 4, 6, 8] + + # now create the index before deletion + ds = lance.write_dataset(data, "memory://", max_rows_per_group=5) + ds.create_scalar_index("value", index_type="bloomfilter") + ds.delete("value = 0") + assert ds.to_table(filter="value = 1").num_rows == 5 + assert ds.to_table(filter="value = 0").num_rows == 0 + ids = ds.to_table(filter="value = 1")["id"].to_pylist() + assert ids == [0, 2, 4, 6, 8] + + def test_json_index(): vals = ['{"x": 7, "y": 10}', '{"x": 11, "y": 22}', '{"y": 0}', '{"x": 10}'] tbl = pa.table({"jsons": pa.array(vals, pa.json_())}) diff --git a/rust/lance-index/src/scalar/bloomfilter.rs b/rust/lance-index/src/scalar/bloomfilter.rs index 7fef76136e2..7430ad9a517 100644 --- a/rust/lance-index/src/scalar/bloomfilter.rs +++ b/rust/lance-index/src/scalar/bloomfilter.rs @@ -17,6 +17,7 @@ use crate::scalar::{ }; use crate::{pb, Any}; use arrow_array::{Array, UInt64Array}; +use lance_core::utils::address::RowAddress; use lance_core::utils::mask::RowIdTreeMap; use lance_core::ROW_ADDR; use lance_datafusion::chunker::chunk_concat_stream; @@ -49,12 +50,28 @@ const BLOOMFILTER_ITEM_META_KEY: &str = "bloomfilter_item"; const BLOOMFILTER_PROBABILITY_META_KEY: &str = "bloomfilter_probability"; const BLOOMFILTER_INDEX_VERSION: u32 = 0; +// +// Example: Suppose we have two fragments, each with 4 rows. +// Fragment 0: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 0 +// The row addresses for fragment 0 are: 0, 1, 2, 3 +// Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 1 +// The row addresses for fragment 1 are: 32>>1, 32>>1 + 1, 32>>1 + 2, 32>>1 + 3 +// +// Deletion is 0 index based. We delete the 0th and 1st row in fragment 0, +// and the 1st and 2nd row in fragment 1, +// Fragment 0: zone_start = 2, zone_length = 2 // covers rows 2, 3 in fragment 0 +// The row addresses for fragment 0 are: 2, 3 +// Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 3 in fragment 1 +// The row addresses for fragment 1 are: 32>>1, 32>>1 + 3 #[derive(Debug, Clone)] struct BloomFilterStatistics { fragment_id: u64, - // zone_start is the start row of the zone in the fragment, also known - // as local row offset + // zone_start is start row of the zone in the fragment, also known + // as the local offset. To get the actual first row address, + // you can do `fragment_id << 32 + zone_start` zone_start: u64, + // zone_length is the `row offset span` between the first and the last row in the current SBBF block + // calculated as: (last_row_offset - first_row_offset + 1) zone_length: usize, // Whether this zone contains any null values has_null: bool, @@ -469,10 +486,8 @@ impl ScalarIndex for BloomFilterIndex { // For each zone, check if it might contain the queried value for block in self.zones.iter() { if self.evaluate_block_against_query(block, query)? { - // Calculate the range of row addresses for this zone - // Row addresses are: (fragment_id << 32) + zone_start let zone_start_addr = (block.fragment_id << 32) + block.zone_start; - let zone_end_addr = zone_start_addr + (block.zone_length as u64); + let zone_end_addr = zone_start_addr + block.zone_length as u64; // Add all row addresses in this zone to the result row_id_tree_map.insert_range(zone_start_addr..zone_end_addr); @@ -618,7 +633,11 @@ pub struct BloomFilterIndexBuilder { blocks: Vec, // The local offset within the current zones cur_zone_offset: usize, - cur_fragment_id: u64, + cur_fragment_id: u32, + // Track the actual first and last row offsets in the current zone + // This handles non-contiguous offsets after deletions + cur_zone_first_row_offset: Option, + cur_zone_last_row_offset: Option, cur_zone_has_null: bool, sbbf: Option, } @@ -639,6 +658,8 @@ impl BloomFilterIndexBuilder { blocks: Vec::new(), cur_zone_offset: 0, cur_fragment_id: 0, + cur_zone_first_row_offset: None, + cur_zone_last_row_offset: None, cur_zone_has_null: false, sbbf: Some(sbbf), }) @@ -921,14 +942,14 @@ impl BloomFilterIndexBuilder { Ok(()) } - fn new_block(&mut self, fragment_id: u64) -> Result<()> { - // Calculate zone_start based on existing zones in the same fragment - let zone_start = self - .blocks - .iter() - .filter(|block| block.fragment_id == fragment_id) - .map(|block| block.zone_length as u64) - .sum::(); + fn new_block(&mut self, fragment_id: u32) -> Result<()> { + let zone_start = self.cur_zone_first_row_offset.unwrap_or(0) as u64; + let zone_length = self + .cur_zone_last_row_offset + .map(|last_row_offset| { + (last_row_offset - self.cur_zone_first_row_offset.unwrap_or(0) + 1) as usize + }) + .unwrap_or(self.cur_zone_offset); // Store the current bloom filter directly let bloom_filter = if let Some(ref sbbf) = self.sbbf { @@ -946,15 +967,17 @@ impl BloomFilterIndexBuilder { }; let new_block = BloomFilterStatistics { - fragment_id, + fragment_id: fragment_id as u64, zone_start, - zone_length: self.cur_zone_offset, + zone_length, has_null: self.cur_zone_has_null, bloom_filter, }; self.blocks.push(new_block); self.cur_zone_offset = 0; + self.cur_zone_first_row_offset = None; + self.cur_zone_last_row_offset = None; self.cur_zone_has_null = false; // Reset sbbf for the next block @@ -997,14 +1020,14 @@ impl BloomFilterIndexBuilder { // Initialize cur_fragment_id from the first row address if this is the first batch if self.blocks.is_empty() && self.cur_zone_offset == 0 { let first_row_addr = row_addrs_array.value(0); - self.cur_fragment_id = first_row_addr >> 32; + self.cur_fragment_id = (first_row_addr >> 32) as u32; } while remaining > 0 { // Find the next fragment boundary in this batch let next_fragment_index = (array_offset..row_addrs_array.len()).find(|&i| { let row_addr = row_addrs_array.value(i); - let fragment_id = row_addr >> 32; + let fragment_id = (row_addr >> 32) as u32; fragment_id == self.cur_fragment_id + 1 }); let empty_rows_left_in_cur_zone: usize = @@ -1012,7 +1035,7 @@ impl BloomFilterIndexBuilder { // Check if there is enough data from the current fragment to fill the current zone let desired = if let Some(idx) = next_fragment_index { - self.cur_fragment_id = row_addrs_array.value(idx) >> 32; + self.cur_fragment_id = (row_addrs_array.value(idx) >> 32) as u32; // Take the minimum between distance to boundary and space left in zone // to ensure we don't exceed the zone size limit std::cmp::min(idx - array_offset, empty_rows_left_in_cur_zone) @@ -1023,17 +1046,40 @@ impl BloomFilterIndexBuilder { if desired > remaining { // Not enough data to fill a map, just increment counts self.update_stats(&data_array.slice(array_offset, remaining))?; + + let first_row_offset = + RowAddress::new_from_u64(row_addrs_array.value(array_offset)).row_offset(); + let last_row_offset = RowAddress::new_from_u64( + row_addrs_array.value(array_offset + remaining - 1), + ) + .row_offset(); + if self.cur_zone_first_row_offset.is_none() { + self.cur_zone_first_row_offset = Some(first_row_offset); + } + self.cur_zone_last_row_offset = Some(last_row_offset); + self.cur_zone_offset += remaining; break; } else if desired > 0 { // There is enough data, create a new zone self.update_stats(&data_array.slice(array_offset, desired))?; + + let first_row_offset = + RowAddress::new_from_u64(row_addrs_array.value(array_offset)).row_offset(); + let last_row_offset = + RowAddress::new_from_u64(row_addrs_array.value(array_offset + desired - 1)) + .row_offset(); + if self.cur_zone_first_row_offset.is_none() { + self.cur_zone_first_row_offset = Some(first_row_offset); + } + self.cur_zone_last_row_offset = Some(last_row_offset); + self.cur_zone_offset += desired; - self.new_block(row_addrs_array.value(array_offset) >> 32)?; + self.new_block((row_addrs_array.value(array_offset) >> 32) as u32)?; } else if desired == 0 { // The new batch starts with a new fragment. Flush the current zone if it's not empty if self.cur_zone_offset > 0 { - self.new_block(self.cur_fragment_id - 1)?; + self.new_block(self.cur_fragment_id.wrapping_sub(1))?; } // Let the loop run again // to find the next fragment boundary @@ -1421,8 +1467,8 @@ mod tests { assert_eq!(index.probability, 0.01); // Check that we have one zone (since 100 items fit exactly in one zone of size 100) - assert_eq!(index.zones[0].fragment_id, 0); - assert_eq!(index.zones[0].zone_start, 0); + assert_eq!(index.zones[0].fragment_id, 0u64); + assert_eq!(index.zones[0].zone_start, 0u64); assert_eq!(index.zones[0].zone_length, 100); // Test search functionality @@ -1502,21 +1548,21 @@ mod tests { assert_eq!(index.zones.len(), 4); // Check fragment 0 zones - assert_eq!(index.zones[0].fragment_id, 0); - assert_eq!(index.zones[0].zone_start, 0); + assert_eq!(index.zones[0].fragment_id, 0u64); + assert_eq!(index.zones[0].zone_start, 0u64); assert_eq!(index.zones[0].zone_length, 50); - assert_eq!(index.zones[1].fragment_id, 0); - assert_eq!(index.zones[1].zone_start, 50); + assert_eq!(index.zones[1].fragment_id, 0u64); + assert_eq!(index.zones[1].zone_start, 50u64); assert_eq!(index.zones[1].zone_length, 50); // Check fragment 1 zones - assert_eq!(index.zones[2].fragment_id, 1); - assert_eq!(index.zones[2].zone_start, 0); + assert_eq!(index.zones[2].fragment_id, 1u64); + assert_eq!(index.zones[2].zone_start, 0u64); assert_eq!(index.zones[2].zone_length, 50); - assert_eq!(index.zones[3].fragment_id, 1); - assert_eq!(index.zones[3].zone_start, 50); + assert_eq!(index.zones[3].fragment_id, 1u64); + assert_eq!(index.zones[3].zone_start, 50u64); assert_eq!(index.zones[3].zone_length, 50); // Test search functionality @@ -1678,7 +1724,7 @@ mod tests { // Verify zone structure for (i, block) in index.zones.iter().enumerate() { - assert_eq!(block.fragment_id, 0); + assert_eq!(block.fragment_id, 0u64); assert_eq!(block.zone_start, (i * 1000) as u64); assert_eq!(block.zone_length, 1000); // Check that the bloom filter has some data (non-zero bytes when serialized) diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 7b6e6078310..1b4320148b1 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -43,7 +43,7 @@ use crate::{Index, IndexType}; use async_trait::async_trait; use deepsize::DeepSizeOf; use lance_core::Result; -use lance_core::{utils::mask::RowIdTreeMap, Error}; +use lance_core::{utils::address::RowAddress, utils::mask::RowIdTreeMap, Error}; use roaring::RoaringBitmap; use snafu::location; const ROWS_PER_ZONE_DEFAULT: u64 = 8192; // 1 zone every two batches @@ -52,6 +52,19 @@ const ZONEMAP_FILENAME: &str = "zonemap.lance"; const ZONEMAP_SIZE_META_KEY: &str = "rows_per_zone"; const ZONEMAP_INDEX_VERSION: u32 = 0; +// +// Example: Suppose we have two fragments, each with 4 rows. +// Fragment 0: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 0 +// The row addresses for fragment 0 are: 0, 1, 2, 3 +// Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 1 +// The row addresses for fragment 1 are: 32>>1, 32>>1 + 1, 32>>1 + 2, 32>>1 + 3 +// +// Deletion is 0 index based. We delete the 0th and 1st row in fragment 0, +// and the 1st and 2nd row in fragment 1, +// Fragment 0: zone_start = 2, zone_length = 2 // covers rows 2, 3 in fragment 0 +// The row addresses for fragment 0 are: 2, 3 +// Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 3 in fragment 1 +// The row addresses for fragment 1 are: 32>>1, 32>>1 + 3 /// Basic stats about zonemap index #[derive(Debug, PartialEq, Clone)] struct ZoneMapStatistics { @@ -61,9 +74,12 @@ struct ZoneMapStatistics { // only apply to float type nan_count: u32, fragment_id: u64, - // zone_start is the start row of the zone in the fragment, also known - // as local row offset + // zone_start is start row of the zone in the fragment, also known + // as the local offset. To get the actual first row address, + // you can do `fragment_id << 32 + zone_start` zone_start: u64, + // zone_length is the `row offset span` between the first and the last row in the zone + // calculated as: (last_row_offset - first_row_offset + 1) zone_length: usize, } @@ -405,7 +421,7 @@ impl ZoneMapIndex { .downcast_ref::() .ok_or_else(|| { Error::invalid_input( - "ZoneMapIndex: 'zone_length' column is not Uint64", + "ZoneMapIndex: 'zone_length' column is not UInt64", location!(), ) })?; @@ -459,6 +475,7 @@ impl ZoneMapIndex { let max = ScalarValue::try_from_array(max_col, i)?; let null_count = null_count_col.value(i); let nan_count = nan_count_col.value(i); + zones.push(ZoneMapStatistics { min, max, @@ -543,9 +560,8 @@ impl ScalarIndex for ZoneMapIndex { // Check if this zone matches the query if self.evaluate_zone_against_query(zone, query)? { // Calculate the range of row addresses for this zone - // Row addresses are: (fragment_id << 32) + zone_start let zone_start_addr = (zone.fragment_id << 32) + zone.zone_start; - let zone_end_addr = zone_start_addr + (zone.zone_length as u64); + let zone_end_addr = zone_start_addr + zone.zone_length as u64; // Add all row addresses in this zone to the result row_id_tree_map.insert_range(zone_start_addr..zone_end_addr); @@ -668,7 +684,11 @@ pub struct ZoneMapIndexBuilder { maps: Vec, // The local offset within the current zone cur_zone_offset: usize, - cur_fragment_id: u64, + cur_fragment_id: u32, + // Track the actual first and last row offsets in the current zone + // This handles non-contiguous offsets after deletions + cur_zone_first_row_offset: Option, + cur_zone_last_row_offset: Option, min: MinAccumulator, max: MaxAccumulator, @@ -686,6 +706,8 @@ impl ZoneMapIndexBuilder { maps: Vec::new(), cur_zone_offset: 0, cur_fragment_id: 0, + cur_zone_first_row_offset: None, + cur_zone_last_row_offset: None, min, max, null_count: 0, @@ -728,27 +750,30 @@ impl ZoneMapIndexBuilder { Ok(()) } - fn new_map(&mut self, fragment_id: u64) -> Result<()> { - // Calculate zone_start based on existing zones in the same fragment - let zone_start = self - .maps - .iter() - .filter(|zone| zone.fragment_id == fragment_id) - .map(|zone| zone.zone_length as u64) - .sum::(); + fn new_map(&mut self, fragment_id: u32) -> Result<()> { + let zone_start = self.cur_zone_first_row_offset.unwrap_or(0) as u64; + let zone_length = self + .cur_zone_last_row_offset + .map(|last_row_offset| { + (last_row_offset - self.cur_zone_first_row_offset.unwrap_or(0) + 1) as usize + }) + .unwrap_or(self.cur_zone_offset); + let new_map = ZoneMapStatistics { min: self.min.evaluate()?, max: self.max.evaluate()?, null_count: self.null_count, nan_count: self.nan_count, - fragment_id, + fragment_id: fragment_id as u64, zone_start, - zone_length: self.cur_zone_offset, + zone_length, }; self.maps.push(new_map); self.cur_zone_offset = 0; + self.cur_zone_first_row_offset = None; + self.cur_zone_last_row_offset = None; self.min = MinAccumulator::try_new(&self.items_type)?; self.max = MaxAccumulator::try_new(&self.items_type)?; self.null_count = 0; @@ -781,14 +806,14 @@ impl ZoneMapIndexBuilder { // Initialize cur_fragment_id from the first row address if this is the first batch if self.maps.is_empty() && self.cur_zone_offset == 0 { let first_row_addr = row_addrs_array.value(0); - self.cur_fragment_id = first_row_addr >> 32; + self.cur_fragment_id = (first_row_addr >> 32) as u32; } while remaining > 0 { // Find the next fragment boundary in this batch let next_fragment_index = (array_offset..row_addrs_array.len()).find(|&i| { let row_addr = row_addrs_array.value(i); - let fragment_id = row_addr >> 32; + let fragment_id = (row_addr >> 32) as u32; fragment_id == self.cur_fragment_id + 1 }); let empty_rows_left_in_cur_zone: usize = @@ -796,7 +821,7 @@ impl ZoneMapIndexBuilder { // Check if there is enough data from the current fragment to fill the current zone let desired = if let Some(idx) = next_fragment_index { - self.cur_fragment_id = row_addrs_array.value(idx) >> 32; + self.cur_fragment_id = (row_addrs_array.value(idx) >> 32) as u32; // Take the minimum between distance to boundary and space left in zone // to ensure we don't exceed the zone size limit std::cmp::min(idx - array_offset, empty_rows_left_in_cur_zone) @@ -807,17 +832,42 @@ impl ZoneMapIndexBuilder { if desired > remaining { // Not enough data to fill a map, just increment counts self.update_stats(&data_array.slice(array_offset, remaining))?; + + // Track first and last row offsets (local offsets within fragment) + let first_row_offset = + RowAddress::new_from_u64(row_addrs_array.value(array_offset)).row_offset(); + let last_row_offset = RowAddress::new_from_u64( + row_addrs_array.value(array_offset + remaining - 1), + ) + .row_offset(); + if self.cur_zone_first_row_offset.is_none() { + self.cur_zone_first_row_offset = Some(first_row_offset); + } + self.cur_zone_last_row_offset = Some(last_row_offset); + self.cur_zone_offset += remaining; break; } else if desired > 0 { // There is enough data, create a new zone map self.update_stats(&data_array.slice(array_offset, desired))?; + + // Track first and last row offsets + let first_row_offset = + RowAddress::new_from_u64(row_addrs_array.value(array_offset)).row_offset(); + let last_row_offset = + RowAddress::new_from_u64(row_addrs_array.value(array_offset + desired - 1)) + .row_offset(); + if self.cur_zone_first_row_offset.is_none() { + self.cur_zone_first_row_offset = Some(first_row_offset); + } + self.cur_zone_last_row_offset = Some(last_row_offset); + self.cur_zone_offset += desired; - self.new_map(row_addrs_array.value(array_offset) >> 32)?; + self.new_map((row_addrs_array.value(array_offset) >> 32) as u32)?; } else if desired == 0 { // The new batch starts with a new fragment. Flush the current zone if it's not empty if self.cur_zone_offset > 0 { - self.new_map(self.cur_fragment_id - 1)?; + self.new_map(self.cur_fragment_id.wrapping_sub(1))?; } // Let the loop run again // to find the next fragment boundary @@ -1211,7 +1261,7 @@ mod tests { // Verify the new zone was added let new_zone = &updated_index.zones[10]; // Last zone should be the new one - assert_eq!(new_zone.fragment_id, 10); // New fragment ID + assert_eq!(new_zone.fragment_id, 10u64); // New fragment ID assert_eq!(new_zone.zone_length, 5000); assert_eq!(new_zone.null_count, 0); // New data has no nulls assert_eq!(new_zone.nan_count, 0); // New data has no NaN values @@ -1310,7 +1360,11 @@ mod tests { "Zone {} should have zone_length 100", i ); - assert_eq!(zone.fragment_id, 0, "Zone {} should have fragment_id 0", i); + assert_eq!( + zone.fragment_id, 0u64, + "Zone {} should have fragment_id 0", + i + ); } // Test search for NaN values using Equals with NaN diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index edc405bec43..51b81323ed2 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -546,8 +546,8 @@ mod tests { use lance_core::utils::tempfile::TempStrDir; use lance_core::{datatypes::Field, utils::address::RowAddress}; use lance_datagen::array; - use lance_index::pbold::NGramIndexDetails; use lance_index::IndexType; + use lance_index::{pbold::NGramIndexDetails, scalar::BuiltinIndexType}; use lance_table::format::pb::VectorIndexDetails; fn make_index_metadata( @@ -1184,4 +1184,510 @@ mod tests { let rows_per_zone = stats["rows_per_zone"].as_u64().unwrap(); assert_eq!(rows_per_zone, 200, "ZoneMap rows_per_zone should be 200"); } + + #[tokio::test] + async fn test_zonemap_with_deletions() { + let deletion_predicates = [ + "NOT value", // every other row + "id > 8191 or id < 10", // Second zone of each fragment + "id < 9190 ", // Most of first zone + ]; + let query_predicates = ["value", "id <= 8191", "id >= 1"]; + + async fn filter_query(ds: &Dataset, query_pred: &str) -> arrow_array::RecordBatch { + ds.scan() + .filter(query_pred) + .unwrap() + .try_into_batch() + .await + .unwrap() + } + + for del_pred in &deletion_predicates { + // We use 2 * 8192 so each fragment has two zones. + let mut ds = lance_datagen::gen_batch() + .col("id", array::step::()) + .col("value", array::cycle_bool(vec![true, false])) + .into_ram_dataset(FragmentCount::from(2), FragmentRowCount::from(2 * 8192)) + .await + .unwrap(); + + // Create zonemap index on "value" column + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); + ds.create_index_builder(&["value"], IndexType::Scalar, ¶ms) + .name("value_zone_map".into()) + .await + .unwrap(); + ds.create_index_builder(&["id"], IndexType::Scalar, ¶ms) + .name("id_zone_map".into()) + .await + .unwrap(); + + ds.delete(del_pred).await.unwrap(); + let mut result_before = Vec::new(); + for query_pred in &query_predicates { + let batch = filter_query(&ds, query_pred).await; + result_before.push(batch); + } + ds.drop_index("value_zone_map").await.unwrap(); + ds.drop_index("id_zone_map").await.unwrap(); + + let mut expected = Vec::new(); + for query_pred in &query_predicates { + let batch = filter_query(&ds, query_pred).await; + expected.push(batch); + } + + for (before, expected) in result_before.iter().zip(expected.iter()) { + assert_eq!(before, expected, "Zonemap index with deletions returned wrong results for deletion predicate '{}'", del_pred); + } + + // Now recreate the indexes for the next iteration + ds.create_index_builder(&["value"], IndexType::Scalar, ¶ms) + .name("value_zone_map".into()) + .await + .unwrap(); + ds.create_index_builder(&["id"], IndexType::Scalar, ¶ms) + .name("id_zone_map".into()) + .await + .unwrap(); + let mut result_after = Vec::new(); + for query_pred in &query_predicates { + let batch = filter_query(&ds, query_pred).await; + result_after.push(batch); + } + + for (after, expected) in result_after.iter().zip(expected.iter()) { + assert_eq!(after, expected, "Zonemap index with deletions returned wrong results for deletion predicate '{}' after re-creating the index", del_pred); + } + } + } + + #[tokio::test] + async fn test_zonemap_deletion_then_index() { + use arrow::datatypes::UInt64Type; + use lance_datagen::array; + use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; + use lance_index::IndexType; + + // Create dataset with 10 rows in two fragments: alternating boolean values + // Rows 0,2,4,6,8 have value=true, rows 1,3,5,7,9 have value=false + let mut ds = lance_datagen::gen_batch() + .col("id", array::step::()) + .col("value", array::cycle_bool(vec![true, false])) + .into_ram_dataset(FragmentCount::from(2), FragmentRowCount::from(5)) + .await + .unwrap(); + + // Delete rows where value=false (rows 1, 3, 5, 7, 9) + ds.delete("NOT value").await.unwrap(); + + // Verify data before index creation: should have 5 rows with value=true + let before_index = ds + .scan() + .filter("value") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let before_ids = before_index["id"] + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + assert_eq!( + before_ids, + &[0, 2, 4, 6, 8], + "Before index: should have 5 rows" + ); + + // Create zonemap index on "value" column + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); + ds.create_index(&["value"], IndexType::Scalar, None, ¶ms, false) + .await + .unwrap(); + + let after_index = ds + .scan() + .filter("value") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let after_ids = after_index["id"] + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + // This assertion will FAIL if bug #4758 is present + assert_eq!( + after_ids.len(), + 5, + "Expected 5 rows after index creation, got {}. Only {:?} returned instead of [0, 2, 4, 6, 8]", + after_ids.len(), + after_ids + ); + assert_eq!( + after_ids, + &[0, 2, 4, 6, 8], + "Zonemap index with deletions returns wrong results" + ); + } + + #[tokio::test] + async fn test_zonemap_index_then_deletion() { + // Tests the opposite scenario: create index FIRST, then perform deletions + // Verifies that zonemap index properly handles deletions that occur after index creation + use arrow::datatypes::UInt64Type; + use lance_datagen::array; + use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; + use lance_index::IndexType; + + // Create dataset with 10 rows: alternating boolean values + // Rows 0,2,4,6,8 have value=true, rows 1,3,5,7,9 have value=false + let mut ds = lance_datagen::gen_batch() + .col("id", array::step::()) + .col("value", array::cycle_bool(vec![true, false])) + .into_ram_dataset(FragmentCount::from(2), FragmentRowCount::from(5)) + .await + .unwrap(); + + // Verify initial data: should have 10 rows + let initial_data = ds.scan().try_into_batch().await.unwrap(); + + let initial_count: usize = initial_data["id"].len(); + assert_eq!(initial_count, 10, "Should start with 10 rows"); + + // CREATE INDEX FIRST (before deletion) + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); + ds.create_index(&["value"], IndexType::Scalar, None, ¶ms, false) + .await + .unwrap(); + + // Query with index before deletion - should return all 5 rows with value=true + let before_deletion = ds + .scan() + .filter("value") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let before_deletion_ids = before_deletion["id"] + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + assert_eq!( + before_deletion_ids, + &[0, 2, 4, 6, 8], + "Before deletion: should return 5 rows with value=true" + ); + + // NOW DELETE rows where value=false (rows 1, 3, 5, 7, 9) + ds.delete("NOT value").await.unwrap(); + + // Query after deletion - should still return 5 rows with value=true + let after_deletion = ds + .scan() + .filter("value") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let after_deletion_ids = after_deletion["id"] + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + // Verify we get the correct data after deletion + assert_eq!( + after_deletion_ids.len(), + 5, + "After deletion: Expected 5 rows, got {}", + after_deletion_ids.len() + ); + assert_eq!( + after_deletion_ids, + &[0, 2, 4, 6, 8], + "After deletion: Should return rows [0, 2, 4, 6, 8] with value=true" + ); + + // Verify the actual values are correct + let after_deletion_values: Vec = after_deletion["value"] + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .collect(); + + assert_eq!( + after_deletion_values, + vec![true, true, true, true, true], + "All returned rows should have value=true" + ); + + // Count rows matching "value = true" + let count_true = ds + .scan() + .filter("value") + .unwrap() + .try_into_batch() + .await + .unwrap(); + let count_true_rows: usize = count_true.num_rows(); + + // Count rows matching "value = false" (should be 0 after deletion) + let count_false = ds + .scan() + .filter("NOT value") + .unwrap() + .try_into_batch() + .await + .unwrap(); + let count_false_rows: usize = count_false.num_rows(); + + // The key assertions: filtered queries should return correct data + assert_eq!( + count_true_rows, 5, + "Should have exactly 5 rows with value=true" + ); + assert_eq!( + count_false_rows, 0, + "Should have 0 rows with value=false after deletion" + ); + } + + #[tokio::test] + async fn test_bloomfilter_deletion_then_index() { + // Reproduces the same bug as #4758 but for bloom filter indexes + // After deleting rows and creating a bloom filter index, queries return fewer results than expected + use arrow::datatypes::UInt64Type; + use lance_datagen::array; + use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; + use lance_index::IndexType; + + // Create dataset with 10 rows: alternating string values "apple" and "banana" + // Rows 0,2,4,6,8 have value="apple", rows 1,3,5,7,9 have value="banana" + let mut ds = lance_datagen::gen_batch() + .col("id", array::step::()) + .col("value", array::cycle_utf8_literals(&["apple", "banana"])) + .into_ram_dataset(FragmentCount::from(2), FragmentRowCount::from(5)) + .await + .unwrap(); + + // Delete rows where value="banana" (rows 1, 3, 5, 7, 9) + ds.delete("value = 'banana'").await.unwrap(); + + // Verify data before index creation: should have 5 rows with value="apple" + let before_index = ds + .scan() + .filter("value = 'apple'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let before_ids = before_index["id"] + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + assert_eq!( + before_ids, + &[0, 2, 4, 6, 8], + "Before index: should have 5 rows" + ); + + // Create bloom filter index on "value" column with small zone size to ensure the bug is triggered + #[derive(serde::Serialize)] + struct BloomParams { + number_of_items: u64, + probability: f64, + } + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BloomFilter).with_params( + &BloomParams { + number_of_items: 5, // Small zone size to ensure multiple zones + probability: 0.01, + }, + ); + ds.create_index(&["value"], IndexType::Scalar, None, ¶ms, false) + .await + .unwrap(); + + let after_index = ds + .scan() + .filter("value = 'apple'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let after_ids = after_index["id"] + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + // This assertion verifies the fix works + assert_eq!( + after_ids.len(), + 5, + "Expected 5 rows after index creation, got {}. Only {:?} returned instead of [0, 2, 4, 6, 8]", + after_ids.len(), + after_ids + ); + assert_eq!( + after_ids, + &[0, 2, 4, 6, 8], + "Bloom filter index with deletions returns wrong results" + ); + } + + #[tokio::test] + async fn test_bloomfilter_index_then_deletion() { + // Tests the opposite scenario: create bloom filter index FIRST, then perform deletions + // Verifies that bloom filter index properly handles deletions that occur after index creation + use arrow::datatypes::UInt64Type; + use lance_datagen::array; + use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; + use lance_index::IndexType; + + // Create dataset with 10 rows: alternating string values "apple" and "banana" + // Rows 0,2,4,6,8 have value="apple", rows 1,3,5,7,9 have value="banana" + let mut ds = lance_datagen::gen_batch() + .col("id", array::step::()) + .col("value", array::cycle_utf8_literals(&["apple", "banana"])) + .into_ram_dataset(FragmentCount::from(2), FragmentRowCount::from(5)) + .await + .unwrap(); + + // Verify initial data: should have 10 rows + let initial_data = ds.scan().try_into_batch().await.unwrap(); + + let initial_count: usize = initial_data.num_rows(); + assert_eq!(initial_count, 10, "Should start with 10 rows"); + + // CREATE INDEX FIRST (before deletion) + #[derive(serde::Serialize)] + struct BloomParams { + number_of_items: u64, + probability: f64, + } + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BloomFilter).with_params( + &BloomParams { + number_of_items: 5, // Small zone size to ensure multiple zones + probability: 0.01, + }, + ); + ds.create_index(&["value"], IndexType::Scalar, None, ¶ms, false) + .await + .unwrap(); + + // Query with index before deletion - should return all 5 rows with value="apple" + let before_deletion = ds + .scan() + .filter("value = 'apple'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let before_deletion_ids = before_deletion["id"] + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + assert_eq!( + before_deletion_ids, + &[0, 2, 4, 6, 8], + "Before deletion: should return 5 rows with value='apple'" + ); + + // NOW DELETE rows where value="banana" (rows 1, 3, 5, 7, 9) + ds.delete("value = 'banana'").await.unwrap(); + + // Query after deletion - should still return 5 rows with value="apple" + let after_deletion = ds + .scan() + .filter("value = 'apple'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let after_deletion_ids = after_deletion["id"] + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + // Verify we get the correct data after deletion + assert_eq!( + after_deletion_ids.len(), + 5, + "After deletion: Expected 5 rows, got {}", + after_deletion_ids.len() + ); + assert_eq!( + after_deletion_ids, + &[0, 2, 4, 6, 8], + "After deletion: Should return rows [0, 2, 4, 6, 8] with value='apple'" + ); + + // Verify the actual values are correct + let after_deletion_values: Vec<&str> = after_deletion["value"] + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .collect(); + + assert_eq!( + after_deletion_values, + vec!["apple", "apple", "apple", "apple", "apple"], + "All returned rows should have value='apple'" + ); + + // Count rows matching "value = 'apple'" + let count_apple = ds + .scan() + .filter("value = 'apple'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + let count_apple_rows: usize = count_apple.num_rows(); + + // Count rows matching "value = 'banana'" (should be 0 after deletion) + let count_banana = ds + .scan() + .filter("value = 'banana'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + let count_banana_rows: usize = count_banana.num_rows(); + + // The key assertions: filtered queries should return correct data + assert_eq!( + count_apple_rows, 5, + "Should have exactly 5 rows with value='apple'" + ); + assert_eq!( + count_banana_rows, 0, + "Should have 0 rows with value='banana' after deletion" + ); + } } diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 6af1158fe26..6534f01d9f7 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -363,6 +363,12 @@ pub struct NoContextTestFixture { pub dataset: Dataset, } +impl Default for NoContextTestFixture { + fn default() -> Self { + Self::new() + } +} + impl NoContextTestFixture { pub fn new() -> Self { let runtime = tokio::runtime::Builder::new_current_thread()