diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index dc1c2d6bdbc5..9f67a637ff42 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -252,7 +252,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { if (expireConfig.isChangelogDecoupled()) { commitChangelog(new Changelog(snapshot)); } - snapshotManager.fileIO().deleteQuietly(snapshotManager.snapshotPath(id)); + snapshotManager.deleteSnapshot(id); } writeEarliestHint(endExclusiveId); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java index 29fecec11353..d5482c6f5388 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java @@ -122,7 +122,7 @@ private List cleanSnapshotsDataFiles(Snapshot retainedSnapshot) { // Ignore the non-existent snapshots if (snapshotManager.snapshotExists(i)) { toBeCleaned.add(snapshotManager.snapshot(i)); - fileIO.deleteQuietly(snapshotManager.snapshotPath(i)); + snapshotManager.deleteSnapshot(i); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index ae70d7aec5d1..5257cf1c121c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -183,6 +183,14 @@ public boolean snapshotExists(long snapshotId) { } } + public void deleteSnapshot(long snapshotId) { + Path path = snapshotPath(snapshotId); + if (cache != null) { + cache.invalidate(path); + } + fileIO().deleteQuietly(path); + } + public boolean longLivedChangelogExists(long snapshotId) { Path path = longLivedChangelogPath(snapshotId); try { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java index 938119f947c4..96a9e363bf68 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java @@ -84,9 +84,7 @@ private void commitSnapshotHintInTargetTable(SnapshotManager targetTableSnapshot targetTableSnapshotManager.commitLatestHint(snapshotId); for (Snapshot snapshot : targetTableSnapshotManager.safelyGetAllSnapshots()) { if (snapshot.id() != snapshotId) { - targetTableSnapshotManager - .fileIO() - .deleteQuietly(targetTableSnapshotManager.snapshotPath(snapshot.id())); + targetTableSnapshotManager.deleteSnapshot(snapshot.id()); } } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index 457c5ba513ec..5f5facc57af5 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -151,4 +151,12 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { } } + test("Paimon Procedure: rollback with cache") { + sql("CREATE TABLE T (id INT)") + sql("INSERT INTO T VALUES (1), (2), (3), (4)") + sql("DELETE FROM T WHERE id = 1") + sql("CALL sys.rollback(table => 'T', version => '1')") + sql("DELETE FROM T WHERE id = 1") + checkAnswer(sql("SELECT * FROM T ORDER BY id"), Seq(Row(2), Row(3), Row(4))) + } }