From df314e9b4146aab85ce7dedb6d3646e69cfabb95 Mon Sep 17 00:00:00 2001
From: Sam Wheating
Date: Fri, 2 Jan 2026 14:27:40 -0800
Subject: [PATCH] Fail publish_changes procedure if there's multiple matching snapshots

---
 .../TestPublishChangesProcedure.java          | 19 ++++++++++
 .../procedures/PublishChangesProcedure.java   | 36 ++++++++++---------
 2 files changed, 38 insertions(+), 17 deletions(-)

diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
index 4958fde15d55..93a37956579c 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -159,6 +159,25 @@ public void testApplyInvalidWapId() {
         .hasMessage("Cannot apply unknown WAP ID 'not_valid'");
   }
 
+  @TestTemplate
+  public void testApplyDuplicateWapId() {
+
+    String wapId = "wap_id_1";
+
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    assertThatThrownBy(
+            () -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage("Cannot apply non-unique WAP ID. Found 2 snapshots with WAP ID 'wap_id_1'");
+  }
+
   @TestTemplate
   public void testInvalidApplyWapChangesCases() {
     assertThatThrownBy(
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
index 874888204334..788657609a2f 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -19,7 +19,6 @@
 package org.apache.iceberg.spark.procedures;
 
 import java.util.Iterator;
-import java.util.Optional;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -97,23 +96,26 @@ public Iterator call(InternalRow args) {
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          Optional<Snapshot> wapSnapshot =
-              Optional.ofNullable(
-                  Iterables.find(
-                      table.snapshots(),
-                      snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
-                      null));
-          if (!wapSnapshot.isPresent()) {
-            throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
+          Iterable<Snapshot> wapSnapshots =
+              Iterables.filter(
+                  table.snapshots(), snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)));
+
+          int numMatchingSnapshots = Iterables.size(wapSnapshots);
+
+          switch (numMatchingSnapshots) {
+            case 0:
+              throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
+            case 1:
+              long wapSnapshotId = Iterables.getOnlyElement(wapSnapshots).snapshotId();
+              table.manageSnapshots().cherrypick(wapSnapshotId).commit();
+              Snapshot currentSnapshot = table.currentSnapshot();
+              InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
+              return asScanIterator(OUTPUT_TYPE, outputRow);
+            default:
+              throw new ValidationException(
+                  "Cannot apply non-unique WAP ID. Found %d snapshots with WAP ID '%s'",
+                  numMatchingSnapshots, wapId);
           }
-
-          long wapSnapshotId = wapSnapshot.get().snapshotId();
-          table.manageSnapshots().cherrypick(wapSnapshotId).commit();
-
-          Snapshot currentSnapshot = table.currentSnapshot();
-
-          InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
-          return asScanIterator(OUTPUT_TYPE, outputRow);
         });
   }
 