From 8bee49f062d23d0c4a200a71c47951aa5e7aef02 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Mon, 9 Dec 2024 11:59:22 +0800 Subject: [PATCH] [core] Optimize drop partitions to avoid stack overflow --- .../utils/InternalRowPartitionComputer.java | 18 +++++++++++ .../paimon/operation/FileStoreCommitImpl.java | 32 ++++++++++++------- .../paimon/partition/PartitionPredicate.java | 15 +++++++++ 3 files changed, 54 insertions(+), 11 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java index 8de2720a26e9..6bb26d76138e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java @@ -21,6 +21,7 @@ import org.apache.paimon.casting.CastExecutor; import org.apache.paimon.casting.CastExecutors; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.InternalRow.FieldGetter; import org.apache.paimon.types.DataType; @@ -32,6 +33,7 @@ import java.util.Map; import static org.apache.paimon.utils.InternalRowUtils.createNullCheckingFieldGetter; +import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.TypeUtils.castFromString; /** PartitionComputer for {@link InternalRow}. */ @@ -102,6 +104,22 @@ public static Map convertSpecToInternal( return partValues; } + public static GenericRow convertSpecToInternalRow( + Map spec, RowType partType, String defaultPartValue) { + checkArgument(spec.size() == partType.getFieldCount()); + GenericRow partRow = new GenericRow(spec.size()); + List fieldNames = partType.getFieldNames(); + for (Map.Entry entry : spec.entrySet()) { + Object value = + defaultPartValue.equals(entry.getValue()) + ? null + : castFromString( + entry.getValue(), partType.getField(entry.getKey()).type()); + partRow.setField(fieldNames.indexOf(entry.getKey()), value); + } + return partRow; + } + public static String partToSimpleString( RowType partitionType, BinaryRow partition, String delimiter, int maxLength) { FieldGetter[] getters = partitionType.fieldGetters(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 4808975fa763..1c503689c24e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -83,6 +83,7 @@ import static org.apache.paimon.manifest.ManifestEntry.recordCount; import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd; import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete; +import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions; import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString; @@ -530,17 +531,26 @@ public void dropPartitions(List> partitions, long commitIden partitions.stream().map(Objects::toString).collect(Collectors.joining(","))); } - // partitions may be partial partition fields, so here must to use predicate way. - Predicate predicate = - partitions.stream() - .map( - partition -> - createPartitionPredicate( - partition, partitionType, partitionDefaultName)) - .reduce(PredicateBuilder::or) - .orElseThrow(() -> new RuntimeException("Failed to get partition filter.")); - PartitionPredicate partitionFilter = - PartitionPredicate.fromPredicate(partitionType, predicate); + boolean fullMode = + partitions.stream().allMatch(part -> part.size() == partitionType.getFieldCount()); + PartitionPredicate partitionFilter; + if (fullMode) { + List binaryPartitions = + createBinaryPartitions(partitions, partitionType, partitionDefaultName); + partitionFilter = PartitionPredicate.fromMultiple(partitionType, binaryPartitions); + } else { + // partitions may be partial partition fields, so here must to use predicate way. + Predicate predicate = + partitions.stream() + .map( + partition -> + createPartitionPredicate( + partition, partitionType, partitionDefaultName)) + .reduce(PredicateBuilder::or) + .orElseThrow( + () -> new RuntimeException("Failed to get partition filter.")); + partitionFilter = PartitionPredicate.fromPredicate(partitionType, predicate); + } tryOverwrite( partitionFilter, diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java index 12ea884be15f..1f6c2cfe454e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java @@ -19,8 +19,10 @@ package org.apache.paimon.partition; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.data.serializer.InternalSerializers; import org.apache.paimon.data.serializer.Serializer; import org.apache.paimon.format.SimpleColStats; @@ -33,6 +35,7 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -40,6 +43,7 @@ import java.util.Set; import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal; +import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternalRow; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -231,4 +235,15 @@ static Predicate createPartitionPredicate( .map(p -> createPartitionPredicate(p, rowType, defaultPartValue)) .toArray(Predicate[]::new)); } + + static List createBinaryPartitions( + List> partitions, RowType partitionType, String defaultPartValue) { + InternalRowSerializer serializer = new InternalRowSerializer(partitionType); + List result = new ArrayList<>(); + for (Map spec : partitions) { + GenericRow row = convertSpecToInternalRow(spec, partitionType, defaultPartValue); + result.add(serializer.toBinaryRow(row).copy()); + } + return result; + } }