Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 47 additions & 28 deletions rust/lance-index/src/scalar/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use async_trait::async_trait;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::ScalarValue;
use deepsize::DeepSizeOf;
use futures::TryStreamExt;
use futures::{stream, StreamExt, TryStreamExt};
use lance_core::{
cache::{CacheKey, LanceCache, WeakLanceCache},
error::LanceOptionExt,
utils::mask::RowIdTreeMap,
utils::{mask::RowIdTreeMap, tokio::get_num_compute_intensive_cpus},
Error, Result, ROW_ID,
};
use roaring::RoaringBitmap;
Expand Down Expand Up @@ -438,11 +438,13 @@ impl ScalarIndex for BitmapIndex {
if keys.is_empty() {
RowIdTreeMap::default()
} else {
let mut bitmaps = Vec::new();
for key in keys {
let bitmap = self.load_bitmap(&key, Some(metrics)).await?;
bitmaps.push(bitmap);
}
let bitmaps: Vec<_> = stream::iter(keys.into_iter().map(|key| {
let this = self.clone();
async move { this.load_bitmap(&key, None).await }
}))
.buffer_unordered(get_num_compute_intensive_cpus())
.try_collect()
.await?;

let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
RowIdTreeMap::union_all(&bitmap_refs)
Expand All @@ -451,32 +453,49 @@ impl ScalarIndex for BitmapIndex {
SargableQuery::IsIn(values) => {
metrics.record_comparisons(values.len());

let mut bitmaps = Vec::new();
// Collect keys that exist in the index, tracking if we need nulls
let mut has_null = false;

for val in values {
if val.is_null() {
has_null = true;
} else {
let key = OrderableScalarValue(val.clone());
if self.index_map.contains_key(&key) {
let bitmap = self.load_bitmap(&key, Some(metrics)).await?;
bitmaps.push(bitmap);
let keys: Vec<_> = values
.iter()
.filter_map(|val| {
if val.is_null() {
has_null = true;
None
} else {
let key = OrderableScalarValue(val.clone());
if self.index_map.contains_key(&key) {
Some(key)
} else {
None
}
}
}
}

// Add null bitmap if needed
if has_null && !self.null_map.is_empty() {
bitmaps.push(self.null_map.clone());
}
})
.collect();

if bitmaps.is_empty() {
if keys.is_empty() && (!has_null || self.null_map.is_empty()) {
RowIdTreeMap::default()
} else {
// Convert Arc<RowIdTreeMap> to &RowIdTreeMap for union_all
let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
RowIdTreeMap::union_all(&bitmap_refs)
// Load bitmaps in parallel
let mut bitmaps: Vec<_> = stream::iter(keys.into_iter().map(|key| {
let this = self.clone();
async move { this.load_bitmap(&key, None).await }
}))
.buffer_unordered(get_num_compute_intensive_cpus())
.try_collect()
.await?;

// Add null bitmap if needed
if has_null && !self.null_map.is_empty() {
bitmaps.push(self.null_map.clone());
}

if bitmaps.is_empty() {
RowIdTreeMap::default()
} else {
// Convert Arc<RowIdTreeMap> to &RowIdTreeMap for union_all
let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect();
RowIdTreeMap::union_all(&bitmap_refs)
}
}
}
SargableQuery::IsNull() => {
Expand Down
Loading