diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index bf19ba10c689..43a6d3c8721c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -391,14 +391,14 @@ public Plan readChanges() { groupByPartFiles(plan.files(FileKind.DELETE)); Map>> dataFiles = groupByPartFiles(plan.files(FileKind.ADD)); - - return toChangesPlan(true, plan, plan.snapshot().id() - 1, beforeFiles, dataFiles); + Snapshot beforeSnapshot = snapshotManager.snapshot(plan.snapshot().id() - 1); + return toChangesPlan(true, plan, beforeSnapshot, beforeFiles, dataFiles); } private Plan toChangesPlan( boolean isStreaming, FileStoreScan.Plan plan, - long beforeSnapshotId, + Snapshot beforeSnapshot, Map>> beforeFiles, Map>> dataFiles) { Snapshot snapshot = plan.snapshot(); @@ -416,7 +416,7 @@ private Plan toChangesPlan( Map, List> beforDeletionIndexFilesMap = deletionVectors ? indexFileHandler.scan( - beforeSnapshotId, DELETION_VECTORS_INDEX, beforeFiles.keySet()) + beforeSnapshot, DELETION_VECTORS_INDEX, beforeFiles.keySet()) : Collections.emptyMap(); Map, List> deletionIndexFilesMap = deletionVectors @@ -476,7 +476,7 @@ public Plan readIncrementalDiff(Snapshot before) { groupByPartFiles(plan.files(FileKind.ADD)); Map>> beforeFiles = groupByPartFiles(scan.withSnapshot(before).plan().files(FileKind.ADD)); - return toChangesPlan(false, plan, before.id(), beforeFiles, dataFiles); + return toChangesPlan(false, plan, before, beforeFiles, dataFiles); } private RecordComparator partitionComparator() { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index d48b6e771236..d3108e374914 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -299,6 +299,34 @@ public void testTimeTravelReadWithSnapshotExpiration() throws Exception { .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222)); } + @Test + public void testIncrementBetweenReadWithSnapshotExpiration() throws Exception { + String tableName = "T"; + batchSql(String.format("INSERT INTO %s VALUES (1, 11, 111)", tableName)); + + paimonTable(tableName).createTag("tag1", 1); + + batchSql(String.format("INSERT INTO %s VALUES (2, 22, 222)", tableName)); + paimonTable(tableName).createTag("tag2", 2); + batchSql(String.format("INSERT INTO %s VALUES (3, 33, 333)", tableName)); + paimonTable(tableName).createTag("tag3", 3); + + // expire snapshot 1 + Map expireOptions = new HashMap<>(); + expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1"); + expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1"); + FileStoreTable table = (FileStoreTable) paimonTable(tableName); + table.copy(expireOptions).newCommit("").expireSnapshots(); + assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1); + + assertThat( + batchSql( + String.format( + "SELECT * FROM %s /*+ OPTIONS('incremental-between' = 'tag1,tag2', 'deletion-vectors.enabled' = 'true') */", + tableName))) + .containsExactlyInAnyOrder(Row.of(2, 22, 222)); + } + @Test public void testSortSpillMerge() { sql(