diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java index 2109c91bddf7..1b786e46452f 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java @@ -19,13 +19,16 @@ package org.apache.iceberg.flink.sink; import java.util.Arrays; +import java.util.List; import java.util.NavigableMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.flink.annotation.Internal; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.util.ScanTaskUtil; -class CommitSummary { +@Internal +public class CommitSummary { private final AtomicLong dataFilesCount = new AtomicLong(); private final AtomicLong dataFilesRecordCount = new AtomicLong(); @@ -34,30 +37,35 @@ class CommitSummary { private final AtomicLong deleteFilesRecordCount = new AtomicLong(); private final AtomicLong deleteFilesByteCount = new AtomicLong(); - CommitSummary(NavigableMap pendingResults) { - pendingResults - .values() + public CommitSummary() {} + + public CommitSummary(NavigableMap pendingResults) { + pendingResults.values().forEach(this::addWriteResult); + } + + public void addAll(NavigableMap> pendingResults) { + pendingResults.values().forEach(writeResults -> writeResults.forEach(this::addWriteResult)); + } + + private void addWriteResult(WriteResult writeResult) { + dataFilesCount.addAndGet(writeResult.dataFiles().length); + Arrays.stream(writeResult.dataFiles()) + .forEach( + dataFile -> { + dataFilesRecordCount.addAndGet(dataFile.recordCount()); + dataFilesByteCount.addAndGet(dataFile.fileSizeInBytes()); + }); + deleteFilesCount.addAndGet(writeResult.deleteFiles().length); + Arrays.stream(writeResult.deleteFiles()) .forEach( - writeResult -> { - dataFilesCount.addAndGet(writeResult.dataFiles().length); - Arrays.stream(writeResult.dataFiles()) - .forEach( - dataFile -> { - dataFilesRecordCount.addAndGet(dataFile.recordCount()); - dataFilesByteCount.addAndGet(dataFile.fileSizeInBytes()); - }); - deleteFilesCount.addAndGet(writeResult.deleteFiles().length); - Arrays.stream(writeResult.deleteFiles()) - .forEach( - deleteFile -> { - deleteFilesRecordCount.addAndGet(deleteFile.recordCount()); - long deleteBytes = ScanTaskUtil.contentSizeInBytes(deleteFile); - deleteFilesByteCount.addAndGet(deleteBytes); - }); + deleteFile -> { + deleteFilesRecordCount.addAndGet(deleteFile.recordCount()); + long deleteBytes = ScanTaskUtil.contentSizeInBytes(deleteFile); + deleteFilesByteCount.addAndGet(deleteBytes); }); } - long dataFilesCount() { + public long dataFilesCount() { return dataFilesCount.get(); } @@ -69,7 +77,7 @@ long dataFilesByteCount() { return dataFilesByteCount.get(); } - long deleteFilesCount() { + public long deleteFilesCount() { return deleteFilesCount.get(); } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java index 036970c06d5b..92c50165c0f5 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java @@ -19,11 +19,13 @@ package org.apache.iceberg.flink.sink; import java.util.List; +import org.apache.flink.annotation.Internal; import 
org.apache.iceberg.ManifestFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -class DeltaManifests { +@Internal +public class DeltaManifests { private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0]; @@ -56,7 +58,7 @@ CharSequence[] referencedDataFiles() { return referencedDataFiles; } - List manifests() { + public List manifests() { List manifests = Lists.newArrayListWithCapacity(2); if (dataManifest != null) { manifests.add(dataManifest); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java index 92ca284b12ba..6ad41bacf337 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java @@ -23,17 +23,19 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import org.apache.flink.annotation.Internal; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -class DeltaManifestsSerializer implements SimpleVersionedSerializer { +@Internal +public class DeltaManifestsSerializer implements SimpleVersionedSerializer { private static final int VERSION_1 = 1; private static final int VERSION_2 = 2; private static final byte[] EMPTY_BINARY = new byte[0]; - static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer(); + public static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer(); @Override public int getVersion() { diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index 9571efdc5268..d107c2739b04 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -38,7 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class FlinkManifestUtil { +public class FlinkManifestUtil { private static final Logger LOG = LoggerFactory.getLogger(FlinkManifestUtil.class); private static final int FORMAT_V2 = 2; @@ -66,7 +66,7 @@ static List readDataFiles( } } - static ManifestOutputFileFactory createOutputFileFactory( + public static ManifestOutputFileFactory createOutputFileFactory( Supplier tableSupplier, Map tableProps, String flinkJobId, @@ -83,7 +83,7 @@ static ManifestOutputFileFactory createOutputFileFactory( * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same * partition spec */ - static DeltaManifests writeCompletedFiles( + public static DeltaManifests writeCompletedFiles( WriteResult result, Supplier outputFileSupplier, PartitionSpec spec) throws IOException { @@ -114,7 +114,7 @@ static DeltaManifests writeCompletedFiles( return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles()); } - static WriteResult readCompletedFiles( + public static WriteResult readCompletedFiles( DeltaManifests deltaManifests, FileIO io, Map specsById) throws IOException { WriteResult.Builder builder = WriteResult.builder(); @@ -135,7 +135,7 @@ static 
WriteResult readCompletedFiles( return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles()).build(); } - static void deleteCommittedManifests( + public static void deleteCommittedManifests( Table table, List manifests, String newFlinkJobId, long checkpointId) { for (ManifestFile manifest : manifests) { try { diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java index e2b388a83c75..1d83c211e001 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java @@ -30,7 +30,7 @@ * *

In both cases only the respective part is serialized. */ -class IcebergCommittableSerializer implements SimpleVersionedSerializer { +public class IcebergCommittableSerializer implements SimpleVersionedSerializer { private static final int VERSION = 1; @Override diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java index 5b28c4acb1c5..ce81ef11f13c 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java @@ -20,11 +20,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.iceberg.flink.util.ElapsedTimeGauge; -class IcebergFilesCommitterMetrics { +@Internal +public class IcebergFilesCommitterMetrics { private final AtomicLong lastCheckpointDurationMs = new AtomicLong(); private final AtomicLong lastCommitDurationMs = new AtomicLong(); private final ElapsedTimeGauge elapsedSecondsSinceLastSuccessfulCommit; @@ -35,7 +37,7 @@ class IcebergFilesCommitterMetrics { private final Counter committedDeleteFilesRecordCount; private final Counter committedDeleteFilesByteCount; - IcebergFilesCommitterMetrics(MetricGroup metrics, String fullTableName) { + public IcebergFilesCommitterMetrics(MetricGroup metrics, String fullTableName) { MetricGroup committerMetrics = metrics.addGroup("IcebergFilesCommitter").addGroup("table", fullTableName); committerMetrics.gauge("lastCheckpointDurationMs", lastCheckpointDurationMs::get); @@ -52,16 +54,16 @@ class IcebergFilesCommitterMetrics { this.committedDeleteFilesByteCount = committerMetrics.counter("committedDeleteFilesByteCount"); } - void checkpointDuration(long checkpointDurationMs) { + public void checkpointDuration(long checkpointDurationMs) { lastCheckpointDurationMs.set(checkpointDurationMs); } - void commitDuration(long commitDurationMs) { + public void commitDuration(long commitDurationMs) { lastCommitDurationMs.set(commitDurationMs); } /** This is called upon a successful commit. 
*/ - void updateCommitSummary(CommitSummary stats) { + public void updateCommitSummary(CommitSummary stats) { elapsedSecondsSinceLastSuccessfulCommit.refreshLastRecordedTime(); committedDataFilesCount.inc(stats.dataFilesCount()); committedDataFilesRecordCount.inc(stats.dataFilesRecordCount()); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java index ab458ad2e7cb..434f3969577f 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java @@ -21,6 +21,7 @@ import com.codahale.metrics.SlidingWindowReservoir; import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; +import org.apache.flink.annotation.Internal; import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Histogram; @@ -28,7 +29,8 @@ import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.util.ScanTaskUtil; -class IcebergStreamWriterMetrics { +@Internal +public class IcebergStreamWriterMetrics { // 1,024 reservoir size should cost about 8KB, which is quite small. // It should also produce good accuracy for histogram distribution (like percentiles). private static final int HISTOGRAM_RESERVOIR_SIZE = 1024; @@ -40,7 +42,7 @@ class IcebergStreamWriterMetrics { private final Histogram dataFilesSizeHistogram; private final Histogram deleteFilesSizeHistogram; - IcebergStreamWriterMetrics(MetricGroup metrics, String fullTableName) { + public IcebergStreamWriterMetrics(MetricGroup metrics, String fullTableName) { MetricGroup writerMetrics = metrics.addGroup("IcebergStreamWriter").addGroup("table", fullTableName); this.flushedDataFiles = writerMetrics.counter("flushedDataFiles"); @@ -63,7 +65,7 @@ class IcebergStreamWriterMetrics { new DropwizardHistogramWrapper(dropwizardDeleteFilesSizeHistogram)); } - void updateFlushResult(WriteResult result) { + public void updateFlushResult(WriteResult result) { flushedDataFiles.inc(result.dataFiles().length); flushedDeleteFiles.inc(result.deleteFiles().length); flushedReferencedDataFiles.inc(result.referencedDataFiles().length); @@ -84,7 +86,15 @@ void updateFlushResult(WriteResult result) { }); } - void flushDuration(long flushDurationMs) { + public void flushDuration(long flushDurationMs) { lastFlushDurationMs.set(flushDurationMs); } + + public Counter getFlushedDataFiles() { + return flushedDataFiles; + } + + public Counter getFlushedDeleteFiles() { + return flushedDeleteFiles; + } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java index 30517cd38216..6ba87bea30c2 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import org.apache.flink.annotation.Internal; import org.apache.iceberg.FileFormat; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.Table; @@ -30,7 +31,8 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; 
import org.apache.iceberg.relocated.com.google.common.base.Strings; -class ManifestOutputFileFactory { +@Internal +public class ManifestOutputFileFactory { // Users could define their own flink manifests directory by setting this value in table // properties. @VisibleForTesting static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location"; @@ -70,7 +72,7 @@ private String generatePath(long checkpointId) { fileCount.incrementAndGet())); } - OutputFile create(long checkpointId) { + public OutputFile create(long checkpointId) { String flinkManifestDir = props.get(FLINK_MANIFEST_LOCATION); TableOperations ops = ((HasTableOperations) tableSupplier.get()).operations(); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index 67422a1afeb1..8dc8d38869bc 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -80,6 +80,28 @@ public RowDataTaskWriterFactory( Map writeProperties, List equalityFieldIds, boolean upsert) { + this( + tableSupplier, + flinkSchema, + targetFileSizeBytes, + format, + writeProperties, + equalityFieldIds, + upsert, + tableSupplier.get().schema(), + tableSupplier.get().spec()); + } + + public RowDataTaskWriterFactory( + SerializableSupplier
<Table>
tableSupplier, + RowType flinkSchema, + long targetFileSizeBytes, + FileFormat format, + Map writeProperties, + List equalityFieldIds, + boolean upsert, + Schema schema, + PartitionSpec spec) { this.tableSupplier = tableSupplier; Table table; @@ -90,9 +112,9 @@ public RowDataTaskWriterFactory( table = tableSupplier.get(); } - this.schema = table.schema(); + this.schema = schema; this.flinkSchema = flinkSchema; - this.spec = table.spec(); + this.spec = spec; this.targetFileSizeBytes = targetFileSizeBytes; this.format = format; this.equalityFieldIds = equalityFieldIds; @@ -148,6 +170,7 @@ public void initialize(int taskId, int attemptId) { OutputFileFactory.builderFor(table, taskId, attemptId) .format(format) .ioSupplier(() -> tableSupplier.get().io()) + .defaultSpec(spec) .build(); } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java index 5a44373cccaa..40a3ce0cb846 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java @@ -20,13 +20,15 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import org.apache.flink.annotation.Internal; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.InstantiationUtil; import org.apache.iceberg.io.WriteResult; -class WriteResultSerializer implements SimpleVersionedSerializer { +@Internal +public class WriteResultSerializer implements SimpleVersionedSerializer { private static final int VERSION = 1; @Override diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java new file mode 100644 index 000000000000..33edefe71eb0 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Objects; +import org.apache.iceberg.flink.sink.DeltaManifests; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +/** + * The aggregated results of a single checkpoint which should be committed. Containing the + * serialized {@link DeltaManifests} file - which contains the commit data, and the jobId, + * operatorId, checkpointId triplet to identify the specific commit. + * + *

{@link DynamicCommittableSerializer} is used to serialize {@link DynamicCommittable} between + * the {@link DynamicWriter} and the {@link DynamicWriteResultAggregator}. + */ +class DynamicCommittable implements Serializable { + + private final WriteTarget key; + private final byte[] manifest; + private final String jobId; + private final String operatorId; + private final long checkpointId; + + DynamicCommittable( + WriteTarget key, byte[] manifest, String jobId, String operatorId, long checkpointId) { + this.key = key; + this.manifest = manifest; + this.jobId = jobId; + this.operatorId = operatorId; + this.checkpointId = checkpointId; + } + + WriteTarget key() { + return key; + } + + byte[] manifest() { + return manifest; + } + + String jobId() { + return jobId; + } + + String operatorId() { + return operatorId; + } + + long checkpointId() { + return checkpointId; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + + DynamicCommittable that = (DynamicCommittable) o; + return checkpointId == that.checkpointId + && Objects.equals(key, that.key) + && Objects.deepEquals(manifest, that.manifest) + && Objects.equals(jobId, that.jobId) + && Objects.equals(operatorId, that.operatorId); + } + + @Override + public int hashCode() { + return Objects.hash(key, Arrays.hashCode(manifest), jobId, operatorId, checkpointId); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("key", key) + .add("jobId", jobId) + .add("checkpointId", checkpointId) + .add("operatorId", operatorId) + .toString(); + } + + public WriteTarget writeTarget() { + return key; + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java new file mode 100644 index 000000000000..4aadcf1f3620 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +/** + * This serializer is used for serializing the {@link DynamicCommittable} objects between the {@link + * DynamicWriter} and the {@link DynamicWriteResultAggregator} operator and for sending it down to + * the {@link DynamicCommitter}. 
+ */ +class DynamicCommittableSerializer implements SimpleVersionedSerializer { + + private static final int VERSION = 1; + + @Override + public int getVersion() { + return VERSION; + } + + @Override + public byte[] serialize(DynamicCommittable committable) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + committable.key().serializeTo(view); + view.writeUTF(committable.jobId()); + view.writeUTF(committable.operatorId()); + view.writeLong(committable.checkpointId()); + view.writeInt(committable.manifest().length); + view.write(committable.manifest()); + return out.toByteArray(); + } + + @Override + public DynamicCommittable deserialize(int version, byte[] serialized) throws IOException { + if (version == 1) { + DataInputDeserializer view = new DataInputDeserializer(serialized); + WriteTarget key = WriteTarget.deserializeFrom(view); + String jobId = view.readUTF(); + String operatorId = view.readUTF(); + long checkpointId = view.readLong(); + int manifestLen = view.readInt(); + byte[] manifestBuf; + manifestBuf = new byte[manifestLen]; + view.read(manifestBuf); + return new DynamicCommittable(key, manifestBuf, jobId, operatorId, checkpointId); + } + + throw new IOException("Unrecognized version or corrupt state: " + version); + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java new file mode 100644 index 000000000000..3e051dc5d549 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.SnapshotUpdate; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.sink.CommitSummary; +import org.apache.iceberg.flink.sink.DeltaManifests; +import org.apache.iceberg.flink.sink.DeltaManifestsSerializer; +import org.apache.iceberg.flink.sink.FlinkManifestUtil; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements the Flink SinkV2 {@link Committer} interface to implement the Iceberg + * commits. The implementation builds on the following assumptions: + * + *

+ * <ul>
+ *   <li>There is a single {@link DynamicCommittable} for every table / branch / checkpoint
+ *   <li>There is no late checkpoint - if checkpoint 'x' has been received in one call, then after
+ *       a successful run only checkpoints > x will arrive
+ *   <li>There is no other writer which would generate another commit to the same branch with the
+ *       same jobId-operatorId-checkpointId triplet
+ * </ul>
+ */ +@Internal +class DynamicCommitter implements Committer { + + private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id"; + private static final Logger LOG = LoggerFactory.getLogger(DynamicCommitter.class); + private static final byte[] EMPTY_MANIFEST_DATA = new byte[0]; + private static final WriteResult EMPTY_WRITE_RESULT = + WriteResult.builder() + .addDataFiles(Lists.newArrayList()) + .addDeleteFiles(Lists.newArrayList()) + .build(); + + private static final long INITIAL_CHECKPOINT_ID = -1L; + + @VisibleForTesting + static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits"; + + private static final String FLINK_JOB_ID = "flink.job-id"; + private static final String OPERATOR_ID = "flink.operator-id"; + private final Map snapshotProperties; + private final boolean replacePartitions; + private final DynamicCommitterMetrics committerMetrics; + private final Catalog catalog; + private final Map maxContinuousEmptyCommitsMap; + private final Map continuousEmptyCheckpointsMap; + private final ExecutorService workerPool; + + DynamicCommitter( + Catalog catalog, + Map snapshotProperties, + boolean replacePartitions, + int workerPoolSize, + String sinkId, + DynamicCommitterMetrics committerMetrics) { + this.snapshotProperties = snapshotProperties; + this.replacePartitions = replacePartitions; + this.committerMetrics = committerMetrics; + this.catalog = catalog; + this.maxContinuousEmptyCommitsMap = Maps.newHashMap(); + this.continuousEmptyCheckpointsMap = Maps.newHashMap(); + + this.workerPool = ThreadPools.newWorkerPool("iceberg-committer-pool-" + sinkId, workerPoolSize); + } + + @Override + public void commit(Collection> commitRequests) + throws IOException, InterruptedException { + if (commitRequests.isEmpty()) { + return; + } + + // For every table and every checkpoint, we store the list of to-be-committed + // DynamicCommittable. + // There may be DynamicCommittable from previous checkpoints which have not been committed yet. 
+ Map>>> commitRequestMap = + Maps.newHashMap(); + for (CommitRequest request : commitRequests) { + NavigableMap>> committables = + commitRequestMap.computeIfAbsent( + new TableKey(request.getCommittable()), unused -> Maps.newTreeMap()); + committables + .computeIfAbsent(request.getCommittable().checkpointId(), unused -> Lists.newArrayList()) + .add(request); + } + + for (Map.Entry>>> entry : + commitRequestMap.entrySet()) { + Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName())); + DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable(); + long maxCommittedCheckpointId = + getMaxCommittedCheckpointId( + table, last.jobId(), last.operatorId(), entry.getKey().branch()); + // Mark the already committed FilesCommittable(s) as finished + entry + .getValue() + .headMap(maxCommittedCheckpointId, true) + .values() + .forEach(list -> list.forEach(CommitRequest::signalAlreadyCommitted)); + NavigableMap>> uncommitted = + entry.getValue().tailMap(maxCommittedCheckpointId, false); + if (!uncommitted.isEmpty()) { + commitPendingRequests( + table, entry.getKey().branch(), uncommitted, last.jobId(), last.operatorId()); + } + } + } + + private static long getMaxCommittedCheckpointId( + Table table, String flinkJobId, String operatorId, String branch) { + Snapshot snapshot = table.snapshot(branch); + long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID; + + while (snapshot != null) { + Map summary = snapshot.summary(); + String snapshotFlinkJobId = summary.get(FLINK_JOB_ID); + String snapshotOperatorId = summary.get(OPERATOR_ID); + if (flinkJobId.equals(snapshotFlinkJobId) + && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) { + String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID); + if (value != null) { + lastCommittedCheckpointId = Long.parseLong(value); + break; + } + } + + Long parentSnapshotId = snapshot.parentId(); + snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null; + } + + return lastCommittedCheckpointId; + } + + /** + * Commits the data to the Iceberg table by reading the file data from the {@link DeltaManifests} + * ordered by the checkpointId, and writing the new snapshot to the Iceberg table. The {@link + * SnapshotSummary} will contain the jobId, snapshotId, checkpointId so in case of job restart we + * can identify which changes are committed, and which are still waiting for the commit. 
+ * + * @param commitRequestMap The checkpointId to {@link CommitRequest} map of the changes to commit + * @param newFlinkJobId The jobId to store in the {@link SnapshotSummary} + * @param operatorId The operatorId to store in the {@link SnapshotSummary} + * @throws IOException On commit failure + */ + private void commitPendingRequests( + Table table, + String branch, + NavigableMap>> commitRequestMap, + String newFlinkJobId, + String operatorId) + throws IOException { + long checkpointId = commitRequestMap.lastKey(); + List manifests = Lists.newArrayList(); + NavigableMap> pendingResults = Maps.newTreeMap(); + for (Map.Entry>> e : commitRequestMap.entrySet()) { + for (CommitRequest committable : e.getValue()) { + if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifest())) { + pendingResults + .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList()) + .add(EMPTY_WRITE_RESULT); + } else { + DeltaManifests deltaManifests = + SimpleVersionedSerialization.readVersionAndDeSerialize( + DeltaManifestsSerializer.INSTANCE, committable.getCommittable().manifest()); + pendingResults + .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList()) + .add(FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs())); + manifests.addAll(deltaManifests.manifests()); + } + } + } + + CommitSummary summary = new CommitSummary(); + summary.addAll(pendingResults); + commitPendingResult(table, branch, pendingResults, summary, newFlinkJobId, operatorId); + if (committerMetrics != null) { + committerMetrics.updateCommitSummary(table.name(), summary); + } + + FlinkManifestUtil.deleteCommittedManifests(table, manifests, newFlinkJobId, checkpointId); + } + + private void commitPendingResult( + Table table, + String branch, + NavigableMap> pendingResults, + CommitSummary summary, + String newFlinkJobId, + String operatorId) { + long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount(); + TableKey key = new TableKey(table.name(), branch); + int continuousEmptyCheckpoints = + continuousEmptyCheckpointsMap.computeIfAbsent(key, unused -> 0); + int maxContinuousEmptyCommits = + maxContinuousEmptyCommitsMap.computeIfAbsent( + key, + unused -> { + int result = + PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10); + Preconditions.checkArgument( + result > 0, MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive"); + return result; + }); + continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0; + if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) { + if (replacePartitions) { + replacePartitions(table, branch, pendingResults, summary, newFlinkJobId, operatorId); + } else { + commitDeltaTxn(table, branch, pendingResults, summary, newFlinkJobId, operatorId); + } + + continuousEmptyCheckpoints = 0; + } else { + long checkpointId = pendingResults.lastKey(); + LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId); + } + + continuousEmptyCheckpointsMap.put(key, continuousEmptyCheckpoints); + } + + private void replacePartitions( + Table table, + String branch, + NavigableMap> pendingResults, + CommitSummary summary, + String newFlinkJobId, + String operatorId) { + for (Map.Entry> e : pendingResults.entrySet()) { + // We don't commit the merged result into a single transaction because for the sequential + // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied + // to data files from txn1. 
Committing the merged one will lead to the incorrect delete + // semantic. + for (WriteResult result : e.getValue()) { + ReplacePartitions dynamicOverwrite = + table.newReplacePartitions().scanManifestsWith(workerPool); + Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile); + commitOperation( + table, + branch, + dynamicOverwrite, + summary, + "dynamic partition overwrite", + newFlinkJobId, + operatorId, + e.getKey()); + } + } + } + + private void commitDeltaTxn( + Table table, + String branch, + NavigableMap> pendingResults, + CommitSummary summary, + String newFlinkJobId, + String operatorId) { + for (Map.Entry> e : pendingResults.entrySet()) { + // We don't commit the merged result into a single transaction because for the sequential + // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied + // to data files from txn1. Committing the merged one will lead to the incorrect delete + // semantic. + for (WriteResult result : e.getValue()) { + // Row delta validations are not needed for streaming changes that write equality deletes. + // Equality deletes are applied to data in all previous sequence numbers, so retries may + // push deletes further in the future, but do not affect correctness. Position deletes + // committed to the table in this path are used only to delete rows from data files that are + // being added in this commit. There is no way for data files added along with the delete + // files to be concurrently removed, so there is no need to validate the files referenced by + // the position delete files that are being committed. + RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); + + Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); + Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); + commitOperation( + table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey()); + } + } + } + + private void commitOperation( + Table table, + String branch, + SnapshotUpdate operation, + CommitSummary summary, + String description, + String newFlinkJobId, + String operatorId, + long checkpointId) { + + LOG.info( + "Committing {} for checkpoint {} to table {} branch {} with summary: {}", + description, + checkpointId, + table.name(), + branch, + summary); + snapshotProperties.forEach(operation::set); + // custom snapshot metadata properties will be overridden if they conflict with internal ones + // used by the sink. + operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId)); + operation.set(FLINK_JOB_ID, newFlinkJobId); + operation.set(OPERATOR_ID, operatorId); + operation.toBranch(branch); + + long startNano = System.nanoTime(); + operation.commit(); // abort is automatically called if this fails. 
+ long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano); + LOG.info( + "Committed {} to table: {}, branch: {}, checkpointId {} in {} ms", + description, + table.name(), + branch, + checkpointId, + durationMs); + if (committerMetrics != null) { + committerMetrics.commitDuration(table.name(), durationMs); + } + } + + @Override + public void close() throws IOException { + // do nothing + } + + private static class TableKey implements Serializable { + private String tableName; + private String branch; + + TableKey(String tableName, String branch) { + this.tableName = tableName; + this.branch = branch; + } + + TableKey(DynamicCommittable committable) { + this.tableName = committable.key().tableName(); + this.branch = committable.key().branch(); + } + + String tableName() { + return tableName; + } + + String branch() { + return branch; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + TableKey that = (TableKey) other; + return tableName.equals(that.tableName) && branch.equals(that.branch); + } + + @Override + public int hashCode() { + return Objects.hash(tableName, branch); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("tableName", tableName) + .add("branch", branch) + .toString(); + } + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitterMetrics.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitterMetrics.java new file mode 100644 index 000000000000..d34feea75285 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitterMetrics.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.Map; +import org.apache.flink.metrics.MetricGroup; +import org.apache.iceberg.flink.sink.CommitSummary; +import org.apache.iceberg.flink.sink.IcebergFilesCommitterMetrics; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +class DynamicCommitterMetrics { + + private final Map metrics; + private final MetricGroup mainMetricsGroup; + + DynamicCommitterMetrics(MetricGroup mainMetricsGroup) { + this.mainMetricsGroup = mainMetricsGroup; + this.metrics = Maps.newHashMap(); + } + + public void commitDuration(String fullTableName, long commitDurationMs) { + committerMetrics(fullTableName).commitDuration(commitDurationMs); + } + + /** This is called upon a successful commit. 
*/ + public void updateCommitSummary(String fullTableName, CommitSummary stats) { + committerMetrics(fullTableName).updateCommitSummary(stats); + } + + private IcebergFilesCommitterMetrics committerMetrics(String fullTableName) { + return metrics.computeIfAbsent( + fullTableName, tableName -> new IcebergFilesCommitterMetrics(mainMetricsGroup, tableName)); + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java new file mode 100644 index 000000000000..85806f932ad5 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import org.apache.iceberg.io.WriteResult; + +class DynamicWriteResult { + + private final WriteTarget key; + private final WriteResult writeResult; + + DynamicWriteResult(WriteTarget key, WriteResult writeResult) { + this.key = key; + this.writeResult = writeResult; + } + + WriteTarget key() { + return key; + } + + WriteResult writeResult() { + return writeResult; + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java new file mode 100644 index 000000000000..58ba183dfcd4 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.Map; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.sink.DeltaManifests; +import org.apache.iceberg.flink.sink.DeltaManifestsSerializer; +import org.apache.iceberg.flink.sink.FlinkManifestUtil; +import org.apache.iceberg.flink.sink.ManifestOutputFileFactory; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Operator which aggregates the individual {@link WriteResult} objects to a single {@link + * DynamicCommittable} per checkpoint (storing the serialized {@link DeltaManifests}, jobId, + * operatorId, checkpointId) + */ +class DynamicWriteResultAggregator + extends AbstractStreamOperator> + implements OneInputStreamOperator< + CommittableMessage, CommittableMessage> { + private static final Logger LOG = LoggerFactory.getLogger(DynamicWriteResultAggregator.class); + private static final byte[] EMPTY_MANIFEST_DATA = new byte[0]; + private static final Duration CACHE_EXPIRATION_DURATION = Duration.ofMinutes(1); + + private final CatalogLoader catalogLoader; + private transient Map> results; + private transient Cache> specs; + private transient Cache outputFileFactories; + private transient String flinkJobId; + private transient String operatorId; + private transient int subTaskId; + private transient int attemptId; + private transient Catalog catalog; + + DynamicWriteResultAggregator(CatalogLoader catalogLoader) { + this.catalogLoader = catalogLoader; + } + + @Override + public void open() throws Exception { + this.flinkJobId = getContainingTask().getEnvironment().getJobID().toString(); + this.operatorId = getOperatorID().toString(); + this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber(); + this.results = Maps.newHashMap(); + this.specs = + Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build(); + this.outputFileFactories = + Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build(); + this.catalog = catalogLoader.loadCatalog(); + } + + @Override + public void finish() throws IOException { + prepareSnapshotPreBarrier(Long.MAX_VALUE); + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws IOException { + Collection> committables = + Sets.newHashSetWithExpectedSize(results.size()); + int count = 0; + for 
(Map.Entry> entries : results.entrySet()) { + committables.add( + new CommittableWithLineage<>( + new DynamicCommittable( + entries.getKey(), + writeToManifest(entries.getKey(), entries.getValue(), checkpointId), + getContainingTask().getEnvironment().getJobID().toString(), + getRuntimeContext().getOperatorUniqueID(), + checkpointId), + checkpointId, + count)); + ++count; + } + + output.collect( + new StreamRecord<>( + new CommittableSummary<>(subTaskId, count, checkpointId, count, count, 0))); + committables.forEach( + c -> + output.collect( + new StreamRecord<>( + new CommittableWithLineage<>(c.getCommittable(), checkpointId, subTaskId)))); + LOG.info("Emitted {} commit message to downstream committer operator", count); + results.clear(); + } + + /** + * Write all the completed data files to a newly created manifest file and return the manifest's + * avro serialized bytes. + */ + @VisibleForTesting + byte[] writeToManifest( + WriteTarget key, Collection writeResults, long checkpointId) + throws IOException { + if (writeResults.isEmpty()) { + return EMPTY_MANIFEST_DATA; + } + + WriteResult.Builder builder = WriteResult.builder(); + writeResults.forEach(w -> builder.add(w.writeResult())); + WriteResult result = builder.build(); + + DeltaManifests deltaManifests = + FlinkManifestUtil.writeCompletedFiles( + result, + () -> outputFileFactory(key.tableName()).create(checkpointId), + spec(key.tableName(), key.specId())); + + return SimpleVersionedSerialization.writeVersionAndSerialize( + DeltaManifestsSerializer.INSTANCE, deltaManifests); + } + + @Override + public void processElement(StreamRecord> element) + throws Exception { + + if (element.isRecord() && element.getValue() instanceof CommittableWithLineage) { + DynamicWriteResult result = + ((CommittableWithLineage) element.getValue()).getCommittable(); + WriteTarget key = result.key(); + results.computeIfAbsent(key, unused -> Sets.newHashSet()).add(result); + } + } + + private ManifestOutputFileFactory outputFileFactory(String tableName) { + return outputFileFactories.get( + tableName, + unused -> { + Table table = catalog.loadTable(TableIdentifier.parse(tableName)); + specs.put(tableName, table.specs()); + return FlinkManifestUtil.createOutputFileFactory( + () -> table, table.properties(), flinkJobId, operatorId, subTaskId, attemptId); + }); + } + + private PartitionSpec spec(String tableName, int specId) { + Map knownSpecs = specs.getIfPresent(tableName); + if (knownSpecs != null) { + PartitionSpec spec = knownSpecs.get(specId); + if (spec != null) { + return spec; + } + } + + Table table = catalog.loadTable(TableIdentifier.parse(tableName)); + return table.specs().get(specId); + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java new file mode 100644 index 000000000000..cf5f423fd7ff --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.iceberg.flink.sink.WriteResultSerializer; +import org.apache.iceberg.io.WriteResult; + +class DynamicWriteResultSerializer implements SimpleVersionedSerializer { + + private static final int VERSION = 1; + private static final WriteResultSerializer WRITE_RESULT_SERIALIZER = new WriteResultSerializer(); + + @Override + public int getVersion() { + return VERSION; + } + + @Override + public byte[] serialize(DynamicWriteResult writeResult) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + writeResult.key().serializeTo(view); + byte[] result = WRITE_RESULT_SERIALIZER.serialize(writeResult.writeResult()); + view.write(result); + return out.toByteArray(); + } + + @Override + public DynamicWriteResult deserialize(int version, byte[] serialized) throws IOException { + if (version == 1) { + DataInputDeserializer view = new DataInputDeserializer(serialized); + WriteTarget key = WriteTarget.deserializeFrom(view); + byte[] resultBuf = new byte[view.available()]; + view.read(resultBuf); + WriteResult writeResult = WRITE_RESULT_SERIALIZER.deserialize(version, resultBuf); + return new DynamicWriteResult(key, writeResult); + } + + throw new IOException("Unrecognized version or corrupt state: " + version); + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java new file mode 100644 index 000000000000..3851dbf95603 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.sink2.CommittingSinkWriter; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Iceberg writer implementation for the {@link SinkWriter} interface. Used by the + * DynamicIcebergSink. Writes out the data to the final place, and emits {@link DynamicWriteResult} + * for every unique {@link WriteTarget} at checkpoint time. + */ +class DynamicWriter implements CommittingSinkWriter { + + private static final Logger LOG = LoggerFactory.getLogger(DynamicWriter.class); + + private final Cache taskWriterFactories; + private final Map> writers; + private final DynamicWriterMetrics metrics; + private final int subTaskId; + private final int attemptId; + private final Catalog catalog; + private final FileFormat dataFileFormat; + private final long targetDataFileSize; + private final Map commonWriteProperties; + + DynamicWriter( + Catalog catalog, + FileFormat dataFileFormat, + long targetDataFileSize, + Map commonWriteProperties, + int cacheMaximumSize, + DynamicWriterMetrics metrics, + int subTaskId, + int attemptId) { + this.catalog = catalog; + this.dataFileFormat = dataFileFormat; + this.targetDataFileSize = targetDataFileSize; + this.commonWriteProperties = commonWriteProperties; + this.metrics = metrics; + this.subTaskId = subTaskId; + this.attemptId = attemptId; + this.taskWriterFactories = Caffeine.newBuilder().maximumSize(cacheMaximumSize).build(); + this.writers = Maps.newHashMap(); + + LOG.debug("DynamicIcebergSinkWriter created for subtask {} attemptId {}", subTaskId, attemptId); + } + + @Override + public void write(DynamicRecordInternal element, Context context) + throws IOException, InterruptedException { + writers + .computeIfAbsent( + new WriteTarget( + element.tableName(), + element.branch(), + element.schema().schemaId(), + element.spec().specId(), + element.upsertMode(), + element.equalityFields()), + writerKey -> { + RowDataTaskWriterFactory taskWriterFactory = + taskWriterFactories.get( + writerKey, + factoryKey -> { + Table table = + catalog.loadTable(TableIdentifier.parse(factoryKey.tableName())); + + // TODO: Handle precedence correctly for the write properties coming from + // the sink conf and from the table defaults + Map tableWriteProperties = + Maps.newHashMap(commonWriteProperties); + 
tableWriteProperties.putAll(table.properties()); + + List equalityFieldIds = + getEqualityFields(table, element.equalityFields()); + if (element.upsertMode()) { + Preconditions.checkState( + !equalityFieldIds.isEmpty(), + "Equality field columns shouldn't be empty when configuring to use UPSERT data."); + if (!table.spec().isUnpartitioned()) { + for (PartitionField partitionField : table.spec().fields()) { + Preconditions.checkState( + equalityFieldIds.contains(partitionField.sourceId()), + "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'", + partitionField, + equalityFieldIds); + } + } + } + + LOG.debug("Creating new writer factory for table '{}'", table.name()); + return new RowDataTaskWriterFactory( + () -> table, + FlinkSchemaUtil.convert(element.schema()), + targetDataFileSize, + dataFileFormat, + tableWriteProperties, + equalityFieldIds, + element.upsertMode(), + element.schema(), + element.spec()); + }); + + taskWriterFactory.initialize(subTaskId, attemptId); + return taskWriterFactory.create(); + }) + .write(element.rowData()); + } + + @Override + public void flush(boolean endOfInput) { + // flush is used to handle flush/endOfInput, so no action is taken here. + } + + @Override + public void close() throws Exception { + for (TaskWriter writer : writers.values()) { + writer.close(); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("subtaskId", subTaskId) + .add("attemptId", attemptId) + .add("dataFileFormat", dataFileFormat) + .add("targetDataFileSize", targetDataFileSize) + .add("writeProperties", commonWriteProperties) + .toString(); + } + + @Override + public Collection prepareCommit() throws IOException { + List result = Lists.newArrayList(); + for (Map.Entry> entry : writers.entrySet()) { + long startNano = System.nanoTime(); + WriteResult writeResult = entry.getValue().complete(); + WriteTarget writeTarget = entry.getKey(); + metrics.updateFlushResult(writeTarget.tableName(), writeResult); + metrics.flushDuration( + writeTarget.tableName(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano)); + LOG.debug( + "Iceberg writer for table {} subtask {} attempt {} flushed {} data files and {} delete files", + writeTarget.tableName(), + subTaskId, + attemptId, + writeResult.dataFiles().length, + writeResult.deleteFiles().length); + + result.add(new DynamicWriteResult(writeTarget, writeResult)); + } + + writers.clear(); + + return result; + } + + private static List getEqualityFields(Table table, List equalityFieldIds) { + if (equalityFieldIds != null && !equalityFieldIds.isEmpty()) { + return equalityFieldIds; + } + Set identifierFieldIds = table.schema().identifierFieldIds(); + if (identifierFieldIds != null && !identifierFieldIds.isEmpty()) { + return Lists.newArrayList(identifierFieldIds); + } + return Collections.emptyList(); + } + + @VisibleForTesting + DynamicWriterMetrics getMetrics() { + return metrics; + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java new file mode 100644 index 000000000000..2e1f82df9d2d --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.Map; +import org.apache.flink.metrics.MetricGroup; +import org.apache.iceberg.flink.sink.IcebergStreamWriterMetrics; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +class DynamicWriterMetrics { + + private final Map metrics; + private final MetricGroup mainMetricsGroup; + + DynamicWriterMetrics(MetricGroup mainMetricsGroup) { + this.mainMetricsGroup = mainMetricsGroup; + this.metrics = Maps.newHashMap(); + } + + public void updateFlushResult(String fullTableName, WriteResult result) { + writerMetrics(fullTableName).updateFlushResult(result); + } + + public void flushDuration(String fullTableName, long flushDurationMs) { + writerMetrics(fullTableName).flushDuration(flushDurationMs); + } + + IcebergStreamWriterMetrics writerMetrics(String fullTableName) { + return metrics.computeIfAbsent( + fullTableName, tableName -> new IcebergStreamWriterMetrics(mainMetricsGroup, tableName)); + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java new file mode 100644 index 000000000000..0a43404d13ad --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Objects; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +class WriteTarget implements Serializable { + + private final String tableName; + private final String branch; + private final Integer schemaId; + private final Integer specId; + private final boolean upsertMode; + private final List equalityFields; + + WriteTarget( + String tableName, + String branch, + Integer schemaId, + Integer specId, + boolean upsertMode, + List equalityFields) { + this.tableName = tableName; + this.branch = branch != null ? branch : "main"; + this.schemaId = schemaId; + this.specId = specId; + this.upsertMode = upsertMode; + this.equalityFields = equalityFields; + } + + String tableName() { + return tableName; + } + + String branch() { + return branch; + } + + Integer schemaId() { + return schemaId; + } + + Integer specId() { + return specId; + } + + boolean upsertMode() { + return upsertMode; + } + + List equalityFields() { + return equalityFields; + } + + void serializeTo(DataOutputView view) throws IOException { + view.writeUTF(tableName); + view.writeUTF(branch); + view.writeInt(schemaId); + view.writeInt(specId); + view.writeBoolean(upsertMode); + view.writeInt(equalityFields.size()); + for (Integer equalityField : equalityFields) { + view.writeInt(equalityField); + } + } + + static WriteTarget deserializeFrom(DataInputView view) throws IOException { + return new WriteTarget( + view.readUTF(), + view.readUTF(), + view.readInt(), + view.readInt(), + view.readBoolean(), + readList(view)); + } + + private static List readList(DataInputView view) throws IOException { + int numFields = view.readInt(); + List equalityFields = Lists.newArrayList(); + for (int i = 0; i < numFields; i++) { + equalityFields.add(view.readInt()); + } + + return equalityFields; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + WriteTarget that = (WriteTarget) other; + return Objects.equals(tableName, that.tableName) + && Objects.equals(branch, that.branch) + && Objects.equals(schemaId, that.schemaId) + && Objects.equals(specId, that.specId) + && upsertMode == that.upsertMode + && Objects.equals(equalityFields, that.equalityFields); + } + + @Override + public int hashCode() { + return Objects.hash(tableName, branch, schemaId, specId, upsertMode, equalityFields); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("tableName", tableName) + .add("branch", branch) + .add("schemaId", schemaId) + .add("specId", specId) + .add("upsertMode", upsertMode) + .toString(); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java new file mode 100644 index 000000000000..f4109a6476aa --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; + +class TestDynamicCommittableSerializer { + + @Test + void testRoundtrip() throws IOException { + DynamicCommittable committable = + new DynamicCommittable( + new WriteTarget("table", "branch", 42, 23, false, Lists.newArrayList(1, 2)), + new byte[] {3, 4}, + JobID.generate().toHexString(), + new OperatorID().toHexString(), + 5); + + DynamicCommittableSerializer serializer = new DynamicCommittableSerializer(); + assertThat(serializer.deserialize(serializer.getVersion(), serializer.serialize(committable))) + .isEqualTo(committable); + } + + @Test + void testUnsupportedVersion() throws IOException { + DynamicCommittable committable = + new DynamicCommittable( + new WriteTarget("table", "branch", 42, 23, false, Lists.newArrayList(1, 2)), + new byte[] {3, 4}, + JobID.generate().toHexString(), + new OperatorID().toHexString(), + 5); + + DynamicCommittableSerializer serializer = new DynamicCommittableSerializer(); + assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(committable))) + .hasMessage("Unrecognized version or corrupt state: -1") + .isInstanceOf(IOException.class); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java new file mode 100644 index 000000000000..d9129d6eacf6 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.connector.sink2.Committer.CommitRequest; +import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class TestDynamicCommitter { + + static final String DB = "db"; + static final String TABLE1 = "table"; + static final String TABLE2 = "table2"; + + @RegisterExtension + static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension(DB, TABLE1); + + Catalog catalog; + + private static final DataFile DATA_FILE = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to/data-1.parquet") + .withFileSizeInBytes(0) + .withMetrics( + new Metrics( + 42L, + null, // no column sizes + ImmutableMap.of(1, 5L), // value count + ImmutableMap.of(1, 0L), // null count + null, + ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds + ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds + )) + .build(); + + @BeforeEach + void before() { + catalog = CATALOG_EXTENSION.catalog(); + Schema schema1 = new Schema(42); + Schema schema2 = new Schema(43); + catalog.createTable(TableIdentifier.of(TABLE1), schema1); + catalog.createTable(TableIdentifier.of(TABLE2), schema2); + } + + @Test + void testCommit() throws Exception { + Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1)); + assertThat(table1.snapshots()).isEmpty(); + Table table2 = catalog.loadTable(TableIdentifier.of(TABLE2)); + assertThat(table2.snapshots()).isEmpty(); + + boolean overwriteMode = false; + int workerPoolSize = 1; + String sinkId = "sinkId"; + UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup(); + DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup); + DynamicCommitter dynamicCommitter = + new DynamicCommitter( + CATALOG_EXTENSION.catalog(), + Maps.newHashMap(), + overwriteMode, + workerPoolSize, + sinkId, + committerMetrics); + + WriteTarget writeTarget1 = + new WriteTarget(TABLE1, "branch", 42, 0, true, Lists.newArrayList(1, 2)); + WriteTarget writeTarget2 = + new WriteTarget(TABLE1, "branch2", 43, 0, true, Lists.newArrayList(1, 2)); + WriteTarget writeTarget3 = + new WriteTarget(TABLE2, "branch2", 43, 0, true, Lists.newArrayList(1, 2)); + + DynamicWriteResultAggregator aggregator = + new 
DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader()); + OneInputStreamOperatorTestHarness aggregatorHarness = + new OneInputStreamOperatorTestHarness(aggregator); + aggregatorHarness.open(); + + byte[] deltaManifest1 = + aggregator.writeToManifest( + writeTarget1, + Lists.newArrayList( + new DynamicWriteResult( + writeTarget1, WriteResult.builder().addDataFiles(DATA_FILE).build())), + 0); + byte[] deltaManifest2 = + aggregator.writeToManifest( + writeTarget2, + Lists.newArrayList( + new DynamicWriteResult( + writeTarget2, WriteResult.builder().addDataFiles(DATA_FILE).build())), + 0); + byte[] deltaManifest3 = + aggregator.writeToManifest( + writeTarget3, + Lists.newArrayList( + new DynamicWriteResult( + writeTarget3, WriteResult.builder().addDataFiles(DATA_FILE).build())), + 0); + + final String jobId = JobID.generate().toHexString(); + final String operatorId = new OperatorID().toHexString(); + final int checkpointId = 10; + + CommitRequest commitRequest1 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId)); + + CommitRequest commitRequest2 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId)); + + CommitRequest commitRequest3 = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId)); + + dynamicCommitter.commit(Lists.newArrayList(commitRequest1, commitRequest2, commitRequest3)); + + table1.refresh(); + assertThat(table1.snapshots()).hasSize(2); + Snapshot first = Iterables.getFirst(table1.snapshots(), null); + assertThat(first.summary()) + .containsAllEntriesOf( + (Map) + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "42") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId) + .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "42") + .build()); + Snapshot second = Iterables.get(table1.snapshots(), 1, null); + assertThat(second.summary()) + .containsAllEntriesOf( + (Map) + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "42") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId) + .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "42") + .build()); + + table2.refresh(); + assertThat(table2.snapshots()).hasSize(1); + Snapshot third = Iterables.getFirst(table2.snapshots(), null); + assertThat(third.summary()) + .containsAllEntriesOf( + (Map) + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "42") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId) + .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "42") + .build()); + } + + @Test + void testAlreadyCommitted() throws Exception { + Table table1 = 
catalog.loadTable(TableIdentifier.of(TABLE1)); + assertThat(table1.snapshots()).isEmpty(); + + boolean overwriteMode = false; + int workerPoolSize = 1; + String sinkId = "sinkId"; + UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup(); + DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup); + DynamicCommitter dynamicCommitter = + new DynamicCommitter( + CATALOG_EXTENSION.catalog(), + Maps.newHashMap(), + overwriteMode, + workerPoolSize, + sinkId, + committerMetrics); + + WriteTarget writeTarget = + new WriteTarget(TABLE1, "branch", 42, 0, false, Lists.newArrayList(1, 2)); + + DynamicWriteResultAggregator aggregator = + new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader()); + OneInputStreamOperatorTestHarness aggregatorHarness = + new OneInputStreamOperatorTestHarness(aggregator); + aggregatorHarness.open(); + + final String jobId = JobID.generate().toHexString(); + final String operatorId = new OperatorID().toHexString(); + final int checkpointId = 10; + + byte[] deltaManifest = + aggregator.writeToManifest( + writeTarget, + Lists.newArrayList( + new DynamicWriteResult( + writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), + checkpointId); + + CommitRequest commitRequest = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); + + dynamicCommitter.commit(Lists.newArrayList(commitRequest)); + + CommitRequest oldCommitRequest = + new MockCommitRequest<>( + new DynamicCommittable( + writeTarget, deltaManifest, jobId, operatorId, checkpointId - 1)); + + // Old commits requests shouldn't affect the result + dynamicCommitter.commit(Lists.newArrayList(oldCommitRequest)); + + table1.refresh(); + assertThat(table1.snapshots()).hasSize(1); + Snapshot first = Iterables.getFirst(table1.snapshots(), null); + assertThat(first.summary()) + .containsAllEntriesOf( + (Map) + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "42") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId) + .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "42") + .build()); + } + + @Test + void testReplacePartitions() throws Exception { + Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1)); + assertThat(table1.snapshots()).isEmpty(); + + // Overwrite mode is active + boolean overwriteMode = true; + int workerPoolSize = 1; + String sinkId = "sinkId"; + UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup(); + DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup); + DynamicCommitter dynamicCommitter = + new DynamicCommitter( + CATALOG_EXTENSION.catalog(), + Maps.newHashMap(), + overwriteMode, + workerPoolSize, + sinkId, + committerMetrics); + + WriteTarget writeTarget = + new WriteTarget(TABLE1, "branch", 42, 0, false, Lists.newArrayList(1, 2)); + + DynamicWriteResultAggregator aggregator = + new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader()); + OneInputStreamOperatorTestHarness aggregatorHarness = + new OneInputStreamOperatorTestHarness(aggregator); + aggregatorHarness.open(); + + final String jobId = JobID.generate().toHexString(); + final String operatorId = new OperatorID().toHexString(); + final int checkpointId = 10; + + byte[] 
deltaManifest = + aggregator.writeToManifest( + writeTarget, + Lists.newArrayList( + new DynamicWriteResult( + writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), + checkpointId); + + CommitRequest commitRequest = + new MockCommitRequest<>( + new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); + + dynamicCommitter.commit(Lists.newArrayList(commitRequest)); + + byte[] overwriteManifest = + aggregator.writeToManifest( + writeTarget, + Lists.newArrayList( + new DynamicWriteResult( + writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), + checkpointId + 1); + + CommitRequest overwriteRequest = + new MockCommitRequest<>( + new DynamicCommittable( + writeTarget, overwriteManifest, jobId, operatorId, checkpointId + 1)); + + dynamicCommitter.commit(Lists.newArrayList(overwriteRequest)); + + table1.refresh(); + assertThat(table1.snapshots()).hasSize(2); + Snapshot latestSnapshot = Iterables.getLast(table1.snapshots()); + assertThat(latestSnapshot.summary()) + .containsAllEntriesOf( + (Map) + ImmutableMap.builder() + .put("replace-partitions", "true") + .put("added-data-files", "1") + .put("added-records", "42") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", String.valueOf(checkpointId + 1)) + .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "0") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "0") + .put("total-records", "42") + .build()); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java new file mode 100644 index 000000000000..137b87bb171d --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class TestDynamicWriteResultAggregator { + + @RegisterExtension + static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("db", "table"); + + @Test + void testAggregator() throws Exception { + CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new Schema()); + CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table2"), new Schema()); + + DynamicWriteResultAggregator aggregator = + new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader()); + try (OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) { + testHarness.open(); + + WriteTarget writeTarget1 = + new WriteTarget("table", "branch", 42, 0, true, Lists.newArrayList()); + DynamicWriteResult dynamicWriteResult1 = + new DynamicWriteResult(writeTarget1, WriteResult.builder().build()); + WriteTarget writeTarget2 = + new WriteTarget("table2", "branch", 42, 0, true, Lists.newArrayList(1, 2)); + DynamicWriteResult dynamicWriteResult2 = + new DynamicWriteResult(writeTarget2, WriteResult.builder().build()); + + CommittableWithLineage committable1 = + new CommittableWithLineage<>(dynamicWriteResult1, 0, 0); + StreamRecord> record1 = + new StreamRecord<>(committable1); + testHarness.processElement(record1); + CommittableWithLineage committable2 = + new CommittableWithLineage<>(dynamicWriteResult2, 0, 0); + StreamRecord> record2 = + new StreamRecord<>(committable2); + testHarness.processElement(record2); + + assertThat(testHarness.getOutput()).isEmpty(); + + testHarness.prepareSnapshotPreBarrier(1L); + // Contains a CommittableSummary + DynamicCommittable + assertThat(testHarness.getRecordOutput()).hasSize(3); + + testHarness.prepareSnapshotPreBarrier(2L); + // Only contains a CommittableSummary + assertThat(testHarness.getRecordOutput()).hasSize(4); + } + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java new file mode 100644 index 000000000000..bede5d42b9f4 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; + +class TestDynamicWriteResultSerializer { + + private static final DataFile DATA_FILE = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to/data-1.parquet") + .withFileSizeInBytes(0) + .withMetrics( + new Metrics( + 42L, + null, + ImmutableMap.of(1, 5L), + ImmutableMap.of(1, 0L), + null, + ImmutableMap.of(1, ByteBuffer.allocate(1)), + ImmutableMap.of(1, ByteBuffer.allocate(1)))) + .build(); + + @Test + void testRoundtrip() throws IOException { + DynamicWriteResult dynamicWriteResult = + new DynamicWriteResult( + new WriteTarget("table", "branch", 42, 23, false, Lists.newArrayList(1, 2)), + WriteResult.builder().addDataFiles(DATA_FILE).build()); + + DynamicWriteResultSerializer serializer = new DynamicWriteResultSerializer(); + DynamicWriteResult copy = + serializer.deserialize(serializer.getVersion(), serializer.serialize(dynamicWriteResult)); + + assertThat(copy.writeResult().dataFiles()).hasSize(1); + DataFile dataFile = copy.writeResult().dataFiles()[0]; + // DataFile doesn't implement equals, but we can still do basic checks + assertThat(dataFile.path()).isEqualTo("/path/to/data-1.parquet"); + assertThat(dataFile.recordCount()).isEqualTo(42L); + } + + @Test + void testUnsupportedVersion() throws IOException { + DynamicWriteResult dynamicWriteResult = + new DynamicWriteResult( + new WriteTarget("table", "branch", 42, 23, false, Lists.newArrayList(1, 2)), + WriteResult.builder().addDataFiles(DATA_FILE).build()); + + DynamicWriteResultSerializer serializer = new DynamicWriteResultSerializer(); + assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(dynamicWriteResult))) + .hasMessage("Unrecognized version or corrupt state: -1") + .isInstanceOf(IOException.class); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java new file mode 100644 index 000000000000..0a723c9d5700 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.File; +import java.net.URI; +import java.util.Collection; +import java.util.Map; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; + +class TestDynamicWriter extends TestFlinkIcebergSinkBase { + + private static final TableIdentifier TABLE1 = TableIdentifier.of("myTable1"); + private static final TableIdentifier TABLE2 = TableIdentifier.of("myTable2"); + + @Test + void testDynamicWriter() throws Exception { + Catalog catalog = CATALOG_EXTENSION.catalog(); + Table table1 = catalog.createTable(TABLE1, SimpleDataUtil.SCHEMA); + Table table2 = catalog.createTable(TABLE2, SimpleDataUtil.SCHEMA); + + DynamicWriter dynamicWriter = createDynamicWriter(catalog); + + DynamicRecordInternal record1 = getDynamicRecordInternal(table1); + DynamicRecordInternal record2 = getDynamicRecordInternal(table2); + + assertThat(getNumDataFiles(table1)).isEqualTo(0); + + dynamicWriter.write(record1, null); + dynamicWriter.write(record2, null); + Collection writeResults = dynamicWriter.prepareCommit(); + + assertThat(writeResults.size()).isEqualTo(2); + assertThat(getNumDataFiles(table1)).isEqualTo(1); + assertThat( + dynamicWriter + .getMetrics() + .writerMetrics(TABLE1.name()) + .getFlushedDataFiles() + .getCount()) + .isEqualTo(1); + assertThat( + dynamicWriter + .getMetrics() + .writerMetrics(TABLE2.name()) + .getFlushedDataFiles() + .getCount()) + .isEqualTo(1); + + WriteResult wr1 = writeResults.iterator().next().writeResult(); + assertThat(wr1.dataFiles().length).isEqualTo(1); + assertThat(wr1.dataFiles()[0].format()).isEqualTo(FileFormat.PARQUET); + assertThat(wr1.deleteFiles()).isEmpty(); + + dynamicWriter.write(record1, null); + dynamicWriter.write(record2, null); + writeResults = dynamicWriter.prepareCommit(); + + assertThat(writeResults.size()).isEqualTo(2); + assertThat(getNumDataFiles(table1)).isEqualTo(2); + assertThat( + dynamicWriter + .getMetrics() + .writerMetrics(TABLE1.name()) + .getFlushedDataFiles() + .getCount()) + .isEqualTo(2); + assertThat( + dynamicWriter + .getMetrics() + .writerMetrics(TABLE2.name()) + .getFlushedDataFiles() + .getCount()) + .isEqualTo(2); + + WriteResult wr2 = writeResults.iterator().next().writeResult(); + assertThat(wr2.dataFiles().length).isEqualTo(1); + 
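// Iteration order over the write results is unspecified, but both targets flushed exactly one Parquet data file, so checking the first element suffices. +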
assertThat(wr2.dataFiles()[0].format()).isEqualTo(FileFormat.PARQUET); + assertThat(wr2.deleteFiles()).isEmpty(); + + dynamicWriter.close(); + } + + @Test + void testDynamicWriterUpsert() throws Exception { + Catalog catalog = CATALOG_EXTENSION.catalog(); + DynamicWriter dyamicWriter = createDynamicWriter(catalog); + Table table1 = CATALOG_EXTENSION.catalog().createTable(TABLE1, SimpleDataUtil.SCHEMA); + + DynamicRecordInternal record = getDynamicRecordInternal(table1); + record.setUpsertMode(true); + record.setEqualityFieldIds(Lists.newArrayList(1)); + + dyamicWriter.write(record, null); + dyamicWriter.prepareCommit(); + + assertThat( + dyamicWriter + .getMetrics() + .writerMetrics(TABLE1.name()) + .getFlushedDeleteFiles() + .getCount()) + .isEqualTo(1); + assertThat( + dyamicWriter.getMetrics().writerMetrics(TABLE1.name()).getFlushedDataFiles().getCount()) + .isEqualTo(1); + } + + @Test + void testDynamicWriterUpsertNoEqualityFields() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + DynamicWriter dyamicWriter = createDynamicWriter(catalog); + Table table1 = CATALOG_EXTENSION.catalog().createTable(TABLE1, SimpleDataUtil.SCHEMA); + + DynamicRecordInternal record = getDynamicRecordInternal(table1); + record.setUpsertMode(true); + + assertThatThrownBy(() -> dyamicWriter.write(record, null)) + .isInstanceOf(IllegalStateException.class) + .hasMessage( + "Equality field columns shouldn't be empty when configuring to use UPSERT data."); + } + + private static @NotNull DynamicWriter createDynamicWriter(Catalog catalog) { + DynamicWriter dynamicWriter = + new DynamicWriter( + catalog, + FileFormat.PARQUET, + 1024L, + Map.of(), + 100, + new DynamicWriterMetrics(new UnregisteredMetricsGroup()), + 0, + 0); + return dynamicWriter; + } + + private static @NotNull DynamicRecordInternal getDynamicRecordInternal(Table table1) { + DynamicRecordInternal record = new DynamicRecordInternal(); + record.setTableName(TableIdentifier.parse(table1.name()).name()); + record.setSchema(table1.schema()); + record.setSpec(table1.spec()); + record.setRowData(SimpleDataUtil.createRowData(1, "test")); + return record; + } + + private static int getNumDataFiles(Table table) { + File dataDir = new File(URI.create(table.location()).getPath(), "data"); + if (dataDir.exists()) { + return dataDir.listFiles((dir, name) -> !name.startsWith(".")).length; + } + return 0; + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java index 720a0a582e94..70a512e7c7da 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java @@ -38,7 +38,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -public class TestTableSerializerCache { +class TestTableSerializerCache { @RegisterExtension static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("db", "table");
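A minimal usage sketch, not part of this patch, illustrating how the WriteTarget serialization introduced above round-trips through Flink's memory views. The example class name is hypothetical, and it assumes it sits in the org.apache.iceberg.flink.sink.dynamic package so it can reach the package-private WriteTarget.

// Hypothetical example, not part of this change.
package org.apache.iceberg.flink.sink.dynamic;

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class WriteTargetRoundTripExample {
  public static void main(String[] args) throws Exception {
    WriteTarget target =
        new WriteTarget("db.tbl", "main", 1 /* schemaId */, 0 /* specId */, false, Lists.newArrayList(1, 2));

    // Serialize through the same DataOutputView/DataInputView abstractions the dynamic sink serializers use.
    DataOutputSerializer out = new DataOutputSerializer(128);
    target.serializeTo(out);
    byte[] bytes = out.getCopyOfBuffer();

    WriteTarget copy = WriteTarget.deserializeFrom(new DataInputDeserializer(bytes));

    // equals/hashCode cover every serialized field, so the deserialized copy compares equal to the original.
    if (!target.equals(copy)) {
      throw new IllegalStateException("Round-trip mismatch: " + copy);
    }
  }
}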