From 77bb66debf0313abd28cda3a2bea073d3c5b60f8 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Wed, 11 Dec 2024 20:01:30 +0800 Subject: [PATCH 1/3] [core] Introduce RollbackToWatermarkProcedure for rollback --- docs/content/flink/procedures.md | 22 ++++ docs/content/spark/procedures.md | 11 ++ .../apache/paimon/utils/SnapshotManager.java | 59 ++++++++++ .../RollbackToWatermarkProcedure.java | 59 ++++++++++ .../RollbackToWatermarkProcedure.java | 66 +++++++++++ .../org.apache.paimon.factories.Factory | 1 + .../RollbackToWatermarkProcedureITCase.java | 79 +++++++++++++ .../apache/paimon/spark/SparkProcedures.java | 2 + .../RollbackToWatermarkProcedure.java | 105 ++++++++++++++++++ 9 files changed, 404 insertions(+) create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index 59b02f82bf8c..7a9b23807392 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -412,6 +412,28 @@ All available procedures are listed below. CALL sys.rollback_to_timestamp(`table` => 'default.T', timestamp => 1730292023000) + + rollback_to_watermark + + -- for Flink 1.18
+ -- rollback to the snapshot which earlier or equal than watermark.
+ CALL sys.rollback_to_watermark('identifier', watermark)

+ -- for Flink 1.19 and later
+ -- rollback to the snapshot which earlier or equal than watermark.
+ CALL sys.rollback_to_watermark(`table` => 'default.T', `watermark` => watermark)

