From 0673e7bea1722ab95e27eb2ea274f7e1b663e33e Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Tue, 29 Apr 2025 15:17:48 +0800 Subject: [PATCH] [fix](paimon)Set the target size of the split for 3.0 (#50405) bp: #50083 --- .../paimon/source/PaimonScanNode.java | 39 ++++- .../paimon/source/PaimonSource.java | 8 + .../paimon/source/PaimonScanNodeTest.java | 157 ++++++++++++++++++ 3 files changed, 195 insertions(+), 9 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 5afef56f085c80..a7367f3e2744f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -40,6 +40,7 @@ import org.apache.doris.thrift.TPushAggOp; import org.apache.doris.thrift.TTableFormatFileDesc; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; @@ -123,6 +124,11 @@ protected void doInitialize() throws UserException { Preconditions.checkNotNull(source); } + @VisibleForTesting + public void setSource(PaimonSource source) { + this.source = source; + } + @Override protected void convertPredicate() { PaimonPredicateConverter paimonPredicateConverter = new PaimonPredicateConverter( @@ -211,17 +217,12 @@ public List getSplits(int numBackends) throws UserException { SessionVariable.IgnoreSplitType ignoreSplitType = SessionVariable.IgnoreSplitType .valueOf(sessionVariable.getIgnoreSplitType()); List splits = new ArrayList<>(); - int[] projected = desc.getSlots().stream().mapToInt( - slot -> (source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName()))) - .toArray(); - ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder(); - List paimonSplits = readBuilder.withFilter(predicates) - .withProjection(projected) - .newScan().plan().splits(); + List paimonSplits = getPaimonSplitFromAPI(); boolean applyCountPushdown = getPushDownAggNoGroupingOp() == TPushAggOp.COUNT; // Just for counting the number of selected partitions for this paimon table Set selectedPartitionValues = Sets.newHashSet(); + long realFileSplitSize = getRealFileSplitSize(0); for (org.apache.paimon.table.source.Split split : paimonSplits) { SplitStat splitStat = new SplitStat(); splitStat.setRowCount(split.rowCount()); @@ -249,7 +250,7 @@ public List getSplits(int numBackends) throws UserException { try { List dorisSplits = FileSplitter.splitFile( locationPath, - getRealFileSplitSize(0), + realFileSplitSize, null, file.length(), -1, @@ -289,11 +290,30 @@ public List getSplits(int numBackends) throws UserException { splitStats.add(splitStat); } + // We need to set the target size for all splits so that we can calculate the proportion of each split later. + splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize)); + this.selectedPartitionNum = selectedPartitionValues.size(); // TODO: get total partition number return splits; } + @VisibleForTesting + public List getPaimonSplitFromAPI() { + int[] projected = desc.getSlots().stream().mapToInt( + slot -> source.getPaimonTable().rowType() + .getFieldNames() + .stream() + .map(String::toLowerCase) + .collect(Collectors.toList()) + .indexOf(slot.getColumn().getName())) + .toArray(); + ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder(); + return readBuilder.withFilter(predicates) + .withProjection(projected) + .newScan().plan().splits(); + } + private void createRawFileSplits(List rawFiles, List splits, long blockSize) throws UserException { for (RawFile file : rawFiles) { LocationPath locationPath = new LocationPath(file.path(), @@ -320,7 +340,8 @@ private String getFileFormat(String path) { return FileFormatUtils.getFileFormatBySuffix(path).orElse(source.getFileFormatFromTableProperties()); } - private boolean supportNativeReader(Optional> optRawFiles) { + @VisibleForTesting + public boolean supportNativeReader(Optional> optRawFiles) { if (!optRawFiles.isPresent()) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index a8bb814f1d353b..1c6b88b16ec57b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -26,6 +26,7 @@ import org.apache.doris.datasource.property.constants.PaimonProperties; import org.apache.doris.thrift.TFileAttributes; +import com.google.common.annotations.VisibleForTesting; import org.apache.paimon.table.Table; @@ -34,6 +35,13 @@ public class PaimonSource { private final Table originTable; private final TupleDescriptor desc; + @VisibleForTesting + public PaimonSource() { + this.desc = null; + this.paimonExtTable = null; + this.originTable = null; + } + public PaimonSource(TupleDescriptor desc) { this.desc = desc; this.paimonExtTable = (PaimonExternalTable) desc.getTable(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java new file mode 100644 index 00000000000000..4a2d61f2c7dc0b --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon.source; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.SessionVariable; + +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.RawFile; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class PaimonScanNodeTest { + + @Mocked + private SessionVariable sv; + + @Mocked + private PaimonFileExternalCatalog paimonFileExternalCatalog; + + @Test + public void testSplitWeight() throws UserException { + + TupleDescriptor desc = new TupleDescriptor(new TupleId(3)); + PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1), desc, false, sv); + + paimonScanNode.setSource(new PaimonSource()); + + DataFileMeta dfm1 = DataFileMeta.forAppend("f1.parquet", 64 * 1024 * 1024, 1, SimpleStats.EMPTY_STATS, 1, 1, 1, + Collections.emptyList(), null, null, null, null); + BinaryRow binaryRow1 = BinaryRow.singleColumn(1); + DataSplit ds1 = DataSplit.builder() + .rawConvertible(true) + .withPartition(binaryRow1) + .withBucket(1) + .withBucketPath("b1") + .withDataFiles(Collections.singletonList(dfm1)) + .build(); + + DataFileMeta dfm2 = DataFileMeta.forAppend("f2.parquet", 32 * 1024 * 1024, 2, SimpleStats.EMPTY_STATS, 1, 1, 1, + Collections.emptyList(), null, null, null, null); + BinaryRow binaryRow2 = BinaryRow.singleColumn(1); + DataSplit ds2 = DataSplit.builder() + .rawConvertible(true) + .withPartition(binaryRow2) + .withBucket(1) + .withBucketPath("b1") + .withDataFiles(Collections.singletonList(dfm2)) + .build(); + + + new MockUp() { + @Mock + public List getPaimonSplitFromAPI() { + return new ArrayList() {{ + add(ds1); + add(ds2); + }}; + } + }; + + new MockUp() { + @Mock + public ExternalCatalog getCatalog() { + return paimonFileExternalCatalog; + } + }; + + new MockUp() { + @Mock + public Map getProperties() { + return Collections.emptyMap(); + } + }; + + new Expectations() {{ + sv.isForceJniScanner(); + result = false; + + sv.getIgnoreSplitType(); + result = "NONE"; + }}; + + // native + mockNativeReader(); + List s1 = paimonScanNode.getSplits(1); + PaimonSplit s11 = (PaimonSplit) s1.get(0); + PaimonSplit s12 = (PaimonSplit) s1.get(1); + Assert.assertEquals(2, s1.size()); + Assert.assertEquals(100, s11.getSplitWeight().getRawValue()); + Assert.assertNull(s11.getSplit()); + Assert.assertEquals(50, s12.getSplitWeight().getRawValue()); + Assert.assertNull(s12.getSplit()); + + // jni + mockJniReader(); + List s2 = paimonScanNode.getSplits(1); + PaimonSplit s21 = (PaimonSplit) s2.get(0); + PaimonSplit s22 = (PaimonSplit) s2.get(1); + Assert.assertEquals(2, s2.size()); + Assert.assertNotNull(s21.getSplit()); + Assert.assertNotNull(s22.getSplit()); + Assert.assertEquals(100, s21.getSplitWeight().getRawValue()); + Assert.assertEquals(50, s22.getSplitWeight().getRawValue()); + } + + private void mockJniReader() { + new MockUp() { + @Mock + public boolean supportNativeReader(Optional> optRawFiles) { + return false; + } + }; + } + + private void mockNativeReader() { + new MockUp() { + @Mock + public boolean supportNativeReader(Optional> optRawFiles) { + return true; + } + }; + } +}