From ef36747b921ca5a467427559b0124d9798a7605e Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 27 Feb 2026 08:57:01 -0800 Subject: [PATCH 1/3] chore: improve ambiguous merge insert error message --- rust/lance/src/dataset/write/merge_insert.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 478b20971fd..0c634214a0d 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -194,8 +194,8 @@ pub fn create_duplicate_row_error( ) -> DataFusionError { DataFusionError::Execution( format!( - "Ambiguous merge insert: multiple source rows match the same target row on ({}). \ - This could lead to data corruption. Please ensure each target row is matched by at most one source row.", + "Ambiguous merge inserts are prohibited: multiple source rows match the same target row on ({}). \ + Please ensure each target row is matched by at most one source row.", format_key_values_on_columns(batch, row_idx, on_columns) ) ) From 63645d67efa49a21cdf721862a10db1ba54259cc Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 27 Feb 2026 09:08:53 -0800 Subject: [PATCH 2/3] feat: surface ambiguous merge insert error as InvalidInput --- rust/lance/src/dataset/write/merge_insert.rs | 21 ++++++++++--------- .../dataset/write/merge_insert/exec/write.rs | 15 ++++++------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 0c634214a0d..88bfcc248d2 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -192,13 +192,15 @@ pub fn create_duplicate_row_error( row_idx: usize, on_columns: &[String], ) -> DataFusionError { - DataFusionError::Execution( - format!( - "Ambiguous merge inserts are prohibited: multiple source rows match the same target row on ({}). \ - Please ensure each target row is matched by at most one source row.", - format_key_values_on_columns(batch, row_idx, on_columns) + DataFusionError::External( + Box::new( + Error::invalid_input(format!( + "Ambiguous merge inserts are prohibited: multiple source rows match the same target row on ({}). \ + Please ensure each target row is matched by at most one source row.", + format_key_values_on_columns(batch, row_idx, on_columns) + ), location!()) + ) ) - ) } /// Describes how rows should be handled when there is no matching row in the source table @@ -5591,11 +5593,10 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n "Expected merge insert to fail due to duplicate rows on key column." ); - let error_msg = result.unwrap_err().to_string(); assert!( - error_msg.contains("Ambiguous merge insert") && error_msg.contains("multiple source rows"), - "Expected error message to mention ambiguous merge insert and multiple source rows, got: {}", - error_msg + matches!(&result, &Err(Error::InvalidInput { ref source, .. }) if source.to_string().contains("Ambiguous merge insert") && source.to_string().contains("multiple source rows")), + "Expected error to be InvalidInput with message about ambiguous merge insert and multiple source rows, got: {:?}", + result ); } diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs index 45b915fd353..4406bbb75fc 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs @@ -695,13 +695,14 @@ impl FullSchemaMergeInsertExec { update_tx: &tokio::sync::mpsc::UnboundedSender>, insert_tx: &tokio::sync::mpsc::UnboundedSender>, ) { - let error_msg = format!("Stream processing failed: {}", error); - - let update_error = datafusion::error::DataFusionError::Internal(error_msg.clone()); - let insert_error = datafusion::error::DataFusionError::Internal(error_msg); - - let _ = update_tx.send(Err(update_error)); - let _ = insert_tx.send(Err(insert_error)); + // Send to first open one. + match update_tx.send(Err(error)) { + Ok(_) => return, + Err(tokio::sync::mpsc::error::SendError(error)) => { + let _ = insert_tx.send(error); + return; + } + } } } From 738f33182b2fb361d47cf70fbb656409ec76380c Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 27 Feb 2026 09:25:57 -0800 Subject: [PATCH 3/3] fix clippy --- .../src/dataset/write/merge_insert/exec/write.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs index 4406bbb75fc..a203d561fe3 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs @@ -695,13 +695,10 @@ impl FullSchemaMergeInsertExec { update_tx: &tokio::sync::mpsc::UnboundedSender>, insert_tx: &tokio::sync::mpsc::UnboundedSender>, ) { - // Send to first open one. - match update_tx.send(Err(error)) { - Ok(_) => return, - Err(tokio::sync::mpsc::error::SendError(error)) => { - let _ = insert_tx.send(error); - return; - } + // Send to first open one. It doesn't matter which one receives it as + // long as the user gets the error in the end. + if let Err(tokio::sync::mpsc::error::SendError(error)) = update_tx.send(Err(error)) { + let _ = insert_tx.send(error); } } }