diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index 0b243558dda3..2b73c59ca876 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -860,20 +860,17 @@ private void testNoChangelogProducerRandom( // Deletion vectors mode not support concurrent write numProducers = 1; } - List results = - testRandom( - tEnv, - numProducers, - enableFailure, - "'bucket' = '4'," - + String.format( - "'deletion-vectors.enabled' = '%s'", - enableDeletionVectors)); - - for (TableResult result : results) { - result.await(); - } - checkBatchResult(numProducers); + + testRandom( + tEnv, + numProducers, + enableFailure, + "'bucket' = '4'," + + String.format( + "'deletion-vectors.enabled' = '%s'", enableDeletionVectors)); + + // changelog is produced by Flink normalize operator + checkChangelogTestResult(numProducers); } private void testFullCompactionChangelogProducerRandom( @@ -1059,7 +1056,7 @@ private void checkChangelogTestResult(int numProducers) throws Exception { *

All jobs will modify the same set of partitions to emulate conflicting writes. Each job * will write its own set of keys for easy result checking. */ - private List testRandom( + private void testRandom( TableEnvironment tEnv, int numProducers, boolean enableFailure, String tableProperties) throws Exception { // producers will very quickly produce snapshots, @@ -1130,8 +1127,6 @@ private List testRandom( + ")") .await(); - List results = new ArrayList<>(); - if (enableFailure) { FailingFileIO.reset(failingName, 2, 10000); } @@ -1157,18 +1152,14 @@ private List testRandom( // run test SQL int idx = i; - TableResult result = - FailingFileIO.retryArtificialException( - () -> - tEnv.executeSql( - "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT pt, k, v1, v2 FROM myView" - + idx - + " WHERE i < " - + usefulNumRows)); - results.add(result); + FailingFileIO.retryArtificialException( + () -> + tEnv.executeSql( + "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT pt, k, v1, v2 FROM myView" + + idx + + " WHERE i < " + + usefulNumRows)); } - - return results; } private void checkBatchResult(int numProducers) throws Exception {