From 9f0ccb6dc8d530fe449fc325ed6c04cb995aa499 Mon Sep 17 00:00:00 2001 From: stevie9868 Date: Thu, 12 Feb 2026 17:00:28 -0800 Subject: [PATCH] fix index filter --- rust/lance/src/dataset/scanner.rs | 109 +++++++++++++++++------------- 1 file changed, 63 insertions(+), 46 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index fee95dfcbcd..ee12d77f596 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -3334,10 +3334,8 @@ impl Scanner { let (match_plan, flat_match_plan) = match &index { Some(index) => { // Get unindexed fragments and filter to target fragments - let mut unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?; - let target_bitmap = - RoaringBitmap::from_iter(target_fragments.iter().map(|f| f.id as u32)); - unindexed_fragments.retain(|f| target_bitmap.contains(f.id as u32)); + let unindexed_fragments = self + .retain_target_fragments(self.dataset.unindexed_fragments(&index.name).await?); // If all target fragments are unindexed, skip index entirely if unindexed_fragments.len() == target_fragments.len() { @@ -3605,11 +3603,8 @@ impl Scanner { filter_plan: &ExprFilterPlan, ) -> Result> { // Get unindexed fragments and filter to target fragments - let mut unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?; - if let Some(target_frags) = &self.fragments { - let target_bitmap = RoaringBitmap::from_iter(target_frags.iter().map(|f| f.id as u32)); - unindexed_fragments.retain(|f| target_bitmap.contains(f.id as u32)); - } + let unindexed_fragments = + self.retain_target_fragments(self.dataset.unindexed_fragments(&index.name).await?); if !unindexed_fragments.is_empty() { // need to set the metric type to be the same as the index @@ -4168,6 +4163,16 @@ impl Scanner { } } + /// Retain only fragments that are in the user-specified fragment list. + /// If no fragment list is specified, returns the fragments unchanged. + fn retain_target_fragments(&self, mut fragments: Vec) -> Vec { + if let Some(target) = &self.fragments { + let bitmap = RoaringBitmap::from_iter(target.iter().map(|f| f.id as u32)); + fragments.retain(|f| bitmap.contains(f.id as u32)); + } + fragments + } + fn get_indexed_frags(&self, index: &[IndexMetadata]) -> RoaringBitmap { let all_fragments = self.get_fragments_as_bitmap(); @@ -4309,20 +4314,23 @@ impl Scanner { filter_plan: &ExprFilterPlan, required_frags: RoaringBitmap, ) -> Result { - if filter_plan.is_empty() { + if filter_plan.is_empty() && self.fragments.is_none() { log::trace!("no filter plan, no prefilter"); return Ok(PreFilterSource::None); } - let fragments = Arc::new( - self.dataset - .manifest - .fragments - .iter() - .filter(|f| required_frags.contains(f.id as u32)) - .cloned() - .collect::>(), - ); + // get fragments covered by index + let fragments: Vec = self + .dataset + .manifest + .fragments + .iter() + .filter(|f| required_frags.contains(f.id as u32)) + .cloned() + .collect(); + + // If explicitly specified fragments with .with_fragments(), intersect with those + let fragments = Arc::new(self.retain_target_fragments(fragments)); // Can only use ScalarIndexExec when the scalar index is exact and we are not scanning // a subset of the fragments. @@ -9486,7 +9494,7 @@ mod test { ); } - // Common test function for fragment list filtering + // Common test function for fragment list filtering (unindexed + indexed fragments) async fn test_fragment_list_filtering( test_ds: &TestVectorDataset, fragments: &[Fragment], @@ -9517,9 +9525,9 @@ mod test { .unwrap(); assert_values_in_range(i_array, 400..410, "Should only get results from fragment 2"); - // Test 3: Query only indexed fragments (fragments 0 and 1) + // Test 3: Query a single indexed fragment (fragment 0 only) let mut scanner = build_scanner(&test_ds.dataset); - scanner.with_fragments(vec![fragments[0].clone(), fragments[1].clone()]); + scanner.with_fragments(vec![fragments[0].clone()]); let batch = scanner.try_into_batch().await.unwrap(); let i_array = batch .column_by_name("i") @@ -9527,11 +9535,7 @@ mod test { .as_any() .downcast_ref::() .unwrap(); - assert_values_in_range( - i_array, - 0..400, - "Should only get results from indexed fragments", - ); + assert_values_in_range(i_array, 0..200, "Should only get results from fragment 0"); // Test 4: Query all indexed fragments (0, 1) plus one unindexed fragment (2), excluding fragment 3 let mut scanner = build_scanner(&test_ds.dataset); @@ -9552,11 +9556,33 @@ mod test { 0..410, "Should get results from fragments 0, 1, and 2, excluding fragment 3", ); + + // Test 5: One indexed fragment (0) + one unindexed fragment (2), skipping indexed fragment 1 and unindexed fragment 3 + let mut scanner = build_scanner(&test_ds.dataset); + scanner.with_fragments(vec![fragments[0].clone(), fragments[2].clone()]); + let batch = scanner.try_into_batch().await.unwrap(); + let i_array = batch + .column_by_name("i") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert!( + i_array + .iter() + .all(|v| v.is_some_and(|val| (0..200).contains(&val) || (400..410).contains(&val))) + && i_array + .iter() + .any(|v| v.is_some_and(|val| (0..200).contains(&val))) + && i_array + .iter() + .any(|v| v.is_some_and(|val| (400..410).contains(&val))), + "Should only get results from fragment 0 (indexed) and fragment 2 (unindexed)" + ); } #[tokio::test] async fn test_vector_search_respects_fragment_list() { - // Create dataset with 2 initial fragments (400 rows, max 200 per file) let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false) .await .unwrap(); @@ -9564,21 +9590,17 @@ mod test { // Create index on first 2 fragments test_ds.make_vector_index().await.unwrap(); - // Append two more fragments after indexing (these will be unindexed) + let query: Float32Array = (0..32).map(|v| v as f32).collect(); + + // Append two more unindexed fragments test_ds.append_data_with_range(400, 410).await.unwrap(); test_ds.append_data_with_range(410, 420).await.unwrap(); - // Now we have 4 fragments: - // Fragment 0: i=0..200 (indexed) - // Fragment 1: i=200..400 (indexed) - // Fragment 2: i=400..410 (unindexed) - // Fragment 3: i=410..420 (unindexed) - + // Fragment 0: i=0..200 (indexed), Fragment 1: i=200..400 (indexed) + // Fragment 2: i=400..410 (unindexed), Fragment 3: i=410..420 (unindexed) let fragments = test_ds.dataset.fragments(); assert_eq!(fragments.len(), 4); - let query: Float32Array = (0..32).map(|v| v as f32).collect(); - test_fragment_list_filtering(&test_ds, fragments, |dataset| { let mut scanner = dataset.scan(); scanner.nearest("vec", &query, 420).unwrap(); @@ -9589,7 +9611,6 @@ mod test { #[tokio::test] async fn test_fts_respects_fragment_list() { - // Create dataset with 2 initial fragments (400 rows, max 200 per file) let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false) .await .unwrap(); @@ -9597,20 +9618,16 @@ mod test { // Create FTS index on first 2 fragments test_ds.make_fts_index().await.unwrap(); - // Append two more fragments after indexing (these will be unindexed) + // Append two more unindexed fragments test_ds.append_data_with_range(400, 410).await.unwrap(); test_ds.append_data_with_range(410, 420).await.unwrap(); - // Now we have 4 fragments: - // Fragment 0: i=0..200 (indexed) - // Fragment 1: i=200..400 (indexed) - // Fragment 2: i=400..410 (unindexed) - // Fragment 3: i=410..420 (unindexed) - + // Fragment 0: i=0..200 (indexed), Fragment 1: i=200..400 (indexed) + // Fragment 2: i=400..410 (unindexed), Fragment 3: i=410..420 (unindexed) let fragments = test_ds.dataset.fragments(); assert_eq!(fragments.len(), 4); - // "s-5" matches: s-5, s-50-59, s-150-159 (frag 0), s-250-259, s-350-359 (frag 1), s-405 (frag 2), s-415 (frag 3) + // "s-5" matches: s-5, s-50..s-59, s-150..s-159 (frag 0), s-250..s-259, s-350..s-359 (frag 1), s-405 (frag 2), s-415 (frag 3) test_fragment_list_filtering(&test_ds, fragments, |dataset| { let mut scanner = dataset.scan(); scanner