From dd6bb27f96a1ca97242884a10f2876fdc4a2f6d7 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 6 Jan 2026 21:45:03 -0800 Subject: [PATCH 1/2] commit --- java/lance-jni/src/transaction.rs | 2 + protos/transaction.proto | 43 ++ python/src/transaction.rs | 1 + rust/lance-index/src/scalar/bloomfilter.rs | 2 +- .../src/scalar/bloomfilter/sbbf.rs | 85 +++ rust/lance/src/dataset/fragment.rs | 2 + rust/lance/src/dataset/transaction.rs | 14 + rust/lance/src/dataset/write/commit.rs | 1 + rust/lance/src/dataset/write/merge_insert.rs | 533 +++++++++++++++++- .../dataset/write/merge_insert/exec/delete.rs | 1 + .../dataset/write/merge_insert/exec/write.rs | 78 ++- .../write/merge_insert/inserted_rows.rs | 412 ++++++++++++++ rust/lance/src/dataset/write/update.rs | 1 + rust/lance/src/io/commit.rs | 2 +- rust/lance/src/io/commit/conflict_resolver.rs | 87 ++- 15 files changed, 1246 insertions(+), 18 deletions(-) create mode 100644 rust/lance/src/dataset/write/merge_insert/inserted_rows.rs diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index 9b077e7498c..b20f7e5e1be 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -562,6 +562,7 @@ fn convert_to_java_operation_inner<'local>( mem_wal_to_merge: _, fields_for_preserving_frag_bitmap, update_mode, + inserted_rows_filter: _, } => { let removed_ids: Vec> = removed_fragment_ids .iter() @@ -1043,6 +1044,7 @@ fn convert_to_rust_operation( mem_wal_to_merge: None, fields_for_preserving_frag_bitmap, update_mode, + inserted_rows_filter: None, } } "DataReplacement" => { diff --git a/protos/transaction.proto b/protos/transaction.proto index bec8155eb11..638898f4197 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -184,6 +184,46 @@ message Transaction { optional string branch_name = 5; } + // Exact set of key hashes for conflict detection. + // Used when the number of inserted rows is small. + message ExactKeySetFilter { + // 64-bit hashes of the inserted row keys. + repeated uint64 key_hashes = 1; + } + + // Bloom filter for key existence tests. + // Used when the number of rows is large. + message BloomFilter { + // Bitset backing the bloom filter (SBBF format). + bytes bitmap = 1; + // Number of bits in the bitmap. + uint32 num_bits = 2; + // Number of items the filter was sized for. + // Used for intersection validation (filters with different sizes cannot be compared). + // Default: 8192 + uint64 number_of_items = 3; + // False positive probability the filter was sized for. + // Used for intersection validation (filters with different parameters cannot be compared). + // Default: 0.00057 + double probability = 4; + } + + // A filter for checking key existence in set of rows inserted by a merge insert operation. + // Only created when the merge insert's ON columns match the schema's unenforced primary key. + // The presence of this filter indicates strict primary key conflict detection should be used. + // Can use either an exact set (for small row counts) or a Bloom filter (for large row counts). + message KeyExistenceFilter { + // Field IDs of columns participating in the key (must match unenforced primary key). + repeated int32 field_ids = 1; + // The underlying data structure storing the key hashes. + oneof data { + // Exact set of key hashes (used for small number of rows). + ExactKeySetFilter exact = 2; + // Bloom filter (used for large number of rows). + BloomFilter bloom = 3; + } + } + // An operation that updates rows but does not add or remove rows. message Update { // The fragments that have been removed. These are fragments where all rows @@ -202,6 +242,9 @@ message Transaction { repeated uint32 fields_for_preserving_frag_bitmap = 6; // The mode of update UpdateMode update_mode = 7; + // Filter for checking existence of keys in newly inserted rows, used for conflict detection. + // Only tracks keys from INSERT operations during merge insert, not updates. + optional KeyExistenceFilter inserted_rows = 8; } // The mode of update operation diff --git a/python/src/transaction.rs b/python/src/transaction.rs index 87400afe743..c89c5dd800c 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -232,6 +232,7 @@ impl FromPyObject<'_> for PyLance { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap, update_mode, + inserted_rows_filter: None, }; Ok(Self(op)) } diff --git a/rust/lance-index/src/scalar/bloomfilter.rs b/rust/lance-index/src/scalar/bloomfilter.rs index 73851ca7aeb..3057323b5da 100644 --- a/rust/lance-index/src/scalar/bloomfilter.rs +++ b/rust/lance-index/src/scalar/bloomfilter.rs @@ -18,7 +18,7 @@ use crate::scalar::{ use crate::{pb, Any}; use arrow_array::{Array, UInt64Array}; mod as_bytes; -mod sbbf; +pub mod sbbf; use arrow_schema::{DataType, Field}; use serde::{Deserialize, Serialize}; diff --git a/rust/lance-index/src/scalar/bloomfilter/sbbf.rs b/rust/lance-index/src/scalar/bloomfilter/sbbf.rs index da9f07a29ab..1ffce69c3c8 100644 --- a/rust/lance-index/src/scalar/bloomfilter/sbbf.rs +++ b/rust/lance-index/src/scalar/bloomfilter/sbbf.rs @@ -352,6 +352,58 @@ impl Sbbf { pub fn estimated_memory_size(&self) -> usize { self.blocks.capacity() * std::mem::size_of::() } + + /// Check if this filter might intersect with another filter. + /// Returns true if there's at least one bit position where both filters have a 1. + /// This is a fast check that may return false positives but never false negatives. + /// + /// Returns an error if the filters have different sizes, as bloom filters with + /// different configurations cannot be reliably compared. + pub fn might_intersect(&self, other: &Self) -> Result { + if self.blocks.len() != other.blocks.len() { + return Err(SbbfError::InvalidData { + message: format!( + "Cannot compare bloom filters with different sizes: {} blocks vs {} blocks. \ + Both filters must use the same configuration.", + self.blocks.len(), + other.blocks.len() + ), + }); + } + for i in 0..self.blocks.len() { + for j in 0..8 { + if (self.blocks[i][j] & other.blocks[i][j]) != 0 { + return Ok(true); + } + } + } + Ok(false) + } + + /// Check if this filter might intersect with a raw bitmap. + /// The bitmap should be in the same format as produced by to_bytes(). + /// + /// Returns an error if the bitmaps have different sizes, as bloom filters with + /// different configurations cannot be reliably compared. + pub fn might_intersect_bytes(&self, other_bytes: &[u8]) -> Result { + let self_bytes = self.to_bytes(); + if self_bytes.len() != other_bytes.len() { + return Err(SbbfError::InvalidData { + message: format!( + "Cannot compare bloom filters with different sizes: {} bytes vs {} bytes. \ + Both filters must use the same configuration.", + self_bytes.len(), + other_bytes.len() + ), + }); + } + for i in 0..self_bytes.len() { + if (self_bytes[i] & other_bytes[i]) != 0 { + return Ok(true); + } + } + Ok(false) + } } // Per spec we use xxHash with seed=0 @@ -418,6 +470,39 @@ impl Default for SbbfBuilder { } } +// ============================================================================ +// Bloom Filter helper functions for raw bitmap operations +// ============================================================================ + +/// Check if two serialized bloom filter bitmaps might contain overlapping elements. +/// Returns true if there's at least one bit position where both filters have a 1, +/// indicating a potential intersection of the sets they represent. +/// +/// This is a fast probabilistic check: if it returns false, the filters definitely +/// have no common elements. If it returns true, they might have common elements +/// (with possible false positives). +/// +/// Returns an error if the bitmaps have different sizes, as bloom filters with +/// different configurations cannot be reliably compared. +pub fn bloom_filters_might_overlap(a: &[u8], b: &[u8]) -> Result { + if a.len() != b.len() { + return Err(SbbfError::InvalidData { + message: format!( + "Cannot compare bloom filters with different sizes: {} bytes vs {} bytes. \ + Both filters must use the same configuration.", + a.len(), + b.len() + ), + }); + } + for i in 0..a.len() { + if (a[i] & b[i]) != 0 { + return Ok(true); + } + } + Ok(false) +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 4c22b75d0b1..099b883d355 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -2858,6 +2858,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: Some(UpdateMode::RewriteColumns), + inserted_rows_filter: None, }; let mut dataset1 = Dataset::commit( test_uri, @@ -2930,6 +2931,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: Some(UpdateMode::RewriteColumns), + inserted_rows_filter: None, }; let dataset2 = Dataset::commit( test_uri, diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 68800cbb5e2..21fea6c9e4c 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -45,6 +45,7 @@ //! the operation does not modify the region of the column being replaced. //! +use super::write::merge_insert::inserted_rows::KeyExistenceFilter; use super::{blob::BLOB_VERSION_CONFIG_KEY, ManifestWriteConfig}; use crate::dataset::transaction::UpdateMode::RewriteRows; use crate::index::mem_wal::update_mem_wal_index_in_indices_list; @@ -251,6 +252,9 @@ pub enum Operation { fields_for_preserving_frag_bitmap: Vec, /// The mode of update update_mode: Option, + /// Optional filter for detecting conflicts on inserted row keys. + /// Only tracks keys from INSERT operations during merge insert, not updates. + inserted_rows_filter: Option, }, /// Project to a new schema. This only changes the schema, not the data. @@ -449,6 +453,7 @@ impl PartialEq for Operation { mem_wal_to_merge: a_mem_wal_to_merge, fields_for_preserving_frag_bitmap: a_fields_for_preserving_frag_bitmap, update_mode: a_update_mode, + inserted_rows_filter: a_inserted_rows_filter, }, Self::Update { removed_fragment_ids: b_removed, @@ -458,6 +463,7 @@ impl PartialEq for Operation { mem_wal_to_merge: b_mem_wal_to_merge, fields_for_preserving_frag_bitmap: b_fields_for_preserving_frag_bitmap, update_mode: b_update_mode, + inserted_rows_filter: b_inserted_rows_filter, }, ) => { compare_vec(a_removed, b_removed) @@ -470,6 +476,7 @@ impl PartialEq for Operation { b_fields_for_preserving_frag_bitmap, ) && a_update_mode == b_update_mode + && a_inserted_rows_filter == b_inserted_rows_filter } (Self::Project { schema: a }, Self::Project { schema: b }) => a == b, ( @@ -1707,6 +1714,7 @@ impl Transaction { mem_wal_to_merge, fields_for_preserving_frag_bitmap, update_mode, + .. } => { // Extract existing fragments once for reuse let existing_fragments = maybe_existing_fragments?; @@ -2888,6 +2896,7 @@ impl TryFrom for Transaction { mem_wal_to_merge, fields_for_preserving_frag_bitmap, update_mode, + inserted_rows, })) => Operation::Update { removed_fragment_ids, updated_fragments: updated_fragments @@ -2906,6 +2915,9 @@ impl TryFrom for Transaction { 1 => Some(UpdateMode::RewriteColumns), _ => Some(UpdateMode::RewriteRows), }, + inserted_rows_filter: inserted_rows + .map(|ik| KeyExistenceFilter::from_pb(&ik)) + .transpose()?, }, Some(pb::transaction::Operation::Project(pb::transaction::Project { schema })) => { Operation::Project { @@ -3216,6 +3228,7 @@ impl From<&Transaction> for pb::Transaction { mem_wal_to_merge, fields_for_preserving_frag_bitmap, update_mode, + inserted_rows_filter, } => pb::transaction::Operation::Update(pb::transaction::Update { removed_fragment_ids: removed_fragment_ids.clone(), updated_fragments: updated_fragments @@ -3233,6 +3246,7 @@ impl From<&Transaction> for pb::Transaction { UpdateMode::RewriteColumns => 1, }) .unwrap_or(0), + inserted_rows: inserted_rows_filter.as_ref().map(|ik| ik.to_pb()), }), Operation::Project { schema } => { pb::transaction::Operation::Project(pb::transaction::Project { diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index f5ac12d0559..5070ee5e65f 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -761,6 +761,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, read_version: 1, tag: None, diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index b6ec4112171..d5406afff51 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -19,7 +19,10 @@ // Internal column name for the merge action. Using "__action" to avoid collisions with user columns. const MERGE_ACTION_COLUMN: &str = "__action"; +pub mod inserted_rows; + use assign_action::merge_insert_action; +use inserted_rows::KeyExistenceFilter; use super::retry::{execute_with_retry, RetryConfig, RetryExecutor}; use super::{write_fragments_internal, CommitBuilder, WriteParams}; @@ -148,7 +151,7 @@ fn unzip_batch(batch: &RecordBatch, schema: &Schema) -> RecordBatch { } /// Format key values for error messages via extracting "on" column values from the given RecordBatch. -fn format_key_values_on_columns( +pub fn format_key_values_on_columns( batch: &RecordBatch, row_idx: usize, on_columns: &[String], @@ -184,7 +187,7 @@ fn format_key_values_on_columns( } /// Create duplicate rows error via extracting "on" column values from the given RecordBatch. -fn create_duplicate_row_error( +pub fn create_duplicate_row_error( batch: &RecordBatch, row_idx: usize, on_columns: &[String], @@ -1353,7 +1356,12 @@ impl MergeInsertJob { async fn execute_uncommitted_v2( self, source: SendableRecordBatchStream, - ) -> Result<(Transaction, MergeStats, Option)> { + ) -> Result<( + Transaction, + MergeStats, + Option, + Option, + )> { let plan = self.create_plan(source).await?; // Execute the plan @@ -1390,7 +1398,7 @@ impl MergeInsertJob { } // Extract merge stats from the execution plan - let (stats, transaction, affected_rows) = if let Some(full_exec) = + let (stats, transaction, affected_rows, inserted_rows_filter) = if let Some(full_exec) = plan.as_any() .downcast_ref::() { @@ -1403,7 +1411,8 @@ impl MergeInsertJob { location: location!(), })?; let affected_rows = full_exec.affected_rows().map(RowAddrTreeMap::from); - (stats, transaction, affected_rows) + let inserted_rows_filter = full_exec.inserted_rows_filter(); + (stats, transaction, affected_rows, inserted_rows_filter) } else if let Some(delete_exec) = plan .as_any() .downcast_ref::() @@ -1417,7 +1426,7 @@ impl MergeInsertJob { location: location!(), })?; let affected_rows = delete_exec.affected_rows().map(RowAddrTreeMap::from); - (stats, transaction, affected_rows) + (stats, transaction, affected_rows, None) } else { return Err(Error::Internal { message: "Expected FullSchemaMergeInsertExec or DeleteOnlyMergeInsertExec".into(), @@ -1425,7 +1434,7 @@ impl MergeInsertJob { }); }; - Ok((transaction, stats, affected_rows)) + Ok((transaction, stats, affected_rows, inserted_rows_filter)) } /// Check if the merge insert operation can use the fast path (create_plan). @@ -1491,11 +1500,13 @@ impl MergeInsertJob { let can_use_fast_path = self.can_use_create_plan(source.schema().as_ref()).await?; if can_use_fast_path { - let (transaction, stats, affected_rows) = self.execute_uncommitted_v2(source).await?; + let (transaction, stats, affected_rows, inserted_rows_filter) = + self.execute_uncommitted_v2(source).await?; return Ok(UncommittedMergeInsert { transaction, affected_rows, stats, + inserted_rows_filter, }); } @@ -1553,6 +1564,7 @@ impl MergeInsertJob { mem_wal_to_merge: self.params.mem_wal_to_merge, fields_for_preserving_frag_bitmap: vec![], // in-place update do not affect preserving frag bitmap update_mode: Some(RewriteColumns), + inserted_rows_filter: None, // not implemented for v1 }; // We have rewritten the fragments, not just the deletion files, so // we can't use affected rows here. @@ -1627,6 +1639,8 @@ impl MergeInsertJob { .map(|f| f.id as u32) .collect(), update_mode: Some(RewriteRows), + // Slow path doesn't track inserted keys - conflict detection not available + inserted_rows_filter: None, // not implemented for v1 }; let affected_rows = Some(RowAddrTreeMap::from(removed_row_addrs)); @@ -1644,6 +1658,7 @@ impl MergeInsertJob { transaction, affected_rows, stats, + inserted_rows_filter: None, // not implemented for v1 }) } @@ -1814,6 +1829,7 @@ pub struct UncommittedMergeInsert { pub transaction: Transaction, pub affected_rows: Option, pub stats: MergeStats, + pub inserted_rows_filter: Option, } /// Wrapper struct that combines MergeInsertJob with the source iterator for retry functionality @@ -2164,6 +2180,7 @@ mod tests { use super::*; use crate::dataset::scanner::ColumnOrdering; use crate::index::vector::VectorIndexParams; + use crate::io::commit::read_transaction_file; use crate::{ dataset::{builder::DatasetBuilder, InsertBuilder, ReadParams, WriteMode, WriteParams}, session::Session, @@ -2173,11 +2190,13 @@ mod tests { }, }; use arrow_array::types::Float32Type; + use arrow_array::RecordBatch; use arrow_array::{ types::{Int32Type, UInt32Type}, FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatchIterator, RecordBatchReader, StringArray, UInt32Array, }; + use arrow_schema::{DataType, Field, Schema}; use arrow_select::concat::concat_batches; use datafusion::common::Column; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; @@ -4261,6 +4280,504 @@ mod tests { ); } + #[tokio::test] + async fn test_transaction_inserted_rows_filter_roundtrip() { + // Create dataset with unenforced primary key on "id" column + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false).with_metadata( + vec![( + "lance-schema:unenforced-primary-key".to_string(), + "true".to_string(), + )] + .into_iter() + .collect(), + ), + Field::new("value", DataType::UInt32, false), + ])); + let initial = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![0, 1, 2])), + Arc::new(UInt32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(); + let dataset = InsertBuilder::new("memory://") + .execute(vec![initial]) + .await + .unwrap(); + let dataset = Arc::new(dataset); + + // Source with overlapping key 1 + let new_batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![1, 3])), + Arc::new(UInt32Array::from(vec![2, 2])), + ], + ) + .unwrap(); + let stream = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(new_batch)]), + ); + + let UncommittedMergeInsert { transaction, .. } = + MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap() + .execute_uncommitted(Box::pin(stream) as SendableRecordBatchStream) + .await + .unwrap(); + + // Commit and read back transaction file + let committed = CommitBuilder::new(dataset.clone()) + .execute(transaction) + .await + .unwrap(); + let tx_path = committed.manifest().transaction_file.clone().unwrap(); + let tx_read = read_transaction_file(dataset.object_store(), &dataset.base, &tx_path) + .await + .unwrap(); + // Check that inserted_rows_filter is present in the Operation::Update + if let Operation::Update { + inserted_rows_filter, + .. + } = &tx_read.operation + { + assert!(inserted_rows_filter.is_some()); + let filter = inserted_rows_filter.as_ref().unwrap(); + // Field IDs are assigned by Lance schema; check that we tracked exactly 1 key field + assert_eq!(filter.field_ids.len(), 1); + } else { + panic!("Expected Operation::Update"); + } + } + + #[tokio::test] + async fn test_inserted_rows_filter_bloom_conflict_detection_concurrent() { + // Create schema with unenforced primary key on "id" column + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false).with_metadata( + vec![( + "lance-schema:unenforced-primary-key".to_string(), + "true".to_string(), + )] + .into_iter() + .collect(), + ), + Field::new("value", DataType::UInt32, false), + ])); + let initial = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![0, 1, 2, 3])), + Arc::new(UInt32Array::from(vec![0, 0, 0, 0])), + ], + ) + .unwrap(); + + // Throttle to increase contention + let throttled = Arc::new(ThrottledStoreWrapper { + config: ThrottleConfig { + wait_put_per_call: Duration::from_millis(5), + wait_get_per_call: Duration::from_millis(5), + wait_list_per_call: Duration::from_millis(5), + ..Default::default() + }, + }); + + let dataset = InsertBuilder::new("memory://") + .with_params(&WriteParams { + store_params: Some(ObjectStoreParams { + object_store_wrapper: Some(throttled.clone()), + ..Default::default() + }), + ..Default::default() + }) + .execute(vec![initial]) + .await + .unwrap(); + let dataset = Arc::new(dataset); + + // Both jobs update/insert the same key 2 + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![2])), + Arc::new(UInt32Array::from(vec![1])), + ], + ) + .unwrap(); + let batch2 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![2])), + Arc::new(UInt32Array::from(vec![1])), + ], + ) + .unwrap(); + + let s1 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch1.clone())]), + ); + let s2 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch2.clone())]), + ); + + let b1 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap(); + let b2 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap(); + + let t1 = tokio::spawn(async move { + b1.execute(Box::pin(s1) as SendableRecordBatchStream) + .await + .unwrap() + .1 + }); + let t2 = tokio::spawn(async move { + b2.execute(Box::pin(s2) as SendableRecordBatchStream) + .await + .unwrap() + .1 + }); + + let s1 = t1.await.unwrap(); + let s2 = t2.await.unwrap(); + // At least one attempt should include a retry under contention + assert!(s1.num_attempts >= 1); + assert!(s2.num_attempts >= 1); + + // Validate final dataset has id=2 updated to 1, without duplicates + let mut ds_latest = dataset.as_ref().clone(); + ds_latest.checkout_latest().await.unwrap(); + let batch = ds_latest.scan().try_into_batch().await.unwrap(); + let ids = batch["id"].as_primitive::().values(); + let vals = batch["value"].as_primitive::().values(); + // find index of id==2 + let pos = ids.iter().position(|&x| x == 2).unwrap(); + assert_eq!(vals[pos], 1); + } + + /// Test concurrent INSERT of the same NEW key (key doesn't exist in initial data) + /// Both transactions try to INSERT a row with id=100, which doesn't exist. + /// This tests the bloom filter conflict detection for INSERT operations, + /// as opposed to UPDATE operations on existing keys. + #[tokio::test] + async fn test_concurrent_insert_same_new_key() { + // Create schema with unenforced primary key on "id" column + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false).with_metadata( + vec![( + "lance-schema:unenforced-primary-key".to_string(), + "true".to_string(), + )] + .into_iter() + .collect(), + ), + Field::new("value", DataType::UInt32, false), + ])); + // Initial dataset with ids 0, 1, 2, 3 - NOT containing id=100 + let initial = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![0, 1, 2, 3])), + Arc::new(UInt32Array::from(vec![0, 0, 0, 0])), + ], + ) + .unwrap(); + + // Throttle to increase contention + let throttled = Arc::new(ThrottledStoreWrapper { + config: ThrottleConfig { + wait_put_per_call: Duration::from_millis(5), + wait_get_per_call: Duration::from_millis(5), + wait_list_per_call: Duration::from_millis(5), + ..Default::default() + }, + }); + + let dataset = InsertBuilder::new("memory://") + .with_params(&WriteParams { + store_params: Some(ObjectStoreParams { + object_store_wrapper: Some(throttled.clone()), + ..Default::default() + }), + ..Default::default() + }) + .execute(vec![initial]) + .await + .unwrap(); + let dataset = Arc::new(dataset); + + // Both jobs try to INSERT the same NEW key id=100 (doesn't exist in initial data) + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![100])), // NEW key id=100 + Arc::new(UInt32Array::from(vec![1])), + ], + ) + .unwrap(); + let batch2 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![100])), // Same NEW key id=100 + Arc::new(UInt32Array::from(vec![2])), + ], + ) + .unwrap(); + + let s1 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch1.clone())]), + ); + let s2 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch2.clone())]), + ); + + // Both use InsertAll for when_not_matched, so both will try to INSERT id=100 + let b1 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap(); + let b2 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap(); + + let t1 = tokio::spawn(async move { + b1.execute(Box::pin(s1) as SendableRecordBatchStream) + .await + .unwrap() + .1 + }); + let t2 = tokio::spawn(async move { + b2.execute(Box::pin(s2) as SendableRecordBatchStream) + .await + .unwrap() + .1 + }); + + let s1 = t1.await.unwrap(); + let s2 = t2.await.unwrap(); + // At least one attempt should include a retry under contention + // because both transactions tried to insert the same new key + assert!(s1.num_attempts >= 1); + assert!(s2.num_attempts >= 1); + + // Validate final dataset has exactly one row with id=100 + let mut ds_latest = dataset.as_ref().clone(); + ds_latest.checkout_latest().await.unwrap(); + let batch = ds_latest.scan().try_into_batch().await.unwrap(); + let ids = batch["id"].as_primitive::().values(); + + // Count occurrences of id=100 - should be exactly 1 (no duplicates) + let count_100 = ids.iter().filter(|&&x| x == 100).count(); + assert_eq!( + count_100, 1, + "Expected exactly one row with id=100, but found {}", + count_100 + ); + + // Should have 5 total rows: original 4 + 1 new row with id=100 + assert_eq!( + batch.num_rows(), + 5, + "Expected 5 rows, but found {}", + batch.num_rows() + ); + } + + /// Test that merge_insert with bloom filter fails when committing against + /// an Update transaction that doesn't have a filter. We can't determine if + /// the Update operation conflicted with our inserted rows. + #[tokio::test] + async fn test_merge_insert_conflict_with_update_without_filter() { + use crate::dataset::UpdateBuilder; + + // Create schema with unenforced primary key on "id" column + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false).with_metadata( + vec![( + "lance-schema:unenforced-primary-key".to_string(), + "true".to_string(), + )] + .into_iter() + .collect(), + ), + Field::new("value", DataType::UInt32, false), + ])); + let initial = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![0, 1, 2, 3])), + Arc::new(UInt32Array::from(vec![0, 0, 0, 0])), + ], + ) + .unwrap(); + + let dataset = InsertBuilder::new("memory://") + .execute(vec![initial]) + .await + .unwrap(); + let dataset = Arc::new(dataset); + + // Create merge insert job based on version 1 + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![100])), + Arc::new(UInt32Array::from(vec![1])), + ], + ) + .unwrap(); + + let b1 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .conflict_retries(0) + .try_build() + .unwrap(); + + // Regular Update without bloom filter commits first (creates version 2) + let update_result = UpdateBuilder::new(dataset.clone()) + .update_where("id = 0") + .unwrap() + .set("value", "999") + .unwrap() + .build() + .unwrap() + .execute() + .await; + assert!(update_result.is_ok(), "Update should succeed"); + + // Now merge insert tries to commit based on version 1, needs to rebase against version 2 + let s1 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch1.clone())]), + ); + let merge_result = b1.execute(Box::pin(s1) as SendableRecordBatchStream).await; + + // Merge insert should fail with retryable conflict because it can't + // determine if Update conflicted (Update has no inserted_rows_filter) + assert!( + matches!( + merge_result, + Err(crate::Error::TooMuchWriteContention { .. }) + ), + "Expected TooMuchWriteContention (retryable conflict exhausted), got: {:?}", + merge_result + ); + } + + /// Test that merge_insert with bloom filter fails when committing against + /// an Append operation. We can't determine if the appended rows conflict + /// with our inserted rows. + #[tokio::test] + async fn test_merge_insert_conflict_with_append() { + // Create schema with unenforced primary key on "id" column + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false).with_metadata( + vec![( + "lance-schema:unenforced-primary-key".to_string(), + "true".to_string(), + )] + .into_iter() + .collect(), + ), + Field::new("value", DataType::UInt32, false), + ])); + let initial = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![0, 1, 2, 3])), + Arc::new(UInt32Array::from(vec![0, 0, 0, 0])), + ], + ) + .unwrap(); + + let dataset = InsertBuilder::new("memory://") + .execute(vec![initial]) + .await + .unwrap(); + let dataset = Arc::new(dataset); + + // Create merge insert job based on version 1 + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![100])), + Arc::new(UInt32Array::from(vec![1])), + ], + ) + .unwrap(); + + let b1 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .conflict_retries(0) + .try_build() + .unwrap(); + + // Append commits first (creates version 2) + let append_batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![50])), + Arc::new(UInt32Array::from(vec![2])), + ], + ) + .unwrap(); + let append_result = InsertBuilder::new(dataset.clone()) + .with_params(&WriteParams { + mode: WriteMode::Append, + ..Default::default() + }) + .execute(vec![append_batch]) + .await; + assert!(append_result.is_ok(), "Append should succeed"); + + // Now merge insert tries to commit based on version 1, needs to rebase against version 2 + let s1 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch1.clone())]), + ); + let merge_result = b1.execute(Box::pin(s1) as SendableRecordBatchStream).await; + + // Merge insert should fail with retryable conflict because it can't + // determine if Append added conflicting keys + assert!( + matches!( + merge_result, + Err(crate::Error::TooMuchWriteContention { .. }) + ), + "Expected TooMuchWriteContention (retryable conflict exhausted), got: {:?}", + merge_result + ); + } + #[tokio::test] async fn test_explain_plan() { // Set up test data using lance_datagen diff --git a/rust/lance/src/dataset/write/merge_insert/exec/delete.rs b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs index 12ae2359bab..1503b5b21c4 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/delete.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs @@ -283,6 +283,7 @@ impl ExecutionPlan for DeleteOnlyMergeInsertExec { .map(|f| f.id as u32) .collect(), update_mode: None, + inserted_rows_filter: None, // Delete-only operations don't insert rows }; let transaction = Transaction::new(dataset.manifest.version, operation, None); diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs index b2cef2dc601..4f24c94ca41 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs @@ -7,7 +7,7 @@ use std::sync::{Arc, Mutex}; use arrow_array::{Array, RecordBatch, UInt64Array, UInt8Array}; use arrow_schema::Schema; use arrow_select; -use datafusion::common::Result as DFResult; +use datafusion::common::{DataFusionError, Result as DFResult}; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::{ execution::{SendableRecordBatchStream, TaskContext}, @@ -26,6 +26,9 @@ use snafu::location; use crate::dataset::transaction::UpdateMode::RewriteRows; use crate::dataset::utils::CapturedRowIds; +use crate::dataset::write::merge_insert::inserted_rows::{ + extract_key_value_from_batch, KeyExistenceFilter, KeyExistenceFilterBuilder, +}; use crate::dataset::write::merge_insert::{ create_duplicate_row_error, format_key_values_on_columns, }; @@ -51,6 +54,8 @@ struct MergeState { delete_row_addrs: RoaringTreemap, /// Shared collection to capture row ids that need to be updated updating_row_ids: Arc>, + /// Track keys of newly inserted rows (not updates). + inserted_rows_filter: KeyExistenceFilterBuilder, /// Merge operation metrics metrics: MergeInsertMetrics, /// Whether the dataset uses stable row ids. @@ -62,10 +67,16 @@ struct MergeState { } impl MergeState { - fn new(metrics: MergeInsertMetrics, stable_row_ids: bool, on_columns: Vec) -> Self { + fn new( + metrics: MergeInsertMetrics, + stable_row_ids: bool, + on_columns: Vec, + field_ids: Vec, + ) -> Self { Self { delete_row_addrs: RoaringTreemap::new(), updating_row_ids: Arc::new(Mutex::new(CapturedRowIds::new(stable_row_ids))), + inserted_rows_filter: KeyExistenceFilterBuilder::new(field_ids), metrics, stable_row_ids, processed_row_ids: HashSet::new(), @@ -116,6 +127,14 @@ impl MergeState { } Action::Insert => { // Insert action - just insert new data + // Capture the key value for conflict detection (only for inserts, not updates) + if let Some(key_value) = + extract_key_value_from_batch(batch, row_idx, &self.on_columns) + { + self.inserted_rows_filter + .insert(key_value) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } self.metrics.num_inserted_rows.add(1); Ok(Some(row_idx)) // Keep this row for writing } @@ -151,6 +170,10 @@ pub struct FullSchemaMergeInsertExec { merge_stats: Arc>>, transaction: Arc>>, affected_rows: Arc>>, + inserted_rows_filter: Arc>>, + /// Whether the ON columns match the schema's unenforced primary key. + /// If true, inserted_rows_filter will be included in the transaction for conflict detection. + is_primary_key: bool, } impl FullSchemaMergeInsertExec { @@ -167,6 +190,20 @@ impl FullSchemaMergeInsertExec { Boundedness::Bounded, ); + // Check if ON columns match the schema's unenforced primary key + let field_ids: Vec = params + .on + .iter() + .filter_map(|name| dataset.schema().field(name).map(|f| f.id)) + .collect(); + let pk_field_ids: Vec = dataset + .schema() + .unenforced_primary_key() + .iter() + .map(|f| f.id) + .collect(); + let is_primary_key = !pk_field_ids.is_empty() && field_ids == pk_field_ids; + Ok(Self { input, dataset, @@ -176,6 +213,8 @@ impl FullSchemaMergeInsertExec { merge_stats: Arc::new(Mutex::new(None)), transaction: Arc::new(Mutex::new(None)), affected_rows: Arc::new(Mutex::new(None)), + inserted_rows_filter: Arc::new(Mutex::new(None)), + is_primary_key, }) } @@ -197,6 +236,16 @@ impl FullSchemaMergeInsertExec { .and_then(|mut guard| guard.take()) } + /// Returns the filter for inserted row keys if the execution has completed. + /// This contains keys of newly inserted rows (not updates) for conflict detection. + /// Returns `None` if the execution is still in progress or hasn't started. + pub fn inserted_rows_filter(&self) -> Option { + self.inserted_rows_filter + .lock() + .ok() + .and_then(|guard| guard.clone()) + } + /// Takes the affected rows (deleted/updated row addresses) if the execution has completed. /// Returns `None` if the execution is still in progress or hasn't started. pub fn affected_rows(&self) -> Option { @@ -725,6 +774,8 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { merge_stats: self.merge_stats.clone(), transaction: self.transaction.clone(), affected_rows: self.affected_rows.clone(), + inserted_rows_filter: self.inserted_rows_filter.clone(), + is_primary_key: self.is_primary_key, })) } @@ -766,10 +817,18 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { let input_stream = self.input.execute(partition, context)?; // Step 1: Create shared state and streaming processor for row addresses and write data + // Get field IDs for the ON columns from the dataset schema + let field_ids: Vec = self + .params + .on + .iter() + .filter_map(|name| self.dataset.schema().field(name).map(|f| f.id)) + .collect(); let merge_state = Arc::new(Mutex::new(MergeState::new( MergeInsertMetrics::new(&self.metrics, partition), self.dataset.manifest.uses_stable_row_ids(), self.params.on.clone(), + field_ids, ))); let write_data_stream = self.create_filtered_write_stream(input_stream, merge_state.clone())?; @@ -779,7 +838,9 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { let merge_stats_holder = self.merge_stats.clone(); let transaction_holder = self.transaction.clone(); let affected_rows_holder = self.affected_rows.clone(); + let inserted_rows_filter_holder = self.inserted_rows_filter.clone(); let mem_wal_to_merge = self.params.mem_wal_to_merge.clone(); + let is_primary_key = self.is_primary_key; let updating_row_ids = { let state = merge_state.lock().unwrap(); state.updating_row_ids.clone() @@ -832,6 +893,13 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { let merge_state = Mutex::into_inner(merge_state).expect("MergeState lock should be available"); let delete_row_addrs_clone = merge_state.delete_row_addrs; + let inserted_rows_filter = if is_primary_key { + Some(KeyExistenceFilter::from_bloom_filter( + &merge_state.inserted_rows_filter, + )) + } else { + None + }; let (updated_fragments, removed_fragment_ids) = apply_deletions(&dataset, &delete_row_addrs_clone).await?; @@ -850,6 +918,7 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { .map(|f| f.id as u32) .collect(), update_mode: Some(RewriteRows), + inserted_rows_filter: inserted_rows_filter.clone(), }; // Step 5: Create and store the transaction @@ -876,6 +945,9 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { if let Ok(mut affected_rows_guard) = affected_rows_holder.lock() { affected_rows_guard.replace(delete_row_addrs_clone); } + if let Ok(mut filter_guard) = inserted_rows_filter_holder.lock() { + *filter_guard = inserted_rows_filter; + } }; // Step 7: Return empty result (write operations don't return data) @@ -900,7 +972,7 @@ mod tests { #[test] fn test_merge_state_duplicate_rowid_detection() { let metrics = MergeInsertMetrics::new(&ExecutionPlanMetricsSet::new(), 0); - let mut merge_state = MergeState::new(metrics, false, Vec::new()); + let mut merge_state = MergeState::new(metrics, false, Vec::new(), Vec::new()); let row_addr_array = UInt64Array::from(vec![1000, 2000, 3000]); let row_id_array = UInt64Array::from(vec![100, 100, 300]); // Duplicate row_id 100 diff --git a/rust/lance/src/dataset/write/merge_insert/inserted_rows.rs b/rust/lance/src/dataset/write/merge_insert/inserted_rows.rs new file mode 100644 index 00000000000..dc4cec8fb74 --- /dev/null +++ b/rust/lance/src/dataset/write/merge_insert/inserted_rows.rs @@ -0,0 +1,412 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Key existence tracking for merge insert conflict detection. +//! +//! This module provides data structures for tracking keys of newly inserted rows +//! during merge insert operations. This is used for detecting conflicts when +//! concurrent transactions attempt to insert rows with the same key. + +use std::collections::hash_map::DefaultHasher; +use std::collections::HashSet; +use std::hash::{Hash, Hasher}; + +use arrow_array::cast::AsArray; +use arrow_array::{BinaryArray, LargeBinaryArray, LargeStringArray, RecordBatch, StringArray}; +use arrow_schema::DataType; +use deepsize::DeepSizeOf; +use lance_core::Result; +use lance_index::scalar::bloomfilter::sbbf::{bloom_filters_might_overlap, Sbbf, SbbfBuilder}; +use lance_table::format::pb; +use snafu::location; + +// ============================================================================ +// Constants for Bloom Filter configuration +// ============================================================================ + +// IMPORTANT: All bloom filters for conflict detection MUST use the same size. +// Different sized filters cannot be correctly compared for intersection. +// These values are fixed to ensure all filters have identical dimensions. +// +// These values match the defaults in lance-index bloomfilter.rs: +// NumberOfItems: 8192 + Probability: 0.00057(1 in 1754) -> NumberOfBytes: 16384(16KiB) +pub const BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS: u64 = 8192; +pub const BLOOM_FILTER_DEFAULT_PROBABILITY: f64 = 0.00057; + +/// Create a BloomFilter protobuf message with default configuration +fn create_bloom_filter_pb(bitmap: Vec, num_bits: u32) -> pb::transaction::BloomFilter { + pb::transaction::BloomFilter { + bitmap, + num_bits, + number_of_items: BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS, + probability: BLOOM_FILTER_DEFAULT_PROBABILITY, + } +} + +// ============================================================================ +// Key types for conflict detection +// ============================================================================ + +/// Key value that can be used in conflict detection for inserted rows +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum KeyValue { + String(String), + Int64(i64), + UInt64(u64), + Binary(Vec), + Composite(Vec), +} + +impl KeyValue { + /// Convert the key value to bytes for hashing + pub fn to_bytes(&self) -> Vec { + match self { + Self::String(s) => s.as_bytes().to_vec(), + Self::Int64(i) => i.to_le_bytes().to_vec(), + Self::UInt64(u) => u.to_le_bytes().to_vec(), + Self::Binary(b) => b.clone(), + Self::Composite(values) => { + let mut result = Vec::new(); + for value in values { + result.extend_from_slice(&value.to_bytes()); + result.push(0); // separator + } + result + } + } + } + + /// Get a hash of the key value + pub fn hash_value(&self) -> u64 { + let mut hasher = DefaultHasher::new(); + self.to_bytes().hash(&mut hasher); + hasher.finish() + } +} + +// ============================================================================ +// Bloom Filter Builder for tracking inserted rows +// ============================================================================ + +/// Builder for KeyExistenceFilter using a Split Block Bloom Filter (SBBF). +/// Used to track keys of inserted rows for conflict detection. +#[derive(Debug, Clone)] +pub struct KeyExistenceFilterBuilder { + sbbf: Sbbf, + /// Field IDs of columns that form the key (must match unenforced primary key) + field_ids: Vec, + /// Number of items inserted (for len()) + item_count: usize, +} + +impl KeyExistenceFilterBuilder { + /// Create a new KeyExistenceFilterBuilder using SBBF with default parameters. + pub fn new(field_ids: Vec) -> Self { + let sbbf = SbbfBuilder::new() + .expected_items(BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS) + .false_positive_probability(BLOOM_FILTER_DEFAULT_PROBABILITY) + .build() + .expect("Failed to build SBBF for KeyExistenceFilterBuilder"); + Self { + sbbf, + field_ids, + item_count: 0, + } + } + + /// Add a key to the filter + pub fn insert(&mut self, key: KeyValue) -> Result<()> { + let bytes = key.to_bytes(); + self.sbbf.insert(&bytes[..]); + self.item_count += 1; + Ok(()) + } + + /// Check if a key might be present + pub fn contains(&self, key: &KeyValue) -> bool { + let bytes = key.to_bytes(); + self.sbbf.check(&bytes[..]) + } + + /// Check if this filter might intersect with another filter. + /// This is probabilistic - may return true for non-overlapping sets (false positive). + /// + /// Returns an error if the filters have different sizes/configurations. + pub fn might_intersect(&self, other: &Self) -> Result { + self.sbbf + .might_intersect(&other.sbbf) + .map_err(|e| lance_core::Error::invalid_input(e.to_string(), location!())) + } + + /// Get the key field IDs + pub fn field_ids(&self) -> &[i32] { + &self.field_ids + } + + /// Get the estimated size in bytes + pub fn estimated_size_bytes(&self) -> usize { + self.sbbf.size_bytes() + } + + /// Convert to typed protobuf KeyExistenceFilter (Bloom variant) + pub fn to_pb(&self) -> pb::transaction::KeyExistenceFilter { + let bitmap = self.sbbf.to_bytes(); + let num_bits = (self.sbbf.size_bytes() as u32) * 8; + pb::transaction::KeyExistenceFilter { + field_ids: self.field_ids.clone(), + data: Some(pb::transaction::key_existence_filter::Data::Bloom( + create_bloom_filter_pb(bitmap, num_bits), + )), + } + } + + /// Get the number of items + pub fn len(&self) -> usize { + self.item_count + } + + /// Check if empty + pub fn is_empty(&self) -> bool { + self.item_count == 0 + } + + /// Check if this filter might produce false positives (Bloom filters are probabilistic) + pub fn might_have_false_positives(&self) -> bool { + true + } + + /// Build a KeyExistenceFilter from this builder + pub fn build(&self) -> KeyExistenceFilter { + let bitmap = self.sbbf.to_bytes(); + let num_bits = (self.sbbf.size_bytes() as u32) * 8; + KeyExistenceFilter { + field_ids: self.field_ids.clone(), + filter: FilterType::Bloom { + bitmap, + num_bits, + number_of_items: BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS, + probability: BLOOM_FILTER_DEFAULT_PROBABILITY, + }, + } + } +} + +// ============================================================================ +// KeyExistenceFilter metadata for conflict detection +// ============================================================================ + +/// Type of filter used for storing key existence data +#[derive(Debug, Clone, DeepSizeOf, PartialEq)] +pub enum FilterType { + ExactSet(HashSet), + Bloom { + bitmap: Vec, + num_bits: u32, + /// Number of items the filter was sized for + number_of_items: u64, + /// False positive probability the filter was sized for + probability: f64, + }, +} + +/// Metadata about keys of newly inserted rows, used for conflict detection. +/// Only created when the merge insert's ON columns match the schema's unenforced primary key. +/// The presence of this filter indicates strict primary key conflict detection should be used. +/// Only tracks keys from INSERT operations, not UPDATE operations. +#[derive(Debug, Clone, DeepSizeOf, PartialEq)] +pub struct KeyExistenceFilter { + pub field_ids: Vec, + pub filter: FilterType, +} + +impl KeyExistenceFilter { + /// Create KeyExistenceFilter from a bloom filter builder + pub fn from_bloom_filter(bloom: &KeyExistenceFilterBuilder) -> Self { + bloom.build() + } + + pub fn to_pb(&self) -> pb::transaction::KeyExistenceFilter { + match &self.filter { + FilterType::ExactSet(hashes) => pb::transaction::KeyExistenceFilter { + field_ids: self.field_ids.clone(), + data: Some(pb::transaction::key_existence_filter::Data::Exact( + pb::transaction::ExactKeySetFilter { + key_hashes: hashes.iter().copied().collect(), + }, + )), + }, + FilterType::Bloom { + bitmap, + num_bits, + number_of_items, + probability, + } => pb::transaction::KeyExistenceFilter { + field_ids: self.field_ids.clone(), + data: Some(pb::transaction::key_existence_filter::Data::Bloom( + pb::transaction::BloomFilter { + bitmap: bitmap.clone(), + num_bits: *num_bits, + number_of_items: *number_of_items, + probability: *probability, + }, + )), + }, + } + } + + pub fn from_pb(message: &pb::transaction::KeyExistenceFilter) -> Result { + let field_ids = message.field_ids.clone(); + let filter = match message.data.as_ref() { + Some(pb::transaction::key_existence_filter::Data::Exact(exact)) => { + FilterType::ExactSet(exact.key_hashes.iter().copied().collect()) + } + Some(pb::transaction::key_existence_filter::Data::Bloom(b)) => { + // Use defaults for backwards compatibility with older filters + // that don't have number_of_items/probability set (they will be 0) + let number_of_items = if b.number_of_items == 0 { + BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS + } else { + b.number_of_items + }; + let probability = if b.probability == 0.0 { + BLOOM_FILTER_DEFAULT_PROBABILITY + } else { + b.probability + }; + FilterType::Bloom { + bitmap: b.bitmap.clone(), + num_bits: b.num_bits, + number_of_items, + probability, + } + } + None => { + // Treat missing data as empty exact set + FilterType::ExactSet(HashSet::new()) + } + }; + Ok(Self { field_ids, filter }) + } + + /// Determine if 2 filters intersect, and whether it might be a false positive. + /// + /// Returns `Err` if the bloom filter configs don't match (different expected_items or fpp), + /// since bloom filters with different sizes cannot be reliably compared. + /// + /// Returns `Ok((has_intersection, might_be_false_positive))` on success. + pub fn intersects(&self, other: &Self) -> Result<(bool, bool)> { + match (&self.filter, &other.filter) { + (FilterType::ExactSet(a), FilterType::ExactSet(b)) => { + let has = a.iter().any(|h| b.contains(h)); + Ok((has, false)) + } + (FilterType::ExactSet(_), FilterType::Bloom { .. }) + | (FilterType::Bloom { .. }, FilterType::ExactSet(_)) => { + // ExactSet stores hashes from an unknown scheme, while Bloom uses XxHash64. + // Since we can't reliably compare them, we conservatively assume + // there might be an intersection to avoid missing conflicts. + Ok((true, true)) + } + ( + FilterType::Bloom { + bitmap: a_bits, + number_of_items: a_num_items, + probability: a_prob, + .. + }, + FilterType::Bloom { + bitmap: b_bits, + number_of_items: b_num_items, + probability: b_prob, + .. + }, + ) => { + // Bloom filters with different configurations cannot be reliably compared + // because they have different sizes and bit patterns + if a_num_items != b_num_items || (a_prob - b_prob).abs() > f64::EPSILON { + return Err(lance_core::Error::invalid_input( + format!( + "Cannot compare bloom filters with different configurations: \ + self(number_of_items={}, probability={}) vs other(number_of_items={}, probability={}). \ + Both filters must use the same parameters for reliable intersection checking.", + a_num_items, a_prob, b_num_items, b_prob + ), + location!(), + )); + } + // Since configs are validated above, this should not fail + let has = bloom_filters_might_overlap(a_bits, b_bits) + .map_err(|e| lance_core::Error::invalid_input(e.to_string(), location!()))?; + Ok((has, has)) + } + } + } +} + +// ============================================================================ +// Utility functions for extracting key values from batches +// ============================================================================ + +/// Extract key value from a batch row for conflict detection bloom filter. +/// Returns None if any ON column is null or has an unsupported type. +pub fn extract_key_value_from_batch( + batch: &RecordBatch, + row_idx: usize, + on_columns: &[String], +) -> Option { + let mut parts: Vec = Vec::with_capacity(on_columns.len()); + + for col_name in on_columns { + let (col_idx, _) = batch.schema().column_with_name(col_name)?; + let column = batch.column(col_idx); + + if column.is_null(row_idx) { + return None; // Skip rows with null key values + } + + let key_part = match column.data_type() { + DataType::Utf8 => { + let arr = column.as_any().downcast_ref::()?; + KeyValue::String(arr.value(row_idx).to_string()) + } + DataType::LargeUtf8 => { + let arr = column.as_any().downcast_ref::()?; + KeyValue::String(arr.value(row_idx).to_string()) + } + DataType::UInt64 => { + let arr = column.as_primitive::(); + KeyValue::UInt64(arr.value(row_idx)) + } + DataType::Int64 => { + let arr = column.as_primitive::(); + KeyValue::Int64(arr.value(row_idx)) + } + DataType::UInt32 => { + let arr = column.as_primitive::(); + KeyValue::UInt64(arr.value(row_idx) as u64) + } + DataType::Int32 => { + let arr = column.as_primitive::(); + KeyValue::Int64(arr.value(row_idx) as i64) + } + DataType::Binary => { + let arr = column.as_any().downcast_ref::()?; + KeyValue::Binary(arr.value(row_idx).to_vec()) + } + DataType::LargeBinary => { + let arr = column.as_any().downcast_ref::()?; + KeyValue::Binary(arr.value(row_idx).to_vec()) + } + _ => return None, // Unsupported type + }; + parts.push(key_part); + } + + if parts.is_empty() { + None + } else if parts.len() == 1 { + Some(parts.into_iter().next().unwrap()) + } else { + Some(KeyValue::Composite(parts)) + } +} diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index e301444ee48..d2cac743f92 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -391,6 +391,7 @@ impl UpdateJob { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap, update_mode: Some(RewriteRows), + inserted_rows_filter: None, }; let transaction = Transaction::new(dataset.manifest.version, operation, None); diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 8c588a36277..5900c92fa8d 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -63,7 +63,7 @@ use log; use object_store::path::Path; use prost::Message; -mod conflict_resolver; +pub mod conflict_resolver; #[cfg(all(feature = "dynamodb_tests", test))] mod dynamodb; #[cfg(test)] diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 5669608a4dd..0fd52ab2a8b 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -179,8 +179,8 @@ impl<'a> TransactionRebase<'a> { } /// Check whether the transaction conflicts with another transaction. - /// Mutate the current [TransactionRebase] based on [other_transaction] to be used for - /// eventually [finish] the rebase process. + /// Mutate the current [TransactionRebase] based on `other_transaction` to be used for + /// eventually finishing the rebase process. /// /// Will return an error if the transaction is not valid. Otherwise, it will /// return Ok(()). @@ -343,17 +343,84 @@ impl<'a> TransactionRebase<'a> { other_version: u64, ) -> Result<()> { if let Operation::Update { - mem_wal_to_merge, .. + mem_wal_to_merge, + inserted_rows_filter: self_inserted_rows_filter, + .. } = &self.transaction.operation { + if let Operation::Update { + inserted_rows_filter: other_inserted_rows_filter, + .. + } = &other_transaction.operation + { + // The presence of inserted_rows_filter means this is a primary key operation + // and strict conflict detection should be applied. + match (self_inserted_rows_filter, other_inserted_rows_filter) { + (Some(self_keys), Some(other_keys)) => { + if self_keys.field_ids != other_keys.field_ids { + // Different key columns - can't verify conflicts + return Err(self.retryable_conflict_err( + other_transaction, + other_version, + location!(), + )); + } + // Check for intersection. If the bloom filter configs don't match + // (e.g., different number_of_items or probability), intersects() returns + // an error and we treat it as a conflict to be safe. + let Ok((has_intersection, _maybe_false_positive)) = + self_keys.intersects(other_keys) + else { + // Bloom filter configs don't match - treat as conflict + return Err(self.retryable_conflict_err( + other_transaction, + other_version, + location!(), + )); + }; + if has_intersection { + return Err(self.retryable_conflict_err( + other_transaction, + other_version, + location!(), + )); + } + } + (Some(_), None) => { + // Current transaction has primary key conflict detection but + // the already committed transaction doesn't have a filter. + // We can't determine what rows were inserted by the other + // transaction, so we must fail to be safe. + return Err(self.retryable_conflict_err( + other_transaction, + other_version, + location!(), + )); + } + _ => {} + } + } + match &other_transaction.operation { Operation::CreateIndex { .. } | Operation::ReserveFragments { .. } | Operation::Project { .. } - | Operation::Append { .. } | Operation::Clone { .. } | Operation::UpdateConfig { .. } | Operation::UpdateBases { .. } => Ok(()), + Operation::Append { .. } => { + // If current transaction has primary key conflict detection, + // we can't safely commit against an Append because we don't + // know if the appended rows conflict with inserted rows. + if self_inserted_rows_filter.is_some() { + return Err(self.retryable_conflict_err( + other_transaction, + other_version, + location!(), + )); + } + Ok(()) + } Operation::Rewrite { groups, .. } => { if groups .iter() @@ -1797,6 +1864,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }; let transaction = Transaction::new_from_version(1, operation); let other_operations = [ @@ -1808,6 +1876,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, Operation::Delete { deleted_fragment_ids: vec![3], @@ -1822,6 +1891,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, ]; let other_transactions = other_operations.map(|op| Transaction::new_from_version(2, op)); @@ -1923,6 +1993,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, Operation::Delete { updated_fragments: vec![apply_deletion(&[1], &mut fragment, &dataset).await], @@ -1937,6 +2008,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, ]; let transactions = @@ -2058,6 +2130,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, ), ( @@ -2070,6 +2143,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, ), ( @@ -2228,6 +2302,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, create_update_config_for_test( Some(HashMap::from_iter(vec![( @@ -2423,6 +2498,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, [ Compatible, // append @@ -2845,6 +2921,7 @@ mod tests { mem_wal_to_merge: None, fields_for_preserving_frag_bitmap: vec![], update_mode: None, + inserted_rows_filter: None, }, ]; @@ -3149,7 +3226,7 @@ mod tests { "{}: expected NotCompatible but got {:?}", description, result - ); + ) } Retryable => { assert!( From 1b5ac199c093c644f61acc3e17a86a10ce692c01 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 6 Jan 2026 22:47:37 -0800 Subject: [PATCH 2/2] further cleanup --- .../src/scalar/bloomfilter/sbbf.rs | 57 ++-- rust/lance/src/dataset/transaction.rs | 4 +- rust/lance/src/dataset/write/merge_insert.rs | 181 ++++-------- .../write/merge_insert/inserted_rows.rs | 259 +++++++----------- 4 files changed, 165 insertions(+), 336 deletions(-) diff --git a/rust/lance-index/src/scalar/bloomfilter/sbbf.rs b/rust/lance-index/src/scalar/bloomfilter/sbbf.rs index 1ffce69c3c8..7f506f796f3 100644 --- a/rust/lance-index/src/scalar/bloomfilter/sbbf.rs +++ b/rust/lance-index/src/scalar/bloomfilter/sbbf.rs @@ -386,19 +386,31 @@ impl Sbbf { /// Returns an error if the bitmaps have different sizes, as bloom filters with /// different configurations cannot be reliably compared. pub fn might_intersect_bytes(&self, other_bytes: &[u8]) -> Result { - let self_bytes = self.to_bytes(); - if self_bytes.len() != other_bytes.len() { + Self::bytes_might_intersect(&self.to_bytes(), other_bytes) + } + + /// Check if two raw bloom filter bitmaps might intersect. + /// Returns true if there's at least one bit position where both filters have a 1. + /// + /// This is a fast probabilistic check: if it returns false, the filters definitely + /// have no common elements. If it returns true, they might have common elements + /// (with possible false positives). + /// + /// Returns an error if the bitmaps have different sizes, as bloom filters with + /// different configurations cannot be reliably compared. + pub fn bytes_might_intersect(a: &[u8], b: &[u8]) -> Result { + if a.len() != b.len() { return Err(SbbfError::InvalidData { message: format!( "Cannot compare bloom filters with different sizes: {} bytes vs {} bytes. \ Both filters must use the same configuration.", - self_bytes.len(), - other_bytes.len() + a.len(), + b.len() ), }); } - for i in 0..self_bytes.len() { - if (self_bytes[i] & other_bytes[i]) != 0 { + for i in 0..a.len() { + if (a[i] & b[i]) != 0 { return Ok(true); } } @@ -470,39 +482,6 @@ impl Default for SbbfBuilder { } } -// ============================================================================ -// Bloom Filter helper functions for raw bitmap operations -// ============================================================================ - -/// Check if two serialized bloom filter bitmaps might contain overlapping elements. -/// Returns true if there's at least one bit position where both filters have a 1, -/// indicating a potential intersection of the sets they represent. -/// -/// This is a fast probabilistic check: if it returns false, the filters definitely -/// have no common elements. If it returns true, they might have common elements -/// (with possible false positives). -/// -/// Returns an error if the bitmaps have different sizes, as bloom filters with -/// different configurations cannot be reliably compared. -pub fn bloom_filters_might_overlap(a: &[u8], b: &[u8]) -> Result { - if a.len() != b.len() { - return Err(SbbfError::InvalidData { - message: format!( - "Cannot compare bloom filters with different sizes: {} bytes vs {} bytes. \ - Both filters must use the same configuration.", - a.len(), - b.len() - ), - }); - } - for i in 0..a.len() { - if (a[i] & b[i]) != 0 { - return Ok(true); - } - } - Ok(false) -} - #[cfg(test)] mod tests { use super::*; diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 21fea6c9e4c..f954115ada3 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -2916,7 +2916,7 @@ impl TryFrom for Transaction { _ => Some(UpdateMode::RewriteRows), }, inserted_rows_filter: inserted_rows - .map(|ik| KeyExistenceFilter::from_pb(&ik)) + .map(|ik| KeyExistenceFilter::try_from(&ik)) .transpose()?, }, Some(pb::transaction::Operation::Project(pb::transaction::Project { schema })) => { @@ -3246,7 +3246,7 @@ impl From<&Transaction> for pb::Transaction { UpdateMode::RewriteColumns => 1, }) .unwrap_or(0), - inserted_rows: inserted_rows_filter.as_ref().map(|ik| ik.to_pb()), + inserted_rows: inserted_rows_filter.as_ref().map(|ik| ik.into()), }), Operation::Project { schema } => { pb::transaction::Operation::Project(pb::transaction::Project { diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index d5406afff51..12df92f534f 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -1639,7 +1639,6 @@ impl MergeInsertJob { .map(|f| f.id as u32) .collect(), update_mode: Some(RewriteRows), - // Slow path doesn't track inserted keys - conflict detection not available inserted_rows_filter: None, // not implemented for v1 }; @@ -4357,6 +4356,9 @@ mod tests { } } + /// Test that two merge insert operations on the same existing key conflict. + /// First merge insert commits successfully, second one fails with conflict error + /// because both operations updated the same key (detected via bloom filter). #[tokio::test] async fn test_inserted_rows_filter_bloom_conflict_detection_concurrent() { // Create schema with unenforced primary key on "id" column @@ -4380,24 +4382,7 @@ mod tests { ) .unwrap(); - // Throttle to increase contention - let throttled = Arc::new(ThrottledStoreWrapper { - config: ThrottleConfig { - wait_put_per_call: Duration::from_millis(5), - wait_get_per_call: Duration::from_millis(5), - wait_list_per_call: Duration::from_millis(5), - ..Default::default() - }, - }); - let dataset = InsertBuilder::new("memory://") - .with_params(&WriteParams { - store_params: Some(ObjectStoreParams { - object_store_wrapper: Some(throttled.clone()), - ..Default::default() - }), - ..Default::default() - }) .execute(vec![initial]) .await .unwrap(); @@ -4416,67 +4401,52 @@ mod tests { schema.clone(), vec![ Arc::new(UInt32Array::from(vec![2])), - Arc::new(UInt32Array::from(vec![1])), + Arc::new(UInt32Array::from(vec![2])), ], ) .unwrap(); - let s1 = RecordBatchStreamAdapter::new( - schema.clone(), - futures::stream::iter(vec![Ok(batch1.clone())]), - ); - let s2 = RecordBatchStreamAdapter::new( - schema.clone(), - futures::stream::iter(vec![Ok(batch2.clone())]), - ); - - let b1 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + // Create second merge insert job based on version 1 with 0 retries + let b2 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) .unwrap() .when_matched(WhenMatched::UpdateAll) .when_not_matched(WhenNotMatched::InsertAll) + .conflict_retries(0) .try_build() .unwrap(); - let b2 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + + // First merge insert commits (creates version 2) + let s1 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch1.clone())]), + ); + let b1 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) .unwrap() .when_matched(WhenMatched::UpdateAll) .when_not_matched(WhenNotMatched::InsertAll) .try_build() .unwrap(); + let result1 = b1.execute(Box::pin(s1) as SendableRecordBatchStream).await; + assert!(result1.is_ok(), "First merge insert should succeed"); - let t1 = tokio::spawn(async move { - b1.execute(Box::pin(s1) as SendableRecordBatchStream) - .await - .unwrap() - .1 - }); - let t2 = tokio::spawn(async move { - b2.execute(Box::pin(s2) as SendableRecordBatchStream) - .await - .unwrap() - .1 - }); + // Second merge insert tries to commit based on version 1, needs to rebase against version 2 + let s2 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch2.clone())]), + ); + let result2 = b2.execute(Box::pin(s2) as SendableRecordBatchStream).await; - let s1 = t1.await.unwrap(); - let s2 = t2.await.unwrap(); - // At least one attempt should include a retry under contention - assert!(s1.num_attempts >= 1); - assert!(s2.num_attempts >= 1); - - // Validate final dataset has id=2 updated to 1, without duplicates - let mut ds_latest = dataset.as_ref().clone(); - ds_latest.checkout_latest().await.unwrap(); - let batch = ds_latest.scan().try_into_batch().await.unwrap(); - let ids = batch["id"].as_primitive::().values(); - let vals = batch["value"].as_primitive::().values(); - // find index of id==2 - let pos = ids.iter().position(|&x| x == 2).unwrap(); - assert_eq!(vals[pos], 1); + // Second merge insert should fail because bloom filters show both updated key 2 + assert!( + matches!(result2, Err(crate::Error::TooMuchWriteContention { .. })), + "Expected TooMuchWriteContention (retryable conflict exhausted), got: {:?}", + result2 + ); } - /// Test concurrent INSERT of the same NEW key (key doesn't exist in initial data) - /// Both transactions try to INSERT a row with id=100, which doesn't exist. - /// This tests the bloom filter conflict detection for INSERT operations, - /// as opposed to UPDATE operations on existing keys. + /// Test that two merge insert operations inserting the same NEW key conflict. + /// First merge insert commits successfully (inserts id=100), second one fails + /// with conflict error because both inserted the same new key (detected via bloom filter). #[tokio::test] async fn test_concurrent_insert_same_new_key() { // Create schema with unenforced primary key on "id" column @@ -4501,24 +4471,7 @@ mod tests { ) .unwrap(); - // Throttle to increase contention - let throttled = Arc::new(ThrottledStoreWrapper { - config: ThrottleConfig { - wait_put_per_call: Duration::from_millis(5), - wait_get_per_call: Duration::from_millis(5), - wait_list_per_call: Duration::from_millis(5), - ..Default::default() - }, - }); - let dataset = InsertBuilder::new("memory://") - .with_params(&WriteParams { - store_params: Some(ObjectStoreParams { - object_store_wrapper: Some(throttled.clone()), - ..Default::default() - }), - ..Default::default() - }) .execute(vec![initial]) .await .unwrap(); @@ -4542,69 +4495,41 @@ mod tests { ) .unwrap(); - let s1 = RecordBatchStreamAdapter::new( - schema.clone(), - futures::stream::iter(vec![Ok(batch1.clone())]), - ); - let s2 = RecordBatchStreamAdapter::new( - schema.clone(), - futures::stream::iter(vec![Ok(batch2.clone())]), - ); - - // Both use InsertAll for when_not_matched, so both will try to INSERT id=100 - let b1 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + // Create second merge insert job based on version 1 with 0 retries + let b2 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) .unwrap() .when_matched(WhenMatched::UpdateAll) .when_not_matched(WhenNotMatched::InsertAll) + .conflict_retries(0) .try_build() .unwrap(); - let b2 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) + + // First merge insert commits (creates version 2, inserts id=100) + let s1 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch1.clone())]), + ); + let b1 = MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) .unwrap() .when_matched(WhenMatched::UpdateAll) .when_not_matched(WhenNotMatched::InsertAll) .try_build() .unwrap(); + let result1 = b1.execute(Box::pin(s1) as SendableRecordBatchStream).await; + assert!(result1.is_ok(), "First merge insert should succeed"); - let t1 = tokio::spawn(async move { - b1.execute(Box::pin(s1) as SendableRecordBatchStream) - .await - .unwrap() - .1 - }); - let t2 = tokio::spawn(async move { - b2.execute(Box::pin(s2) as SendableRecordBatchStream) - .await - .unwrap() - .1 - }); - - let s1 = t1.await.unwrap(); - let s2 = t2.await.unwrap(); - // At least one attempt should include a retry under contention - // because both transactions tried to insert the same new key - assert!(s1.num_attempts >= 1); - assert!(s2.num_attempts >= 1); - - // Validate final dataset has exactly one row with id=100 - let mut ds_latest = dataset.as_ref().clone(); - ds_latest.checkout_latest().await.unwrap(); - let batch = ds_latest.scan().try_into_batch().await.unwrap(); - let ids = batch["id"].as_primitive::().values(); - - // Count occurrences of id=100 - should be exactly 1 (no duplicates) - let count_100 = ids.iter().filter(|&&x| x == 100).count(); - assert_eq!( - count_100, 1, - "Expected exactly one row with id=100, but found {}", - count_100 + // Second merge insert tries to commit based on version 1, needs to rebase against version 2 + let s2 = RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::iter(vec![Ok(batch2.clone())]), ); + let result2 = b2.execute(Box::pin(s2) as SendableRecordBatchStream).await; - // Should have 5 total rows: original 4 + 1 new row with id=100 - assert_eq!( - batch.num_rows(), - 5, - "Expected 5 rows, but found {}", - batch.num_rows() + // Second merge insert should fail because bloom filters show both inserted key 100 + assert!( + matches!(result2, Err(crate::Error::TooMuchWriteContention { .. })), + "Expected TooMuchWriteContention (retryable conflict exhausted), got: {:?}", + result2 ); } diff --git a/rust/lance/src/dataset/write/merge_insert/inserted_rows.rs b/rust/lance/src/dataset/write/merge_insert/inserted_rows.rs index dc4cec8fb74..db81fd6f831 100644 --- a/rust/lance/src/dataset/write/merge_insert/inserted_rows.rs +++ b/rust/lance/src/dataset/write/merge_insert/inserted_rows.rs @@ -2,10 +2,6 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors //! Key existence tracking for merge insert conflict detection. -//! -//! This module provides data structures for tracking keys of newly inserted rows -//! during merge insert operations. This is used for detecting conflicts when -//! concurrent transactions attempt to insert rows with the same key. use std::collections::hash_map::DefaultHasher; use std::collections::HashSet; @@ -16,38 +12,15 @@ use arrow_array::{BinaryArray, LargeBinaryArray, LargeStringArray, RecordBatch, use arrow_schema::DataType; use deepsize::DeepSizeOf; use lance_core::Result; -use lance_index::scalar::bloomfilter::sbbf::{bloom_filters_might_overlap, Sbbf, SbbfBuilder}; +use lance_index::scalar::bloomfilter::sbbf::{Sbbf, SbbfBuilder}; use lance_table::format::pb; use snafu::location; -// ============================================================================ -// Constants for Bloom Filter configuration -// ============================================================================ - -// IMPORTANT: All bloom filters for conflict detection MUST use the same size. -// Different sized filters cannot be correctly compared for intersection. -// These values are fixed to ensure all filters have identical dimensions. -// -// These values match the defaults in lance-index bloomfilter.rs: -// NumberOfItems: 8192 + Probability: 0.00057(1 in 1754) -> NumberOfBytes: 16384(16KiB) +// Default bloom filter config: 8192 items @ 0.00057 fpp -> 16KiB filter pub const BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS: u64 = 8192; pub const BLOOM_FILTER_DEFAULT_PROBABILITY: f64 = 0.00057; -/// Create a BloomFilter protobuf message with default configuration -fn create_bloom_filter_pb(bitmap: Vec, num_bits: u32) -> pb::transaction::BloomFilter { - pb::transaction::BloomFilter { - bitmap, - num_bits, - number_of_items: BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS, - probability: BLOOM_FILTER_DEFAULT_PROBABILITY, - } -} - -// ============================================================================ -// Key types for conflict detection -// ============================================================================ - -/// Key value that can be used in conflict detection for inserted rows +/// Key value for conflict detection. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum KeyValue { String(String), @@ -58,7 +31,6 @@ pub enum KeyValue { } impl KeyValue { - /// Convert the key value to bytes for hashing pub fn to_bytes(&self) -> Vec { match self { Self::String(s) => s.as_bytes().to_vec(), @@ -69,14 +41,13 @@ impl KeyValue { let mut result = Vec::new(); for value in values { result.extend_from_slice(&value.to_bytes()); - result.push(0); // separator + result.push(0); } result } } } - /// Get a hash of the key value pub fn hash_value(&self) -> u64 { let mut hasher = DefaultHasher::new(); self.to_bytes().hash(&mut hasher); @@ -84,29 +55,21 @@ impl KeyValue { } } -// ============================================================================ -// Bloom Filter Builder for tracking inserted rows -// ============================================================================ - -/// Builder for KeyExistenceFilter using a Split Block Bloom Filter (SBBF). -/// Used to track keys of inserted rows for conflict detection. +/// Builder for KeyExistenceFilter using Split Block Bloom Filter. #[derive(Debug, Clone)] pub struct KeyExistenceFilterBuilder { sbbf: Sbbf, - /// Field IDs of columns that form the key (must match unenforced primary key) field_ids: Vec, - /// Number of items inserted (for len()) item_count: usize, } impl KeyExistenceFilterBuilder { - /// Create a new KeyExistenceFilterBuilder using SBBF with default parameters. pub fn new(field_ids: Vec) -> Self { let sbbf = SbbfBuilder::new() .expected_items(BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS) .false_positive_probability(BLOOM_FILTER_DEFAULT_PROBABILITY) .build() - .expect("Failed to build SBBF for KeyExistenceFilterBuilder"); + .expect("Failed to build SBBF"); Self { sbbf, field_ids, @@ -114,76 +77,44 @@ impl KeyExistenceFilterBuilder { } } - /// Add a key to the filter pub fn insert(&mut self, key: KeyValue) -> Result<()> { - let bytes = key.to_bytes(); - self.sbbf.insert(&bytes[..]); + self.sbbf.insert(&key.to_bytes()[..]); self.item_count += 1; Ok(()) } - /// Check if a key might be present pub fn contains(&self, key: &KeyValue) -> bool { - let bytes = key.to_bytes(); - self.sbbf.check(&bytes[..]) + self.sbbf.check(&key.to_bytes()[..]) } - /// Check if this filter might intersect with another filter. - /// This is probabilistic - may return true for non-overlapping sets (false positive). - /// - /// Returns an error if the filters have different sizes/configurations. pub fn might_intersect(&self, other: &Self) -> Result { self.sbbf .might_intersect(&other.sbbf) .map_err(|e| lance_core::Error::invalid_input(e.to_string(), location!())) } - /// Get the key field IDs pub fn field_ids(&self) -> &[i32] { &self.field_ids } - /// Get the estimated size in bytes pub fn estimated_size_bytes(&self) -> usize { self.sbbf.size_bytes() } - /// Convert to typed protobuf KeyExistenceFilter (Bloom variant) - pub fn to_pb(&self) -> pb::transaction::KeyExistenceFilter { - let bitmap = self.sbbf.to_bytes(); - let num_bits = (self.sbbf.size_bytes() as u32) * 8; - pb::transaction::KeyExistenceFilter { - field_ids: self.field_ids.clone(), - data: Some(pb::transaction::key_existence_filter::Data::Bloom( - create_bloom_filter_pb(bitmap, num_bits), - )), - } - } - - /// Get the number of items pub fn len(&self) -> usize { self.item_count } - /// Check if empty pub fn is_empty(&self) -> bool { self.item_count == 0 } - /// Check if this filter might produce false positives (Bloom filters are probabilistic) - pub fn might_have_false_positives(&self) -> bool { - true - } - - /// Build a KeyExistenceFilter from this builder pub fn build(&self) -> KeyExistenceFilter { - let bitmap = self.sbbf.to_bytes(); - let num_bits = (self.sbbf.size_bytes() as u32) * 8; KeyExistenceFilter { field_ids: self.field_ids.clone(), filter: FilterType::Bloom { - bitmap, - num_bits, + bitmap: self.sbbf.to_bytes(), + num_bits: (self.sbbf.size_bytes() as u32) * 8, number_of_items: BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS, probability: BLOOM_FILTER_DEFAULT_PROBABILITY, }, @@ -191,28 +122,36 @@ impl KeyExistenceFilterBuilder { } } -// ============================================================================ -// KeyExistenceFilter metadata for conflict detection -// ============================================================================ +impl From<&KeyExistenceFilterBuilder> for pb::transaction::KeyExistenceFilter { + fn from(builder: &KeyExistenceFilterBuilder) -> Self { + Self { + field_ids: builder.field_ids.clone(), + data: Some(pb::transaction::key_existence_filter::Data::Bloom( + pb::transaction::BloomFilter { + bitmap: builder.sbbf.to_bytes(), + num_bits: (builder.sbbf.size_bytes() as u32) * 8, + number_of_items: BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS, + probability: BLOOM_FILTER_DEFAULT_PROBABILITY, + }, + )), + } + } +} -/// Type of filter used for storing key existence data +/// Filter type for key existence data. #[derive(Debug, Clone, DeepSizeOf, PartialEq)] pub enum FilterType { ExactSet(HashSet), Bloom { bitmap: Vec, num_bits: u32, - /// Number of items the filter was sized for number_of_items: u64, - /// False positive probability the filter was sized for probability: f64, }, } -/// Metadata about keys of newly inserted rows, used for conflict detection. -/// Only created when the merge insert's ON columns match the schema's unenforced primary key. -/// The presence of this filter indicates strict primary key conflict detection should be used. -/// Only tracks keys from INSERT operations, not UPDATE operations. +/// Tracks keys of inserted rows for conflict detection. +/// Only created when ON columns match the schema's unenforced primary key. #[derive(Debug, Clone, DeepSizeOf, PartialEq)] pub struct KeyExistenceFilter { pub field_ids: Vec, @@ -220,15 +159,58 @@ pub struct KeyExistenceFilter { } impl KeyExistenceFilter { - /// Create KeyExistenceFilter from a bloom filter builder pub fn from_bloom_filter(bloom: &KeyExistenceFilterBuilder) -> Self { bloom.build() } - pub fn to_pb(&self) -> pb::transaction::KeyExistenceFilter { - match &self.filter { - FilterType::ExactSet(hashes) => pb::transaction::KeyExistenceFilter { - field_ids: self.field_ids.clone(), + /// Check if two filters intersect. Returns (has_intersection, might_be_false_positive). + /// Errors if bloom filter configs don't match. + pub fn intersects(&self, other: &Self) -> Result<(bool, bool)> { + match (&self.filter, &other.filter) { + (FilterType::ExactSet(a), FilterType::ExactSet(b)) => { + Ok((a.iter().any(|h| b.contains(h)), false)) + } + (FilterType::ExactSet(_), FilterType::Bloom { .. }) + | (FilterType::Bloom { .. }, FilterType::ExactSet(_)) => { + // Can't compare different hash schemes, assume intersection + Ok((true, true)) + } + ( + FilterType::Bloom { + bitmap: a_bits, + number_of_items: a_num_items, + probability: a_prob, + .. + }, + FilterType::Bloom { + bitmap: b_bits, + number_of_items: b_num_items, + probability: b_prob, + .. + }, + ) => { + if a_num_items != b_num_items || (a_prob - b_prob).abs() > f64::EPSILON { + return Err(lance_core::Error::invalid_input( + format!( + "Bloom filter config mismatch: ({}, {}) vs ({}, {})", + a_num_items, a_prob, b_num_items, b_prob + ), + location!(), + )); + } + let has = Sbbf::bytes_might_intersect(a_bits, b_bits) + .map_err(|e| lance_core::Error::invalid_input(e.to_string(), location!()))?; + Ok((has, has)) + } + } + } +} + +impl From<&KeyExistenceFilter> for pb::transaction::KeyExistenceFilter { + fn from(filter: &KeyExistenceFilter) -> Self { + match &filter.filter { + FilterType::ExactSet(hashes) => Self { + field_ids: filter.field_ids.clone(), data: Some(pb::transaction::key_existence_filter::Data::Exact( pb::transaction::ExactKeySetFilter { key_hashes: hashes.iter().copied().collect(), @@ -240,8 +222,8 @@ impl KeyExistenceFilter { num_bits, number_of_items, probability, - } => pb::transaction::KeyExistenceFilter { - field_ids: self.field_ids.clone(), + } => Self { + field_ids: filter.field_ids.clone(), data: Some(pb::transaction::key_existence_filter::Data::Bloom( pb::transaction::BloomFilter { bitmap: bitmap.clone(), @@ -253,16 +235,18 @@ impl KeyExistenceFilter { }, } } +} + +impl TryFrom<&pb::transaction::KeyExistenceFilter> for KeyExistenceFilter { + type Error = lance_core::Error; - pub fn from_pb(message: &pb::transaction::KeyExistenceFilter) -> Result { - let field_ids = message.field_ids.clone(); + fn try_from(message: &pb::transaction::KeyExistenceFilter) -> Result { let filter = match message.data.as_ref() { Some(pb::transaction::key_existence_filter::Data::Exact(exact)) => { FilterType::ExactSet(exact.key_hashes.iter().copied().collect()) } Some(pb::transaction::key_existence_filter::Data::Bloom(b)) => { - // Use defaults for backwards compatibility with older filters - // that don't have number_of_items/probability set (they will be 0) + // Use defaults for backwards compatibility let number_of_items = if b.number_of_items == 0 { BLOOM_FILTER_DEFAULT_NUMBER_OF_ITEMS } else { @@ -280,75 +264,16 @@ impl KeyExistenceFilter { probability, } } - None => { - // Treat missing data as empty exact set - FilterType::ExactSet(HashSet::new()) - } + None => FilterType::ExactSet(HashSet::new()), }; - Ok(Self { field_ids, filter }) - } - - /// Determine if 2 filters intersect, and whether it might be a false positive. - /// - /// Returns `Err` if the bloom filter configs don't match (different expected_items or fpp), - /// since bloom filters with different sizes cannot be reliably compared. - /// - /// Returns `Ok((has_intersection, might_be_false_positive))` on success. - pub fn intersects(&self, other: &Self) -> Result<(bool, bool)> { - match (&self.filter, &other.filter) { - (FilterType::ExactSet(a), FilterType::ExactSet(b)) => { - let has = a.iter().any(|h| b.contains(h)); - Ok((has, false)) - } - (FilterType::ExactSet(_), FilterType::Bloom { .. }) - | (FilterType::Bloom { .. }, FilterType::ExactSet(_)) => { - // ExactSet stores hashes from an unknown scheme, while Bloom uses XxHash64. - // Since we can't reliably compare them, we conservatively assume - // there might be an intersection to avoid missing conflicts. - Ok((true, true)) - } - ( - FilterType::Bloom { - bitmap: a_bits, - number_of_items: a_num_items, - probability: a_prob, - .. - }, - FilterType::Bloom { - bitmap: b_bits, - number_of_items: b_num_items, - probability: b_prob, - .. - }, - ) => { - // Bloom filters with different configurations cannot be reliably compared - // because they have different sizes and bit patterns - if a_num_items != b_num_items || (a_prob - b_prob).abs() > f64::EPSILON { - return Err(lance_core::Error::invalid_input( - format!( - "Cannot compare bloom filters with different configurations: \ - self(number_of_items={}, probability={}) vs other(number_of_items={}, probability={}). \ - Both filters must use the same parameters for reliable intersection checking.", - a_num_items, a_prob, b_num_items, b_prob - ), - location!(), - )); - } - // Since configs are validated above, this should not fail - let has = bloom_filters_might_overlap(a_bits, b_bits) - .map_err(|e| lance_core::Error::invalid_input(e.to_string(), location!()))?; - Ok((has, has)) - } - } + Ok(Self { + field_ids: message.field_ids.clone(), + filter, + }) } } -// ============================================================================ -// Utility functions for extracting key values from batches -// ============================================================================ - -/// Extract key value from a batch row for conflict detection bloom filter. -/// Returns None if any ON column is null or has an unsupported type. +/// Extract key value from a batch row. Returns None if null or unsupported type. pub fn extract_key_value_from_batch( batch: &RecordBatch, row_idx: usize, @@ -361,7 +286,7 @@ pub fn extract_key_value_from_batch( let column = batch.column(col_idx); if column.is_null(row_idx) { - return None; // Skip rows with null key values + return None; } let key_part = match column.data_type() { @@ -397,7 +322,7 @@ pub fn extract_key_value_from_batch( let arr = column.as_any().downcast_ref::()?; KeyValue::Binary(arr.value(row_idx).to_vec()) } - _ => return None, // Unsupported type + _ => return None, }; parts.push(key_part); }