From 871b89582073b6e5352cfe535bdbea5b4b343f6d Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 7 Nov 2025 20:45:28 +0800 Subject: [PATCH 1/2] fix: ensure recheck for IsNotNull in bloom filter Signed-off-by: Xuanwo --- rust/lance-index/src/scalar/expression.rs | 13 +++-- rust/lance/src/io/exec/filtered_read.rs | 64 +++++++++++++++++++++-- 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/rust/lance-index/src/scalar/expression.rs b/rust/lance-index/src/scalar/expression.rs index 910963a5997..2e867bc9de8 100644 --- a/rust/lance-index/src/scalar/expression.rs +++ b/rust/lance-index/src/scalar/expression.rs @@ -712,10 +712,15 @@ impl IndexedExpression { fn maybe_not(self) -> Option { match (self.scalar_query, self.refine_expr) { (Some(_), Some(_)) => None, - (Some(scalar_query), None) => Some(Self { - scalar_query: Some(ScalarIndexExpr::Not(Box::new(scalar_query))), - refine_expr: None, - }), + (Some(scalar_query), None) => { + if scalar_query.needs_recheck() { + return None; + } + Some(Self { + scalar_query: Some(ScalarIndexExpr::Not(Box::new(scalar_query))), + refine_expr: None, + }) + } (None, Some(refine_expr)) => Some(Self { scalar_query: None, refine_expr: Some(Expr::Not(Box::new(refine_expr))), diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index fc18e33d1df..0d6bd052054 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -486,9 +486,9 @@ impl FilteredReadStream { // We need to figure out which ranges to read from each fragment. // // If the scan range is ignoring the filters we can push it down here. - // If the scan range is not ignoring the filters we can only push it down if: - // 1. The index result is an exact match (we know exactly which rows will be in the result) - // 2. The index result is AtLeast with guaranteed rows >= limit (we have enough guaranteed matches) + // If the scan range is not ignoring the filters we can only push it down if: + // 1. The index result is an exact match (we know exactly which rows will be in the result) + // 2. The index result is AtLeast with guaranteed rows >= limit (we have enough guaranteed matches) // Returns: (fragment reads, whether limit was pushed down to fragment ranges) #[instrument(name = "plan_scan", skip_all)] async fn plan_scan( @@ -1845,7 +1845,10 @@ mod tests { compute::concat_batches, datatypes::{Float32Type, UInt32Type, UInt64Type}, }; - use arrow_array::{cast::AsArray, Array, UInt32Array}; + use arrow_array::{ + cast::AsArray, Array, ArrayRef, Int32Array, RecordBatch, RecordBatchIterator, + UInt32Array, + }; use itertools::Itertools; use lance_core::datatypes::OnMissing; use lance_core::utils::tempfile::TempStrDir; @@ -2042,12 +2045,65 @@ mod tests { } } + async fn dataset_with_bloom_filter_nulls() -> (TempStrDir, Arc) { + let tmp_path = TempStrDir::default(); + let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( + "value", + arrow_schema::DataType::Int32, + true, + )])); + let values: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + None, + Some(2), + None, + Some(3), + ])); + let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone()); + let mut dataset = Dataset::write(reader, tmp_path.as_str(), None) + .await + .unwrap(); + dataset + .create_index( + &["value"], + IndexType::BloomFilter, + None, + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + (tmp_path, Arc::new(dataset)) + } + fn u32s(ranges: Vec>) -> Arc { Arc::new(UInt32Array::from_iter_values( ranges.into_iter().flat_map(|r| r.into_iter()), )) } + #[test_log::test(tokio::test)] + async fn test_bloom_filter_is_not_null_prefilter() { + let (_tmp_path, dataset) = dataset_with_bloom_filter_nulls().await; + let arrow_schema = Arc::new(arrow_schema::Schema::from(dataset.schema())); + let planner = Planner::new(arrow_schema); + let expr = planner.parse_filter("value IS NOT NULL").unwrap(); + let index_info = dataset.scalar_index_info().await.unwrap(); + let filter_plan = planner + .create_filter_plan(expr, &index_info, true) + .unwrap(); + assert!(filter_plan.index_query.is_none(), "bloom filter IS NOT NULL should not use an index query"); + + let options = FilteredReadOptions::basic_full_read(&dataset).with_filter_plan(filter_plan); + let plan = FilteredReadExec::try_new(dataset.clone(), options, None).unwrap(); + let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap(); + let batches = stream.try_collect::>().await.unwrap(); + let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum(); + + assert_eq!(row_count, 3); + } + #[test_log::test(tokio::test)] async fn test_range_no_scalar_index() { let fixture = TestFixture::new().await; From a9f8c6974fbd2d5151e1925cd3222838c287976b Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 7 Nov 2025 20:51:22 +0800 Subject: [PATCH 2/2] Fix Signed-off-by: Xuanwo --- rust/lance/src/io/exec/filtered_read.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 0d6bd052054..e1dd87195a4 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -486,9 +486,9 @@ impl FilteredReadStream { // We need to figure out which ranges to read from each fragment. // // If the scan range is ignoring the filters we can push it down here. - // If the scan range is not ignoring the filters we can only push it down if: - // 1. The index result is an exact match (we know exactly which rows will be in the result) - // 2. The index result is AtLeast with guaranteed rows >= limit (we have enough guaranteed matches) + // If the scan range is not ignoring the filters we can only push it down if: + // 1. The index result is an exact match (we know exactly which rows will be in the result) + // 2. The index result is AtLeast with guaranteed rows >= limit (we have enough guaranteed matches) // Returns: (fragment reads, whether limit was pushed down to fragment ranges) #[instrument(name = "plan_scan", skip_all)] async fn plan_scan( @@ -1846,8 +1846,7 @@ mod tests { datatypes::{Float32Type, UInt32Type, UInt64Type}, }; use arrow_array::{ - cast::AsArray, Array, ArrayRef, Int32Array, RecordBatch, RecordBatchIterator, - UInt32Array, + cast::AsArray, Array, ArrayRef, Int32Array, RecordBatch, RecordBatchIterator, UInt32Array, }; use itertools::Itertools; use lance_core::datatypes::OnMissing; @@ -2090,10 +2089,11 @@ mod tests { let planner = Planner::new(arrow_schema); let expr = planner.parse_filter("value IS NOT NULL").unwrap(); let index_info = dataset.scalar_index_info().await.unwrap(); - let filter_plan = planner - .create_filter_plan(expr, &index_info, true) - .unwrap(); - assert!(filter_plan.index_query.is_none(), "bloom filter IS NOT NULL should not use an index query"); + let filter_plan = planner.create_filter_plan(expr, &index_info, true).unwrap(); + assert!( + filter_plan.index_query.is_none(), + "bloom filter IS NOT NULL should not use an index query" + ); let options = FilteredReadOptions::basic_full_read(&dataset).with_filter_plan(filter_plan); let plan = FilteredReadExec::try_new(dataset.clone(), options, None).unwrap();