From 1ace27637cae837046a04ac60d97267fc21c262f Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 25 Feb 2026 10:56:16 -0800 Subject: [PATCH 1/8] fix: make overwrites retryable and creates incompatible --- rust/lance/src/io/commit/conflict_resolver.rs | 164 +++++++++++++++++- 1 file changed, 159 insertions(+), 5 deletions(-) diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 703afbb17e6..864043c025a 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -889,8 +889,35 @@ impl<'a> TransactionRebase<'a> { other_version: u64, ) -> Result<()> { match &other_transaction.operation { - // Overwrite only conflicts with another operation modifying the same update config - Operation::Overwrite { .. } | Operation::UpdateConfig { .. } => { + Operation::Overwrite { .. } => { + // Two concurrent creates (read_version == 0) are incompatible - + // only one can create the dataset, the other must fail. + if self.transaction.read_version == 0 && other_transaction.read_version == 0 { + return Err(self.incompatible_conflict_err( + other_transaction, + other_version, + location!(), + )); + } + + // Upsert key conflicts are incompatible + if self + .transaction + .operation + .upsert_key_conflict(&other_transaction.operation) + { + Err(self.incompatible_conflict_err( + other_transaction, + other_version, + location!(), + )) + } else { + // Two concurrent overwrites (on existing dataset) are retryable + // so user can decide if their overwrite should still proceed + Err(self.retryable_conflict_err(other_transaction, other_version, location!())) + } + } + Operation::UpdateConfig { .. } => { if self .transaction .operation @@ -2412,9 +2439,22 @@ mod tests { config_upsert_values: None, initial_bases: None, }, - // No conflicts: overwrite can always happen since it doesn't - // depend on previous state of the table. - [Compatible; 9], + // Note: In this test, both transactions have read_version == 0, + // so both overwrites are "creates". Two concurrent creates are + // incompatible (only one can create the dataset). + // For overwrites on existing datasets (read_version > 0), the + // conflict would be Retryable instead. + [ + Compatible, // append + Compatible, // create index + Compatible, // delete + Compatible, // merge + NotCompatible, // overwrite (both are creates with read_version == 0) + Compatible, // rewrite + Compatible, // reserve + Compatible, // update + Compatible, // update config + ], ), ( Operation::CreateIndex { @@ -3547,4 +3587,118 @@ mod tests { assert_eq!(rebase.conflicting_mem_wal_merged_gens[0].region_id, region); assert_eq!(rebase.conflicting_mem_wal_merged_gens[0].generation, 10); } + + #[tokio::test] + async fn test_concurrent_overwrites_retryable() { + use crate::dataset::write::{WriteMode, WriteParams}; + use crate::dataset::{CommitBuilder, InsertBuilder}; + + // Create initial dataset at version 1 + let dataset = test_dataset(5, 1).await; + + // Simulate two concurrent readers at version 1 + let dataset_v1_reader1 = Arc::new(dataset.checkout_version(1).await.unwrap()); + let dataset_v1_reader2 = Arc::new(dataset.checkout_version(1).await.unwrap()); + + // Prepare data for overwrite + let data = RecordBatch::try_new( + Arc::new(arrow_schema::Schema::new(vec![ + arrow_schema::Field::new("a", arrow_schema::DataType::Int32, false), + arrow_schema::Field::new("b", arrow_schema::DataType::Int32, true), + ])), + vec![ + Arc::new(Int32Array::from_iter_values(10..15)), + Arc::new(Int32Array::from_iter_values(std::iter::repeat_n(1, 5))), + ], + ) + .unwrap(); + + // First overwrite: write uncommitted transaction + let txn1 = InsertBuilder::new(dataset_v1_reader1.clone()) + .with_params(&WriteParams { + mode: WriteMode::Overwrite, + ..Default::default() + }) + .execute_uncommitted(vec![data.clone()]) + .await + .unwrap(); + + // Commit first overwrite - should succeed + let dataset_v2 = CommitBuilder::new(dataset_v1_reader1) + .execute(txn1) + .await + .unwrap(); + assert_eq!(dataset_v2.manifest.version, 2); + + // Second overwrite: write uncommitted transaction + let txn2 = InsertBuilder::new(dataset_v1_reader2.clone()) + .with_params(&WriteParams { + mode: WriteMode::Overwrite, + ..Default::default() + }) + .execute_uncommitted(vec![data]) + .await + .unwrap(); + + // Commit second overwrite - should fail with retryable conflict + let result = CommitBuilder::new(dataset_v1_reader2).execute(txn2).await; + + assert!( + matches!(result, Err(Error::RetryableCommitConflict { .. })), + "Expected RetryableCommitConflict but got: {:?}", + result + ); + + // Verify the first overwrite succeeded and dataset is at version 2 + assert_eq!(dataset_v2.count_rows(None).await.unwrap(), 5); + } + + #[test] + fn test_concurrent_create_incompatible() { + // Two concurrent creates (read_version == 0) should be incompatible. + // Only one can create the dataset, the other must fail without retry option. + + let fragment0 = Fragment::new(0); + + // First create transaction (read_version == 0) + let create_txn1 = Transaction::new( + 0, // read_version == 0 means this is a create + Operation::Overwrite { + fragments: vec![fragment0.clone()], + schema: lance_core::datatypes::Schema::default(), + config_upsert_values: None, + initial_bases: None, + }, + None, + ); + + // Second create transaction (also read_version == 0) + let create_txn2 = Transaction::new( + 0, // read_version == 0 means this is also a create + Operation::Overwrite { + fragments: vec![fragment0], + schema: lance_core::datatypes::Schema::default(), + config_upsert_values: None, + initial_bases: None, + }, + None, + ); + + let mut rebase = TransactionRebase { + transaction: create_txn1, + initial_fragments: HashMap::new(), + modified_fragment_ids: HashSet::new(), + affected_rows: None, + conflicting_frag_reuse_indices: Vec::new(), + conflicting_mem_wal_merged_gens: Vec::new(), + }; + + // Check conflict: two creates should be incompatible (not retryable) + let result = rebase.check_txn(&create_txn2, 1); + assert!( + matches!(result, Err(Error::IncompatibleTransaction { .. })), + "Expected IncompatibleTransaction for concurrent creates, got: {:?}", + result + ); + } } From 84c5280492bf849cfab57f5db93bf96efb21fcdc Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 25 Feb 2026 11:31:15 -0800 Subject: [PATCH 2/8] commit --- rust/lance/src/io/commit.rs | 14 +++ rust/lance/src/io/commit/conflict_resolver.rs | 99 ++++++++++++------- 2 files changed, 77 insertions(+), 36 deletions(-) diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 922060b2f73..cc904a8926c 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -829,6 +829,20 @@ pub(crate) async fn commit_transaction( if !strict_overwrite { (dataset, other_transactions) = load_and_sort_new_transactions(&dataset).await?; + // For create operations (read_version == 0), if the dataset already exists, + // this is an incompatible conflict. Only one process can create the dataset. + if read_version == 0 + && matches!(transaction.operation, Operation::Overwrite { .. }) + && dataset.manifest.version > 0 + { + return Err(crate::Error::IncompatibleTransaction { + source: "This create operation conflicts with a concurrent create. \ + The dataset already exists at a newer version." + .into(), + location: location!(), + }); + } + // See if we can retry the commit. Try to account for all // transactions that have been committed since the read_version. // Use small amount of backoff to handle transactions that all diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 864043c025a..efbf6ab8216 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -3653,52 +3653,79 @@ mod tests { assert_eq!(dataset_v2.count_rows(None).await.unwrap(), 5); } - #[test] - fn test_concurrent_create_incompatible() { - // Two concurrent creates (read_version == 0) should be incompatible. + #[tokio::test] + async fn test_concurrent_create_incompatible() { + use crate::dataset::write::{WriteMode, WriteParams}; + use crate::dataset::{CommitBuilder, InsertBuilder}; + use crate::session::Session; + + // Two concurrent creates should be incompatible. // Only one can create the dataset, the other must fail without retry option. - let fragment0 = Fragment::new(0); + // Use shared session so both commits see the same memory store + let session = std::sync::Arc::new(Session::default()); + let uri = "memory://test_concurrent_create"; - // First create transaction (read_version == 0) - let create_txn1 = Transaction::new( - 0, // read_version == 0 means this is a create - Operation::Overwrite { - fragments: vec![fragment0.clone()], - schema: lance_core::datatypes::Schema::default(), - config_upsert_values: None, - initial_bases: None, - }, - None, - ); + // Prepare data for creates + let data = RecordBatch::try_new( + Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( + "a", + arrow_schema::DataType::Int32, + false, + )])), + vec![Arc::new(Int32Array::from_iter_values(0..5))], + ) + .unwrap(); - // Second create transaction (also read_version == 0) - let create_txn2 = Transaction::new( - 0, // read_version == 0 means this is also a create - Operation::Overwrite { - fragments: vec![fragment0], - schema: lance_core::datatypes::Schema::default(), - config_upsert_values: None, - initial_bases: None, - }, - None, - ); + // First create: write uncommitted transaction + let txn1 = InsertBuilder::new(uri) + .with_params(&WriteParams { + mode: WriteMode::Create, + session: Some(session.clone()), + ..Default::default() + }) + .execute_uncommitted(vec![data.clone()]) + .await + .unwrap(); - let mut rebase = TransactionRebase { - transaction: create_txn1, - initial_fragments: HashMap::new(), - modified_fragment_ids: HashSet::new(), - affected_rows: None, - conflicting_frag_reuse_indices: Vec::new(), - conflicting_mem_wal_merged_gens: Vec::new(), - }; + // Second create: write uncommitted transaction (same URI, no dataset exists yet) + let txn2 = InsertBuilder::new(uri) + .with_params(&WriteParams { + mode: WriteMode::Create, + session: Some(session.clone()), + ..Default::default() + }) + .execute_uncommitted(vec![data]) + .await + .unwrap(); + + // Both transactions should have read_version == 0 (creates) + assert_eq!(txn1.read_version, 0); + assert_eq!(txn2.read_version, 0); + + // Commit first create - should succeed + let dataset_v1 = CommitBuilder::new(uri) + .with_session(session.clone()) + .execute(txn1) + .await + .unwrap(); + assert_eq!(dataset_v1.manifest.version, 1); + + // Commit second create - should fail with IncompatibleTransaction + // (DatasetAlreadyExists would only occur in true concurrent scenarios where the + // pre-check race condition causes both to call commit_new_dataset) + let result = CommitBuilder::new(uri) + .with_session(session.clone()) + .execute(txn2) + .await; - // Check conflict: two creates should be incompatible (not retryable) - let result = rebase.check_txn(&create_txn2, 1); assert!( matches!(result, Err(Error::IncompatibleTransaction { .. })), "Expected IncompatibleTransaction for concurrent creates, got: {:?}", result ); + + // Verify only the first create succeeded + assert_eq!(dataset_v1.count_rows(None).await.unwrap(), 5); } } From a4a081c8f955f8387c2a73237dab7871b9bde12e Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 25 Feb 2026 11:42:34 -0800 Subject: [PATCH 3/8] commit --- rust/lance/src/io/commit.rs | 108 ++++++++---------- rust/lance/src/io/commit/conflict_resolver.rs | 54 ++------- 2 files changed, 58 insertions(+), 104 deletions(-) diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index cc904a8926c..21d06b8f4ea 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -1314,74 +1314,58 @@ mod tests { #[tokio::test] async fn test_concurrent_writes() { - for write_mode in [WriteMode::Append, WriteMode::Overwrite] { - // Create an empty table - let test_dir = TempStrDir::default(); - let test_uri = test_dir.as_str(); - - let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( - "i", - DataType::Int32, - false, - )])); - - let dataset = Dataset::write( - RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()), - test_uri, - None, - ) - .await - .unwrap(); - - // Make some sample data - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], - ) - .unwrap(); + // Test concurrent appends - all should succeed + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); - // Write data concurrently in 5 tasks - let futures: Vec<_> = (0..5) - .map(|_| { - let batch = batch.clone(); - let schema = schema.clone(); - let uri = test_uri.to_string(); - tokio::spawn(async move { - let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); - Dataset::write( - reader, - &uri, - Some(WriteParams { - mode: write_mode, - ..Default::default() - }), - ) - .await - }) - }) - .collect(); - let results = join_all(futures).await; + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); - // Assert all succeeded - for result in results { - assert!(matches!(result, Ok(Ok(_))), "{:?}", result); - } + let dataset = Dataset::write( + RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()), + test_uri, + None, + ) + .await + .unwrap(); - // Assert final fragments and versions expected - let dataset = dataset.checkout_version(6).await.unwrap(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); - match write_mode { - WriteMode::Append => { - assert_eq!(dataset.get_fragments().len(), 5); - } - WriteMode::Overwrite => { - assert_eq!(dataset.get_fragments().len(), 1); - } - _ => unreachable!(), - } + let futures: Vec<_> = (0..5) + .map(|_| { + let batch = batch.clone(); + let schema = schema.clone(); + let uri = test_uri.to_string(); + tokio::spawn(async move { + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + Dataset::write( + reader, + &uri, + Some(WriteParams { + mode: WriteMode::Append, + ..Default::default() + }), + ) + .await + }) + }) + .collect(); + let results = join_all(futures).await; - dataset.validate().await.unwrap() + for result in results { + assert!(matches!(result, Ok(Ok(_))), "{:?}", result); } + + let dataset = dataset.checkout_version(6).await.unwrap(); + assert_eq!(dataset.get_fragments().len(), 5); + dataset.validate().await.unwrap() } #[tokio::test] diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index efbf6ab8216..8558c06294e 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -1823,7 +1823,9 @@ mod tests { use super::*; use crate::dataset::transaction::{DataReplacementGroup, RewriteGroup}; + use crate::dataset::write::WriteMode; use crate::session::caches::DeletionFileKey; + use crate::session::Session; use crate::{ dataset::{CommitBuilder, InsertBuilder, WriteParams}, io, @@ -3590,21 +3592,14 @@ mod tests { #[tokio::test] async fn test_concurrent_overwrites_retryable() { - use crate::dataset::write::{WriteMode, WriteParams}; - use crate::dataset::{CommitBuilder, InsertBuilder}; - - // Create initial dataset at version 1 let dataset = test_dataset(5, 1).await; - - // Simulate two concurrent readers at version 1 let dataset_v1_reader1 = Arc::new(dataset.checkout_version(1).await.unwrap()); let dataset_v1_reader2 = Arc::new(dataset.checkout_version(1).await.unwrap()); - // Prepare data for overwrite let data = RecordBatch::try_new( - Arc::new(arrow_schema::Schema::new(vec![ - arrow_schema::Field::new("a", arrow_schema::DataType::Int32, false), - arrow_schema::Field::new("b", arrow_schema::DataType::Int32, true), + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, true), ])), vec![ Arc::new(Int32Array::from_iter_values(10..15)), @@ -3613,7 +3608,7 @@ mod tests { ) .unwrap(); - // First overwrite: write uncommitted transaction + // First overwrite succeeds let txn1 = InsertBuilder::new(dataset_v1_reader1.clone()) .with_params(&WriteParams { mode: WriteMode::Overwrite, @@ -3622,15 +3617,13 @@ mod tests { .execute_uncommitted(vec![data.clone()]) .await .unwrap(); - - // Commit first overwrite - should succeed let dataset_v2 = CommitBuilder::new(dataset_v1_reader1) .execute(txn1) .await .unwrap(); assert_eq!(dataset_v2.manifest.version, 2); - // Second overwrite: write uncommitted transaction + // Second overwrite should fail with retryable conflict let txn2 = InsertBuilder::new(dataset_v1_reader2.clone()) .with_params(&WriteParams { mode: WriteMode::Overwrite, @@ -3639,45 +3632,28 @@ mod tests { .execute_uncommitted(vec![data]) .await .unwrap(); - - // Commit second overwrite - should fail with retryable conflict let result = CommitBuilder::new(dataset_v1_reader2).execute(txn2).await; - assert!( matches!(result, Err(Error::RetryableCommitConflict { .. })), "Expected RetryableCommitConflict but got: {:?}", result ); - // Verify the first overwrite succeeded and dataset is at version 2 assert_eq!(dataset_v2.count_rows(None).await.unwrap(), 5); } #[tokio::test] async fn test_concurrent_create_incompatible() { - use crate::dataset::write::{WriteMode, WriteParams}; - use crate::dataset::{CommitBuilder, InsertBuilder}; - use crate::session::Session; - - // Two concurrent creates should be incompatible. - // Only one can create the dataset, the other must fail without retry option. - - // Use shared session so both commits see the same memory store - let session = std::sync::Arc::new(Session::default()); + // Shared session so both commits see the same memory store + let session = Arc::new(Session::default()); let uri = "memory://test_concurrent_create"; - // Prepare data for creates let data = RecordBatch::try_new( - Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( - "a", - arrow_schema::DataType::Int32, - false, - )])), + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), vec![Arc::new(Int32Array::from_iter_values(0..5))], ) .unwrap(); - // First create: write uncommitted transaction let txn1 = InsertBuilder::new(uri) .with_params(&WriteParams { mode: WriteMode::Create, @@ -3688,7 +3664,6 @@ mod tests { .await .unwrap(); - // Second create: write uncommitted transaction (same URI, no dataset exists yet) let txn2 = InsertBuilder::new(uri) .with_params(&WriteParams { mode: WriteMode::Create, @@ -3699,11 +3674,10 @@ mod tests { .await .unwrap(); - // Both transactions should have read_version == 0 (creates) assert_eq!(txn1.read_version, 0); assert_eq!(txn2.read_version, 0); - // Commit first create - should succeed + // First create succeeds let dataset_v1 = CommitBuilder::new(uri) .with_session(session.clone()) .execute(txn1) @@ -3711,21 +3685,17 @@ mod tests { .unwrap(); assert_eq!(dataset_v1.manifest.version, 1); - // Commit second create - should fail with IncompatibleTransaction - // (DatasetAlreadyExists would only occur in true concurrent scenarios where the - // pre-check race condition causes both to call commit_new_dataset) + // Second create should fail with IncompatibleTransaction let result = CommitBuilder::new(uri) .with_session(session.clone()) .execute(txn2) .await; - assert!( matches!(result, Err(Error::IncompatibleTransaction { .. })), "Expected IncompatibleTransaction for concurrent creates, got: {:?}", result ); - // Verify only the first create succeeded assert_eq!(dataset_v1.count_rows(None).await.unwrap(), 5); } } From 770d1d710642841d673206f9bda106f55120929e Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 25 Feb 2026 11:51:39 -0800 Subject: [PATCH 4/8] fix --- rust/lance/src/io/commit.rs | 8 ++-- rust/lance/src/io/commit/conflict_resolver.rs | 46 +++++++------------ 2 files changed, 19 insertions(+), 35 deletions(-) diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 21d06b8f4ea..3283319d5b2 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -830,15 +830,13 @@ pub(crate) async fn commit_transaction( (dataset, other_transactions) = load_and_sort_new_transactions(&dataset).await?; // For create operations (read_version == 0), if the dataset already exists, - // this is an incompatible conflict. Only one process can create the dataset. + // only one process can create the dataset. if read_version == 0 && matches!(transaction.operation, Operation::Overwrite { .. }) && dataset.manifest.version > 0 { - return Err(crate::Error::IncompatibleTransaction { - source: "This create operation conflicts with a concurrent create. \ - The dataset already exists at a newer version." - .into(), + return Err(crate::Error::DatasetAlreadyExists { + uri: dataset.uri().to_string(), location: location!(), }); } diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 8558c06294e..775ad49793b 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -890,17 +890,6 @@ impl<'a> TransactionRebase<'a> { ) -> Result<()> { match &other_transaction.operation { Operation::Overwrite { .. } => { - // Two concurrent creates (read_version == 0) are incompatible - - // only one can create the dataset, the other must fail. - if self.transaction.read_version == 0 && other_transaction.read_version == 0 { - return Err(self.incompatible_conflict_err( - other_transaction, - other_version, - location!(), - )); - } - - // Upsert key conflicts are incompatible if self .transaction .operation @@ -912,8 +901,8 @@ impl<'a> TransactionRebase<'a> { location!(), )) } else { - // Two concurrent overwrites (on existing dataset) are retryable - // so user can decide if their overwrite should still proceed + // Concurrent overwrites are retryable so user can decide + // if their overwrite should still proceed Err(self.retryable_conflict_err(other_transaction, other_version, location!())) } } @@ -2441,21 +2430,18 @@ mod tests { config_upsert_values: None, initial_bases: None, }, - // Note: In this test, both transactions have read_version == 0, - // so both overwrites are "creates". Two concurrent creates are - // incompatible (only one can create the dataset). - // For overwrites on existing datasets (read_version > 0), the - // conflict would be Retryable instead. + // Concurrent overwrites are retryable so user can decide + // if their overwrite should still proceed. [ - Compatible, // append - Compatible, // create index - Compatible, // delete - Compatible, // merge - NotCompatible, // overwrite (both are creates with read_version == 0) - Compatible, // rewrite - Compatible, // reserve - Compatible, // update - Compatible, // update config + Compatible, // append + Compatible, // create index + Compatible, // delete + Compatible, // merge + Retryable, // overwrite + Compatible, // rewrite + Compatible, // reserve + Compatible, // update + Compatible, // update config ], ), ( @@ -3685,14 +3671,14 @@ mod tests { .unwrap(); assert_eq!(dataset_v1.manifest.version, 1); - // Second create should fail with IncompatibleTransaction + // Second create should fail with DatasetAlreadyExists let result = CommitBuilder::new(uri) .with_session(session.clone()) .execute(txn2) .await; assert!( - matches!(result, Err(Error::IncompatibleTransaction { .. })), - "Expected IncompatibleTransaction for concurrent creates, got: {:?}", + matches!(result, Err(Error::DatasetAlreadyExists { .. })), + "Expected DatasetAlreadyExists for concurrent creates, got: {:?}", result ); From c57ce3fba05c0e418bea97b8f5df936f74197319 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 25 Feb 2026 13:27:18 -0800 Subject: [PATCH 5/8] fix --- rust/lance/src/io/commit.rs | 12 ---- rust/lance/src/io/commit/conflict_resolver.rs | 58 ------------------- 2 files changed, 70 deletions(-) diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 3283319d5b2..42ed37a627a 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -829,18 +829,6 @@ pub(crate) async fn commit_transaction( if !strict_overwrite { (dataset, other_transactions) = load_and_sort_new_transactions(&dataset).await?; - // For create operations (read_version == 0), if the dataset already exists, - // only one process can create the dataset. - if read_version == 0 - && matches!(transaction.operation, Operation::Overwrite { .. }) - && dataset.manifest.version > 0 - { - return Err(crate::Error::DatasetAlreadyExists { - uri: dataset.uri().to_string(), - location: location!(), - }); - } - // See if we can retry the commit. Try to account for all // transactions that have been committed since the read_version. // Use small amount of backoff to handle transactions that all diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 775ad49793b..fc248998643 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -1814,7 +1814,6 @@ mod tests { use crate::dataset::transaction::{DataReplacementGroup, RewriteGroup}; use crate::dataset::write::WriteMode; use crate::session::caches::DeletionFileKey; - use crate::session::Session; use crate::{ dataset::{CommitBuilder, InsertBuilder, WriteParams}, io, @@ -3627,61 +3626,4 @@ mod tests { assert_eq!(dataset_v2.count_rows(None).await.unwrap(), 5); } - - #[tokio::test] - async fn test_concurrent_create_incompatible() { - // Shared session so both commits see the same memory store - let session = Arc::new(Session::default()); - let uri = "memory://test_concurrent_create"; - - let data = RecordBatch::try_new( - Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), - vec![Arc::new(Int32Array::from_iter_values(0..5))], - ) - .unwrap(); - - let txn1 = InsertBuilder::new(uri) - .with_params(&WriteParams { - mode: WriteMode::Create, - session: Some(session.clone()), - ..Default::default() - }) - .execute_uncommitted(vec![data.clone()]) - .await - .unwrap(); - - let txn2 = InsertBuilder::new(uri) - .with_params(&WriteParams { - mode: WriteMode::Create, - session: Some(session.clone()), - ..Default::default() - }) - .execute_uncommitted(vec![data]) - .await - .unwrap(); - - assert_eq!(txn1.read_version, 0); - assert_eq!(txn2.read_version, 0); - - // First create succeeds - let dataset_v1 = CommitBuilder::new(uri) - .with_session(session.clone()) - .execute(txn1) - .await - .unwrap(); - assert_eq!(dataset_v1.manifest.version, 1); - - // Second create should fail with DatasetAlreadyExists - let result = CommitBuilder::new(uri) - .with_session(session.clone()) - .execute(txn2) - .await; - assert!( - matches!(result, Err(Error::DatasetAlreadyExists { .. })), - "Expected DatasetAlreadyExists for concurrent creates, got: {:?}", - result - ); - - assert_eq!(dataset_v1.count_rows(None).await.unwrap(), 5); - } } From 1203dd622f722f18e2b26b52fbed64a61f55e024 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 25 Feb 2026 13:48:17 -0800 Subject: [PATCH 6/8] fix java test --- java/src/test/java/org/lance/operation/OverwriteTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/java/src/test/java/org/lance/operation/OverwriteTest.java b/java/src/test/java/org/lance/operation/OverwriteTest.java index f5a90de5b49..276937f8f6a 100644 --- a/java/src/test/java/org/lance/operation/OverwriteTest.java +++ b/java/src/test/java/org/lance/operation/OverwriteTest.java @@ -65,6 +65,7 @@ void testOverwrite(@TempDir Path tempDir) throws Exception { } // Commit fragment again + dataset.checkoutLatest(); rowCount = 40; fragmentMeta = testDataset.createNewFragment(rowCount); transaction = From b38466fd1e21297aeb1da54ecc05dbfdcd750d26 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 25 Feb 2026 13:54:32 -0800 Subject: [PATCH 7/8] improve java test --- .../org/lance/operation/OverwriteTest.java | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/java/src/test/java/org/lance/operation/OverwriteTest.java b/java/src/test/java/org/lance/operation/OverwriteTest.java index 276937f8f6a..1f266440ae2 100644 --- a/java/src/test/java/org/lance/operation/OverwriteTest.java +++ b/java/src/test/java/org/lance/operation/OverwriteTest.java @@ -29,6 +29,8 @@ import java.util.Collections; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class OverwriteTest extends OperationTestBase { @@ -64,8 +66,7 @@ void testOverwrite(@TempDir Path tempDir) throws Exception { } } - // Commit fragment again - dataset.checkoutLatest(); + // Try to commit from stale version (v1) - should fail with retryable error rowCount = 40; fragmentMeta = testDataset.createNewFragment(rowCount); transaction = @@ -81,6 +82,26 @@ void testOverwrite(@TempDir Path tempDir) throws Exception { .build(); assertEquals( "value", transaction.transactionProperties().map(m -> m.get("key")).orElse(null)); + + RuntimeException ex = + assertThrows(RuntimeException.class, () -> transaction.commit().close()); + assertTrue( + ex.getMessage().contains("Retryable commit conflict"), + "Expected retryable commit conflict error, got: " + ex.getMessage()); + + // Checkout latest and retry - should succeed + dataset.checkoutLatest(); + transaction = + dataset + .newTransactionBuilder() + .operation( + Overwrite.builder() + .fragments(Collections.singletonList(fragmentMeta)) + .schema(testDataset.getSchema()) + .configUpsertValues(Collections.singletonMap("config_key", "config_value")) + .build()) + .transactionProperties(Collections.singletonMap("key", "value")) + .build(); try (Dataset dataset = transaction.commit()) { assertEquals(3, dataset.version()); assertEquals(3, dataset.latestVersion()); From 5b86bbb9e4379e8d8dc2d0a6320041b631f96a86 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 25 Feb 2026 14:10:18 -0800 Subject: [PATCH 8/8] fix java11 --- .../java/org/lance/operation/OverwriteTest.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/java/src/test/java/org/lance/operation/OverwriteTest.java b/java/src/test/java/org/lance/operation/OverwriteTest.java index 1f266440ae2..5ecda106fe4 100644 --- a/java/src/test/java/org/lance/operation/OverwriteTest.java +++ b/java/src/test/java/org/lance/operation/OverwriteTest.java @@ -69,7 +69,7 @@ void testOverwrite(@TempDir Path tempDir) throws Exception { // Try to commit from stale version (v1) - should fail with retryable error rowCount = 40; fragmentMeta = testDataset.createNewFragment(rowCount); - transaction = + Transaction staleTxn = dataset .newTransactionBuilder() .operation( @@ -80,18 +80,16 @@ void testOverwrite(@TempDir Path tempDir) throws Exception { .build()) .transactionProperties(Collections.singletonMap("key", "value")) .build(); - assertEquals( - "value", transaction.transactionProperties().map(m -> m.get("key")).orElse(null)); + assertEquals("value", staleTxn.transactionProperties().map(m -> m.get("key")).orElse(null)); - RuntimeException ex = - assertThrows(RuntimeException.class, () -> transaction.commit().close()); + RuntimeException ex = assertThrows(RuntimeException.class, () -> staleTxn.commit().close()); assertTrue( ex.getMessage().contains("Retryable commit conflict"), "Expected retryable commit conflict error, got: " + ex.getMessage()); // Checkout latest and retry - should succeed dataset.checkoutLatest(); - transaction = + Transaction retryTxn = dataset .newTransactionBuilder() .operation( @@ -102,7 +100,7 @@ void testOverwrite(@TempDir Path tempDir) throws Exception { .build()) .transactionProperties(Collections.singletonMap("key", "value")) .build(); - try (Dataset dataset = transaction.commit()) { + try (Dataset dataset = retryTxn.commit()) { assertEquals(3, dataset.version()); assertEquals(3, dataset.latestVersion()); assertEquals(rowCount, dataset.countRows()); @@ -113,7 +111,7 @@ void testOverwrite(@TempDir Path tempDir) throws Exception { Schema schemaRes = scanner.schema(); assertEquals(testDataset.getSchema(), schemaRes); } - assertEquals(transaction, dataset.readTransaction().orElse(null)); + assertEquals(retryTxn, dataset.readTransaction().orElse(null)); } } }