+ + + To rollback to the snapshot which earlier or equal than watermark. Argument: +
  • identifier: the target table identifier. Cannot be empty.
  • +
  • watermark (Long): Roll back to the snapshot which earlier or equal than watermark.
  • + + + -- for Flink 1.18
    + CALL sys.rollback_to_watermark('default.T', 1730292023000) + -- for Flink 1.19 and later
    + CALL sys.rollback_to_watermark(`table` => 'default.T', watermark => 1730292023000) + + expire_snapshots diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index 88d46fabbb2b..5b0efd5f90a6 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -179,6 +179,17 @@ This section introduce all available spark procedures about paimon. CALL sys.rollback_to_timestamp(table => 'default.T', timestamp => 1730292023000)

    + + rollback_to_watermark + + To rollback to the snapshot which earlier or equal than watermark. Argument: +
  • table: the target table identifier. Cannot be empty.
  • +
  • watermark: roll back to the snapshot which earlier or equal than watermark.
  • + + + CALL sys.rollback_to_watermark(table => 'default.T', watermark => 1730292023000)

    + + migrate_database diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index cbe33ffaf456..eb7333366fec 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -366,6 +366,65 @@ private Snapshot changelogOrSnapshot(long snapshotId) { return finalSnapshot; } + public @Nullable Snapshot earlierOrEqualWatermark(long watermark) { + Long earliest = earliestSnapshotId(); + Long latest = latestSnapshotId(); + // If latest == Long.MIN_VALUE don't need next binary search for watermark + // which can reduce IO cost with snapshot + if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) { + return null; + } + Long earliestWatermark = null; + // find the first snapshot with watermark + if ((earliestWatermark = snapshot(earliest).watermark()) == null) { + while (earliest < latest) { + earliest++; + earliestWatermark = snapshot(earliest).watermark(); + if (earliestWatermark != null) { + break; + } + } + } + if (earliestWatermark == null) { + return null; + } + + if (earliestWatermark >= watermark) { + return snapshot(earliest); + } + Snapshot finalSnapshot = null; + + while (earliest <= latest) { + long mid = earliest + (latest - earliest) / 2; // Avoid overflow + Snapshot snapshot = snapshot(mid); + Long commitWatermark = snapshot.watermark(); + if (commitWatermark == null) { + // find the first snapshot with watermark + while (mid >= earliest) { + mid--; + commitWatermark = snapshot(mid).watermark(); + if (commitWatermark != null) { + break; + } + } + } + if (commitWatermark == null) { + earliest = mid + 1; + } else { + if (commitWatermark > watermark) { + latest = mid - 1; // Search in the left half + } else if (commitWatermark < watermark) { + earliest = mid + 1; // Search in the right half + finalSnapshot = snapshot; + } else { + finalSnapshot = snapshot; // Found the exact match + break; + } + } + } + return finalSnapshot; + } + public @Nullable Snapshot laterOrEqualWatermark(long watermark) { Long earliest = earliestSnapshotId(); Long latest = latestSnapshotId(); diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java new file mode 100644 index 000000000000..da0b38f16b54 --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.Preconditions; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Rollback to watermark procedure. Usage: + * + *
    
    + *  -- rollback to the snapshot which earlier or equal than watermark.
    + *  CALL sys.rollback_to_watermark('tableId', watermark)
    + * 
    + */ +public class RollbackToWatermarkProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "rollback_to_watermark"; + + public String[] call(ProcedureContext procedureContext, String tableId, long watermark) + throws Catalog.TableNotExistException { + Preconditions.checkNotNull(tableId, "table can not be empty"); + Table table = catalog.getTable(Identifier.fromString(tableId)); + FileStoreTable fileStoreTable = (FileStoreTable) table; + Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark); + Preconditions.checkNotNull( + snapshot, String.format("count not find snapshot earlier than %s", watermark)); + long snapshotId = snapshot.id(); + fileStoreTable.rollbackTo(snapshotId); + return new String[] {String.format("Success roll back to snapshot: %s .", snapshotId)}; + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java new file mode 100644 index 000000000000..ab1ea8080de9 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.Preconditions; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Rollback to watermark procedure. Usage: + * + *
    
    + *  -- rollback to the snapshot which earlier or equal than watermark.
    + *  CALL sys.rollback_to_watermark(`table` => 'tableId', watermark => watermark)
    + * 
    + */ +public class RollbackToWatermarkProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "rollback_to_watermark"; + + @ProcedureHint( + argument = { + @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), + @ArgumentHint(name = "watermark", type = @DataTypeHint("BIGINT")) + }) + public String[] call(ProcedureContext procedureContext, String tableId, Long watermark) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + FileStoreTable fileStoreTable = (FileStoreTable) table; + Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark); + Preconditions.checkNotNull( + snapshot, String.format("count not find snapshot earlier than %s", watermark)); + long snapshotId = snapshot.id(); + fileStoreTable.rollbackTo(snapshotId); + return new String[] {String.format("Success roll back to snapshot: %s .", snapshotId)}; + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 0ff3ac1f1e1c..6c3b0e7664c0 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -61,6 +61,7 @@ org.apache.paimon.flink.procedure.MergeIntoProcedure org.apache.paimon.flink.procedure.ResetConsumerProcedure org.apache.paimon.flink.procedure.RollbackToProcedure org.apache.paimon.flink.procedure.RollbackToTimestampProcedure +org.apache.paimon.flink.procedure.RollbackToWatermarkProcedure org.apache.paimon.flink.procedure.MigrateTableProcedure org.apache.paimon.flink.procedure.MigrateDatabaseProcedure org.apache.paimon.flink.procedure.MigrateFileProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java new file mode 100644 index 000000000000..bbb0097f8a20 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.flink.CatalogITCaseBase; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.types.Row; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT Case for {@link RollbackToWatermarkProcedure}. */ +public class RollbackToWatermarkProcedureITCase extends CatalogITCaseBase { + + @Test + public void testCreateTagsFromSnapshotsWatermark() throws Exception { + sql( + "CREATE TABLE T (" + + " k STRING," + + " dt STRING," + + " PRIMARY KEY (k, dt) NOT ENFORCED" + + ") PARTITIONED BY (dt) WITH (" + + " 'bucket' = '1'" + + ")"); + + // create snapshot 1 with watermark 1000. + sql( + "insert into T/*+ OPTIONS('end-input.watermark'= '1000') */ values('k1', '2024-12-02')"); + // create snapshot 2 with watermark 2000. + sql( + "insert into T/*+ OPTIONS('end-input.watermark'= '2000') */ values('k2', '2024-12-02')"); + // create snapshot 3 with watermark 3000. + sql( + "insert into T/*+ OPTIONS('end-input.watermark'= '3000') */ values('k3', '2024-12-02')"); + + FileStoreTable table = paimonTable("T"); + + long watermark1 = table.snapshotManager().snapshot(1).watermark(); + long watermark2 = table.snapshotManager().snapshot(2).watermark(); + long watermark3 = table.snapshotManager().snapshot(3).watermark(); + + assertThat(watermark1 == 1000).isTrue(); + assertThat(watermark2 == 2000).isTrue(); + assertThat(watermark3 == 3000).isTrue(); + + assertThat(sql("select * from T").stream().map(Row::toString)) + .containsExactlyInAnyOrder( + "+I[k1, 2024-12-02]", "+I[k2, 2024-12-02]", "+I[k3, 2024-12-02]"); + + sql("CALL sys.rollback_to_watermark(" + "`table` => 'default.T'," + "`watermark` => 2001)"); + + // check for snapshot 2 + assertThat(sql("select * from T").stream().map(Row::toString)) + .containsExactlyInAnyOrder("+I[k1, 2024-12-02]", "+I[k2, 2024-12-02]"); + + sql("CALL sys.rollback_to_watermark(" + "`table` => 'default.T'," + "`watermark` => 1001)"); + + // check for snapshot 1 + assertThat(sql("select * from T").stream().map(Row::toString)) + .containsExactlyInAnyOrder("+I[k1, 2024-12-02]"); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index 21f14e5d7a38..b2fa66a15090 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -43,6 +43,7 @@ import org.apache.paimon.spark.procedure.ResetConsumerProcedure; import org.apache.paimon.spark.procedure.RollbackProcedure; import org.apache.paimon.spark.procedure.RollbackToTimestampProcedure; +import org.apache.paimon.spark.procedure.RollbackToWatermarkProcedure; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; @@ -72,6 +73,7 @@ private static Map> initProcedureBuilders() { ImmutableMap.builder(); procedureBuilders.put("rollback", RollbackProcedure::builder); procedureBuilders.put("rollback_to_timestamp", RollbackToTimestampProcedure::builder); + procedureBuilders.put("rollback_to_watermark", RollbackToWatermarkProcedure::builder); procedureBuilders.put("create_tag", CreateTagProcedure::builder); procedureBuilders.put("replace_tag", ReplaceTagProcedure::builder); procedureBuilders.put("rename_tag", RenameTagProcedure::builder); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java new file mode 100644 index 000000000000..b417da51637b --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.Preconditions; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +import static org.apache.spark.sql.types.DataTypes.LongType; +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** A procedure to rollback to a watermark. */ +public class RollbackToWatermarkProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", StringType), + // timestamp value + ProcedureParameter.required("watermark", LongType) + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("result", StringType, true, Metadata.empty()) + }); + + private RollbackToWatermarkProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + Long watermark = args.getLong(1); + + return modifyPaimonTable( + tableIdent, + table -> { + FileStoreTable fileStoreTable = (FileStoreTable) table; + Snapshot snapshot = + fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark); + Preconditions.checkNotNull( + snapshot, + String.format("count not find snapshot earlier than %s", watermark)); + long snapshotId = snapshot.id(); + fileStoreTable.rollbackTo(snapshotId); + InternalRow outputRow = + newInternalRow( + UTF8String.fromString( + String.format( + "Success roll back to snapshot: %s .", + snapshotId))); + return new InternalRow[] {outputRow}; + }); + } + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder() { + @Override + public RollbackToWatermarkProcedure doBuild() { + return new RollbackToWatermarkProcedure(tableCatalog()); + } + }; + } + + @Override + public String description() { + return "RollbackToWatermarkProcedure"; + } +} From e26a166d7eb326a8d8dd49a71b3afadf804dab31 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Wed, 11 Dec 2024 20:03:20 +0800 Subject: [PATCH 2/3] fix --- .../paimon/spark/procedure/RollbackToWatermarkProcedure.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java index b417da51637b..09185f02c919 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java @@ -39,7 +39,7 @@ public class RollbackToWatermarkProcedure extends BaseProcedure { private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", StringType), - // timestamp value + // watermark value ProcedureParameter.required("watermark", LongType) }; From 5fe41361ac7a81385df5c47859a40509f5242211 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Wed, 11 Dec 2024 20:46:13 +0800 Subject: [PATCH 3/3] fix style --- .../flink/procedure/RollbackToWatermarkProcedureITCase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java index bbb0097f8a20..f87ecd24756b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java @@ -64,13 +64,13 @@ public void testCreateTagsFromSnapshotsWatermark() throws Exception { .containsExactlyInAnyOrder( "+I[k1, 2024-12-02]", "+I[k2, 2024-12-02]", "+I[k3, 2024-12-02]"); - sql("CALL sys.rollback_to_watermark(" + "`table` => 'default.T'," + "`watermark` => 2001)"); + sql("CALL sys.rollback_to_watermark(`table` => 'default.T',`watermark` => 2001)"); // check for snapshot 2 assertThat(sql("select * from T").stream().map(Row::toString)) .containsExactlyInAnyOrder("+I[k1, 2024-12-02]", "+I[k2, 2024-12-02]"); - sql("CALL sys.rollback_to_watermark(" + "`table` => 'default.T'," + "`watermark` => 1001)"); + sql("CALL sys.rollback_to_watermark(`table` => 'default.T',`watermark` => 1001)"); // check for snapshot 1 assertThat(sql("select * from T").stream().map(Row::toString))