diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 75a99fc9b337..7785735d04fd 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -489,7 +489,6 @@ private void compactDataEvolutionTable( new DataEvolutionCompactCoordinator(table, partitionPredicate, false); CommitMessageSerializer messageSerializerser = new CommitMessageSerializer(); String commitUser = createCommitUser(table.coreOptions().toConfiguration()); - List messages = new ArrayList<>(); try { while (true) { compactionTasks = compactCoordinator.plan(); @@ -560,13 +559,15 @@ private void compactDataEvolutionTable( return messagesBytes.iterator(); }); + List messages = new ArrayList<>(); List serializedMessages = commitMessageJavaRDD.collect(); - try { + try (TableCommitImpl commit = table.newCommit(commitUser)) { for (byte[] serializedMessage : serializedMessages) { messages.add( messageSerializerser.deserialize( messageSerializerser.getVersion(), serializedMessage)); } + commit.commit(messages); } catch (Exception e) { throw new RuntimeException("Deserialize commit message failed", e); } @@ -574,12 +575,6 @@ private void compactDataEvolutionTable( } catch (EndOfScanException e) { LOG.info("Catching EndOfScanException, the compact job is finishing."); } - - try (TableCommitImpl commit = table.newCommit(commitUser)) { - commit.commit(messages); - } catch (Exception e) { - throw new RuntimeException(e); - } } private Set getHistoryPartition(