diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index 59b02f82bf8c..7a9b23807392 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -412,6 +412,28 @@ All available procedures are listed below. CALL sys.rollback_to_timestamp(`table` => 'default.T', timestamp => 1730292023000) +
+ * -- rollback to the snapshot whose watermark is earlier than or equal to the given watermark.
+ * CALL sys.rollback_to_watermark('tableId', watermark)
+ *
+ */
+public class RollbackToWatermarkProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "rollback_to_watermark";
+
+    /** Rolls back to the snapshot whose watermark is at or before {@code watermark}. */
+    public String[] call(ProcedureContext procedureContext, String tableId, long watermark)
+            throws Catalog.TableNotExistException {
+        Preconditions.checkNotNull(tableId, "table can not be empty");
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark);
+        Preconditions.checkNotNull(
+                snapshot, String.format("could not find snapshot earlier than %s", watermark));
+        fileStoreTable.rollbackTo(snapshot.id());
+        return new String[] {String.format("Success roll back to snapshot: %s .", snapshot.id())};
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
new file mode 100644
index 000000000000..ab1ea8080de9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Rollback to watermark procedure. Usage:
+ *
+ * <pre><code>
+ *  -- rollback to the snapshot whose watermark is earlier than or equal to the given one.
+ *  CALL sys.rollback_to_watermark(`table` => 'tableId', watermark => watermark)
+ * </code></pre>
+ */
+public class RollbackToWatermarkProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "rollback_to_watermark";
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "watermark", type = @DataTypeHint("BIGINT"))
+            })
+    public String[] call(ProcedureContext procedureContext, String tableId, Long watermark)
+            throws Catalog.TableNotExistException {
+        Preconditions.checkNotNull(watermark, "watermark can not be null");
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark);
+        Preconditions.checkNotNull(
+                snapshot, String.format("could not find snapshot earlier than %s", watermark));
+        fileStoreTable.rollbackTo(snapshot.id());
+        return new String[] {String.format("Success roll back to snapshot: %s .", snapshot.id())};
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 0ff3ac1f1e1c..6c3b0e7664c0 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -61,6 +61,7 @@ org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
org.apache.paimon.flink.procedure.RollbackToProcedure
org.apache.paimon.flink.procedure.RollbackToTimestampProcedure
+org.apache.paimon.flink.procedure.RollbackToWatermarkProcedure
org.apache.paimon.flink.procedure.MigrateTableProcedure
org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java
new file mode 100644
index 000000000000..f87ecd24756b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link RollbackToWatermarkProcedure}. */
+public class RollbackToWatermarkProcedureITCase extends CatalogITCaseBase {
+
+    @Test
+    public void testRollbackToWatermark() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k STRING,"
+                        + " dt STRING,"
+                        + " PRIMARY KEY (k, dt) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt) WITH ("
+                        + " 'bucket' = '1'"
+                        + ")");
+
+        // create snapshot 1 with watermark 1000.
+        sql(
+                "insert into T/*+ OPTIONS('end-input.watermark'= '1000') */ values('k1', '2024-12-02')");
+        // create snapshot 2 with watermark 2000.
+        sql(
+                "insert into T/*+ OPTIONS('end-input.watermark'= '2000') */ values('k2', '2024-12-02')");
+        // create snapshot 3 with watermark 3000.
+        sql(
+                "insert into T/*+ OPTIONS('end-input.watermark'= '3000') */ values('k3', '2024-12-02')");
+
+        FileStoreTable table = paimonTable("T");
+
+        long watermark1 = table.snapshotManager().snapshot(1).watermark();
+        long watermark2 = table.snapshotManager().snapshot(2).watermark();
+        long watermark3 = table.snapshotManager().snapshot(3).watermark();
+
+        assertThat(watermark1).isEqualTo(1000L);
+        assertThat(watermark2).isEqualTo(2000L);
+        assertThat(watermark3).isEqualTo(3000L);
+
+        assertThat(sql("select * from T").stream().map(Row::toString))
+                .containsExactlyInAnyOrder(
+                        "+I[k1, 2024-12-02]", "+I[k2, 2024-12-02]", "+I[k3, 2024-12-02]");
+
+        sql("CALL sys.rollback_to_watermark(`table` => 'default.T',`watermark` => 2001)");
+
+        // 2001 resolves to snapshot 2 (watermark 2000), so the row from snapshot 3 is gone.
+        assertThat(sql("select * from T").stream().map(Row::toString))
+                .containsExactlyInAnyOrder("+I[k1, 2024-12-02]", "+I[k2, 2024-12-02]");
+
+        sql("CALL sys.rollback_to_watermark(`table` => 'default.T',`watermark` => 1001)");
+
+        // 1001 resolves to snapshot 1 (watermark 1000), so only its row remains.
+        assertThat(sql("select * from T").stream().map(Row::toString))
+                .containsExactlyInAnyOrder("+I[k1, 2024-12-02]");
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 21f14e5d7a38..b2fa66a15090 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -43,6 +43,7 @@
import org.apache.paimon.spark.procedure.ResetConsumerProcedure;
import org.apache.paimon.spark.procedure.RollbackProcedure;
import org.apache.paimon.spark.procedure.RollbackToTimestampProcedure;
+import org.apache.paimon.spark.procedure.RollbackToWatermarkProcedure;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -72,6 +73,7 @@ private static Map