Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 217 additions & 13 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2938,16 +2938,36 @@ impl Scanner {
.load_scalar_index(IndexCriteria::default().for_column(&column).supports_fts())
.await?;

// Get target fragments
let target_fragments = self
.fragments
.clone()
.unwrap_or_else(|| self.dataset.fragments().to_vec());

let (match_plan, flat_match_plan) = match &index {
Some(index) => {
// Get unindexed fragments and filter to target fragments
let mut unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
let target_bitmap =
RoaringBitmap::from_iter(target_fragments.iter().map(|f| f.id as u32));
unindexed_fragments.retain(|f| target_bitmap.contains(f.id as u32));

// If all target fragments are unindexed, skip index entirely
if unindexed_fragments.len() == target_fragments.len() {
let flat_match_plan = self
.plan_flat_match_query(unindexed_fragments, query, params, filter_plan)
.await?;
return Ok(flat_match_plan);
}

// Mixed case: use index + flat search for unindexed
let match_plan: Arc<dyn ExecutionPlan> = Arc::new(MatchQueryExec::new(
self.dataset.clone(),
query.clone(),
params.clone(),
prefilter_source.clone(),
));

let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
if unindexed_fragments.is_empty() {
(Some(match_plan), None)
} else {
Expand All @@ -2958,9 +2978,9 @@ impl Scanner {
}
}
None => {
let unindexed_fragments = self.dataset.fragments().iter().cloned().collect();
// No index: flat search all target fragments
let flat_match_plan = self
.plan_flat_match_query(unindexed_fragments, query, params, filter_plan)
.plan_flat_match_query(target_fragments.to_vec(), query, params, filter_plan)
.await?;
(None, Some(flat_match_plan))
}
Expand Down Expand Up @@ -3102,7 +3122,20 @@ impl Scanner {
None
};

if let Some((index, _idx, index_metric)) = matching_index {
// Only return index and deltas if there is an index on the column and at least one of the target fragments are indexed
let index_and_deltas = if let Some((index, _idx, index_metric)) = matching_index {
let deltas = self.dataset.load_indices_by_name(&index.name).await?;
let index_frags = self.get_indexed_frags(&deltas);
if !index_frags.is_empty() {
Some((index, deltas, index_metric))
} else {
None
}
} else {
None
};

if let Some((index, deltas, index_metric)) = index_and_deltas {
log::trace!("index found for vector search");
// Use the index's metric type
q.metric_type = Some(index_metric);
Expand All @@ -3114,9 +3147,6 @@ impl Scanner {
location!(),
));
}

// Find all deltas with the same index name.
let deltas = self.dataset.load_indices_by_name(&index.name).await?;
let ann_node = match vector_type {
DataType::FixedSizeList(_, _) => self.ann(&q, &deltas, filter_plan).await?,
DataType::List(_) => self.multivec_ann(&q, &deltas, filter_plan).await?,
Expand Down Expand Up @@ -3166,7 +3196,7 @@ impl Scanner {
filter_plan,
vector_scan_projection,
/*include_deleted_rows=*/ true,
None,
self.fragments.clone().map(Arc::new),
None,
/*is_prefilter= */ true,
)
Expand All @@ -3187,8 +3217,13 @@ impl Scanner {
mut knn_node: Arc<dyn ExecutionPlan>,
filter_plan: &ExprFilterPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
// Check if we've created new versions since the index was built.
let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
// Get unindexed fragments and filter to target fragments
let mut unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
if let Some(target_frags) = &self.fragments {
let target_bitmap = RoaringBitmap::from_iter(target_frags.iter().map(|f| f.id as u32));
unindexed_fragments.retain(|f| target_bitmap.contains(f.id as u32));
}

if !unindexed_fragments.is_empty() {
// need to set the metric type to be the same as the index
// to make sure the distance is comparable.
Expand Down Expand Up @@ -4242,16 +4277,21 @@ pub mod test_dataset {
}

pub async fn append_new_data(&mut self) -> Result<()> {
let vector_values: Float32Array = (0..10)
self.append_data_with_range(400, 410).await
}

pub async fn append_data_with_range(&mut self, start: i32, end: i32) -> Result<()> {
let count = (end - start) as usize;
let vector_values: Float32Array = (0..count)
.flat_map(|i| vec![i as f32; self.dimension as usize].into_iter())
.collect();
let new_vectors =
FixedSizeListArray::try_new_from_values(vector_values, self.dimension as i32)
.unwrap();
let new_data: Vec<ArrayRef> = vec![
Arc::new(Int32Array::from_iter_values(400..410)), // 5 * 80
Arc::new(Int32Array::from_iter_values(start..end)),
Arc::new(StringArray::from_iter_values(
(400..410).map(|v| format!("s-{}", v)),
(start..end).map(|v| format!("s-{}", v)),
)),
Arc::new(new_vectors),
];
Expand Down Expand Up @@ -9079,4 +9119,168 @@ mod test {
runtime.handle().metrics().num_alive_tasks()
);
}

fn assert_values_in_range(array: &Int32Array, range: std::ops::Range<i32>, msg: &str) {
assert!(!array.is_empty(), "Expected some results but got none");
assert!(
array
.iter()
.all(|v| v.is_some_and(|val| range.contains(&val))),
"{msg} (expected range {range:?})"
);
}

// Helper to assert that results exist from all fragment ranges
fn assert_has_all_fragments(array: &Int32Array) {
assert!(
array
.iter()
.any(|v| v.is_some_and(|val| (0..200).contains(&val)))
&& array
.iter()
.any(|v| v.is_some_and(|val| (200..400).contains(&val)))
&& array
.iter()
.any(|v| v.is_some_and(|val| (400..410).contains(&val)))
&& array
.iter()
.any(|v| v.is_some_and(|val| (410..420).contains(&val))),
"Expected results from all fragments"
);
}

// Common test function for fragment list filtering
async fn test_fragment_list_filtering(
test_ds: &TestVectorDataset,
fragments: &[Fragment],
mut build_scanner: impl FnMut(&Dataset) -> Scanner,
) {
// Test 1: Query without fragment filter - should get results from all fragments
let batch = build_scanner(&test_ds.dataset)
.try_into_batch()
.await
.unwrap();
let i_array = batch
.column_by_name("i")
.unwrap()
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_has_all_fragments(i_array);

// Test 2: Query only one unindexed fragment (fragment 2), excluding fragment 3
let mut scanner = build_scanner(&test_ds.dataset);
scanner.with_fragments(vec![fragments[2].clone()]);
let batch = scanner.try_into_batch().await.unwrap();
let i_array = batch
.column_by_name("i")
.unwrap()
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_values_in_range(i_array, 400..410, "Should only get results from fragment 2");

// Test 3: Query only indexed fragments (fragments 0 and 1)
let mut scanner = build_scanner(&test_ds.dataset);
scanner.with_fragments(vec![fragments[0].clone(), fragments[1].clone()]);
let batch = scanner.try_into_batch().await.unwrap();
let i_array = batch
.column_by_name("i")
.unwrap()
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_values_in_range(
i_array,
0..400,
"Should only get results from indexed fragments",
);

// Test 4: Query all indexed fragments (0, 1) plus one unindexed fragment (2), excluding fragment 3
let mut scanner = build_scanner(&test_ds.dataset);
scanner.with_fragments(vec![
fragments[0].clone(),
fragments[1].clone(),
fragments[2].clone(),
]);
let batch = scanner.try_into_batch().await.unwrap();
let i_array = batch
.column_by_name("i")
.unwrap()
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_values_in_range(
i_array,
0..410,
"Should get results from fragments 0, 1, and 2, excluding fragment 3",
);
}

#[tokio::test]
async fn test_vector_search_respects_fragment_list() {
// Create dataset with 2 initial fragments (400 rows, max 200 per file)
let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
.await
.unwrap();

// Create index on first 2 fragments
test_ds.make_vector_index().await.unwrap();

// Append two more fragments after indexing (these will be unindexed)
test_ds.append_data_with_range(400, 410).await.unwrap();
test_ds.append_data_with_range(410, 420).await.unwrap();

// Now we have 4 fragments:
// Fragment 0: i=0..200 (indexed)
// Fragment 1: i=200..400 (indexed)
// Fragment 2: i=400..410 (unindexed)
// Fragment 3: i=410..420 (unindexed)

let fragments = test_ds.dataset.fragments();
assert_eq!(fragments.len(), 4);

let query: Float32Array = (0..32).map(|v| v as f32).collect();

test_fragment_list_filtering(&test_ds, fragments, |dataset| {
let mut scanner = dataset.scan();
scanner.nearest("vec", &query, 420).unwrap();
scanner
})
.await;
}

#[tokio::test]
async fn test_fts_respects_fragment_list() {
// Create dataset with 2 initial fragments (400 rows, max 200 per file)
let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
.await
.unwrap();

// Create FTS index on first 2 fragments
test_ds.make_fts_index().await.unwrap();

// Append two more fragments after indexing (these will be unindexed)
test_ds.append_data_with_range(400, 410).await.unwrap();
test_ds.append_data_with_range(410, 420).await.unwrap();

// Now we have 4 fragments:
// Fragment 0: i=0..200 (indexed)
// Fragment 1: i=200..400 (indexed)
// Fragment 2: i=400..410 (unindexed)
// Fragment 3: i=410..420 (unindexed)

let fragments = test_ds.dataset.fragments();
assert_eq!(fragments.len(), 4);

// "s-5" matches: s-5, s-50-59, s-150-159 (frag 0), s-250-259, s-350-359 (frag 1), s-405 (frag 2), s-415 (frag 3)
test_fragment_list_filtering(&test_ds, fragments, |dataset| {
let mut scanner = dataset.scan();
scanner
.full_text_search(FullTextSearchQuery::new("s-5".into()))
.unwrap();
scanner
})
.await;
}
}