diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index 8e32c2201f7f..2978cd68e350 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -159,13 +159,17 @@ This section introduce all available spark procedures about paimon. rollback - To rollback to a specific version of target table. Argument: + To rollback to a specific version of target table, note version/snapshot/tag must set one of them. Argument:
  • table: the target table identifier. Cannot be empty.
  • -
  • version: id of the snapshot or name of tag that will roll back to.
  • +
  • version: id of the snapshot or name of tag that will roll back to, version would be Deprecated.
  • +
  • snapshot: snapshot that will roll back to.
  • +
  • tag: tag that will roll back to.
  • CALL sys.rollback(table => 'default.T', version => 'my_tag')

    - CALL sys.rollback(table => 'default.T', version => 10) + CALL sys.rollback(table => 'default.T', version => 10)

    + CALL sys.rollback(table => 'default.T', tag => 'tag1') + CALL sys.rollback(table => 'default.T', snapshot => 2) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java index 6d004e946607..d9a88763320e 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java @@ -18,6 +18,9 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.StringUtils; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -26,6 +29,7 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import static org.apache.spark.sql.types.DataTypes.LongType; import static org.apache.spark.sql.types.DataTypes.StringType; /** A procedure to rollback to a snapshot or a tag. */ @@ -35,7 +39,9 @@ public class RollbackProcedure extends BaseProcedure { new ProcedureParameter[] { ProcedureParameter.required("table", StringType), // snapshot id or tag name - ProcedureParameter.required("version", StringType) + ProcedureParameter.optional("version", StringType), + ProcedureParameter.optional("snapshot", LongType), + ProcedureParameter.optional("tag", StringType) }; private static final StructType OUTPUT_TYPE = @@ -61,15 +67,35 @@ public StructType outputType() { @Override public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - String version = args.getString(1); + String version = args.isNullAt(1) ? null : args.getString(1); return modifyPaimonTable( tableIdent, table -> { - if (version.chars().allMatch(Character::isDigit)) { - table.rollbackTo(Long.parseLong(version)); + Long snapshot = null; + String tag = null; + if (!StringUtils.isNullOrWhitespaceOnly(version)) { + Preconditions.checkState( + args.isNullAt(2) && args.isNullAt(3), + "only can set one of version/snapshot/tag in RollbackProcedure."); + if (version.chars().allMatch(Character::isDigit)) { + snapshot = Long.parseLong(version); + } else { + tag = version; + } + } else { + Preconditions.checkState( + (args.isNullAt(2) && !args.isNullAt(3) + || !args.isNullAt(2) && args.isNullAt(3)), + "only can set one of version/snapshot/tag in RollbackProcedure."); + snapshot = args.isNullAt(2) ? null : args.getLong(2); + tag = args.isNullAt(3) ? null : args.getString(3); + } + + if (snapshot != null) { + table.rollbackTo(snapshot); } else { - table.rollbackTo(version); + table.rollbackTo(tag); } InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index 457c5ba513ec..325e06135b71 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -94,6 +94,66 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { } } + test("Paimon Procedure: rollback to tag check test") { + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'file.format'='orc') + |""".stripMargin) + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + // snapshot-1 + spark.sql("insert into T select 1, 'a'") + checkAnswer(query(), Row(1, "a") :: Nil) + + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '20250122', snapshot => 1)"), + Row(true) :: Nil) + + // snapshot-2 + spark.sql("insert into T select 2, 'b'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // snapshot-3 + spark.sql("insert into T select 3, 'c'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil) + + // snapshot-4 + spark.sql("insert into T select 4, 'd'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil) + + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '4')") + } + // rollback to snapshot + checkAnswer( + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '3')"), + Row(true) :: Nil) + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil) + + // version/snapshot/tag can only set one of them + assertThrows[RuntimeException] { + spark.sql( + "CALL paimon.sys.rollback(table => 'test.T', version => '20250122', tag => '20250122')") + } + + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '20250122', snapshot => 1)") + } + + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T', tag => '20250122', snapshot => 1)") + } + + // rollback to snapshot + spark.sql("CALL paimon.sys.rollback(table => 'test.T', snapshot => 2)") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // rollback to tag + spark.sql("CALL paimon.sys.rollback(table => 'test.T', tag => '20250122')") + checkAnswer(query(), Row(1, "a") :: Nil) + } + test("Paimon Procedure: rollback to timestamp") { failAfter(streamingTimeout) { withTempDir {