CdcDynamicBucketWriteOperator.java

@@ -35,7 +35,9 @@
 import java.io.IOException;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
 
 /**
@@ -47,6 +49,10 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc
 
     private final long retrySleepMillis;
 
+    private final int maxRetryNumTimes;
+
+    private final boolean skipCorruptRecord;
+
     private CdcDynamicBucketWriteOperator(
             StreamOperatorParameters<Committable> parameters,
             FileStoreTable table,
@@ -55,6 +61,8 @@ private CdcDynamicBucketWriteOperator(
         super(parameters, table, storeSinkWriteProvider, initialCommitUser);
         this.retrySleepMillis =
                 table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
+        this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
+        this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
     }
 
     @Override
@@ -73,7 +81,7 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) thr
         Tuple2<CdcRecord, Integer> record = element.getValue();
         Optional<GenericRow> optionalConverted = toGenericRow(record.f0, table.schema().fields());
         if (!optionalConverted.isPresent()) {
-            while (true) {
+            for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
                 table = table.copyWithLatestSchema();
                 optionalConverted = toGenericRow(record.f0, table.schema().fields());
                 if (optionalConverted.isPresent()) {
@@ -84,10 +92,18 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) thr
             write.replace(table);
         }
 
-        try {
-            write.write(optionalConverted.get(), record.f1);
-        } catch (Exception e) {
-            throw new IOException(e);
+        if (!optionalConverted.isPresent()) {
+            if (skipCorruptRecord) {
+                LOG.warn("Skipping corrupt or unparsable record {}", record);
+            } else {
+                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+            }
+        } else {
+            try {
+                write.write(optionalConverted.get(), record.f1);
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
         }
     }
 
CdcRecordStoreMultiWriteOperator.java

@@ -54,7 +54,9 @@
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
+import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
 import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
 
 /**
@@ -123,6 +125,9 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exce
 
         FileStoreTable table = getTable(tableId);
 
+        int retryCnt = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
+        boolean skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
+
         // all table write should share one write buffer so that writers can preempt memory
         // from those of other tables
         if (memoryPoolFactory == null) {
@@ -154,7 +159,7 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exce
                 toGenericRow(record.record(), table.schema().fields());
         if (!optionalConverted.isPresent()) {
             FileStoreTable latestTable = table;
-            while (true) {
+            for (int retry = 0; retry < retryCnt; ++retry) {
                 latestTable = latestTable.copyWithLatestSchema();
                 tables.put(tableId, latestTable);
                 optionalConverted = toGenericRow(record.record(), latestTable.schema().fields());
@@ -171,10 +176,18 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exce
             write.replace(latestTable);
         }
 
-        try {
-            write.write(optionalConverted.get());
-        } catch (Exception e) {
-            throw new IOException("Exception occurs for writing record to table: " + tableId, e);
+        if (!optionalConverted.isPresent()) {
+            if (skipCorruptRecord) {
+                LOG.warn("Skipping corrupt or unparsable record {}", record);
+            } else {
+                throw new RuntimeException("Unable to process element. Possibly a corrupt record");
+            }
+        } else {
+            try {
+                write.write(optionalConverted.get());
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
         }
     }
 
CdcRecordStoreWriteOperator.java

@@ -52,8 +52,24 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
                     .durationType()
                     .defaultValue(Duration.ofMillis(500));
 
+    public static final ConfigOption<Integer> MAX_RETRY_NUM_TIMES =
+            ConfigOptions.key("cdc.max-retry-num-times")

[Contributor comment] Should we prefix the new options with retry-, similar to the already existing cdc.retry-sleep-time? E.g. cdc.retry-count and cdc.retry-skip-corrupt-record?

+                    .intType()
+                    .defaultValue(100)
+                    .withDescription("Max retry count for updating table before failing loudly");
+
+    public static final ConfigOption<Boolean> SKIP_CORRUPT_RECORD =
+            ConfigOptions.key("cdc.skip-corrupt-record")
+                    .booleanType()
+                    .defaultValue(false)

[Contributor comment] Whether this is true or false, it will change the current default behaviour of waiting indefinitely for a schema change. We could either add an option that preserves the current behaviour as the default and makes retries opt-in, or make sure the patch notes call out the breaking change: Paimon will no longer wait indefinitely for schema updates, but will retry a bounded number of times and optionally skip unreadable rows.
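
A sketch of the first alternative, under the assumption that a sentinel value is acceptable (this helper is illustrative and not part of the PR): treat a non-positive cdc.max-retry-num-times as "retry forever", so the old blocking behaviour stays available.

import java.util.function.BooleanSupplier;

class RetryPolicySketch {
    // Hypothetical helper: a non-positive maxRetryNumTimes restores the old
    // "wait indefinitely" behaviour; a positive value bounds the retries.
    // The names mirror this PR's fields, but the helper itself is a sketch.
    static boolean retryUntilParsed(
            int maxRetryNumTimes, long retrySleepMillis, BooleanSupplier tryParse)
            throws InterruptedException {
        for (int retry = 0; maxRetryNumTimes <= 0 || retry < maxRetryNumTimes; ++retry) {
            if (tryParse.getAsBoolean()) {
                return true; // schema caught up and the record parsed
            }
            Thread.sleep(retrySleepMillis);
        }
        return false; // bounded retries exhausted; caller decides skip vs fail
    }
}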

.withDescription("Skip corrupt record if we fail to parse it");

private final long retrySleepMillis;

private final int maxRetryNumTimes;

private final boolean skipCorruptRecord;

protected CdcRecordStoreWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
@@ -62,6 +78,8 @@ protected CdcRecordStoreWriteOperator(
         super(parameters, table, storeSinkWriteProvider, initialCommitUser);
         this.retrySleepMillis =
                 table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
+        this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
+        this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
     }
 
     @Override
@@ -80,7 +98,7 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
         CdcRecord record = element.getValue();
         Optional<GenericRow> optionalConverted = toGenericRow(record, table.schema().fields());
         if (!optionalConverted.isPresent()) {
-            while (true) {
+            for (int retry = 0; retry < maxRetryNumTimes; ++retry) {

[Contributor comment] Nit: to me it's slightly unusual to increment the retry counter before entering the block, but I guess it does the same thing as int retry = 0; retry <= maxRetryNumTimes; retry++ and is just preference.
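
For what it's worth, the two forms are not quite identical: in a for-loop update clause ++retry and retry++ behave the same (the update runs after each iteration either way), but the suggested <= bound would run one extra attempt. A minimal illustration:

public class RetryLoopDemo {
    public static void main(String[] args) {
        int maxRetryNumTimes = 3;
        // The PR's form: executes exactly 3 attempts (0, 1, 2).
        for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
            System.out.println("attempt " + retry);
        }
        // The suggested bound would execute 4 attempts (0 through 3).
        for (int retry = 0; retry <= maxRetryNumTimes; retry++) {
            System.out.println("suggested-bound attempt " + retry);
        }
    }
}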

                 table = table.copyWithLatestSchema();
                 optionalConverted = toGenericRow(record, table.schema().fields());
                 if (optionalConverted.isPresent()) {
@@ -91,10 +109,18 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
             write.replace(table);
         }
 
-        try {
-            write.write(optionalConverted.get());
-        } catch (Exception e) {
-            throw new IOException(e);
+        if (!optionalConverted.isPresent()) {
+            if (skipCorruptRecord) {
+                LOG.warn("Skipping corrupt or unparsable record {}", record);

[Contributor comment] Just a note that this might leak sensitive data to the log system. Maybe we can log some metadata about the record instead of the full record?
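
A minimal sketch of that suggestion, assuming CdcRecord exposes its row kind and a column-name-to-value map (the accessor names kind() and fields() are assumptions, not shown in this diff):

// Only the row kind and the column names reach the log, never the values.
if (skipCorruptRecord) {
    LOG.warn(
            "Skipping corrupt or unparsable record of kind {} with columns {}",
            record.kind(),
            record.fields().keySet());
}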

+            } else {
+                throw new RuntimeException("Unable to process element. Possibly a corrupt record");

[Contributor comment] Can we include some info about the record in the exception too?
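
In the same spirit, the failure path could carry that metadata as well; a sketch with the same assumed accessors, and illustrative wording:

// Assumes record.kind() as above; the message text is an example only.
} else {
    throw new RuntimeException(
            "Unable to process element after "
                    + maxRetryNumTimes
                    + " retries; possibly a corrupt record of kind "
                    + record.kind());
}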

+            }
+        } else {
+            try {
+                write.write(optionalConverted.get());
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
         }
     }
 
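
Both new options are read from the table's core options, so they can be supplied like any other table option. A sketch of enabling them through Paimon's dynamic-options API (the option values below are arbitrary examples):

import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.table.Table;

class CdcRetryOptionsExample {
    // Table#copy(Map) applies dynamic options on top of the stored ones.
    static Table withCdcRetryOptions(Table table) {
        Map<String, String> options = new HashMap<>();
        options.put("cdc.max-retry-num-times", "50");
        options.put("cdc.skip-corrupt-record", "true");
        return table.copy(options);
    }
}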