diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 7ab7bcd9a9c6..37dd25bf9029 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 2
+  "modification": 3
 }
diff --git a/CHANGES.md b/CHANGES.md
index c75a42d25945..43702738ae25 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * [Java] Use API compatible with both com.google.cloud.bigdataoss:util 2.x and 3.x in BatchLoads ([#34105](https://github.com/apache/beam/pull/34105))
+* [IcebergIO] Address edge case where bundle retry following a successful data commit results in data duplication ([#34264](https://github.com/apache/beam/pull/34264))
 
 ## New Features / Improvements
 
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index fed72a381d5e..024e0336e67d 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -128,13 +128,12 @@ public void processElement(
         BoundedWindow window)
         throws IOException {
       String tableStringIdentifier = element.getKey();
+      Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
       Iterable<FileWriteResult> fileWriteResults = element.getValue();
-      if (!fileWriteResults.iterator().hasNext()) {
+      if (shouldSkip(table, fileWriteResults)) {
         return;
       }
 
-      Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
-
       // vast majority of the time, we will simply append data files.
       // in the rare case we get a batch that contains multiple partition specs, we will group
       // data into manifest files and append.
@@ -211,5 +210,35 @@ private ManifestWriter<DataFile> createManifestWriter(
               tableLocation, manifestFilePrefix, uuid, spec.specId()));
       return ManifestFiles.write(spec, io.newOutputFile(location));
     }
+
+    // If the process call fails immediately after a successful commit, it gets retried with
+    // the same data, possibly leading to data duplication.
+    // To mitigate, we skip the current batch of files if it matches the most recently committed
+    // batch.
+    //
+    // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines, where the
+    // "last successful snapshot" might reflect commits from other sources. Ideally, we would make
+    // this stateful, but that is update incompatible.
+    // TODO(ahmedabu98): add load test pipelines with intentional periodic crashing
+    private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResults) {
+      if (table.currentSnapshot() == null) {
+        return false;
+      }
+      if (!fileWriteResults.iterator().hasNext()) {
+        return true;
+      }
+
+      // Check if the current batch is identical to the most recently committed batch.
+      // Upstream GBK means we always get the same batch of files on retry,
+      // so a single overlapping file means the whole batch is identical.
+      String sampleCommittedDataFilePath =
+          table.currentSnapshot().addedDataFiles(table.io()).iterator().next().path().toString();
+      for (FileWriteResult result : fileWriteResults) {
+        if (result.getSerializableDataFile().getPath().equals(sampleCommittedDataFilePath)) {
+          return true;
+        }
+      }
+      return false;
+    }
   }
 }
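For readers unfamiliar with the Iceberg snapshot API, the sketch below restates the dedup check from `shouldSkip()` as a self-contained helper. It is a minimal illustration, not the PR's code: the class name, the `alreadyCommitted` helper, and the `incomingPaths` list (standing in for the data file paths carried by the batch's `FileWriteResult` elements) are all hypothetical.

```java
// Minimal sketch of the retry-dedup check, assuming an already-loaded Iceberg
// Table. `incomingPaths` is a hypothetical stand-in for the data file paths
// carried by the current batch of FileWriteResult elements.
import java.util.Iterator;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class RetryDedupSketch {

  // Returns true if the incoming batch was already committed by the table's
  // latest snapshot. Because the upstream GroupByKey replays the identical
  // batch on retry, one overlapping path implies the whole batch is a duplicate.
  static boolean alreadyCommitted(Table table, List<String> incomingPaths) {
    Snapshot latest = table.currentSnapshot();
    if (latest == null) {
      return false; // no commits yet, nothing to deduplicate against
    }
    if (incomingPaths.isEmpty()) {
      return true; // empty batch, nothing to commit
    }
    // Sample one data file added by the latest snapshot and look for it in
    // the incoming batch.
    Iterator<DataFile> added = latest.addedDataFiles(table.io()).iterator();
    if (!added.hasNext()) {
      return false; // latest snapshot added no data files
    }
    return incomingPaths.contains(added.next().path().toString());
  }
}
```

Note the trade-off the TODOs call out: sampling a single committed path is only sound because a retried bundle carries exactly the same batch. If another pipeline commits to the table between the successful commit and the retry, the latest snapshot no longer corresponds to this writer's last batch, and the check can miss a duplicate.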