Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 63 additions & 46 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3334,10 +3334,8 @@ impl Scanner {
let (match_plan, flat_match_plan) = match &index {
Some(index) => {
// Get unindexed fragments and filter to target fragments
let mut unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
let target_bitmap =
RoaringBitmap::from_iter(target_fragments.iter().map(|f| f.id as u32));
unindexed_fragments.retain(|f| target_bitmap.contains(f.id as u32));
let unindexed_fragments = self
.retain_target_fragments(self.dataset.unindexed_fragments(&index.name).await?);

// If all target fragments are unindexed, skip index entirely
if unindexed_fragments.len() == target_fragments.len() {
Expand Down Expand Up @@ -3605,11 +3603,8 @@ impl Scanner {
filter_plan: &ExprFilterPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
// Get unindexed fragments and filter to target fragments
let mut unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
if let Some(target_frags) = &self.fragments {
let target_bitmap = RoaringBitmap::from_iter(target_frags.iter().map(|f| f.id as u32));
unindexed_fragments.retain(|f| target_bitmap.contains(f.id as u32));
}
let unindexed_fragments =
self.retain_target_fragments(self.dataset.unindexed_fragments(&index.name).await?);

if !unindexed_fragments.is_empty() {
// need to set the metric type to be the same as the index
Expand Down Expand Up @@ -4168,6 +4163,16 @@ impl Scanner {
}
}

/// Retain only fragments that are in the user-specified fragment list.
/// If no fragment list is specified, returns the fragments unchanged.
fn retain_target_fragments(&self, mut fragments: Vec<Fragment>) -> Vec<Fragment> {
if let Some(target) = &self.fragments {
let bitmap = RoaringBitmap::from_iter(target.iter().map(|f| f.id as u32));
fragments.retain(|f| bitmap.contains(f.id as u32));
}
fragments
}

fn get_indexed_frags(&self, index: &[IndexMetadata]) -> RoaringBitmap {
let all_fragments = self.get_fragments_as_bitmap();

Expand Down Expand Up @@ -4309,20 +4314,23 @@ impl Scanner {
filter_plan: &ExprFilterPlan,
required_frags: RoaringBitmap,
) -> Result<PreFilterSource> {
if filter_plan.is_empty() {
if filter_plan.is_empty() && self.fragments.is_none() {
log::trace!("no filter plan, no prefilter");
return Ok(PreFilterSource::None);
}

let fragments = Arc::new(
self.dataset
.manifest
.fragments
.iter()
.filter(|f| required_frags.contains(f.id as u32))
.cloned()
.collect::<Vec<_>>(),
);
// get fragments covered by index
let fragments: Vec<Fragment> = self
.dataset
.manifest
.fragments
.iter()
.filter(|f| required_frags.contains(f.id as u32))
.cloned()
.collect();

// If explicitly specified fragments with .with_fragments(), intersect with those
let fragments = Arc::new(self.retain_target_fragments(fragments));

// Can only use ScalarIndexExec when the scalar index is exact and we are not scanning
// a subset of the fragments.
Expand Down Expand Up @@ -9486,7 +9494,7 @@ mod test {
);
}

// Common test function for fragment list filtering
// Common test function for fragment list filtering (unindexed + indexed fragments)
async fn test_fragment_list_filtering(
test_ds: &TestVectorDataset,
fragments: &[Fragment],
Expand Down Expand Up @@ -9517,21 +9525,17 @@ mod test {
.unwrap();
assert_values_in_range(i_array, 400..410, "Should only get results from fragment 2");

// Test 3: Query only indexed fragments (fragments 0 and 1)
// Test 3: Query a single indexed fragment (fragment 0 only)
let mut scanner = build_scanner(&test_ds.dataset);
scanner.with_fragments(vec![fragments[0].clone(), fragments[1].clone()]);
scanner.with_fragments(vec![fragments[0].clone()]);
let batch = scanner.try_into_batch().await.unwrap();
let i_array = batch
.column_by_name("i")
.unwrap()
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_values_in_range(
i_array,
0..400,
"Should only get results from indexed fragments",
);
assert_values_in_range(i_array, 0..200, "Should only get results from fragment 0");

// Test 4: Query all indexed fragments (0, 1) plus one unindexed fragment (2), excluding fragment 3
let mut scanner = build_scanner(&test_ds.dataset);
Expand All @@ -9552,33 +9556,51 @@ mod test {
0..410,
"Should get results from fragments 0, 1, and 2, excluding fragment 3",
);

// Test 5: One indexed fragment (0) + one unindexed fragment (2), skipping indexed fragment 1 and unindexed fragment 3
let mut scanner = build_scanner(&test_ds.dataset);
scanner.with_fragments(vec![fragments[0].clone(), fragments[2].clone()]);
let batch = scanner.try_into_batch().await.unwrap();
let i_array = batch
.column_by_name("i")
.unwrap()
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert!(
i_array
.iter()
.all(|v| v.is_some_and(|val| (0..200).contains(&val) || (400..410).contains(&val)))
&& i_array
.iter()
.any(|v| v.is_some_and(|val| (0..200).contains(&val)))
&& i_array
.iter()
.any(|v| v.is_some_and(|val| (400..410).contains(&val))),
"Should only get results from fragment 0 (indexed) and fragment 2 (unindexed)"
);
}

#[tokio::test]
async fn test_vector_search_respects_fragment_list() {
// Create dataset with 2 initial fragments (400 rows, max 200 per file)
let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
.await
.unwrap();

// Create index on first 2 fragments
test_ds.make_vector_index().await.unwrap();

// Append two more fragments after indexing (these will be unindexed)
let query: Float32Array = (0..32).map(|v| v as f32).collect();

// Append two more unindexed fragments
test_ds.append_data_with_range(400, 410).await.unwrap();
test_ds.append_data_with_range(410, 420).await.unwrap();

// Now we have 4 fragments:
// Fragment 0: i=0..200 (indexed)
// Fragment 1: i=200..400 (indexed)
// Fragment 2: i=400..410 (unindexed)
// Fragment 3: i=410..420 (unindexed)

// Fragment 0: i=0..200 (indexed), Fragment 1: i=200..400 (indexed)
// Fragment 2: i=400..410 (unindexed), Fragment 3: i=410..420 (unindexed)
let fragments = test_ds.dataset.fragments();
assert_eq!(fragments.len(), 4);

let query: Float32Array = (0..32).map(|v| v as f32).collect();

test_fragment_list_filtering(&test_ds, fragments, |dataset| {
let mut scanner = dataset.scan();
scanner.nearest("vec", &query, 420).unwrap();
Expand All @@ -9589,28 +9611,23 @@ mod test {

#[tokio::test]
async fn test_fts_respects_fragment_list() {
// Create dataset with 2 initial fragments (400 rows, max 200 per file)
let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
.await
.unwrap();

// Create FTS index on first 2 fragments
test_ds.make_fts_index().await.unwrap();

// Append two more fragments after indexing (these will be unindexed)
// Append two more unindexed fragments
test_ds.append_data_with_range(400, 410).await.unwrap();
test_ds.append_data_with_range(410, 420).await.unwrap();

// Now we have 4 fragments:
// Fragment 0: i=0..200 (indexed)
// Fragment 1: i=200..400 (indexed)
// Fragment 2: i=400..410 (unindexed)
// Fragment 3: i=410..420 (unindexed)

// Fragment 0: i=0..200 (indexed), Fragment 1: i=200..400 (indexed)
// Fragment 2: i=400..410 (unindexed), Fragment 3: i=410..420 (unindexed)
let fragments = test_ds.dataset.fragments();
assert_eq!(fragments.len(), 4);

// "s-5" matches: s-5, s-50-59, s-150-159 (frag 0), s-250-259, s-350-359 (frag 1), s-405 (frag 2), s-415 (frag 3)
// "s-5" matches: s-5, s-50..s-59, s-150..s-159 (frag 0), s-250..s-259, s-350..s-359 (frag 1), s-405 (frag 2), s-415 (frag 3)
test_fragment_list_filtering(&test_ds, fragments, |dataset| {
let mut scanner = dataset.scan();
scanner
Expand Down
Loading