Flink: Add DynamicRecord / DynamicRecordInternal / DynamicRecordInternalSerializer #12996
Conversation
9081c13 to 0e50889 (force-pushed)
private PartitionSpec spec;
private int writerKey;
private RowData rowData;
private boolean upsertMode;

Should we rename this to isUpsert, or if it denotes an actual mode, use an enum instead?

Can do, but it's consistent with the coding style. We often omit these verbs from the getters in Iceberg.

In that case, upsert or useUpsertMode would probably be a better name.
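The verb-less accessor convention being discussed can be sketched as follows. This is a hypothetical class for illustration, not the actual DynamicRecordInternal code:

```java
// Sketch of the Iceberg-style accessor convention: getters omit the
// "get"/"is" verbs and simply reuse the field name. (Illustrative only.)
public class DynamicRecordSketch {
  private boolean upsertMode;

  // Iceberg-style: no "is" prefix, the accessor name matches the field name.
  public boolean upsertMode() {
    return upsertMode;
  }

  public void setUpsertMode(boolean upsertMode) {
    this.upsertMode = upsertMode;
  }
}
```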
private String tableName;
private String branch;
private Schema schema;
private PartitionSpec spec;

Should we rename this to partitionSpec in case some other kind of spec appears in the future?

I was also leaning towards this name in the beginning, but it's the Iceberg convention to use this name across the code base. We can rename it though if this is a concern.
// Check that the schema id can be resolved. Not strictly necessary for serialization.
Tuple3<RowDataSerializer, Schema, PartitionSpec> serializer =
    serializerCache.serializerWithSchemaAndSpec(
        toSerialize.tableName(),
        toSerialize.schema().schemaId(),
        toSerialize.spec().specId());

If it's not strictly necessary, why do we do it? What happens if this fails / why would it fail?

This is basically a sanity test, to verify that looking up the serializer by id on the remote side will work. The remote side won't have the schema available, because it is not written in this branch. If there are any issues, we will know about them on the sender side, as opposed to the receiving side.
I've added a JavaDoc which should clarify things.
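The "fail on the sender side" idea can be sketched like this. SchemaCache and its methods are hypothetical stand-ins for the real serializer cache, kept self-contained for illustration:

```java
// Hedged sketch: the serializer/schema is looked up eagerly by id before
// writing, so an unresolvable id surfaces where the record is produced,
// not on the receiving side. SchemaCache is illustrative, not Iceberg code.
import java.util.HashMap;
import java.util.Map;

public class SenderSideCheck {
  public static class SchemaCache {
    private final Map<Integer, String> schemasById = new HashMap<>();

    public void register(int schemaId, String schema) {
      schemasById.put(schemaId, schema);
    }

    // Mirrors the serializerWithSchemaAndSpec lookup: throws if the id
    // cannot be resolved.
    public String schemaById(int schemaId) {
      String schema = schemasById.get(schemaId);
      if (schema == null) {
        throw new IllegalStateException("Unknown schema id: " + schemaId);
      }
      return schema;
    }
  }

  public static byte[] serialize(SchemaCache cache, int schemaId, String payload) {
    // Sanity check: resolve the schema id eagerly, even though the schema
    // itself is not written out. A failure happens here, on the sender.
    cache.schemaById(schemaId);
    return payload.getBytes();
  }
}
```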
private String branch;
private Schema schema;
private RowData rowData;
private PartitionSpec spec;

Should this be called partitionSpec in case other specs are added in the future?
private PartitionSpec spec;
private DistributionMode mode;
private int writeParallelism;
private boolean upsertMode;

A boolean doesn't really describe a mode; should this be an enum, or maybe isUpsert?

I think it does. If enabled, upsert mode will be used.
private Schema schema;
private RowData rowData;
private PartitionSpec spec;
private DistributionMode mode;

Should this be distribution or distributionMode? (It is already clashing with upsertMode a little.)

Yes, it makes sense to rename it to distributionMode.
Thanks for the review @gyfora! I think it makes sense to rename the API-facing fields / getters / setters to avoid confusion for users.
private DistributionMode mode;
private int writeParallelism;
private boolean upsertMode;
@Nullable private List<String> equalityFields;

Only this field is nullable?
Shall we use the annotation consistently?

Correct, only this field is currently nullable / optional. We could add some defaults. I was thinking of adding a builder, what do you think?

A builder makes sense to me, as we have many parameters.
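The builder idea can be sketched as below. The class and field names are illustrative (not the actual DynamicRecord API): defaults live in the builder, and the optional equalityFields stays nullable without scattering @Nullable across call sites:

```java
// Hypothetical builder sketch for a many-parameter sink configuration.
// Names and defaults are assumptions for illustration only.
import java.util.List;

public class SinkConfig {
  private final int writeParallelism;
  private final boolean upsertMode;
  private final List<String> equalityFields; // optional, may be null

  private SinkConfig(Builder builder) {
    this.writeParallelism = builder.writeParallelism;
    this.upsertMode = builder.upsertMode;
    this.equalityFields = builder.equalityFields;
  }

  public int writeParallelism() { return writeParallelism; }
  public boolean upsertMode() { return upsertMode; }
  public List<String> equalityFields() { return equalityFields; }

  public static class Builder {
    private int writeParallelism = 1;        // default
    private boolean upsertMode = false;      // default
    private List<String> equalityFields = null; // optional

    public Builder writeParallelism(int parallelism) {
      this.writeParallelism = parallelism;
      return this;
    }

    public Builder upsertMode(boolean upsert) {
      this.upsertMode = upsert;
      return this;
    }

    public Builder equalityFields(List<String> fields) {
      this.equalityFields = fields;
      return this;
    }

    public SinkConfig build() {
      return new SinkConfig(this);
    }
  }
}
```

Note that a builder produces immutable objects, which is where the later discussion about object reuse comes in.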
It might be strange for new developers, but we always omit …

In general I get the idea, but my particular concern was related to …

I assume this is …

I've pushed an update to address the comments. On the name discussion: I think this is all just convention. Every community has its own styles. I don't think either way makes more sense. The most important reason is consistency. All existing Flink Iceberg sinks use that name. I don't see a strong case to deviate from it. I did rename …
This comment was marked as resolved.
…nalSerializer This adds the user-facing type DynamicRecord, along with its internal representation DynamicRecordInternal and its type information and serializer. Broken out of github.com/apache/pull/12424.
665aa07 to ec7d036 (force-pushed)
(rebased and squashed commits)
return tableIdentifier;
}

public void setTableIdentifier(TableIdentifier tableIdentifier) {

Do we need these setters, if we have a builder?

We wouldn't. I'm not sure, though, that we should remove these methods, as they allow DynamicRecord to be reused. If we add the builder, that won't be possible anymore.
return tableName;
}

public void setTableName(String tableName) {

We currently use these setters here to allow for Flink's object reuse mode:
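The object-reuse argument can be sketched like this. ReusableRecord and the map method are illustrative stand-ins, not the actual sink code:

```java
// Hedged sketch of why setters support Flink's object reuse mode: a single
// long-lived record instance is mutated and re-emitted per input element,
// instead of allocating a fresh object each time. A builder producing
// immutable objects would force a new allocation per element.
public class ObjectReuseSketch {
  public static class ReusableRecord {
    private String tableName;
    private String payload;

    public void setTableName(String tableName) { this.tableName = tableName; }
    public void setPayload(String payload) { this.payload = payload; }
    public String tableName() { return tableName; }
    public String payload() { return payload; }
  }

  // One instance, reused across invocations (as Flink does when object
  // reuse is enabled via ExecutionConfig#enableObjectReuse).
  private final ReusableRecord reusable = new ReusableRecord();

  public ReusableRecord map(String tableName, String payload) {
    reusable.setTableName(tableName);
    reusable.setPayload(payload);
    return reusable;
  }
}
```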
This adds the classes around schema / spec comparison and evolution. A breakdown of the classes follows:

# CompareSchemasVisitor
Compares the user-provided schema against the current table schema.

# EvolveSchemaVisitor
Computes the changes required to the table schema to be compatible with the user-provided schema.

# PartitionSpecEvolution
Code for checking compatibility with the user-provided PartitionSpec and computing a set of changes to rewrite the PartitionSpec.

# TableDataCache
Cache which holds all relevant metadata of a table, like its name, branch, schema, and partition spec. Also holds a cache of past comparison results for a given table's schema and the user-provided input schema.

# TableUpdater
Core logic to compare and create/update a table given a user-provided input schema.

Broken out of apache#12424, depends on apache#12996.
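The compare-then-evolve idea can be sketched in a much-simplified form. The real visitors traverse full Iceberg Schema objects including nested types; this illustrative version only diffs flat field-name lists:

```java
// Heavily simplified sketch of the CompareSchemasVisitor / EvolveSchemaVisitor
// split: compare the user-provided fields against the table's current fields
// and compute the additions needed to make the table compatible.
// (Illustrative only; the real code works on org.apache.iceberg.Schema.)
import java.util.ArrayList;
import java.util.List;

public class SchemaEvolutionSketch {
  // Returns the fields present in the input schema but missing from the table.
  public static List<String> fieldsToAdd(List<String> tableFields, List<String> inputFields) {
    List<String> toAdd = new ArrayList<>();
    for (String field : inputFields) {
      if (!tableFields.contains(field)) {
        toAdd.add(field);
      }
    }
    return toAdd;
  }
}
```

A TableUpdater-style caller would then apply the computed additions to the table and cache the comparison result so identical input schemas skip the diff next time.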
This backports apache#12996 to Flink version 1.19. The patch applied cleanly, apart from re-adding an interface method in DynamicRecordInternalSerializer that was removed in 2.0.
This backports apache#12996 to Flink version 1.20. The patch applied cleanly, apart from re-adding an interface method in DynamicRecordInternalSerializer that was removed in 2.0.
…cordInternalSerializer to Flink 1.19 / 1.20 (apache#13246) backports apache#12996
This adds the user-facing type DynamicRecord, along with its internal representation DynamicRecordInternal and its type information and serializer.
Broken out of #12424.
The original PR is based on Flink 1.20. This version is based on Flink 2.0.