Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 87 additions & 29 deletions python/python/tests/test_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -1689,37 +1689,33 @@ def scan_stats_callback(stats: lance.ScanStatistics):
assert small_bytes_read < large_bytes_read


def test_bloomfilter_index(tmp_path: Path):
"""Test create bloomfilter index"""
tbl = pa.Table.from_arrays([pa.array([i for i in range(10000)])], names=["values"])
dataset = lance.write_dataset(tbl, tmp_path / "dataset")
dataset.create_scalar_index("values", index_type="BLOOMFILTER")
indices = dataset.list_indices()
assert len(indices) == 1

# Get detailed index statistics
index_stats = dataset.stats.index_stats("values_idx")
assert index_stats["index_type"] == "BloomFilter"
assert "indices" in index_stats
assert len(index_stats["indices"]) == 1

# Verify bloomfilter statistics
bloom_stats = index_stats["indices"][0]
assert "num_blocks" in bloom_stats
assert bloom_stats["num_blocks"] == 2
assert bloom_stats["number_of_items"] == 8192
assert "probability" in bloom_stats
assert bloom_stats["probability"] == 0.00057 # Default probability
def test_zonemap_deletion_handling(tmp_path: Path):
    """Zonemap index must return correct results in the presence of deletions.

    Covers both orderings:
    1. delete rows, then create the index (index built over surviving rows);
    2. create the index, then delete rows (index must respect the deletion
       vector at query time).
    """
    data = pa.table(
        {
            "id": range(10),
            "value": [True, False] * 5,
        }
    )

    # Case 1: delete first, then index.
    ds = lance.write_dataset(data, "memory://", max_rows_per_group=5)
    ds.delete("NOT value")
    assert ds.to_table(filter="value = True").num_rows == 5
    assert ds.to_table(filter="value = False").num_rows == 0
    ids = ds.to_table(filter="value = True")["id"].to_pylist()
    assert ids == [0, 2, 4, 6, 8]

    ds.create_scalar_index("value", index_type="zonemap")
    ids = ds.to_table(filter="value = True")["id"].to_pylist()
    assert ids == [0, 2, 4, 6, 8]

    # Case 2: now create the index before deletion.
    ds = lance.write_dataset(data, "memory://", max_rows_per_group=5)
    ds.create_scalar_index("value", index_type="zonemap")
    ds.delete("NOT value")
    assert ds.to_table(filter="value = True").num_rows == 5
    assert ds.to_table(filter="value = False").num_rows == 0
    ids = ds.to_table(filter="value = True")["id"].to_pylist()
    assert ids == [0, 2, 4, 6, 8]


def test_zonemap_index_remapping(tmp_path: Path):
Expand Down Expand Up @@ -1778,6 +1774,68 @@ def test_zonemap_index_remapping(tmp_path: Path):
assert result.num_rows == 501 # 1000..1500 inclusive


def test_bloomfilter_index(tmp_path: Path):
    """Create a bloomfilter index and verify its statistics and query behavior.

    Checks that:
    - the index is registered and reported as type "BloomFilter";
    - the per-index statistics (block count, items per block, false-positive
      probability) match the documented defaults;
    - a point-lookup filter uses the index in the query plan and returns the
      correct row.
    """
    # list(range(...)) instead of an identity comprehension.
    tbl = pa.Table.from_arrays([pa.array(list(range(10000)))], names=["values"])
    dataset = lance.write_dataset(tbl, tmp_path / "dataset")
    dataset.create_scalar_index("values", index_type="BLOOMFILTER")
    indices = dataset.list_indices()
    assert len(indices) == 1

    # Get detailed index statistics
    index_stats = dataset.stats.index_stats("values_idx")
    assert index_stats["index_type"] == "BloomFilter"
    assert "indices" in index_stats
    assert len(index_stats["indices"]) == 1

    # Verify bloomfilter statistics: 10000 rows / 8192 items-per-block
    # default => 2 blocks.
    bloom_stats = index_stats["indices"][0]
    assert "num_blocks" in bloom_stats
    assert bloom_stats["num_blocks"] == 2
    assert bloom_stats["number_of_items"] == 8192
    assert "probability" in bloom_stats
    assert bloom_stats["probability"] == 0.00057  # Default probability

    # Test that the bloomfilter index is being used in the query plan
    scanner = dataset.scanner(filter="values == 1234", prefilter=True)
    plan = scanner.explain_plan()
    assert "ScalarIndexQuery" in plan

    # Verify the query returns correct results
    result = scanner.to_table()
    assert result.num_rows == 1
    assert result["values"][0].as_py() == 1234


