diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 478b20971fd..88bfcc248d2 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -192,13 +192,15 @@ pub fn create_duplicate_row_error( row_idx: usize, on_columns: &[String], ) -> DataFusionError { - DataFusionError::Execution( - format!( - "Ambiguous merge insert: multiple source rows match the same target row on ({}). \ - This could lead to data corruption. Please ensure each target row is matched by at most one source row.", - format_key_values_on_columns(batch, row_idx, on_columns) + DataFusionError::External( + Box::new( + Error::invalid_input(format!( + "Ambiguous merge inserts are prohibited: multiple source rows match the same target row on ({}). \ + Please ensure each target row is matched by at most one source row.", + format_key_values_on_columns(batch, row_idx, on_columns) + ), location!()) + ) ) - ) } /// Describes how rows should be handled when there is no matching row in the source table @@ -5591,11 +5593,10 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n "Expected merge insert to fail due to duplicate rows on key column." ); - let error_msg = result.unwrap_err().to_string(); assert!( - error_msg.contains("Ambiguous merge insert") && error_msg.contains("multiple source rows"), - "Expected error message to mention ambiguous merge insert and multiple source rows, got: {}", - error_msg + matches!(&result, &Err(Error::InvalidInput { ref source, .. }) if source.to_string().contains("Ambiguous merge insert") && source.to_string().contains("multiple source rows")), + "Expected error to be InvalidInput with message about ambiguous merge insert and multiple source rows, got: {:?}", + result ); } diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs index 45b915fd353..a203d561fe3 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs @@ -695,13 +695,11 @@ impl FullSchemaMergeInsertExec { update_tx: &tokio::sync::mpsc::UnboundedSender>, insert_tx: &tokio::sync::mpsc::UnboundedSender>, ) { - let error_msg = format!("Stream processing failed: {}", error); - - let update_error = datafusion::error::DataFusionError::Internal(error_msg.clone()); - let insert_error = datafusion::error::DataFusionError::Internal(error_msg); - - let _ = update_tx.send(Err(update_error)); - let _ = insert_tx.send(Err(insert_error)); + // Send to first open one. It doesn't matter which one receives it as + // long as the user gets the error in the end. + if let Err(tokio::sync::mpsc::error::SendError(error)) = update_tx.send(Err(error)) { + let _ = insert_tx.send(error); + } } }