From 3521de583d7f82b67ec2aa0054b8cda6acadae05 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Tue, 5 Apr 2022 22:09:01 -0700 Subject: [PATCH] Core, API: Add snapshot reference create, replace, rename, remove, and fastForward operations --- .../org/apache/iceberg/ManageSnapshots.java | 117 +++++ .../org/apache/iceberg/BaseTransaction.java | 8 + .../org/apache/iceberg/MetadataUpdate.java | 42 +- .../org/apache/iceberg/SnapshotManager.java | 93 ++++ .../org/apache/iceberg/TableMetadata.java | 65 ++- .../UpdateSnapshotReferencesOperation.java | 210 ++++++++ .../org/apache/iceberg/util/SnapshotUtil.java | 12 + .../apache/iceberg/TestSnapshotManager.java | 449 +++++++++++++++++- 8 files changed, 974 insertions(+), 22 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java diff --git a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java index b3d98ed3052c..b534711d6500 100644 --- a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java +++ b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java @@ -80,4 +80,121 @@ public interface ManageSnapshots extends PendingUpdate { * wapId */ ManageSnapshots cherrypick(long snapshotId); + + /** + * Create a new branch pointing to the given snapshot id. + * + * @param name branch name + * @param snapshotId id of the snapshot which will be the head of the branch + * @return this for method chaining + * @throws IllegalArgumentException if a branch with the given name already exists + */ + ManageSnapshots createBranch(String name, long snapshotId); + + /** + * Create a new tag pointing to the given snapshot id + * + * @param name tag name + * @param snapshotId snapshotId for the head of the new branch. + * @return this for method chaining + * @throws IllegalArgumentException if a tag with the given name already exists + */ + ManageSnapshots createTag(String name, long snapshotId); + + /** + * Remove a branch by name + * + * @param name branch name + * @return this for method chaining + * @throws IllegalArgumentException if the branch does not exist + */ + ManageSnapshots removeBranch(String name); + + /** + * Rename a branch + * + * @param name name of branch to rename + * @param newName the desired new name of the branch + * @throws IllegalArgumentException if the branch to rename does not exist or if there is already a branch + * with the same name as the desired new name. + */ + ManageSnapshots renameBranch(String name, String newName); + + /** + * Remove the tag with the given name. + * + * @param name tag name + * @return this for method chaining + * @throws IllegalArgumentException if the branch does not exist + */ + ManageSnapshots removeTag(String name); + + /** + * Replaces the tag with the given name to point to the specified snapshot. + * + * @param name Tag to replace + * @param snapshotId new snapshot id for the given tag + * @return this for method chaining + */ + ManageSnapshots replaceTag(String name, long snapshotId); + + /** + * Replaces the branch with the given name to point to the specified snapshot + * + * @param name Branch to replace + * @param snapshotId new snapshot id for the given branch + * @return this for method chaining + */ + ManageSnapshots replaceBranch(String name, long snapshotId); + + /** + * Replaces the branch with the given name to point to the source snapshot. + * The source branch will remain unchanged, the target branch will retain its retention properties. + * + * @param name Branch to replace + * @param source Source reference for the target to be replaced with + * @return this for method chaining + */ + ManageSnapshots replaceBranch(String name, String source); + + /** + * Performs a fast-forward of the given target branch up to the source snapshot if target is an ancestor of source. + * The source branch will remain unchanged, the target branch will retain its retention properties. + * + * @param name Branch to fast-forward + * @param source Source reference for the target to be fast forwarded to + * @return this for method chaining + * @throws IllegalArgumentException if the target branch is not an ancestor of source + */ + ManageSnapshots fastForwardBranch(String name, String source); + + /** + * Updates the minimum number of snapshots to keep for a branch. + * + * @param branchName branch name + * @param minSnapshotsToKeep minimum number of snapshots to retain on the branch + * @return this for method chaining + * @throws IllegalArgumentException if the branch does not exist + */ + ManageSnapshots setMinSnapshotsToKeep(String branchName, int minSnapshotsToKeep); + + /** + * Updates the max snapshot age for a branch. + * + * @param branchName branch name + * @param maxSnapshotAgeMs maximum snapshot age in milliseconds to retain on branch + * @return this for method chaining + * @throws IllegalArgumentException if the branch does not exist + */ + ManageSnapshots setMaxSnapshotAgeMs(String branchName, long maxSnapshotAgeMs); + + /** + * Updates the retention policy for a reference. + * + * @param name branch name + * @param maxRefAgeMs retention age in milliseconds of the tag reference itself + * @return this for method chaining + * @throws IllegalArgumentException if the reference does not exist + */ + ManageSnapshots setMaxRefAgeMs(String name, long maxRefAgeMs); } diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index e63f7bf7e741..422bc9342224 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -238,6 +238,14 @@ SetSnapshotOperation setBranchSnapshot() { return set; } + UpdateSnapshotReferencesOperation updateSnapshotReferencesOperation() { + checkLastOperationCommitted("UpdateSnapshotReferencesOperation"); + UpdateSnapshotReferencesOperation manageSnapshotRefOperation = + new UpdateSnapshotReferencesOperation(transactionOps); + updates.add(manageSnapshotRefOperation); + return manageSnapshotRefOperation; + } + @Override public void commitTransaction() { Preconditions.checkState(hasLastOpCommitted, diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index dba7a9a4f1e0..59c44b55ba54 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -20,6 +20,7 @@ package org.apache.iceberg; import java.io.Serializable; +import java.util.Locale; import java.util.Map; import java.util.Set; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; @@ -227,31 +228,60 @@ public String name() { @Override public void applyTo(TableMetadata.Builder metadataBuilder) { - // TODO: this should be generalized when tagging is supported - metadataBuilder.removeBranch(name); + metadataBuilder.removeRef(name); } } class SetSnapshotRef implements MetadataUpdate { private final String name; - private final long snapshotId; - - public SetSnapshotRef(String name, long snapshotId) { + private final Long snapshotId; + private final SnapshotRefType type; + private Integer minSnapshotsToKeep; + private Long maxSnapshotAgeMs; + private Long maxRefAgeMs; + + public SetSnapshotRef(String name, Long snapshotId, SnapshotRefType type, Integer minSnapshotsToKeep, + Long maxSnapshotAgeMs, Long maxRefAgeMs) { this.name = name; this.snapshotId = snapshotId; + this.type = type; + this.minSnapshotsToKeep = minSnapshotsToKeep; + this.maxSnapshotAgeMs = maxSnapshotAgeMs; + this.maxRefAgeMs = maxRefAgeMs; } public String name() { return name; } + public String type() { + return type.name().toLowerCase(Locale.ROOT); + } + public long snapshotId() { return snapshotId; } + public Integer minSnapshotsToKeep() { + return minSnapshotsToKeep; + } + + public Long maxSnapshotAgeMs() { + return maxSnapshotAgeMs; + } + + public Long maxRefAgeMs() { + return maxRefAgeMs; + } + @Override public void applyTo(TableMetadata.Builder metadataBuilder) { - metadataBuilder.setBranchSnapshot(snapshotId, name); + SnapshotRef ref = SnapshotRef.builderFor(snapshotId, type) + .minSnapshotsToKeep(minSnapshotsToKeep) + .maxSnapshotAgeMs(maxSnapshotAgeMs) + .maxRefAgeMs(maxRefAgeMs) + .build(); + metadataBuilder.setRef(name, ref); } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java index 2911c3add4b5..60b9d34b9d6b 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotManager.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java @@ -24,6 +24,7 @@ public class SnapshotManager implements ManageSnapshots { private final BaseTransaction transaction; + private UpdateSnapshotReferencesOperation updateSnapshotReferencesOperation; SnapshotManager(String tableName, TableOperations ops) { Preconditions.checkState(ops.current() != null, "Cannot manage snapshots: table %s does not exist", tableName); @@ -32,28 +33,119 @@ public class SnapshotManager implements ManageSnapshots { @Override public ManageSnapshots cherrypick(long snapshotId) { + commitIfRefUpdatesExist(); transaction.cherryPick().cherrypick(snapshotId).commit(); return this; } @Override public ManageSnapshots setCurrentSnapshot(long snapshotId) { + commitIfRefUpdatesExist(); transaction.setBranchSnapshot().setCurrentSnapshot(snapshotId).commit(); return this; } @Override public ManageSnapshots rollbackToTime(long timestampMillis) { + commitIfRefUpdatesExist(); transaction.setBranchSnapshot().rollbackToTime(timestampMillis).commit(); return this; } @Override public ManageSnapshots rollbackTo(long snapshotId) { + commitIfRefUpdatesExist(); transaction.setBranchSnapshot().rollbackTo(snapshotId).commit(); return this; } + @Override + public ManageSnapshots createBranch(String name, long snapshotId) { + updateSnapshotReferencesOperation().createBranch(name, snapshotId); + return this; + } + + @Override + public ManageSnapshots createTag(String name, long snapshotId) { + updateSnapshotReferencesOperation().createTag(name, snapshotId); + return this; + } + + @Override + public ManageSnapshots removeBranch(String name) { + updateSnapshotReferencesOperation().removeBranch(name); + return this; + } + + @Override + public ManageSnapshots removeTag(String name) { + updateSnapshotReferencesOperation().removeTag(name); + return this; + } + + @Override + public ManageSnapshots setMinSnapshotsToKeep(String name, int minSnapshotsToKeep) { + updateSnapshotReferencesOperation().setMinSnapshotsToKeep(name, minSnapshotsToKeep); + return this; + } + + @Override + public ManageSnapshots setMaxSnapshotAgeMs(String name, long maxSnapshotAgeMs) { + updateSnapshotReferencesOperation().setMaxSnapshotAgeMs(name, maxSnapshotAgeMs); + return this; + } + + @Override + public ManageSnapshots setMaxRefAgeMs(String name, long maxRefAgeMs) { + updateSnapshotReferencesOperation().setMaxRefAgeMs(name, maxRefAgeMs); + return this; + } + + @Override + public ManageSnapshots replaceTag(String name, long snapshotId) { + updateSnapshotReferencesOperation().replaceTag(name, snapshotId); + return this; + } + + @Override + public ManageSnapshots replaceBranch(String name, long snapshotId) { + updateSnapshotReferencesOperation().replaceBranch(name, snapshotId); + return this; + } + + @Override + public ManageSnapshots replaceBranch(String name, String source) { + updateSnapshotReferencesOperation().replaceBranch(name, source); + return this; + } + + @Override + public ManageSnapshots fastForwardBranch(String name, String source) { + updateSnapshotReferencesOperation().fastForward(name, source); + return this; + } + + @Override + public ManageSnapshots renameBranch(String name, String newName) { + updateSnapshotReferencesOperation().renameBranch(name, newName); + return this; + } + + private UpdateSnapshotReferencesOperation updateSnapshotReferencesOperation() { + if (updateSnapshotReferencesOperation == null) { + this.updateSnapshotReferencesOperation = transaction.updateSnapshotReferencesOperation(); + } + + return updateSnapshotReferencesOperation; + } + + private void commitIfRefUpdatesExist() { + if (updateSnapshotReferencesOperation != null) { + updateSnapshotReferencesOperation.commit(); + updateSnapshotReferencesOperation = null; + } + } + @Override public Snapshot apply() { return transaction.table().currentSnapshot(); @@ -61,6 +153,7 @@ public Snapshot apply() { @Override public void commit() { + commitIfRefUpdatesExist(); transaction.commitTransaction(); } } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index e08755cbf1d1..b93605c5254b 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -612,7 +612,7 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update return new Builder(this) .upgradeFormatVersion(newFormatVersion) - .removeBranch(SnapshotRef.MAIN_BRANCH) + .removeRef(SnapshotRef.MAIN_BRANCH) .setCurrentSchema(freshSchema, newLastColumnId.get()) .setDefaultPartitionSpec(freshSpec) .setDefaultSortOrder(freshSortOrder) @@ -1019,6 +1019,55 @@ public Builder setBranchSnapshot(long snapshotId, String branch) { return this; } + public Builder setRef(String name, SnapshotRef ref) { + SnapshotRef existingRef = refs.get(name); + if (existingRef != null && existingRef.equals(ref)) { + return this; + } + + long snapshotId = ref.snapshotId(); + Snapshot snapshot = snapshotsById.get(snapshotId); + ValidationException.check(snapshot != null, "Cannot set %s to unknown snapshot: %s", name, snapshotId); + if (isAddedSnapshot(snapshotId)) { + this.lastUpdatedMillis = snapshot.timestampMillis(); + } + + if (SnapshotRef.MAIN_BRANCH.equals(name)) { + this.currentSnapshotId = ref.snapshotId(); + if (lastUpdatedMillis == null) { + this.lastUpdatedMillis = System.currentTimeMillis(); + } + + snapshotLog.add(new SnapshotLogEntry(lastUpdatedMillis, ref.snapshotId())); + } + + refs.put(name, ref); + MetadataUpdate.SetSnapshotRef refUpdate = new MetadataUpdate.SetSnapshotRef( + name, ref.snapshotId(), ref.type(), ref.minSnapshotsToKeep(), ref.maxSnapshotAgeMs(), ref.maxRefAgeMs()); + changes.add(refUpdate); + return this; + } + + public Builder removeRef(String name) { + if (SnapshotRef.MAIN_BRANCH.equals(name)) { + this.currentSnapshotId = -1; + snapshotLog.clear(); + } + + SnapshotRef ref = refs.remove(name); + if (ref != null) { + changes.add(new MetadataUpdate.RemoveSnapshotRef(name)); + } + + return this; + } + + /** + * Removes the given branch + * + * @deprecated will be removed in 0.15.0. Use removeRef instead. + */ + @Deprecated public Builder removeBranch(String branch) { if (SnapshotRef.MAIN_BRANCH.equals(branch)) { this.currentSnapshotId = -1; @@ -1061,7 +1110,7 @@ public Builder removeSnapshots(Collection idsToRemove) { } } - danglingRefs.forEach(this::removeBranch); + danglingRefs.forEach(this::removeRef); return this; } @@ -1296,15 +1345,6 @@ private void setBranchSnapshotInternal(Snapshot snapshot, String branch) { "Last sequence number %s is less than existing snapshot sequence number %s", lastSequenceNumber, snapshot.sequenceNumber()); - // if the snapshot was added in this change set, use its timestamp - this.lastUpdatedMillis = isAddedSnapshot(snapshot.snapshotId()) ? - snapshot.timestampMillis() : System.currentTimeMillis(); - - if (SnapshotRef.MAIN_BRANCH.equals(branch)) { - this.currentSnapshotId = replacementSnapshotId; - snapshotLog.add(new SnapshotLogEntry(lastUpdatedMillis, replacementSnapshotId)); - } - SnapshotRef newRef; if (ref != null) { newRef = SnapshotRef.builderFrom(ref, replacementSnapshotId).build(); @@ -1312,8 +1352,7 @@ private void setBranchSnapshotInternal(Snapshot snapshot, String branch) { newRef = SnapshotRef.branchBuilder(replacementSnapshotId).build(); } - refs.put(branch, newRef); - changes.add(new MetadataUpdate.SetSnapshotRef(branch, replacementSnapshotId)); + setRef(branch, newRef); } private static List addPreviousFile( diff --git a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java new file mode 100644 index 000000000000..070d80a06f07 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.SnapshotUtil; + +/** + * ToDo: Add SetSnapshotOperation operations such as setCurrentSnapshot, rollBackTime, rollbackTo + * to this class so that we can support those operations for refs. + */ +class UpdateSnapshotReferencesOperation implements PendingUpdate> { + + private final TableOperations ops; + private final Map updatedRefs; + private TableMetadata base; + + UpdateSnapshotReferencesOperation(TableOperations ops) { + this.ops = ops; + this.base = ops.current(); + this.updatedRefs = Maps.newHashMap(base.refs()); + } + + @Override + public Map apply() { + return updatedRefs; + } + + @Override + public void commit() { + TableMetadata updated = internalApply(); + ops.commit(base, updated); + } + + public UpdateSnapshotReferencesOperation createBranch(String name, long snapshotId) { + Preconditions.checkNotNull(name, "Branch name cannot be null"); + SnapshotRef branch = SnapshotRef.branchBuilder(snapshotId).build(); + SnapshotRef existingRef = updatedRefs.put(name, branch); + Preconditions.checkArgument(existingRef == null, "Ref %s already exists", name); + return this; + } + + public UpdateSnapshotReferencesOperation createTag(String name, long snapshotId) { + Preconditions.checkNotNull(name, "Tag name cannot be null"); + SnapshotRef tag = SnapshotRef.tagBuilder(snapshotId).build(); + SnapshotRef existingRef = updatedRefs.put(name, tag); + Preconditions.checkArgument(existingRef == null, "Ref %s already exists", name); + return this; + } + + public UpdateSnapshotReferencesOperation removeBranch(String name) { + Preconditions.checkNotNull(name, "Branch name cannot be null"); + Preconditions.checkArgument(!name.equals(SnapshotRef.MAIN_BRANCH), "Cannot remove main branch"); + SnapshotRef ref = updatedRefs.remove(name); + Preconditions.checkArgument(ref != null, "Branch does not exist: %s", name); + Preconditions.checkArgument(ref.isBranch(), "Ref %s is a tag not a branch", name); + return this; + } + + public UpdateSnapshotReferencesOperation removeTag(String name) { + Preconditions.checkNotNull(name, "Tag name cannot be null"); + SnapshotRef ref = updatedRefs.remove(name); + Preconditions.checkArgument(ref != null, "Tag does not exist: %s", name); + Preconditions.checkArgument(ref.isTag(), "Ref %s is a branch not a tag", name); + return this; + } + + public UpdateSnapshotReferencesOperation renameBranch(String name, String newName) { + Preconditions.checkNotNull(name, "Branch to rename cannot be null"); + Preconditions.checkNotNull(newName, "New branch name cannot be null"); + Preconditions.checkArgument(!name.equals(SnapshotRef.MAIN_BRANCH), "Cannot rename main branch"); + SnapshotRef ref = updatedRefs.get(name); + Preconditions.checkArgument(ref != null, "Branch does not exist: %s", name); + Preconditions.checkArgument(ref.isBranch(), "Ref %s is a tag not a branch", name); + SnapshotRef existing = updatedRefs.put(newName, ref); + Preconditions.checkArgument(existing == null, "Ref %s already exists", newName); + updatedRefs.remove(name, ref); + return this; + } + + public UpdateSnapshotReferencesOperation replaceBranch(String name, long snapshotId) { + Preconditions.checkNotNull(name, "Branch name cannot be null"); + SnapshotRef ref = updatedRefs.get(name); + Preconditions.checkArgument(ref != null, "Branch does not exist: %s", name); + Preconditions.checkArgument(ref.isBranch(), "Ref %s is a tag not a branch", name); + SnapshotRef updatedRef = SnapshotRef.builderFrom(ref, snapshotId).build(); + updatedRefs.put(name, updatedRef); + return this; + } + + public UpdateSnapshotReferencesOperation replaceBranch(String name, String source) { + return replaceBranch(name, source, false); + } + + public UpdateSnapshotReferencesOperation fastForward(String name, String source) { + return replaceBranch(name, source, true); + } + + private UpdateSnapshotReferencesOperation replaceBranch(String name, String source, boolean fastForward) { + Preconditions.checkNotNull(name, "Target branch cannot be null"); + Preconditions.checkNotNull(source, "Source ref cannot be null"); + SnapshotRef sourceRef = updatedRefs.get(source); + SnapshotRef refToUpdate = updatedRefs.get(name); + Preconditions.checkArgument(refToUpdate != null, "Target branch does not exist: %s", name); + Preconditions.checkArgument(sourceRef != null, "Ref does not exist: %s", source); + Preconditions.checkArgument(refToUpdate.isBranch(), "Ref %s is a tag not a branch", name); + + // Nothing to replace + if (sourceRef.snapshotId() == refToUpdate.snapshotId()) { + return this; + } + + SnapshotRef updatedRef = SnapshotRef.builderFrom(refToUpdate, sourceRef.snapshotId()).build(); + + if (fastForward) { + boolean targetIsAncestor = SnapshotUtil.isAncestorOf(sourceRef.snapshotId(), + refToUpdate.snapshotId(), base::snapshot); + Preconditions.checkArgument(targetIsAncestor, + "Cannot fast-forward: %s is not an ancestor of %s", name, source); + } + + updatedRefs.put(name, updatedRef); + return this; + } + + public UpdateSnapshotReferencesOperation replaceTag(String name, long snapshotId) { + Preconditions.checkNotNull(name, "Tag name cannot be null"); + SnapshotRef ref = updatedRefs.get(name); + Preconditions.checkArgument(ref != null, "Tag does not exist: %s", name); + Preconditions.checkArgument(ref.isTag(), "Ref %s is a branch not a tag", name); + SnapshotRef updatedRef = SnapshotRef.builderFrom(ref, snapshotId).build(); + updatedRefs.put(name, updatedRef); + return this; + } + + public UpdateSnapshotReferencesOperation setMinSnapshotsToKeep(String name, int minSnapshotsToKeep) { + Preconditions.checkNotNull(name, "Branch name cannot be null"); + SnapshotRef ref = updatedRefs.get(name); + Preconditions.checkArgument(ref != null, "Branch does not exist: %s", name); + SnapshotRef updateBranch = SnapshotRef.builderFrom(ref) + .minSnapshotsToKeep(minSnapshotsToKeep) + .build(); + updatedRefs.put(name, updateBranch); + return this; + } + + public UpdateSnapshotReferencesOperation setMaxSnapshotAgeMs(String name, long maxSnapshotAgeMs) { + Preconditions.checkNotNull(name, "Branch name cannot be null"); + SnapshotRef ref = updatedRefs.get(name); + Preconditions.checkArgument(ref != null, "Branch does not exist: %s", name); + SnapshotRef updateBranch = SnapshotRef.builderFrom(ref) + .maxSnapshotAgeMs(maxSnapshotAgeMs) + .build(); + updatedRefs.put(name, updateBranch); + return this; + } + + public UpdateSnapshotReferencesOperation setMaxRefAgeMs(String name, long maxRefAgeMs) { + Preconditions.checkNotNull(name, "Reference name cannot be null"); + SnapshotRef ref = updatedRefs.get(name); + Preconditions.checkArgument(ref != null, "Ref does not exist: %s", name); + SnapshotRef updatedRef = SnapshotRef.builderFrom(ref) + .maxRefAgeMs(maxRefAgeMs) + .build(); + updatedRefs.put(name, updatedRef); + return this; + } + + private TableMetadata internalApply() { + TableMetadata.Builder updatedBuilder = TableMetadata.buildFrom(base); + // Identify references which have been removed + Map currRefs = base.refs(); + for (Map.Entry currRefEntry : currRefs.entrySet()) { + if (!updatedRefs.containsKey(currRefEntry.getKey())) { + updatedBuilder.removeRef(currRefEntry.getKey()); + } + } + + // Identify references which have been created or updated. + for (Map.Entry newRefEntry : updatedRefs.entrySet()) { + final String name = newRefEntry.getKey(); + SnapshotRef currRef = currRefs.get(name); + SnapshotRef updatedRef = updatedRefs.get(name); + if (currRef == null || !currRef.equals(updatedRef)) { + updatedBuilder.setRef(name, updatedRef); + } + } + + return updatedBuilder.build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index 55bf27d61955..70e25e9314a5 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -52,6 +52,18 @@ public static boolean isAncestorOf(Table table, long snapshotId, long ancestorSn return false; } + /** + * Returns whether ancestorSnapshotId is an ancestor of snapshotId using the given lookup function. + */ + public static boolean isAncestorOf(long snapshotId, long ancestorSnapshotId, Function lookup) { + for (Snapshot snapshot : ancestorsOf(snapshotId, lookup)) { + if (snapshot.snapshotId() == ancestorSnapshotId) { + return true; + } + } + return false; + } + /** * Returns whether ancestorSnapshotId is an ancestor of the table's current state. */ diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java index 5399cfc44ff6..a9b231f36bee 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java @@ -208,9 +208,7 @@ public void testCherryPickFromBranch() { // pick the snapshot into the current state AssertHelpers.assertThrows("Should reject partition replacement when a partition has been modified", ValidationException.class, "Cannot cherry-pick overwrite not based on an ancestor of the current state", - () -> table.manageSnapshots() - .cherrypick(replaceSnapshotId) - .commit()); + () -> table.manageSnapshots().cherrypick(replaceSnapshotId).commit()); Assert.assertEquals("Failed cherry-pick should not change the table state", lastSnapshotId, table.currentSnapshot().snapshotId()); @@ -250,4 +248,449 @@ public void testCherryPickOverwrite() { lastSnapshotId, table.currentSnapshot().snapshotId()); validateTableFiles(table, FILE_A, FILE_B); } + + @Test + public void testCreateBranch() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + // Test a basic case of creating a branch + table.manageSnapshots() + .createBranch("branch1", snapshotId) + .commit(); + SnapshotRef expectedBranch = table.ops().refresh().ref("branch1"); + Assert.assertTrue(expectedBranch != null && + expectedBranch.equals(SnapshotRef.branchBuilder(snapshotId).build())); + } + + @Test + public void testCreateBranchFailsWhenRefAlreadyExists() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots() + .createBranch("branch1", snapshotId) + .commit(); + // Trying to create a branch with an existing name should fail + AssertHelpers.assertThrows("Creating branch which already exists should fail", + IllegalArgumentException.class, "Ref branch1 already exists", + () -> table.manageSnapshots().createBranch("branch1", snapshotId).commit()); + + // Trying to create another branch within the same chain + AssertHelpers.assertThrows("Creating branch which already exists should fail", + IllegalArgumentException.class, "Ref branch2 already exists", + () -> table.manageSnapshots().createBranch("branch2", snapshotId).createBranch("branch2", snapshotId).commit()); + } + + + @Test + public void testCreateTag() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + // Test a basic case of creating a tag + table.manageSnapshots() + .createTag("tag1", snapshotId) + .commit(); + SnapshotRef expectedTag = table.ops().refresh().ref("tag1"); + + Assert.assertTrue(expectedTag != null && + expectedTag.equals(SnapshotRef.tagBuilder(snapshotId).build())); + } + + @Test + public void testCreateTagFailsWhenRefAlreadyExists() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots() + .createTag("tag1", snapshotId) + .commit(); + + // Trying to create a tag with an existing name should fail + AssertHelpers.assertThrows("Creating tag which already exists should fail", + IllegalArgumentException.class, "Ref tag1 already exists", + () -> table.manageSnapshots().createTag("tag1", snapshotId).commit()); + + // Trying to create another tag within the same chain + AssertHelpers.assertThrows("Creating branch which already exists should fail", + IllegalArgumentException.class, "Ref tag2 already exists", + () -> table.manageSnapshots() + .createTag("tag2", snapshotId) + .createTag("tag2", snapshotId).commit()); + } + + @Test + public void testRemoveBranch() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + // Test a basic case of creating and then removing a branch and tag + table.manageSnapshots() + .createBranch("branch1", snapshotId) + .commit(); + table.manageSnapshots() + .removeBranch("branch1") + .commit(); + + TableMetadata updated = table.ops().refresh(); + SnapshotRef expectedBranch = updated.ref("branch1"); + Assert.assertNull(expectedBranch); + + // Test chained creating and removal of branch and tag + table.manageSnapshots() + .createBranch("branch2", snapshotId) + .removeBranch("branch2") + .commit(); + updated = table.ops().refresh(); + Assert.assertNull(updated.ref("branch2")); + } + + @Test + public void testRemovingNonExistingBranchFails() { + AssertHelpers.assertThrows("Trying to remove non-existent branch should fail", + IllegalArgumentException.class, "Branch does not exist: non-existing", + () -> table.manageSnapshots().removeBranch("non-existing").commit()); + } + + @Test + public void testRemovingMainBranchFails() { + AssertHelpers.assertThrows("Removing main should fail", + IllegalArgumentException.class, "Cannot remove main branch", + () -> table.manageSnapshots().removeBranch(SnapshotRef.MAIN_BRANCH).commit()); + } + + @Test + public void testRemoveTag() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + // Test a basic case of creating and then removing a branch and tag + table.manageSnapshots() + .createTag("tag1", snapshotId) + .commit(); + table.manageSnapshots() + .removeTag("tag1") + .commit(); + TableMetadata updated = table.ops().refresh(); + SnapshotRef expectedTag = updated.ref("tag1"); + Assert.assertNull(expectedTag); + + // Test chained creating and removal of a tag + table.manageSnapshots() + .createTag("tag2", snapshotId) + .removeTag("tag2") + .commit(); + Assert.assertEquals(updated, table.ops().refresh()); + Assert.assertNull(updated.ref("tag2")); + } + + @Test + public void testRemovingNonExistingTagFails() { + AssertHelpers.assertThrows("Removing a non-existing tag should fail", + IllegalArgumentException.class, "Tag does not exist: non-existing", + () -> table.manageSnapshots().removeTag("non-existing").commit()); + } + + @Test + public void testReplaceBranch() { + table.newAppend() + .appendFile(FILE_A) + .set("wap.id", "123") + .stageOnly() + .commit(); + Snapshot firstSnapshot = Iterables.getOnlyElement(table.snapshots()); + table.manageSnapshots().createBranch("branch1", firstSnapshot.snapshotId()).commit(); + table.newAppend() + .appendFile(FILE_B) + .set("wap.id", "456") + .stageOnly() + .commit(); + Snapshot secondSnapshot = Iterables.get(table.snapshots(), 1); + table.manageSnapshots().createBranch("branch2", secondSnapshot.snapshotId()).commit(); + table.manageSnapshots().replaceBranch("branch1", "branch2").commit(); + Assert.assertEquals(table.ops().refresh().ref("branch1").snapshotId(), secondSnapshot.snapshotId()); + } + + @Test + public void testReplaceBranchNonExistingTargetBranchFails() { + AssertHelpers.assertThrows("Replacing a non-existing branch should fail", + IllegalArgumentException.class, "Target branch does not exist: non-existing", + () -> table.manageSnapshots().replaceBranch("non-existing", "other-branch").commit()); + } + + @Test + public void testReplaceBranchNonExistingSourceFails() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots() + .createBranch("branch1", snapshotId) + .commit(); + AssertHelpers.assertThrows("Replacing where the source ref does not exist should fail", + IllegalArgumentException.class, "Ref does not exist: non-existing", + () -> table.manageSnapshots().replaceBranch("branch1", "non-existing").commit()); + } + + @Test + public void testFastForward() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + table.newAppend() + .appendFile(FILE_B) + .set("wap.id", "123456789") + .stageOnly() + .commit(); + + Assert.assertEquals(table.currentSnapshot().snapshotId(), 1); + + table.manageSnapshots().createBranch("new-branch-at-staged-snapshot", 2).commit(); + table.manageSnapshots().fastForwardBranch(SnapshotRef.MAIN_BRANCH, "new-branch-at-staged-snapshot").commit(); + + Assert.assertEquals(table.currentSnapshot().snapshotId(), 2); + } + + @Test + public void testFastForwardWhenTargetIsNotAncestorFails() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + table.newAppend() + .appendFile(FILE_B) + .set("wap.id", "123456789") + .stageOnly() + .commit(); + + long snapshot = table.currentSnapshot().snapshotId(); + + // Commit a snapshot on main to deviate the branches + table.newAppend() + .appendFile(FILE_C) + .commit(); + + final String newBranch = "new-branch-at-staged-snapshot"; + table.manageSnapshots().createBranch(newBranch, snapshot).commit(); + + AssertHelpers.assertThrows("Fast-forward should fail if target is not an ancestor of the source", + IllegalArgumentException.class, "Cannot fast-forward: main is not an ancestor of new-branch-at-staged-snapshot", + () -> table.manageSnapshots().fastForwardBranch(SnapshotRef.MAIN_BRANCH, newBranch).commit()); + } + + @Test + public void testReplaceTag() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots() + .createTag("tag1", snapshotId) + .commit(); + // Create a new snapshot and replace the tip of branch1 to be the new snapshot + table.newAppend() + .appendFile(FILE_B) + .commit(); + long currentSnapshot = table.ops().refresh().currentSnapshot().snapshotId(); + table.manageSnapshots().replaceTag("tag1", currentSnapshot).commit(); + Assert.assertEquals(table.ops().refresh().ref("tag1").snapshotId(), currentSnapshot); + } + + @Test + public void testUpdatingBranchRetention() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + // Test creating and updating independently + table.manageSnapshots() + .createBranch("branch1", snapshotId) + .commit(); + table.manageSnapshots() + .setMinSnapshotsToKeep("branch1", 10) + .setMaxSnapshotAgeMs("branch1", 20000) + .commit(); + TableMetadata updated = table.ops().refresh(); + Assert.assertEquals(20000, (long) updated.ref("branch1").maxSnapshotAgeMs()); + Assert.assertEquals(10, (long) updated.ref("branch1").minSnapshotsToKeep()); + // Test creating and updating in a chain + table.manageSnapshots() + .createBranch("branch2", snapshotId) + .setMinSnapshotsToKeep("branch2", 10) + .setMaxSnapshotAgeMs("branch2", 20000) + .commit(); + updated = table.ops().refresh(); + Assert.assertEquals(20000, (long) updated.ref("branch2").maxSnapshotAgeMs()); + Assert.assertEquals(10, (long) updated.ref("branch2").minSnapshotsToKeep()); + } + + @Test + public void testSettingBranchRetentionOnTagFails() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + + AssertHelpers.assertThrows("Setting minSnapshotsToKeep should fail for tags", + IllegalArgumentException.class, "Tags do not support setting minSnapshotsToKeep", + () -> table.manageSnapshots().createTag("tag1", snapshotId).setMinSnapshotsToKeep("tag1", 10).commit()); + AssertHelpers.assertThrows("Setting maxSnapshotAgeMs should fail for tags", + IllegalArgumentException.class, "Tags do not support setting maxSnapshotAgeMs", + () -> table.manageSnapshots().createTag("tag1", snapshotId).setMaxSnapshotAgeMs("tag1", 10).commit()); + } + + @Test + public void testUpdatingBranchMaxRefAge() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + final long maxRefAgeMs = 10000; + + // Test creating and updating independently + table.manageSnapshots() + .createBranch("branch1", snapshotId) + .commit(); + table.manageSnapshots() + .setMaxRefAgeMs("branch1", 10000) + .commit(); + TableMetadata updated = table.ops().refresh(); + Assert.assertEquals(maxRefAgeMs, (long) updated.ref("branch1").maxRefAgeMs()); + Assert.assertEquals(maxRefAgeMs, (long) updated.ref("branch1").maxRefAgeMs()); + } + + @Test + public void testUpdatingTagMaxRefAge() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + final long maxRefAgeMs = 10000; + + // Test creating and updating independently + table.manageSnapshots() + .createTag("tag1", snapshotId) + .commit(); + table.manageSnapshots() + .setMaxRefAgeMs("tag1", maxRefAgeMs) + .commit(); + + TableMetadata updated = table.ops().refresh(); + Assert.assertEquals(maxRefAgeMs, (long) updated.ref("tag1").maxRefAgeMs()); + + // Test creating and updating in a chain + table.manageSnapshots() + .createTag("tag2", snapshotId) + .setMaxRefAgeMs("tag2", maxRefAgeMs) + .commit(); + updated = table.ops().refresh(); + Assert.assertEquals(maxRefAgeMs, (long) updated.ref("tag2").maxRefAgeMs()); + } + + @Test + public void testRenameBranch() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotId = table.currentSnapshot().snapshotId(); + + // Test creating and renaming independently + table.manageSnapshots() + .createBranch("branch1", snapshotId) + .commit(); + table.manageSnapshots() + .renameBranch("branch1", "branch2") + .commit(); + TableMetadata updated = table.ops().refresh(); + Assert.assertNull(updated.ref("branch1")); + Assert.assertEquals(updated.ref("branch2"), SnapshotRef.branchBuilder(snapshotId).build()); + + table.manageSnapshots() + .createBranch("branch3", snapshotId) + .renameBranch("branch3", "branch4") + .commit(); + + updated = table.ops().refresh(); + Assert.assertNull(updated.ref("branch3")); + Assert.assertEquals(updated.ref("branch4"), SnapshotRef.branchBuilder(snapshotId).build()); + } + + @Test + public void testFailRenamingMainBranch() { + AssertHelpers.assertThrows("Renaming main branch should fail", + IllegalArgumentException.class, "Cannot rename main branch", + () -> table.manageSnapshots().renameBranch(SnapshotRef.MAIN_BRANCH, "some-branch").commit()); + } + + @Test + public void testRenamingNonExistingBranchFails() { + AssertHelpers.assertThrows("Renaming non-existent branch should fail", + IllegalArgumentException.class, "Branch does not exist: some-missing-branch", + () -> table.manageSnapshots().renameBranch("some-missing-branch", "some-branch").commit()); + } + + @Test + public void testCreateReferencesAndRollback() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + table.newAppend() + .appendFile(FILE_A) + .commit(); + long snapshotPriorToRollback = table.currentSnapshot().snapshotId(); + + table.manageSnapshots() + .createBranch("branch1", snapshotPriorToRollback) + .createTag("tag1", snapshotPriorToRollback) + .rollbackTo(1) + .commit(); + + TableMetadata current = table.ops().current(); + Assert.assertEquals(current.currentSnapshot().snapshotId(), 1); + SnapshotRef actualTag = current.ref("tag1"); + SnapshotRef actualBranch = current.ref("branch1"); + Assert.assertEquals(1, current.currentSnapshot().snapshotId()); + Assert.assertEquals(SnapshotRef.branchBuilder(snapshotPriorToRollback).build(), actualBranch); + Assert.assertEquals(SnapshotRef.tagBuilder(snapshotPriorToRollback).build(), actualTag); + } + + @Test + public void testCreateReferencesAndCherrypick() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + long currentSnapshot = table.currentSnapshot().snapshotId(); + // stage an overwrite that replaces FILE_A + table.newReplacePartitions() + .addFile(REPLACEMENT_FILE_A) + .stageOnly() + .commit(); + Snapshot staged = Iterables.getLast(table.snapshots()); + + table.manageSnapshots() + .createBranch("branch1", currentSnapshot) + .createTag("tag1", currentSnapshot) + .cherrypick(staged.snapshotId()) + .commit(); + + TableMetadata current = table.ops().current(); + Assert.assertEquals(current.currentSnapshot().snapshotId(), 2); + SnapshotRef actualTag = current.ref("tag1"); + SnapshotRef actualBranch = current.ref("branch1"); + Assert.assertEquals(2, current.currentSnapshot().snapshotId()); + Assert.assertEquals(SnapshotRef.branchBuilder(1).build(), actualBranch); + Assert.assertEquals(SnapshotRef.tagBuilder(1).build(), actualTag); + } }