From 883b7be692f32f5d48b68c1a7be0ae2a8c385776 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 12 Mar 2025 11:38:18 -0400 Subject: [PATCH 1/4] remove already committed files --- .../IO_Iceberg_Integration_Tests.json | 2 +- .../sdk/io/iceberg/AppendFilesToTables.java | 34 +++++++++++++++++-- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 7ab7bcd9a9c6..37dd25bf9029 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 2 + "modification": 3 } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java index fed72a381d5e..5e4bf9492e11 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.metrics.Counter; @@ -38,6 +39,7 @@ import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; @@ -128,13 +130,13 @@ public void processElement( BoundedWindow window) throws IOException { String tableStringIdentifier = element.getKey(); - Iterable fileWriteResults = element.getValue(); + Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); + Iterable fileWriteResults = + removeAlreadyCommittedFiles(table, element.getValue()); if (!fileWriteResults.iterator().hasNext()) { return; } - Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); - // vast majority of the time, we will simply append data files. // in the rare case we get a batch that contains multiple partition specs, we will group // data into manifest files and append. @@ -211,5 +213,31 @@ private ManifestWriter createManifestWriter( tableLocation, manifestFilePrefix, uuid, spec.specId())); return ManifestFiles.write(spec, io.newOutputFile(location)); } + + // If bundle fails following a successful commit and gets retried, it may attempt to re-commit + // the same data. + // To mitigate, we check the files in this bundle and remove anything that was already + // committed in the last successful snapshot. + // + // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines, where the + // "last successful snapshot" might reflect commits from other sources. Ideally, we would make + // this stateful, but that is update incompatible. + // TODO(ahmedabu98): add load test pipelines with intentional periodic crashing + private Iterable removeAlreadyCommittedFiles( + Table table, Iterable fileWriteResults) { + if (table.currentSnapshot() == null) { + return fileWriteResults; + } + + List committedFiles = + Streams.stream(table.currentSnapshot().addedDataFiles(table.io())) + .map(DataFile::path) + .map(CharSequence::toString) + .collect(Collectors.toList()); + + return Streams.stream(fileWriteResults) + .filter(f -> !committedFiles.contains(f.getSerializableDataFile().getPath())) + .collect(Collectors.toList()); + } } } From 5bf7d4e53b4883d2c7c6e7d4af648119f3afc2f5 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 12 Mar 2025 11:42:27 -0400 Subject: [PATCH 2/4] changes --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index c75a42d25945..43702738ae25 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -65,6 +65,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * [Java] Use API compatible with both com.google.cloud.bigdataoss:util 2.x and 3.x in BatchLoads ([#34105](https://github.com/apache/beam/pull/34105)) +* [IcebergIO] Address edge case where bundle retry following a successful data commit results in data duplication ([#34264](https://github.com/apache/beam/pull/34264)) ## New Features / Improvements From 04154f54312f07187decedcae8d53279bd2adbb1 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 20 Mar 2025 08:20:21 -0400 Subject: [PATCH 3/4] simplify validation --- .../sdk/io/iceberg/AppendFilesToTables.java | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java index 5e4bf9492e11..12387994c22e 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import org.apache.beam.sdk.coders.KvCoder; @@ -39,6 +40,7 @@ import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; @@ -131,9 +133,8 @@ public void processElement( throws IOException { String tableStringIdentifier = element.getKey(); Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); - Iterable fileWriteResults = - removeAlreadyCommittedFiles(table, element.getValue()); - if (!fileWriteResults.iterator().hasNext()) { + Iterable fileWriteResults = element.getValue(); + if (shouldSkip(table, fileWriteResults)) { return; } @@ -214,30 +215,35 @@ private ManifestWriter createManifestWriter( return ManifestFiles.write(spec, io.newOutputFile(location)); } - // If bundle fails following a successful commit and gets retried, it may attempt to re-commit - // the same data. - // To mitigate, we check the files in this bundle and remove anything that was already - // committed in the last successful snapshot. + // If the process call fails immediately after a successful commit, it gets retried with + // the same data, possibly leading to data duplication. + // To mitigate, we skip the current batch of files if it matches the most recently committed + // batch. // // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines, where the // "last successful snapshot" might reflect commits from other sources. Ideally, we would make // this stateful, but that is update incompatible. // TODO(ahmedabu98): add load test pipelines with intentional periodic crashing - private Iterable removeAlreadyCommittedFiles( - Table table, Iterable fileWriteResults) { + private boolean shouldSkip(Table table, Iterable fileWriteResults) { if (table.currentSnapshot() == null) { - return fileWriteResults; + return false; + } + if (!fileWriteResults.iterator().hasNext()) { + return true; } - List committedFiles = + Set filesCommittedLastSnapshot = Streams.stream(table.currentSnapshot().addedDataFiles(table.io())) .map(DataFile::path) .map(CharSequence::toString) - .collect(Collectors.toList()); - - return Streams.stream(fileWriteResults) - .filter(f -> !committedFiles.contains(f.getSerializableDataFile().getPath())) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); + + // Check if the current batch is identical to the most recently committed batch. + // Upstream GBK means we always get the same batch of files on retry, + // so a single overlapping file means the whole batch is identical. + return Iterables.size(fileWriteResults) == filesCommittedLastSnapshot.size() + && filesCommittedLastSnapshot.contains( + fileWriteResults.iterator().next().getSerializableDataFile().getPath()); } } } From c230e353c822101c4e741fe25794427cc25859b5 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 20 Mar 2025 10:44:32 -0400 Subject: [PATCH 4/4] validate without loading the whole collection into memory --- .../sdk/io/iceberg/AppendFilesToTables.java | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java index 12387994c22e..024e0336e67d 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java @@ -22,9 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; -import java.util.stream.Collectors; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.metrics.Counter; @@ -40,8 +38,6 @@ import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; @@ -232,18 +228,17 @@ private boolean shouldSkip(Table table, Iterable fileWriteResul return true; } - Set filesCommittedLastSnapshot = - Streams.stream(table.currentSnapshot().addedDataFiles(table.io())) - .map(DataFile::path) - .map(CharSequence::toString) - .collect(Collectors.toSet()); - // Check if the current batch is identical to the most recently committed batch. // Upstream GBK means we always get the same batch of files on retry, // so a single overlapping file means the whole batch is identical. - return Iterables.size(fileWriteResults) == filesCommittedLastSnapshot.size() - && filesCommittedLastSnapshot.contains( - fileWriteResults.iterator().next().getSerializableDataFile().getPath()); + String sampleCommittedDataFilePath = + table.currentSnapshot().addedDataFiles(table.io()).iterator().next().path().toString(); + for (FileWriteResult result : fileWriteResults) { + if (result.getSerializableDataFile().getPath().equals(sampleCommittedDataFilePath)) { + return true; + } + } + return false; } } }