diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index 9b077e7498c..b20f7e5e1be 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -562,6 +562,7 @@ fn convert_to_java_operation_inner<'local>( mem_wal_to_merge: _, fields_for_preserving_frag_bitmap, update_mode, + inserted_rows_filter: _, } => { let removed_ids: Vec> = removed_fragment_ids .iter() @@ -1043,6 +1044,7 @@ fn convert_to_rust_operation( mem_wal_to_merge: None, fields_for_preserving_frag_bitmap, update_mode, + inserted_rows_filter: None, } } "DataReplacement" => { diff --git a/protos/transaction.proto b/protos/transaction.proto index bec8155eb11..638898f4197 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -184,6 +184,46 @@ message Transaction { optional string branch_name = 5; } + // Exact set of key hashes for conflict detection. + // Used when the number of inserted rows is small. + message ExactKeySetFilter { + // 64-bit hashes of the inserted row keys. + repeated uint64 key_hashes = 1; + } + + // Bloom filter for key existence tests. + // Used when the number of rows is large. + message BloomFilter { + // Bitset backing the bloom filter (SBBF format). + bytes bitmap = 1; + // Number of bits in the bitmap. + uint32 num_bits = 2; + // Number of items the filter was sized for. + // Used for intersection validation (filters with different sizes cannot be compared). + // Default: 8192 + uint64 number_of_items = 3; + // False positive probability the filter was sized for. + // Used for intersection validation (filters with different parameters cannot be compared). + // Default: 0.00057 + double probability = 4; + } + + // A filter for checking key existence in set of rows inserted by a merge insert operation. + // Only created when the merge insert's ON columns match the schema's unenforced primary key. + // The presence of this filter indicates strict primary key conflict detection should be used. + // Can use either an exact set (for small row counts) or a Bloom filter (for large row counts). + message KeyExistenceFilter { + // Field IDs of columns participating in the key (must match unenforced primary key). + repeated int32 field_ids = 1; + // The underlying data structure storing the key hashes. + oneof data { + // Exact set of key hashes (used for small number of rows). + ExactKeySetFilter exact = 2; + // Bloom filter (used for large number of rows). + BloomFilter bloom = 3; + } + } + // An operation that updates rows but does not add or remove rows. message Update { // The fragments that have been removed. These are fragments where all rows @@ -202,6 +242,9 @@ message Transaction { repeated uint32 fields_for_preserving_frag_bitmap = 6; // The mode of update UpdateMode update_mode = 7; + // Filter for checking existence of keys in newly inserted rows, used for conflict detection. + // Only tracks keys from INSERT operations during merge insert, not updates. + optional KeyExistenceFilter inserted_rows = 8; } // The mode of update operation diff --git a/python/src/transaction.rs b/python/src/transaction.rs index 87400afe743..c89c5dd800c 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -232,6 +232,7 @@ impl FromPyObject<'_> for PyLance { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap, update_mode, + inserted_rows_filter: None, }; Ok(Self(op)) } diff --git a/rust/lance-index/src/scalar/bloomfilter.rs b/rust/lance-index/src/scalar/bloomfilter.rs index 73851ca7aeb..3057323b5da 100644 --- a/rust/lance-index/src/scalar/bloomfilter.rs +++ b/rust/lance-index/src/scalar/bloomfilter.rs @@ -18,7 +18,7 @@ use crate::scalar::{ use crate::{pb, Any}; use arrow_array::{Array, UInt64Array}; mod as_bytes; -mod sbbf; +pub mod sbbf; use arrow_schema::{DataType, Field}; use serde::{Deserialize, Serialize}; diff --git a/rust/lance-index/src/scalar/bloomfilter/sbbf.rs b/rust/lance-index/src/scalar/bloomfilter/sbbf.rs index da9f07a29ab..7f506f796f3 100644 --- a/rust/lance-index/src/scalar/bloomfilter/sbbf.rs +++ b/rust/lance-index/src/scalar/bloomfilter/sbbf.rs @@ -352,6 +352,70 @@ impl Sbbf { pub fn estimated_memory_size(&self) -> usize { self.blocks.capacity() * std::mem::size_of::() } + + /// Check if this filter might intersect with another filter. + /// Returns true if there's at least one bit position where both filters have a 1. + /// This is a fast check that may return false positives but never false negatives. + /// + /// Returns an error if the filters have different sizes, as bloom filters with + /// different configurations cannot be reliably compared. + pub fn might_intersect(&self, other: &Self) -> Result { + if self.blocks.len() != other.blocks.len() { + return Err(SbbfError::InvalidData { + message: format!( + "Cannot compare bloom filters with different sizes: {} blocks vs {} blocks. \ + Both filters must use the same configuration.", + self.blocks.len(), + other.blocks.len() + ), + }); + } + for i in 0..self.blocks.len() { + for j in 0..8 { + if (self.blocks[i][j] & other.blocks[i][j]) != 0 { + return Ok(true); + } + } + } + Ok(false) + } + + /// Check if this filter might intersect with a raw bitmap. + /// The bitmap should be in the same format as produced by to_bytes(). + /// + /// Returns an error if the bitmaps have different sizes, as bloom filters with + /// different configurations cannot be reliably compared. + pub fn might_intersect_bytes(&self, other_bytes: &[u8]) -> Result { + Self::bytes_might_intersect(&self.to_bytes(), other_bytes) + } + + /// Check if two raw bloom filter bitmaps might intersect. + /// Returns true if there's at least one bit position where both filters have a 1. + /// + /// This is a fast probabilistic check: if it returns false, the filters definitely + /// have no common elements. If it returns true, they might have common elements + /// (with possible false positives). + /// + /// Returns an error if the bitmaps have different sizes, as bloom filters with + /// different configurations cannot be reliably compared. + pub fn bytes_might_intersect(a: &[u8], b: &[u8]) -> Result { + if a.len() != b.len() { + return Err(SbbfError::InvalidData { + message: format!( + "Cannot compare bloom filters with different sizes: {} bytes vs {} bytes. \ + Both filters must use the same configuration.", + a.len(), + b.len() + ), + }); + } + for i in 0..a.len() { + if (a[i] & b[i]) != 0 { + return Ok(true); + } + } + Ok(false) + } } // Per spec we use xxHash with seed=0 diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 4c22b75d0b1..099b883d355 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -2858,6 +2858,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: Some(UpdateMode::RewriteColumns), + inserted_rows_filter: None, }; let mut dataset1 = Dataset::commit( test_uri, @@ -2930,6 +2931,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: Some(UpdateMode::RewriteColumns), + inserted_rows_filter: None, }; let dataset2 = Dataset::commit( test_uri, diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 68800cbb5e2..f954115ada3 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -45,6 +45,7 @@ //! the operation does not modify the region of the column being replaced. //! +use super::write::merge_insert::inserted_rows::KeyExistenceFilter; use super::{blob::BLOB_VERSION_CONFIG_KEY, ManifestWriteConfig}; use crate::dataset::transaction::UpdateMode::RewriteRows; use crate::index::mem_wal::update_mem_wal_index_in_indices_list; @@ -251,6 +252,9 @@ pub enum Operation { fields_for_preserving_frag_bitmap: Vec, /// The mode of update update_mode: Option, + /// Optional filter for detecting conflicts on inserted row keys. + /// Only tracks keys from INSERT operations during merge insert, not updates. + inserted_rows_filter: Option, }, /// Project to a new schema. This only changes the schema, not the data. @@ -449,6 +453,7 @@ impl PartialEq for Operation { mem_wal_to_merge: a_mem_wal_to_merge, fields_for_preserving_frag_bitmap: a_fields_for_preserving_frag_bitmap, update_mode: a_update_mode, + inserted_rows_filter: a_inserted_rows_filter, }, Self::Update { removed_fragment_ids: b_removed, @@ -458,6 +463,7 @@ impl PartialEq for Operation { mem_wal_to_merge: b_mem_wal_to_merge, fields_for_preserving_frag_bitmap: b_fields_for_preserving_frag_bitmap, update_mode: b_update_mode, + inserted_rows_filter: b_inserted_rows_filter, }, ) => { compare_vec(a_removed, b_removed) @@ -470,6 +476,7 @@ impl PartialEq for Operation { b_fields_for_preserving_frag_bitmap, ) && a_update_mode == b_update_mode + && a_inserted_rows_filter == b_inserted_rows_filter } (Self::Project { schema: a }, Self::Project { schema: b }) => a == b, ( @@ -1707,6 +1714,7 @@ impl Transaction { mem_wal_to_merge, fields_for_preserving_frag_bitmap, update_mode, + .. } => { // Extract existing fragments once for reuse let existing_fragments = maybe_existing_fragments?; @@ -2888,6 +2896,7 @@ impl TryFrom for Transaction { mem_wal_to_merge, fields_for_preserving_frag_bitmap, update_mode, + inserted_rows, })) => Operation::Update { removed_fragment_ids, updated_fragments: updated_fragments @@ -2906,6 +2915,9 @@ impl TryFrom for Transaction { 1 => Some(UpdateMode::RewriteColumns), _ => Some(UpdateMode::RewriteRows), }, + inserted_rows_filter: inserted_rows + .map(|ik| KeyExistenceFilter::try_from(&ik)) + .transpose()?, }, Some(pb::transaction::Operation::Project(pb::transaction::Project { schema })) => { Operation::Project { @@ -3216,6 +3228,7 @@ impl From<&Transaction> for pb::Transaction { mem_wal_to_merge, fields_for_preserving_frag_bitmap, update_mode, + inserted_rows_filter, } => pb::transaction::Operation::Update(pb::transaction::Update { removed_fragment_ids: removed_fragment_ids.clone(), updated_fragments: updated_fragments @@ -3233,6 +3246,7 @@ impl From<&Transaction> for pb::Transaction { UpdateMode::RewriteColumns => 1, }) .unwrap_or(0), + inserted_rows: inserted_rows_filter.as_ref().map(|ik| ik.into()), }), Operation::Project { schema } => { pb::transaction::Operation::Project(pb::transaction::Project { diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index f5ac12d0559..5070ee5e65f 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -761,6 +761,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, read_version: 1, tag: None, diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index b6ec4112171..12df92f534f 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -19,7 +19,10 @@ // Internal column name for the merge action. Using "__action" to avoid collisions with user columns. const MERGE_ACTION_COLUMN: &str = "__action"; +pub mod inserted_rows; + use assign_action::merge_insert_action; +use inserted_rows::KeyExistenceFilter; use super::retry::{execute_with_retry, RetryConfig, RetryExecutor}; use super::{write_fragments_internal, CommitBuilder, WriteParams}; @@ -148,7 +151,7 @@ fn unzip_batch(batch: &RecordBatch, schema: &Schema) -> RecordBatch { } /// Format key values for error messages via extracting "on" column values from the given RecordBatch. -fn format_key_values_on_columns( +pub fn format_key_values_on_columns( batch: &RecordBatch, row_idx: usize, on_columns: &[String], @@ -184,7 +187,7 @@ fn format_key_values_on_columns( } /// Create duplicate rows error via extracting "on" column values from the given RecordBatch. -fn create_duplicate_row_error( +pub fn create_duplicate_row_error( batch: &RecordBatch, row_idx: usize, on_columns: &[String], @@ -1353,7 +1356,12 @@ impl MergeInsertJob { async fn execute_uncommitted_v2( self, source: SendableRecordBatchStream, - ) -> Result<(Transaction, MergeStats, Option)> { + ) -> Result<( + Transaction, + MergeStats, + Option, + Option, + )> { let plan = self.create_plan(source).await?; // Execute the plan @@ -1390,7 +1398,7 @@ impl MergeInsertJob { } // Extract merge stats from the execution plan - let (stats, transaction, affected_rows) = if let Some(full_exec) = + let (stats, transaction, affected_rows, inserted_rows_filter) = if let Some(full_exec) = plan.as_any() .downcast_ref::() { @@ -1403,7 +1411,8 @@ impl MergeInsertJob { location: location!(), })?; let affected_rows = full_exec.affected_rows().map(RowAddrTreeMap::from); - (stats, transaction, affected_rows) + let inserted_rows_filter = full_exec.inserted_rows_filter(); + (stats, transaction, affected_rows, inserted_rows_filter) } else if let Some(delete_exec) = plan .as_any() .downcast_ref::() @@ -1417,7 +1426,7 @@ impl MergeInsertJob { location: location!(), })?; let affected_rows = delete_exec.affected_rows().map(RowAddrTreeMap::from); - (stats, transaction, affected_rows) + (stats, transaction, affected_rows, None) } else { return Err(Error::Internal { message: "Expected FullSchemaMergeInsertExec or DeleteOnlyMergeInsertExec".into(), @@ -1425,7 +1434,7 @@ impl MergeInsertJob { }); }; - Ok((transaction, stats, affected_rows)) + Ok((transaction, stats, affected_rows, inserted_rows_filter)) } /// Check if the merge insert operation can use the fast path (create_plan). @@ -1491,11 +1500,13 @@ impl MergeInsertJob { let can_use_fast_path = self.can_use_create_plan(source.schema().as_ref()).await?; if can_use_fast_path { - let (transaction, stats, affected_rows) = self.execute_uncommitted_v2(source).await?; + let (transaction, stats, affected_rows, inserted_rows_filter) = + self.execute_uncommitted_v2(source).await?; return Ok(UncommittedMergeInsert { transaction, affected_rows, stats, + inserted_rows_filter, }); } @@ -1553,6 +1564,7 @@ impl MergeInsertJob { mem_wal_to_merge: self.params.mem_wal_to_merge, fields_for_preserving_frag_bitmap: vec![], // in-place update do not affect preserving frag bitmap update_mode: Some(RewriteColumns), + inserted_rows_filter: None, // not implemented for v1 }; // We have rewritten the fragments, not just the deletion files, so // we can't use affected rows here. @@ -1627,6 +1639,7 @@ impl MergeInsertJob { .map(|f| f.id as u32) .collect(), update_mode: Some(RewriteRows), + inserted_rows_filter: None, // not implemented for v1 }; let affected_rows = Some(RowAddrTreeMap::from(removed_row_addrs)); @@ -1644,6 +1657,7 @@ impl MergeInsertJob { transaction, affected_rows, stats, + inserted_rows_filter: None, // not implemented for v1 }) } @@ -1814,6 +1828,7 @@ pub struct UncommittedMergeInsert { pub transaction: Transaction, pub affected_rows: Option, pub stats: MergeStats, + pub inserted_rows_filter: Option, } /// Wrapper struct that combines MergeInsertJob with the source iterator for retry functionality @@ -2164,6 +2179,7 @@ mod tests { use super::*; use crate::dataset::scanner::ColumnOrdering; use crate::index::vector::VectorIndexParams; + use crate::io::commit::read_transaction_file; use crate::{ dataset::{builder::DatasetBuilder, InsertBuilder, ReadParams, WriteMode, WriteParams}, session::Session, @@ -2173,11 +2189,13 @@ mod tests { }, }; use arrow_array::types::Float32Type; + use arrow_array::RecordBatch; use arrow_array::{ types::{Int32Type, UInt32Type}, FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatchIterator, RecordBatchReader, StringArray, UInt32Array, }; + use arrow_schema::{DataType, Field, Schema}; use arrow_select::concat::concat_batches; use datafusion::common::Column; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; @@ -4261,6 +4279,430 @@ mod tests { ); } + #[tokio::test] + async fn test_transaction_inserted_rows_filter_roundtrip() { + // Create dataset with unenforced primary key on "id" column + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false).with_metadata( + vec![( + "lance-schema:unenforced-primary-key".to_string(), + "true".to_string(), + )] + .into_iter() + .collect(), + ), + Field::new("value", DataType::UInt32, false), + ])); + let initial = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![0, 1, 2])), + Arc::new(UInt32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(); + let dataset = InsertBuilder::new("memory://") + .execute(vec![initial]) + .await + .unwrap(); + let dataset = Arc::new(dataset); + + // Source with overlapping key 1 + let new_batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![1, 3])), + Arc::new(UInt32Array::from(vec![2, 2])), + ], + ) + .unwrap(); + let stream = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(new_batch)]), + ); + + let UncommittedMergeInsert { transaction, .. } = + MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap() + .execute_uncommitted(Box::pin(stream) as SendableRecordBatchStream) + .await + .unwrap(); + + // Commit and read back transaction file + let committed = CommitBuilder::new(dataset.clone()) + .execute(transaction) + .await + .unwrap(); + let tx_path = committed.manifest().transaction_file.clone().unwrap(); + let tx_read = read_transaction_file(dataset.object_store(), &dataset.base, &tx_path) + .await + .unwrap(); + // Check that inserted_rows_filter is present in the Operation::Update + if let Operation::Update { + inserted_rows_filter, + .. + } = &tx_read.operation + { + assert!(inserted_rows_filter.is_some()); + let filter = inserted_rows_filter.as_ref().unwrap(); + // Field IDs are assigned by Lance schema; check that we tracked exactly 1 key field + assert_eq!(filter.field_ids.len(), 1); + } else { + panic!("Expected Operation::Update"); + } + } + + /// Test that two merge insert operations on the same existing key conflict. + /// First merge insert commits successfully, second one fails with conflict error + /// because both operations updated the same key (detected via bloom filter). + #[tokio::test] + async fn test_inserted_rows_filter_bloom_conflict_detection_concurrent() { + // Create schema with unenforced primary key on "id" column + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false).with_metadata( + vec![( + "lance-schema:unenforced-primary-key".to_string(), + "true".to_string(), + )] + .into_iter() + .collect(), + ), + Field::new("value", DataType::UInt32, false), + ])); + let initial = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![0, 1, 2, 3])), + Arc::new(UInt32Array::from(vec![0, 0, 0, 0])), + ], + ) + .unwrap(); + + let dataset = InsertBuilder::new("memory://") + .execute(vec![initial]) + .await + .unwrap(); + let dataset = Arc::new(dataset); + + // Both jobs update/insert the same key 2 + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![2])), + Arc::new(UInt32Array::from(vec![1])), + ], + ) + .unwrap(); + let batch2 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![2])), + Arc::new(UInt32Array::from(vec![2])), + ], + ) + .unwrap(); + + // Create second merge insert job based on version 1 with 0 retries + let b2 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .conflict_retries(0) + .try_build() + .unwrap(); + + // First merge insert commits (creates version 2) + let s1 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch1.clone())]), + ); + let b1 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap(); + let result1 = b1.execute(Box::pin(s1) as SendableRecordBatchStream).await; + assert!(result1.is_ok(), "First merge insert should succeed"); + + // Second merge insert tries to commit based on version 1, needs to rebase against version 2 + let s2 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch2.clone())]), + ); + let result2 = b2.execute(Box::pin(s2) as SendableRecordBatchStream).await; + + // Second merge insert should fail because bloom filters show both updated key 2 + assert!( + matches!(result2, Err(crate::Error::TooMuchWriteContention { .. })), + "Expected TooMuchWriteContention (retryable conflict exhausted), got: {:?}", + result2 + ); + } + + /// Test that two merge insert operations inserting the same NEW key conflict. + /// First merge insert commits successfully (inserts id=100), second one fails + /// with conflict error because both inserted the same new key (detected via bloom filter). + #[tokio::test] + async fn test_concurrent_insert_same_new_key() { + // Create schema with unenforced primary key on "id" column + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false).with_metadata( + vec![( + "lance-schema:unenforced-primary-key".to_string(), + "true".to_string(), + )] + .into_iter() + .collect(), + ), + Field::new("value", DataType::UInt32, false), + ])); + // Initial dataset with ids 0, 1, 2, 3 - NOT containing id=100 + let initial = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![0, 1, 2, 3])), + Arc::new(UInt32Array::from(vec![0, 0, 0, 0])), + ], + ) + .unwrap(); + + let dataset = InsertBuilder::new("memory://") + .execute(vec![initial]) + .await + .unwrap(); + let dataset = Arc::new(dataset); + + // Both jobs try to INSERT the same NEW key id=100 (doesn't exist in initial data) + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![100])), // NEW key id=100 + Arc::new(UInt32Array::from(vec![1])), + ], + ) + .unwrap(); + let batch2 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![100])), // Same NEW key id=100 + Arc::new(UInt32Array::from(vec![2])), + ], + ) + .unwrap(); + + // Create second merge insert job based on version 1 with 0 retries + let b2 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .conflict_retries(0) + .try_build() + .unwrap(); + + // First merge insert commits (creates version 2, inserts id=100) + let s1 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch1.clone())]), + ); + let b1 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap(); + let result1 = b1.execute(Box::pin(s1) as SendableRecordBatchStream).await; + assert!(result1.is_ok(), "First merge insert should succeed"); + + // Second merge insert tries to commit based on version 1, needs to rebase against version 2 + let s2 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch2.clone())]), + ); + let result2 = b2.execute(Box::pin(s2) as SendableRecordBatchStream).await; + + // Second merge insert should fail because bloom filters show both inserted key 100 + assert!( + matches!(result2, Err(crate::Error::TooMuchWriteContention { .. })), + "Expected TooMuchWriteContention (retryable conflict exhausted), got: {:?}", + result2 + ); + } + + /// Test that merge_insert with bloom filter fails when committing against + /// an Update transaction that doesn't have a filter. We can't determine if + /// the Update operation conflicted with our inserted rows. + #[tokio::test] + async fn test_merge_insert_conflict_with_update_without_filter() { + use crate::dataset::UpdateBuilder; + + // Create schema with unenforced primary key on "id" column + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false).with_metadata( + vec![( + "lance-schema:unenforced-primary-key".to_string(), + "true".to_string(), + )] + .into_iter() + .collect(), + ), + Field::new("value", DataType::UInt32, false), + ])); + let initial = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![0, 1, 2, 3])), + Arc::new(UInt32Array::from(vec![0, 0, 0, 0])), + ], + ) + .unwrap(); + + let dataset = InsertBuilder::new("memory://") + .execute(vec![initial]) + .await + .unwrap(); + let dataset = Arc::new(dataset); + + // Create merge insert job based on version 1 + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![100])), + Arc::new(UInt32Array::from(vec![1])), + ], + ) + .unwrap(); + + let b1 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .conflict_retries(0) + .try_build() + .unwrap(); + + // Regular Update without bloom filter commits first (creates version 2) + let update_result = UpdateBuilder::new(dataset.clone()) + .update_where("id = 0") + .unwrap() + .set("value", "999") + .unwrap() + .build() + .unwrap() + .execute() + .await; + assert!(update_result.is_ok(), "Update should succeed"); + + // Now merge insert tries to commit based on version 1, needs to rebase against version 2 + let s1 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch1.clone())]), + ); + let merge_result = b1.execute(Box::pin(s1) as SendableRecordBatchStream).await; + + // Merge insert should fail with retryable conflict because it can't + // determine if Update conflicted (Update has no inserted_rows_filter) + assert!( + matches!( + merge_result, + Err(crate::Error::TooMuchWriteContention { .. }) + ), + "Expected TooMuchWriteContention (retryable conflict exhausted), got: {:?}", + merge_result + ); + } + + /// Test that merge_insert with bloom filter fails when committing against + /// an Append operation. We can't determine if the appended rows conflict + /// with our inserted rows. + #[tokio::test] + async fn test_merge_insert_conflict_with_append() { + // Create schema with unenforced primary key on "id" column + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false).with_metadata( + vec![( + "lance-schema:unenforced-primary-key".to_string(), + "true".to_string(), + )] + .into_iter() + .collect(), + ), + Field::new("value", DataType::UInt32, false), + ])); + let initial = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![0, 1, 2, 3])), + Arc::new(UInt32Array::from(vec![0, 0, 0, 0])), + ], + ) + .unwrap(); + + let dataset = InsertBuilder::new("memory://") + .execute(vec![initial]) + .await + .unwrap(); + let dataset = Arc::new(dataset); + + // Create merge insert job based on version 1 + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![100])), + Arc::new(UInt32Array::from(vec![1])), + ], + ) + .unwrap(); + + let b1 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .conflict_retries(0) + .try_build() + .unwrap(); + + // Append commits first (creates version 2) + let append_batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![50])), + Arc::new(UInt32Array::from(vec![2])), + ], + ) + .unwrap(); + let append_result = InsertBuilder::new(dataset.clone()) + .with_params(&WriteParams { + mode: WriteMode::Append, + ..Default::default() + }) + .execute(vec![append_batch]) + .await; + assert!(append_result.is_ok(), "Append should succeed"); + + // Now merge insert tries to commit based on version 1, needs to rebase against version 2 + let s1 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch1.clone())]), + ); + let merge_result = b1.execute(Box::pin(s1) as SendableRecordBatchStream).await; + + // Merge insert should fail with retryable conflict because it can't + // determine if Append added conflicting keys + assert!( + matches!( + merge_result, + Err(crate::Error::TooMuchWriteContention { .. }) + ), + "Expected TooMuchWriteContention (retryable conflict exhausted), got: {:?}", + merge_result + ); + } + #[tokio::test] async fn test_explain_plan() { // Set up test data using lance_datagen diff --git a/rust/lance/src/dataset/write/merge_insert/exec/delete.rs b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs index 12ae2359bab..1503b5b21c4 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/delete.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs @@ -283,6 +283,7 @@ impl ExecutionPlan for DeleteOnlyMergeInsertExec { .map(|f| f.id as u32) .collect(), update_mode: None, + inserted_rows_filter: None, // Delete-only operations don't insert rows }; let transaction = Transaction::new(dataset.manifest.version, operation, None); diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs index b2cef2dc601..4f24c94ca41 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs @@ -7,7 +7,7 @@ use std::sync::{Arc, Mutex}; use arrow_array::{Array, RecordBatch, UInt64Array, UInt8Array}; use arrow_schema::Schema; use arrow_select; -use datafusion::common::Result as DFResult; +use datafusion::common::{DataFusionError, Result as DFResult}; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::{ execution::{SendableRecordBatchStream, TaskContext}, @@ -26,6 +26,9 @@ use snafu::location; use crate::dataset::transaction::UpdateMode::RewriteRows; use crate::dataset::utils::CapturedRowIds; +use crate::dataset::write::merge_insert::inserted_rows::{ + extract_key_value_from_batch, KeyExistenceFilter, KeyExistenceFilterBuilder, +}; use crate::dataset::write::merge_insert::{ create_duplicate_row_error, format_key_values_on_columns, }; @@ -51,6 +54,8 @@ struct MergeState { delete_row_addrs: RoaringTreemap, /// Shared collection to capture row ids that need to be updated updating_row_ids: Arc>, + /// Track keys of newly inserted rows (not updates). + inserted_rows_filter: KeyExistenceFilterBuilder, /// Merge operation metrics metrics: MergeInsertMetrics, /// Whether the dataset uses stable row ids. @@ -62,10 +67,16 @@ struct MergeState { } impl MergeState { - fn new(metrics: MergeInsertMetrics, stable_row_ids: bool, on_columns: Vec) -> Self { + fn new( + metrics: MergeInsertMetrics, + stable_row_ids: bool, + on_columns: Vec, + field_ids: Vec, + ) -> Self { Self { delete_row_addrs: RoaringTreemap::new(), updating_row_ids: Arc::new(Mutex::new(CapturedRowIds::new(stable_row_ids))), + inserted_rows_filter: KeyExistenceFilterBuilder::new(field_ids), metrics, stable_row_ids, processed_row_ids: HashSet::new(), @@ -116,6 +127,14 @@ impl MergeState { } Action::Insert => { // Insert action - just insert new data + // Capture the key value for conflict detection (only for inserts, not updates) + if let Some(key_value) = + extract_key_value_from_batch(batch, row_idx, &self.on_columns) + { + self.inserted_rows_filter + .insert(key_value) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } self.metrics.num_inserted_rows.add(1); Ok(Some(row_idx)) // Keep this row for writing } @@ -151,6 +170,10 @@ pub struct FullSchemaMergeInsertExec { merge_stats: Arc>>, transaction: Arc>>, affected_rows: Arc>>, + inserted_rows_filter: Arc>>, + /// Whether the ON columns match the schema's unenforced primary key. + /// If true, inserted_rows_filter will be included in the transaction for conflict detection. + is_primary_key: bool, } impl FullSchemaMergeInsertExec { @@ -167,6 +190,20 @@ impl FullSchemaMergeInsertExec { Boundedness::Bounded, ); + // Check if ON columns match the schema's unenforced primary key + let field_ids: Vec = params + .on + .iter() + .filter_map(|name| dataset.schema().field(name).map(|f| f.id)) + .collect(); + let pk_field_ids: Vec = dataset + .schema() + .unenforced_primary_key() + .iter() + .map(|f| f.id) + .collect(); + let is_primary_key = !pk_field_ids.is_empty() && field_ids == pk_field_ids; + Ok(Self { input, dataset, @@ -176,6 +213,8 @@ impl FullSchemaMergeInsertExec { merge_stats: Arc::new(Mutex::new(None)), transaction: Arc::new(Mutex::new(None)), affected_rows: Arc::new(Mutex::new(None)), + inserted_rows_filter: Arc::new(Mutex::new(None)), + is_primary_key, }) } @@ -197,6 +236,16 @@ impl FullSchemaMergeInsertExec { .and_then(|mut guard| guard.take()) } + /// Returns the filter for inserted row keys if the execution has completed. + /// This contains keys of newly inserted rows (not updates) for conflict detection. + /// Returns `None` if the execution is still in progress or hasn't started. + pub fn inserted_rows_filter(&self) -> Option { + self.inserted_rows_filter + .lock() + .ok() + .and_then(|guard| guard.clone()) + } + /// Takes the affected rows (deleted/updated row addresses) if the execution has completed. /// Returns `None` if the execution is still in progress or hasn't started. pub fn affected_rows(&self) -> Option { @@ -725,6 +774,8 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { merge_stats: self.merge_stats.clone(), transaction: self.transaction.clone(), affected_rows: self.affected_rows.clone(), + inserted_rows_filter: self.inserted_rows_filter.clone(), + is_primary_key: self.is_primary_key, })) } @@ -766,10 +817,18 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { let input_stream = self.input.execute(partition, context)?; // Step 1: Create shared state and streaming processor for row addresses and write data + // Get field IDs for the ON columns from the dataset schema + let field_ids: Vec = self + .params + .on + .iter() + .filter_map(|name| self.dataset.schema().field(name).map(|f| f.id)) + .collect(); let merge_state = Arc::new(Mutex::new(MergeState::new( MergeInsertMetrics::new(&self.metrics, partition), self.dataset.manifest.uses_stable_row_ids(), self.params.on.clone(), + field_ids, ))); let write_data_stream = self.create_filtered_write_stream(input_stream, merge_state.clone())?; @@ -779,7 +838,9 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { let merge_stats_holder = self.merge_stats.clone(); let transaction_holder = self.transaction.clone(); let affected_rows_holder = self.affected_rows.clone(); + let inserted_rows_filter_holder = self.inserted_rows_filter.clone(); let mem_wal_to_merge = self.params.mem_wal_to_merge.clone(); + let is_primary_key = self.is_primary_key; let updating_row_ids = { let state = merge_state.lock().unwrap(); state.updating_row_ids.clone() @@ -832,6 +893,13 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { let merge_state = Mutex::into_inner(merge_state).expect("MergeState lock should be available"); let delete_row_addrs_clone = merge_state.delete_row_addrs; + let inserted_rows_filter = if is_primary_key { + Some(KeyExistenceFilter::from_bloom_filter( + &merge_state.inserted_rows_filter, + )) + } else { + None + }; let (updated_fragments, removed_fragment_ids) = apply_deletions(&dataset, &delete_row_addrs_clone).await?; @@ -850,6 +918,7 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { .map(|f| f.id as u32) .collect(), update_mode: Some(RewriteRows), + inserted_rows_filter: inserted_rows_filter.clone(), }; // Step 5: Create and store the transaction @@ -876,6 +945,9 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { if let Ok(mut affected_rows_guard) = affected_rows_holder.lock() { affected_rows_guard.replace(delete_row_addrs_clone); } + if let Ok(mut filter_guard) = inserted_rows_filter_holder.lock() { + *filter_guard = inserted_rows_filter; + } }; // Step 7: Return empty result (write operations don't return data) @@ -900,7 +972,7 @@ mod tests { #[test] fn test_merge_state_duplicate_rowid_detection() { let metrics = MergeInsertMetrics::new(&ExecutionPlanMetricsSet::new(), 0); - let mut merge_state = MergeState::new(metrics, false, Vec::new()); + let mut merge_state = MergeState::new(metrics, false, Vec::new(), Vec::new()); let row_addr_array = UInt64Array::from(vec![1000, 2000, 3000]); let row_id_array = UInt64Array::from(vec![100, 100, 300]); // Duplicate row_id 100 diff --git a/rust/lance/src/dataset/write/merge_insert/inserted_rows.rs b/rust/lance/src/dataset/write/merge_insert/inserted_rows.rs new file mode 100644 index 00000000000..db81fd6f831 --- /dev/null +++ b/rust/lance/src/dataset/write/merge_insert/inserted_rows.rs @@ -0,0 +1,337 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Key existence tracking for merge insert conflict detection. + +use std::collections::hash_map::DefaultHasher; +use std::collections::HashSet; +use std::hash::{Hash, Hasher}; + +use arrow_array::cast::AsArray; +use arrow_array::{BinaryArray, LargeBinaryArray, LargeStringArray, RecordBatch, StringArray}; +use arrow_schema::DataType; +use deepsize::DeepSizeOf; +use lance_core::Result; +use lance_index::scalar::bloomfilter::sbbf::{Sbbf, SbbfBuilder}; +use lance_table::format::pb; +use snafu::location; + +// Default bloom filter config: 8192 items @ 0.00057 fpp -> 16KiB filter +pub const BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS: u64 = 8192; +pub const BLOOM_FILTER_DEFAULT_PROBABILITY: f64 = 0.00057; + +/// Key value for conflict detection. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum KeyValue { + String(String), + Int64(i64), + UInt64(u64), + Binary(Vec), + Composite(Vec), +} + +impl KeyValue { + pub fn to_bytes(&self) -> Vec { + match self { + Self::String(s) => s.as_bytes().to_vec(), + Self::Int64(i) => i.to_le_bytes().to_vec(), + Self::UInt64(u) => u.to_le_bytes().to_vec(), + Self::Binary(b) => b.clone(), + Self::Composite(values) => { + let mut result = Vec::new(); + for value in values { + result.extend_from_slice(&value.to_bytes()); + result.push(0); + } + result + } + } + } + + pub fn hash_value(&self) -> u64 { + let mut hasher = DefaultHasher::new(); + self.to_bytes().hash(&mut hasher); + hasher.finish() + } +} + +/// Builder for KeyExistenceFilter using Split Block Bloom Filter. +#[derive(Debug, Clone)] +pub struct KeyExistenceFilterBuilder { + sbbf: Sbbf, + field_ids: Vec, + item_count: usize, +} + +impl KeyExistenceFilterBuilder { + pub fn new(field_ids: Vec) -> Self { + let sbbf = SbbfBuilder::new() + .expected_items(BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS) + .false_positive_probability(BLOOM_FILTER_DEFAULT_PROBABILITY) + .build() + .expect("Failed to build SBBF"); + Self { + sbbf, + field_ids, + item_count: 0, + } + } + + pub fn insert(&mut self, key: KeyValue) -> Result<()> { + self.sbbf.insert(&key.to_bytes()[..]); + self.item_count += 1; + Ok(()) + } + + pub fn contains(&self, key: &KeyValue) -> bool { + self.sbbf.check(&key.to_bytes()[..]) + } + + pub fn might_intersect(&self, other: &Self) -> Result { + self.sbbf + .might_intersect(&other.sbbf) + .map_err(|e| lance_core::Error::invalid_input(e.to_string(), location!())) + } + + pub fn field_ids(&self) -> &[i32] { + &self.field_ids + } + + pub fn estimated_size_bytes(&self) -> usize { + self.sbbf.size_bytes() + } + + pub fn len(&self) -> usize { + self.item_count + } + + pub fn is_empty(&self) -> bool { + self.item_count == 0 + } + + pub fn build(&self) -> KeyExistenceFilter { + KeyExistenceFilter { + field_ids: self.field_ids.clone(), + filter: FilterType::Bloom { + bitmap: self.sbbf.to_bytes(), + num_bits: (self.sbbf.size_bytes() as u32) * 8, + number_of_items: BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS, + probability: BLOOM_FILTER_DEFAULT_PROBABILITY, + }, + } + } +} + +impl From<&KeyExistenceFilterBuilder> for pb::transaction::KeyExistenceFilter { + fn from(builder: &KeyExistenceFilterBuilder) -> Self { + Self { + field_ids: builder.field_ids.clone(), + data: Some(pb::transaction::key_existence_filter::Data::Bloom( + pb::transaction::BloomFilter { + bitmap: builder.sbbf.to_bytes(), + num_bits: (builder.sbbf.size_bytes() as u32) * 8, + number_of_items: BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS, + probability: BLOOM_FILTER_DEFAULT_PROBABILITY, + }, + )), + } + } +} + +/// Filter type for key existence data. +#[derive(Debug, Clone, DeepSizeOf, PartialEq)] +pub enum FilterType { + ExactSet(HashSet), + Bloom { + bitmap: Vec, + num_bits: u32, + number_of_items: u64, + probability: f64, + }, +} + +/// Tracks keys of inserted rows for conflict detection. +/// Only created when ON columns match the schema's unenforced primary key. +#[derive(Debug, Clone, DeepSizeOf, PartialEq)] +pub struct KeyExistenceFilter { + pub field_ids: Vec, + pub filter: FilterType, +} + +impl KeyExistenceFilter { + pub fn from_bloom_filter(bloom: &KeyExistenceFilterBuilder) -> Self { + bloom.build() + } + + /// Check if two filters intersect. Returns (has_intersection, might_be_false_positive). + /// Errors if bloom filter configs don't match. + pub fn intersects(&self, other: &Self) -> Result<(bool, bool)> { + match (&self.filter, &other.filter) { + (FilterType::ExactSet(a), FilterType::ExactSet(b)) => { + Ok((a.iter().any(|h| b.contains(h)), false)) + } + (FilterType::ExactSet(_), FilterType::Bloom { .. }) + | (FilterType::Bloom { .. }, FilterType::ExactSet(_)) => { + // Can't compare different hash schemes, assume intersection + Ok((true, true)) + } + ( + FilterType::Bloom { + bitmap: a_bits, + number_of_items: a_num_items, + probability: a_prob, + .. + }, + FilterType::Bloom { + bitmap: b_bits, + number_of_items: b_num_items, + probability: b_prob, + .. + }, + ) => { + if a_num_items != b_num_items || (a_prob - b_prob).abs() > f64::EPSILON { + return Err(lance_core::Error::invalid_input( + format!( + "Bloom filter config mismatch: ({}, {}) vs ({}, {})", + a_num_items, a_prob, b_num_items, b_prob + ), + location!(), + )); + } + let has = Sbbf::bytes_might_intersect(a_bits, b_bits) + .map_err(|e| lance_core::Error::invalid_input(e.to_string(), location!()))?; + Ok((has, has)) + } + } + } +} + +impl From<&KeyExistenceFilter> for pb::transaction::KeyExistenceFilter { + fn from(filter: &KeyExistenceFilter) -> Self { + match &filter.filter { + FilterType::ExactSet(hashes) => Self { + field_ids: filter.field_ids.clone(), + data: Some(pb::transaction::key_existence_filter::Data::Exact( + pb::transaction::ExactKeySetFilter { + key_hashes: hashes.iter().copied().collect(), + }, + )), + }, + FilterType::Bloom { + bitmap, + num_bits, + number_of_items, + probability, + } => Self { + field_ids: filter.field_ids.clone(), + data: Some(pb::transaction::key_existence_filter::Data::Bloom( + pb::transaction::BloomFilter { + bitmap: bitmap.clone(), + num_bits: *num_bits, + number_of_items: *number_of_items, + probability: *probability, + }, + )), + }, + } + } +} + +impl TryFrom<&pb::transaction::KeyExistenceFilter> for KeyExistenceFilter { + type Error = lance_core::Error; + + fn try_from(message: &pb::transaction::KeyExistenceFilter) -> Result { + let filter = match message.data.as_ref() { + Some(pb::transaction::key_existence_filter::Data::Exact(exact)) => { + FilterType::ExactSet(exact.key_hashes.iter().copied().collect()) + } + Some(pb::transaction::key_existence_filter::Data::Bloom(b)) => { + // Use defaults for backwards compatibility + let number_of_items = if b.number_of_items == 0 { + BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS + } else { + b.number_of_items + }; + let probability = if b.probability == 0.0 { + BLOOM_FILTER_DEFAULT_PROBABILITY + } else { + b.probability + }; + FilterType::Bloom { + bitmap: b.bitmap.clone(), + num_bits: b.num_bits, + number_of_items, + probability, + } + } + None => FilterType::ExactSet(HashSet::new()), + }; + Ok(Self { + field_ids: message.field_ids.clone(), + filter, + }) + } +} + +/// Extract key value from a batch row. Returns None if null or unsupported type. +pub fn extract_key_value_from_batch( + batch: &RecordBatch, + row_idx: usize, + on_columns: &[String], +) -> Option { + let mut parts: Vec = Vec::with_capacity(on_columns.len()); + + for col_name in on_columns { + let (col_idx, _) = batch.schema().column_with_name(col_name)?; + let column = batch.column(col_idx); + + if column.is_null(row_idx) { + return None; + } + + let key_part = match column.data_type() { + DataType::Utf8 => { + let arr = column.as_any().downcast_ref::()?; + KeyValue::String(arr.value(row_idx).to_string()) + } + DataType::LargeUtf8 => { + let arr = column.as_any().downcast_ref::()?; + KeyValue::String(arr.value(row_idx).to_string()) + } + DataType::UInt64 => { + let arr = column.as_primitive::(); + KeyValue::UInt64(arr.value(row_idx)) + } + DataType::Int64 => { + let arr = column.as_primitive::(); + KeyValue::Int64(arr.value(row_idx)) + } + DataType::UInt32 => { + let arr = column.as_primitive::(); + KeyValue::UInt64(arr.value(row_idx) as u64) + } + DataType::Int32 => { + let arr = column.as_primitive::(); + KeyValue::Int64(arr.value(row_idx) as i64) + } + DataType::Binary => { + let arr = column.as_any().downcast_ref::()?; + KeyValue::Binary(arr.value(row_idx).to_vec()) + } + DataType::LargeBinary => { + let arr = column.as_any().downcast_ref::()?; + KeyValue::Binary(arr.value(row_idx).to_vec()) + } + _ => return None, + }; + parts.push(key_part); + } + + if parts.is_empty() { + None + } else if parts.len() == 1 { + Some(parts.into_iter().next().unwrap()) + } else { + Some(KeyValue::Composite(parts)) + } +} diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index e301444ee48..d2cac743f92 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -391,6 +391,7 @@ impl UpdateJob { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap, update_mode: Some(RewriteRows), + inserted_rows_filter: None, }; let transaction = Transaction::new(dataset.manifest.version, operation, None); diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 8c588a36277..5900c92fa8d 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -63,7 +63,7 @@ use log; use object_store::path::Path; use prost::Message; -mod conflict_resolver; +pub mod conflict_resolver; #[cfg(all(feature = "dynamodb_tests", test))] mod dynamodb; #[cfg(test)] diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 5669608a4dd..0fd52ab2a8b 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -179,8 +179,8 @@ impl<'a> TransactionRebase<'a> { } /// Check whether the transaction conflicts with another transaction. - /// Mutate the current [TransactionRebase] based on [other_transaction] to be used for - /// eventually [finish] the rebase process. + /// Mutate the current [TransactionRebase] based on `other_transaction` to be used for + /// eventually finishing the rebase process. /// /// Will return an error if the transaction is not valid. Otherwise, it will /// return Ok(()). @@ -343,17 +343,84 @@ impl<'a> TransactionRebase<'a> { other_version: u64, ) -> Result<()> { if let Operation::Update { - mem_wal_to_merge, .. + mem_wal_to_merge, + inserted_rows_filter: self_inserted_rows_filter, + .. } = &self.transaction.operation { + if let Operation::Update { + inserted_rows_filter: other_inserted_rows_filter, + .. + } = &other_transaction.operation + { + // The presence of inserted_rows_filter means this is a primary key operation + // and strict conflict detection should be applied. + match (self_inserted_rows_filter, other_inserted_rows_filter) { + (Some(self_keys), Some(other_keys)) => { + if self_keys.field_ids != other_keys.field_ids { + // Different key columns - can't verify conflicts + return Err(self.retryable_conflict_err( + other_transaction, + other_version, + location!(), + )); + } + // Check for intersection. If the bloom filter configs don't match + // (e.g., different number_of_items or probability), intersects() returns + // an error and we treat it as a conflict to be safe. + let Ok((has_intersection, _maybe_false_positive)) = + self_keys.intersects(other_keys) + else { + // Bloom filter configs don't match - treat as conflict + return Err(self.retryable_conflict_err( + other_transaction, + other_version, + location!(), + )); + }; + if has_intersection { + return Err(self.retryable_conflict_err( + other_transaction, + other_version, + location!(), + )); + } + } + (Some(_), None) => { + // Current transaction has primary key conflict detection but + // the already committed transaction doesn't have a filter. + // We can't determine what rows were inserted by the other + // transaction, so we must fail to be safe. + return Err(self.retryable_conflict_err( + other_transaction, + other_version, + location!(), + )); + } + _ => {} + } + } + match &other_transaction.operation { Operation::CreateIndex { .. } | Operation::ReserveFragments { .. } | Operation::Project { .. } - | Operation::Append { .. } | Operation::Clone { .. } | Operation::UpdateConfig { .. } | Operation::UpdateBases { .. } => Ok(()), + Operation::Append { .. } => { + // If current transaction has primary key conflict detection, + // we can't safely commit against an Append because we don't + // know if the appended rows conflict with inserted rows. + if self_inserted_rows_filter.is_some() { + return Err(self.retryable_conflict_err( + other_transaction, + other_version, + location!(), + )); + } + Ok(()) + } Operation::Rewrite { groups, .. } => { if groups .iter() @@ -1797,6 +1864,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }; let transaction = Transaction::new_from_version(1, operation); let other_operations = [ @@ -1808,6 +1876,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, Operation::Delete { deleted_fragment_ids: vec![3], @@ -1822,6 +1891,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, ]; let other_transactions = other_operations.map(|op| Transaction::new_from_version(2, op)); @@ -1923,6 +1993,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, Operation::Delete { updated_fragments: vec![apply_deletion(&[1], &mut fragment, &dataset).await], @@ -1937,6 +2008,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, ]; let transactions = @@ -2058,6 +2130,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, ), ( @@ -2070,6 +2143,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, ), ( @@ -2228,6 +2302,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, create_update_config_for_test( Some(HashMap::from_iter(vec![( @@ -2423,6 +2498,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, [ Compatible, // append @@ -2845,6 +2921,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, ]; @@ -3149,7 +3226,7 @@ mod tests { "{}: expected NotCompatible but got {:?}", description, result - ); + ) } Retryable => { assert!(