From ec156fa5841b2fa9987c5bd90fc5e20e4409e72d Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 4 Aug 2025 10:22:51 -0700 Subject: [PATCH 1/4] feat: conflict resolution for DataReplacement MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #3686 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- rust/lance/src/dataset/transaction.rs | 398 ++++++++++++++++++++++++++ 1 file changed, 398 insertions(+) diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 6637b76d58f..807eb185243 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -1418,6 +1418,57 @@ impl From<&pb::transaction::UpdateMap> for UpdateMap { replace: pb_update_map.replace, } } + + /// Returns the IDs of fragments that have been modified by this operation. + /// + /// This does not include new fragments. + fn modified_fragment_ids(&self) -> Box + '_> { + match self { + // These operations add new fragments or don't modify any. + Self::Append { .. } + | Self::Overwrite { .. } + | Self::CreateIndex { .. } + | Self::ReserveFragments { .. } + | Self::Project { .. } + | Self::UpdateConfig { .. } + | Self::Restore { .. } + | Self::UpdateMemWalState { .. } => Box::new(std::iter::empty()), + Self::Delete { + updated_fragments, + deleted_fragment_ids, + .. + } => Box::new( + updated_fragments + .iter() + .map(|f| f.id) + .chain(deleted_fragment_ids.iter().copied()), + ), + Self::Rewrite { groups, .. } => Box::new( + groups + .iter() + .flat_map(|f| f.old_fragments.iter().map(|f| f.id)), + ), + Self::Merge { fragments, .. } => Box::new(fragments.iter().map(|f| f.id)), + Self::Update { + updated_fragments, + removed_fragment_ids, + .. + } => Box::new( + updated_fragments + .iter() + .map(|f| f.id) + .chain(removed_fragment_ids.iter().copied()), + ), + Self::DataReplacement { replacements } => Box::new(replacements.iter().map(|r| r.0)), + } + } + + /// Returns true if this operation modifies any of the same fragment IDs as another operation. + fn modifies_same_ids(&self, other: &Self) -> bool { + let self_ids: HashSet = self.modified_fragment_ids().collect(); + let other_ids: HashSet = other.modified_fragment_ids().collect(); + !self_ids.is_disjoint(&other_ids) + } } /// Add TransactionBuilder for flexibly setting option without using `mut` @@ -1484,6 +1535,187 @@ impl Transaction { .build() } + /// Returns true if the transaction cannot be committed if the other + /// transaction is committed first. + pub fn conflicts_with(&self, other: &Self) -> bool { + // This assumes IsolationLevel is Snapshot Isolation, which is more + // permissive than Serializable. In particular, it allows a Delete + // transaction to succeed after a concurrent Append, even if the Append + // added rows that would be deleted. + match &self.operation { + Operation::Append { .. } => match &other.operation { + // Append is compatible with anything that doesn't change the schema + Operation::Append { .. } => false, + Operation::Rewrite { .. } => false, + Operation::CreateIndex { .. } => false, + Operation::Delete { .. } | Operation::Update { .. } => false, + Operation::ReserveFragments { .. } => false, + Operation::Project { .. } => false, + Operation::UpdateConfig { .. } => false, + Operation::DataReplacement { .. } => false, + _ => true, + }, + Operation::Rewrite { .. } => match &other.operation { + // Rewrite is only compatible with operations that don't touch + // existing fragments. + // TODO: it could also be compatible with operations that update + // fragments we don't touch. + Operation::Append { .. } => false, + Operation::ReserveFragments { .. } => false, + Operation::Delete { .. } + | Operation::Rewrite { .. } + | Operation::Update { .. } + | Operation::DataReplacement { .. } => { + // As long as they rewrite disjoint fragments they shouldn't conflict. + self.operation.modifies_same_ids(&other.operation) + } + Operation::Project { .. } => false, + Operation::UpdateConfig { .. } => false, + _ => true, + }, + // Restore always succeeds + Operation::Restore { .. } => false, + // ReserveFragments is compatible with anything that doesn't reset the + // max fragment id. + Operation::ReserveFragments { .. } => matches!( + &other.operation, + Operation::Overwrite { .. } | Operation::Restore { .. } + ), + Operation::CreateIndex { new_indices, .. } => { + let affected_fields = new_indices + .iter() + .flat_map(|index| index.fields.iter()) + .collect::>(); + match &other.operation { + Operation::Append { .. } => false, + // Indices are identified by UUIDs, so they shouldn't conflict. + Operation::CreateIndex { .. } => false, + // Although some of the rows we indexed may have been deleted / moved, + // row ids are still valid, so we allow this optimistically. + Operation::Delete { .. } | Operation::Update { .. } => false, + // Merge & reserve don't change row ids, so this should be fine. + Operation::Merge { .. } => false, + Operation::ReserveFragments { .. } => false, + // Rewrite likely changed many of the row ids, so our index is + // likely useless. It should be rebuilt. + // TODO: we could be smarter here and only invalidate the index + // if the rewrite changed more than X% of row ids. + Operation::Rewrite { .. } => true, + Operation::UpdateConfig { .. } => false, + // Because we modify the fragment in place, the index cannot be + // re-used if it's on something that we've modified. + Operation::DataReplacement { replacements } => { + replacements.iter().any(|replacement| { + replacement + .1 + .fields + .iter() + .any(|field| affected_fields.contains(field)) + }) + } + _ => true, + } + } + Operation::Delete { .. } | Operation::Update { .. } => match &other.operation { + Operation::CreateIndex { .. } => false, + Operation::ReserveFragments { .. } => false, + Operation::Delete { .. } + | Operation::Rewrite { .. } + | Operation::Update { .. } + | Operation::DataReplacement { .. } => { + // If we update the same fragments, we conflict. + self.operation.modifies_same_ids(&other.operation) + } + Operation::Project { .. } => false, + Operation::Append { .. } => false, + Operation::UpdateConfig { .. } => false, + _ => true, + }, + Operation::Overwrite { .. } => match &other.operation { + // Overwrite only conflicts with another operation modifying the same update config + Operation::Overwrite { .. } | Operation::UpdateConfig { .. } => { + self.operation.upsert_key_conflict(&other.operation) + } + _ => false, + }, + Operation::UpdateConfig { + schema_metadata, + field_metadata, + .. + } => match &other.operation { + Operation::Overwrite { .. } => { + // Updates to schema metadata or field metadata conflict with any kind + // of overwrite. + if schema_metadata.is_some() || field_metadata.is_some() { + true + } else { + self.operation.upsert_key_conflict(&other.operation) + } + } + Operation::UpdateConfig { .. } => { + self.operation.upsert_key_conflict(&other.operation) + | self.operation.modifies_same_metadata(&other.operation) + } + _ => false, + }, + // Merge changes the schema, but preserves row ids, so the only operations + // it's compatible with is CreateIndex, ReserveFragments, SetMetadata and DeleteMetadata. + Operation::Merge { .. } => !matches!( + &other.operation, + Operation::CreateIndex { .. } + | Operation::ReserveFragments { .. } + | Operation::UpdateConfig { .. } + ), + Operation::Project { .. } => match &other.operation { + // Project is compatible with anything that doesn't change the schema + Operation::CreateIndex { .. } => false, + Operation::Overwrite { .. } => false, + Operation::UpdateConfig { .. } => false, + _ => true, + }, + Operation::DataReplacement { replacements } => match &other.operation { + Operation::Append { .. } + | Operation::Delete { .. } + | Operation::Update { .. } + | Operation::Merge { .. } + | Operation::UpdateConfig { .. } => false, + Operation::CreateIndex { new_indices, .. } => { + let affected_fields = new_indices + .iter() + .flat_map(|index| index.fields.iter()) + .collect::>(); + + replacements.iter().any(|replacement| { + replacement + .1 + .fields + .iter() + .any(|field| affected_fields.contains(field)) + }) + } + Operation::Rewrite { .. } => self.operation.modifies_same_ids(&other.operation), + Operation::DataReplacement { + replacements: other_replacements, + } => replacements.iter().any(|replacement| { + other_replacements.iter().any(|other_replacement| { + replacement.0 == other_replacement.0 + && replacement + .1 + .fields + .iter() + .any(|field| other_replacement.1.fields.contains(field)) + }) + }), + _ => true, + }, + Operation::UpdateMemWalState { .. } => { + // UpdateMemWalState doesn't conflict with any other operations + // as it only modifies the MemWAL state tracking + false + } + } + } + fn fragments_with_ids<'a, T>( new_fragments: T, fragment_id: &'a mut u64, @@ -4351,4 +4583,170 @@ mod tests { // Verify idx_e removed (bad field) assert!(!indices.iter().any(|idx| idx.name == "idx_e")); } + + #[test] + fn test_data_replacement_conflicts() { + let data_file0 = DataFile { + path: "file0".to_string(), + fields: vec![0], + column_indices: vec![], + file_major_version: 1, + file_minor_version: 0, + file_size_bytes: CachedFileSize::new(1000), + base_id: None, + }; + + let data_file1 = DataFile { + path: "file1".to_string(), + fields: vec![1], + column_indices: vec![], + file_major_version: 1, + file_minor_version: 0, + file_size_bytes: CachedFileSize::new(1000), + base_id: None, + }; + + let data_file2 = DataFile { + path: "file2".to_string(), + fields: vec![0], + column_indices: vec![], + file_major_version: 1, + file_minor_version: 0, + file_size_bytes: CachedFileSize::new(1000), + base_id: None, + }; + + let replacements = vec![ + DataReplacementGroup(0, data_file0), + DataReplacementGroup(1, data_file1.clone()), + ]; + + let other_replacements = vec![ + DataReplacementGroup(1, data_file1), + DataReplacementGroup(2, data_file2.clone()), + ]; + + let operation = Operation::DataReplacement { replacements }; + let other_operation = Operation::DataReplacement { + replacements: other_replacements, + }; + + let transaction = Transaction::new(0, operation, None, None); + let other_transaction = Transaction::new(0, other_operation, None, None); + + // Conflicts because fragment 1 is being replaced in both transactions + assert!(transaction.conflicts_with(&other_transaction)); + + let non_conflicting_replacements = vec![DataReplacementGroup(2, data_file2)]; + let non_conflicting_operation = Operation::DataReplacement { + replacements: non_conflicting_replacements, + }; + let non_conflicting_transaction = + Transaction::new(0, non_conflicting_operation, None, None); + + // No conflict because the fragments being replaced are disjoint + assert!(!transaction.conflicts_with(&non_conflicting_transaction)); + } + + #[test] + fn test_data_replacement_with_create_index_conflict() { + let data_file = DataFile { + path: "file0".to_string(), + fields: vec![0], + column_indices: vec![], + file_major_version: 1, + file_minor_version: 0, + file_size_bytes: CachedFileSize::new(1000), + base_id: None, + }; + + let replacements = vec![DataReplacementGroup(0, data_file)]; + let operation = Operation::DataReplacement { replacements }; + + let index = IndexMetadata { + uuid: uuid::Uuid::new_v4(), + name: "test_index".to_string(), + fields: vec![0], + dataset_version: 1, + fragment_bitmap: None, + index_details: None, + created_at: None, + index_version: 1, + base_id: None, + }; + + let create_index_operation = Operation::CreateIndex { + new_indices: vec![index], + removed_indices: vec![], + }; + + let transaction = Transaction::new(0, operation, None, None); + let create_index_transaction = Transaction::new(0, create_index_operation, None, None); + + // Conflicts because the index is being created on the same field being replaced + assert!(transaction.conflicts_with(&create_index_transaction)); + } + + #[test] + fn test_data_replacement_with_rewrite_conflict() { + let fragment0 = Fragment::new(0); + let fragment1 = Fragment::new(1); + + let data_file = DataFile { + path: "file0".to_string(), + fields: vec![0], + column_indices: vec![], + file_major_version: 1, + file_minor_version: 0, + file_size_bytes: CachedFileSize::new(1000), + base_id: None, + }; + + let replacements = vec![DataReplacementGroup(0, data_file)]; + let operation = Operation::DataReplacement { replacements }; + + let rewrite_group = RewriteGroup { + old_fragments: vec![fragment0], + new_fragments: vec![fragment1], + }; + + let rewrite_operation = Operation::Rewrite { + groups: vec![rewrite_group], + rewritten_indices: vec![], + frag_reuse_index: None, + }; + + let transaction = Transaction::new(0, operation, None, None); + let rewrite_transaction = Transaction::new(0, rewrite_operation, None, None); + + // Conflicts because the rewrite modifies the same fragment being replaced + assert!(transaction.conflicts_with(&rewrite_transaction)); + } + + #[test] + fn test_data_replacement_no_conflict_with_append() { + let data_file = DataFile { + path: "file0".to_string(), + fields: vec![0], + column_indices: vec![], + file_major_version: 1, + file_minor_version: 0, + file_size_bytes: CachedFileSize::new(1000), + base_id: None, + }; + + let replacements = vec![DataReplacementGroup(0, data_file)]; + let operation = Operation::DataReplacement { replacements }; + + let fragment = Fragment::new(1); + let append_operation = Operation::Append { + fragments: vec![fragment], + }; + + let transaction = Transaction::new(0, operation, None, None); + let append_transaction = Transaction::new(0, append_operation, None, None); + + // No conflict because append does not modify existing fragments + assert!(!transaction.conflicts_with(&append_transaction)); + } } From 55b80f9d7ebd23bc7080926290b35186436300b8 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 10 Nov 2025 16:20:47 -0800 Subject: [PATCH 2/4] feat: check fragment bitmap overlap for DataReplacement vs CreateIndex conflicts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, conflict detection between DataReplacement and CreateIndex only checked if the fields overlapped. Now also checks if the indexed fragment bitmap contains the fragment ID being replaced, providing more granular conflict detection. When an index has no fragment_bitmap (None), we conservatively assume conflict to maintain safety. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- rust/lance/src/dataset/transaction.rs | 158 ++++++++++++++++++++++---- 1 file changed, 135 insertions(+), 23 deletions(-) diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 807eb185243..f5b740a3e81 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -1582,10 +1582,6 @@ impl Transaction { Operation::Overwrite { .. } | Operation::Restore { .. } ), Operation::CreateIndex { new_indices, .. } => { - let affected_fields = new_indices - .iter() - .flat_map(|index| index.fields.iter()) - .collect::>(); match &other.operation { Operation::Append { .. } => false, // Indices are identified by UUIDs, so they shouldn't conflict. @@ -1605,12 +1601,29 @@ impl Transaction { // Because we modify the fragment in place, the index cannot be // re-used if it's on something that we've modified. Operation::DataReplacement { replacements } => { - replacements.iter().any(|replacement| { - replacement - .1 - .fields - .iter() - .any(|field| affected_fields.contains(field)) + new_indices.iter().any(|index| { + // Check if the index fields overlap with the replacement fields + let fields_overlap = replacements.iter().any(|replacement| { + replacement + .1 + .fields + .iter() + .any(|field| index.fields.contains(field)) + }); + + if !fields_overlap { + return false; + } + + // If fields overlap, check if the fragment bitmap overlaps + if let Some(ref bitmap) = index.fragment_bitmap { + replacements + .iter() + .any(|replacement| bitmap.contains(replacement.0 as u32)) + } else { + // If no fragment bitmap, conservatively assume conflict + true + } }) } _ => true, @@ -1680,17 +1693,29 @@ impl Transaction { | Operation::Merge { .. } | Operation::UpdateConfig { .. } => false, Operation::CreateIndex { new_indices, .. } => { - let affected_fields = new_indices - .iter() - .flat_map(|index| index.fields.iter()) - .collect::>(); + new_indices.iter().any(|index| { + // Check if the index fields overlap with the replacement fields + let fields_overlap = replacements.iter().any(|replacement| { + replacement + .1 + .fields + .iter() + .any(|field| index.fields.contains(field)) + }); - replacements.iter().any(|replacement| { - replacement - .1 - .fields - .iter() - .any(|field| affected_fields.contains(field)) + if !fields_overlap { + return false; + } + + // If fields overlap, check if the fragment bitmap overlaps + if let Some(ref bitmap) = index.fragment_bitmap { + replacements + .iter() + .any(|replacement| bitmap.contains(replacement.0 as u32)) + } else { + // If no fragment bitmap, conservatively assume conflict + true + } }) } Operation::Rewrite { .. } => self.operation.modifies_same_ids(&other.operation), @@ -4663,7 +4688,8 @@ mod tests { let replacements = vec![DataReplacementGroup(0, data_file)]; let operation = Operation::DataReplacement { replacements }; - let index = IndexMetadata { + // Test 1: Index without fragment_bitmap conflicts when fields overlap + let index_no_bitmap = IndexMetadata { uuid: uuid::Uuid::new_v4(), name: "test_index".to_string(), fields: vec![0], @@ -4676,15 +4702,101 @@ mod tests { }; let create_index_operation = Operation::CreateIndex { - new_indices: vec![index], + new_indices: vec![index_no_bitmap], removed_indices: vec![], }; - let transaction = Transaction::new(0, operation, None, None); + let transaction = Transaction::new(0, operation.clone(), None, None); let create_index_transaction = Transaction::new(0, create_index_operation, None, None); // Conflicts because the index is being created on the same field being replaced + // and we don't have fragment bitmap info assert!(transaction.conflicts_with(&create_index_transaction)); + + // Test 2: Index with fragment_bitmap that includes the replaced fragment conflicts + let mut bitmap_with_fragment = RoaringBitmap::new(); + bitmap_with_fragment.insert(0); + bitmap_with_fragment.insert(1); + + let index_with_overlapping_bitmap = IndexMetadata { + uuid: uuid::Uuid::new_v4(), + name: "test_index".to_string(), + fields: vec![0], + dataset_version: 1, + fragment_bitmap: Some(bitmap_with_fragment), + index_details: None, + created_at: None, + index_version: 1, + base_id: None, + }; + + let create_index_overlapping = Operation::CreateIndex { + new_indices: vec![index_with_overlapping_bitmap], + removed_indices: vec![], + }; + + let transaction2 = Transaction::new(0, operation.clone(), None, None); + let create_index_transaction2 = Transaction::new(0, create_index_overlapping, None, None); + + // Conflicts because fragment 0 is in the index bitmap + assert!(transaction2.conflicts_with(&create_index_transaction2)); + + // Test 3: Index with fragment_bitmap that doesn't include the replaced fragment doesn't conflict + let mut bitmap_without_fragment = RoaringBitmap::new(); + bitmap_without_fragment.insert(1); + bitmap_without_fragment.insert(2); + + let index_with_non_overlapping_bitmap = IndexMetadata { + uuid: uuid::Uuid::new_v4(), + name: "test_index".to_string(), + fields: vec![0], + dataset_version: 1, + fragment_bitmap: Some(bitmap_without_fragment), + index_details: None, + created_at: None, + index_version: 1, + base_id: None, + }; + + let create_index_non_overlapping = Operation::CreateIndex { + new_indices: vec![index_with_non_overlapping_bitmap], + removed_indices: vec![], + }; + + let transaction3 = Transaction::new(0, operation.clone(), None, None); + let create_index_transaction3 = + Transaction::new(0, create_index_non_overlapping, None, None); + + // No conflict because fragment 0 is not in the index bitmap + assert!(!transaction3.conflicts_with(&create_index_transaction3)); + + // Test 4: Index on different field doesn't conflict even if fragment overlaps + let mut bitmap_with_fragment2 = RoaringBitmap::new(); + bitmap_with_fragment2.insert(0); + + let index_different_field = IndexMetadata { + uuid: uuid::Uuid::new_v4(), + name: "test_index".to_string(), + fields: vec![1], // Different field + dataset_version: 1, + fragment_bitmap: Some(bitmap_with_fragment2), + index_details: None, + created_at: None, + index_version: 1, + base_id: None, + }; + + let create_index_different_field = Operation::CreateIndex { + new_indices: vec![index_different_field], + removed_indices: vec![], + }; + + let transaction4 = Transaction::new(0, operation, None, None); + let create_index_transaction4 = + Transaction::new(0, create_index_different_field, None, None); + + // No conflict because fields don't overlap + assert!(!transaction4.conflicts_with(&create_index_transaction4)); } #[test] From 25fa93c89cc07f5f2b2d462e420122b7e8cdd318 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 14 Nov 2025 14:07:58 -0800 Subject: [PATCH 3/4] setup better impl --- rust/lance/src/dataset/transaction.rs | 510 ------------------ rust/lance/src/io/commit/conflict_resolver.rs | 82 ++- 2 files changed, 66 insertions(+), 526 deletions(-) diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index f5b740a3e81..6637b76d58f 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -1418,57 +1418,6 @@ impl From<&pb::transaction::UpdateMap> for UpdateMap { replace: pb_update_map.replace, } } - - /// Returns the IDs of fragments that have been modified by this operation. - /// - /// This does not include new fragments. - fn modified_fragment_ids(&self) -> Box + '_> { - match self { - // These operations add new fragments or don't modify any. - Self::Append { .. } - | Self::Overwrite { .. } - | Self::CreateIndex { .. } - | Self::ReserveFragments { .. } - | Self::Project { .. } - | Self::UpdateConfig { .. } - | Self::Restore { .. } - | Self::UpdateMemWalState { .. } => Box::new(std::iter::empty()), - Self::Delete { - updated_fragments, - deleted_fragment_ids, - .. - } => Box::new( - updated_fragments - .iter() - .map(|f| f.id) - .chain(deleted_fragment_ids.iter().copied()), - ), - Self::Rewrite { groups, .. } => Box::new( - groups - .iter() - .flat_map(|f| f.old_fragments.iter().map(|f| f.id)), - ), - Self::Merge { fragments, .. } => Box::new(fragments.iter().map(|f| f.id)), - Self::Update { - updated_fragments, - removed_fragment_ids, - .. - } => Box::new( - updated_fragments - .iter() - .map(|f| f.id) - .chain(removed_fragment_ids.iter().copied()), - ), - Self::DataReplacement { replacements } => Box::new(replacements.iter().map(|r| r.0)), - } - } - - /// Returns true if this operation modifies any of the same fragment IDs as another operation. - fn modifies_same_ids(&self, other: &Self) -> bool { - let self_ids: HashSet = self.modified_fragment_ids().collect(); - let other_ids: HashSet = other.modified_fragment_ids().collect(); - !self_ids.is_disjoint(&other_ids) - } } /// Add TransactionBuilder for flexibly setting option without using `mut` @@ -1535,212 +1484,6 @@ impl Transaction { .build() } - /// Returns true if the transaction cannot be committed if the other - /// transaction is committed first. - pub fn conflicts_with(&self, other: &Self) -> bool { - // This assumes IsolationLevel is Snapshot Isolation, which is more - // permissive than Serializable. In particular, it allows a Delete - // transaction to succeed after a concurrent Append, even if the Append - // added rows that would be deleted. - match &self.operation { - Operation::Append { .. } => match &other.operation { - // Append is compatible with anything that doesn't change the schema - Operation::Append { .. } => false, - Operation::Rewrite { .. } => false, - Operation::CreateIndex { .. } => false, - Operation::Delete { .. } | Operation::Update { .. } => false, - Operation::ReserveFragments { .. } => false, - Operation::Project { .. } => false, - Operation::UpdateConfig { .. } => false, - Operation::DataReplacement { .. } => false, - _ => true, - }, - Operation::Rewrite { .. } => match &other.operation { - // Rewrite is only compatible with operations that don't touch - // existing fragments. - // TODO: it could also be compatible with operations that update - // fragments we don't touch. - Operation::Append { .. } => false, - Operation::ReserveFragments { .. } => false, - Operation::Delete { .. } - | Operation::Rewrite { .. } - | Operation::Update { .. } - | Operation::DataReplacement { .. } => { - // As long as they rewrite disjoint fragments they shouldn't conflict. - self.operation.modifies_same_ids(&other.operation) - } - Operation::Project { .. } => false, - Operation::UpdateConfig { .. } => false, - _ => true, - }, - // Restore always succeeds - Operation::Restore { .. } => false, - // ReserveFragments is compatible with anything that doesn't reset the - // max fragment id. - Operation::ReserveFragments { .. } => matches!( - &other.operation, - Operation::Overwrite { .. } | Operation::Restore { .. } - ), - Operation::CreateIndex { new_indices, .. } => { - match &other.operation { - Operation::Append { .. } => false, - // Indices are identified by UUIDs, so they shouldn't conflict. - Operation::CreateIndex { .. } => false, - // Although some of the rows we indexed may have been deleted / moved, - // row ids are still valid, so we allow this optimistically. - Operation::Delete { .. } | Operation::Update { .. } => false, - // Merge & reserve don't change row ids, so this should be fine. - Operation::Merge { .. } => false, - Operation::ReserveFragments { .. } => false, - // Rewrite likely changed many of the row ids, so our index is - // likely useless. It should be rebuilt. - // TODO: we could be smarter here and only invalidate the index - // if the rewrite changed more than X% of row ids. - Operation::Rewrite { .. } => true, - Operation::UpdateConfig { .. } => false, - // Because we modify the fragment in place, the index cannot be - // re-used if it's on something that we've modified. - Operation::DataReplacement { replacements } => { - new_indices.iter().any(|index| { - // Check if the index fields overlap with the replacement fields - let fields_overlap = replacements.iter().any(|replacement| { - replacement - .1 - .fields - .iter() - .any(|field| index.fields.contains(field)) - }); - - if !fields_overlap { - return false; - } - - // If fields overlap, check if the fragment bitmap overlaps - if let Some(ref bitmap) = index.fragment_bitmap { - replacements - .iter() - .any(|replacement| bitmap.contains(replacement.0 as u32)) - } else { - // If no fragment bitmap, conservatively assume conflict - true - } - }) - } - _ => true, - } - } - Operation::Delete { .. } | Operation::Update { .. } => match &other.operation { - Operation::CreateIndex { .. } => false, - Operation::ReserveFragments { .. } => false, - Operation::Delete { .. } - | Operation::Rewrite { .. } - | Operation::Update { .. } - | Operation::DataReplacement { .. } => { - // If we update the same fragments, we conflict. - self.operation.modifies_same_ids(&other.operation) - } - Operation::Project { .. } => false, - Operation::Append { .. } => false, - Operation::UpdateConfig { .. } => false, - _ => true, - }, - Operation::Overwrite { .. } => match &other.operation { - // Overwrite only conflicts with another operation modifying the same update config - Operation::Overwrite { .. } | Operation::UpdateConfig { .. } => { - self.operation.upsert_key_conflict(&other.operation) - } - _ => false, - }, - Operation::UpdateConfig { - schema_metadata, - field_metadata, - .. - } => match &other.operation { - Operation::Overwrite { .. } => { - // Updates to schema metadata or field metadata conflict with any kind - // of overwrite. - if schema_metadata.is_some() || field_metadata.is_some() { - true - } else { - self.operation.upsert_key_conflict(&other.operation) - } - } - Operation::UpdateConfig { .. } => { - self.operation.upsert_key_conflict(&other.operation) - | self.operation.modifies_same_metadata(&other.operation) - } - _ => false, - }, - // Merge changes the schema, but preserves row ids, so the only operations - // it's compatible with is CreateIndex, ReserveFragments, SetMetadata and DeleteMetadata. - Operation::Merge { .. } => !matches!( - &other.operation, - Operation::CreateIndex { .. } - | Operation::ReserveFragments { .. } - | Operation::UpdateConfig { .. } - ), - Operation::Project { .. } => match &other.operation { - // Project is compatible with anything that doesn't change the schema - Operation::CreateIndex { .. } => false, - Operation::Overwrite { .. } => false, - Operation::UpdateConfig { .. } => false, - _ => true, - }, - Operation::DataReplacement { replacements } => match &other.operation { - Operation::Append { .. } - | Operation::Delete { .. } - | Operation::Update { .. } - | Operation::Merge { .. } - | Operation::UpdateConfig { .. } => false, - Operation::CreateIndex { new_indices, .. } => { - new_indices.iter().any(|index| { - // Check if the index fields overlap with the replacement fields - let fields_overlap = replacements.iter().any(|replacement| { - replacement - .1 - .fields - .iter() - .any(|field| index.fields.contains(field)) - }); - - if !fields_overlap { - return false; - } - - // If fields overlap, check if the fragment bitmap overlaps - if let Some(ref bitmap) = index.fragment_bitmap { - replacements - .iter() - .any(|replacement| bitmap.contains(replacement.0 as u32)) - } else { - // If no fragment bitmap, conservatively assume conflict - true - } - }) - } - Operation::Rewrite { .. } => self.operation.modifies_same_ids(&other.operation), - Operation::DataReplacement { - replacements: other_replacements, - } => replacements.iter().any(|replacement| { - other_replacements.iter().any(|other_replacement| { - replacement.0 == other_replacement.0 - && replacement - .1 - .fields - .iter() - .any(|field| other_replacement.1.fields.contains(field)) - }) - }), - _ => true, - }, - Operation::UpdateMemWalState { .. } => { - // UpdateMemWalState doesn't conflict with any other operations - // as it only modifies the MemWAL state tracking - false - } - } - } - fn fragments_with_ids<'a, T>( new_fragments: T, fragment_id: &'a mut u64, @@ -4608,257 +4351,4 @@ mod tests { // Verify idx_e removed (bad field) assert!(!indices.iter().any(|idx| idx.name == "idx_e")); } - - #[test] - fn test_data_replacement_conflicts() { - let data_file0 = DataFile { - path: "file0".to_string(), - fields: vec![0], - column_indices: vec![], - file_major_version: 1, - file_minor_version: 0, - file_size_bytes: CachedFileSize::new(1000), - base_id: None, - }; - - let data_file1 = DataFile { - path: "file1".to_string(), - fields: vec![1], - column_indices: vec![], - file_major_version: 1, - file_minor_version: 0, - file_size_bytes: CachedFileSize::new(1000), - base_id: None, - }; - - let data_file2 = DataFile { - path: "file2".to_string(), - fields: vec![0], - column_indices: vec![], - file_major_version: 1, - file_minor_version: 0, - file_size_bytes: CachedFileSize::new(1000), - base_id: None, - }; - - let replacements = vec![ - DataReplacementGroup(0, data_file0), - DataReplacementGroup(1, data_file1.clone()), - ]; - - let other_replacements = vec![ - DataReplacementGroup(1, data_file1), - DataReplacementGroup(2, data_file2.clone()), - ]; - - let operation = Operation::DataReplacement { replacements }; - let other_operation = Operation::DataReplacement { - replacements: other_replacements, - }; - - let transaction = Transaction::new(0, operation, None, None); - let other_transaction = Transaction::new(0, other_operation, None, None); - - // Conflicts because fragment 1 is being replaced in both transactions - assert!(transaction.conflicts_with(&other_transaction)); - - let non_conflicting_replacements = vec![DataReplacementGroup(2, data_file2)]; - let non_conflicting_operation = Operation::DataReplacement { - replacements: non_conflicting_replacements, - }; - let non_conflicting_transaction = - Transaction::new(0, non_conflicting_operation, None, None); - - // No conflict because the fragments being replaced are disjoint - assert!(!transaction.conflicts_with(&non_conflicting_transaction)); - } - - #[test] - fn test_data_replacement_with_create_index_conflict() { - let data_file = DataFile { - path: "file0".to_string(), - fields: vec![0], - column_indices: vec![], - file_major_version: 1, - file_minor_version: 0, - file_size_bytes: CachedFileSize::new(1000), - base_id: None, - }; - - let replacements = vec![DataReplacementGroup(0, data_file)]; - let operation = Operation::DataReplacement { replacements }; - - // Test 1: Index without fragment_bitmap conflicts when fields overlap - let index_no_bitmap = IndexMetadata { - uuid: uuid::Uuid::new_v4(), - name: "test_index".to_string(), - fields: vec![0], - dataset_version: 1, - fragment_bitmap: None, - index_details: None, - created_at: None, - index_version: 1, - base_id: None, - }; - - let create_index_operation = Operation::CreateIndex { - new_indices: vec![index_no_bitmap], - removed_indices: vec![], - }; - - let transaction = Transaction::new(0, operation.clone(), None, None); - let create_index_transaction = Transaction::new(0, create_index_operation, None, None); - - // Conflicts because the index is being created on the same field being replaced - // and we don't have fragment bitmap info - assert!(transaction.conflicts_with(&create_index_transaction)); - - // Test 2: Index with fragment_bitmap that includes the replaced fragment conflicts - let mut bitmap_with_fragment = RoaringBitmap::new(); - bitmap_with_fragment.insert(0); - bitmap_with_fragment.insert(1); - - let index_with_overlapping_bitmap = IndexMetadata { - uuid: uuid::Uuid::new_v4(), - name: "test_index".to_string(), - fields: vec![0], - dataset_version: 1, - fragment_bitmap: Some(bitmap_with_fragment), - index_details: None, - created_at: None, - index_version: 1, - base_id: None, - }; - - let create_index_overlapping = Operation::CreateIndex { - new_indices: vec![index_with_overlapping_bitmap], - removed_indices: vec![], - }; - - let transaction2 = Transaction::new(0, operation.clone(), None, None); - let create_index_transaction2 = Transaction::new(0, create_index_overlapping, None, None); - - // Conflicts because fragment 0 is in the index bitmap - assert!(transaction2.conflicts_with(&create_index_transaction2)); - - // Test 3: Index with fragment_bitmap that doesn't include the replaced fragment doesn't conflict - let mut bitmap_without_fragment = RoaringBitmap::new(); - bitmap_without_fragment.insert(1); - bitmap_without_fragment.insert(2); - - let index_with_non_overlapping_bitmap = IndexMetadata { - uuid: uuid::Uuid::new_v4(), - name: "test_index".to_string(), - fields: vec![0], - dataset_version: 1, - fragment_bitmap: Some(bitmap_without_fragment), - index_details: None, - created_at: None, - index_version: 1, - base_id: None, - }; - - let create_index_non_overlapping = Operation::CreateIndex { - new_indices: vec![index_with_non_overlapping_bitmap], - removed_indices: vec![], - }; - - let transaction3 = Transaction::new(0, operation.clone(), None, None); - let create_index_transaction3 = - Transaction::new(0, create_index_non_overlapping, None, None); - - // No conflict because fragment 0 is not in the index bitmap - assert!(!transaction3.conflicts_with(&create_index_transaction3)); - - // Test 4: Index on different field doesn't conflict even if fragment overlaps - let mut bitmap_with_fragment2 = RoaringBitmap::new(); - bitmap_with_fragment2.insert(0); - - let index_different_field = IndexMetadata { - uuid: uuid::Uuid::new_v4(), - name: "test_index".to_string(), - fields: vec![1], // Different field - dataset_version: 1, - fragment_bitmap: Some(bitmap_with_fragment2), - index_details: None, - created_at: None, - index_version: 1, - base_id: None, - }; - - let create_index_different_field = Operation::CreateIndex { - new_indices: vec![index_different_field], - removed_indices: vec![], - }; - - let transaction4 = Transaction::new(0, operation, None, None); - let create_index_transaction4 = - Transaction::new(0, create_index_different_field, None, None); - - // No conflict because fields don't overlap - assert!(!transaction4.conflicts_with(&create_index_transaction4)); - } - - #[test] - fn test_data_replacement_with_rewrite_conflict() { - let fragment0 = Fragment::new(0); - let fragment1 = Fragment::new(1); - - let data_file = DataFile { - path: "file0".to_string(), - fields: vec![0], - column_indices: vec![], - file_major_version: 1, - file_minor_version: 0, - file_size_bytes: CachedFileSize::new(1000), - base_id: None, - }; - - let replacements = vec![DataReplacementGroup(0, data_file)]; - let operation = Operation::DataReplacement { replacements }; - - let rewrite_group = RewriteGroup { - old_fragments: vec![fragment0], - new_fragments: vec![fragment1], - }; - - let rewrite_operation = Operation::Rewrite { - groups: vec![rewrite_group], - rewritten_indices: vec![], - frag_reuse_index: None, - }; - - let transaction = Transaction::new(0, operation, None, None); - let rewrite_transaction = Transaction::new(0, rewrite_operation, None, None); - - // Conflicts because the rewrite modifies the same fragment being replaced - assert!(transaction.conflicts_with(&rewrite_transaction)); - } - - #[test] - fn test_data_replacement_no_conflict_with_append() { - let data_file = DataFile { - path: "file0".to_string(), - fields: vec![0], - column_indices: vec![], - file_major_version: 1, - file_minor_version: 0, - file_size_bytes: CachedFileSize::new(1000), - base_id: None, - }; - - let replacements = vec![DataReplacementGroup(0, data_file)]; - let operation = Operation::DataReplacement { replacements }; - - let fragment = Fragment::new(1); - let append_operation = Operation::Append { - fragments: vec![fragment], - }; - - let transaction = Transaction::new(0, operation, None, None); - let append_transaction = Transaction::new(0, append_operation, None, None); - - // No conflict because append does not modify existing fragments - assert!(!transaction.conflicts_with(&append_transaction)); - } } diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 5e98b634f4e..c1977dd35cd 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -685,8 +685,24 @@ impl<'a> TransactionRebase<'a> { Ok(()) } } - Operation::DataReplacement { .. } | Operation::Merge { .. } => { - // TODO(rmeng): check that the fragments being replaced are not part of the groups + Operation::DataReplacement { replacements } => { + // These conflict if the rewrite touches any of the fragments being replaced. + for replacement in replacements { + for group in groups { + for old_fragment in &group.old_fragments { + if replacement.0 == old_fragment.id { + return Err(self.incompatible_conflict_err( + other_transaction, + other_version, + location!(), + )); + } + } + } + } + Ok(()) + } + Operation::Merge { .. } => { Err(self.retryable_conflict_err(other_transaction, other_version, location!())) } Operation::CreateIndex { @@ -884,21 +900,46 @@ impl<'a> TransactionRebase<'a> { } Ok(()) } - Operation::Rewrite { .. } => { - // TODO(rmeng): check that the fragments being replaced are not part of the groups - Err(self.incompatible_conflict_err( - other_transaction, - other_version, - location!(), - )) + Operation::Rewrite { groups, .. } => { + // These conflict if the rewrite touches any of the fragments being replaced. + for replacement in replacements { + for group in groups { + for old_fragment in &group.old_fragments { + if replacement.0 == old_fragment.id { + return Err(self.incompatible_conflict_err( + other_transaction, + other_version, + location!(), + )); + } + } + } + } + + Ok(()) } - Operation::DataReplacement { .. } => { - // TODO(rmeng): check cell conflicts - Err(self.incompatible_conflict_err( - other_transaction, - other_version, - location!(), - )) + Operation::DataReplacement { + replacements: other_replacements, + } => { + // These conflict if there is overlap in fragment id && fields. + for replacement in replacements { + for other_replacement in other_replacements { + if replacement.0 != other_replacement.0 { + continue; + } + + for field in &replacement.1.fields { + if other_replacement.1.fields.contains(field) { + return Err(self.incompatible_conflict_err( + other_transaction, + other_version, + location!(), + )); + } + } + } + } + Ok(()) } Operation::Overwrite { .. } | Operation::Restore { .. } @@ -2994,4 +3035,13 @@ mod tests { } } } + + #[tokio::test] + async fn test_conflicts_data_replacement() { + // Test: two data replacements on different fragments should be compatible + // Test: two data replacements on same fragment but different fields should be compatible + // Test: two data replacements on same fragment and same fields should be retryable + // Test: a data replacement and rewrite on same fragment should be retryable + // Test: a data replacement and write on different fragments should be compatible + } } From c8c048b0d5553f49157b2c2bf94ba1c751e83c9e Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 14 Nov 2025 15:24:34 -0800 Subject: [PATCH 4/4] add tests --- rust/lance/src/dataset.rs | 8 +- rust/lance/src/io/commit/conflict_resolver.rs | 142 ++++++++++++++++-- 2 files changed, 134 insertions(+), 16 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index ec0ac823f4d..d3fd1bc0a8c 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -8873,12 +8873,7 @@ mod tests { } fn make_tx(read_version: u64) -> Transaction { - Transaction::new( - read_version, - Operation::Append { fragments: vec![] }, - None, - None, - ) + Transaction::new(read_version, Operation::Append { fragments: vec![] }, None) } async fn delete_external_tx_file(ds: &Dataset) { @@ -8939,7 +8934,6 @@ mod tests { ds.load_indices().await.unwrap().as_ref().clone(), &tx_file, &ManifestWriteConfig::default(), - None, ) .unwrap(); let location = write_manifest_file( diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index c1977dd35cd..0968f24db72 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -691,7 +691,7 @@ impl<'a> TransactionRebase<'a> { for group in groups { for old_fragment in &group.old_fragments { if replacement.0 == old_fragment.id { - return Err(self.incompatible_conflict_err( + return Err(self.retryable_conflict_err( other_transaction, other_version, location!(), @@ -906,7 +906,7 @@ impl<'a> TransactionRebase<'a> { for group in groups { for old_fragment in &group.old_fragments { if replacement.0 == old_fragment.id { - return Err(self.incompatible_conflict_err( + return Err(self.retryable_conflict_err( other_transaction, other_version, location!(), @@ -930,7 +930,7 @@ impl<'a> TransactionRebase<'a> { for field in &replacement.1.fields { if other_replacement.1.fields.contains(field) { - return Err(self.incompatible_conflict_err( + return Err(self.retryable_conflict_err( other_transaction, other_version, location!(), @@ -1706,12 +1706,13 @@ mod tests { use lance_table::io::deletion::{deletion_file_path, read_deletion_file}; use super::*; - use crate::dataset::transaction::RewriteGroup; + use crate::dataset::transaction::{DataReplacementGroup, RewriteGroup}; use crate::session::caches::DeletionFileKey; use crate::{ dataset::{CommitBuilder, InsertBuilder, WriteParams}, io, }; + use lance_table::format::DataFile; async fn test_dataset(num_rows: usize, num_fragments: usize) -> (Dataset, Arc) { let io_tracker = Arc::new(IOTracker::default()); @@ -3038,10 +3039,133 @@ mod tests { #[tokio::test] async fn test_conflicts_data_replacement() { - // Test: two data replacements on different fragments should be compatible - // Test: two data replacements on same fragment but different fields should be compatible - // Test: two data replacements on same fragment and same fields should be retryable - // Test: a data replacement and rewrite on same fragment should be retryable - // Test: a data replacement and write on different fragments should be compatible + use io::commit::conflict_resolver::tests::{modified_fragment_ids, ConflictResult::*}; + + let fragment0 = Fragment::new(0); + let fragment1 = Fragment::new(1); + + let data_file_frag0_fields01 = + DataFile::new_legacy_from_fields("path0_01", vec![0, 1], None); + let data_file_frag0_fields23 = + DataFile::new_legacy_from_fields("path0_23", vec![2, 3], None); + let data_file_frag1_fields01 = + DataFile::new_legacy_from_fields("path1_01", vec![0, 1], None); + + let cases = vec![ + ( + "Different fragments", + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())], + }, + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(1, data_file_frag1_fields01)], + }, + Compatible, + ), + ( + "Same fragment, different fields", + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())], + }, + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields23)], + }, + Compatible, + ), + ( + "Same fragment, same fields", + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())], + }, + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())], + }, + Retryable, + ), + ( + "Same fragment, overlapping fields", + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())], + }, + Operation::DataReplacement { + replacements: vec![DataReplacementGroup( + 0, + DataFile::new_legacy_from_fields("path0_12", vec![1, 2], None), + )], + }, + Retryable, + ), + ( + "DataReplacement vs Rewrite on same fragment", + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())], + }, + Operation::Rewrite { + groups: vec![RewriteGroup { + old_fragments: vec![fragment0.clone()], + new_fragments: vec![fragment1.clone()], + }], + rewritten_indices: vec![], + frag_reuse_index: None, + }, + Retryable, + ), + ( + "DataReplacement vs Rewrite on different fragment", + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01)], + }, + Operation::Rewrite { + groups: vec![RewriteGroup { + old_fragments: vec![fragment1], + new_fragments: vec![fragment0], + }], + rewritten_indices: vec![], + frag_reuse_index: None, + }, + Compatible, + ), + ]; + + for (description, op1, op2, expected) in cases { + let txn1 = Transaction::new(0, op1.clone(), None); + let txn2 = Transaction::new(0, op2.clone(), None); + + let mut rebase = TransactionRebase { + transaction: txn1, + initial_fragments: HashMap::new(), + modified_fragment_ids: modified_fragment_ids(&op1).collect::>(), + affected_rows: None, + conflicting_frag_reuse_indices: Vec::new(), + }; + + let result = rebase.check_txn(&txn2, 1); + match expected { + Compatible => { + assert!( + result.is_ok(), + "{}: expected Compatible but got {:?}", + description, + result + ); + } + NotCompatible => { + assert!( + matches!(result, Err(Error::CommitConflict { .. })), + "{}: expected NotCompatible but got {:?}", + description, + result + ); + } + Retryable => { + assert!( + matches!(result, Err(Error::RetryableCommitConflict { .. })), + "{}: expected Retryable but got {:?}", + description, + result + ); + } + } + } } }