From bd9b6df47371ae1c01f6b4dd6996eb1ace1ad13d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?=
Date: Fri, 26 Dec 2025 15:54:45 +0800
Subject: [PATCH 1/2] [spark] Optimize compact for data-evolution table, commit multiple times to avoid out of memory

---
 .../paimon/spark/procedure/CompactProcedure.java | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 75a99fc9b337..0ac6c369f713 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -97,6 +97,7 @@
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 
+import static com.sun.org.apache.xml.internal.serializer.utils.Utils.messages;
 import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.spark.utils.SparkProcedureUtils.readParallelism;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -489,7 +490,6 @@ private void compactDataEvolutionTable(
                 new DataEvolutionCompactCoordinator(table, partitionPredicate, false);
         CommitMessageSerializer messageSerializerser = new CommitMessageSerializer();
         String commitUser = createCommitUser(table.coreOptions().toConfiguration());
-        List<CommitMessage> messages = new ArrayList<>();
         try {
             while (true) {
                 compactionTasks = compactCoordinator.plan();
@@ -560,13 +560,15 @@ private void compactDataEvolutionTable(
                             return messagesBytes.iterator();
                         });
 
+        List<CommitMessage> messages = new ArrayList<>();
         List<byte[]> serializedMessages = commitMessageJavaRDD.collect();
-        try {
+        try (TableCommitImpl commit = table.newCommit(commitUser)) {
             for (byte[] serializedMessage : serializedMessages) {
                 messages.add(
                         messageSerializerser.deserialize(
                                 messageSerializerser.getVersion(), serializedMessage));
             }
+            commit.commit(messages);
         } catch (Exception e) {
             throw new RuntimeException("Deserialize commit message failed", e);
         }
@@ -574,12 +576,6 @@ private void compactDataEvolutionTable(
         } catch (EndOfScanException e) {
             LOG.info("Catching EndOfScanException, the compact job is finishing.");
         }
-
-        try (TableCommitImpl commit = table.newCommit(commitUser)) {
-            commit.commit(messages);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
     }
 
     private Set getHistoryPartition(

From 56a0b7d8a11721f6a80882a7967ddd3b91f80c92 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?=
Date: Fri, 26 Dec 2025 16:01:59 +0800
Subject: [PATCH 2/2] Fix minus

---
 .../java/org/apache/paimon/spark/procedure/CompactProcedure.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 0ac6c369f713..7785735d04fd 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -97,7 +97,6 @@
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 
-import static com.sun.org.apache.xml.internal.serializer.utils.Utils.messages;
 import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.spark.utils.SparkProcedureUtils.readParallelism;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
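
Note: the net effect of patch 1 is that CommitMessages are deserialized and committed once per planned batch inside the planning loop, instead of being buffered across all batches and committed once after the loop, so the driver only holds one batch of messages at a time. Below is a condensed sketch of the resulting flow in compactDataEvolutionTable; runSparkCompaction(...) is a hypothetical stand-in for the Spark job that collects the serialized CommitMessages of one batch, while the other names are taken from the diff above.

    DataEvolutionCompactCoordinator compactCoordinator =
            new DataEvolutionCompactCoordinator(table, partitionPredicate, false);
    CommitMessageSerializer serializer = new CommitMessageSerializer();
    String commitUser = createCommitUser(table.coreOptions().toConfiguration());
    try {
        while (true) {
            // Plan the next batch of compaction tasks; EndOfScanException ends the loop.
            // runSparkCompaction(...) is a hypothetical helper standing in for the
            // distributed compaction and collect step shown in the patch.
            List<byte[]> serialized = runSparkCompaction(compactCoordinator.plan());

            // Deserialize and commit this batch right away instead of buffering the
            // CommitMessages of every batch until the whole table has been compacted.
            List<CommitMessage> messages = new ArrayList<>();
            try (TableCommitImpl commit = table.newCommit(commitUser)) {
                for (byte[] bytes : serialized) {
                    messages.add(serializer.deserialize(serializer.getVersion(), bytes));
                }
                commit.commit(messages);
            } catch (Exception e) {
                throw new RuntimeException("Deserialize commit message failed", e);
            }
        }
    } catch (EndOfScanException e) {
        LOG.info("Catching EndOfScanException, the compact job is finishing.");
    }

Opening a fresh TableCommitImpl per batch via try-with-resources also ensures the committer is closed after every batch, even when deserialization or the commit fails.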