diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
index 5637e8b12794..96a70632ac79 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
@@ -35,6 +35,7 @@
 import java.io.IOException;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.LOG_CORRUPT_RECORD;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
@@ -53,6 +54,8 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<CdcRecord, Integer>> {
 
     private final boolean skipCorruptRecord;
 
+    private final boolean logCorruptRecord;
+
     private CdcDynamicBucketWriteOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
@@ -63,6 +66,7 @@ private CdcDynamicBucketWriteOperator(
                 table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
         this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
         this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
+        this.logCorruptRecord = table.coreOptions().toConfiguration().get(LOG_CORRUPT_RECORD);
     }
 
     @Override
@@ -79,11 +83,13 @@ protected boolean containLogSystem() {
     @Override
     public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) throws Exception {
         Tuple2<CdcRecord, Integer> record = element.getValue();
-        Optional<GenericRow> optionalConverted = toGenericRow(record.f0, table.schema().fields());
+        Optional<GenericRow> optionalConverted =
+                toGenericRow(record.f0, table.schema().fields(), logCorruptRecord);
         if (!optionalConverted.isPresent()) {
             for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
                 table = table.copyWithLatestSchema();
-                optionalConverted = toGenericRow(record.f0, table.schema().fields());
+                optionalConverted =
+                        toGenericRow(record.f0, table.schema().fields(), logCorruptRecord);
                 if (optionalConverted.isPresent()) {
                     break;
                 }
@@ -94,9 +100,13 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) thr
 
         if (!optionalConverted.isPresent()) {
             if (skipCorruptRecord) {
-                LOG.warn("Skipping corrupt or unparsable record {}", record);
+                LOG.warn(
+                        "Skipping corrupt or unparsable record {}",
+                        (logCorruptRecord ? record : ""));
             } else {
-                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+                throw new RuntimeException(
+                        "Unable to process element. Possibly a corrupt record: "
+                                + (logCorruptRecord ? record : ""));
             }
         } else {
             try {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index a6c55c55f18c..fb59878b8b72 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -54,6 +54,7 @@
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.LOG_CORRUPT_RECORD;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
@@ -155,14 +156,17 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exce
 
         ((StoreSinkWriteImpl) write).withCompactExecutor(compactExecutor);
 
+        boolean logCorruptRecord = table.coreOptions().toConfiguration().get(LOG_CORRUPT_RECORD);
         Optional<GenericRow> optionalConverted =
-                toGenericRow(record.record(), table.schema().fields());
+                toGenericRow(record.record(), table.schema().fields(), logCorruptRecord);
         if (!optionalConverted.isPresent()) {
             FileStoreTable latestTable = table;
             for (int retry = 0; retry < retryCnt; ++retry) {
                 latestTable = latestTable.copyWithLatestSchema();
                 tables.put(tableId, latestTable);
-                optionalConverted = toGenericRow(record.record(), latestTable.schema().fields());
+                optionalConverted =
+                        toGenericRow(
+                                record.record(), latestTable.schema().fields(), logCorruptRecord);
                 if (optionalConverted.isPresent()) {
                     break;
                 }
@@ -178,9 +182,13 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exce
 
         if (!optionalConverted.isPresent()) {
             if (skipCorruptRecord) {
-                LOG.warn("Skipping corrupt or unparsable record {}", record);
+                LOG.warn(
+                        "Skipping corrupt or unparsable record {}",
+                        (logCorruptRecord ? record : ""));
             } else {
-                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+                throw new RuntimeException(
+                        "Unable to process element. Possibly a corrupt record: "
+                                + (logCorruptRecord ? record : ""));
             }
         } else {
             try {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
index d19e3915f621..f8e9daee4f03 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
@@ -67,12 +67,21 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
                     .defaultValue(false)
                     .withDescription("Skip corrupt record if we fail to parse it");
 
+    public static final ConfigOption<Boolean> LOG_CORRUPT_RECORD =
+            ConfigOptions.key("cdc.log-corrupt-record")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to allow corrupt record logging when unable to parse CDC records.");
+
     private final long retrySleepMillis;
 
     private final int maxRetryNumTimes;
 
     private final boolean skipCorruptRecord;
 
+    private final boolean logCorruptRecord;
+
     protected CdcRecordStoreWriteOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
@@ -83,6 +92,7 @@ protected CdcRecordStoreWriteOperator(
                 table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
         this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
         this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
+        this.logCorruptRecord = table.coreOptions().toConfiguration().get(LOG_CORRUPT_RECORD);
     }
 
     @Override
@@ -99,11 +109,12 @@ protected boolean containLogSystem() {
     @Override
     public void processElement(StreamRecord<CdcRecord> element) throws Exception {
         CdcRecord record = element.getValue();
-        Optional<GenericRow> optionalConverted = toGenericRow(record, table.schema().fields());
+        Optional<GenericRow> optionalConverted =
+                toGenericRow(record, table.schema().fields(), logCorruptRecord);
         if (!optionalConverted.isPresent()) {
             for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
                 table = table.copyWithLatestSchema();
-                optionalConverted = toGenericRow(record, table.schema().fields());
+                optionalConverted = toGenericRow(record, table.schema().fields(), logCorruptRecord);
                 if (optionalConverted.isPresent()) {
                     break;
                 }
@@ -114,9 +125,13 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
 
         if (!optionalConverted.isPresent()) {
            if (skipCorruptRecord) {
-                LOG.warn("Skipping corrupt or unparsable record {}", record);
+                LOG.warn(
+                        "Skipping corrupt or unparsable record {}",
+                        (logCorruptRecord ? record : ""));
             } else {
-                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+                throw new RuntimeException(
+                        "Unable to process element. Possibly a corrupt record: "
+                                + (logCorruptRecord ? record : ""));
             }
         } else {
             try {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
index 91979a2c99b8..9c442487b1dd 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
@@ -73,12 +73,14 @@ public static GenericRow projectAsInsert(CdcRecord record, List<DataField> dataF
      * CdcRecordUtils#projectAsInsert} instead.
      *
     * @param dataFields {@link DataField}s of the converted {@link GenericRow}.
+     * @param logCorruptRecord whether to log data during conversion error
      * @return if all field names of {@code dataFields} existed in keys of {@code fields} and all
      *     values of {@code fields} can be correctly converted to the specified type, an {@code
      *     Optional#of(GenericRow)} will be returned, otherwise an {@code Optional#empty()} will be
      *     returned
      */
-    public static Optional<GenericRow> toGenericRow(CdcRecord record, List<DataField> dataFields) {
+    public static Optional<GenericRow> toGenericRow(
+            CdcRecord record, List<DataField> dataFields, boolean logCorruptRecord) {
         GenericRow genericRow = new GenericRow(record.kind(), dataFields.size());
         List<String> fieldNames =
                 dataFields.stream().map(DataField::name).collect(Collectors.toList());
@@ -105,7 +107,7 @@ public static Optional<GenericRow> toGenericRow(CdcRecord record, List<DataField> dataFields) {
                 LOG.info(
                         "Failed to convert value "
-                                + value
+                                + (logCorruptRecord ? value : "")
                                 + " to type "
                                 + type
                                 + ". Waiting for schema update.",
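
Taken together, the patch threads a single table option down to every place a corrupt CDC record can surface in a log line or an exception message. The sketch below is illustrative only and not part of the patch: the class name, the `reportCorrupt` helper, and the sample payload are made up, while the option key, default, and redaction pattern come from the diff above.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/** Minimal sketch of the guarded-logging pattern this patch adds. */
public class LogCorruptRecordExample {

    private static final Logger LOG = LoggerFactory.getLogger(LogCorruptRecordExample.class);

    /** Mirrors the branch the patch adds to all three CDC write operators. */
    static void reportCorrupt(Object record, boolean skipCorruptRecord, boolean logCorruptRecord) {
        if (skipCorruptRecord) {
            // With logging disabled, the placeholder is filled with an empty string,
            // so the raw payload never reaches the log file.
            LOG.warn("Skipping corrupt or unparsable record {}", logCorruptRecord ? record : "");
        } else {
            // The same redaction applies to the exception message on the fail-fast path.
            throw new RuntimeException(
                    "Unable to process element. Possibly a corrupt record: "
                            + (logCorruptRecord ? record : ""));
        }
    }

    public static void main(String[] args) {
        // The operators read the flag from the table's core options, so it is set
        // like any other table option; the default is true (keep logging payloads).
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("cdc.log-corrupt-record", "false");

        boolean logCorruptRecord =
                Boolean.parseBoolean(tableOptions.getOrDefault("cdc.log-corrupt-record", "true"));

        // Skip path: warns without leaking the payload.
        reportCorrupt("{id=42, ssn=...}", true, logCorruptRecord);
    }
}
```

Because `cdc.log-corrupt-record` defaults to `true`, existing jobs keep their current logging behavior; only tables that explicitly set it to `false` (for example, tables carrying sensitive payloads) suppress record contents in warnings, exceptions, and the conversion-failure log in `CdcRecordUtils`.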