diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 6d07b5b8218..5de2657922f 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -22,6 +22,7 @@ use futures::{future::BoxFuture, FutureExt, Stream}; use inverted::query::{fill_fts_query_column, FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery}; use lance_core::utils::mask::{NullableRowAddrSet, RowAddrTreeMap}; use lance_core::{Error, Result}; +use roaring::RoaringBitmap; use serde::Serialize; use snafu::location; @@ -888,10 +889,14 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf { ) -> Result; /// Add the new data into the index, creating an updated version of the index in `dest_store` + /// + /// If `valid_old_fragments` is provided, old index data for fragments not in the bitmap + /// will be filtered out during the merge. async fn update( &self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, + valid_old_fragments: Option<&RoaringBitmap>, ) -> Result; /// Returns the criteria that will be used to update the index diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 12d0b6232a1..d6353cf3108 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -589,6 +589,7 @@ impl ScalarIndex for BitmapIndex { &self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, + _valid_old_fragments: Option<&RoaringBitmap>, ) -> Result { let mut state = HashMap::new(); diff --git a/rust/lance-index/src/scalar/bloomfilter.rs b/rust/lance-index/src/scalar/bloomfilter.rs index 3057323b5da..b90bd4e98d2 100644 --- a/rust/lance-index/src/scalar/bloomfilter.rs +++ b/rust/lance-index/src/scalar/bloomfilter.rs @@ -490,6 +490,7 @@ impl ScalarIndex for BloomFilterIndex { &self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, + _valid_old_fragments: Option<&RoaringBitmap>, ) -> Result { // Re-train bloom filters for the appended data using the shared trainer let params = BloomFilterIndexBuilderParams { diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index dbf3b99c088..f2c7b0d0f8a 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -1302,20 +1302,21 @@ impl BTreeIndex { ))) } - async fn into_old_data(self) -> Result> { - let stream = self.into_data_stream().await?; - Ok(Arc::new(OneShotExec::new(stream))) - } - async fn combine_old_new( self, new_data: SendableRecordBatchStream, chunk_size: u64, + valid_old_fragments: Option, ) -> Result { let value_column_index = new_data.schema().index_of(VALUE_COLUMN_NAME)?; let new_input = Arc::new(OneShotExec::new(new_data)); - let old_input = self.into_old_data().await?; + let old_stream = self.into_data_stream().await?; + let old_stream = match valid_old_fragments { + Some(valid_frags) => filter_row_ids_by_fragments(old_stream, valid_frags), + None => old_stream, + }; + let old_input = Arc::new(OneShotExec::new(old_stream)); debug_assert_eq!( old_input.schema().flattened_fields().len(), new_input.schema().flattened_fields().len() @@ -1344,6 +1345,29 @@ impl BTreeIndex { } } +/// Filter a stream of record batches to only include rows whose row address +/// belongs to a fragment in `valid_fragments`. Row addresses encode the fragment +/// ID in the upper 32 bits. +fn filter_row_ids_by_fragments( + stream: SendableRecordBatchStream, + valid_fragments: RoaringBitmap, +) -> SendableRecordBatchStream { + let schema = stream.schema(); + let filtered = stream.map(move |batch_result| { + let batch = batch_result?; + let row_ids = batch[ROW_ID] + .as_any() + .downcast_ref::() + .expect("expected UInt64Array for row_id column"); + let mask: arrow_array::BooleanArray = row_ids + .iter() + .map(|id| id.map(|id| valid_fragments.contains((id >> 32) as u32))) + .collect(); + Ok(arrow_select::filter::filter_record_batch(&batch, &mask)?) + }); + Box::pin(RecordBatchStreamAdapter::new(schema, filtered)) +} + fn wrap_bound(bound: &Bound) -> Bound { match bound { Bound::Unbounded => Bound::Unbounded, @@ -1570,11 +1594,12 @@ impl ScalarIndex for BTreeIndex { &self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, + valid_old_fragments: Option<&RoaringBitmap>, ) -> Result { // Merge the existing index data with the new data and then retrain the index on the merged stream let merged_data_source = self .clone() - .combine_old_new(new_data, self.batch_size) + .combine_old_new(new_data, self.batch_size, valid_old_fragments.cloned()) .await?; train_btree_index(merged_data_source, dest_store, self.batch_size, None, None).await?; @@ -3984,7 +4009,7 @@ mod tests { // update the ranged index ranged_index - .update(update_data_source, new_store.as_ref()) + .update(update_data_source, new_store.as_ref(), None) .await .expect("Error in updating ranged index"); diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index dab83131f8a..59079855165 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -624,6 +624,7 @@ impl ScalarIndex for InvertedIndex { &self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, + _valid_old_fragments: Option<&RoaringBitmap>, ) -> Result { self.to_builder().update(new_data, dest_store).await?; diff --git a/rust/lance-index/src/scalar/json.rs b/rust/lance-index/src/scalar/json.rs index 82501444291..4f32d68828f 100644 --- a/rust/lance-index/src/scalar/json.rs +++ b/rust/lance-index/src/scalar/json.rs @@ -138,8 +138,12 @@ impl ScalarIndex for JsonIndex { &self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, + valid_old_fragments: Option<&RoaringBitmap>, ) -> Result { - let target_created = self.target_index.update(new_data, dest_store).await?; + let target_created = self + .target_index + .update(new_data, dest_store, valid_old_fragments) + .await?; let json_details = crate::pb::JsonIndexDetails { path: self.path.clone(), target_details: Some(target_created.index_details), diff --git a/rust/lance-index/src/scalar/label_list.rs b/rust/lance-index/src/scalar/label_list.rs index 0cfd00d4866..0ea3677aecf 100644 --- a/rust/lance-index/src/scalar/label_list.rs +++ b/rust/lance-index/src/scalar/label_list.rs @@ -207,9 +207,10 @@ impl ScalarIndex for LabelListIndex { &self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, + valid_old_fragments: Option<&RoaringBitmap>, ) -> Result { self.values_index - .update(unnest_chunks(new_data)?, dest_store) + .update(unnest_chunks(new_data)?, dest_store, valid_old_fragments) .await?; Ok(CreatedIndex { diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index 817fb803c64..3f5f6ac5ca7 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -472,6 +472,7 @@ pub mod tests { .update( lance_datafusion::utils::reader_to_stream(Box::new(data)), updated_index_store.as_ref(), + None, ) .await .unwrap(); @@ -1290,6 +1291,7 @@ pub mod tests { .update( lance_datafusion::utils::reader_to_stream(Box::new(data)), updated_index_store.as_ref(), + None, ) .await .unwrap(); diff --git a/rust/lance-index/src/scalar/ngram.rs b/rust/lance-index/src/scalar/ngram.rs index 4d4c0bfeef2..0fd0a0bd35e 100644 --- a/rust/lance-index/src/scalar/ngram.rs +++ b/rust/lance-index/src/scalar/ngram.rs @@ -522,6 +522,7 @@ impl ScalarIndex for NGramIndex { &self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, + _valid_old_fragments: Option<&RoaringBitmap>, ) -> Result { let mut builder = NGramIndexBuilder::try_new(NGramIndexBuilderOptions::default())?; let spill_files = builder.train(new_data).await?; @@ -1620,7 +1621,7 @@ mod tests { Arc::new(LanceCache::no_cache()), )); - index.update(data, test_store.as_ref()).await.unwrap(); + index.update(data, test_store.as_ref(), None).await.unwrap(); let index = NGramIndex::from_store(test_store, None, &LanceCache::no_cache()) .await @@ -1699,7 +1700,7 @@ mod tests { Arc::new(LanceCache::no_cache()), )); - index.update(data, test_store.as_ref()).await.unwrap(); + index.update(data, test_store.as_ref(), None).await.unwrap(); let index = NGramIndex::from_store(test_store, None, &LanceCache::no_cache()) .await diff --git a/rust/lance-index/src/scalar/rtree.rs b/rust/lance-index/src/scalar/rtree.rs index 3f36ee399ab..0441385cd4b 100644 --- a/rust/lance-index/src/scalar/rtree.rs +++ b/rust/lance-index/src/scalar/rtree.rs @@ -511,6 +511,7 @@ impl ScalarIndex for RTreeIndex { &self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, + _valid_old_fragments: Option<&RoaringBitmap>, ) -> Result { let bbox_data = RTreeIndexPlugin::convert_bbox_stream(new_data)?; let tmpdir = Arc::new(TempDir::default()); @@ -1172,7 +1173,7 @@ mod tests { Arc::new(UInt64Array::from(new_rowaddr_arr.clone())), ); rtree_index - .update(stream, new_store.as_ref()) + .update(stream, new_store.as_ref(), None) .await .unwrap(); diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index b631ba89d48..69b9379c0c0 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -565,6 +565,7 @@ impl ScalarIndex for ZoneMapIndex { &self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, + _valid_old_fragments: Option<&RoaringBitmap>, ) -> Result { // Train new zones for the incoming data stream let schema = new_data.schema(); @@ -1111,7 +1112,7 @@ mod tests { // Directly pass the stream with proper row addresses instead of using MockTrainingSource // which would regenerate row addresses starting from 0 index - .update(new_data_stream, test_store.as_ref()) + .update(new_data_stream, test_store.as_ref(), None) .await .unwrap(); diff --git a/rust/lance/src/dataset/tests/dataset_merge_update.rs b/rust/lance/src/dataset/tests/dataset_merge_update.rs index aa35f1b6408..6c522f202dc 100644 --- a/rust/lance/src/dataset/tests/dataset_merge_update.rs +++ b/rust/lance/src/dataset/tests/dataset_merge_update.rs @@ -8,9 +8,12 @@ use crate::dataset::optimize::{compact_files, CompactionOptions}; use crate::dataset::transaction::{DataReplacementGroup, Operation}; use crate::dataset::WriteDestination; use crate::dataset::ROW_ID; -use crate::dataset::{AutoCleanupParams, ProjectionRequest}; +use crate::dataset::{AutoCleanupParams, MergeInsertBuilder, ProjectionRequest}; use crate::{Dataset, Error}; use lance_core::ROW_ADDR; +use lance_index::optimize::OptimizeOptions; +use lance_index::scalar::ScalarIndexParams; +use lance_index::{DatasetIndexExt, IndexType}; use mock_instant::thread_local::MockClock; use crate::dataset::write::{InsertBuilder, WriteMode, WriteParams}; @@ -25,12 +28,14 @@ use arrow_array::{Array, LargeBinaryArray, StructArray}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; use lance_arrow::BLOB_META_KEY; use lance_core::utils::tempfile::{TempDir, TempStrDir}; +use lance_datafusion::utils::reader_to_stream; use lance_datagen::{array, gen_batch, BatchCount, RowCount}; use lance_file::version::LanceFileVersion; use lance_file::writer::FileWriter; use lance_io::utils::CachedFileSize; use lance_table::format::DataFile; +use crate::dataset::write::merge_insert::{WhenMatched, WhenNotMatched}; use futures::TryStreamExt; use lance_datafusion::datagen::DatafusionDatagenExt; use object_store::path::Path; @@ -1490,3 +1495,213 @@ async fn test_issue_4429_nested_struct_encoding_v2_1_with_over_65k_structs() { dataset.validate().await.unwrap(); assert_eq!(dataset.count_rows(None).await.unwrap(), 3); } + +/// Regression test for https://github.com/lancedb/lance/issues/5321 +/// +/// merge_insert with reordered columns triggers the RewriteColumns path, +/// which prunes the index bitmap. After compact + optimize_indices, the old +/// stale B-tree data was being merged back in, causing "non-existent fragment" +/// errors on subsequent queries. +#[tokio::test] +async fn test_merge_insert_with_reordered_columns_and_index() { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + ArrowField::new("value", DataType::Utf8, true), + ])); + + // Step 1: Create dataset with one row {id: 1, value: "a"} + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![0, 1])), + Arc::new(StringArray::from(vec!["x", "a"])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write( + reader, + "memory://test_5321", + Some(WriteParams { + max_rows_per_file: 1, // Force multiple fragments for testing + ..Default::default() + }), + ) + .await + .unwrap(); + + // Step 2: Create BTree index on 'id' + dataset + .create_index( + &["id"], + IndexType::BTree, + None, + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + + // Step 3: merge_insert with reversed column order (value, id) + // This triggers the RewriteColumns path, which prunes the index bitmap + let reversed_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("value", DataType::Utf8, true), + ArrowField::new("id", DataType::Int32, false), + ])); + let source_batch = RecordBatch::try_new( + reversed_schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["b", "c"])), + Arc::new(Int32Array::from(vec![1, 2])), + ], + ) + .unwrap(); + + let merge_job = MergeInsertBuilder::try_new(Arc::new(dataset.clone()), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap(); + + let reader = Box::new(RecordBatchIterator::new( + vec![Ok(source_batch)], + reversed_schema.clone(), + )); + let (dataset, _stats) = merge_job.execute(reader_to_stream(reader)).await.unwrap(); + let mut dataset = dataset.as_ref().clone(); + + // Step 4: compact_files + compact_files(&mut dataset, CompactionOptions::default(), None) + .await + .unwrap(); + + // Step 5: optimize_indices + dataset + .optimize_indices(&OptimizeOptions::default()) + .await + .unwrap(); + + // Step 6: Another merge_insert should NOT error + let source_batch2 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1])), + Arc::new(StringArray::from(vec!["d"])), + ], + ) + .unwrap(); + + let merge_job2 = MergeInsertBuilder::try_new(Arc::new(dataset.clone()), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap(); + + let reader2 = Box::new(RecordBatchIterator::new( + vec![Ok(source_batch2)], + schema.clone(), + )); + let (final_dataset, _) = merge_job2.execute(reader_to_stream(reader2)).await.unwrap(); + final_dataset.validate().await.unwrap(); +} + +/// DataReplacement should invalidate index fragment bitmaps for replaced fields. +#[tokio::test] +async fn test_data_replacement_invalidates_index_bitmap() { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, true), + ArrowField::new("b", DataType::Int32, true), + ])); + + // Create dataset with 2 columns + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![10, 20, 30])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write(reader, "memory://test_replacement_idx", None) + .await + .unwrap(); + + // Create scalar index on column 'a' + dataset + .create_index( + &["a"], + IndexType::BTree, + None, + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + + // Verify fragment 0 is in the index bitmap + let indices = dataset.load_indices().await.unwrap(); + let a_index = indices.iter().find(|idx| idx.name == "a_idx").unwrap(); + assert!(a_index.fragment_bitmap.as_ref().unwrap().contains(0)); + + // Write a replacement data file for column 'a' + let single_col_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "a", + DataType::Int32, + true, + )])); + let replacement_batch = RecordBatch::try_new( + single_col_schema.clone(), + vec![Arc::new(Int32Array::from(vec![4, 5, 6]))], + ) + .unwrap(); + + let object_writer = dataset + .object_store + .create(&Path::from("data/replacement.lance")) + .await + .unwrap(); + let mut writer = FileWriter::try_new( + object_writer, + single_col_schema.as_ref().try_into().unwrap(), + Default::default(), + ) + .unwrap(); + writer.write_batch(&replacement_batch).await.unwrap(); + writer.finish().await.unwrap(); + + // Build replacement DataFile matching the existing data file for column 'a' + let frag = dataset.get_fragment(0).unwrap(); + let data_file = frag.data_file_for_field(0).unwrap(); + let mut new_data_file = data_file.clone(); + new_data_file.path = "replacement.lance".to_string(); + + // Commit DataReplacement + let read_version = dataset.version().version; + let dataset = Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset)), + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, new_data_file)], + }, + Some(read_version), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap(); + + // The index bitmap for 'a' should no longer contain fragment 0 + let indices = dataset.load_indices().await.unwrap(); + let a_index = indices.iter().find(|idx| idx.name == "a_idx").unwrap(); + let effective = a_index + .effective_fragment_bitmap(&dataset.fragment_bitmap) + .unwrap(); + assert!( + !effective.contains(0), + "Fragment 0 should be removed from index bitmap after DataReplacement on indexed column" + ); +} diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 7c2509e51fd..5571a72e916 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -2059,6 +2059,18 @@ impl Transaction { let existing_fragments = maybe_existing_fragments?; + // Collect replaced field IDs before consuming new_datafiles + let replaced_fields: Vec = new_datafiles + .first() + .map(|f| { + f.fields + .iter() + .filter(|&&id| id >= 0) + .map(|&id| id as u32) + .collect() + }) + .unwrap_or_default(); + // 2. check that the fragments being modified have isomorphic layouts along the columns being replaced // 3. add modified fragments to final_fragments for (frag_id, new_file) in old_fragment_ids.iter().zip(new_datafiles) { @@ -2128,6 +2140,19 @@ impl Transaction { .collect::>(); final_fragments.extend(unmodified_fragments); + + // 5. Invalidate index bitmaps for replaced fields + let modified_fragments: Vec = final_fragments + .iter() + .filter(|f| fragments_changed.contains(&f.id)) + .cloned() + .collect(); + + Self::prune_updated_fields_from_indices( + &mut final_indices, + &modified_fragments, + &replaced_fields, + ); } Operation::UpdateMemWalState { merged_generations } => { update_mem_wal_index_merged_generations( @@ -2399,9 +2424,9 @@ impl Transaction { || is_system_index(existing_index) }); - // Fragment bitmaps are now immutable and always represent the fragments that - // the index contains row IDs for, regardless of whether those fragments still exist. - // This ensures consistent prefiltering behavior and clear semantics. + // Fragment bitmaps record which fragments the index was originally built for. + // Operations like updates and data replacement prune these bitmaps, and + // effective_fragment_bitmap intersects with existing fragments at query time. // Apply retention logic for indices with empty bitmaps per index name // (except for fragment reuse indices which are always kept) diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index e488a2f2439..ad4798330af 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -3913,7 +3913,7 @@ mod tests { } else { let id_index = id_index.unwrap(); let id_frags_bitmap = RoaringBitmap::from_iter(id_frags.iter().copied()); - // Fragment bitmaps are now immutable, so we check the effective bitmap + // Check the effective bitmap (raw bitmap intersected with existing fragments) let effective_bitmap = id_index .effective_fragment_bitmap(&dataset.fragment_bitmap) .unwrap(); @@ -3930,7 +3930,7 @@ mod tests { } else { let value_index = value_index.unwrap(); let value_frags_bitmap = RoaringBitmap::from_iter(value_frags.iter().copied()); - // Fragment bitmaps are now immutable, so we check the effective bitmap + // Check the effective bitmap (raw bitmap intersected with existing fragments) let effective_bitmap = value_index .effective_fragment_bitmap(&dataset.fragment_bitmap) .unwrap(); @@ -3943,10 +3943,8 @@ mod tests { .unwrap() .unwrap(); - // With immutable fragment bitmaps, the other_value index behavior is: - // - Its fragment bitmap is never updated (it retains the original [0,1,2,3]) - // - The effective bitmap reflects what fragments are still valid for the index - // - For partial merges that don't include other_value, the index remains fully valid + // The other_value index retains its original bitmap [0,1,2,3] since + // partial merges that don't modify other_value won't prune it. let effective_bitmap = other_value_index .effective_fragment_bitmap(&dataset.fragment_bitmap) .unwrap(); diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index 9657f144dc8..e04796cad34 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -110,11 +110,16 @@ pub async fn merge_indices_with_unindexed_frags<'a>( let index_type = indices[0].index_type(); let (new_uuid, indices_merged, created_index) = match index_type { it if it.is_scalar() => { - // There are no delta indices for scalar, so adding all indexed - // fragments to the new index. - old_indices.iter().for_each(|idx| { - frag_bitmap.extend(idx.fragment_bitmap.as_ref().unwrap().iter()); - }); + // Use effective bitmap (intersected with existing dataset fragments) + // to avoid carrying stale data from pruned indices. + let effective_old_frags: RoaringBitmap = old_indices + .iter() + .filter_map(|idx| idx.effective_fragment_bitmap(&dataset.fragment_bitmap)) + .fold(RoaringBitmap::new(), |mut acc, b| { + acc |= &b; + acc + }); + frag_bitmap |= &effective_old_frags; let index = dataset .open_scalar_index( @@ -143,8 +148,27 @@ pub async fn merge_indices_with_unindexed_frags<'a>( let new_uuid = Uuid::new_v4(); - let new_store = LanceIndexStore::from_dataset_for_new(&dataset, &new_uuid.to_string())?; - let created_index = index.update(new_data_stream, &new_store).await?; + let created_index = if effective_old_frags.is_empty() { + // Old data is fully stale (bitmap pruned to empty). Rebuild + // from scratch instead of merging stale entries. + let params = index.derive_index_params()?; + super::scalar::build_scalar_index( + dataset.as_ref(), + column.name.as_str(), + &new_uuid.to_string(), + ¶ms, + true, + None, + Some(new_data_stream), + ) + .await? + } else { + let new_store = + LanceIndexStore::from_dataset_for_new(&dataset, &new_uuid.to_string())?; + index + .update(new_data_stream, &new_store, Some(&effective_old_frags)) + .await? + }; // TODO: don't hard-code index version Ok((new_uuid, 1, created_index)) @@ -202,7 +226,9 @@ pub async fn merge_indices_with_unindexed_frags<'a>( let removed_indices = old_indices[old_indices.len() - indices_merged..].to_vec(); for removed in removed_indices.iter() { - frag_bitmap |= removed.fragment_bitmap.as_ref().unwrap(); + if let Some(effective) = removed.effective_fragment_bitmap(&dataset.fragment_bitmap) { + frag_bitmap |= &effective; + } } Ok(Some(IndexMergeResults {