From ea435c691e69289c3f0765dbe66e56b478c864d6 Mon Sep 17 00:00:00 2001 From: "liming.1018" Date: Thu, 12 Dec 2024 21:21:10 +0800 Subject: [PATCH] [core] fix the issue where streaming reading of overwrite data would fail when retract type data appeared. --- .../IncrementalChangelogReadProvider.java | 8 ++-- .../paimon/flink/ReadWriteTableITCase.java | 37 +++++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java index 308c09d14204..eb41d02669fc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java @@ -60,20 +60,20 @@ private SplitRead create(Supplier supplier) { ConcatRecordReader.create( () -> new ReverseReader( - read.createNoMergeReader( + read.createMergeReader( split.partition(), split.bucket(), split.beforeFiles(), split.beforeDeletionFiles() .orElse(null), - true)), + false)), () -> - read.createNoMergeReader( + read.createMergeReader( split.partition(), split.bucket(), split.dataFiles(), split.deletionFiles().orElse(null), - true)); + false)); return unwrap(reader); }; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java index 10de1ae4839f..732e96454236 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java @@ -801,6 +801,43 @@ public void testStreamingReadOverwriteWithoutPartitionedRecords() throws Excepti streamingItr.close(); } + @Test + public void testStreamingReadOverwriteWithDeleteRecords() throws Exception { + String table = + createTable( + Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"), + Collections.singletonList("currency"), + Collections.emptyList(), + Collections.emptyList(), + streamingReadOverwrite); + + insertInto( + table, + "('US Dollar', 102, '2022-01-01')", + "('Yen', 1, '2022-01-02')", + "('Euro', 119, '2022-01-02')"); + + bEnv.executeSql(String.format("DELETE FROM %s WHERE currency = 'Euro'", table)).await(); + + checkFileStorePath(table, Collections.emptyList()); + + // test projection and filter + BlockingIterator streamingItr = + testStreamingRead( + buildQuery(table, "currency, rate", "WHERE dt = '2022-01-02'"), + Collections.singletonList(changelogRow("+I", "Yen", 1L))); + + insertOverwrite(table, "('US Dollar', 100, '2022-01-02')", "('Yen', 10, '2022-01-01')"); + + validateStreamingReadResult( + streamingItr, + Arrays.asList( + changelogRow("-D", "Yen", 1L), changelogRow("+I", "US Dollar", 100L))); + assertNoMoreRecords(streamingItr); + + streamingItr.close(); + } + @Test public void testUnsupportStreamingReadOverwriteWithoutPk() { assertThatThrownBy(