Flink: Dynamic Iceberg Sink: Optimise RowData evolution #13340
pvary merged 6 commits into apache:main
Conversation
mxm
left a comment
Thanks for improving the performance on the conversion write path @aiborodin! It looks like this PR contains two separate changes:
- Adding caching to the conversion write path
- Refactoring RowDataEvolver to dynamically instantiate converter classes (quasi code generation)
I wonder if we can do (1) as a first step. RowDataEvolver has been static so far, and I understand that it needs to become an object in order to add the cache, but perhaps we could start with a central RowDataEvolver instance holding a cache keyed by source and target schema. I'm not sure the code generation yields much of a performance gain, and I would like to minimize the number of objects created.
Force-pushed 913c0c6 to 0a6af3a
According to the profile in my previous comment (#13340 (comment)), schema caching alone would not be sufficient; we also need to cache field accessors and converters to minimise the CPU overhead. The object overhead is minimal because each converter only stores field accessors and conversion lambdas. The cache overhead is minimal because it is an identity cache, and the same schema objects are already cached in TableMetadataCache.
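The identity-cache idea above can be sketched in isolation. The class below is a hypothetical, stdlib-only illustration (none of these names are the actual Iceberg API): converters are cached under a key that compares the source and target schema objects by reference (`==`), which is cheap and safe precisely because the schema instances themselves are already interned upstream, as TableMetadataCache does in this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch of an identity-keyed converter cache.
// Schema references are compared with ==, not equals(), so lookups
// avoid structural schema comparison entirely.
final class ConverterCache<S, T, C> {

  // Composite key using reference identity for both schema objects.
  private static final class IdentityPair {
    final Object source;
    final Object target;

    IdentityPair(Object source, Object target) {
      this.source = source;
      this.target = target;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof IdentityPair
          && ((IdentityPair) o).source == this.source
          && ((IdentityPair) o).target == this.target;
    }

    @Override
    public int hashCode() {
      return 31 * System.identityHashCode(source) + System.identityHashCode(target);
    }
  }

  private final Map<IdentityPair, C> cache = new HashMap<>();
  private final BiFunction<S, T, C> factory;

  ConverterCache(BiFunction<S, T, C> factory) {
    this.factory = factory;
  }

  // Builds the converter once per distinct (source, target) schema pair.
  C get(S source, T target) {
    return cache.computeIfAbsent(
        new IdentityPair(source, target), key -> factory.apply(source, target));
  }
}
```

This is why the cache overhead stays minimal: a hit costs two identity hash codes and two reference comparisons, with no traversal of the schema trees.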
Force-pushed 0a6af3a to 5c63747
mxm
left a comment
Thanks for explaining the rationale behind the change. This is an excellent contribution!
Force-pushed c918919 to 8e45f21
pvary
left a comment
LGTM +1
A few small changes, and we are ready.
Force-pushed eeb0687 to a888dc3
RowDataEvolver recomputes Flink RowType and field getters for every input record that needs to match a destination Iceberg table schema. Cache field getters and column converters to optimise RowData conversion.
TableMetadataCache already contains an identity cache to store schema comparison results. Let's move the row data converter cache into SchemaInfo and make it configurable.
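The performance gain described above comes from moving per-record work to per-schema work. The sketch below is illustrative only (it uses plain `Object[]` rows as a stand-in for Flink's RowData, and the class and field names are invented): the accessor and converter lambda for each column are built once, when a schema pair is first seen, and then reused for every record.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: precompute one field accessor and one column
// converter per position, instead of recomputing them for every record.
// In the actual PR this role is played by Flink's RowData.FieldGetter.
final class RowConverter {
  private final Function<Object[], Object>[] fieldGetters;
  private final Function<Object, Object>[] columnConverters;

  @SuppressWarnings("unchecked")
  RowConverter(int arity, List<Function<Object, Object>> converters) {
    this.fieldGetters = new Function[arity];
    this.columnConverters = new Function[arity];
    for (int i = 0; i < arity; i++) {
      final int pos = i;
      // Built once per schema pair, not once per record.
      fieldGetters[i] = row -> row[pos];
      columnConverters[i] = converters.get(i);
    }
  }

  // Hot path: only array indexing and lambda invocations remain.
  Object[] convert(Object[] row) {
    Object[] out = new Object[fieldGetters.length];
    for (int i = 0; i < out.length; i++) {
      out[i] = columnConverters[i].apply(fieldGetters[i].apply(row));
    }
    return out;
  }
}
```

Because each converter instance only holds these two small arrays, the per-object memory overhead stays low, which matches the argument made earlier in the thread.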
Force-pushed 83e1150 to 2339e78
Nice last commits 😂
Merged to main. @aiborodin: Could you please create a backport PR to port these changes to Flink 1.20 and 1.19? Also, if you need to change anything beyond cleanly applying the change, please highlight it so it is easier to review. Thanks for all of your work on this! Happy to have you as a contributor!