@@ -35,6 +35,7 @@
 import java.io.IOException;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.LOG_CORRUPT_RECORD;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
@@ -53,6 +54,8 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<CdcRecord, Integer>> {
 
     private final boolean skipCorruptRecord;
 
+    private final boolean logCorruptRecord;
+
     private CdcDynamicBucketWriteOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
@@ -63,6 +66,7 @@ private CdcDynamicBucketWriteOperator(
                 table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
         this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
         this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
+        this.logCorruptRecord = table.coreOptions().toConfiguration().get(LOG_CORRUPT_RECORD);
     }
 
     @Override
@@ -79,11 +83,13 @@ protected boolean containLogSystem() {
     @Override
     public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) throws Exception {
         Tuple2<CdcRecord, Integer> record = element.getValue();
-        Optional<GenericRow> optionalConverted = toGenericRow(record.f0, table.schema().fields());
+        Optional<GenericRow> optionalConverted =
+                toGenericRow(record.f0, table.schema().fields(), logCorruptRecord);
         if (!optionalConverted.isPresent()) {
             for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
                 table = table.copyWithLatestSchema();
-                optionalConverted = toGenericRow(record.f0, table.schema().fields());
+                optionalConverted =
+                        toGenericRow(record.f0, table.schema().fields(), logCorruptRecord);
                 if (optionalConverted.isPresent()) {
                     break;
                 }
@@ -94,9 +100,13 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) throws Exception {
 
         if (!optionalConverted.isPresent()) {
             if (skipCorruptRecord) {
-                LOG.warn("Skipping corrupt or unparsable record {}", record);
+                LOG.warn(
+                        "Skipping corrupt or unparsable record {}",
+                        (logCorruptRecord ? record : "<redacted>"));
             } else {
-                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+                throw new RuntimeException(
+                        "Unable to process element. Possibly a corrupt record: "
+                                + (logCorruptRecord ? record : "<redacted>"));
             }
         } else {
             try {
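
Taken together, the hunks above give `CdcDynamicBucketWriteOperator` a convert, retry-with-refreshed-schema, then skip-or-fail flow, in which the new `logCorruptRecord` flag decides whether the offending payload may appear in logs or exception messages. Below is a minimal standalone sketch of that control flow; `convert` and the integer schema version are illustrative stand-ins for `toGenericRow` and `copyWithLatestSchema`, not Paimon APIs.

```java
import java.util.Optional;

// Illustrative sketch (not Paimon code) of the flow in the diff above:
// try to convert, retry after refreshing the schema, then either skip the
// record with a redacted WARN or fail, depending on the two flags.
final class ConvertRetrySketch {
    private static final int MAX_RETRY_NUM_TIMES = 3;

    // Stand-in for toGenericRow(): succeeds only once the payload matches
    // the (hypothetical) current schema version.
    private static Optional<String> convert(String payload, int schemaVersion) {
        return payload.startsWith("v" + schemaVersion + ":")
                ? Optional.of(payload.substring(payload.indexOf(':') + 1))
                : Optional.empty();
    }

    static String process(String payload, boolean skipCorruptRecord, boolean logCorruptRecord) {
        int schemaVersion = 1;
        Optional<String> row = convert(payload, schemaVersion);
        for (int retry = 0; !row.isPresent() && retry < MAX_RETRY_NUM_TIMES; ++retry) {
            schemaVersion++; // stand-in for table.copyWithLatestSchema()
            row = convert(payload, schemaVersion);
        }
        if (!row.isPresent()) {
            Object shown = logCorruptRecord ? payload : "<redacted>";
            if (skipCorruptRecord) {
                return "WARN Skipping corrupt or unparsable record " + shown;
            }
            throw new RuntimeException(
                    "Unable to process element. Possibly a corrupt record: " + shown);
        }
        return "wrote " + row.get();
    }

    public static void main(String[] args) {
        System.out.println(process("v2:ok", true, true)); // retries once, then writes
        System.out.println(process("bad", true, false));  // skipped, payload redacted
    }
}
```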
@@ -54,6 +54,7 @@
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.LOG_CORRUPT_RECORD;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
@@ -155,14 +156,17 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exception {
 
         ((StoreSinkWriteImpl) write).withCompactExecutor(compactExecutor);
 
+        boolean logCorruptRecord = table.coreOptions().toConfiguration().get(LOG_CORRUPT_RECORD);
         Optional<GenericRow> optionalConverted =
-                toGenericRow(record.record(), table.schema().fields());
+                toGenericRow(record.record(), table.schema().fields(), logCorruptRecord);
         if (!optionalConverted.isPresent()) {
             FileStoreTable latestTable = table;
             for (int retry = 0; retry < retryCnt; ++retry) {
                 latestTable = latestTable.copyWithLatestSchema();
                 tables.put(tableId, latestTable);
-                optionalConverted = toGenericRow(record.record(), latestTable.schema().fields());
+                optionalConverted =
+                        toGenericRow(
+                                record.record(), latestTable.schema().fields(), logCorruptRecord);
                 if (optionalConverted.isPresent()) {
                     break;
                 }
@@ -178,9 +182,13 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exception {
 
         if (!optionalConverted.isPresent()) {
             if (skipCorruptRecord) {
-                LOG.warn("Skipping corrupt or unparsable record {}", record);
+                LOG.warn(
+                        "Skipping corrupt or unparsable record {}",
+                        (logCorruptRecord ? record : "<redacted>"));
             } else {
-                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+                throw new RuntimeException(
+                        "Unable to process element. Possibly a corrupt record: "
+                                + (logCorruptRecord ? record : "<redacted>"));
             }
         } else {
             try {
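
The multiplex operator differs from the single-table operators in one detail: because each `CdcMultiplexRecord` can route to a different table, `LOG_CORRUPT_RECORD` is resolved from the target table's options on every element instead of being cached in a constructor-initialized field. A simplified sketch of that per-table lookup follows; the map shape and defaults are illustrative, not the actual Paimon options plumbing.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration (not Paimon code): each table carries its own
// options, so the redaction flag must be resolved per record rather than
// once per operator instance.
final class PerTableFlagSketch {
    private final Map<String, Map<String, String>> tableOptions = new HashMap<>();

    boolean logCorruptRecord(String tableId) {
        // Default mirrors LOG_CORRUPT_RECORD's defaultValue(true).
        return Boolean.parseBoolean(
                tableOptions
                        .getOrDefault(tableId, new HashMap<>())
                        .getOrDefault("cdc.log-corrupt-record", "true"));
    }

    public static void main(String[] args) {
        PerTableFlagSketch sketch = new PerTableFlagSketch();
        sketch.tableOptions.put("db.orders", Map.of("cdc.log-corrupt-record", "false"));
        System.out.println(sketch.logCorruptRecord("db.orders"));   // false
        System.out.println(sketch.logCorruptRecord("db.payments")); // true (default)
    }
}
```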
@@ -67,12 +67,21 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
                     .defaultValue(false)
                     .withDescription("Skip corrupt record if we fail to parse it");
 
+    public static final ConfigOption<Boolean> LOG_CORRUPT_RECORD =
+            ConfigOptions.key("cdc.log-corrupt-record")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to allow corrupt record logging when unable to parse CDC records.");
+
     private final long retrySleepMillis;
 
     private final int maxRetryNumTimes;
 
     private final boolean skipCorruptRecord;
 
+    private final boolean logCorruptRecord;
+
     protected CdcRecordStoreWriteOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
@@ -83,6 +92,7 @@ protected CdcRecordStoreWriteOperator(
                 table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
         this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
         this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
+        this.logCorruptRecord = table.coreOptions().toConfiguration().get(LOG_CORRUPT_RECORD);
     }
 
     @Override
@@ -99,11 +109,12 @@ protected boolean containLogSystem() {
     @Override
     public void processElement(StreamRecord<CdcRecord> element) throws Exception {
         CdcRecord record = element.getValue();
-        Optional<GenericRow> optionalConverted = toGenericRow(record, table.schema().fields());
+        Optional<GenericRow> optionalConverted =
+                toGenericRow(record, table.schema().fields(), logCorruptRecord);
         if (!optionalConverted.isPresent()) {
             for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
                 table = table.copyWithLatestSchema();
-                optionalConverted = toGenericRow(record, table.schema().fields());
+                optionalConverted = toGenericRow(record, table.schema().fields(), logCorruptRecord);
                 if (optionalConverted.isPresent()) {
                     break;
                 }
@@ -114,9 +125,13 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
 
         if (!optionalConverted.isPresent()) {
             if (skipCorruptRecord) {
-                LOG.warn("Skipping corrupt or unparsable record {}", record);
+                LOG.warn(
+                        "Skipping corrupt or unparsable record {}",
+                        (logCorruptRecord ? record : "<redacted>"));
             } else {
-                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+                throw new RuntimeException(
+                        "Unable to process element. Possibly a corrupt record: "
+                                + (logCorruptRecord ? record : "<redacted>"));
             }
         } else {
             try {
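
`CdcRecordStoreWriteOperator` is where the option itself is declared, keyed `cdc.log-corrupt-record` and defaulting to `true`, so existing pipelines keep their current logging behavior. A pipeline that must keep raw payloads out of its logs would presumably pair it with the skip option; the snippet below is a hypothetical illustration, and the `cdc.skip-corrupt-record` key is inferred from the `SKIP_CORRUPT_RECORD` constant rather than shown in this diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical table options for a Paimon CDC sink: skip records that cannot
// be parsed, and redact their contents from WARN logs and exceptions.
final class RedactionOptionsExample {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("cdc.skip-corrupt-record", "true"); // key inferred, not shown in this diff
        options.put("cdc.log-corrupt-record", "false"); // option introduced by this change
        System.out.println(options);
    }
}
```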
@@ -73,12 +73,14 @@ public static GenericRow projectAsInsert(CdcRecord record, List<DataField> dataFields) {
      * CdcRecordUtils#projectAsInsert} instead.
      *
      * @param dataFields {@link DataField}s of the converted {@link GenericRow}.
+     * @param logCorruptRecord whether to log data during conversion error
      * @return if all field names of {@code dataFields} existed in keys of {@code fields} and all
      *     values of {@code fields} can be correctly converted to the specified type, an {@code
      *     Optional#of(GenericRow)} will be returned, otherwise an {@code Optional#empty()} will be
      *     returned
      */
-    public static Optional<GenericRow> toGenericRow(CdcRecord record, List<DataField> dataFields) {
+    public static Optional<GenericRow> toGenericRow(
+            CdcRecord record, List<DataField> dataFields, boolean logCorruptRecord) {
         GenericRow genericRow = new GenericRow(record.kind(), dataFields.size());
         List<String> fieldNames =
                 dataFields.stream().map(DataField::name).collect(Collectors.toList());
@@ -105,7 +107,7 @@ public static Optional<GenericRow> toGenericRow(CdcRecord record, List<DataField> dataFields) {
             } catch (Exception e) {
                 LOG.info(
                         "Failed to convert value "
-                                + value
+                                + (logCorruptRecord ? value : "<redacted>")
                                 + " to type "
                                 + type
                                 + ". Waiting for schema update.",
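
Finally, `CdcRecordUtils#toGenericRow` gains the flag so that even the INFO-level conversion log can be redacted. The guard `(logCorruptRecord ? value : "<redacted>")` now repeats at several call sites across the four files; as a possible follow-up refactor (not part of this change), it could be centralized so new call sites cannot forget the guard:

```java
// Sketch of a possible follow-up (not in this PR): one helper for the
// redaction guard used by every log and exception site above.
final class Redaction {
    static Object forLog(Object value, boolean logCorruptRecord) {
        return logCorruptRecord ? value : "<redacted>";
    }

    public static void main(String[] args) {
        System.out.println(Redaction.forLog("{\"id\":42}", true));  // {"id":42}
        System.out.println(Redaction.forLog("{\"id\":42}", false)); // <redacted>
    }
}
```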