Skip to content

Flink: Dynamic Iceberg Sink: Add sink / core processing logic / benchmarking#13304

Merged
pvary merged 15 commits intoapache:mainfrom
mxm:dynamic-sink-contrib-breakdown5
Jun 17, 2025
Merged

Flink: Dynamic Iceberg Sink: Add sink / core processing logic / benchmarking#13304
pvary merged 15 commits intoapache:mainfrom
mxm:dynamic-sink-contrib-breakdown5

Conversation

@mxm
Copy link
Copy Markdown
Contributor

@mxm mxm commented Jun 12, 2025

This completes the breakdown of #12424. Most importantly, it adds the following missing components:

DynamicIcebergSink

The actual Flink sink which ties together the previously merged components. The sink will be instantiated by the user like this:

DynamicIcebergSink
  .forInput(inputStream)
  .withConverter(new MyConverter())
  .catalogLoader(..)
  .append();  

An example for a converter implementation would be:

class MyConverter implements DynamicRecordConverter<MyInput> {

  final RuntimeParams params;

  MyConverter(RuntimeParams params) {
    this.params = params;
  }

  @Override
  public void convert(MyInput inputRecord, Collector<DynamicRecord> out) {

    DynamicRecord record = new DynamicRecord(
        inputRecord.getTable(),
        inputRecord.getBranch(),
        inputRecord.buildSchema(),
        inputRecord.convertData(),
        PartitionSpec.unpartitioned(),
        params.distributionMode(),
        params.writeParallelism());
        
    out.collect(record);
  }
}

DynamicRecordProcessor

The core processing logic which takes user-provided input to write to Iceberg tables and make the necessary changes to the Iceberg metadata.

Benchmarks

  • DynamicRecordSerializerDeserializerBenchmark
  • TestDynamicIcebergSinkPerf

Previous PRs

The following previous PRs have been merged already:

  1. Flink: Add DynamicRecord / DynamicRecordInternal / DynamicRecordInternalSerializer #12996
  2. Flink: Dynamic Iceberg Sink: Add table update code for schema comparison and evolution  #13032
  3. Flink: Dynamic Iceberg Sink: Add dynamic writer and committer #13080
  4. Flink: Dynamic Iceberg Sink: Add HashKeyGenerator / RowDataEvolver / TableUpdateOperator #13277

}

@Test
void testUpsert() throws Exception {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe some tests with state restore?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, can we have a check were there is concurrent insert to one of the tables we are writing to?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test where there is a concurrent commit to one of the tables by a different sink, or application?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a test to fail before / after commit, and one for concurrent commits: 3ef4d87.

Do we have a test where there is a concurrent commit to one of the tables by a different sink, or application?

Not yet.

@pvary pvary merged commit 68651a3 into apache:main Jun 17, 2025
18 checks passed
@pvary
Copy link
Copy Markdown
Contributor

pvary commented Jun 17, 2025

Merged to main.
Thanks for the PR @mxm!

@mxm mxm deleted the dynamic-sink-contrib-breakdown5 branch June 17, 2025 15:33
@mxm
Copy link
Copy Markdown
Contributor Author

mxm commented Jun 17, 2025

Thanks for reviewing / merging @pvary!

mxm added a commit to mxm/iceberg that referenced this pull request Jun 18, 2025
mxm added a commit to mxm/iceberg that referenced this pull request Jun 18, 2025
pvary pushed a commit that referenced this pull request Jun 18, 2025
eric-maynard pushed a commit to eric-maynard/iceberg that referenced this pull request Jun 18, 2025
eric-maynard pushed a commit to eric-maynard/iceberg that referenced this pull request Jun 18, 2025
cogwirrel pushed a commit to cogwirrel/iceberg that referenced this pull request Aug 10, 2025
devendra-nr pushed a commit to devendra-nr/iceberg that referenced this pull request Dec 8, 2025
devendra-nr pushed a commit to devendra-nr/iceberg that referenced this pull request Dec 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants