Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,17 @@ This section introduce all available spark procedures about paimon.
<tr>
<td>rollback</td>
<td>
To rollback to a specific version of target table. Argument:
To rollback to a specific version of the target table. Note that exactly one of version/snapshot/tag must be set. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>version: id of the snapshot or name of tag that will roll back to.</li>
<li>version: id of the snapshot or name of the tag to roll back to. This argument will be deprecated; prefer snapshot or tag.</li>
<li>snapshot: id of the snapshot to roll back to.</li>
<li>tag: name of the tag to roll back to.</li>
</td>
<td>
CALL sys.rollback(table => 'default.T', version => 'my_tag')<br/><br/>
CALL sys.rollback(table => 'default.T', version => 10)
CALL sys.rollback(table => 'default.T', version => 10)<br/><br/>
CALL sys.rollback(table => 'default.T', tag => 'tag1')<br/><br/>
CALL sys.rollback(table => 'default.T', snapshot => 2)
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.paimon.spark.procedure;

import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
Expand All @@ -26,6 +29,7 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

/** A procedure to rollback to a snapshot or a tag. */
Expand All @@ -35,7 +39,9 @@ public class RollbackProcedure extends BaseProcedure {
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
// snapshot id or tag name
ProcedureParameter.required("version", StringType)
ProcedureParameter.optional("version", StringType),
ProcedureParameter.optional("snapshot", LongType),
ProcedureParameter.optional("tag", StringType)
};

private static final StructType OUTPUT_TYPE =
Expand All @@ -61,15 +67,35 @@ public StructType outputType() {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String version = args.getString(1);
String version = args.isNullAt(1) ? null : args.getString(1);

return modifyPaimonTable(
tableIdent,
table -> {
if (version.chars().allMatch(Character::isDigit)) {
table.rollbackTo(Long.parseLong(version));
Long snapshot = null;
String tag = null;
if (!StringUtils.isNullOrWhitespaceOnly(version)) {
Preconditions.checkState(
args.isNullAt(2) && args.isNullAt(3),
"only can set one of version/snapshot/tag in RollbackProcedure.");
if (version.chars().allMatch(Character::isDigit)) {
snapshot = Long.parseLong(version);
} else {
tag = version;
}
} else {
Preconditions.checkState(
(args.isNullAt(2) && !args.isNullAt(3)
|| !args.isNullAt(2) && args.isNullAt(3)),
"only can set one of version/snapshot/tag in RollbackProcedure.");
snapshot = args.isNullAt(2) ? null : args.getLong(2);
tag = args.isNullAt(3) ? null : args.getString(3);
}

if (snapshot != null) {
table.rollbackTo(snapshot);
} else {
table.rollbackTo(version);
table.rollbackTo(tag);
}
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,66 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
}
}

// Exercises the rollback procedure's argument handling: rolling back through `version`,
// `snapshot` and `tag` individually, and rejecting calls that set more than one of them.
test("Paimon Procedure: rollback to tag check test") {
spark.sql(s"""
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'file.format'='orc')
|""".stripMargin)

// Re-evaluated after every step to observe the current table contents.
val query = () => spark.sql("SELECT * FROM T ORDER BY a")

// snapshot-1
spark.sql("insert into T select 1, 'a'")
checkAnswer(query(), Row(1, "a") :: Nil)

// Tag snapshot 1 as '20250122' so it can be used as a rollback target by name later.
checkAnswer(
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '20250122', snapshot => 1)"),
Row(true) :: Nil)

// snapshot-2
spark.sql("insert into T select 2, 'b'")
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)

// snapshot-3
spark.sql("insert into T select 3, 'c'")
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)

// snapshot-4
spark.sql("insert into T select 4, 'd'")
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil)

// The call targets 'test.T_exception', which this test never creates; it is expected to fail.
assertThrows[RuntimeException] {
spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '4')")
}
// rollback to snapshot 3 through the `version` argument
checkAnswer(
spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '3')"),
Row(true) :: Nil)
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)

// Only one of version/snapshot/tag may be set: every pairwise combination below must fail.
assertThrows[RuntimeException] {
spark.sql(
"CALL paimon.sys.rollback(table => 'test.T', version => '20250122', tag => '20250122')")
}

assertThrows[RuntimeException] {
spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '20250122', snapshot => 1)")
}

assertThrows[RuntimeException] {
spark.sql("CALL paimon.sys.rollback(table => 'test.T', tag => '20250122', snapshot => 1)")
}

// rollback to snapshot 2 through the `snapshot` argument
spark.sql("CALL paimon.sys.rollback(table => 'test.T', snapshot => 2)")
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)

// rollback to tag '20250122' (created on snapshot 1) through the `tag` argument
spark.sql("CALL paimon.sys.rollback(table => 'test.T', tag => '20250122')")
checkAnswer(query(), Row(1, "a") :: Nil)
}

test("Paimon Procedure: rollback to timestamp") {
failAfter(streamingTimeout) {
withTempDir {
Expand Down
Loading