Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,28 @@ All available procedures are listed below.
CALL sys.rollback_to_timestamp(`table` => 'default.T', timestamp => 1730292023000)
</td>
</tr>
<tr>
<td>rollback_to_watermark</td>
<td>
-- for Flink 1.18<br/>
-- rollback to the snapshot which earlier or equal than watermark.<br/>
CALL sys.rollback_to_watermark('identifier', watermark)<br/><br/>
-- for Flink 1.19 and later<br/>
-- rollback to the snapshot which earlier or equal than watermark.<br/>
CALL sys.rollback_to_watermark(`table` => 'default.T', `watermark` => watermark)<br/><br/>
</td>
<td>
To rollback to the snapshot which earlier or equal than watermark. Argument:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>watermark (Long): Roll back to the snapshot which earlier or equal than watermark.</li>
</td>
<td>
-- for Flink 1.18<br/>
CALL sys.rollback_to_watermark('default.T', 1730292023000)
-- for Flink 1.19 and later<br/>
CALL sys.rollback_to_watermark(`table` => 'default.T', watermark => 1730292023000)
</td>
</tr>
<tr>
<td>expire_snapshots</td>
<td>
Expand Down
11 changes: 11 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,17 @@ This section introduce all available spark procedures about paimon.
CALL sys.rollback_to_timestamp(table => 'default.T', timestamp => 1730292023000)<br/><br/>
</td>
</tr>
<tr>
<td>rollback_to_watermark</td>
<td>
To rollback to the snapshot which earlier or equal than watermark. Argument:
<li>table: the target table identifier. Cannot be empty.</li>
<li>watermark: roll back to the snapshot which earlier or equal than watermark.</li>
</td>
<td>
CALL sys.rollback_to_watermark(table => 'default.T', watermark => 1730292023000)<br/><br/>
</td>
</tr>
<tr>
<td>migrate_database</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,65 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
return finalSnapshot;
}

public @Nullable Snapshot earlierOrEqualWatermark(long watermark) {
Long earliest = earliestSnapshotId();
Long latest = latestSnapshotId();
// If latest == Long.MIN_VALUE don't need next binary search for watermark
// which can reduce IO cost with snapshot
if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
return null;
}
Long earliestWatermark = null;
// find the first snapshot with watermark
if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
while (earliest < latest) {
earliest++;
earliestWatermark = snapshot(earliest).watermark();
if (earliestWatermark != null) {
break;
}
}
}
if (earliestWatermark == null) {
return null;
}

if (earliestWatermark >= watermark) {
return snapshot(earliest);
}
Snapshot finalSnapshot = null;

while (earliest <= latest) {
long mid = earliest + (latest - earliest) / 2; // Avoid overflow
Snapshot snapshot = snapshot(mid);
Long commitWatermark = snapshot.watermark();
if (commitWatermark == null) {
// find the first snapshot with watermark
while (mid >= earliest) {
mid--;
commitWatermark = snapshot(mid).watermark();
if (commitWatermark != null) {
break;
}
}
}
if (commitWatermark == null) {
earliest = mid + 1;
} else {
if (commitWatermark > watermark) {
latest = mid - 1; // Search in the left half
} else if (commitWatermark < watermark) {
earliest = mid + 1; // Search in the right half
finalSnapshot = snapshot;
} else {
finalSnapshot = snapshot; // Found the exact match
break;
}
}
}
return finalSnapshot;
}

public @Nullable Snapshot laterOrEqualWatermark(long watermark) {
Long earliest = earliestSnapshotId();
Long latest = latestSnapshotId();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.table.procedure.ProcedureContext;

/**
* Rollback to watermark procedure. Usage:
*
* <pre><code>
* -- rollback to the snapshot which earlier or equal than watermark.
* CALL sys.rollback_to_watermark('tableId', watermark)
* </code></pre>
*/
public class RollbackToWatermarkProcedure extends ProcedureBase {

public static final String IDENTIFIER = "rollback_to_watermark";

public String[] call(ProcedureContext procedureContext, String tableId, long watermark)
throws Catalog.TableNotExistException {
Preconditions.checkNotNull(tableId, "table can not be empty");
Table table = catalog.getTable(Identifier.fromString(tableId));
FileStoreTable fileStoreTable = (FileStoreTable) table;
Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark);
Preconditions.checkNotNull(
snapshot, String.format("count not find snapshot earlier than %s", watermark));
long snapshotId = snapshot.id();
fileStoreTable.rollbackTo(snapshotId);
return new String[] {String.format("Success roll back to snapshot: %s .", snapshotId)};
}

@Override
public String identifier() {
return IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

/**
* Rollback to watermark procedure. Usage:
*
* <pre><code>
* -- rollback to the snapshot which earlier or equal than watermark.
* CALL sys.rollback_to_watermark(`table` => 'tableId', watermark => watermark)
* </code></pre>
*/
public class RollbackToWatermarkProcedure extends ProcedureBase {

public static final String IDENTIFIER = "rollback_to_watermark";

@ProcedureHint(
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "watermark", type = @DataTypeHint("BIGINT"))
})
public String[] call(ProcedureContext procedureContext, String tableId, Long watermark)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
FileStoreTable fileStoreTable = (FileStoreTable) table;
Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark);
Preconditions.checkNotNull(
snapshot, String.format("count not find snapshot earlier than %s", watermark));
long snapshotId = snapshot.id();
fileStoreTable.rollbackTo(snapshotId);
return new String[] {String.format("Success roll back to snapshot: %s .", snapshotId)};
}

