diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 345c1687142..370f9ed8ef2 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -17,11 +17,11 @@ use async_trait::async_trait; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_common::ScalarValue; use deepsize::DeepSizeOf; -use futures::TryStreamExt; +use futures::{stream, StreamExt, TryStreamExt}; use lance_core::{ cache::{CacheKey, LanceCache, WeakLanceCache}, error::LanceOptionExt, - utils::mask::RowIdTreeMap, + utils::{mask::RowIdTreeMap, tokio::get_num_compute_intensive_cpus}, Error, Result, ROW_ID, }; use roaring::RoaringBitmap; @@ -438,11 +438,13 @@ impl ScalarIndex for BitmapIndex { if keys.is_empty() { RowIdTreeMap::default() } else { - let mut bitmaps = Vec::new(); - for key in keys { - let bitmap = self.load_bitmap(&key, Some(metrics)).await?; - bitmaps.push(bitmap); - } + let bitmaps: Vec<_> = stream::iter(keys.into_iter().map(|key| { + let this = self.clone(); + async move { this.load_bitmap(&key, None).await } + })) + .buffer_unordered(get_num_compute_intensive_cpus()) + .try_collect() + .await?; let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect(); RowIdTreeMap::union_all(&bitmap_refs) @@ -451,32 +453,49 @@ impl ScalarIndex for BitmapIndex { SargableQuery::IsIn(values) => { metrics.record_comparisons(values.len()); - let mut bitmaps = Vec::new(); + // Collect keys that exist in the index, tracking if we need nulls let mut has_null = false; - - for val in values { - if val.is_null() { - has_null = true; - } else { - let key = OrderableScalarValue(val.clone()); - if self.index_map.contains_key(&key) { - let bitmap = self.load_bitmap(&key, Some(metrics)).await?; - bitmaps.push(bitmap); + let keys: Vec<_> = values + .iter() + .filter_map(|val| { + if val.is_null() { + has_null = true; + None + } else { + let key = OrderableScalarValue(val.clone()); + if self.index_map.contains_key(&key) { + Some(key) + } else { + None + } } - } - } - - // Add null bitmap if needed - if has_null && !self.null_map.is_empty() { - bitmaps.push(self.null_map.clone()); - } + }) + .collect(); - if bitmaps.is_empty() { + if keys.is_empty() && (!has_null || self.null_map.is_empty()) { RowIdTreeMap::default() } else { - // Convert Arc to &RowIdTreeMap for union_all - let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect(); - RowIdTreeMap::union_all(&bitmap_refs) + // Load bitmaps in parallel + let mut bitmaps: Vec<_> = stream::iter(keys.into_iter().map(|key| { + let this = self.clone(); + async move { this.load_bitmap(&key, None).await } + })) + .buffer_unordered(get_num_compute_intensive_cpus()) + .try_collect() + .await?; + + // Add null bitmap if needed + if has_null && !self.null_map.is_empty() { + bitmaps.push(self.null_map.clone()); + } + + if bitmaps.is_empty() { + RowIdTreeMap::default() + } else { + // Convert Arc to &RowIdTreeMap for union_all + let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect(); + RowIdTreeMap::union_all(&bitmap_refs) + } } } SargableQuery::IsNull() => {