From 1998c852d323ffa6fae360a378bd10afc0e7d8b5 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 20 Jan 2025 10:44:27 +0800 Subject: [PATCH 01/10] [spark] Fix rollback not correctly identify tag or snapshot --- .../spark/sql/RollbackProcedureTest.scala | 44 ++++++++++++++++++- .../spark/procedure/RollbackProcedure.java | 6 ++- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala index 7a3a5730ed3b..d39eeebbc579 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala @@ -35,7 +35,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { // define a pk table and test `forEachBatch` api spark.sql(s""" |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'file.format'='orc') |""".stripMargin) val location = loadTable("T").location().toString @@ -93,4 +93,46 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { } } } + + test("Paimon Procedure: rollback to tag check test") { + spark.sql( + s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'file.format'='orc') + |""".stripMargin) + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + // snapshot-1 + spark.sql("insert into T select 1, 'a'") + checkAnswer(query(), Row(1, "a") :: Nil) + + checkAnswer( + spark.sql( + "CALL paimon.sys.create_tag(table => 'test.T', tag => '20250122', snapshot => 1)"), + Row(true) :: Nil) + + // snapshot-2 + spark.sql("insert into T select 2, 'b'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // snapshot-3 + 
spark.sql("insert into T select 2, 'b2'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')") + } + // rollback to snapshot + checkAnswer( + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"), + Row(true) :: Nil) + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // rollback to tag + checkAnswer( + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '20250122')"), + Row(true) :: Nil) + checkAnswer(query(), Row(1, "a") :: Nil) + } + } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java index 6d004e946607..ce6531bdc324 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.TagManager; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -66,7 +68,9 @@ public InternalRow[] call(InternalRow args) { return modifyPaimonTable( tableIdent, table -> { - if (version.chars().allMatch(Character::isDigit)) { + FileStoreTable fileStoreTable = (FileStoreTable) table; + TagManager tagManager = fileStoreTable.tagManager(); + if (version.chars().allMatch(Character::isDigit) && !tagManager.tagExists(version)) { table.rollbackTo(Long.parseLong(version)); } else { table.rollbackTo(version); From 393054ced3f8b959f89019640c9de69179298dc3 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 20 Jan 2025 10:45:20 
+0800 Subject: [PATCH 02/10] [spark] Fix rollback not correctly identify tag or snapshot --- .../spark/sql/RollbackProcedureTest.scala | 76 +++++++++---------- .../spark/procedure/RollbackProcedure.java | 4 +- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala index d39eeebbc579..917579e11d2c 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala @@ -95,44 +95,42 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { } test("Paimon Procedure: rollback to tag check test") { - spark.sql( - s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'file.format'='orc') - |""".stripMargin) - - val query = () => spark.sql("SELECT * FROM T ORDER BY a") - - // snapshot-1 - spark.sql("insert into T select 1, 'a'") - checkAnswer(query(), Row(1, "a") :: Nil) - - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(table => 'test.T', tag => '20250122', snapshot => 1)"), - Row(true) :: Nil) - - // snapshot-2 - spark.sql("insert into T select 2, 'b'") - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - - // snapshot-3 - spark.sql("insert into T select 2, 'b2'") - checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) - assertThrows[RuntimeException] { - spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')") - } - // rollback to snapshot - checkAnswer( - spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"), - Row(true) :: Nil) - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - - // rollback to tag - checkAnswer( - spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => 
'20250122')"), - Row(true) :: Nil) - checkAnswer(query(), Row(1, "a") :: Nil) - } + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'file.format'='orc') + |""".stripMargin) + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + // snapshot-1 + spark.sql("insert into T select 1, 'a'") + checkAnswer(query(), Row(1, "a") :: Nil) + + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '20250122', snapshot => 1)"), + Row(true) :: Nil) + + // snapshot-2 + spark.sql("insert into T select 2, 'b'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // snapshot-3 + spark.sql("insert into T select 2, 'b2'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')") + } + // rollback to snapshot + checkAnswer( + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"), + Row(true) :: Nil) + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // rollback to tag + checkAnswer( + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '20250122')"), + Row(true) :: Nil) + checkAnswer(query(), Row(1, "a") :: Nil) + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java index ce6531bdc324..fc359cfa3969 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java @@ -20,6 +20,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.TagManager; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import 
org.apache.spark.sql.connector.catalog.TableCatalog; @@ -70,7 +71,8 @@ public InternalRow[] call(InternalRow args) { table -> { FileStoreTable fileStoreTable = (FileStoreTable) table; TagManager tagManager = fileStoreTable.tagManager(); - if (version.chars().allMatch(Character::isDigit) && !tagManager.tagExists(version)) { + if (version.chars().allMatch(Character::isDigit) + && !tagManager.tagExists(version)) { table.rollbackTo(Long.parseLong(version)); } else { table.rollbackTo(version); From 5942273dbaab3ad7190ccafd1ef345d93eac3e04 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 20 Jan 2025 10:48:44 +0800 Subject: [PATCH 03/10] fix --- .../org/apache/paimon/spark/sql/RollbackProcedureTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala index 917579e11d2c..58c5db496893 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala @@ -35,7 +35,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { // define a pk table and test `forEachBatch` api spark.sql(s""" |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'file.format'='orc') + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') |""".stripMargin) val location = loadTable("T").location().toString From 440fb6c6e2221e7f186191dcad4ffeaa2b9baf93 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 20 Jan 2025 11:15:09 +0800 Subject: [PATCH 04/10] move ut --- .../spark/sql/RollbackProcedureTest.scala | 40 ------------------- .../procedure/RollbackProcedureTest.scala | 39 ++++++++++++++++++ 2 files changed, 39 insertions(+), 40 deletions(-) diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala index 58c5db496893..7a3a5730ed3b 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala @@ -93,44 +93,4 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { } } } - - test("Paimon Procedure: rollback to tag check test") { - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'file.format'='orc') - |""".stripMargin) - - val query = () => spark.sql("SELECT * FROM T ORDER BY a") - - // snapshot-1 - spark.sql("insert into T select 1, 'a'") - checkAnswer(query(), Row(1, "a") :: Nil) - - checkAnswer( - spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '20250122', snapshot => 1)"), - Row(true) :: Nil) - - // snapshot-2 - spark.sql("insert into T select 2, 'b'") - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - - // snapshot-3 - spark.sql("insert into T select 2, 'b2'") - checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) - assertThrows[RuntimeException] { - spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')") - } - // rollback to snapshot - checkAnswer( - spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"), - Row(true) :: Nil) - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - - // rollback to tag - checkAnswer( - spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '20250122')"), - Row(true) :: Nil) - checkAnswer(query(), Row(1, "a") :: Nil) - } - } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index 457c5ba513ec..339fc75b9ccd 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -94,6 +94,45 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { } } + test("Paimon Procedure: rollback to tag check test") { + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'file.format'='orc') + |""".stripMargin) + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + // snapshot-1 + spark.sql("insert into T select 1, 'a'") + checkAnswer(query(), Row(1, "a") :: Nil) + + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '20250122', snapshot => 1)"), + Row(true) :: Nil) + + // snapshot-2 + spark.sql("insert into T select 2, 'b'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // snapshot-3 + spark.sql("insert into T select 2, 'b2'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')") + } + // rollback to snapshot + checkAnswer( + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"), + Row(true) :: Nil) + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // rollback to tag + checkAnswer( + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '20250122')"), + Row(true) :: Nil) + checkAnswer(query(), Row(1, "a") :: Nil) + } + test("Paimon Procedure: rollback to timestamp") { failAfter(streamingTimeout) { withTempDir { From e96e704f41272ff69e670d60eb014191921cb9d5 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 20 Jan 2025 14:38:18 +0800 Subject: [PATCH 05/10] add type --- 
docs/content/spark/procedures.md | 4 +++- .../spark/procedure/RollbackProcedure.java | 24 ++++++++++++------- .../procedure/RollbackProcedureTest.scala | 10 +++++--- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index 8e32c2201f7f..f985474eddf7 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -162,10 +162,12 @@ This section introduce all available spark procedures about paimon. To rollback to a specific version of target table. Argument:
  • table: the target table identifier. Cannot be empty.
  • version: id of the snapshot or name of tag that will roll back to.
  • +
  • type: type of version and default is snapshot, user can set it with snapshot or tag.
  • CALL sys.rollback(table => 'default.T', version => 'my_tag')

    - CALL sys.rollback(table => 'default.T', version => 10) + CALL sys.rollback(table => 'default.T', version => 10)

    + CALL sys.rollback(table => 'default.T', version => '20250122', type => 'tag') diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java index fc359cfa3969..45f23f4d64b3 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java @@ -18,9 +18,6 @@ package org.apache.paimon.spark.procedure; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.utils.TagManager; - import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -38,7 +35,8 @@ public class RollbackProcedure extends BaseProcedure { new ProcedureParameter[] { ProcedureParameter.required("table", StringType), // snapshot id or tag name - ProcedureParameter.required("version", StringType) + ProcedureParameter.required("version", StringType), + ProcedureParameter.optional("type", StringType) }; private static final StructType OUTPUT_TYPE = @@ -65,18 +63,26 @@ public StructType outputType() { public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String version = args.getString(1); + String type = args.isNullAt(2) ? 
"snapshot" : args.getString(2); + if (!type.equals("snapshot") && !type.equals("tag")) { + throw new IllegalArgumentException( + "type in RollbackProcedure must be one of snapshot or tag."); + } return modifyPaimonTable( tableIdent, table -> { - FileStoreTable fileStoreTable = (FileStoreTable) table; - TagManager tagManager = fileStoreTable.tagManager(); - if (version.chars().allMatch(Character::isDigit) - && !tagManager.tagExists(version)) { - table.rollbackTo(Long.parseLong(version)); + if (type.equals("snapshot")) { + if (version.chars().allMatch(Character::isDigit)) { + table.rollbackTo(Long.parseLong(version)); + } else { + throw new IllegalArgumentException( + "version should be a digit if rollback to a snapshot."); + } } else { table.rollbackTo(version); } + InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; }); diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index 339fc75b9ccd..e60633df7cab 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -126,10 +126,14 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { Row(true) :: Nil) checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + // digtal version would throw out if not set type = tag + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '20250122')") + } + // rollback to tag - checkAnswer( - spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '20250122')"), - Row(true) :: Nil) + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '20250122', type => 'tag')") + checkAnswer(query(), Row(1, "a") :: Nil) } From 
a4cb4ff85a4fa3b9109bae4c1a5f07d7e4fcb821 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 20 Jan 2025 14:43:34 +0800 Subject: [PATCH 06/10] add type --- .../org/apache/paimon/spark/procedure/RollbackProcedure.java | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java index 45f23f4d64b3..17b01feadf6a 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java @@ -82,7 +82,6 @@ public InternalRow[] call(InternalRow args) { } else { table.rollbackTo(version); } - InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; }); From b1b58a2485323a22827fd56bca5b13e3a970640c Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 20 Jan 2025 14:59:52 +0800 Subject: [PATCH 07/10] comments --- .../spark/procedure/RollbackProcedure.java | 28 +++++++++++-------- .../procedure/RollbackProcedureTest.scala | 15 ++++++++-- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java index 17b01feadf6a..cabf1e9f6cfb 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java @@ -26,6 +26,7 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import static org.apache.spark.sql.types.DataTypes.LongType; import static 
org.apache.spark.sql.types.DataTypes.StringType; /** A procedure to rollback to a snapshot or a tag. */ @@ -35,8 +36,9 @@ public class RollbackProcedure extends BaseProcedure { new ProcedureParameter[] { ProcedureParameter.required("table", StringType), // snapshot id or tag name - ProcedureParameter.required("version", StringType), - ProcedureParameter.optional("type", StringType) + ProcedureParameter.optional("version", StringType), + ProcedureParameter.optional("snapshot", LongType), + ProcedureParameter.optional("tag", StringType) }; private static final StructType OUTPUT_TYPE = @@ -62,25 +64,29 @@ public StructType outputType() { @Override public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - String version = args.getString(1); - String type = args.isNullAt(2) ? "snapshot" : args.getString(2); - if (!type.equals("snapshot") && !type.equals("tag")) { + String version = args.isNullAt(1) ? null : args.getString(1); + Long snapshot = args.isNullAt(2) ? null : args.getLong(2); + String tag = args.isNullAt(3) ? 
null : args.getString(3); + if ((version != null && snapshot != null) + || (snapshot != null && tag != null) + || (tag != null && version != null)) { throw new IllegalArgumentException( - "type in RollbackProcedure must be one of snapshot or tag."); + "only can set one of version/snapshot/tag in RollbackProcedure."); } return modifyPaimonTable( tableIdent, table -> { - if (type.equals("snapshot")) { + if (snapshot != null) { + table.rollbackTo(snapshot); + } else if (tag != null) { + table.rollbackTo(tag); + } else { if (version.chars().allMatch(Character::isDigit)) { table.rollbackTo(Long.parseLong(version)); } else { - throw new IllegalArgumentException( - "version should be a digit if rollback to a snapshot."); + table.rollbackTo(version); } - } else { - table.rollbackTo(version); } InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index e60633df7cab..3ce674db5275 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -126,13 +126,22 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { Row(true) :: Nil) checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - // digtal version would throw out if not set type = tag + // version/snapshot/tag can only set one of them assertThrows[RuntimeException] { - spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '20250122')") + spark.sql( + "CALL paimon.sys.rollback(table => 'test.T', version => '20250122', tag => '20250122')") + } + + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '20250122', snapshot 
=> 1)") + } + + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T', tag => '20250122', snapshot => 1)") } // rollback to tag - spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '20250122', type => 'tag')") + spark.sql("CALL paimon.sys.rollback(table => 'test.T', tag => '20250122')") checkAnswer(query(), Row(1, "a") :: Nil) } From 4a4a88d605b15067939f70d22ca93d521b347c8c Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 20 Jan 2025 15:03:08 +0800 Subject: [PATCH 08/10] fix doc and comment --- docs/content/spark/procedures.md | 5 +++-- .../apache/paimon/spark/procedure/RollbackProcedure.java | 7 ++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index f985474eddf7..e8bac6c7eba0 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -159,10 +159,11 @@ This section introduce all available spark procedures about paimon. rollback - To rollback to a specific version of target table. Argument: + To rollback to a specific version of target table, note version/snapshot/tag must set one of them. Argument:
  • table: the target table identifier. Cannot be empty.
  • version: id of the snapshot or name of tag that will roll back to.
  • -
  • type: type of version and default is snapshot, user can set it with snapshot or tag.
  • +
  • snapshot: snapshot that will roll back to.
  • +
  • tag: tag that will roll back to.
  • CALL sys.rollback(table => 'default.T', version => 'my_tag')

    diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java index cabf1e9f6cfb..34b9369ed1a5 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java @@ -71,7 +71,12 @@ public InternalRow[] call(InternalRow args) { || (snapshot != null && tag != null) || (tag != null && version != null)) { throw new IllegalArgumentException( - "only can set one of version/snapshot/tag in RollbackProcedure."); + "only can set only one of version/snapshot/tag in RollbackProcedure."); + } + + if (version == null && snapshot == null && tag == null) { + throw new IllegalArgumentException( + "must set one of version/snapshot/tag in RollbackProcedure."); } return modifyPaimonTable( From a6c0a37c2867548b16516356a99c6b1277965c80 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 20 Jan 2025 15:30:11 +0800 Subject: [PATCH 09/10] comments --- .../org/apache/paimon/spark/procedure/RollbackProcedure.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java index 34b9369ed1a5..646e186c6d30 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java @@ -71,7 +71,7 @@ public InternalRow[] call(InternalRow args) { || (snapshot != null && tag != null) || (tag != null && version != null)) { throw new IllegalArgumentException( - "only can set only one of 
version/snapshot/tag in RollbackProcedure."); + "only can set one of version/snapshot/tag in RollbackProcedure."); } if (version == null && snapshot == null && tag == null) { From 542b9ce7f115ea69a0f43fd9a3fe60966c8f7a29 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Tue, 21 Jan 2025 10:30:59 +0800 Subject: [PATCH 10/10] comments --- docs/content/spark/procedures.md | 5 ++- .../spark/procedure/RollbackProcedure.java | 44 ++++++++++--------- .../procedure/RollbackProcedureTest.scala | 20 ++++++--- 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index e8bac6c7eba0..2978cd68e350 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -161,14 +161,15 @@ This section introduce all available spark procedures about paimon. To rollback to a specific version of target table, note version/snapshot/tag must set one of them. Argument:
  • table: the target table identifier. Cannot be empty.
  • -
  • version: id of the snapshot or name of tag that will roll back to.
  • +
  • version: id of the snapshot or name of the tag to roll back to. This argument will be deprecated; prefer snapshot or tag.
  • snapshot: snapshot that will roll back to.
  • tag: tag that will roll back to.
  • CALL sys.rollback(table => 'default.T', version => 'my_tag')

    CALL sys.rollback(table => 'default.T', version => 10)

    - CALL sys.rollback(table => 'default.T', version => '20250122', type => 'tag') + CALL sys.rollback(table => 'default.T', tag => 'tag1') + CALL sys.rollback(table => 'default.T', snapshot => 2) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java index 646e186c6d30..d9a88763320e 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java @@ -18,6 +18,9 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.StringUtils; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -65,33 +68,34 @@ public StructType outputType() { public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String version = args.isNullAt(1) ? null : args.getString(1); - Long snapshot = args.isNullAt(2) ? null : args.getLong(2); - String tag = args.isNullAt(3) ? 
null : args.getString(3); - if ((version != null && snapshot != null) - || (snapshot != null && tag != null) - || (tag != null && version != null)) { - throw new IllegalArgumentException( - "only can set one of version/snapshot/tag in RollbackProcedure."); - } - - if (version == null && snapshot == null && tag == null) { - throw new IllegalArgumentException( - "must set one of version/snapshot/tag in RollbackProcedure."); - } return modifyPaimonTable( tableIdent, table -> { - if (snapshot != null) { - table.rollbackTo(snapshot); - } else if (tag != null) { - table.rollbackTo(tag); - } else { + Long snapshot = null; + String tag = null; + if (!StringUtils.isNullOrWhitespaceOnly(version)) { + Preconditions.checkState( + args.isNullAt(2) && args.isNullAt(3), + "only can set one of version/snapshot/tag in RollbackProcedure."); if (version.chars().allMatch(Character::isDigit)) { - table.rollbackTo(Long.parseLong(version)); + snapshot = Long.parseLong(version); } else { - table.rollbackTo(version); + tag = version; } + } else { + Preconditions.checkState( + (args.isNullAt(2) && !args.isNullAt(3) + || !args.isNullAt(2) && args.isNullAt(3)), + "only can set one of version/snapshot/tag in RollbackProcedure."); + snapshot = args.isNullAt(2) ? null : args.getLong(2); + tag = args.isNullAt(3) ? 
null : args.getString(3); + } + + if (snapshot != null) { + table.rollbackTo(snapshot); + } else { + table.rollbackTo(tag); } InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index 3ce674db5275..325e06135b71 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -115,16 +115,21 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) // snapshot-3 - spark.sql("insert into T select 2, 'b2'") - checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + spark.sql("insert into T select 3, 'c'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil) + + // snapshot-4 + spark.sql("insert into T select 4, 'd'") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil) + assertThrows[RuntimeException] { - spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')") + spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '4')") } // rollback to snapshot checkAnswer( - spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"), + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '3')"), Row(true) :: Nil) - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil) // version/snapshot/tag can only set one of them assertThrows[RuntimeException] { @@ -140,9 +145,12 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { spark.sql("CALL paimon.sys.rollback(table 
=> 'test.T', tag => '20250122', snapshot => 1)") } + // rollback to snapshot + spark.sql("CALL paimon.sys.rollback(table => 'test.T', snapshot => 2)") + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + // rollback to tag spark.sql("CALL paimon.sys.rollback(table => 'test.T', tag => '20250122')") - checkAnswer(query(), Row(1, "a") :: Nil) }