From d31983218af214786fb10c05b6056e6434fe2c54 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Wed, 18 Dec 2024 10:51:47 +0800 Subject: [PATCH] [cdc] add exception message for CdcRecordStoreMultiWriteOperator --- .../cdc/CdcRecordStoreMultiWriteOperator.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java index 9387a8293874..a4b4e8284043 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java @@ -174,7 +174,7 @@ public void processElement(StreamRecord element) throws Exce try { write.write(optionalConverted.get()); } catch (Exception e) { - throw new IOException(e); + throw new IOException("Exception occurs for writing record to table: " + tableId, e); } } @@ -235,12 +235,17 @@ protected List prepareCommit(boolean waitCompaction, long for (Map.Entry entry : writes.entrySet()) { Identifier key = entry.getKey(); StoreSinkWrite write = entry.getValue(); - committables.addAll( - write.prepareCommit(waitCompaction, checkpointId).stream() - .map( - committable -> - MultiTableCommittable.fromCommittable(key, committable)) - .collect(Collectors.toList())); + try { + committables.addAll( + write.prepareCommit(waitCompaction, checkpointId).stream() + .map( + committable -> + MultiTableCommittable.fromCommittable( + key, committable)) + .collect(Collectors.toList())); + } catch (Exception e) { + throw new IOException("Failed to prepare commit for table: " + key.toString(), e); + } } return committables; }