From a1a37ccaaab94584dc8ed7b2bf5525d68ea48c3b Mon Sep 17 00:00:00 2001 From: yantian Date: Mon, 6 Jan 2025 15:15:23 +0800 Subject: [PATCH 1/3] use Snapshot instead of snapshot id in SnapshotReaderImpl.toChangesPlan --- .../table/source/snapshot/SnapshotReaderImpl.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index bf19ba10c689..43a6d3c8721c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -391,14 +391,14 @@ public Plan readChanges() { groupByPartFiles(plan.files(FileKind.DELETE)); Map>> dataFiles = groupByPartFiles(plan.files(FileKind.ADD)); - - return toChangesPlan(true, plan, plan.snapshot().id() - 1, beforeFiles, dataFiles); + Snapshot beforeSnapshot = snapshotManager.snapshot(plan.snapshot().id() - 1); + return toChangesPlan(true, plan, beforeSnapshot, beforeFiles, dataFiles); } private Plan toChangesPlan( boolean isStreaming, FileStoreScan.Plan plan, - long beforeSnapshotId, + Snapshot beforeSnapshot, Map>> beforeFiles, Map>> dataFiles) { Snapshot snapshot = plan.snapshot(); @@ -416,7 +416,7 @@ private Plan toChangesPlan( Map, List> beforDeletionIndexFilesMap = deletionVectors ? 
indexFileHandler.scan( - beforeSnapshotId, DELETION_VECTORS_INDEX, beforeFiles.keySet()) + beforeSnapshot, DELETION_VECTORS_INDEX, beforeFiles.keySet()) : Collections.emptyMap(); Map, List> deletionIndexFilesMap = deletionVectors @@ -476,7 +476,7 @@ public Plan readIncrementalDiff(Snapshot before) { groupByPartFiles(plan.files(FileKind.ADD)); Map>> beforeFiles = groupByPartFiles(scan.withSnapshot(before).plan().files(FileKind.ADD)); - return toChangesPlan(false, plan, before.id(), beforeFiles, dataFiles); + return toChangesPlan(false, plan, before, beforeFiles, dataFiles); } private RecordComparator partitionComparator() { From 30ef1ee935ba0c378df6f07abe7c06b717bac276 Mon Sep 17 00:00:00 2001 From: yantian Date: Mon, 6 Jan 2025 18:06:54 +0800 Subject: [PATCH 2/3] add IT --- .../paimon/flink/BatchFileStoreITCase.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index d48b6e771236..6a6d5153da32 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -299,6 +299,32 @@ public void testTimeTravelReadWithSnapshotExpiration() throws Exception { .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222)); } + @Test + public void testIncrementBetweenReadWithSnapshotExpiration() throws Exception { + String tableName = "T"; + batchSql(String.format("INSERT INTO %s VALUES (1, 11, 111), (2, 22, 222)", tableName)); + + paimonTable(tableName).createTag("tag1", 1); + + batchSql(String.format("INSERT INTO %s VALUES (3, 33, 333)", tableName)); + paimonTable(tableName).createTag("tag2", 2); + + // expire snapshot 1 + Map expireOptions = new HashMap<>(); + 
expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1"); + expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1"); + FileStoreTable table = (FileStoreTable) paimonTable(tableName); + table.copy(expireOptions).newCommit("").expireSnapshots(); + assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1); + + assertThat( + batchSql( + String.format( + "SELECT * FROM %s /*+ OPTIONS('incremental-between' = 'tag1,tag2', 'deletion-vectors.enabled' = 'true') */", + tableName))) + .containsExactlyInAnyOrder(Row.of(3, 33, 333)); + } + @Test public void testSortSpillMerge() { sql( From d7775084b17bf9e91331bc59091b30599d286105 Mon Sep 17 00:00:00 2001 From: yantian Date: Tue, 7 Jan 2025 09:19:54 +0800 Subject: [PATCH 3/3] update IT so that the snapshots of both tags are expired --- .../org/apache/paimon/flink/BatchFileStoreITCase.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 6a6d5153da32..d3108e374914 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -302,12 +302,14 @@ public void testTimeTravelReadWithSnapshotExpiration() throws Exception { @Test public void testIncrementBetweenReadWithSnapshotExpiration() throws Exception { String tableName = "T"; - batchSql(String.format("INSERT INTO %s VALUES (1, 11, 111), (2, 22, 222)", tableName)); + batchSql(String.format("INSERT INTO %s VALUES (1, 11, 111)", tableName)); paimonTable(tableName).createTag("tag1", 1); - batchSql(String.format("INSERT INTO %s VALUES (3, 33, 333)", tableName)); + batchSql(String.format("INSERT INTO %s VALUES (2, 22, 222)", tableName)); paimonTable(tableName).createTag("tag2", 2); + 
batchSql(String.format("INSERT INTO %s VALUES (3, 33, 333)", tableName)); + paimonTable(tableName).createTag("tag3", 3); // expire snapshot 1 Map expireOptions = new HashMap<>(); @@ -322,7 +324,7 @@ public void testIncrementBetweenReadWithSnapshotExpiration() throws Exception { String.format( "SELECT * FROM %s /*+ OPTIONS('incremental-between' = 'tag1,tag2', 'deletion-vectors.enabled' = 'true') */", tableName))) - .containsExactlyInAnyOrder(Row.of(3, 33, 333)); + .containsExactlyInAnyOrder(Row.of(2, 22, 222)); } @Test