def test_bloomfilter_deletion_handling(tmp_path: Path):
    """Bloomfilter index must return correct results in the presence of deletions.

    Covers both orderings:
    1. delete rows, then create the index;
    2. create the index, then delete rows.
    """

    def assert_even_ids_remain(ds):
        # Rows with value == 0 (odd ids) were deleted; only even ids survive.
        assert ds.to_table(filter="value = 1").num_rows == 5
        assert ds.to_table(filter="value = 0").num_rows == 0
        assert ds.to_table(filter="value = 1")["id"].to_pylist() == [0, 2, 4, 6, 8]

    data = pa.table(
        {
            "id": range(10),
            "value": [1, 0] * 5,
        }
    )

    # Case 1: delete first, then index.
    ds = lance.write_dataset(data, "memory://", max_rows_per_group=5)
    ds.delete("value = 0")
    assert_even_ids_remain(ds)

    ds.create_scalar_index("value", index_type="bloomfilter")
    ids = ds.to_table(filter="value = 1")["id"].to_pylist()
    assert ids == [0, 2, 4, 6, 8]

    # Case 2: now create the index before deletion.
    ds = lance.write_dataset(data, "memory://", max_rows_per_group=5)
    ds.create_scalar_index("value", index_type="bloomfilter")
    ds.delete("value = 0")
    assert_even_ids_remain(ds)


def test_json_index():
vals = ['{"x": 7, "y": 10}', '{"x": 11, "y": 22}', '{"y": 0}', '{"x": 10}']
tbl = pa.table({"jsons": pa.array(vals, pa.json_())})
Expand Down
110 changes: 78 additions & 32 deletions rust/lance-index/src/scalar/bloomfilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::scalar::{
};
use crate::{pb, Any};
use arrow_array::{Array, UInt64Array};
use lance_core::utils::address::RowAddress;
use lance_core::utils::mask::RowIdTreeMap;
use lance_core::ROW_ADDR;
use lance_datafusion::chunker::chunk_concat_stream;
Expand Down Expand Up @@ -49,12 +50,28 @@ const BLOOMFILTER_ITEM_META_KEY: &str = "bloomfilter_item";
const BLOOMFILTER_PROBABILITY_META_KEY: &str = "bloomfilter_probability";
const BLOOMFILTER_INDEX_VERSION: u32 = 0;

//
// Example: Suppose we have two fragments, each with 4 rows.
// Fragment 0: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 0
// The row addresses for fragment 0 are: 0, 1, 2, 3
// Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 1, 2, 3 in fragment 1
// The row addresses for fragment 1 are: 1<<32, (1<<32) + 1, (1<<32) + 2, (1<<32) + 3
//
// Deletion is 0 index based. We delete the 0th and 1st row in fragment 0,
// and the 1st and 2nd row in fragment 1,
// Fragment 0: zone_start = 2, zone_length = 2 // covers rows 2, 3 in fragment 0
// The row addresses for fragment 0 are: 2, 3
// Fragment 1: zone_start = 0, zone_length = 4 // covers rows 0, 3 in fragment 1
// The row addresses for fragment 1 are: 1<<32, (1<<32) + 3
Comment on lines +53 to +65
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this example describing BloomFilterStatistics?

