diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index a9265517fe5..fe25e9ba6a1 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2938,8 +2938,29 @@ impl Scanner { .load_scalar_index(IndexCriteria::default().for_column(&column).supports_fts()) .await?; + // Get target fragments + let target_fragments = self + .fragments + .clone() + .unwrap_or_else(|| self.dataset.fragments().to_vec()); + let (match_plan, flat_match_plan) = match &index { Some(index) => { + // Get unindexed fragments and filter to target fragments + let mut unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?; + let target_bitmap = + RoaringBitmap::from_iter(target_fragments.iter().map(|f| f.id as u32)); + unindexed_fragments.retain(|f| target_bitmap.contains(f.id as u32)); + + // If all target fragments are unindexed, skip index entirely + if unindexed_fragments.len() == target_fragments.len() { + let flat_match_plan = self + .plan_flat_match_query(unindexed_fragments, query, params, filter_plan) + .await?; + return Ok(flat_match_plan); + } + + // Mixed case: use index + flat search for unindexed let match_plan: Arc = Arc::new(MatchQueryExec::new( self.dataset.clone(), query.clone(), @@ -2947,7 +2968,6 @@ impl Scanner { prefilter_source.clone(), )); - let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?; if unindexed_fragments.is_empty() { (Some(match_plan), None) } else { @@ -2958,9 +2978,9 @@ impl Scanner { } } None => { - let unindexed_fragments = self.dataset.fragments().iter().cloned().collect(); + // No index: flat search all target fragments let flat_match_plan = self - .plan_flat_match_query(unindexed_fragments, query, params, filter_plan) + .plan_flat_match_query(target_fragments.to_vec(), query, params, filter_plan) .await?; (None, Some(flat_match_plan)) } @@ -3102,7 +3122,20 @@ impl Scanner { None }; - if let Some((index, _idx, index_metric)) = matching_index { + // Only return index and deltas if there is an index on the column and at least one of the target fragments are indexed + let index_and_deltas = if let Some((index, _idx, index_metric)) = matching_index { + let deltas = self.dataset.load_indices_by_name(&index.name).await?; + let index_frags = self.get_indexed_frags(&deltas); + if !index_frags.is_empty() { + Some((index, deltas, index_metric)) + } else { + None + } + } else { + None + }; + + if let Some((index, deltas, index_metric)) = index_and_deltas { log::trace!("index found for vector search"); // Use the index's metric type q.metric_type = Some(index_metric); @@ -3114,9 +3147,6 @@ impl Scanner { location!(), )); } - - // Find all deltas with the same index name. - let deltas = self.dataset.load_indices_by_name(&index.name).await?; let ann_node = match vector_type { DataType::FixedSizeList(_, _) => self.ann(&q, &deltas, filter_plan).await?, DataType::List(_) => self.multivec_ann(&q, &deltas, filter_plan).await?, @@ -3166,7 +3196,7 @@ impl Scanner { filter_plan, vector_scan_projection, /*include_deleted_rows=*/ true, - None, + self.fragments.clone().map(Arc::new), None, /*is_prefilter= */ true, ) @@ -3187,8 +3217,13 @@ impl Scanner { mut knn_node: Arc, filter_plan: &ExprFilterPlan, ) -> Result> { - // Check if we've created new versions since the index was built. - let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?; + // Get unindexed fragments and filter to target fragments + let mut unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?; + if let Some(target_frags) = &self.fragments { + let target_bitmap = RoaringBitmap::from_iter(target_frags.iter().map(|f| f.id as u32)); + unindexed_fragments.retain(|f| target_bitmap.contains(f.id as u32)); + } + if !unindexed_fragments.is_empty() { // need to set the metric type to be the same as the index // to make sure the distance is comparable. @@ -4242,16 +4277,21 @@ pub mod test_dataset { } pub async fn append_new_data(&mut self) -> Result<()> { - let vector_values: Float32Array = (0..10) + self.append_data_with_range(400, 410).await + } + + pub async fn append_data_with_range(&mut self, start: i32, end: i32) -> Result<()> { + let count = (end - start) as usize; + let vector_values: Float32Array = (0..count) .flat_map(|i| vec![i as f32; self.dimension as usize].into_iter()) .collect(); let new_vectors = FixedSizeListArray::try_new_from_values(vector_values, self.dimension as i32) .unwrap(); let new_data: Vec = vec![ - Arc::new(Int32Array::from_iter_values(400..410)), // 5 * 80 + Arc::new(Int32Array::from_iter_values(start..end)), Arc::new(StringArray::from_iter_values( - (400..410).map(|v| format!("s-{}", v)), + (start..end).map(|v| format!("s-{}", v)), )), Arc::new(new_vectors), ]; @@ -9079,4 +9119,168 @@ mod test { runtime.handle().metrics().num_alive_tasks() ); } + + fn assert_values_in_range(array: &Int32Array, range: std::ops::Range, msg: &str) { + assert!(!array.is_empty(), "Expected some results but got none"); + assert!( + array + .iter() + .all(|v| v.is_some_and(|val| range.contains(&val))), + "{msg} (expected range {range:?})" + ); + } + + // Helper to assert that results exist from all fragment ranges + fn assert_has_all_fragments(array: &Int32Array) { + assert!( + array + .iter() + .any(|v| v.is_some_and(|val| (0..200).contains(&val))) + && array + .iter() + .any(|v| v.is_some_and(|val| (200..400).contains(&val))) + && array + .iter() + .any(|v| v.is_some_and(|val| (400..410).contains(&val))) + && array + .iter() + .any(|v| v.is_some_and(|val| (410..420).contains(&val))), + "Expected results from all fragments" + ); + } + + // Common test function for fragment list filtering + async fn test_fragment_list_filtering( + test_ds: &TestVectorDataset, + fragments: &[Fragment], + mut build_scanner: impl FnMut(&Dataset) -> Scanner, + ) { + // Test 1: Query without fragment filter - should get results from all fragments + let batch = build_scanner(&test_ds.dataset) + .try_into_batch() + .await + .unwrap(); + let i_array = batch + .column_by_name("i") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_has_all_fragments(i_array); + + // Test 2: Query only one unindexed fragment (fragment 2), excluding fragment 3 + let mut scanner = build_scanner(&test_ds.dataset); + scanner.with_fragments(vec![fragments[2].clone()]); + let batch = scanner.try_into_batch().await.unwrap(); + let i_array = batch + .column_by_name("i") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_values_in_range(i_array, 400..410, "Should only get results from fragment 2"); + + // Test 3: Query only indexed fragments (fragments 0 and 1) + let mut scanner = build_scanner(&test_ds.dataset); + scanner.with_fragments(vec![fragments[0].clone(), fragments[1].clone()]); + let batch = scanner.try_into_batch().await.unwrap(); + let i_array = batch + .column_by_name("i") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_values_in_range( + i_array, + 0..400, + "Should only get results from indexed fragments", + ); + + // Test 4: Query all indexed fragments (0, 1) plus one unindexed fragment (2), excluding fragment 3 + let mut scanner = build_scanner(&test_ds.dataset); + scanner.with_fragments(vec![ + fragments[0].clone(), + fragments[1].clone(), + fragments[2].clone(), + ]); + let batch = scanner.try_into_batch().await.unwrap(); + let i_array = batch + .column_by_name("i") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_values_in_range( + i_array, + 0..410, + "Should get results from fragments 0, 1, and 2, excluding fragment 3", + ); + } + + #[tokio::test] + async fn test_vector_search_respects_fragment_list() { + // Create dataset with 2 initial fragments (400 rows, max 200 per file) + let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false) + .await + .unwrap(); + + // Create index on first 2 fragments + test_ds.make_vector_index().await.unwrap(); + + // Append two more fragments after indexing (these will be unindexed) + test_ds.append_data_with_range(400, 410).await.unwrap(); + test_ds.append_data_with_range(410, 420).await.unwrap(); + + // Now we have 4 fragments: + // Fragment 0: i=0..200 (indexed) + // Fragment 1: i=200..400 (indexed) + // Fragment 2: i=400..410 (unindexed) + // Fragment 3: i=410..420 (unindexed) + + let fragments = test_ds.dataset.fragments(); + assert_eq!(fragments.len(), 4); + + let query: Float32Array = (0..32).map(|v| v as f32).collect(); + + test_fragment_list_filtering(&test_ds, fragments, |dataset| { + let mut scanner = dataset.scan(); + scanner.nearest("vec", &query, 420).unwrap(); + scanner + }) + .await; + } + + #[tokio::test] + async fn test_fts_respects_fragment_list() { + // Create dataset with 2 initial fragments (400 rows, max 200 per file) + let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false) + .await + .unwrap(); + + // Create FTS index on first 2 fragments + test_ds.make_fts_index().await.unwrap(); + + // Append two more fragments after indexing (these will be unindexed) + test_ds.append_data_with_range(400, 410).await.unwrap(); + test_ds.append_data_with_range(410, 420).await.unwrap(); + + // Now we have 4 fragments: + // Fragment 0: i=0..200 (indexed) + // Fragment 1: i=200..400 (indexed) + // Fragment 2: i=400..410 (unindexed) + // Fragment 3: i=410..420 (unindexed) + + let fragments = test_ds.dataset.fragments(); + assert_eq!(fragments.len(), 4); + + // "s-5" matches: s-5, s-50-59, s-150-159 (frag 0), s-250-259, s-350-359 (frag 1), s-405 (frag 2), s-415 (frag 3) + test_fragment_list_filtering(&test_ds, fragments, |dataset| { + let mut scanner = dataset.scan(); + scanner + .full_text_search(FullTextSearchQuery::new("s-5".into())) + .unwrap(); + scanner + }) + .await; + } }