@Override
public String identifier() {
return IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
org.apache.paimon.flink.procedure.RollbackToProcedure
org.apache.paimon.flink.procedure.RollbackToTimestampProcedure
org.apache.paimon.flink.procedure.RollbackToWatermarkProcedure
org.apache.paimon.flink.procedure.MigrateTableProcedure
org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** IT Case for {@link RollbackToWatermarkProcedure}. */
public class RollbackToWatermarkProcedureITCase extends CatalogITCaseBase {

@Test
public void testCreateTagsFromSnapshotsWatermark() throws Exception {
sql(
"CREATE TABLE T ("
+ " k STRING,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt) NOT ENFORCED"
+ ") PARTITIONED BY (dt) WITH ("
+ " 'bucket' = '1'"
+ ")");

// create snapshot 1 with watermark 1000.
sql(
"insert into T/*+ OPTIONS('end-input.watermark'= '1000') */ values('k1', '2024-12-02')");
// create snapshot 2 with watermark 2000.
sql(
"insert into T/*+ OPTIONS('end-input.watermark'= '2000') */ values('k2', '2024-12-02')");
// create snapshot 3 with watermark 3000.
sql(
"insert into T/*+ OPTIONS('end-input.watermark'= '3000') */ values('k3', '2024-12-02')");

FileStoreTable table = paimonTable("T");

long watermark1 = table.snapshotManager().snapshot(1).watermark();
long watermark2 = table.snapshotManager().snapshot(2).watermark();
long watermark3 = table.snapshotManager().snapshot(3).watermark();

assertThat(watermark1 == 1000).isTrue();
assertThat(watermark2 == 2000).isTrue();
assertThat(watermark3 == 3000).isTrue();

assertThat(sql("select * from T").stream().map(Row::toString))
.containsExactlyInAnyOrder(
"+I[k1, 2024-12-02]", "+I[k2, 2024-12-02]", "+I[k3, 2024-12-02]");

sql("CALL sys.rollback_to_watermark(`table` => 'default.T',`watermark` => 2001)");

// check for snapshot 2
assertThat(sql("select * from T").stream().map(Row::toString))
.containsExactlyInAnyOrder("+I[k1, 2024-12-02]", "+I[k2, 2024-12-02]");

sql("CALL sys.rollback_to_watermark(`table` => 'default.T',`watermark` => 1001)");

// check for snapshot 1
assertThat(sql("select * from T").stream().map(Row::toString))
.containsExactlyInAnyOrder("+I[k1, 2024-12-02]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.paimon.spark.procedure.ResetConsumerProcedure;
import org.apache.paimon.spark.procedure.RollbackProcedure;
import org.apache.paimon.spark.procedure.RollbackToTimestampProcedure;
import org.apache.paimon.spark.procedure.RollbackToWatermarkProcedure;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;

Expand Down Expand Up @@ -72,6 +73,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
ImmutableMap.builder();
procedureBuilders.put("rollback", RollbackProcedure::builder);
procedureBuilders.put("rollback_to_timestamp", RollbackToTimestampProcedure::builder);
procedureBuilders.put("rollback_to_watermark", RollbackToWatermarkProcedure::builder);
procedureBuilders.put("create_tag", CreateTagProcedure::builder);
procedureBuilders.put("replace_tag", ReplaceTagProcedure::builder);
procedureBuilders.put("rename_tag", RenameTagProcedure::builder);
Expand Down
Loading
Loading