#[derive(Debug, Clone)]
struct BloomFilterStatistics {
fragment_id: u64,
// zone_start is the start row of the zone in the fragment, also known
// as local row offset
// zone_start is the start row of the zone in the fragment, also known
// as the local offset. To get the actual first row address,
// compute `(fragment_id << 32) + zone_start` (parentheses matter: in
// Rust, `+` binds tighter than `<<`)
zone_start: u64,
// zone_length is the `row offset span` between the first and the last row in the current SBBF block
// calculated as: (last_row_offset - first_row_offset + 1)
zone_length: usize,
// Whether this zone contains any null values
has_null: bool,
Expand Down Expand Up @@ -469,10 +486,8 @@ impl ScalarIndex for BloomFilterIndex {
// For each zone, check if it might contain the queried value
for block in self.zones.iter() {
if self.evaluate_block_against_query(block, query)? {
// Calculate the range of row addresses for this zone
// Row addresses are: (fragment_id << 32) + zone_start
let zone_start_addr = (block.fragment_id << 32) + block.zone_start;
let zone_end_addr = zone_start_addr + (block.zone_length as u64);
let zone_end_addr = zone_start_addr + block.zone_length as u64;

// Add all row addresses in this zone to the result
row_id_tree_map.insert_range(zone_start_addr..zone_end_addr);
Expand Down Expand Up @@ -618,7 +633,11 @@ pub struct BloomFilterIndexBuilder {
blocks: Vec<BloomFilterStatistics>,
// The local offset within the current zones
cur_zone_offset: usize,
cur_fragment_id: u64,
cur_fragment_id: u32,
// Track the actual first and last row offsets in the current zone
// This handles non-contiguous offsets after deletions
cur_zone_first_row_offset: Option<u32>,
cur_zone_last_row_offset: Option<u32>,
cur_zone_has_null: bool,
sbbf: Option<Sbbf>,
}
Expand All @@ -639,6 +658,8 @@ impl BloomFilterIndexBuilder {
blocks: Vec::new(),
cur_zone_offset: 0,
cur_fragment_id: 0,
cur_zone_first_row_offset: None,
cur_zone_last_row_offset: None,
cur_zone_has_null: false,
sbbf: Some(sbbf),
})
Expand Down Expand Up @@ -921,14 +942,14 @@ impl BloomFilterIndexBuilder {
Ok(())
}

fn new_block(&mut self, fragment_id: u64) -> Result<()> {
// Calculate zone_start based on existing zones in the same fragment
let zone_start = self
.blocks
.iter()
.filter(|block| block.fragment_id == fragment_id)
.map(|block| block.zone_length as u64)
.sum::<u64>();
fn new_block(&mut self, fragment_id: u32) -> Result<()> {
let zone_start = self.cur_zone_first_row_offset.unwrap_or(0) as u64;
let zone_length = self
.cur_zone_last_row_offset
.map(|last_row_offset| {
(last_row_offset - self.cur_zone_first_row_offset.unwrap_or(0) + 1) as usize
})
.unwrap_or(self.cur_zone_offset);

// Store the current bloom filter directly
let bloom_filter = if let Some(ref sbbf) = self.sbbf {
Expand All @@ -946,15 +967,17 @@ impl BloomFilterIndexBuilder {
};

let new_block = BloomFilterStatistics {
fragment_id,
fragment_id: fragment_id as u64,
zone_start,
zone_length: self.cur_zone_offset,
zone_length,
has_null: self.cur_zone_has_null,
bloom_filter,
};

self.blocks.push(new_block);
self.cur_zone_offset = 0;
self.cur_zone_first_row_offset = None;
self.cur_zone_last_row_offset = None;
self.cur_zone_has_null = false;

// Reset sbbf for the next block
Expand Down Expand Up @@ -997,22 +1020,22 @@ impl BloomFilterIndexBuilder {
// Initialize cur_fragment_id from the first row address if this is the first batch
if self.blocks.is_empty() && self.cur_zone_offset == 0 {
let first_row_addr = row_addrs_array.value(0);
self.cur_fragment_id = first_row_addr >> 32;
self.cur_fragment_id = (first_row_addr >> 32) as u32;
}

while remaining > 0 {
// Find the next fragment boundary in this batch
let next_fragment_index = (array_offset..row_addrs_array.len()).find(|&i| {
let row_addr = row_addrs_array.value(i);
let fragment_id = row_addr >> 32;
let fragment_id = (row_addr >> 32) as u32;
fragment_id == self.cur_fragment_id + 1
});
let empty_rows_left_in_cur_zone: usize =
(self.params.number_of_items - self.cur_zone_offset as u64) as usize;

// Check if there is enough data from the current fragment to fill the current zone
let desired = if let Some(idx) = next_fragment_index {
self.cur_fragment_id = row_addrs_array.value(idx) >> 32;
self.cur_fragment_id = (row_addrs_array.value(idx) >> 32) as u32;
// Take the minimum between distance to boundary and space left in zone
// to ensure we don't exceed the zone size limit
std::cmp::min(idx - array_offset, empty_rows_left_in_cur_zone)
Expand All @@ -1023,17 +1046,40 @@ impl BloomFilterIndexBuilder {
if desired > remaining {
// Not enough data to fill a map, just increment counts
self.update_stats(&data_array.slice(array_offset, remaining))?;

let first_row_offset =
RowAddress::new_from_u64(row_addrs_array.value(array_offset)).row_offset();
let last_row_offset = RowAddress::new_from_u64(
row_addrs_array.value(array_offset + remaining - 1),
)
.row_offset();
if self.cur_zone_first_row_offset.is_none() {
self.cur_zone_first_row_offset = Some(first_row_offset);
}
self.cur_zone_last_row_offset = Some(last_row_offset);

self.cur_zone_offset += remaining;
break;
} else if desired > 0 {
// There is enough data, create a new zone
self.update_stats(&data_array.slice(array_offset, desired))?;

let first_row_offset =
RowAddress::new_from_u64(row_addrs_array.value(array_offset)).row_offset();
let last_row_offset =
RowAddress::new_from_u64(row_addrs_array.value(array_offset + desired - 1))
.row_offset();
if self.cur_zone_first_row_offset.is_none() {
self.cur_zone_first_row_offset = Some(first_row_offset);
}
self.cur_zone_last_row_offset = Some(last_row_offset);

self.cur_zone_offset += desired;
self.new_block(row_addrs_array.value(array_offset) >> 32)?;
self.new_block((row_addrs_array.value(array_offset) >> 32) as u32)?;
} else if desired == 0 {
// The new batch starts with a new fragment. Flush the current zone if it's not empty
if self.cur_zone_offset > 0 {
self.new_block(self.cur_fragment_id - 1)?;
self.new_block(self.cur_fragment_id.wrapping_sub(1))?;
}
// Let the loop run again
// to find the next fragment boundary
Expand Down Expand Up @@ -1421,8 +1467,8 @@ mod tests {
assert_eq!(index.probability, 0.01);

// Check that we have one zone (since 100 items fit exactly in one zone of size 100)
assert_eq!(index.zones[0].fragment_id, 0);
assert_eq!(index.zones[0].zone_start, 0);
assert_eq!(index.zones[0].fragment_id, 0u64);
assert_eq!(index.zones[0].zone_start, 0u64);
assert_eq!(index.zones[0].zone_length, 100);

// Test search functionality
Expand Down Expand Up @@ -1502,21 +1548,21 @@ mod tests {
assert_eq!(index.zones.len(), 4);

// Check fragment 0 zones
assert_eq!(index.zones[0].fragment_id, 0);
assert_eq!(index.zones[0].zone_start, 0);
assert_eq!(index.zones[0].fragment_id, 0u64);
assert_eq!(index.zones[0].zone_start, 0u64);
assert_eq!(index.zones[0].zone_length, 50);

assert_eq!(index.zones[1].fragment_id, 0);
assert_eq!(index.zones[1].zone_start, 50);
assert_eq!(index.zones[1].fragment_id, 0u64);
assert_eq!(index.zones[1].zone_start, 50u64);
assert_eq!(index.zones[1].zone_length, 50);

// Check fragment 1 zones
assert_eq!(index.zones[2].fragment_id, 1);
assert_eq!(index.zones[2].zone_start, 0);
assert_eq!(index.zones[2].fragment_id, 1u64);
assert_eq!(index.zones[2].zone_start, 0u64);
assert_eq!(index.zones[2].zone_length, 50);

assert_eq!(index.zones[3].fragment_id, 1);
assert_eq!(index.zones[3].zone_start, 50);
assert_eq!(index.zones[3].fragment_id, 1u64);
assert_eq!(index.zones[3].zone_start, 50u64);
assert_eq!(index.zones[3].zone_length, 50);

// Test search functionality
Expand Down Expand Up @@ -1678,7 +1724,7 @@ mod tests {

// Verify zone structure
for (i, block) in index.zones.iter().enumerate() {
assert_eq!(block.fragment_id, 0);
assert_eq!(block.fragment_id, 0u64);
assert_eq!(block.zone_start, (i * 1000) as u64);
assert_eq!(block.zone_length, 1000);
// Check that the bloom filter has some data (non-zero bytes when serialized)
Expand Down
Loading
Loading