From d88f1171996eaa272701c0ce62bbb84d155bef01 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Wed, 26 Nov 2025 16:26:26 -0800 Subject: [PATCH 1/3] fix: parallelize bitmap partition loading in IsIn expressions Prior to this commit, an IsIn query on a bitmap index would load partitions serially, one after another. On high-latency connections, queries with large IsIn lists can get very slow. This commit loads the partitions concurrently, with the number of in-flight loads bounded by the number of compute-intensive CPUs, similar to btree. --- rust/lance-index/src/scalar/bitmap.rs | 75 +++++++++++++++++---------- 1 file changed, 47 insertions(+), 28 deletions(-) diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 345c1687142..616eeff5643 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -17,11 +17,11 @@ use async_trait::async_trait; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_common::ScalarValue; use deepsize::DeepSizeOf; -use futures::TryStreamExt; +use futures::{stream, StreamExt, TryStreamExt}; use lance_core::{ cache::{CacheKey, LanceCache, WeakLanceCache}, error::LanceOptionExt, - utils::mask::RowIdTreeMap, + utils::{mask::RowIdTreeMap, tokio::get_num_compute_intensive_cpus}, Error, Result, ROW_ID, }; use roaring::RoaringBitmap; @@ -438,11 +438,13 @@ impl ScalarIndex for BitmapIndex { if keys.is_empty() { RowIdTreeMap::default() } else { - let mut bitmaps = Vec::new(); - for key in keys { - let bitmap = self.load_bitmap(&key, Some(metrics)).await?; - bitmaps.push(bitmap); - } + let bitmaps: Vec<_> = stream::iter(keys.into_iter().map(|key| { + let this = self.clone(); + async move { this.load_bitmap(&key, None).await } + })) + .buffered(get_num_compute_intensive_cpus()) + .try_collect() + .await?; let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect(); RowIdTreeMap::union_all(&bitmap_refs) @@ -451,32 +453,49 @@ impl ScalarIndex for BitmapIndex { SargableQuery::IsIn(values) => {
metrics.record_comparisons(values.len()); - let mut bitmaps = Vec::new(); + // Collect keys that exist in the index, tracking if we need nulls let mut has_null = false; - - for val in values { - if val.is_null() { - has_null = true; - } else { - let key = OrderableScalarValue(val.clone()); - if self.index_map.contains_key(&key) { - let bitmap = self.load_bitmap(&key, Some(metrics)).await?; - bitmaps.push(bitmap); + let keys: Vec<_> = values + .iter() + .filter_map(|val| { + if val.is_null() { + has_null = true; + None + } else { + let key = OrderableScalarValue(val.clone()); + if self.index_map.contains_key(&key) { + Some(key) + } else { + None + } } - } - } - - // Add null bitmap if needed - if has_null && !self.null_map.is_empty() { - bitmaps.push(self.null_map.clone()); - } + }) + .collect(); - if bitmaps.is_empty() { + if keys.is_empty() && !(has_null && !self.null_map.is_empty()) { RowIdTreeMap::default() } else { - // Convert Arc to &RowIdTreeMap for union_all - let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect(); - RowIdTreeMap::union_all(&bitmap_refs) + // Load bitmaps in parallel + let mut bitmaps: Vec<_> = stream::iter(keys.into_iter().map(|key| { + let this = self.clone(); + async move { this.load_bitmap(&key, None).await } + })) + .buffered(get_num_compute_intensive_cpus()) + .try_collect() + .await?; + + // Add null bitmap if needed + if has_null && !self.null_map.is_empty() { + bitmaps.push(self.null_map.clone()); + } + + if bitmaps.is_empty() { + RowIdTreeMap::default() + } else { + // Convert Arc to &RowIdTreeMap for union_all + let bitmap_refs: Vec<_> = bitmaps.iter().map(|b| b.as_ref()).collect(); + RowIdTreeMap::union_all(&bitmap_refs) + } } } SargableQuery::IsNull() => { From 99408f96590e4b28fd3c57ae97a1bf7a52163192 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Wed, 26 Nov 2025 18:10:18 -0800 Subject: [PATCH 2/3] lint --- rust/lance-index/src/scalar/bitmap.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 616eeff5643..9cd68d7d531 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -472,7 +472,7 @@ impl ScalarIndex for BitmapIndex { }) .collect(); - if keys.is_empty() && !(has_null && !self.null_map.is_empty()) { + if keys.is_empty() && (!has_null || self.null_map.is_empty()) { RowIdTreeMap::default() } else { // Load bitmaps in parallel From 02e0613da8ae32c1050f6cc15ab0e33a0c77c749 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Thu, 27 Nov 2025 06:14:50 -0800 Subject: [PATCH 3/3] Use buffer_unordered --- rust/lance-index/src/scalar/bitmap.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 9cd68d7d531..370f9ed8ef2 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -442,7 +442,7 @@ impl ScalarIndex for BitmapIndex { let this = self.clone(); async move { this.load_bitmap(&key, None).await } })) - .buffered(get_num_compute_intensive_cpus()) + .buffer_unordered(get_num_compute_intensive_cpus()) .try_collect() .await?; @@ -480,7 +480,7 @@ impl ScalarIndex for BitmapIndex { let this = self.clone(); async move { this.load_bitmap(&key, None).await } })) - .buffered(get_num_compute_intensive_cpus()) + .buffer_unordered(get_num_compute_intensive_cpus()) .try_collect() .await?;