From efc504ccd97e5cde50a27dbdb521769502daaf75 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Fri, 26 Dec 2025 15:27:05 +0800 Subject: [PATCH] [rest] Add fromSnapshot to rollback --- docs/static/rest-catalog-open-api.yaml | 4 ++ .../java/org/apache/paimon/rest/RESTApi.java | 18 ++++++- .../rest/requests/RollbackTableRequest.java | 20 +++++++- .../paimon/catalog/AbstractCatalog.java | 2 +- .../org/apache/paimon/catalog/Catalog.java | 19 +++++++- .../paimon/catalog/DelegateCatalog.java | 4 +- .../org/apache/paimon/rest/RESTCatalog.java | 4 +- .../apache/paimon/rest/MockRESTMessage.java | 4 +- .../apache/paimon/rest/RESTCatalogServer.java | 48 ++++++++++++------- .../apache/paimon/rest/RESTCatalogTest.java | 13 ++++- 10 files changed, 106 insertions(+), 30 deletions(-) diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml index 1cbb02040fd6..7641ab5cb990 100644 --- a/docs/static/rest-catalog-open-api.yaml +++ b/docs/static/rest-catalog-open-api.yaml @@ -2763,6 +2763,10 @@ components: properties: instant: $ref: '#/components/schemas/Instant' + fromSnapshot: + type: integer + format: int64 + nullable: true Instant: anyOf: - $ref: '#/components/schemas/SnapshotInstant' diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java index 648c39551391..57d2e2989c15 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java @@ -595,7 +595,23 @@ public boolean commitSnapshot( * this table */ public void rollbackTo(Identifier identifier, Instant instant) { - RollbackTableRequest request = new RollbackTableRequest(instant); + rollbackTo(identifier, instant, null); + } + + /** + * Rollback instant for table. + * + * @param identifier database name and table name. + * @param instant instant to rollback + * @param fromSnapshot snapshot from, success only occurs when the latest snapshot is this + * snapshot. + * @throws NoSuchResourceException Exception thrown on HTTP 404 means the table or the snapshot + * or the tag not exists + * @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for + * this table + */ + public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot) { + RollbackTableRequest request = new RollbackTableRequest(instant, fromSnapshot); client.post( resourcePaths.rollbackTable( identifier.getDatabaseName(), identifier.getObjectName()), diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackTableRequest.java b/paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackTableRequest.java index 1995ee5df95d..9c8ca25c66e9 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackTableRequest.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackTableRequest.java @@ -24,24 +24,42 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import javax.annotation.Nullable; + /** Request for rollback table. */ @JsonIgnoreProperties(ignoreUnknown = true) public class RollbackTableRequest implements RESTRequest { private static final String FIELD_INSTANT = "instant"; + private static final String FIELD_FROM_SNAPSHOT = "fromSnapshot"; @JsonProperty(FIELD_INSTANT) private final Instant instant; + @JsonProperty(FIELD_FROM_SNAPSHOT) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + private final Long fromSnapshot; + @JsonCreator - public RollbackTableRequest(@JsonProperty(FIELD_INSTANT) Instant instant) { + public RollbackTableRequest( + @JsonProperty(FIELD_INSTANT) Instant instant, + @JsonProperty(FIELD_FROM_SNAPSHOT) @Nullable Long fromSnapshot) { this.instant = instant; + this.fromSnapshot = fromSnapshot; } @JsonGetter(FIELD_INSTANT) public Instant getInstant() { return instant; } + + @JsonGetter(FIELD_FROM_SNAPSHOT) + @Nullable + public Long getFromSnapshot() { + return fromSnapshot; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 97e8436a7582..1fd023aa9feb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -558,7 +558,7 @@ public PagedList listSnapshotsPaged( } @Override - public void rollbackTo(Identifier identifier, Instant instant) + public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot) throws Catalog.TableNotExistException { throw new UnsupportedOperationException(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 7d98180c3c2e..dd9521b12ef4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -717,7 +717,24 @@ PagedList listSnapshotsPaged( * @throws UnsupportedOperationException if the catalog does not {@link * #supportsVersionManagement()} */ - void rollbackTo(Identifier identifier, Instant instant) throws Catalog.TableNotExistException; + default void rollbackTo(Identifier identifier, Instant instant) + throws Catalog.TableNotExistException { + rollbackTo(identifier, instant, null); + } + + /** + * rollback table by the given {@link Identifier} and instant. + * + * @param identifier path of the table + * @param instant like snapshotId or tagName + * @param fromSnapshot snapshot from, success only occurs when the latest snapshot is this + * snapshot. + * @throws Catalog.TableNotExistException if the table does not exist + * @throws UnsupportedOperationException if the catalog does not {@link + * #supportsVersionManagement()} + */ + void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot) + throws Catalog.TableNotExistException; /** * Create a new branch for this table. By default, an empty branch will be created using the diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 9fc057aa90fd..eff0c0b2295c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -207,9 +207,9 @@ public PagedList listSnapshotsPaged( } @Override - public void rollbackTo(Identifier identifier, Instant instant) + public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot) throws Catalog.TableNotExistException { - wrapped.rollbackTo(identifier, instant); + wrapped.rollbackTo(identifier, instant, fromSnapshot); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index cff58bf71389..eeaede934de2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -373,10 +373,10 @@ public boolean commitSnapshot( } @Override - public void rollbackTo(Identifier identifier, Instant instant) + public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot) throws Catalog.TableNotExistException { try { - api.rollbackTo(identifier, instant); + api.rollbackTo(identifier, instant, fromSnapshot); } catch (NoSuchResourceException e) { if (StringUtils.equals(e.resourceType(), ErrorResponse.RESOURCE_TYPE_SNAPSHOT)) { throw new IllegalArgumentException( diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java index fe15ba537052..f487815e4d61 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java @@ -267,11 +267,11 @@ public static GetTableTokenResponse getTableCredentialsResponse() { } public static RollbackTableRequest rollbackTableRequestBySnapshot(long snapshotId) { - return new RollbackTableRequest(Instant.snapshot(snapshotId)); + return new RollbackTableRequest(Instant.snapshot(snapshotId), null); } public static RollbackTableRequest rollbackTableRequestByTag(String tagName) { - return new RollbackTableRequest(Instant.tag(tagName)); + return new RollbackTableRequest(Instant.tag(tagName), null); } public static AlterViewRequest alterViewRequest() { diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index f4cad6e47d8a..65b3a65ff810 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -118,6 +118,8 @@ import org.slf4j.LoggerFactory; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; +import javax.annotation.Nullable; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; @@ -478,7 +480,8 @@ public MockResponse dispatch(RecordedRequest request) { long snapshotId = ((Instant.SnapshotInstant) requestBody.getInstant()) .getSnapshotId(); - return rollbackTableByIdHandle(identifier, snapshotId); + return rollbackTableByIdHandle( + identifier, snapshotId, requestBody.getFromSnapshot()); } else if (requestBody.getInstant() instanceof Instant.TagInstant) { String tagName = ((Instant.TagInstant) requestBody.getInstant()) @@ -844,26 +847,35 @@ private MockResponse commitTableHandle(Identifier identifier, String data) throw requestBody.getStatistics()); } - private MockResponse rollbackTableByIdHandle(Identifier identifier, long snapshotId) - throws Exception { + private MockResponse rollbackTableByIdHandle( + Identifier identifier, long snapshotId, @Nullable Long fromSnapshot) throws Exception { FileStoreTable table = getFileTable(identifier); String identifierWithSnapshotId = geTableFullNameWithSnapshotId(identifier, snapshotId); - if (tableWithSnapshotId2SnapshotStore.containsKey(identifierWithSnapshotId)) { - table = - table.copy( - Collections.singletonMap( - SNAPSHOT_CLEAN_EMPTY_DIRECTORIES.key(), "true")); - long latestSnapshotId = table.snapshotManager().latestSnapshotId(); - table.rollbackTo(snapshotId); - cleanSnapshot(identifier, snapshotId, latestSnapshotId); - tableLatestSnapshotStore.put( - identifier.getFullName(), - tableWithSnapshotId2SnapshotStore.get(identifierWithSnapshotId)); - return new MockResponse().setResponseCode(200); + TableSnapshot toSnapshot = tableWithSnapshotId2SnapshotStore.get(identifierWithSnapshotId); + if (toSnapshot == null) { + return mockResponse( + new ErrorResponse( + ErrorResponse.RESOURCE_TYPE_SNAPSHOT, "" + snapshotId, "", 404), + 404); } - return mockResponse( - new ErrorResponse(ErrorResponse.RESOURCE_TYPE_SNAPSHOT, "" + snapshotId, "", 404), - 404); + long latestSnapshotId = table.snapshotManager().latestSnapshotId(); + if (fromSnapshot != null && fromSnapshot != latestSnapshotId) { + return mockResponse( + new ErrorResponse( + null, + null, + String.format( + "Latest snapshot %s is not %s", latestSnapshotId, fromSnapshot), + 500), + 500); + } + table = + table.copy( + Collections.singletonMap(SNAPSHOT_CLEAN_EMPTY_DIRECTORIES.key(), "true")); + table.rollbackTo(snapshotId); + cleanSnapshot(identifier, snapshotId, latestSnapshotId); + tableLatestSnapshotStore.put(identifier.getFullName(), toSnapshot); + return new MockResponse().setResponseCode(200); } private MockResponse rollbackTableByTagNameHandle(Identifier identifier, String tagName) diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 4694c4f9c85e..b0910e3e65ed 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -50,6 +50,7 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Instant; import org.apache.paimon.table.Table; import org.apache.paimon.table.TableSnapshot; import org.apache.paimon.table.object.ObjectTable; @@ -1751,24 +1752,32 @@ public void testTableRollback() throws Exception { GenericRow record = GenericRow.of(i); write.write(record); commit.commit(i, write.prepareCommit(false, i)); - table.createTag("tag-" + i); + table.createTag("tag-" + (i + 1)); } write.close(); commit.close(); + + // rollback to snapshot 4 long rollbackToSnapshotId = 4; table.rollbackTo(rollbackToSnapshotId); assertThat(table.snapshotManager().snapshot(rollbackToSnapshotId)) .isEqualTo(restCatalog.loadSnapshot(identifier).get().snapshot()); assertThat(table.tagManager().tagExists("tag-" + (rollbackToSnapshotId + 2))).isFalse(); assertThat(table.snapshotManager().snapshotExists(rollbackToSnapshotId + 1)).isFalse(); - assertThrows( IllegalArgumentException.class, () -> table.rollbackTo(rollbackToSnapshotId + 1)); + // rollback to snapshot 3 String rollbackToTagName = "tag-" + (rollbackToSnapshotId - 1); table.rollbackTo(rollbackToTagName); Snapshot tagSnapshot = table.tagManager().getOrThrow(rollbackToTagName).trimToSnapshot(); assertThat(tagSnapshot).isEqualTo(restCatalog.loadSnapshot(identifier).get().snapshot()); + + // rollback to snapshot 2 from snapshot + assertThatThrownBy(() -> catalog.rollbackTo(identifier, Instant.snapshot(2L), 4L)) + .hasMessageContaining("Latest snapshot 3 is not 4"); + catalog.rollbackTo(identifier, Instant.snapshot(2L), 3L); + assertThat(table.latestSnapshot().get().id()).isEqualTo(2); } @Test