2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json

@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 2
+  "modification": 3
 }
1 change: 1 addition & 0 deletions CHANGES.md

@@ -65,6 +65,7 @@

 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * [Java] Use API compatible with both com.google.cloud.bigdataoss:util 2.x and 3.x in BatchLoads ([#34105](https://github.com/apache/beam/pull/34105))
+* [IcebergIO] Address edge case where bundle retry following a successful data commit results in data duplication ([#34264](https://github.com/apache/beam/pull/34264))

 ## New Features / Improvements
@@ -128,13 +128,12 @@ public void processElement(
         BoundedWindow window)
         throws IOException {
       String tableStringIdentifier = element.getKey();
+      Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
       Iterable<FileWriteResult> fileWriteResults = element.getValue();
-      if (!fileWriteResults.iterator().hasNext()) {
+      if (shouldSkip(table, fileWriteResults)) {
         return;
       }
 
-      Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
-
       // vast majority of the time, we will simply append data files.
       // in the rare case we get a batch that contains multiple partition specs, we will group
       // data into manifest files and append.
@@ -211,5 +210,35 @@ private ManifestWriter<DataFile> createManifestWriter(
             tableLocation, manifestFilePrefix, uuid, spec.specId()));
     return ManifestFiles.write(spec, io.newOutputFile(location));
   }
+
+  // If the process call fails immediately after a successful commit, it gets retried with
+  // the same data, possibly leading to data duplication.
+  // To mitigate this, we skip the current batch of files if it matches the most recently
+  // committed batch.
+  //
+  // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines, where the
+  // "last successful snapshot" might reflect commits from other sources. Ideally, we would make
+  // this stateful, but that is update incompatible.
+  // TODO(ahmedabu98): add load test pipelines with intentional periodic crashing

    [Inline review comment from the PR author, on the TODO above]
    Not feasible to have a meaningful test for this without Dataflow.

+  private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResults) {
+    if (table.currentSnapshot() == null) {
+      return false;
+    }
+    if (!fileWriteResults.iterator().hasNext()) {
+      return true;
+    }
+
+    // Check if the current batch is identical to the most recently committed batch.
+    // Upstream GBK means we always get the same batch of files on retry,
+    // so a single overlapping file means the whole batch is identical.
+    String sampleCommittedDataFilePath =
+        table.currentSnapshot().addedDataFiles(table.io()).iterator().next().path().toString();
+    for (FileWriteResult result : fileWriteResults) {
+      if (result.getSerializableDataFile().getPath().equals(sampleCommittedDataFilePath)) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
 }
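
To make the mitigation concrete, below is a small, self-contained sketch of the invariant `shouldSkip` relies on. This is not the Beam or Iceberg API: `DedupSketch`, `lastCommittedPaths`, and the sample paths are hypothetical stand-ins. It illustrates that because the upstream GroupByKey replays the identical batch on retry, a single path overlap with the last committed snapshot is enough to conclude the whole batch was already committed.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the dedup check; not the actual Beam/Iceberg types. */
public class DedupSketch {

  // lastCommittedPaths: data file paths added by the table's most recent snapshot.
  // incomingBatch: file paths carried by the (possibly retried) bundle.
  static boolean shouldSkip(Set<String> lastCommittedPaths, List<String> incomingBatch) {
    if (lastCommittedPaths.isEmpty()) {
      return false; // fresh table, nothing committed yet -> proceed
    }
    if (incomingBatch.isEmpty()) {
      return true; // empty batch -> nothing to commit
    }
    // Sample one committed path; a retry replays the identical grouped batch,
    // so one overlap means the whole batch was already committed.
    String sample = lastCommittedPaths.iterator().next();
    return incomingBatch.contains(sample);
  }

  public static void main(String[] args) {
    List<String> batch = List.of("data/f1.parquet", "data/f2.parquet");
    // First attempt on a fresh table: proceed with the commit.
    System.out.println(shouldSkip(Set.of(), batch)); // false
    // Retry after that same batch was committed: skip to avoid duplicates.
    System.out.println(shouldSkip(Set.of("data/f1.parquet", "data/f2.parquet"), batch)); // true
  }
}
```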
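
On the second TODO (load test pipelines with intentional periodic crashing): as the author's review comment notes, exercising bundle retries meaningfully requires a runner like Dataflow. Purely as a hedged illustration of what such a stage might look like, the sketch below defines a hypothetical pass-through DoFn (`PeriodicCrashFn` is not part of Beam or this PR) that fails a configurable fraction of bundles so the runner's retry path, and hence the dedup check, gets exercised.

```java
import java.util.Random;
import org.apache.beam.sdk.transforms.DoFn;

/** Hypothetical pass-through DoFn that throws occasionally to force bundle retries. */
public class PeriodicCrashFn<T> extends DoFn<T, T> {

  private final double crashProbability;
  private transient Random random;

  public PeriodicCrashFn(double crashProbability) {
    this.crashProbability = crashProbability;
  }

  @Setup
  public void setup() {
    random = new Random();
  }

  @ProcessElement
  public void processElement(@Element T element, OutputReceiver<T> out) {
    // On a runner that retries failed bundles (e.g. Dataflow), an exception here
    // replays the bundle, simulating a crash right after a successful commit.
    if (random.nextDouble() < crashProbability) {
      throw new RuntimeException("Intentional crash to exercise bundle retry");
    }
    out.output(element);
  }
}
```

If such a stage were fused adjacent to the committing DoFn, a retried bundle would replay the same grouped batch, which is exactly the case `shouldSkip` guards against.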