From 0d07cf922fb12ec0f63af384c8155f69bd2002f4 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Wed, 16 Apr 2025 11:22:25 +0800 Subject: [PATCH 1/3] fix --- .../doris/datasource/paimon/source/PaimonScanNode.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index a8d9c0c42e5173..aa2120f15806ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -280,6 +280,9 @@ public List getSplits(int numBackends) throws UserException { boolean applyCountPushdown = getPushDownAggNoGroupingOp() == TPushAggOp.COUNT; // Just for counting the number of selected partitions for this paimon table Set selectedPartitionValues = Sets.newHashSet(); + // if applyCountPushdown is true, we cannot to split the file + // because the raw file and deletion vector is one-to-one mapping + long realFileSplitSize = getRealFileSplitSize(applyCountPushdown ? Long.MAX_VALUE : 0); for (org.apache.paimon.table.source.Split split : paimonSplits) { SplitStat splitStat = new SplitStat(); splitStat.setRowCount(split.rowCount()); @@ -304,9 +307,7 @@ public List getSplits(int numBackends) throws UserException { try { List dorisSplits = FileSplitter.splitFile( locationPath, - // if applyCountPushdown is true, we can't to split the file - // becasue the raw file and deletion vector is one-to-one mapping - getRealFileSplitSize(applyCountPushdown ? Long.MAX_VALUE : 0), + realFileSplitSize, null, file.length(), -1, @@ -344,6 +345,9 @@ public List getSplits(int numBackends) throws UserException { splitStats.add(splitStat); } + // We need to set the target size for all splits so that we can calculate the proportion of each split later. + splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize)); + // if applyCountPushdown is true, calcute row count for count pushdown if (applyCountPushdown) { // we can create a special empty split and skip the plan process From 0ac8c304c8769c527b8fd41733e1a613291cebff Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Thu, 17 Apr 2025 20:33:45 +0800 Subject: [PATCH 2/3] fix --- .../paimon/source/PaimonScanNode.java | 45 +++--- .../paimon/source/PaimonSource.java | 8 ++ .../paimon/source/PaimonScanNodeTest.java | 131 ++++++++++++++++++ 3 files changed, 167 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index aa2120f15806ba..6398f6da8d3a7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -128,6 +128,11 @@ protected void doInitialize() throws UserException { params.setHistorySchemaInfo(new ConcurrentHashMap<>()); } + @VisibleForTesting + public void setSource(PaimonSource source) { + this.source = source; + } + @Override protected void convertPredicate() { PaimonPredicateConverter paimonPredicateConverter = new PaimonPredicateConverter( @@ -260,23 +265,8 @@ public List getSplits(int numBackends) throws UserException { SessionVariable.IgnoreSplitType ignoreSplitType = SessionVariable.IgnoreSplitType .valueOf(sessionVariable.getIgnoreSplitType()); List splits = new ArrayList<>(); - if (!source.getPaimonTable().options().containsKey(CoreOptions.SCAN_SNAPSHOT_ID.key())) { - // an empty table in PaimonSnapshotCacheValue - return splits; - } - int[] projected = desc.getSlots().stream().mapToInt( - slot -> source.getPaimonTable().rowType() - .getFieldNames() - .stream() - .map(String::toLowerCase) - .collect(Collectors.toList()) - .indexOf(slot.getColumn().getName())) - .toArray(); - ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder(); - List paimonSplits = readBuilder.withFilter(predicates) - .withProjection(projected) - .newScan().plan().splits(); + List paimonSplits = getPaimonSplitFromAPI(); boolean applyCountPushdown = getPushDownAggNoGroupingOp() == TPushAggOp.COUNT; // Just for counting the number of selected partitions for this paimon table Set selectedPartitionValues = Sets.newHashSet(); @@ -374,11 +364,32 @@ public List getSplits(int numBackends) throws UserException { return splits; } + @VisibleForTesting + public List getPaimonSplitFromAPI() { + if (!source.getPaimonTable().options().containsKey(CoreOptions.SCAN_SNAPSHOT_ID.key())) { + // an empty table in PaimonSnapshotCacheValue + return Collections.emptyList(); + } + int[] projected = desc.getSlots().stream().mapToInt( + slot -> source.getPaimonTable().rowType() + .getFieldNames() + .stream() + .map(String::toLowerCase) + .collect(Collectors.toList()) + .indexOf(slot.getColumn().getName())) + .toArray(); + ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder(); + return readBuilder.withFilter(predicates) + .withProjection(projected) + .newScan().plan().splits(); + } + private String getFileFormat(String path) { return FileFormatUtils.getFileFormatBySuffix(path).orElse(source.getFileFormatFromTableProperties()); } - private boolean supportNativeReader(Optional> optRawFiles) { + @VisibleForTesting + public boolean supportNativeReader(Optional> optRawFiles) { if (!optRawFiles.isPresent()) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index a8bb814f1d353b..1c6b88b16ec57b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -26,6 +26,7 @@ import org.apache.doris.datasource.property.constants.PaimonProperties; import org.apache.doris.thrift.TFileAttributes; +import com.google.common.annotations.VisibleForTesting; import org.apache.paimon.table.Table; @@ -34,6 +35,13 @@ public class PaimonSource { private final Table originTable; private final TupleDescriptor desc; + @VisibleForTesting + public PaimonSource() { + this.desc = null; + this.paimonExtTable = null; + this.originTable = null; + } + public PaimonSource(TupleDescriptor desc) { this.desc = desc; this.paimonExtTable = (PaimonExternalTable) desc.getTable(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java index f67f3a9397788b..b0ab482df6be3e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java @@ -17,13 +17,34 @@ package org.apache.doris.datasource.paimon.source; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.SessionVariable; + +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DeletionFile; +import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.Split; import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; public class PaimonScanNodeTest { @@ -152,4 +173,114 @@ public Optional> deletionFiles() { Optional result = PaimonScanNode.calcuteTableLevelCount(splits); Assert.assertFalse(result.isPresent()); } + + @Mocked + private SessionVariable sv; + + @Mocked + private PaimonFileExternalCatalog paimonFileExternalCatalog; + + @Test + public void testSplitWeight() throws UserException { + + TupleDescriptor desc = new TupleDescriptor(new TupleId(3)); + PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1), desc, false, sv); + + paimonScanNode.setSource(new PaimonSource()); + + DataFileMeta dfm1 = DataFileMeta.forAppend("f1.parquet", 64 * 1024 * 1024, 1, SimpleStats.EMPTY_STATS, 1, 1, 1, + Collections.emptyList(), null, null, null, null); + BinaryRow binaryRow1 = BinaryRow.singleColumn(1); + DataSplit ds1 = DataSplit.builder() + .rawConvertible(true) + .withPartition(binaryRow1) + .withBucket(1) + .withBucketPath("b1") + .withDataFiles(Collections.singletonList(dfm1)) + .build(); + + DataFileMeta dfm2 = DataFileMeta.forAppend("f2.parquet", 32 * 1024 * 1024, 2, SimpleStats.EMPTY_STATS, 1, 1, 1, + Collections.emptyList(), null, null, null, null); + BinaryRow binaryRow2 = BinaryRow.singleColumn(1); + DataSplit ds2 = DataSplit.builder() + .rawConvertible(true) + .withPartition(binaryRow2) + .withBucket(1) + .withBucketPath("b1") + .withDataFiles(Collections.singletonList(dfm2)) + .build(); + + + new MockUp() { + @Mock + public List getPaimonSplitFromAPI() { + return new ArrayList() {{ + add(ds1); + add(ds2); + }}; + } + }; + + new MockUp() { + @Mock + public ExternalCatalog getCatalog() { + return paimonFileExternalCatalog; + } + }; + + new MockUp() { + @Mock + public Map getProperties() { + return Collections.emptyMap(); + } + }; + + new Expectations() {{ + sv.isForceJniScanner(); + result = false; + + sv.getIgnoreSplitType(); + result = "NONE"; + }}; + + // native + mockNativeReader(); + List s1 = paimonScanNode.getSplits(1); + PaimonSplit s11 = (PaimonSplit) s1.get(0); + PaimonSplit s12 = (PaimonSplit) s1.get(1); + Assert.assertEquals(2, s1.size()); + Assert.assertEquals(100, s11.getSplitWeight().getRawValue()); + Assert.assertNull(s11.getSplit()); + Assert.assertEquals(50, s12.getSplitWeight().getRawValue()); + Assert.assertNull(s12.getSplit()); + + // jni + mockJniReader(); + List s2 = paimonScanNode.getSplits(1); + PaimonSplit s21 = (PaimonSplit) s2.get(0); + PaimonSplit s22 = (PaimonSplit) s2.get(1); + Assert.assertEquals(2, s2.size()); + Assert.assertNotNull(s21.getSplit()); + Assert.assertNotNull(s22.getSplit()); + Assert.assertEquals(100, s21.getSplitWeight().getRawValue()); + Assert.assertEquals(50, s22.getSplitWeight().getRawValue()); + } + + private void mockJniReader() { + new MockUp() { + @Mock + public boolean supportNativeReader(Optional> optRawFiles) { + return false; + } + }; + } + + private void mockNativeReader() { + new MockUp() { + @Mock + public boolean supportNativeReader(Optional> optRawFiles) { + return true; + } + }; + } } From 5b3af459be84d915a646649c2a7d637ee9cac8ee Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Sat, 19 Apr 2025 09:53:46 +0800 Subject: [PATCH 3/3] fix --- .../doris/datasource/paimon/source/PaimonScanNodeTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java index b0ab482df6be3e..bc253680eb0a73 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java @@ -29,8 +29,6 @@ import mockit.Mock; import mockit.MockUp; import mockit.Mocked; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.stats.SimpleStats;