Flink: Dynamic Iceberg Sink: Add table update code for schema comparison and evolution #13032
pvary merged 16 commits into apache:main
Conversation
This adds the classes around schema / spec comparison and evolution. A breakdown of the classes follows:

# CompareSchemasVisitor

Compares the user-provided schema against the current table schema.

# EvolveSchemaVisitor

Computes the changes required to the table schema to be compatible with the user-provided schema.

# PartitionSpecEvolution

Code for checking compatibility with the user-provided PartitionSpec and computing a set of changes to rewrite the PartitionSpec.

# TableDataCache

Cache which holds all relevant metadata of a table, like its name, branch, schema, and partition spec. Also holds a cache of past comparison results for a given table schema and the user-provided input schema.

# TableUpdater

Core logic to compare and create/update a table given a user-provided input schema.

Broken out of apache#12424, depends on apache#12996.
This adds the dynamic version of the writer and committer for the Flink Dynamic Iceberg Sink. Conceptually, they work similarly to the IcebergSink, but they support writing to multiple tables. Write results from each table are aggregated from the DynamicWriter in the DynamicWriteResultAggregator, from where they are sent to the DynamicCommitter. Broken out of apache#12424, depends on apache#13032.
@mxm: Do you think it would be good to have a discussion on the dev list about the Schema/Partition evolution changes?
Thanks for the review @pvary. It does make sense to ask the community for feedback on the core changes. I'll start a thread.
Here is the link to the discussion on the mailing list for the core changes: https://lists.apache.org/thread/rmqjbwd8wcqpmkow51m843pwh8fbq4bt
```java
LOG.info(
    "Triggering schema update for table {} {} to {}", identifier, tableSchema, schema);
UpdateSchema updateApi = table.updateSchema();
EvolveSchemaVisitor.visit(updateApi, tableSchema, schema);
```
Why not UnionByNameVisitor here? Or maybe we can make this configurable somehow?
I agree we would like this to be configurable, e.g. to allow dropping columns if needed, which is not currently supported.
@mxm Why is dropping columns unsupported? Also, more generally, why do we need a custom EvolveSchemaVisitor rather than using a stock UnionByNameVisitor?
Dropping columns is tricky. Once we drop a field, it's gone forever. If we re-add a field with the same name later on, it will not inherit the old field's data. If the data does not arrive in the expected order, this could have unforeseen consequences. That's why we opted not to support it for now. We need the custom visitor because of the reduced support for schema changes.
@mxm Could you please clarify how we specifically reduce the support for schema changes with a custom EvolveSchemaVisitor? It appears that the public UnionByNameVisitor has very similar code and performs an almost equivalent schema evolution update, so it seems we can use it instead of maintaining a custom visitor.
cc: @pvary
That's a great suggestion. The original code in EvolveSchemaVisitor diverged in more ways, e.g. it allowed dropping columns. There are still some differences, e.g. required fields in the table schema for which there is no match in the input schema are made optional, default values are not changed, and there is no case sensitivity.
That said, I'm all for reusing as much from Iceberg core as possible. If we do not think that these subtle differences matter for users, we can use UnionByNameVisitor.
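The relaxation behaviour described above can be illustrated with a small sketch. All names here are hypothetical simplifications, not the actual Iceberg visitor classes; schemas are reduced to field-name → required? maps:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the evolution behaviour discussed above.
public class EvolveSketch {

  // Schemas reduced to field-name -> required? maps for illustration.
  public static Map<String, Boolean> evolve(
      Map<String, Boolean> tableSchema, Map<String, Boolean> inputSchema) {
    Map<String, Boolean> result = new LinkedHashMap<>(tableSchema);
    // Required table fields with no match in the input schema are made optional.
    for (Map.Entry<String, Boolean> field : tableSchema.entrySet()) {
      if (field.getValue() && !inputSchema.containsKey(field.getKey())) {
        result.put(field.getKey(), false);
      }
    }
    // New input fields are added as optional, so existing data stays valid.
    for (String name : inputSchema.keySet()) {
      result.putIfAbsent(name, false);
    }
    return result;
  }
}
```

A union-by-name style merge would keep unmatched required fields required; relaxing them to optional is what lets the sink keep accepting input records that omit those fields.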
```java
catalog.loadTable(identifier).manageSnapshots().createBranch(branch).commit();
LOG.info("Branch {} for {} created", branch, identifier);
} catch (Exception e) {
  LOG.info(
```
Can we distinguish the case when the branch is created concurrently, and only log when there really is an issue? Maybe warn in that case?
And debug, if it is just a concurrent update done by another updater?
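The suggested policy could look roughly like the following sketch. The exception and helper types are illustrative stand-ins, not the actual Iceberg/Flink classes:

```java
// Hedged sketch of the logging policy: a branch that already exists is treated
// as a benign concurrent update and logged at debug, while real failures are
// logged as warnings. All names here are hypothetical.
public class BranchCreateSketch {

  static class AlreadyExistsException extends RuntimeException {}

  interface Logger {
    void info(String msg);
    void debug(String msg);
    void warn(String msg);
  }

  static final Logger NOOP =
      new Logger() {
        public void info(String msg) {}
        public void debug(String msg) {}
        public void warn(String msg) {}
      };

  public static String createBranch(Runnable createAction, Logger log) {
    try {
      createAction.run();
      log.info("Branch created");
      return "created";
    } catch (AlreadyExistsException e) {
      // Another updater created the branch concurrently; safe to continue.
      log.debug("Branch already exists, created concurrently");
      return "concurrent";
    } catch (RuntimeException e) {
      log.warn("Branch creation failed: " + e.getMessage());
      return "failed";
    }
  }
}
```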
```java
cache.update(identifier, table);
return Tuple2.of(tableSchema, result);
case SCHEMA_UPDATE_NEEDED:
  LOG.info(
```
Please check all of the logs.
General rule:
- Log after a successful change
- On failure, check whether it is a concurrent update
- If we can continue because someone fixed the table for us, log at debug level
- If it is a real error, log a warning or fail
pvary
left a comment
LGTM
Please fix the logging and the javadoc, and I think this is fine
Thanks @pvary for the review and for merging the changes! 🙏
Backport: Flink: Dynamic Iceberg Sink: Add table update code for schema comparison and evolution to Flink 1.19 / 1.20 (apache#13247). Backports apache#13032.
```java
  return lastResult;
}

for (Map.Entry<Integer, Schema> tableSchema : cached.schema.schemas.entrySet()) {
```
@mxm @pvary, why do we need to loop through all previous table schemas here? Would it be more correct to always compare the incoming record schema to the latest table schema and evolve the table, or convert the incoming records to match the latest schema?
The side effect of the current approach is that the dynamic sink will create multiple DynamicCommittable instances for each resolved table schema of cached.schema.schemas.entrySet() in the dynamic commit aggregator, which will issue multiple commits to an Iceberg table per checkpoint, one commit per table schema id. However, each of these commits will have a schema-id in its metadata snapshot pointing to the latest table schema, which seems wrong. See the SnapshotProducer implementation.
I can see the performance benefit of exactly matching an incoming schema to a previous table schema, as we don't have to go through the conversion code and can directly write using one of the earlier schemas.
Is it valid in Iceberg to constantly alternate between multiple schemas? Or should we only evolve the latest schema and adjust incoming records to match it? What do you think?
> @mxm @pvary, why do we need to loop through all previous table schemas here?
We need to loop through all existing schemas to avoid adding a new schema in case there is already a matching schema. If we do not find a match, we will modify the current table schema and thereby produce a new schema.
> Would it be more correct to always compare the incoming record schema to the latest table schema and evolve the table, or convert the incoming records to match the latest schema?
The logic is as follows:
- If there is an exact match schema already (i.e. field types and field names match) => use that schema
- If there is a compatible schema (e.g. extra optional field) => convert RowData to match that schema
- Otherwise, evolve the current table schema to match the input schema
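The three-way decision above can be sketched as follows. This is a simplification with hypothetical names; the real code compares full Iceberg Schemas, including nested types and field IDs:

```java
import java.util.Map;

// Sketch of the schema-matching decision: exact match reuses the table schema,
// a compatible table schema (superset of the input) triggers record conversion,
// and anything else triggers schema evolution. Names are illustrative.
public class CompareSketch {

  public enum Result { SAME, DATA_CONVERSION_NEEDED, SCHEMA_UPDATE_NEEDED }

  // Schemas reduced to field-name -> type-name maps for illustration.
  public static Result compare(
      Map<String, String> tableSchema, Map<String, String> inputSchema) {
    if (tableSchema.equals(inputSchema)) {
      return Result.SAME;
    }
    // Compatible: every input field exists in the table with the same type;
    // extra table fields are assumed optional and filled with nulls on write.
    boolean compatible =
        inputSchema.entrySet().stream()
            .allMatch(e -> e.getValue().equals(tableSchema.get(e.getKey())));
    return compatible ? Result.DATA_CONVERSION_NEEDED : Result.SCHEMA_UPDATE_NEEDED;
  }
}
```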
> The side effect of the current approach is that the dynamic sink will create multiple DynamicCommittable instances for each resolved table schema of cached.schema.schemas.entrySet() in the dynamic commit aggregator, [..]
You're right that the schema isn't used in the commit process. The data files already reference the schema id. This creates more DynamicCommittable instances than if we didn't include the schema in the WriteTarget. But semantically this doesn't change how the table gets modified in the end.
> [..] which will issue multiple commits to an Iceberg table per checkpoint - one commit per each table schema id
I'm not sure that is true. We commit once per table / branch.
> However, each of these commits will have a schema-id in its metadata snapshot pointing to the latest table schema, which seems wrong. See the SnapshotProducer implementation.
I think it makes sense to use the latest schema, since we have to pick one schema and cannot include all.
> Is it valid in Iceberg to constantly alternate between multiple schemas? Or should we only evolve the latest schema and adjust incoming records to match it? What do you think?
We only ever evolve the current table schema. We adjust incoming records if there is no existing schema that we can use. But we need to support using old schemas because it is a common use case that old data gets written. Since we don't allow breaking changes like removing fields, all old schemas are still valid and can be used safely, while remaining compatible with newer schemas.
Flink: Dynamic Iceberg Sink: Add table update code for schema comparison and evolution (apache#13032)
This adds the classes around schema / spec comparison and evolution.
Broken out of #12424. The first commit contains required Iceberg core changes for partition spec evolution and schema evolution testing.
A breakdown of the classes follows:
CompareSchemasVisitor
Compares the user-provided schema against the current table schema.
EvolveSchemaVisitor
Computes the changes required to the table schema to be compatible with the
user-provided schema.
PartitionSpecEvolution
Code for checking compatibility with the user-provided partition spec and
computing a set of changes to rewrite the PartitionSpec.
TableDataCache
Cache which holds all relevant metadata of a table, like its name, branch,
schema, and partition spec. Also holds a cache of past comparison results for a
given table schema and a user-provided input schema.
Table Updater
Core logic to compare and create/update a table given a user-provided input
schema.
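Putting the pieces together, the TableUpdater control flow described in the breakdown above might be sketched like this. All type and method names are illustrative, not the actual implementation:

```java
// Hypothetical sketch of the update flow: consult the cached comparison result
// first, and only touch the catalog when the input schema cannot be served by
// an existing table schema.
public class TableUpdaterSketch {

  public enum Comparison { SAME, DATA_CONVERSION_NEEDED, SCHEMA_UPDATE_NEEDED }

  // Stand-in for the catalog interaction; not the real Iceberg Catalog API.
  interface Catalog {
    void evolveSchema(String table, String inputSchema);
  }

  public static String update(
      String table, String inputSchema, Comparison cached, Catalog catalog) {
    switch (cached) {
      case SAME:
        // Exact match: write directly with the existing table schema.
        return "write with existing schema";
      case DATA_CONVERSION_NEEDED:
        // Compatible schema: convert incoming records to match it.
        return "convert records to matching table schema";
      default:
        // Only this path mutates the table metadata.
        catalog.evolveSchema(table, inputSchema);
        return "evolved table schema";
    }
  }
}
```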