Flink: Dynamic Iceberg Sink: Add dynamic writer and committer #13080
pvary merged 23 commits into apache:main
Conversation
Could you please provide tests?
/**
 * The aggregated results of a single checkpoint which should be committed. Containing the
 * serialized {@link org.apache.iceberg.flink.sink.DeltaManifests} file - which contains the commit
 * data, and the jobId, operatorId, checkpointId triplet which help identifying the specific commit
nit: help -> helps
Add a . at the end of the sentence
Thanks for the review @pvary. I'll add some more tests for the writer / aggregator / committer. There is more integration / e2e testing coming with the subsequent PRs, but we should also test the individual components.
Force-pushed from 073623b to 000e7df
pvary left a comment:
Please make all tests package-private, and remove @Internal from non-public classes.
Otherwise +1 LGTM.
Merged to main. Could you please create the backport PRs for this and the previous commits too?
Thanks for merging! I'll create the backport PRs.
…r to Flink 1.19 / 1.20 (apache#13248) Backports apache#13080
private static final Duration CACHE_EXPIRATION_DURATION = Duration.ofMinutes(1);

private final CatalogLoader catalogLoader;
private transient Map<WriteTarget, Collection<DynamicWriteResult>> results;
@mxm @pvary @rodmeneses should Map<WriteTarget, Collection<DynamicWriteResult>> results be serialized with the class for recovery in Flink? Or does Flink guarantee to reissue all write results after recovery if the pre-commit topology fails? This field is serializable in the original IcebergSink.
We do not serialize the WriteResults in the non-dynamic sink. We flush them on checkpoint via the prepareSnapshotPreBarrier method.
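The flush-on-checkpoint pattern described here can be sketched without any Flink dependencies. The class below is an illustrative stand-in (not the actual DynamicWriteResultAggregator API): buffered results live only between checkpoints and are drained when the prepareSnapshotPreBarrier-style hook fires, which is why the map can stay transient and need not be part of checkpointed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the flush-on-checkpoint pattern: results are buffered
// only between checkpoints and emitted downstream before the barrier, so the
// buffer never needs to be serialized for recovery. All names here are
// illustrative, not the real Iceberg/Flink classes.
class BufferingAggregator<K, V> {
  // transient in the real operator: rebuilt empty after recovery
  private final Map<K, List<V>> results = new HashMap<>();

  void collect(K key, V result) {
    results.computeIfAbsent(key, unused -> new ArrayList<>()).add(result);
  }

  // Called just before the checkpoint barrier is forwarded downstream:
  // hand off everything buffered so far and clear the buffer.
  Map<K, List<V>> prepareSnapshotPreBarrier(long checkpointId) {
    Map<K, List<V>> committables = new HashMap<>(results);
    results.clear();
    return committables;
  }
}
```

After a failure, the upstream writer re-emits results for the restored checkpoint, so an empty buffer on restart is safe under this model.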
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
  Collection<CommittableWithLineage<DynamicCommittable>> committables =
      Sets.newHashSetWithExpectedSize(results.size());
A Set is used here because there isn't a particular order to the committables.
DynamicWriteResult result =
    ((CommittableWithLineage<DynamicWriteResult>) element.getValue()).getCommittable();
WriteTarget key = result.key();
results.computeIfAbsent(key, unused -> Sets.newHashSet()).add(result);
We use a Set here because there is no enforced order. I'm curious, why do you want to use List?
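The Set-vs-List choice can be shown with a small self-contained sketch of the same computeIfAbsent grouping pattern. WriteResult and ResultGrouping below are hypothetical stand-ins for DynamicWriteResult and the aggregator, used only to illustrate that a Set makes the grouping order-independent and collapses value-equal results.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Stand-in value type with record equality, mimicking a write result
// keyed by its target table. Not the actual DynamicWriteResult class.
record WriteResult(String table, String dataFile) {}

class ResultGrouping {
  // Bucket results per target into a Set: since no ordering guarantee
  // exists among committables, a Set loses nothing a List would keep.
  static Map<String, Set<WriteResult>> group(Iterable<WriteResult> incoming) {
    Map<String, Set<WriteResult>> results = new HashMap<>();
    for (WriteResult r : incoming) {
      results.computeIfAbsent(r.table(), unused -> new HashSet<>()).add(r);
    }
    return results;
  }
}
```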
This adds the dynamic version of the writer and committer for the Flink Dynamic Iceberg Sink. Conceptually, they work similarly to the IcebergSink, but they support writing to multiple tables. Write results from the DynamicWriter are aggregated per table in the DynamicWriteResultAggregator, from where they are sent to the DynamicCommitter.
Broken out of #12424.
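The writer → aggregator → committer flow described in the PR can be sketched end to end in plain Java. Everything below is a simplified, hypothetical stand-in for DynamicWriter, DynamicWriteResultAggregator and DynamicCommitter; it only illustrates the shape of the data flow (per-table write results, per-table aggregation, one commit per table per checkpoint), not the real implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-ins for the three components of the dynamic sink.
class DynamicSinkSketch {
  record Row(String table, String payload) {}
  record WriteResult(String table, String dataFile) {}

  // Stage 1 (writer): route each row to its target table and emit one
  // write result per table on flush.
  static List<WriteResult> write(List<Row> rows) {
    Map<String, Integer> perTableCounts = new HashMap<>();
    rows.forEach(r -> perTableCounts.merge(r.table(), 1, Integer::sum));
    return perTableCounts.entrySet().stream()
        .map(e -> new WriteResult(e.getKey(), e.getKey() + "-" + e.getValue() + ".parquet"))
        .toList();
  }

  // Stage 2 (aggregator): group write results per target table.
  static Map<String, List<WriteResult>> aggregate(List<WriteResult> results) {
    Map<String, List<WriteResult>> grouped = new HashMap<>();
    results.forEach(r ->
        grouped.computeIfAbsent(r.table(), unused -> new ArrayList<>()).add(r));
    return grouped;
  }

  // Stage 3 (committer): produce one commit per table for the checkpoint.
  static List<String> commit(Map<String, List<WriteResult>> grouped, long checkpointId) {
    return grouped.keySet().stream()
        .map(t -> "commit(" + t + ", checkpoint=" + checkpointId + ")")
        .sorted()
        .toList();
  }
}
```

The key difference from the single-table IcebergSink is the per-table keying at every stage, which is what lets one sink instance serve multiple target tables.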