From e4f07a75c8a9e38101f89580a7bfab76a4bb487a Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Thu, 12 Jun 2025 11:52:26 +0200 Subject: [PATCH 01/15] Flink: Add DynamicIcebergSink based on its previously merged components --- ...RecordSerializerDeserializerBenchmark.java | 138 ++++ .../apache/iceberg/flink/FlinkConfParser.java | 7 + .../apache/iceberg/flink/FlinkWriteConf.java | 4 + .../sink/dynamic/DynamicIcebergSink.java | 449 +++++++++++++ .../sink/dynamic/DynamicRecordConverter.java | 31 + .../sink/dynamic/DynamicRecordProcessor.java | 163 +++++ .../flink/sink/dynamic/HashKeyGenerator.java | 16 +- .../apache/iceberg/flink/SimpleDataUtil.java | 22 +- .../org/apache/iceberg/flink/TestHelpers.java | 2 +- .../flink/sink/TestFlinkIcebergSinkBase.java | 5 +- .../sink/dynamic/TestDynamicIcebergSink.java | 593 ++++++++++++++++++ .../dynamic/TestDynamicIcebergSinkPerf.java | 226 +++++++ 12 files changed, 1646 insertions(+), 10 deletions(-) create mode 100644 flink/v2.0/flink/src/jmh/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordSerializerDeserializerBenchmark.java create mode 100644 flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java create mode 100644 flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordConverter.java create mode 100644 flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java create mode 100644 flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java create mode 100644 flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java diff --git a/flink/v2.0/flink/src/jmh/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordSerializerDeserializerBenchmark.java b/flink/v2.0/flink/src/jmh/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordSerializerDeserializerBenchmark.java new file mode 100644 index 000000000000..d7c3a7b32bc8 --- /dev/null +++ b/flink/v2.0/flink/src/jmh/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordSerializerDeserializerBenchmark.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@Fork(1) +@State(Scope.Benchmark) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.SingleShotTime) +public class DynamicRecordSerializerDeserializerBenchmark { + private static final int SAMPLE_SIZE = 100_000; + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name2", Types.StringType.get()), + Types.NestedField.required(3, "name3", Types.StringType.get()), + Types.NestedField.required(4, "name4", Types.StringType.get()), + Types.NestedField.required(5, "name5", Types.StringType.get()), + Types.NestedField.required(6, "name6", Types.StringType.get()), + Types.NestedField.required(7, "name7", Types.StringType.get()), + Types.NestedField.required(8, "name8", Types.StringType.get()), + Types.NestedField.required(9, "name9", Types.StringType.get())); + + private List rows = Lists.newArrayListWithExpectedSize(SAMPLE_SIZE); + private DynamicRecordInternalType type; + + public static void main(String[] args) throws RunnerException { + Options options = + new OptionsBuilder() + .include(DynamicRecordSerializerDeserializerBenchmark.class.getSimpleName()) + .build(); + new Runner(options).run(); + } + + @Setup + public void setupBenchmark() throws IOException { + List records = RandomGenericData.generate(SCHEMA, SAMPLE_SIZE, 1L); + this.rows = + records.stream() + .map( + r -> + new DynamicRecordInternal( + "t", + "main", + SCHEMA, + RowDataConverter.convert(SCHEMA, r), + PartitionSpec.unpartitioned(), + 1, + false, + Collections.emptySet())) + .collect(Collectors.toList()); + + File warehouse = Files.createTempFile("perf-bench", null).toFile(); + CatalogLoader catalogLoader = + CatalogLoader.hadoop( + "hadoop", + new Configuration(), + ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, 
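/* point the benchmark's Hadoop catalog warehouse at the temporary path */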
warehouse.getPath())); + this.type = new DynamicRecordInternalType(catalogLoader, true, 100); + } + + @Benchmark + @Threads(1) + public void testSerialize(Blackhole blackhole) throws IOException { + TypeSerializer serializer = + type.createSerializer((SerializerConfig) null); + DataOutputSerializer outputView = new DataOutputSerializer(1024); + for (int i = 0; i < SAMPLE_SIZE; ++i) { + serializer.serialize(rows.get(i), outputView); + } + } + + @Benchmark + @Threads(1) + public void testSerializeAndDeserialize(Blackhole blackhole) throws IOException { + TypeSerializer serializer = + type.createSerializer((SerializerConfig) null); + + DataOutputSerializer outputView = new DataOutputSerializer(1024); + for (int i = 0; i < SAMPLE_SIZE; ++i) { + serializer.serialize(rows.get(i), outputView); + serializer.deserialize(new DataInputDeserializer(outputView.getSharedBuffer())); + } + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java index d6d2fd92f4d8..e0672811cf5f 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java @@ -28,6 +28,7 @@ import org.apache.flink.util.TimeUtils; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @Internal @@ -43,6 +44,12 @@ public FlinkConfParser(Table table, Map options, ReadableConfig this.readableConfig = readableConfig; } + FlinkConfParser(Map options, ReadableConfig readableConfig) { + this.tableProperties = ImmutableMap.of(); + this.options = options; + this.readableConfig = readableConfig; + } + public BooleanConfParser booleanConf() { return new BooleanConfParser(); } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index 12ad7989c3e9..222a1e810468 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -55,6 +55,10 @@ public FlinkWriteConf( this.confParser = new FlinkConfParser(table, writeOptions, readableConfig); } + public FlinkWriteConf(Map writeOptions, ReadableConfig readableConfig) { + this.confParser = new FlinkConfParser(writeOptions, readableConfig); + } + public boolean overwriteMode() { return confParser .booleanConf() diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java new file mode 100644 index 000000000000..1cdf10e13b3b --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; + +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.CommitterInitContext; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.SupportsCommitter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; +import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology; +import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.OutputTag; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkWriteConf; +import org.apache.iceberg.flink.FlinkWriteOptions; +import org.apache.iceberg.flink.sink.IcebergSink; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * Dynamic version of the IcebergSink which supports: + * + *
    + *
  1. Writing to any number of tables (no more 1:1 sink/table relationship).
  2. Creating and updating tables based on the user-supplied routing. + *
  3. Updating the schema and partition spec of tables based on the user-supplied specification. + *
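 *
 * <p>A minimal wiring sketch. The event type, converter, and helper below are placeholders;
 * the builder calls mirror those exercised in TestDynamicIcebergSink:
 *
 * <pre>{@code
 * DataStream<MyEvent> events = ...;
 * DynamicIcebergSink.forInput(events)
 *     .withConverter(new MyEventConverter())
 *     .catalogLoader(catalogLoader)
 *     .writeParallelism(2)
 *     .immediateTableUpdate(true)
 *     .append();
 * }</pre>
 *
 * <p>The converter maps each input element to one or more {@link DynamicRecord}s, carrying the
 * target table, branch, schema, and partition spec per record:
 *
 * <pre>{@code
 * class MyEventConverter implements DynamicRecordConverter<MyEvent> {
 *   @Override
 *   public void convert(MyEvent event, Collector<DynamicRecord> out) {
 *     out.collect(
 *         new DynamicRecord(
 *             TableIdentifier.of("db", event.tableName()),
 *             "main",
 *             SCHEMA,                          // Iceberg schema for this record
 *             toRowData(event),                // hypothetical helper producing a RowData
 *             PartitionSpec.unpartitioned(),
 *             DistributionMode.NONE,
 *             2));                             // write parallelism for this table
 *   }
 * }
 * }</pre>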
+ */ +@Experimental +public class DynamicIcebergSink + implements Sink, + SupportsPreWriteTopology, + SupportsCommitter, + SupportsPreCommitTopology, + SupportsPostCommitTopology { + + private final CatalogLoader catalogLoader; + private final Map snapshotProperties; + private final String uidPrefix; + private final String sinkId; + private final Map writeProperties; + private final transient FlinkWriteConf flinkWriteConf; + private final FileFormat dataFileFormat; + private final long targetDataFileSize; + private final boolean overwriteMode; + private final int workerPoolSize; + private final int cacheMaximumSize; + + private DynamicIcebergSink( + CatalogLoader catalogLoader, + Map snapshotProperties, + String uidPrefix, + Map writeProperties, + FlinkWriteConf flinkWriteConf, + int cacheMaximumSize) { + this.catalogLoader = catalogLoader; + this.snapshotProperties = snapshotProperties; + this.uidPrefix = uidPrefix; + this.writeProperties = writeProperties; + this.flinkWriteConf = flinkWriteConf; + this.dataFileFormat = flinkWriteConf.dataFileFormat(); + this.targetDataFileSize = flinkWriteConf.targetDataFileSize(); + this.overwriteMode = flinkWriteConf.overwriteMode(); + this.workerPoolSize = flinkWriteConf.workerPoolSize(); + this.cacheMaximumSize = cacheMaximumSize; + // We generate a random UUID every time when a sink is created. + // This is used to separate files generated by different sinks writing the same table. + // Also used to generate the aggregator operator name + this.sinkId = UUID.randomUUID().toString(); + } + + @Override + public SinkWriter createWriter(WriterInitContext context) { + return new DynamicWriter( + catalogLoader.loadCatalog(), + dataFileFormat, + targetDataFileSize, + writeProperties, + cacheMaximumSize, + new DynamicWriterMetrics(context.metricGroup()), + context.getTaskInfo().getIndexOfThisSubtask(), + context.getTaskInfo().getAttemptNumber()); + } + + @Override + public Committer createCommitter(CommitterInitContext context) { + DynamicCommitterMetrics metrics = new DynamicCommitterMetrics(context.metricGroup()); + return new DynamicCommitter( + catalogLoader.loadCatalog(), + snapshotProperties, + overwriteMode, + workerPoolSize, + sinkId, + metrics); + } + + @Override + public SimpleVersionedSerializer getCommittableSerializer() { + return new DynamicCommittableSerializer(); + } + + @Override + public void addPostCommitTopology( + DataStream> committables) {} + + @Override + public DataStream addPreWriteTopology( + DataStream inputDataStream) { + return distributeDataStream(inputDataStream); + } + + @Override + public DataStream> addPreCommitTopology( + DataStream> writeResults) { + TypeInformation> typeInformation = + CommittableMessageTypeInfo.of(this::getCommittableSerializer); + + return writeResults + .keyBy( + committable -> { + if (committable instanceof CommittableSummary) { + return "__summary"; + } else { + CommittableWithLineage result = + (CommittableWithLineage) committable; + return result.getCommittable().key().tableName(); + } + }) + .transform( + prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"), + typeInformation, + new DynamicWriteResultAggregator(catalogLoader)) + .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology")); + } + + @Override + public SimpleVersionedSerializer getWriteResultSerializer() { + return new DynamicWriteResultSerializer(); + } + + public static class Builder { + private DataStream input; + private DynamicRecordConverter converter; + private CatalogLoader catalogLoader; + private String uidPrefix = 
null; + private final Map writeOptions = Maps.newHashMap(); + private final Map snapshotSummary = Maps.newHashMap(); + private ReadableConfig readableConfig = new Configuration(); + private boolean immediateUpdate = false; + private int cacheMaximumSize = 100; + private long cacheRefreshMs = 1_000; + + private Builder() {} + + public Builder forInput(DataStream inputStream) { + this.input = inputStream; + return this; + } + + public Builder withConverter(DynamicRecordConverter inputConverter) { + this.converter = inputConverter; + return this; + } + + /** + * The catalog loader is used for loading tables in {@link DynamicCommitter} lazily, we need + * this loader because {@link Table} is not serializable and could not just use the loaded table + * from Builder#table in the remote task manager. + * + * @param newCatalogLoader to load iceberg table inside tasks. + * @return {@link Builder} to connect the iceberg table. + */ + public Builder catalogLoader(CatalogLoader newCatalogLoader) { + this.catalogLoader = newCatalogLoader; + return this; + } + + /** + * Set the write properties for IcebergSink. View the supported properties in {@link + * FlinkWriteOptions} + */ + public Builder set(String property, String value) { + writeOptions.put(property, value); + return this; + } + + /** + * Set the write properties for IcebergSink. View the supported properties in {@link + * FlinkWriteOptions} + */ + public Builder setAll(Map properties) { + writeOptions.putAll(properties); + return this; + } + + public Builder overwrite(boolean newOverwrite) { + writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite)); + return this; + } + + public Builder flinkConf(ReadableConfig config) { + this.readableConfig = config; + return this; + } + + /** + * Configuring the write parallel number for iceberg stream writer. + * + * @param newWriteParallelism the number of parallel iceberg stream writer. + * @return {@link DynamicIcebergSink.Builder} to connect the iceberg table. + */ + public Builder writeParallelism(int newWriteParallelism) { + writeOptions.put( + FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism)); + return this; + } + + /** + * Set the uid prefix for IcebergSink operators. Note that IcebergSink internally consists of + * multiple operators (like writer, committer, aggregator) Actual operator uid will be appended + * with a suffix like "uidPrefix-writer". + * + *

If provided, this prefix is also applied to operator names. + * + *

Flink auto-generates operator uids if they are not set explicitly. It is a recommended + * best practice to set the uid for all operators before deploying to production. Flink has an + * option, {@code pipeline.auto-generate-uid=false}, to disable auto-generation and force + * explicit setting of all operator uids. + * +
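 *
 * <p>For example, to force explicit uids (a sketch using the standard Flink option):
 *
 * <pre>{@code
 * Configuration flinkConf = new Configuration();
 * flinkConf.setString("pipeline.auto-generate-uid", "false");
 * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
 * }</pre>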

Be careful with setting this for an existing job, because now we are changing the operator + * uid from an auto-generated one to this new value. When deploying the change with a + * checkpoint, Flink won't be able to restore the previous IcebergSink operator state (more + * specifically the committer operator state). You need to use {@code --allowNonRestoredState} + * to ignore the previous sink state. During restore IcebergSink state is used to check if last + * commit was actually successful or not. {@code --allowNonRestoredState} can lead to data loss + * if the Iceberg commit failed in the last completed checkpoint. + * + * @param newPrefix prefix for Flink sink operator uid and name + * @return {@link Builder} to connect the iceberg table. + */ + public Builder uidPrefix(String newPrefix) { + this.uidPrefix = newPrefix; + return this; + } + + public Builder snapshotProperties(Map properties) { + snapshotSummary.putAll(properties); + return this; + } + + public Builder setSnapshotProperty(String property, String value) { + snapshotSummary.put(property, value); + return this; + } + + public Builder toBranch(String branch) { + writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch); + return this; + } + + public Builder immediateTableUpdate(boolean newImmediateUpdate) { + this.immediateUpdate = newImmediateUpdate; + return this; + } + + /** Maximum size of the caches used in Dynamic Sink for table data and serializers. */ + public Builder cacheMaxSize(int maxSize) { + this.cacheMaximumSize = maxSize; + return this; + } + + /** Maximum interval for cache items renewals. */ + public Builder cacheRefreshMs(long refreshMs) { + this.cacheRefreshMs = refreshMs; + return this; + } + + private String operatorName(String suffix) { + return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; + } + + public DynamicIcebergSink build() { + + Preconditions.checkArgument( + converter != null, "Please use withConverter() to convert the input DataStream."); + Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be null"); + + // Init the `flinkWriteConf` here, so we can do the checks + FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig); + + Map writeProperties = + writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf); + + uidPrefix = Optional.ofNullable(uidPrefix).orElse(""); + + // FlinkWriteConf properties needed to be set separately, so we do not have to serialize the + // full conf + return new DynamicIcebergSink( + catalogLoader, + snapshotSummary, + uidPrefix, + writeProperties, + flinkWriteConf, + cacheMaximumSize); + } + + /** + * Append the iceberg sink operators to write records to iceberg table. + * + * @return {@link DataStreamSink} for sink. 
+ */ + public DataStreamSink append() { + DynamicRecordInternalType type = + new DynamicRecordInternalType(catalogLoader, false, cacheMaximumSize); + DynamicIcebergSink sink = build(); + SingleOutputStreamOperator converted = + input + .process( + new DynamicRecordProcessor<>( + converter, catalogLoader, immediateUpdate, cacheMaximumSize, cacheRefreshMs)) + .uid(prefixIfNotNull(uidPrefix, "-converter")) + .name(operatorName("Converter")) + .returns(type); + + DataStreamSink rowDataDataStreamSink = + converted + .getSideOutput( + new OutputTag<>( + DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM, + new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize))) + .keyBy((KeySelector) DynamicRecordInternal::tableName) + .map(new DynamicTableUpdateOperator(catalogLoader, cacheMaximumSize, cacheRefreshMs)) + .uid(prefixIfNotNull(uidPrefix, "-updater")) + .name(operatorName("Updater")) + .returns(type) + .union(converted) + .sinkTo(sink) + .uid(prefixIfNotNull(uidPrefix, "-sink")); + if (sink.flinkWriteConf.writeParallelism() != null) { + rowDataDataStreamSink.setParallelism(sink.flinkWriteConf.writeParallelism()); + } + + return rowDataDataStreamSink; + } + } + + /** + * Based on the {@link FileFormat} overwrites the table level compression properties for the table + * write. + * + * @param format The FileFormat to use + * @param conf The write configuration + * @return The properties to use for writing + */ + private static Map writeProperties(FileFormat format, FlinkWriteConf conf) { + Map writeProperties = Maps.newHashMap(); + + switch (format) { + case PARQUET: + writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec()); + String parquetCompressionLevel = conf.parquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + + break; + case AVRO: + writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); + String avroCompressionLevel = conf.avroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel()); + } + + break; + case ORC: + writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec()); + writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy()); + break; + default: + throw new IllegalArgumentException(String.format("Unknown file format %s", format)); + } + + return writeProperties; + } + + DataStream distributeDataStream(DataStream input) { + return input.keyBy(DynamicRecordInternal::writerKey); + } + + private static String prefixIfNotNull(String uidPrefix, String suffix) { + return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; + } + + /** + * Initialize a {@link IcebergSink.Builder} to export the data from input data stream with {@link + * RowData}s into iceberg table. + * + * @param input the source input data stream with {@link RowData}s. + * @return {@link IcebergSink.Builder} to connect the iceberg table. 
+ */ + public static Builder forInput(DataStream input) { + return new Builder().forInput(input); + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordConverter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordConverter.java new file mode 100644 index 000000000000..612b94ffb0cf --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordConverter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.Serializable; +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.util.Collector; + +/** Conversion method to return input type into a DynamicRecord */ +public interface DynamicRecordConverter extends Serializable { + default void open(OpenContext openContext) throws Exception {} + + /** Takes a user-defined input type and converts it one or multiple {@link DynamicRecord}s. */ + void convert(T inputRecord, Collector out) throws Exception; +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java new file mode 100644 index 000000000000..1b09057086f0 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.flink.CatalogLoader; + +@Internal +class DynamicRecordProcessor extends ProcessFunction + implements Collector { + static final String DYNAMIC_TABLE_UPDATE_STREAM = "dynamic-table-update-stream"; + + private final DynamicRecordConverter converter; + private final CatalogLoader catalogLoader; + private final boolean immediateUpdate; + private final int cacheMaximumSize; + private final long cacheRefreshMs; + + private transient TableMetadataCache tableCache; + private transient HashKeyGenerator hashKeyGenerator; + private transient TableUpdater updater; + private transient OutputTag updateStream; + private transient Collector collector; + private transient Context context; + + DynamicRecordProcessor( + DynamicRecordConverter converter, + CatalogLoader catalogLoader, + boolean immediateUpdate, + int cacheMaximumSize, + long cacheRefreshMs) { + this.converter = converter; + this.catalogLoader = catalogLoader; + this.immediateUpdate = immediateUpdate; + this.cacheMaximumSize = cacheMaximumSize; + this.cacheRefreshMs = cacheRefreshMs; + } + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + Catalog catalog = catalogLoader.loadCatalog(); + this.tableCache = new TableMetadataCache(catalog, cacheMaximumSize, cacheRefreshMs); + this.hashKeyGenerator = + new HashKeyGenerator( + cacheMaximumSize, getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks()); + if (immediateUpdate) { + updater = new TableUpdater(tableCache, catalog); + } + + updateStream = + new OutputTag<>( + DYNAMIC_TABLE_UPDATE_STREAM, + new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)) {}; + + converter.open(openContext); + } + + @Override + public void processElement(T element, Context ctx, Collector out) + throws Exception { + this.context = ctx; + this.collector = out; + converter.convert(element, this); + } + + @Override + public void collect(DynamicRecord data) { + boolean exists = tableCache.exists(data.tableIdentifier()).f0; + String foundBranch = exists ? tableCache.branch(data.tableIdentifier(), data.branch()) : null; + + Tuple2 foundSchema = + exists + ? tableCache.schema(data.tableIdentifier(), data.schema()) + : TableMetadataCache.NOT_FOUND; + + PartitionSpec foundSpec = exists ? tableCache.spec(data.tableIdentifier(), data.spec()) : null; + + if (!exists + || foundBranch == null + || foundSpec == null + || foundSchema.f1 == CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { + if (immediateUpdate) { + Tuple3 newData = + updater.update(data.tableIdentifier(), data.branch(), data.schema(), data.spec()); + emit(collector, data, newData.f0, newData.f1, newData.f2); + } else { + int writerKey = + hashKeyGenerator.generateKey( + data, + foundSchema.f0 != null ? foundSchema.f0 : data.schema(), + foundSpec != null ? 
foundSpec : data.spec(), + data.rowData()); + context.output( + updateStream, + new DynamicRecordInternal( + data.tableIdentifier().toString(), + data.branch(), + data.schema(), + data.rowData(), + data.spec(), + writerKey, + data.upsertMode(), + DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), data.schema()))); + } + } else { + emit(collector, data, foundSchema.f0, foundSchema.f1, foundSpec); + } + } + + private void emit( + Collector out, + DynamicRecord data, + Schema schema, + CompareSchemasVisitor.Result result, + PartitionSpec spec) { + RowData rowData = + result == CompareSchemasVisitor.Result.SAME + ? data.rowData() + : RowDataEvolver.convert(data.rowData(), data.schema(), schema); + int writerKey = hashKeyGenerator.generateKey(data, schema, spec, rowData); + String tableName = data.tableIdentifier().toString(); + out.collect( + new DynamicRecordInternal( + tableName, + data.branch(), + schema, + rowData, + spec, + writerKey, + data.upsertMode(), + DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), schema))); + } + + @Override + public void close() {} +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java index 6cb1f4608983..d0909e0605d4 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java @@ -77,8 +77,7 @@ int generateKey( DynamicRecord dynamicRecord, @Nullable Schema tableSchema, @Nullable PartitionSpec tableSpec, - @Nullable RowData overrideRowData) - throws Exception { + @Nullable RowData overrideRowData) { String tableIdent = dynamicRecord.tableIdentifier().toString(); SelectorKey cacheKey = new SelectorKey( @@ -89,8 +88,8 @@ int generateKey( dynamicRecord.schema(), dynamicRecord.spec(), dynamicRecord.equalityFields()); - return keySelectorCache - .get( + KeySelector keySelector = + keySelectorCache.get( cacheKey, k -> getKeySelector( @@ -101,8 +100,13 @@ int generateKey( dynamicRecord.distributionMode(), DistributionMode.NONE), MoreObjects.firstNonNull( dynamicRecord.equalityFields(), Collections.emptySet()), - dynamicRecord.writeParallelism())) - .getKey(overrideRowData != null ? overrideRowData : dynamicRecord.rowData()); + dynamicRecord.writeParallelism())); + try { + return keySelector.getKey( + overrideRowData != null ? 
overrideRowData : dynamicRecord.rowData()); + } catch (Exception e) { + throw new RuntimeException(e); + } } private KeySelector getKeySelector( diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index 7f53215a5e88..0d39a665cfe8 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -82,12 +82,19 @@ private SimpleDataUtil() {} Types.NestedField.optional(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "data", Types.StringType.get())); + public static final Schema SCHEMA2 = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()), + Types.NestedField.optional(3, "extra", Types.StringType.get())); + public static final TableSchema FLINK_SCHEMA = TableSchema.builder().field("id", DataTypes.INT()).field("data", DataTypes.STRING()).build(); public static final RowType ROW_TYPE = (RowType) FLINK_SCHEMA.toRowDataType().getLogicalType(); public static final Record RECORD = GenericRecord.create(SCHEMA); + public static final Record RECORD2 = GenericRecord.create(SCHEMA2); public static Table createTable( String path, Map properties, boolean partitioned) { @@ -107,6 +114,14 @@ public static Record createRecord(Integer id, String data) { return record; } + public static Record createRecord(Integer id, String data, String extra) { + Record record = RECORD2.copy(); + record.setField("id", id); + record.setField("data", data); + record.setField("extra", extra); + return record; + } + public static RowData createRowData(Integer id, String data) { return GenericRowData.of(id, StringData.fromString(data)); } @@ -224,7 +239,12 @@ private static List convertToRecords(List rows) { for (RowData row : rows) { Integer id = row.isNullAt(0) ? null : row.getInt(0); String data = row.isNullAt(1) ? null : row.getString(1).toString(); - records.add(createRecord(id, data)); + if (row.getArity() == 2) { + records.add(createRecord(id, data)); + } else { + String extra = row.isNullAt(2) ? 
null : row.getString(2).toString(); + records.add(createRecord(id, data, extra)); + } } return records; } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java index 960849fb4feb..d8d3c5dc249b 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java @@ -132,7 +132,7 @@ public static List convertRowDataToRow(List rowDataList, RowType r .collect(Collectors.toList()); } - private static List convertRecordToRow(List expectedRecords, Schema schema) { + public static List convertRecordToRow(List expectedRecords, Schema schema) { List expected = Lists.newArrayList(); @SuppressWarnings("unchecked") DataStructureConverter converter = diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java index 9513cd1e4808..de098f826d7d 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java @@ -21,6 +21,7 @@ import static org.apache.iceberg.flink.TestFixtures.DATABASE; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -64,8 +65,8 @@ public class TestFlinkIcebergSinkBase { protected Table table; protected StreamExecutionEnvironment env; - protected BoundedTestSource createBoundedSource(List rows) { - return new BoundedTestSource<>(rows.toArray(new Row[0])); + protected BoundedTestSource createBoundedSource(List rows) { + return new BoundedTestSource<>(Collections.singletonList(rows)); } protected List createRows(String prefix) { diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java new file mode 100644 index 000000000000..5d0317fa89e4 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.legacy.api.TableSchema; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.apache.flink.util.ExceptionUtils; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.MiniFlinkClusterExtension; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase { + + private static long seed; + + @BeforeEach + void before() { + env = + StreamExecutionEnvironment.getExecutionEnvironment( + MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100) + .setParallelism(2); + seed = 0; + } + + private static class DynamicIcebergDataImpl implements Serializable { + Row rowProvided; + Row rowExpected; + Schema schemaProvided; + Schema schemaExpected; + String tableName; + String branch; + PartitionSpec partitionSpec; + boolean upsertMode; + Set equalityFields; + + private DynamicIcebergDataImpl( + Schema schemaProvided, String tableName, String branch, PartitionSpec partitionSpec) { + this( + schemaProvided, + schemaProvided, + tableName, + branch, + partitionSpec, + false, + Collections.emptySet(), + false); + } + + private DynamicIcebergDataImpl( + Schema schemaProvided, + Schema schemaExpected, + String tableName, + String branch, + PartitionSpec partitionSpec) { + this( + schemaProvided, + schemaExpected, + tableName, + branch, + partitionSpec, + false, + Collections.emptySet(), + false); + } + + private DynamicIcebergDataImpl( + Schema schemaProvided, + String tableName, + String branch, + PartitionSpec partitionSpec, + boolean upsertMode, + Set equalityFields, + boolean isDuplicate) { + this( + schemaProvided, + 
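/* schemaExpected: this overload reuses schemaProvided as the expected schema */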
schemaProvided, + tableName, + branch, + partitionSpec, + upsertMode, + equalityFields, + isDuplicate); + } + + private DynamicIcebergDataImpl( + Schema schemaProvided, + Schema schemaExpected, + String tableName, + String branch, + PartitionSpec partitionSpec, + boolean upsertMode, + Set equalityFields, + boolean isDuplicate) { + this.rowProvided = randomRow(schemaProvided, isDuplicate ? seed : ++seed); + this.rowExpected = isDuplicate ? null : rowProvided; + this.schemaProvided = schemaProvided; + this.schemaExpected = schemaExpected; + this.tableName = tableName; + this.branch = branch; + this.partitionSpec = partitionSpec; + this.upsertMode = upsertMode; + this.equalityFields = equalityFields; + } + } + + private static class Converter implements DynamicRecordConverter { + + @Override + public void convert(DynamicIcebergDataImpl row, Collector out) { + TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE, row.tableName); + String branch = row.branch; + Schema schema = row.schemaProvided; + PartitionSpec spec = row.partitionSpec; + DynamicRecord dynamicRecord = + new DynamicRecord( + tableIdentifier, + branch, + schema, + converter(schema).toInternal(row.rowProvided), + spec, + spec.isPartitioned() ? DistributionMode.HASH : DistributionMode.NONE, + 10); + dynamicRecord.setUpsertMode(row.upsertMode); + dynamicRecord.setEqualityFields(row.equalityFields); + out.collect(dynamicRecord); + } + } + + private static DataFormatConverters.RowConverter converter(Schema schema) { + RowType rowType = FlinkSchemaUtil.convert(schema); + TableSchema tableSchema = FlinkSchemaUtil.toSchema(rowType); + return new DataFormatConverters.RowConverter(tableSchema.getFieldDataTypes()); + } + + @Test + void testWrite() throws Exception { + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned())); + + runTest(rows); + } + + @Test + void testWritePartitioned() throws Exception { + PartitionSpec spec = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build(); + + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec)); + + runTest(rows); + } + + @Test + void testWritePartitionedAdjustSchemaIdsInSpec() throws Exception { + Schema schema = + new Schema( + // Use zero-based schema field ids + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "data", Types.StringType.get())); + PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("id", 10).build(); + Schema schema2 = + new Schema( + // Use zero-based schema field ids + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "data", Types.StringType.get()), + Types.NestedField.optional(2, "extra", Types.StringType.get())); + PartitionSpec spec2 = PartitionSpec.builderFor(schema2).bucket("extra", 23).build(); + + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl(schema, "t1", "main", spec), + new DynamicIcebergDataImpl(schema, "t1", "main", spec), + new DynamicIcebergDataImpl(schema, "t1", "main", spec), + new 
DynamicIcebergDataImpl(schema2, "t1", "main", spec2), + new DynamicIcebergDataImpl(schema2, "t1", "main", spec2)); + + runTest(rows); + } + + @Test + void testSchemaEvolutionFieldOrderChanges() throws Exception { + Schema schema = + new Schema( + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "data", Types.StringType.get())); + Schema expectedSchema = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + + Schema schema2 = + new Schema( + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required(1, "extra", Types.StringType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + Schema expectedSchema2 = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(3, "extra", Types.StringType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + schema, expectedSchema, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + schema, expectedSchema, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + schema, expectedSchema, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + schema2, expectedSchema2, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + schema2, expectedSchema2, "t1", "main", PartitionSpec.unpartitioned())); + + for (DynamicIcebergDataImpl row : rows) { + if (row.schemaExpected == expectedSchema) { + // We manually adjust the expected Row to match the second expected schema + row.rowExpected = Row.of(row.rowProvided.getField(0), null, row.rowProvided.getField(1)); + } + } + + runTest(rows); + } + + @Test + void testMultipleTables() throws Exception { + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t2", "main", PartitionSpec.unpartitioned())); + + runTest(rows); + } + + @Test + void testMultipleTablesPartitioned() throws Exception { + PartitionSpec spec = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build(); + + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t2", "main", spec)); + + runTest(rows); + } + + @Test + void testSchemaEvolutionAddField() throws Exception { + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA2, "t1", "main", PartitionSpec.unpartitioned())); + + runTest(rows, this.env, 1); + } + + @Test + void testRowEvolutionNullMissingOptionalField() throws Exception { + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA2, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned())); + + runTest(rows, this.env, 1); + } + + @Test + void testSchemaEvolutionNonBackwardsCompatible() throws Exception { + Schema backwardsIncompatibleSchema = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + // Required column is missing in this 
schema + Schema erroringSchema = + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + backwardsIncompatibleSchema, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + erroringSchema, "t1", "main", PartitionSpec.unpartitioned())); + + try { + runTest(rows, StreamExecutionEnvironment.getExecutionEnvironment(), 1); + fail(); + } catch (JobExecutionException e) { + assertThat( + ExceptionUtils.findThrowable( + e, + t -> + t.getMessage() + .contains( + "Field 2 in target schema ROW<`id` INT NOT NULL, `data` STRING NOT NULL> is non-nullable but does not exist in source schema."))) + .isNotEmpty(); + } + } + + @Test + void testPartitionSpecEvolution() throws Exception { + PartitionSpec spec1 = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build(); + PartitionSpec spec2 = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 5).identity("data").build(); + + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec1), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec2), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec1), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec2), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec1), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec2)); + + runTest(rows); + } + + @Test + void testMultipleBranches() throws Exception { + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "branch1", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned())); + + runTest(rows); + } + + @Test + void testWriteDynamicRowData() throws Exception { + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t2", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA2, "t2", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t2", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA2, "t2", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned())); + + runTest(rows); + } + + @Test + void testUpsert() throws Exception { + List rows = + Lists.newArrayList( + // Insert one rows + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + "main", + PartitionSpec.unpartitioned(), + true, + Sets.newHashSet("id"), + false), + // Remaining rows are duplicates + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + "main", + 
PartitionSpec.unpartitioned(), + true, + Sets.newHashSet("id"), + true), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + "main", + PartitionSpec.unpartitioned(), + true, + Sets.newHashSet("id"), + true), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + "main", + PartitionSpec.unpartitioned(), + true, + Sets.newHashSet("id"), + true)); + + executeDynamicSink(rows, env, true, 1); + + try (CloseableIterable iterable = + IcebergGenerics.read( + CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1"))) + .build()) { + List records = Lists.newArrayList(); + for (Record record : iterable) { + records.add(record); + } + + assertThat(records.size()).isEqualTo(1); + Record actual = records.get(0); + DynamicIcebergDataImpl input = rows.get(0); + assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0)); + assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1)); + // There is an additional _pos field which gets added + } + } + + private void runTest(List dynamicData) throws Exception { + runTest(dynamicData, this.env, 2); + } + + private void runTest( + List dynamicData, StreamExecutionEnvironment env, int parallelism) + throws Exception { + runTest(dynamicData, env, true, parallelism); + runTest(dynamicData, env, false, parallelism); + } + + private void runTest( + List dynamicData, + StreamExecutionEnvironment env, + boolean immediateUpdate, + int parallelism) + throws Exception { + executeDynamicSink(dynamicData, env, immediateUpdate, parallelism); + verifyResults(dynamicData); + } + + private void executeDynamicSink( + List dynamicData, + StreamExecutionEnvironment env, + boolean immediateUpdate, + int parallelism) + throws Exception { + DataStream dataStream = + env.addSource(createBoundedSource(dynamicData), TypeInformation.of(new TypeHint<>() {})); + env.setParallelism(parallelism); + + DynamicIcebergSink.forInput(dataStream) + .withConverter(new Converter()) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .writeParallelism(parallelism) + .immediateTableUpdate(immediateUpdate) + .append(); + + // Write the data + env.execute("Test Iceberg DataStream"); + } + + private void verifyResults(List dynamicData) throws IOException { + // Calculate the expected result + Map, List> expectedData = Maps.newHashMap(); + Map expectedSchema = Maps.newHashMap(); + dynamicData.forEach( + r -> { + Schema oldSchema = expectedSchema.get(r.tableName); + if (oldSchema == null || oldSchema.columns().size() < r.schemaProvided.columns().size()) { + expectedSchema.put(r.tableName, r.schemaExpected); + } + }); + + dynamicData.forEach( + r -> { + List data = + expectedData.computeIfAbsent( + Tuple2.of(r.tableName, r.branch), unused -> Lists.newArrayList()); + data.addAll( + convertToRowData(expectedSchema.get(r.tableName), ImmutableList.of(r.rowExpected))); + }); + + // Check the expected result + int count = dynamicData.size(); + for (Map.Entry, List> e : expectedData.entrySet()) { + SimpleDataUtil.assertTableRows( + CATALOG_EXTENSION + .catalogLoader() + .loadCatalog() + .loadTable(TableIdentifier.of(DATABASE, e.getKey().f0)), + e.getValue(), + e.getKey().f1); + count -= e.getValue().size(); + } + + // Found every record + assertThat(count).isZero(); + } + + private List convertToRowData(Schema schema, List rows) { + DataFormatConverters.RowConverter converter = converter(schema); + return rows.stream() + .map( + r -> { + Row updateRow = r; + // We need conversion to generate the missing columns + if (r.getArity() != schema.columns().size()) { 
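/* pad with trailing null fields so the row arity matches the target schema */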
+ updateRow = new Row(schema.columns().size()); + for (int i = 0; i < r.getArity(); ++i) { + updateRow.setField(i, r.getField(i)); + } + } + return converter.toInternal(updateRow); + }) + .collect(Collectors.toList()); + } + + private static Row randomRow(Schema schema, long seedOverride) { + return TestHelpers.convertRecordToRow( + RandomGenericData.generate(schema, 1, seedOverride), schema) + .get(0); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java new file mode 100644 index 000000000000..f512dedb203d --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.apache.iceberg.flink.TestFixtures.TABLE; +import static org.apache.iceberg.flink.sink.dynamic.DynamicCommitter.MAX_CONTINUOUS_EMPTY_COMMITS; + +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.util.Collector; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.MiniFlinkClusterExtension; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.sink.IcebergSink; +import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import 
org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Disabled("Please enable manually for performance testing.") +class TestDynamicIcebergSinkPerf { + private static final Logger LOG = LoggerFactory.getLogger(TestDynamicIcebergSinkPerf.class); + + @RegisterExtension + protected static final HadoopCatalogExtension CATALOG_EXTENSION = + new HadoopCatalogExtension(DATABASE, TABLE); + + private static final int SAMPLE_SIZE = 50_000; + private static final int RECORD_SIZE = 5_000_000; + private static final int TABLE_NUM = 3; + private static final int PARALLELISM = 2; + private static final int WRITE_PARALLELISM = 2; + private static final TableIdentifier[] IDENTIFIERS = new TableIdentifier[TABLE_NUM]; + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name2", Types.StringType.get()), + Types.NestedField.required(3, "name3", Types.StringType.get()), + Types.NestedField.required(4, "name4", Types.StringType.get()), + Types.NestedField.required(5, "name5", Types.StringType.get()), + Types.NestedField.required(6, "name6", Types.StringType.get()), + Types.NestedField.required(7, "name7", Types.StringType.get()), + Types.NestedField.required(8, "name8", Types.StringType.get()), + Types.NestedField.required(9, "name9", Types.StringType.get())); + private static final List RANGE = + IntStream.range(0, RECORD_SIZE).boxed().collect(Collectors.toList()); + + private static List rows; + private StreamExecutionEnvironment env; + + @BeforeEach + void before() { + for (int i = 0; i < TABLE_NUM; ++i) { + // So the table name hash difference is bigger than 1 + IDENTIFIERS[i] = TableIdentifier.of(DATABASE, TABLE + "_" + (i * 13)); + + Table table = + CATALOG_EXTENSION + .catalog() + .createTable( + IDENTIFIERS[i], + SCHEMA, + PartitionSpec.unpartitioned(), + ImmutableMap.of(MAX_CONTINUOUS_EMPTY_COMMITS, "100000")); + + table.manageSnapshots().createBranch("main").commit(); + } + + List records = RandomGenericData.generate(SCHEMA, SAMPLE_SIZE, 1L); + rows = Lists.newArrayListWithCapacity(records.size()); + for (int i = 0; i < records.size(); ++i) { + rows.add( + new DynamicRecord( + IDENTIFIERS[i % TABLE_NUM], + "main", + SCHEMA, + RowDataConverter.convert(SCHEMA, records.get(i)), + PartitionSpec.unpartitioned(), + DistributionMode.NONE, + WRITE_PARALLELISM)); + } + + Configuration configuration = MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG; + configuration.setString("rest.flamegraph.enabled", "true"); + env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration) + .enableCheckpointing(100) + .setParallelism(PARALLELISM) + .setMaxParallelism(PARALLELISM); + env.getConfig().enableObjectReuse(); + } + + @AfterEach + void after() { + for (TableIdentifier identifier : IDENTIFIERS) { + CATALOG_EXTENSION.catalog().dropTable(identifier); + } + } + + private static class IdBasedConverter implements DynamicRecordConverter { + + @Override + public void convert(Integer id, Collector out) { + out.collect(rows.get(id % SAMPLE_SIZE)); + } + } + + @Test + void testDynamicSink() throws Exception { + // So we make sure that the writer threads are the same for the 2 tests + env.setMaxParallelism(PARALLELISM * TABLE_NUM * 2); + env.setParallelism(PARALLELISM * TABLE_NUM * 2); + runTest( + 
s -> { + DynamicIcebergSink.forInput(s) + .withConverter(new IdBasedConverter()) + .immediateTableUpdate(true) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .append(); + }); + } + + @Test + void testIcebergSink() throws Exception { + runTest( + s -> { + for (int i = 0; i < IDENTIFIERS.length; ++i) { + TableLoader tableLoader = + TableLoader.fromCatalog(CATALOG_EXTENSION.catalogLoader(), IDENTIFIERS[i]); + final int finalInt = i; + IcebergSink.forRowData( + s.flatMap( + (FlatMapFunction) + (input, collector) -> { + if (input % TABLE_NUM == finalInt) { + collector.collect(rows.get(input % SAMPLE_SIZE).rowData()); + } + }) + .returns(InternalTypeInfo.of(FlinkSchemaUtil.convert(SCHEMA))) + .rebalance()) + .tableLoader(tableLoader) + .uidSuffix("Uid" + i) + .writeParallelism(WRITE_PARALLELISM) + .append(); + } + }); + } + + private void runTest(Consumer> sink) throws Exception { + DataStream dataStream = + env.addSource( + new BoundedTestSource<>( + ImmutableList.of( + RANGE, RANGE, RANGE, RANGE, RANGE, RANGE, RANGE, RANGE, RANGE, RANGE), + true), + TypeInformation.of(Integer.class)); + + sink.accept(dataStream); + + long before = System.currentTimeMillis(); + env.execute(); + + for (TableIdentifier identifier : IDENTIFIERS) { + Table table = CATALOG_EXTENSION.catalog().loadTable(identifier); + for (Snapshot snapshot : table.snapshots()) { + long records = 0; + for (DataFile dataFile : snapshot.addedDataFiles(table.io())) { + records += dataFile.recordCount(); + } + + LOG.info( + "TEST RESULT: For table {} snapshot {} written {} records in {} ms", + identifier, + snapshot.snapshotId(), + records, + snapshot.timestampMillis() - before); + before = snapshot.timestampMillis(); + } + } + } +} From bec912fb6a2cf279693d6870697a5e6a756ede8d Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 13 Jun 2025 10:39:23 +0200 Subject: [PATCH 02/15] Rename DynamicRecordConverter to DynamicRecordGenerator --- .../flink/sink/dynamic/DynamicIcebergSink.java | 14 +++++++------- ...dConverter.java => DynamicRecordGenerator.java} | 6 +++--- .../flink/sink/dynamic/DynamicRecordProcessor.java | 10 +++++----- .../flink/sink/dynamic/TestDynamicIcebergSink.java | 4 ++-- .../sink/dynamic/TestDynamicIcebergSinkPerf.java | 4 ++-- 5 files changed, 19 insertions(+), 19 deletions(-) rename flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/{DynamicRecordConverter.java => DynamicRecordGenerator.java} (82%) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 1cdf10e13b3b..4b04aaa05b1f 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -184,7 +184,7 @@ public SimpleVersionedSerializer getWriteResultSerializer() public static class Builder { private DataStream input; - private DynamicRecordConverter converter; + private DynamicRecordGenerator generator; private CatalogLoader catalogLoader; private String uidPrefix = null; private final Map writeOptions = Maps.newHashMap(); @@ -201,8 +201,8 @@ public Builder forInput(DataStream inputStream) { return this; } - public Builder withConverter(DynamicRecordConverter inputConverter) { - this.converter = inputConverter; + public Builder withGenerator(DynamicRecordGenerator inputGenerator) { + this.generator = inputGenerator; return this; } @@ -327,7 
+327,7 @@ private String operatorName(String suffix) { public DynamicIcebergSink build() { Preconditions.checkArgument( - converter != null, "Please use withConverter() to convert the input DataStream."); + generator != null, "Please use withGenerator() to convert the input DataStream."); Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be null"); // Init the `flinkWriteConf` here, so we can do the checks @@ -362,9 +362,9 @@ public DataStreamSink append() { input .process( new DynamicRecordProcessor<>( - converter, catalogLoader, immediateUpdate, cacheMaximumSize, cacheRefreshMs)) - .uid(prefixIfNotNull(uidPrefix, "-converter")) - .name(operatorName("Converter")) + generator, catalogLoader, immediateUpdate, cacheMaximumSize, cacheRefreshMs)) + .uid(prefixIfNotNull(uidPrefix, "-generator")) + .name(operatorName("generator")) .returns(type); DataStreamSink rowDataDataStreamSink = diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordConverter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordGenerator.java similarity index 82% rename from flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordConverter.java rename to flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordGenerator.java index 612b94ffb0cf..0681bea05919 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordConverter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordGenerator.java @@ -22,10 +22,10 @@ import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.util.Collector; -/** Conversion method to return input type into a DynamicRecord */ -public interface DynamicRecordConverter extends Serializable { +/** A generator to yield {@link DynamicRecord} from the provided input. */ +public interface DynamicRecordGenerator extends Serializable { default void open(OpenContext openContext) throws Exception {} - /** Takes a user-defined input type and converts it one or multiple {@link DynamicRecord}s. */ + /** Takes the user-defined input and yields zero, one, or multiple {@link DynamicRecord}s. 
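+ *
+ * <p>(Illustrative sketch; {@code MyEvent} and its accessors are hypothetical.) Since the
+ * interface has a single abstract method, a generator can be written as a lambda:
+ *
+ * <pre>{@code
+ * DynamicRecordGenerator<MyEvent> generator =
+ *     (event, out) ->
+ *         out.collect(
+ *             new DynamicRecord(
+ *                 TableIdentifier.of("db", event.tableName()), // route each record to its table
+ *                 "main",                                      // target branch
+ *                 event.icebergSchema(),                       // schema the row conforms to
+ *                 event.rowData(),                             // payload as Flink RowData
+ *                 PartitionSpec.unpartitioned(),
+ *                 DistributionMode.NONE,
+ *                 1)); // writer parallelism for this table
+ * }</pre>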
*/ void convert(T inputRecord, Collector out) throws Exception; } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index 1b09057086f0..efff3a69aa0f 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -36,7 +36,7 @@ class DynamicRecordProcessor extends ProcessFunction { static final String DYNAMIC_TABLE_UPDATE_STREAM = "dynamic-table-update-stream"; - private final DynamicRecordConverter converter; + private final DynamicRecordGenerator generator; private final CatalogLoader catalogLoader; private final boolean immediateUpdate; private final int cacheMaximumSize; @@ -50,12 +50,12 @@ class DynamicRecordProcessor extends ProcessFunction converter, + DynamicRecordGenerator generator, CatalogLoader catalogLoader, boolean immediateUpdate, int cacheMaximumSize, long cacheRefreshMs) { - this.converter = converter; + this.generator = generator; this.catalogLoader = catalogLoader; this.immediateUpdate = immediateUpdate; this.cacheMaximumSize = cacheMaximumSize; @@ -79,7 +79,7 @@ public void open(OpenContext openContext) throws Exception { DYNAMIC_TABLE_UPDATE_STREAM, new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)) {}; - converter.open(openContext); + generator.open(openContext); } @Override @@ -87,7 +87,7 @@ public void processElement(T element, Context ctx, Collector { + private static class Generator implements DynamicRecordGenerator { @Override public void convert(DynamicIcebergDataImpl row, Collector out) { @@ -519,7 +519,7 @@ private void executeDynamicSink( env.setParallelism(parallelism); DynamicIcebergSink.forInput(dataStream) - .withConverter(new Converter()) + .withGenerator(new Generator()) .catalogLoader(CATALOG_EXTENSION.catalogLoader()) .writeParallelism(parallelism) .immediateTableUpdate(immediateUpdate) diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java index f512dedb203d..a4ab17b51f40 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java @@ -142,7 +142,7 @@ void after() { } } - private static class IdBasedConverter implements DynamicRecordConverter { + private static class IdBasedGenerator implements DynamicRecordGenerator { @Override public void convert(Integer id, Collector out) { @@ -158,7 +158,7 @@ void testDynamicSink() throws Exception { runTest( s -> { DynamicIcebergSink.forInput(s) - .withConverter(new IdBasedConverter()) + .withGenerator(new IdBasedGenerator()) .immediateTableUpdate(true) .catalogLoader(CATALOG_EXTENSION.catalogLoader()) .append(); From b53fe61c2242b96eb88b6bae94dc46f44d5b943f Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 13 Jun 2025 11:07:12 +0200 Subject: [PATCH 03/15] Add JavaDoc for TestDynamicIcebergSinkPerf --- .../dynamic/TestDynamicIcebergSinkPerf.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java index a4ab17b51f40..f70b5401261f 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java @@ -62,6 +62,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory;
+/** + * Performance test class to compare {@link DynamicIcebergSink} against {@link IcebergSink} to + * measure and compare their throughput. + * + * <p>The test dynamically generates input for multiple tables, then writes to these tables. For the + * DynamicSink, a single sink is used to write all tables. For the IcebergSink, one sink is used per + * table. The test logs the written record counts and elapsed time based on the Iceberg snapshot + * metadata. + * + * <p><b>Usage</b> + * + * <ul> + *   <li>Set the SAMPLE_SIZE, RECORD_SIZE, and TABLE_NUM. + *   <li>Run the unit tests and review logs for performance results. + * </ul> + * + * <p>
Note: This test is disabled by default and should be enabled manually when performance testing + * is needed. It is not intended as a standard unit test. + */ @Disabled("Please enable manually for performance testing.") class TestDynamicIcebergSinkPerf { private static final Logger LOG = LoggerFactory.getLogger(TestDynamicIcebergSinkPerf.class);
From 94179680ce59ace88578adda68d6786d93c6873f Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 13 Jun 2025 11:08:42 +0200 Subject: [PATCH 04/15] ol => ul --- .../apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 4b04aaa05b1f..550575e1d9b6 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -64,11 +64,11 @@ /** * Dynamic version of the IcebergSink which supports: * - * <ol> + * <ul> *   <li>Writing to any number of tables (No more 1:1 sink/topic relationship). *   <li>Creating and updating tables based on the user-supplied routing. *   <li>Updating the schema and partition spec of tables based on the user-supplied specification. - * </ol> + * </ul> */ @Experimental public class DynamicIcebergSink
From ff7b18aaa3ddac98772316a4291e9b1cca1affb4 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 13 Jun 2025 17:04:38 +0200 Subject: [PATCH 05/15] Reduce visibility of build method (users use append() only) --- .../apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 550575e1d9b6..193736b3401e 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -324,7 +324,7 @@ private String operatorName(String suffix) { return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; } - public DynamicIcebergSink build() { + DynamicIcebergSink build() { Preconditions.checkArgument( generator != null, "Please use withGenerator() to convert the input DataStream.");
From b66e991a3de691b5a100d5a98214cd8deed317f0 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 13 Jun 2025 17:54:56 +0200 Subject: [PATCH 06/15] Rename test --- .../iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 8815c9c59f70..ca35dd7bb3d3 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -403,7 +403,7 @@ void testMultipleBranches() throws Exception { } @Test - void testWriteDynamicRowData() throws Exception { + void testWriteMultipleTablesWithSchemaChanges() throws Exception { List rows = Lists.newArrayList( new DynamicIcebergDataImpl(
From 3ef4d874f3392424239aef79df48bd40cbd28e52 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 16 Jun 2025 11:21:23 +0200 Subject: [PATCH 07/15] Add tests testCommitFailedBeforeOrAfterCommit / testCommitConcurrency --- .../flink/sink/dynamic/DynamicCommitter.java | 3 +- .../sink/dynamic/DynamicIcebergSink.java | 18 +- .../sink/dynamic/TestDynamicIcebergSink.java | 257 +++++++++++++++++- 3 files changed, 260 insertions(+), 18 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index 3e051dc5d549..8f9ce802d150 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -325,7 +325,8 @@ private void commitDeltaTxn( } } - private void commitOperation( + @VisibleForTesting + void commitOperation( Table table, String branch, SnapshotUpdate operation, diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 193736b3401e..b735de73af26 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -29,6 +29,7 @@ import java.util.Optional; import java.util.UUID; import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; @@ -90,7 +91,7 @@ public class DynamicIcebergSink private final int workerPoolSize; private final int cacheMaximumSize; - private DynamicIcebergSink( + DynamicIcebergSink( CatalogLoader catalogLoader, Map snapshotProperties, String uidPrefix, @@ -194,7 +195,7 @@ public static class Builder { private int cacheMaximumSize = 100; private long cacheRefreshMs = 1_000; - private Builder() {} + Builder() {} public Builder forInput(DataStream inputStream) { this.input = inputStream; @@ -324,22 +325,23 @@ private String operatorName(String suffix) { return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; } - DynamicIcebergSink build() { + private DynamicIcebergSink build() { Preconditions.checkArgument( generator != null, "Please use withGenerator() to convert the input DataStream."); Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be null"); - // Init the `flinkWriteConf` here, so we can do the checks FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig); - Map writeProperties = writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf); - uidPrefix = Optional.ofNullable(uidPrefix).orElse(""); - // FlinkWriteConf properties needed to be set separately, so we do not have to serialize the - // full conf + return instantiateSink(writeProperties, flinkWriteConf); + } + + @VisibleForTesting + DynamicIcebergSink instantiateSink( + Map writeProperties, FlinkWriteConf flinkWriteConf) { return new DynamicIcebergSink( catalogLoader, snapshotSummary, diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index ca35dd7bb3d3..05b2fd55e49b 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -24,14 +24,21 @@ import java.io.IOException; import java.io.Serializable; +import java.time.Duration; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -42,18 +49,28 @@ import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; import 
org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.SnapshotUpdate; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.FlinkWriteConf; import org.apache.iceberg.flink.MiniFlinkClusterExtension; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.flink.sink.CommitSummary; import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase; +import org.apache.iceberg.inmemory.InMemoryInputFile; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -467,7 +484,7 @@ void testUpsert() throws Exception { Sets.newHashSet("id"), true)); - executeDynamicSink(rows, env, true, 1); + executeDynamicSink(rows, env, true, 1, null); try (CloseableIterable iterable = IcebergGenerics.read( @@ -487,6 +504,118 @@ void testUpsert() throws Exception { } } + @Test + void testCommitFailedBeforeOrAfterCommit() throws Exception { + // Configure a Restart strategy to allow recovery + Configuration configuration = new Configuration(); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 2); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO); + env.configure(configuration); + + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t2", "main", PartitionSpec.unpartitioned())); + + FailBeforeAndAfterCommit.reset(); + final CommitHook commitHook = new FailBeforeAndAfterCommit(); + assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse(); + assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse(); + + executeDynamicSink(rows, env, true, 1, commitHook); + + assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue(); + assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue(); + } + + @Test + void testCommitConcurrency() throws Exception { + + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, "t2", "main", PartitionSpec.unpartitioned())); + + TableIdentifier tableIdentifier = TableIdentifier.of("default", "t1"); + Catalog catalog = CATALOG_EXTENSION.catalog(); + catalog.createTable(tableIdentifier, new Schema()); + + final CommitHook commitHook = new AppendRightBeforeCommit(tableIdentifier.toString()); + + executeDynamicSink(rows, env, true, 1, commitHook); + } + + interface CommitHook extends Serializable { + void beforeCommit(); + + void duringCommit(); + + void afterCommit(); + } + + private static class FailBeforeAndAfterCommit implements CommitHook { + + static boolean failedBeforeCommit; + static boolean failedAfterCommit; + + @Override + public void beforeCommit() { + if (!failedBeforeCommit) { + 
failedBeforeCommit = true; + throw new RuntimeException("Failing before commit"); + } + } + + @Override + public void duringCommit() {} + + @Override + public void afterCommit() { + if (!failedAfterCommit) { + failedAfterCommit = true; + throw new RuntimeException("Failing after commit"); + } + } + + static void reset() { + failedBeforeCommit = false; + failedAfterCommit = false; + } + } + + private static class AppendRightBeforeCommit implements CommitHook { + + final String tableIdentifier; + + private AppendRightBeforeCommit(String tableIdentifier) { + this.tableIdentifier = tableIdentifier; + } + + @Override + public void beforeCommit() {} + + @Override + public void duringCommit() { + // Create a conflict + Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(tableIdentifier)); + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withInputFile(new InMemoryInputFile(new byte[] {1, 2, 3})) + .withFormat(FileFormat.AVRO) + .withRecordCount(3) + .build(); + table.newAppend().appendFile(dataFile).commit(); + } + + @Override + public void afterCommit() {} + } + private void runTest(List dynamicData) throws Exception { runTest(dynamicData, this.env, 2); } @@ -504,7 +633,7 @@ private void runTest( boolean immediateUpdate, int parallelism) throws Exception { - executeDynamicSink(dynamicData, env, immediateUpdate, parallelism); + executeDynamicSink(dynamicData, env, immediateUpdate, parallelism, null); verifyResults(dynamicData); } @@ -512,23 +641,133 @@ private void executeDynamicSink( List dynamicData, StreamExecutionEnvironment env, boolean immediateUpdate, - int parallelism) + int parallelism, + @Nullable CommitHook commitHook) throws Exception { DataStream dataStream = env.addSource(createBoundedSource(dynamicData), TypeInformation.of(new TypeHint<>() {})); env.setParallelism(parallelism); - DynamicIcebergSink.forInput(dataStream) - .withGenerator(new Generator()) - .catalogLoader(CATALOG_EXTENSION.catalogLoader()) - .writeParallelism(parallelism) - .immediateTableUpdate(immediateUpdate) - .append(); + if (commitHook != null) { + // Sink failingDynamicIcebergSink = + new CommitHookEnabledDynamicIcebergSink(commitHook) + .forInput(dataStream) + .withGenerator(new Generator()) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .writeParallelism(parallelism) + .immediateTableUpdate(immediateUpdate) + .setSnapshotProperty("commit.retry.num-retries", "0") + .append(); + } else { + DynamicIcebergSink.forInput(dataStream) + .withGenerator(new Generator()) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .writeParallelism(parallelism) + .immediateTableUpdate(immediateUpdate) + .append(); + } // Write the data env.execute("Test Iceberg DataStream"); } + static class CommitHookEnabledDynamicIcebergSink extends DynamicIcebergSink.Builder { + private final CommitHook commitHook; + + CommitHookEnabledDynamicIcebergSink(CommitHook commitHook) { + this.commitHook = commitHook; + } + + @Override + DynamicIcebergSink instantiateSink( + Map writeProperties, FlinkWriteConf flinkWriteConf) { + return new CommitHookDynamicIcebergSink( + commitHook, + CATALOG_EXTENSION.catalogLoader(), + Collections.emptyMap(), + "uidPrefix", + writeProperties, + flinkWriteConf, + 100); + } + } + + static class CommitHookDynamicIcebergSink extends DynamicIcebergSink { + + private final CommitHook commitHook; + + CommitHookDynamicIcebergSink( + CommitHook commitHook, + CatalogLoader catalogLoader, + Map snapshotProperties, + String uidPrefix, + Map writeProperties, +
FlinkWriteConf flinkWriteConf, + int cacheMaximumSize) { + super( + catalogLoader, + snapshotProperties, + uidPrefix, + writeProperties, + flinkWriteConf, + cacheMaximumSize); + this.commitHook = commitHook; + } + + @Override + public Committer createCommitter(CommitterInitContext context) { + // return super.createCommitter(context); + return new CommitHookEnabledDynamicCommitter( + commitHook, + CATALOG_EXTENSION.catalogLoader().loadCatalog(), + Collections.emptyMap(), + false, + 10, + "sinkId", + new DynamicCommitterMetrics(context.metricGroup())); + } + } + + static class CommitHookEnabledDynamicCommitter extends DynamicCommitter { + private final CommitHook commitHook; + + CommitHookEnabledDynamicCommitter( + CommitHook commitHook, + Catalog catalog, + Map snapshotProperties, + boolean replacePartitions, + int workerPoolSize, + String sinkId, + DynamicCommitterMetrics committerMetrics) { + super( + catalog, snapshotProperties, replacePartitions, workerPoolSize, sinkId, committerMetrics); + this.commitHook = commitHook; + } + + @Override + public void commit(Collection> commitRequests) + throws IOException, InterruptedException { + commitHook.beforeCommit(); + super.commit(commitRequests); + commitHook.afterCommit(); + } + + @Override + void commitOperation( + Table table, + String branch, + SnapshotUpdate operation, + CommitSummary summary, + String description, + String newFlinkJobId, + String operatorId, + long checkpointId) { + commitHook.duringCommit(); + super.commitOperation( + table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId); + } + } + private void verifyResults(List dynamicData) throws IOException { // Calculate the expected result Map, List> expectedData = Maps.newHashMap(); From 0ed61d64b9fe1f99e73d292a98e81a58915963b2 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 16 Jun 2025 11:33:57 +0200 Subject: [PATCH 08/15] Rename generator method --- .../apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java | 2 +- .../iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java | 4 ++-- .../flink/sink/dynamic/TestDynamicIcebergSinkPerf.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index b735de73af26..6e749757a735 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -202,7 +202,7 @@ public Builder forInput(DataStream inputStream) { return this; } - public Builder withGenerator(DynamicRecordGenerator inputGenerator) { + public Builder generator(DynamicRecordGenerator inputGenerator) { this.generator = inputGenerator; return this; } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 05b2fd55e49b..1bd582cf6f6f 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -652,7 +652,7 @@ private void executeDynamicSink( // Sink failingDynamicIcebergSink = new CommitHookEnabledDynamicIcebergSink(commitHook) .forInput(dataStream) - .withGenerator(new Generator()) 
+ .generator(new Generator()) .catalogLoader(CATALOG_EXTENSION.catalogLoader()) .writeParallelism(parallelism) .immediateTableUpdate(immediateUpdate) @@ -660,7 +660,7 @@ private void executeDynamicSink( .append(); } else { DynamicIcebergSink.forInput(dataStream) - .withGenerator(new Generator()) + .generator(new Generator()) .catalogLoader(CATALOG_EXTENSION.catalogLoader()) .writeParallelism(parallelism) .immediateTableUpdate(immediateUpdate) diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java index f70b5401261f..6e943efb6230 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java @@ -177,7 +177,7 @@ void testDynamicSink() throws Exception { runTest( s -> { DynamicIcebergSink.forInput(s) - .withGenerator(new IdBasedGenerator()) + .generator(new IdBasedGenerator()) .immediateTableUpdate(true) .catalogLoader(CATALOG_EXTENSION.catalogLoader()) .append(); From 828b7acdcf9c3982ff56361dd73eedc581ad17f4 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 16 Jun 2025 11:39:30 +0200 Subject: [PATCH 09/15] Remove duplicate code to a util class --- .../apache/iceberg/flink/sink/FlinkSink.java | 49 +-------------- .../iceberg/flink/sink/IcebergSink.java | 49 +-------------- .../apache/iceberg/flink/sink/SinkUtil.java | 59 ++++++++++++++++++- .../sink/dynamic/DynamicIcebergSink.java | 49 +-------------- 4 files changed, 62 insertions(+), 144 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index c42e4a015ba3..8da97df037de 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -18,12 +18,6 @@ */ package org.apache.iceberg.flink.sink; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; import java.io.IOException; @@ -722,51 +716,10 @@ static IcebergStreamWriter createStreamWriter( flinkRowType, flinkWriteConf.targetDataFileSize(), format, - writeProperties(initTable, format, flinkWriteConf), + SinkUtil.writeProperties(format, flinkWriteConf, initTable), equalityFieldIds, flinkWriteConf.upsertMode()); return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory); } - - /** - * Based on the {@link FileFormat} overwrites the table level compression properties for the table - * write. 
- * - * @param table The table to get the table level settings - * @param format The FileFormat to use - * @param conf The write configuration - * @return The properties to use for writing - */ - private static Map writeProperties( - Table table, FileFormat format, FlinkWriteConf conf) { - Map writeProperties = Maps.newHashMap(table.properties()); - - switch (format) { - case PARQUET: - writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec()); - String parquetCompressionLevel = conf.parquetCompressionLevel(); - if (parquetCompressionLevel != null) { - writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); - } - - break; - case AVRO: - writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); - String avroCompressionLevel = conf.avroCompressionLevel(); - if (avroCompressionLevel != null) { - writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel()); - } - - break; - case ORC: - writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec()); - writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy()); - break; - default: - throw new IllegalArgumentException(String.format("Unknown file format %s", format)); - } - - return writeProperties; - } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index a2aec53a7415..58d33b654882 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -18,12 +18,6 @@ */ package org.apache.iceberg.flink.sink; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; import java.io.IOException; @@ -648,7 +642,7 @@ IcebergSink build() { table, snapshotSummary, uidSuffix, - writeProperties(table, flinkWriteConf.dataFileFormat(), flinkWriteConf), + SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, table), toFlinkRowType(table.schema(), tableSchema), tableSupplier, flinkWriteConf, @@ -729,47 +723,6 @@ private static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema } } - /** - * Based on the {@link FileFormat} overwrites the table level compression properties for the table - * write. 
- * - * @param table The table to get the table level settings - * @param format The FileFormat to use - * @param conf The write configuration - * @return The properties to use for writing - */ - private static Map writeProperties( - Table table, FileFormat format, FlinkWriteConf conf) { - Map writeProperties = Maps.newHashMap(table.properties()); - - switch (format) { - case PARQUET: - writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec()); - String parquetCompressionLevel = conf.parquetCompressionLevel(); - if (parquetCompressionLevel != null) { - writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); - } - - break; - case AVRO: - writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); - String avroCompressionLevel = conf.avroCompressionLevel(); - if (avroCompressionLevel != null) { - writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel()); - } - - break; - case ORC: - writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec()); - writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy()); - break; - default: - throw new IllegalArgumentException(String.format("Unknown file format %s", format)); - } - - return writeProperties; - } - private DataStream distributeDataStream(DataStream input) { DistributionMode mode = flinkWriteConf.distributionMode(); Schema schema = table.schema(); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java index 3f60b45a1f81..b3a9ac6ba2eb 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java @@ -18,17 +18,30 @@ */ package org.apache.iceberg.flink.sink; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; +import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; + import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.flink.FlinkWriteConf; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class SinkUtil { +@Internal +public class SinkUtil { private static final long INITIAL_CHECKPOINT_ID = -1L; @@ -90,4 +103,48 @@ static long getMaxCommittedCheckpointId( return lastCommittedCheckpointId; } + + /** + * Based on the {@link FileFormat} overwrites the table level compression properties for the table + * write. 
+ * + * @param format The FileFormat to use + * @param conf The write configuration + * @param table The table to get the table level settings + * @return The properties to use for writing + */ + public static Map writeProperties( + FileFormat format, FlinkWriteConf conf, @Nullable Table table) { + Map writeProperties = Maps.newHashMap(); + if (table != null) { + writeProperties.putAll(table.properties()); + } + + switch (format) { + case PARQUET: + writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec()); + String parquetCompressionLevel = conf.parquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + + break; + case AVRO: + writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); + String avroCompressionLevel = conf.avroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel()); + } + + break; + case ORC: + writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec()); + writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy()); + break; + default: + throw new IllegalArgumentException(String.format("Unknown file format %s", format)); + } + + return writeProperties; + } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 6e749757a735..8d62e93a30c8 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -18,13 +18,6 @@ */ package org.apache.iceberg.flink.sink.dynamic; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; -import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; - import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -59,6 +52,7 @@ import org.apache.iceberg.flink.FlinkWriteConf; import org.apache.iceberg.flink.FlinkWriteOptions; import org.apache.iceberg.flink.sink.IcebergSink; +import org.apache.iceberg.flink.sink.SinkUtil; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -333,7 +327,7 @@ private DynamicIcebergSink build() { FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig); Map writeProperties = - writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf); + SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, null); uidPrefix = Optional.ofNullable(uidPrefix).orElse(""); return instantiateSink(writeProperties, flinkWriteConf); @@ -391,45 +385,6 @@ public DataStreamSink append() { } } - /** - * Based on the {@link FileFormat} overwrites the table level compression properties for the table - * write. 
- * - * @param format The FileFormat to use - * @param conf The write configuration - * @return The properties to use for writing - */ - private static Map writeProperties(FileFormat format, FlinkWriteConf conf) { - Map writeProperties = Maps.newHashMap(); - - switch (format) { - case PARQUET: - writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec()); - String parquetCompressionLevel = conf.parquetCompressionLevel(); - if (parquetCompressionLevel != null) { - writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); - } - - break; - case AVRO: - writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); - String avroCompressionLevel = conf.avroCompressionLevel(); - if (avroCompressionLevel != null) { - writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel()); - } - - break; - case ORC: - writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec()); - writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy()); - break; - default: - throw new IllegalArgumentException(String.format("Unknown file format %s", format)); - } - - return writeProperties; - } - DataStream distributeDataStream(DataStream input) { return input.keyBy(DynamicRecordInternal::writerKey); } From a55185d70f9819ace8e7bacb4e6c29ad9552cbac Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 16 Jun 2025 11:48:05 +0200 Subject: [PATCH 10/15] Update JavaDoc --- .../iceberg/flink/sink/dynamic/DynamicRecordGenerator.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordGenerator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordGenerator.java index 0681bea05919..637dd1307d37 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordGenerator.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordGenerator.java @@ -26,6 +26,9 @@ public interface DynamicRecordGenerator extends Serializable { default void open(OpenContext openContext) throws Exception {} - /** Takes the user-defined input and yields zero, one, or multiple {@link DynamicRecord}s. */ + /** + * Takes the user-defined input and yields zero, one, or multiple {@link DynamicRecord}s using the + * {@link Collector}. 
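+ *
+ * <p>(Illustrative sketch; {@code Envelope} and its {@code records()} accessor are hypothetical.)
+ * Because the {@link Collector} may be invoked any number of times, one input can fan out into
+ * several records:
+ *
+ * <pre>{@code
+ * DynamicRecordGenerator<Envelope> fanOut =
+ *     (envelope, out) -> envelope.records().forEach(out::collect);
+ * }</pre>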
+ */ void convert(T inputRecord, Collector out) throws Exception; } From edd23dc0df2b12b406e3d7dd3f0e7996d0c01fbc Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 16 Jun 2025 11:48:40 +0200 Subject: [PATCH 11/15] VisibleForTesting --- .../iceberg/flink/sink/dynamic/DynamicRecordProcessor.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index efff3a69aa0f..e44f0c0820e0 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.sink.dynamic; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -34,6 +35,7 @@ @Internal class DynamicRecordProcessor extends ProcessFunction implements Collector { + @VisibleForTesting static final String DYNAMIC_TABLE_UPDATE_STREAM = "dynamic-table-update-stream"; private final DynamicRecordGenerator generator; From bd633a763ec56c664a658ad0f89344d14b9b5cac Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 16 Jun 2025 12:26:09 +0200 Subject: [PATCH 12/15] Retrigger tests (unrelated failure in 1.20 tests) From 522d4bb90804316f38ccb79d1964a461df6d6720 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 16 Jun 2025 16:34:55 +0200 Subject: [PATCH 13/15] Call super.close() in DynamicRecordProcessor --- .../flink/sink/dynamic/DynamicRecordProcessor.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index e44f0c0820e0..abd12747b7a3 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -161,5 +161,11 @@ private void emit( } @Override - public void close() {} + public void close() { + try { + super.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } From 8284ef17c28f431e11639f42d72242a27af46781 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 16 Jun 2025 17:21:58 +0200 Subject: [PATCH 14/15] Only instantiate updateStream when update is non-immediate --- .../flink/sink/dynamic/DynamicRecordProcessor.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index abd12747b7a3..bc569633cc8f 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -74,13 +74,13 @@ public void open(OpenContext openContext) throws Exception { cacheMaximumSize, getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks()); if (immediateUpdate) { updater = new 
TableUpdater(tableCache, catalog); + } else { + updateStream = + new OutputTag<>( + DYNAMIC_TABLE_UPDATE_STREAM, + new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)) {}; } - updateStream = - new OutputTag<>( - DYNAMIC_TABLE_UPDATE_STREAM, - new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)) {}; - generator.open(openContext); } From d842ee5910f30d3d2f219948c984cf37e669fdcb Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 16 Jun 2025 17:42:40 +0200 Subject: [PATCH 15/15] Remove left-over comment --- .../src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java | 1 - .../iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java | 1 - 2 files changed, 2 deletions(-) diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java index 8188a8bcdc03..b47a7920fe0c 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkFilters.java @@ -427,7 +427,6 @@ public Expression visit(UnresolvedCallExpression unresolvedCall) { unresolvedCall.getChildren().stream() .map(e -> (ResolvedExpression) e.accept(this)) .collect(Collectors.toList()); - // TODO mxm false? return new CallExpression( false, unresolvedCall.getFunctionIdentifier().orElse(null), diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 1bd582cf6f6f..f94990cc1519 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -649,7 +649,6 @@ private void executeDynamicSink( env.setParallelism(parallelism); if (commitHook != null) { - // Sink failingDynamicIcebergSink = new CommitHookEnabledDynamicIcebergSink(commitHook) .forInput(dataStream) .generator(new Generator())
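For reference, the series leaves the builder API at DynamicIcebergSink.forInput(...).generator(...).catalogLoader(...).append(). Below is a minimal end-to-end wiring sketch modeled on the builder calls exercised in the tests; MyEvent, MyEventGenerator, and the warehouse location are hypothetical placeholders, not part of the patches:

    // Editorial sketch, not part of the patch series. Assumes MyEvent is a user type and
    // MyEventGenerator implements DynamicRecordGenerator<MyEvent> (see the JavaDoc sketch above).
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<MyEvent> events = env.fromData(new MyEvent("t1"), new MyEvent("t2"));

    // Hadoop catalog pointed at a local warehouse, mirroring the benchmark setup.
    CatalogLoader catalogLoader =
        CatalogLoader.hadoop(
            "hadoop",
            new Configuration(),
            ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, "file:///tmp/warehouse"));

    DynamicIcebergSink.forInput(events)
        .generator(new MyEventGenerator()) // yields DynamicRecords for routing and writing
        .catalogLoader(catalogLoader)
        .writeParallelism(2)
        .immediateTableUpdate(true) // create and update tables inline instead of via side output
        .append();

    env.execute("Dynamic Iceberg sink");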