Spark: Fail publish_changes procedure if there's more than one matching snapshot #14955
Conversation
Force-pushed from 0699a5f to 7ed4b84
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
```java
throw new ValidationException(
    "Cannot apply non-unique WAP ID. Found %d snapshots with WAP ID '%s'",
    numMatchingSnapshots, wapId);
```
I wonder if we should prevent this situation (two snapshots with the same WAP ID) from happening in the first place, at snapshot creation time?
I don't have a strong opinion here, but that might be considered a more significant / potentially breaking change. Technically, a duplicate WAP ID doesn't cause any problems until the snapshots are cherry-picked into main.
Do you think there might be legitimate uses for staging multiple changes under the same WAP ID? For example:
- staging multiple changes, evaluating all of them separately and then deleting all but one before committing.
- creating staged snapshots which are never intended to be published (for testing / evaluation / etc)
I am not super familiar with the original design behind WAP in Iceberg; I'll look through older commits to see if there's any mention of a uniqueness constraint.
```java
Iterable<Snapshot> wapSnapshots =
    Iterables.filter(
        table.snapshots(), snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)));
```
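To make the intent of that filter-and-fail logic concrete, here is a self-contained sketch using stand-in types rather than the real Iceberg `Snapshot`/`Iterables` classes (the `Snapshot` record, the `"wap.id"` summary key, and the exception types here are illustrative assumptions, not the PR's actual code):

```java
import java.util.List;
import java.util.Map;

public class WapUniquenessSketch {

  // Stand-in for org.apache.iceberg.Snapshot: only the summary map matters here.
  record Snapshot(long snapshotId, Map<String, String> summary) {
    String stagedWapId() {
      return summary.get("wap.id"); // assumed summary key used by WAP staging
    }
  }

  // Resolve a WAP ID to exactly one staged snapshot, failing on zero or many matches.
  static long resolveWapSnapshot(List<Snapshot> snapshots, String wapId) {
    List<Snapshot> matching =
        snapshots.stream().filter(s -> wapId.equals(s.stagedWapId())).toList();
    if (matching.size() > 1) {
      throw new IllegalStateException(
          String.format(
              "Cannot apply non-unique WAP ID. Found %d snapshots with WAP ID '%s'",
              matching.size(), wapId));
    }
    if (matching.isEmpty()) {
      throw new IllegalArgumentException("Cannot apply unknown WAP ID '" + wapId + "'");
    }
    return matching.get(0).snapshotId();
  }

  public static void main(String[] args) {
    List<Snapshot> snapshots =
        List.of(
            new Snapshot(1L, Map.of("wap.id", "etl-123")),
            new Snapshot(2L, Map.of("wap.id", "etl-456")));
    System.out.println(resolveWapSnapshot(snapshots, "etl-123")); // prints 1

    // Two staged snapshots sharing a WAP ID now fail instead of silently
    // publishing only the first match.
    try {
      resolveWapSnapshot(
          List.of(
              new Snapshot(1L, Map.of("wap.id", "dup")),
              new Snapshot(2L, Map.of("wap.id", "dup"))),
          "dup");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Before this PR, the procedure effectively took the first matching snapshot; the sketch shows the behavioral change under discussion: ambiguity becomes a hard error.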
What about the case where a WAP ID is reused, but at a given time there is only one staged snapshot with that WAP ID, because the previous snapshot with that WAP ID was already published?
That will still fail with a DuplicateWAPCommitException during the cherry-pick operation, originating from here:
iceberg/core/src/main/java/org/apache/iceberg/util/WapUtil.java
Lines 42 to 60 in 0004600
```java
/**
 * Check if a given staged snapshot's associated wap-id was already published. Does not fail for
 * non-WAP workflows.
 *
 * @param current the current {@link TableMetadata metadata} for the target table
 * @param wapSnapshotId a snapshot id which could have been staged and is associated with a wap id
 * @return the WAP ID that will be published, if the snapshot has one
 */
public static String validateWapPublish(TableMetadata current, long wapSnapshotId) {
  Snapshot cherryPickSnapshot = current.snapshot(wapSnapshotId);
  String wapId = stagedWapId(cherryPickSnapshot);
  if (wapId != null && !wapId.isEmpty()) {
    if (WapUtil.isWapIdPublished(current, wapId)) {
      throw new DuplicateWAPCommitException(wapId);
    }
  }
  return wapId;
}
```
Force-pushed from 7ed4b84 to df314e9
Closes: #14953 - see this issue for a larger description and reproduction.
It's assumed that `wap.id` will be unique among snapshots, but this doesn't appear to be enforced anywhere, which can lead to unexpected results where only the first write is actually published.

This PR updates the `publish_changes` procedure to fail when multiple matching snapshots are identified.

If this change is approved, I will backport it to the other Spark versions.