diff --git a/rust/lance-index/src/scalar/expression.rs b/rust/lance-index/src/scalar/expression.rs
index 910963a5997..2e867bc9de8 100644
--- a/rust/lance-index/src/scalar/expression.rs
+++ b/rust/lance-index/src/scalar/expression.rs
@@ -712,10 +712,15 @@ impl IndexedExpression {
     fn maybe_not(self) -> Option<Self> {
         match (self.scalar_query, self.refine_expr) {
             (Some(_), Some(_)) => None,
-            (Some(scalar_query), None) => Some(Self {
-                scalar_query: Some(ScalarIndexExpr::Not(Box::new(scalar_query))),
-                refine_expr: None,
-            }),
+            (Some(scalar_query), None) => {
+                if scalar_query.needs_recheck() {
+                    return None;
+                }
+                Some(Self {
+                    scalar_query: Some(ScalarIndexExpr::Not(Box::new(scalar_query))),
+                    refine_expr: None,
+                })
+            }
             (None, Some(refine_expr)) => Some(Self {
                 scalar_query: None,
                 refine_expr: Some(Expr::Not(Box::new(refine_expr))),
diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs
index fc18e33d1df..e1dd87195a4 100644
--- a/rust/lance/src/io/exec/filtered_read.rs
+++ b/rust/lance/src/io/exec/filtered_read.rs
@@ -1845,7 +1845,9 @@ mod tests {
         compute::concat_batches,
         datatypes::{Float32Type, UInt32Type, UInt64Type},
     };
-    use arrow_array::{cast::AsArray, Array, UInt32Array};
+    use arrow_array::{
+        cast::AsArray, Array, ArrayRef, Int32Array, RecordBatch, RecordBatchIterator, UInt32Array,
+    };
     use itertools::Itertools;
     use lance_core::datatypes::OnMissing;
     use lance_core::utils::tempfile::TempStrDir;
@@ -2042,12 +2042,66 @@ mod tests {
         }
     }
 
+    async fn dataset_with_bloom_filter_nulls() -> (TempStrDir, Arc<Dataset>) {
+        let tmp_path = TempStrDir::default();
+        let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
+            "value",
+            arrow_schema::DataType::Int32,
+            true,
+        )]));
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(1),
+            None,
+            Some(2),
+            None,
+            Some(3),
+        ]));
+        let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone());
+        let mut dataset = Dataset::write(reader, tmp_path.as_str(), None)
+            .await
+            .unwrap();
+        dataset
+            .create_index(
+                &["value"],
+                IndexType::BloomFilter,
+                None,
+                &ScalarIndexParams::default(),
+                false,
+            )
+            .await
+            .unwrap();
+        (tmp_path, Arc::new(dataset))
+    }
+
     fn u32s(ranges: Vec<Range<u32>>) -> Arc<dyn Array> {
         Arc::new(UInt32Array::from_iter_values(
             ranges.into_iter().flat_map(|r| r.into_iter()),
         ))
     }
 
+    #[test_log::test(tokio::test)]
+    async fn test_bloom_filter_is_not_null_prefilter() {
+        let (_tmp_path, dataset) = dataset_with_bloom_filter_nulls().await;
+        let arrow_schema = Arc::new(arrow_schema::Schema::from(dataset.schema()));
+        let planner = Planner::new(arrow_schema);
+        let expr = planner.parse_filter("value IS NOT NULL").unwrap();
+        let index_info = dataset.scalar_index_info().await.unwrap();
+        let filter_plan = planner.create_filter_plan(expr, &index_info, true).unwrap();
+        assert!(
+            filter_plan.index_query.is_none(),
+            "bloom filter IS NOT NULL should not use an index query"
+        );
+
+        let options = FilteredReadOptions::basic_full_read(&dataset).with_filter_plan(filter_plan);
+        let plan = FilteredReadExec::try_new(dataset.clone(), options, None).unwrap();
+        let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap();
+        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
+        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
+
+        assert_eq!(row_count, 3);
+    }
+
     #[test_log::test(tokio::test)]
     async fn test_range_no_scalar_index() {
         let fixture = TestFixture::new().await;