diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index d46140020f0..69a79a8f2b8 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -215,6 +215,7 @@ impl FromJObjectWithEnv for JObject<'_> { created_at, base_id, files: None, + invalidated_fragment_bitmap: None, }) } } diff --git a/protos/table.proto b/protos/table.proto index e73d22b6b93..56fd33f3152 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -286,6 +286,36 @@ message IndexMetadata { // of index sizes without extra IO. // If this is empty, the index files sizes are unknown. repeated IndexFile files = 10; + + // A bitmap of fragment IDs that were removed from fragment_bitmap because + // the indexed column values changed in those fragments but the index data + // was not rewritten to reflect the change. The index data still contains + // stale entries for these fragments and readers must mask out any index + // results from these fragments when querying the index. + // + // Several operations can introduce invalidated fragments: + // + // - A partial-schema merge_insert modifies indexed column values in a + // fragment without rewriting the index. + // - A data replacement operation changes column values without + // rewriting the index. + // - Some index types do not rewrite the full index during incremental + // updates (e.g. the FTS inverted index), so stale entries for + // replaced fragments may persist after an optimize. + // + // Future operations that modify indexed data without a full index + // rebuild may also add entries here. + // + // When absent, there are no invalidated fragments. Always cleared when + // the index is fully rebuilt. An optimize operation may or may not clear + // this depending on the index type — index types that rewrite all data + // during optimize (e.g. btree) will clear it, while index types that + // perform incremental merges (e.g. 
FTS) may retain invalidated + // fragments that were not reprocessed. + // + // The bitmap is stored as a 32-bit Roaring bitmap (same encoding as + // fragment_bitmap). + bytes invalidated_fragment_bitmap = 11; } // Metadata about a single file within an index segment. diff --git a/python/src/indices.rs b/python/src/indices.rs index 62f3c0c64ec..c774f1d69de 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -448,6 +448,7 @@ async fn do_load_shuffled_vectors( created_at: Some(Utc::now()), base_id: None, files: Some(files), + invalidated_fragment_bitmap: None, }; ds.commit_existing_index_segments(index_name, column, vec![metadata]) .await diff --git a/python/src/transaction.rs b/python/src/transaction.rs index eae5b49a15d..df57404ba40 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -102,6 +102,7 @@ impl FromPyObject<'_> for PyLance { created_at, base_id, files, + invalidated_fragment_bitmap: None, })) } } diff --git a/rust/lance-table/src/format/index.rs b/rust/lance-table/src/format/index.rs index 945d8364123..38af7fe3413 100644 --- a/rust/lance-table/src/format/index.rs +++ b/rust/lance-table/src/format/index.rs @@ -76,6 +76,16 @@ pub struct IndexMetadata { /// This is None if the file sizes are unknown. This happens for indices created /// before this field was added. pub files: Option>, + + /// Fragment IDs that were removed from `fragment_bitmap` because indexed + /// column values changed but the index data was not rewritten. The index + /// still contains stale entries for these fragments; readers must treat + /// all rows from them as deleted when querying the index. + /// + /// `None` means no fragments have been invalidated. Always cleared on a + /// full index rebuild. An optimize may or may not clear this depending + /// on the index type (see proto comment for details). 
+ pub invalidated_fragment_bitmap: Option, } impl IndexMetadata { @@ -132,6 +142,11 @@ impl DeepSizeOf for IndexMetadata { .map(|fragment_bitmap| fragment_bitmap.serialized_size()) .unwrap_or(0) + self.files.deep_size_of_children(context) + + self + .invalidated_fragment_bitmap + .as_ref() + .map(|bitmap| bitmap.serialized_size()) + .unwrap_or(0) } } @@ -162,6 +177,14 @@ impl TryFrom for IndexMetadata { ) }; + let invalidated_fragment_bitmap = if proto.invalidated_fragment_bitmap.is_empty() { + None + } else { + Some(RoaringBitmap::deserialize_from( + &mut proto.invalidated_fragment_bitmap.as_slice(), + )?) + }; + Ok(Self { uuid: proto.uuid.as_ref().map(Uuid::try_from).ok_or_else(|| { Error::invalid_input("uuid field does not exist in Index metadata".to_string()) @@ -178,20 +201,18 @@ impl TryFrom for IndexMetadata { }), base_id: proto.base_id, files, + invalidated_fragment_bitmap, }) } } -impl From<&IndexMetadata> for pb::IndexMetadata { - fn from(idx: &IndexMetadata) -> Self { +impl TryFrom<&IndexMetadata> for pb::IndexMetadata { + type Error = Error; + + fn try_from(idx: &IndexMetadata) -> Result { let mut fragment_bitmap = Vec::new(); - if let Some(bitmap) = &idx.fragment_bitmap - && let Err(e) = bitmap.serialize_into(&mut fragment_bitmap) - { - // In theory, this should never error. But if we do, just - // recover gracefully. 
- log::error!("Failed to serialize fragment bitmap: {}", e); - fragment_bitmap.clear(); + if let Some(bitmap) = &idx.fragment_bitmap { + bitmap.serialize_into(&mut fragment_bitmap)?; } let files = idx @@ -208,7 +229,12 @@ impl From<&IndexMetadata> for pb::IndexMetadata { }) .unwrap_or_default(); - Self { + let mut invalidated_fragment_bitmap = Vec::new(); + if let Some(bitmap) = &idx.invalidated_fragment_bitmap { + bitmap.serialize_into(&mut invalidated_fragment_bitmap)?; + } + + Ok(Self { uuid: Some((&idx.uuid).into()), name: idx.name.clone(), fields: idx.fields.clone(), @@ -222,7 +248,8 @@ impl From<&IndexMetadata> for pb::IndexMetadata { created_at: idx.created_at.map(|dt| dt.timestamp_millis() as u64), base_id: idx.base_id, files, - } + invalidated_fragment_bitmap, + }) } } @@ -244,7 +271,10 @@ fn serialize_index_metadata( .downcast_ref::<Vec<IndexMetadata>>() .expect("index_metadata_codec: wrong type (this is a bug in the cache layer)"); let section = pb::IndexSection { - indices: vec.iter().map(pb::IndexMetadata::from).collect(), + indices: vec + .iter() + .map(pb::IndexMetadata::try_from) + .collect::<Result<_>>()?, }; writer.write_all(&section.encode_to_vec())?; Ok(()) @@ -319,6 +349,7 @@ mod tests { path: "index.idx".to_string(), size_bytes: 1024, }]), + invalidated_fragment_bitmap: Some(RoaringBitmap::from_iter([4, 5])), }, IndexMetadata { uuid: Uuid::new_v4(), @@ -331,6 +362,7 @@ mod tests { created_at: None, base_id: Some(7), files: None, + invalidated_fragment_bitmap: None, }, ]; diff --git a/rust/lance-table/src/io/manifest.rs b/rust/lance-table/src/io/manifest.rs index 7df1d656263..14a8c34d933 100644 --- a/rust/lance-table/src/io/manifest.rs +++ b/rust/lance-table/src/io/manifest.rs @@ -143,7 +143,10 @@ async fn do_write_manifest( // Write indices if presented. 
if let Some(indices) = indices.as_ref() { let section = pb::IndexSection { - indices: indices.iter().map(|i| i.into()).collect(), + indices: indices + .iter() + .map(pb::IndexMetadata::try_from) + .collect::<Result<_>>()?, }; let pos = writer.write_protobuf(&section).await?; manifest.index_section = Some(pos); diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index b45aee6b814..3012a1dfec4 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -3196,7 +3196,7 @@ pub(crate) async fn write_manifest_file( object_store, write_manifest_file_to_path, naming_scheme, - transaction.take().map(|tx| tx.into()), + transaction.take().map(|tx| tx.try_into()).transpose()?, ) .await } diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 0c6b78253b9..86ac50787a1 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -1582,6 +1582,7 @@ mod tests { created_at: None, base_id: None, files: None, + invalidated_fragment_bitmap: None, } } diff --git a/rust/lance/src/dataset/mem_wal/memtable/flush.rs b/rust/lance/src/dataset/mem_wal/memtable/flush.rs index 4a46ed062c0..8e8a3dafc50 100644 --- a/rust/lance/src/dataset/mem_wal/memtable/flush.rs +++ b/rust/lance/src/dataset/mem_wal/memtable/flush.rs @@ -477,6 +477,7 @@ impl MemTableFlusher { created_at: None, base_id: None, files: None, + invalidated_fragment_bitmap: None, }; // Commit the index to the dataset @@ -721,6 +722,7 @@ impl MemTableFlusher { created_at: Some(chrono::Utc::now()), index_version: 1, files: None, + invalidated_fragment_bitmap: None, }; Ok(index_meta) diff --git a/rust/lance/src/dataset/optimize/remapping.rs b/rust/lance/src/dataset/optimize/remapping.rs index dab62bf6166..a3b5ac80e94 100644 --- a/rust/lance/src/dataset/optimize/remapping.rs +++ b/rust/lance/src/dataset/optimize/remapping.rs @@ -286,6 +286,9 @@ async fn remap_index(dataset: &mut Dataset, index_id: &Uuid) -> Result<()> { created_at: curr_index_meta.created_at, 
base_id: None, files: curr_index_meta.files.clone(), + invalidated_fragment_bitmap: curr_index_meta + .invalidated_fragment_bitmap + .clone(), }, RemapResult::Remapped(remapped_index) => IndexMetadata { uuid: remapped_index.new_id, @@ -298,6 +301,9 @@ async fn remap_index(dataset: &mut Dataset, index_id: &Uuid) -> Result<()> { created_at: curr_index_meta.created_at, base_id: None, files: remapped_index.files, + invalidated_fragment_bitmap: curr_index_meta + .invalidated_fragment_bitmap + .clone(), }, }; diff --git a/rust/lance/src/dataset/tests/dataset_merge_update.rs b/rust/lance/src/dataset/tests/dataset_merge_update.rs index 371e8769c8a..ff0a263f39f 100644 --- a/rust/lance/src/dataset/tests/dataset_merge_update.rs +++ b/rust/lance/src/dataset/tests/dataset_merge_update.rs @@ -1718,6 +1718,430 @@ async fn test_data_replacement_invalidates_index_bitmap() { "Fragment 0 should be removed from index bitmap after DataReplacement on indexed column" ); } + +/// DataReplacement on an indexed column should remove the fragment from +/// fragment_bitmap AND add it to invalidated_fragment_bitmap so that +/// stale index entries are blocked at query time. 
+#[tokio::test] +async fn test_data_replacement_populates_invalidated_bitmap() { + use lance_file::writer::FileWriter; + use object_store::path::Path; + + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + ArrowField::new("value", DataType::Int32, true), + ])); + + // Create dataset with one fragment + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![10, 20, 30])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write(reader, "memory://test_replacement_invalidated", None) + .await + .unwrap(); + + // Create BTree index on 'value' + dataset + .create_index( + &["value"], + IndexType::BTree, + None, + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + + // Verify initial state: fragment 0 in bitmap, no invalidated fragments + let indices = dataset.load_indices().await.unwrap(); + let idx = indices.iter().find(|i| i.name == "value_idx").unwrap(); + assert!(idx.fragment_bitmap.as_ref().unwrap().contains(0)); + assert!(idx.invalidated_fragment_bitmap.is_none()); + + // Write a replacement data file for column 'value' + let value_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "value", + DataType::Int32, + true, + )])); + let replacement_batch = RecordBatch::try_new( + value_schema.clone(), + vec![Arc::new(Int32Array::from(vec![40, 50, 60]))], + ) + .unwrap(); + + let object_writer = dataset + .object_store + .create(&Path::from("data/replacement_inv.lance")) + .await + .unwrap(); + let mut writer = FileWriter::try_new( + object_writer, + value_schema.as_ref().try_into().unwrap(), + Default::default(), + ) + .unwrap(); + writer.write_batch(&replacement_batch).await.unwrap(); + writer.finish().await.unwrap(); + + // Build replacement DataFile + let frag = dataset.get_fragment(0).unwrap(); + let lance_schema: lance_core::datatypes::Schema 
= schema.as_ref().try_into().unwrap(); + let value_field_id = lance_schema.field("value").unwrap().id; + let data_file = frag.data_file_for_field(value_field_id as u32).unwrap(); + let mut new_data_file = data_file.clone(); + new_data_file.path = "replacement_inv.lance".to_string(); + + // Commit DataReplacement + let read_version = dataset.version().version; + let dataset = Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset)), + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, new_data_file)], + }, + Some(read_version), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap(); + + // Check: fragment 0 removed from fragment_bitmap + let indices = dataset.load_indices().await.unwrap(); + let idx = indices.iter().find(|i| i.name == "value_idx").unwrap(); + assert!( + !idx.fragment_bitmap.as_ref().unwrap().contains(0), + "Fragment 0 should be removed from fragment_bitmap" + ); + + // Check: fragment 0 added to invalidated_fragment_bitmap + assert!( + idx.invalidated_fragment_bitmap + .as_ref() + .expect("invalidated_fragment_bitmap should be Some") + .contains(0), + "Fragment 0 should be in invalidated_fragment_bitmap" + ); +} + +/// Regression test (lance-format/lance#6283): after in-place update via +/// DataReplacement, stale FTS index entries for the replaced fragment must +/// be blocked at query time so searches reflect the new data. 
+#[tokio::test] +async fn test_fts_stale_entries_after_data_replacement() { + use lance_index::scalar::{FullTextSearchQuery, inverted::InvertedIndexParams}; + + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + ArrowField::new("text", DataType::Utf8, true), + ])); + + // Step 1: Create dataset with 2 rows in separate fragments + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![0, 1])), + Arc::new(StringArray::from(vec![ + "the quick brown fox", + "the lazy dog", + ])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write( + reader, + "memory://test_fts_incremental_reindex", + Some(WriteParams { + max_rows_per_file: 1, // Force 2 fragments + ..Default::default() + }), + ) + .await + .unwrap(); + + // Step 2: Create FTS inverted index on 'text' + let params = InvertedIndexParams::default(); + dataset + .create_index(&["text"], IndexType::Inverted, None, &params, true) + .await + .unwrap(); + + // Sanity check: "quick" and "lazy" should each return 1 result + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("quick".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(results.num_rows(), 1); + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("lazy".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(results.num_rows(), 1); + + // Step 3: Replace fragment 1's data file via DataReplacement. + // The fragment ID stays the same, but the text changes from + // "the lazy dog" to "a speedy cat". This prunes fragment 1 + // from the FTS index's fragment_bitmap. 
+ let frag1 = dataset.get_fragment(1).unwrap(); + let old_data_file = frag1.metadata().files[0].clone(); + + // Write replacement data file with updated text + let replacement_batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1])), + Arc::new(StringArray::from(vec!["a speedy cat"])), + ], + ) + .unwrap(); + let replacement_path = dataset.data_dir().child("replacement.lance"); + let object_writer = dataset + .object_store + .create(&replacement_path) + .await + .unwrap(); + let mut writer = FileWriter::try_new( + object_writer, + schema.as_ref().try_into().unwrap(), + Default::default(), + ) + .unwrap(); + writer.write_batch(&replacement_batch).await.unwrap(); + writer.finish().await.unwrap(); + + let mut new_data_file = old_data_file.clone(); + new_data_file.path = "replacement.lance".to_string(); + + let read_version = dataset.manifest.version; + let dataset = Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset)), + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(1, new_data_file)], + }, + Some(read_version), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap(); + + // Verify the replacement worked — fragment 1 now has the new text + let batch = dataset.scan().try_into_batch().await.unwrap(); + assert_eq!(batch.num_rows(), 2); + + // Step 4: FTS search should reflect the new data, not the old. + // Fragment 1 is unindexed (pruned from fragment_bitmap), so the + // scanner does a flat FTS scan on it and uses the index for fragment 0. + + // "speedy" is in the new text for fragment 1 — found via flat scan. + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("speedy".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(results.num_rows(), 1); + + // "lazy" was in the OLD text for fragment 1. The index has stale + // posting entries for it, but fragment 1 was pruned from the + // fragment_bitmap. 
Flat scan of fragment 1 sees "a speedy cat" + // which doesn't match. So 0 results. + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("lazy".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!( + results.num_rows(), + 0, + "Expected 0 results for 'lazy' (stale data, fragment 1 pruned from index)" + ); + + // "quick" is in fragment 0 which is still indexed — should still work. + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("quick".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(results.num_rows(), 1); +} + +/// Same scenario as test_fts_stale_entries_after_data_replacement +/// but with a vector (IVF_PQ) index instead of FTS. +#[tokio::test] +async fn test_vector_index_after_data_replacement() { + use arrow_array::FixedSizeListArray; + use lance_arrow::FixedSizeListArrayExt; + use lance_index::vector::{ivf::IvfBuildParams, pq::PQBuildParams}; + use lance_testing::datagen::generate_random_array; + + const DIM: usize = 32; + const ROWS_PER_FRAG: usize = 256; + const TOTAL: usize = ROWS_PER_FRAG * 2; + + let fsl_field = ArrowField::new( + "vector", + DataType::FixedSizeList( + Arc::new(ArrowField::new("item", DataType::Float32, true)), + DIM as i32, + ), + true, + ); + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + fsl_field, + ])); + + // Step 1: Create dataset with TOTAL rows in 2 fragments. 
+ let vectors = generate_random_array(TOTAL * DIM); + let vector_array = + Arc::new(FixedSizeListArray::try_new_from_values(vectors, DIM as i32).unwrap()); + let ids: Vec<i32> = (0..TOTAL as i32).collect(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(ids)), vector_array.clone()], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write( + reader, + "memory://test_vec_data_replacement", + Some(WriteParams { + max_rows_per_file: ROWS_PER_FRAG, + ..Default::default() + }), + ) + .await + .unwrap(); + assert_eq!(dataset.get_fragments().len(), 2); + + // Step 2: Create IVF_PQ vector index + let ivf_params = IvfBuildParams::new(2); + let pq_params = PQBuildParams { + num_sub_vectors: 1, + ..Default::default() + }; + let params = crate::index::vector::VectorIndexParams::with_ivf_pq_params( + lance_linalg::distance::MetricType::L2, + ivf_params, + pq_params, + ); + dataset + .create_index(&["vector"], IndexType::Vector, None, &params, true) + .await + .unwrap(); + + // Sanity: nearest to all-zeros query with refine should return + // results from both fragments. + let query_zeros = Float32Array::from(vec![0.0_f32; DIM]); + let results = dataset + .scan() + .nearest("vector", &query_zeros, 10) + .unwrap() + .refine(10) + .try_into_batch() + .await + .unwrap(); + assert_eq!(results.num_rows(), 10); + + // Step 3: DataReplacement — replace fragment 1's data. + // Write all-999.0 vectors so they are very far from origin. 
+ let frag1 = dataset.get_fragment(1).unwrap(); + let frag1_id = frag1.id() as u64; + let old_data_file = frag1.metadata().files[0].clone(); + + let far_values: Vec = vec![999.0_f32; ROWS_PER_FRAG * DIM]; + let far_vectors = Float32Array::from(far_values); + let far_vector_array = + FixedSizeListArray::try_new_from_values(far_vectors, DIM as i32).unwrap(); + let replacement_ids: Vec = (ROWS_PER_FRAG as i32..(TOTAL as i32)).collect(); + let replacement_batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(replacement_ids)), + Arc::new(far_vector_array), + ], + ) + .unwrap(); + + let replacement_path = dataset.data_dir().child("replacement.lance"); + let object_writer = dataset + .object_store + .create(&replacement_path) + .await + .unwrap(); + let mut writer = FileWriter::try_new( + object_writer, + schema.as_ref().try_into().unwrap(), + Default::default(), + ) + .unwrap(); + writer.write_batch(&replacement_batch).await.unwrap(); + writer.finish().await.unwrap(); + + let mut new_data_file = old_data_file.clone(); + new_data_file.path = "replacement.lance".to_string(); + + let read_version = dataset.manifest.version; + let dataset = Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset)), + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(frag1_id, new_data_file)], + }, + Some(read_version), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap(); + + // Step 4: Search — nearest to all-zeros WITHOUT refine. + // Fragment 1's vectors are now all-999.0 (very far from origin). + // The top-10 nearest should all come from fragment 0. + // If stale index entries leak through, results from fragment 1 + // would appear with their old (closer) PQ-approximated distances. 
+ let results = dataset + .scan() + .nearest("vector", &query_zeros, 10) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let ids = results["id"].as_any().downcast_ref::().unwrap(); + for i in 0..results.num_rows() { + assert!( + (ids.value(i) as usize) < ROWS_PER_FRAG, + "Result {} has id={} which is from fragment 1 (stale index entry leaked through)", + i, + ids.value(i) + ); + } +} + /// Regression test: inverted (FTS) index should not carry stale data after /// merge_insert + compact + optimize_indices. /// diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 62f30696234..47fd132dfbd 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -292,12 +292,14 @@ impl std::fmt::Display for Operation { } } -impl From<&Transaction> for lance_table::format::Transaction { - fn from(value: &Transaction) -> Self { - let pb_transaction: pb::Transaction = value.into(); - Self { +impl TryFrom<&Transaction> for lance_table::format::Transaction { + type Error = lance_core::Error; + + fn try_from(value: &Transaction) -> lance_core::Result { + let pb_transaction: pb::Transaction = value.try_into()?; + Ok(Self { inner: pb_transaction, - } + }) } } @@ -2329,7 +2331,14 @@ impl Transaction { && let Some(fragment_bitmap) = &mut index.fragment_bitmap { for fragment_id in updated_fragments.iter().map(|f| f.id as u32) { - fragment_bitmap.remove(fragment_id); + if fragment_bitmap.remove(fragment_id) { + // The fragment was indexed but is now stale. Track it so + // query-time deletion masks can block rows from it. 
+ index + .invalidated_fragment_bitmap + .get_or_insert_with(RoaringBitmap::new) + .insert(fragment_id); + } } } } @@ -3076,8 +3085,10 @@ impl TryFrom for RewriteGroup { } } -impl From<&Transaction> for pb::Transaction { - fn from(value: &Transaction) -> Self { +impl TryFrom<&Transaction> for pb::Transaction { + type Error = lance_core::Error; + + fn try_from(value: &Transaction) -> lance_core::Result { let operation = match &value.operation { Operation::Append { fragments } => { pb::transaction::Operation::Append(pb::transaction::Append { @@ -3158,11 +3169,14 @@ impl From<&Transaction> for pb::Transaction { new_indices, removed_indices, } => pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex { - new_indices: new_indices.iter().map(pb::IndexMetadata::from).collect(), + new_indices: new_indices + .iter() + .map(pb::IndexMetadata::try_from) + .collect::>()?, removed_indices: removed_indices .iter() - .map(pb::IndexMetadata::from) - .collect(), + .map(pb::IndexMetadata::try_from) + .collect::>()?, }), Operation::Merge { fragments, schema } => { pb::transaction::Operation::Merge(pb::transaction::Merge { @@ -3269,13 +3283,13 @@ impl From<&Transaction> for pb::Transaction { .as_ref() .map(|arc| arc.as_ref().clone()) .unwrap_or_default(); - Self { + Ok(Self { read_version: value.read_version, uuid: value.uuid.clone(), operation: Some(operation), tag: value.tag.clone().unwrap_or("".to_string()), transaction_properties, - } + }) } } @@ -3514,6 +3528,7 @@ mod tests { created_at: Some(Utc::now()), base_id: None, files: None, + invalidated_fragment_bitmap: None, } } @@ -4021,6 +4036,7 @@ mod tests { created_at: None, base_id: None, files: None, + invalidated_fragment_bitmap: None, } } @@ -4043,6 +4059,7 @@ mod tests { created_at: None, base_id: None, files: None, + invalidated_fragment_bitmap: None, } } diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 8ea25c7285f..afdc4f23bc4 100644 --- 
a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -2297,7 +2297,7 @@ mod tests { use lance_datafusion::{datagen::DatafusionDatagenExt, utils::reader_to_stream}; use lance_datagen::{BatchCount, Dimension, RowCount, Seed, array}; use lance_index::IndexType; - use lance_index::scalar::ScalarIndexParams; + use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams}; use lance_io::object_store::ObjectStoreParams; use lance_linalg::distance::MetricType; use mock_instant::thread_local::MockClock; @@ -7658,4 +7658,676 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n } } } + + /// Creates a 3-fragment dataset (100 rows each) with columns (id: Utf8, category: Utf8, + /// value_a: Float64, value_b: Float64) and a BTree index on `id`. + /// + /// Fragment 0: id-0000..id-0099 + /// Fragment 1: id-0100..id-0199 + /// Fragment 2: id-0200..id-0299 + async fn create_indexed_3frag_dataset() -> Arc { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("category", DataType::Utf8, false), + Field::new("value_a", DataType::Float64, false), + Field::new("value_b", DataType::Float64, false), + ])); + + let make_batch = |frag_idx: usize| { + let start = frag_idx * 100; + let ids: Vec = (start..start + 100).map(|j| format!("id-{j:04}")).collect(); + let categories: Vec<&str> = vec!["A"; 100]; + let value_a: Vec = (0..100) + .map(|i| i as f64 + frag_idx as f64 * 100.0) + .collect(); + let value_b: Vec = (0..100).map(|i| i as f64 * 0.1).collect(); + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(ids)), + Arc::new(StringArray::from(categories)), + Arc::new(Float64Array::from(value_a)), + Arc::new(Float64Array::from(value_b)), + ], + ) + .unwrap() + }; + + // Write first fragment + let batch0 = make_batch(0); + let reader = Box::new(RecordBatchIterator::new([Ok(batch0)], schema.clone())); + let mut ds = 
Dataset::write(reader, "memory://indexed_3frag", None) + .await + .unwrap(); + + // Append fragments 1 and 2 + for frag_idx in 1..3 { + let batch = make_batch(frag_idx); + let reader = Box::new(RecordBatchIterator::new([Ok(batch)], schema.clone())); + ds.append(reader, None).await.unwrap(); + } + + // Create BTree index on id + ds.create_index( + &["id"], + IndexType::BTree, + None, + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + + Arc::new(ds) + } + + /// Perform a partial-schema merge_insert (only id + value_a) targeting specific id ranges. + /// This causes touched fragments to drop from the index bitmap while btree data retains + /// stale entries. + async fn partial_merge_insert( + dataset: Arc, + id_range: std::ops::Range, + value_a_val: f64, + ) -> Arc { + let ids: Vec = id_range.map(|j| format!("id-{j:04}")).collect(); + let n = ids.len(); + let sub_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("value_a", DataType::Float64, false), + ])); + let batch = RecordBatch::try_new( + sub_schema.clone(), + vec![ + Arc::new(StringArray::from(ids)), + Arc::new(Float64Array::from(vec![value_a_val; n])), + ], + ) + .unwrap(); + let reader = Box::new(RecordBatchIterator::new([Ok(batch)], sub_schema)); + + let (ds, _) = MergeInsertBuilder::try_new(dataset, vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::DoNothing) + .try_build() + .unwrap() + .execute_reader(reader) + .await + .unwrap(); + ds + } + + // Regression test: partial-schema merge_insert followed by another partial merge_insert + // on the same rows should not produce "Ambiguous merge inserts" errors. + // + // The bug: the first partial merge_insert drops the touched fragment from the index bitmap + // but leaves stale btree entries. The second merge_insert finds the same rows via both + // the stale btree lookup AND the unindexed fragment scan, causing duplicates. 
+ #[tokio::test] + async fn test_partial_merge_insert_stale_index_ambiguous() { + let dataset = create_indexed_3frag_dataset().await; + + // Step 2: Partial merge_insert on fragment 1 rows -> fragment 1 drops from bitmap + let dataset = partial_merge_insert(dataset, 100..200, 999.0).await; + + // Step 3: Another partial merge_insert on the same rows. + // This should succeed, not fail with "Ambiguous merge inserts". + let dataset = partial_merge_insert(dataset, 100..200, 888.0).await; + + // Verify correctness: all 300 rows present, updated values correct + let batches = dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let all_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("category", DataType::Utf8, false), + Field::new("value_a", DataType::Float64, false), + Field::new("value_b", DataType::Float64, false), + ])); + let combined = concat_batches(&all_schema, &batches).unwrap(); + assert_eq!(combined.num_rows(), 300); + + // Check the updated rows have value_a = 888.0 + let result = dataset + .scan() + .filter("id >= 'id-0100' AND id < 'id-0200'") + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let result = concat_batches(&all_schema, &result).unwrap(); + assert_eq!(result.num_rows(), 100); + let values = result + .column_by_name("value_a") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..100 { + assert_eq!(values.value(i), 888.0, "row {i} should have value_a=888.0"); + } + } + + // Regression test: partial-schema merge_insert followed by update (deleting all rows + // in a fragment) followed by partial merge_insert should not produce + // "fragment id N does not exist" errors. + // + // The bug: stale btree entries reference the deleted fragment. The deletion mask doesn't + // block those addresses because the fragment isn't in the index bitmap. 
TakeExec tries + // to read from a non-existent fragment. + #[tokio::test] + async fn test_partial_merge_insert_stale_index_fragment_not_exist() { + let dataset = create_indexed_3frag_dataset().await; + + // Step 2: Partial merge_insert on fragment 1 rows -> fragment 1 drops from bitmap + let dataset = partial_merge_insert(dataset, 100..200, 999.0).await; + + // Step 3: Update all rows that were in fragment 1, causing fragment 1 to be + // fully deleted and replaced by a new fragment. + let update_result = crate::dataset::UpdateBuilder::new(Arc::new((*dataset).clone())) + .update_where("id >= 'id-0100' AND id < 'id-0200'") + .unwrap() + .set("category", "'B'") + .unwrap() + .build() + .unwrap() + .execute() + .await + .unwrap(); + let dataset = update_result.new_dataset; + + // Step 4: Partial merge_insert on the same rows. + // This should succeed, not fail with "fragment does not exist". + let dataset = partial_merge_insert(dataset, 100..200, 888.0).await; + + // Verify correctness + let batches = dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let all_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("category", DataType::Utf8, false), + Field::new("value_a", DataType::Float64, false), + Field::new("value_b", DataType::Float64, false), + ])); + let combined = concat_batches(&all_schema, &batches).unwrap(); + assert_eq!(combined.num_rows(), 300); + } + + // Regression test: partial-schema merge_insert followed by update (deleting SOME rows + // in a fragment) followed by partial merge_insert should not produce + // "RecordBatch size mismatch" errors. + // + // The bug: stale btree entries reference deleted rows in a fragment that still exists. + // The deletion mask doesn't block those addresses (fragment not in bitmap). TakeExec + // reads the fragment but the rows have deletion markers, returning 0 rows where N + // were expected. 
+ #[tokio::test] + async fn test_partial_merge_insert_stale_index_batch_size_mismatch() { + let dataset = create_indexed_3frag_dataset().await; + + // Step 2: Partial merge_insert on fragment 1 rows -> fragment 1 drops from bitmap + let dataset = partial_merge_insert(dataset, 100..200, 999.0).await; + + // Step 3: Update HALF of the rows that were in fragment 1. Fragment 1 survives + // but the updated rows are deleted from it (moved to a new fragment). + let update_result = crate::dataset::UpdateBuilder::new(Arc::new((*dataset).clone())) + .update_where("id >= 'id-0100' AND id < 'id-0150'") + .unwrap() + .set("category", "'B'") + .unwrap() + .build() + .unwrap() + .execute() + .await + .unwrap(); + let dataset = update_result.new_dataset; + + // Step 4: Partial merge_insert targeting the rows that were updated (and thus + // deleted from fragment 1). Should succeed, not fail with batch size mismatch. + let dataset = partial_merge_insert(dataset, 100..150, 888.0).await; + + // Verify correctness + let batches = dataset + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let all_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("category", DataType::Utf8, false), + Field::new("value_a", DataType::Float64, false), + Field::new("value_b", DataType::Float64, false), + ])); + let combined = concat_batches(&all_schema, &batches).unwrap(); + assert_eq!(combined.num_rows(), 300); + } + + // Regression test: after a partial-schema merge_insert drops a fragment from the vector + // index bitmap, a vector search should not return duplicate rows. The stale vector index + // data still references the dropped fragment, and the scanner also flat-scans unindexed + // fragments, causing the same rows to appear from both paths. 
+ #[tokio::test] + async fn test_partial_merge_insert_stale_vector_index_duplicates() { + let dim = 4i32; + let rows_per_frag = 10usize; + let num_frags = 3usize; + let total_rows = rows_per_frag * num_frags; + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("category", DataType::Utf8, false), + Field::new( + "vec", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), dim), + false, + ), + ])); + + let make_batch = |frag_idx: usize, offset: f32| { + let start = frag_idx * rows_per_frag; + let ids: Vec = (start..start + rows_per_frag) + .map(|j| format!("id-{j:04}")) + .collect(); + let cats: Vec<&str> = vec!["A"; rows_per_frag]; + let values: Vec = (0..rows_per_frag * dim as usize) + .map(|i| (start * dim as usize + i) as f32 + offset) + .collect(); + let vectors = + FixedSizeListArray::try_new_from_values(Float32Array::from(values), dim).unwrap(); + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(ids)), + Arc::new(StringArray::from(cats)), + Arc::new(vectors), + ], + ) + .unwrap() + }; + + // Write 3 fragments + let batch0 = make_batch(0, 0.0); + let reader = Box::new(RecordBatchIterator::new([Ok(batch0)], schema.clone())); + let mut ds = Dataset::write(reader, "memory://vector_stale_test", None) + .await + .unwrap(); + for frag_idx in 1..num_frags { + let batch = make_batch(frag_idx, 0.0); + let reader = Box::new(RecordBatchIterator::new([Ok(batch)], schema.clone())); + ds.append(reader, None).await.unwrap(); + } + + // Create IVF_FLAT vector index on vec + let params = VectorIndexParams::ivf_flat(1, MetricType::L2); + ds.create_index(&["vec"], IndexType::Vector, None, ¶ms, false) + .await + .unwrap(); + + let ds = Arc::new(ds); + + // Partial merge_insert with (id, vec) on fragment 1 rows - slightly different vectors. + // This drops fragment 1 from the vector index bitmap. 
+ let frag1_start = rows_per_frag; + let ids: Vec = (frag1_start..frag1_start + rows_per_frag) + .map(|j| format!("id-{j:04}")) + .collect(); + let sub_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new( + "vec", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), dim), + false, + ), + ])); + let values: Vec = (0..rows_per_frag * dim as usize) + .map(|i| (frag1_start * dim as usize + i) as f32 + 0.5) + .collect(); + let vectors = + FixedSizeListArray::try_new_from_values(Float32Array::from(values), dim).unwrap(); + let update_batch = RecordBatch::try_new( + sub_schema.clone(), + vec![Arc::new(StringArray::from(ids)), Arc::new(vectors)], + ) + .unwrap(); + let reader = Box::new(RecordBatchIterator::new([Ok(update_batch)], sub_schema)); + let (ds, _) = MergeInsertBuilder::try_new(ds, vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::DoNothing) + .try_build() + .unwrap() + .execute_reader(reader) + .await + .unwrap(); + + // KNN search with k = total_rows to retrieve all rows + let query: Float32Array = (0..dim) + .map(|i| (frag1_start * dim as usize + i as usize) as f32 + 0.5) + .collect(); + let results = ds + .scan() + .nearest("vec", &query, total_rows) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + // Check no duplicate ids + let ids = results + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let unique_ids: std::collections::HashSet<&str> = + (0..ids.len()).map(|i| ids.value(i)).collect(); + assert_eq!( + unique_ids.len(), + ids.len(), + "Found duplicate ids in KNN results: {} unique out of {} total", + unique_ids.len(), + ids.len() + ); + } + + // Regression test: after a partial-schema merge_insert drops a fragment from the FTS + // index bitmap, a full text search should not return duplicate rows. 
The stale inverted + // index data still references the dropped fragment, and the scanner also flat-scans + // unindexed fragments, causing the same rows to appear from both paths. + #[tokio::test] + async fn test_partial_merge_insert_stale_fts_index_duplicates() { + let rows_per_frag = 10usize; + let num_frags = 3usize; + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("category", DataType::Utf8, false), + Field::new("text", DataType::Utf8, false), + ])); + + let make_batch = |frag_idx: usize| { + let start = frag_idx * rows_per_frag; + let ids: Vec = (start..start + rows_per_frag) + .map(|j| format!("id-{j:04}")) + .collect(); + let cats: Vec<&str> = vec!["A"; rows_per_frag]; + // Every row contains "common" so we can search for it and expect all rows + let texts: Vec = (start..start + rows_per_frag) + .map(|j| format!("common unique{j:04}")) + .collect(); + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(ids)), + Arc::new(StringArray::from(cats)), + Arc::new(StringArray::from(texts)), + ], + ) + .unwrap() + }; + + // Write 3 fragments + let batch0 = make_batch(0); + let reader = Box::new(RecordBatchIterator::new([Ok(batch0)], schema.clone())); + let mut ds = Dataset::write(reader, "memory://fts_stale_test", None) + .await + .unwrap(); + for frag_idx in 1..num_frags { + let batch = make_batch(frag_idx); + let reader = Box::new(RecordBatchIterator::new([Ok(batch)], schema.clone())); + ds.append(reader, None).await.unwrap(); + } + + // Create inverted index on text + let params = InvertedIndexParams::default(); + ds.create_index(&["text"], IndexType::Inverted, None, ¶ms, true) + .await + .unwrap(); + + let ds = Arc::new(ds); + + // Partial merge_insert with (id, text) on fragment 1 rows. + // Text still contains "common" so FTS will find them via both paths. + // This drops fragment 1 from the inverted index bitmap. 
+ let frag1_start = rows_per_frag; + let ids: Vec = (frag1_start..frag1_start + rows_per_frag) + .map(|j| format!("id-{j:04}")) + .collect(); + let texts: Vec = (frag1_start..frag1_start + rows_per_frag) + .map(|j| format!("common updated{j:04}")) + .collect(); + let sub_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("text", DataType::Utf8, false), + ])); + let update_batch = RecordBatch::try_new( + sub_schema.clone(), + vec![ + Arc::new(StringArray::from(ids)), + Arc::new(StringArray::from(texts)), + ], + ) + .unwrap(); + let reader = Box::new(RecordBatchIterator::new([Ok(update_batch)], sub_schema)); + let (ds, _) = MergeInsertBuilder::try_new(ds, vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::DoNothing) + .try_build() + .unwrap() + .execute_reader(reader) + .await + .unwrap(); + + // FTS search for "common" — every row should match exactly once + let query = FullTextSearchQuery::new("common".to_string()); + let results = ds + .scan() + .full_text_search(query) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + // Check no duplicate ids + let ids = results + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let unique_ids: std::collections::HashSet<&str> = + (0..ids.len()).map(|i| ids.value(i)).collect(); + assert_eq!( + unique_ids.len(), + ids.len(), + "Found duplicate ids in FTS results: {} unique out of {} total", + unique_ids.len(), + ids.len() + ); + // Also verify we got all rows + assert_eq!( + unique_ids.len(), + rows_per_frag * num_frags, + "Expected {} rows but got {}", + rows_per_frag * num_frags, + unique_ids.len() + ); + } + + // Regression test: after a partial-schema merge_insert invalidates a fragment, + // compaction should succeed and subsequent searches should return correct results. + // + // The compaction planner separates indexed and unindexed fragments into different + // groups. 
After invalidating the middle fragment, the indexed fragments on either + // side form separate compactable groups. After compaction the old invalidated + // fragment ID may remain in invalidated_fragment_bitmap but this is harmless + // because the fragment no longer exists and no index results reference it. + #[tokio::test] + async fn test_compaction_after_invalidated_fragment() { + use crate::dataset::optimize::{CompactionOptions, compact_files}; + + // Use 5 small fragments so that after invalidating the middle one (fragment 2), + // the planner has enough neighbors to form compactable groups on each side: + // {0,1} (indexed) and {3,4} (indexed), with {2} (unindexed) separate. + let rows_per_frag = 20; + let num_frags = 5; + let total_rows = rows_per_frag * num_frags; + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("category", DataType::Utf8, false), + Field::new("value_a", DataType::Float64, false), + Field::new("value_b", DataType::Float64, false), + ])); + + let make_batch = |frag_idx: usize| { + let start = frag_idx * rows_per_frag; + let ids: Vec = (start..start + rows_per_frag) + .map(|j| format!("id-{j:04}")) + .collect(); + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(ids)), + Arc::new(StringArray::from(vec!["A"; rows_per_frag])), + Arc::new(Float64Array::from( + (0..rows_per_frag).map(|i| i as f64).collect::>(), + )), + Arc::new(Float64Array::from( + (0..rows_per_frag) + .map(|i| i as f64 * 0.1) + .collect::>(), + )), + ], + ) + .unwrap() + }; + + let batch0 = make_batch(0); + let reader = Box::new(RecordBatchIterator::new([Ok(batch0)], schema.clone())); + let mut ds = Dataset::write(reader, "memory://compaction_test", None) + .await + .unwrap(); + for frag_idx in 1..num_frags { + let batch = make_batch(frag_idx); + let reader = Box::new(RecordBatchIterator::new([Ok(batch)], schema.clone())); + ds.append(reader, None).await.unwrap(); + } + ds.create_index( + &["id"], + 
IndexType::BTree, + None, + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + + let ds = Arc::new(ds); + + // Invalidate fragment 2 (the middle one) + let frag2_start = 2 * rows_per_frag; + let ds = partial_merge_insert(ds, frag2_start..frag2_start + rows_per_frag, 999.0).await; + + // Verify pre-compaction state + let indices = ds.load_indices().await.unwrap(); + let idx = indices.iter().find(|i| i.name == "id_idx").unwrap(); + assert!(!idx.fragment_bitmap.as_ref().unwrap().contains(2)); + assert!( + idx.invalidated_fragment_bitmap + .as_ref() + .unwrap() + .contains(2) + ); + + // Run compaction with a target that forces merging of the small fragments. + let mut ds = (*ds).clone(); + let opts = CompactionOptions { + target_rows_per_fragment: total_rows, + ..Default::default() + }; + compact_files(&mut ds, opts, None).await.unwrap(); + + // The indexed fragments (0,1 and 3,4) should be compacted. + // Fragment 2 (unindexed) may or may not be compacted on its own. + // Either way, the old fragment IDs in the bitmap should be replaced. + let indices = ds.load_indices().await.unwrap(); + let idx = indices.iter().find(|i| i.name == "id_idx").unwrap(); + let bitmap = idx.fragment_bitmap.as_ref().unwrap(); + for &old_id in &[0u32, 1, 3, 4] { + assert!( + !bitmap.contains(old_id), + "Old indexed fragment {} should not be in bitmap after compaction", + old_id + ); + } + assert!( + !bitmap.is_empty(), + "Bitmap should have new compacted fragments" + ); + + // The invalidated bitmap may still reference old fragment 2. + // This is harmless — fragment 2 no longer exists (or was compacted into + // a new fragment), so blocking it is a no-op. + + // Verify search works correctly despite stale invalidated entries. 
+ let ds = Arc::new(ds); + let ds = partial_merge_insert(ds, frag2_start..frag2_start + rows_per_frag, 888.0).await; + + // All rows present + let batches = ds + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let combined = concat_batches(&schema, &batches).unwrap(); + assert_eq!(combined.num_rows(), total_rows); + + // Updated rows have correct value + let result = ds + .scan() + .filter(&format!( + "id >= 'id-{:04}' AND id < 'id-{:04}'", + frag2_start, + frag2_start + rows_per_frag + )) + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + let result = concat_batches(&schema, &result).unwrap(); + assert_eq!(result.num_rows(), rows_per_frag); + let values = result + .column_by_name("value_a") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..rows_per_frag { + assert_eq!(values.value(i), 888.0, "row {i} should have value_a=888.0"); + } + } } diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 40788265172..e5b525fbae7 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -1100,6 +1100,7 @@ impl DatasetIndexExt for Dataset { created_at: Some(chrono::Utc::now()), base_id: None, // New merged index file locates in the cloned dataset. 
files: res.files, + invalidated_fragment_bitmap: None, }; removed_indices.extend(res.removed_indices.iter().map(|&idx| idx.clone())); new_indices.push(new_idx); @@ -2300,6 +2301,7 @@ mod tests { path: INDEX_FILE_NAME.to_string(), size_bytes: payload.len() as u64, }]), + invalidated_fragment_bitmap: None, } } diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 8aed939787a..39a7103a5aa 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -455,6 +455,7 @@ impl<'a> CreateIndexBuilder<'a> { created_at: Some(chrono::Utc::now()), base_id: None, files: created_index.files, + invalidated_fragment_bitmap: None, }) } diff --git a/rust/lance/src/index/frag_reuse.rs b/rust/lance/src/index/frag_reuse.rs index e5f63514d86..98128c5aade 100644 --- a/rust/lance/src/index/frag_reuse.rs +++ b/rust/lance/src/index/frag_reuse.rs @@ -174,5 +174,6 @@ pub(crate) async fn build_frag_reuse_index_metadata( base_id: None, // Fragment reuse index is inline (no files) files: None, + invalidated_fragment_bitmap: None, }) } diff --git a/rust/lance/src/index/mem_wal.rs b/rust/lance/src/index/mem_wal.rs index a9a966f4ee9..ebc9e3ba180 100644 --- a/rust/lance/src/index/mem_wal.rs +++ b/rust/lance/src/index/mem_wal.rs @@ -113,6 +113,7 @@ pub(crate) fn new_mem_wal_index_meta( base_id: None, // Memory WAL index is inline (no files) files: None, + invalidated_fragment_bitmap: None, }) } diff --git a/rust/lance/src/index/prefilter.rs b/rust/lance/src/index/prefilter.rs index de1d97e1e31..5678a055b00 100644 --- a/rust/lance/src/index/prefilter.rs +++ b/rust/lance/src/index/prefilter.rs @@ -74,10 +74,24 @@ impl DatasetPreFilter { Self::create_deletion_mask(dataset, fragments).map(SharedPrerequisite::spawn); let filtered_ids = filter .map(|filtered_ids| SharedPrerequisite::spawn(filtered_ids.load().in_current_span())); + // Block rows from invalidated fragments — fragments whose indexed column + // values changed but whose index data was not 
rewritten (see + // IndexMetadata::invalidated_fragment_bitmap for details). + let mut invalidated = RoaringBitmap::new(); + for idx in indices { + if let Some(inv) = &idx.invalidated_fragment_bitmap { + invalidated |= inv; + } + } + let deleted_fragments = if invalidated.is_empty() { + None + } else { + Some(invalidated) + }; Self { deleted_ids, filtered_ids, - deleted_fragments: None, + deleted_fragments, final_mask: Mutex::new(OnceCell::new()), } } @@ -183,7 +197,11 @@ impl DatasetPreFilter { /// Used by FTS indices which track fragments that have been removed from the /// dataset but whose data is still present in the index (merge-on-read). pub fn set_deleted_fragments(&mut self, fragments: RoaringBitmap) { - self.deleted_fragments = Some(fragments); + if let Some(existing) = &mut self.deleted_fragments { + *existing |= fragments; + } else { + self.deleted_fragments = Some(fragments); + } } /// Creates a task to load mask to filter out deleted rows. diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 44739454bec..48fa41c6c91 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -642,6 +642,7 @@ mod tests { created_at: None, base_id: None, files: None, + invalidated_fragment_bitmap: None, } } diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index e137237b9a0..5697af85931 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -1795,6 +1795,7 @@ pub async fn initialize_vector_index( created_at: Some(chrono::Utc::now()), base_id: None, files: Some(files), + invalidated_fragment_bitmap: None, }; let transaction = Transaction::new( diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index a38cf3a57de..60de8e3cce4 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -2873,6 +2873,7 @@ mod tests { created_at: Some(chrono::Utc::now()), base_id: None, files: None, + 
invalidated_fragment_bitmap: None, }; // We need to commit this index to the dataset so that it can be found @@ -2912,6 +2913,7 @@ mod tests { created_at: None, // Test index, not setting timestamp base_id: None, files: None, + invalidated_fragment_bitmap: None, }; let prefilter = Arc::new(DatasetPreFilter::new(dataset.clone(), &[index_meta], None)); @@ -2972,6 +2974,7 @@ mod tests { created_at: Some(chrono::Utc::now()), base_id: None, files: None, + invalidated_fragment_bitmap: None, }; // We need to commit this new index to the dataset so it can be found diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 9e8db7caf0f..d0340632b03 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -118,7 +118,7 @@ pub(crate) async fn write_transaction_file( let file_name = format!("{}-{}.txn", transaction.read_version, transaction.uuid); let path = base_path.child(TRANSACTIONS_DIR).child(file_name.as_str()); - let message = pb::Transaction::from(transaction); + let message = pb::Transaction::try_from(transaction)?; let buf = message.encode_to_vec(); object_store.inner.put(&path, buf.into()).await?; diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index ec03ba596db..7b6b616b506 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -2176,6 +2176,7 @@ mod tests { created_at: None, // Test index, not setting timestamp base_id: None, files: None, + invalidated_fragment_bitmap: None, }; let fragment0 = Fragment::new(0); let fragment1 = Fragment::new(1); @@ -2676,6 +2677,7 @@ mod tests { created_at: None, base_id: None, files: None, + invalidated_fragment_bitmap: None, }; let index1 = IndexMetadata { uuid: uuid::Uuid::new_v4(), @@ -2742,6 +2744,7 @@ mod tests { created_at: None, base_id: None, files: None, + invalidated_fragment_bitmap: None, }], removed_indices: vec![], }, diff --git 
a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index f587ec22a91..a8d1d43276e 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -342,11 +342,24 @@ impl MapIndexExec { .unwrap(); let deletion_mask_fut = DatasetPreFilter::create_deletion_mask(dataset.clone(), index.fragment_bitmap.unwrap()); - let deletion_mask = if let Some(deletion_mask_fut) = deletion_mask_fut { + let mut deletion_mask = if let Some(deletion_mask_fut) = deletion_mask_fut { Some(deletion_mask_fut.await?) } else { None }; + // Block rows from invalidated fragments — fragments whose indexed + // column values changed but whose index data was not rewritten. + if let Some(invalidated) = &index.invalidated_fragment_bitmap { + let mut block_list = lance_core::utils::mask::RowAddrTreeMap::new(); + for frag_id in invalidated.iter() { + block_list.insert_fragment(frag_id); + } + let inv_mask = Arc::new(lance_core::utils::mask::RowAddrMask::from_block(block_list)); + deletion_mask = Some(match deletion_mask { + Some(existing) => Arc::new((*existing).clone() & (*inv_mask).clone()), + None => inv_mask, + }); + } Ok(input.and_then(move |res| { let column_name = column_name.clone(); let index_name = index_name.clone();