From 87a37bbf2e484d483d27d1e2a14d2d747fe370bb Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Sun, 3 Nov 2024 19:10:22 +0800 Subject: [PATCH 1/3] [performance](load) fix broker load scan ranges for unsplittable files --- .../doris/datasource/FileGroupInfo.java | 81 +++++++++++++++---- 1 file changed, 64 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java index 932e698e2cdec9..5bf1c95bc034fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.common.util.Util; @@ -53,7 +54,9 @@ import java.net.URI; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; +import java.util.PriorityQueue; /** * FileTable encapsulates a set of files to be scanned into a Table like structure, @@ -84,6 +87,7 @@ public enum JobType { private boolean strictMode; private int loadParallelism; // set by getFileStatusAndCalcInstance + private long numInstances = 1; private long bytesPerInstance = 0; // used for stream load, FILE_LOCAL or FILE_STREAM private TFileType fileType; @@ -189,7 +193,6 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy) throw new UserException("No source file in this table(" + targetTable.getName() + ")."); } - int numInstances = 1; if (jobType == JobType.BULK_LOAD) { long totalBytes = 0; for (TBrokerFileStatus fileStatus : fileStatuses) { @@ -208,6 +211,7 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy) } } else { // stream load, not need to split + numInstances = 1; bytesPerInstance = Long.MAX_VALUE; } LOG.info("number instance of file scan node is: {}, bytes per instance: {}", numInstances, bytesPerInstance); @@ -216,6 +220,60 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy) public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context, FederationBackendPolicy backendPolicy, List scanRangeLocations) throws UserException { + // Currently, we do not support mixed file types (or compress types). + // If any of the file is unsplittable, all files will be treated as unsplittable. + boolean isSplittable = true; + for (TBrokerFileStatus fileStatus : fileStatuses) { + TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); + TFileCompressType compressType = + Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path); + // Now only support split plain text + if (compressType == TFileCompressType.PLAIN + && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) + || formatType == TFileFormatType.FORMAT_JSON)) { + // is splittable + } else { + isSplittable = false; + break; + } + } + + if (isSplittable) { + createScanRangeLocationsSplittable(context, backendPolicy, scanRangeLocations); + } else { + createScanRangeLocationsUnsplittable(context, backendPolicy, scanRangeLocations); + } + } + + public void createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateContext context, + FederationBackendPolicy backendPolicy, + List scanRangeLocations) + throws UserException { + PriorityQueue> pq = new PriorityQueue<>(Comparator.comparingLong(Pair::key)); + for (int i = 0; i < Math.min(fileStatuses.size(), numInstances); i++) { + pq.add(Pair.of(0L, newLocations(context.params, brokerDesc, backendPolicy))); + } + fileStatuses.sort((a, b) -> Long.compare(b.size, a.size)); + for (TBrokerFileStatus fileStatus : fileStatuses) { + TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); + context.params.setFormatType(formatType); + TFileCompressType compressType = + Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path); + context.params.setCompressType(compressType); + List columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path, + context.fileGroup.getColumnNamesFromPath()); + Pair p = pq.poll(); + TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath); + p.value().getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + pq.add(Pair.of(p.key() + fileStatus.size, p.value())); + } + pq.stream().map(Pair::value).forEach(scanRangeLocations::add); + } + + public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateContext context, + FederationBackendPolicy backendPolicy, + List scanRangeLocations) throws UserException { + TScanRangeLocations curLocations = newLocations(context.params, brokerDesc, backendPolicy); long curInstanceBytes = 0; long curFileOffset = 0; @@ -234,27 +292,16 @@ public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context // Assign scan range locations only for broker load. // stream load has only one file, and no need to set multi scan ranges. if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) { - // Now only support split plain text - if (compressType == TFileCompressType.PLAIN - && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) - || formatType == TFileFormatType.FORMAT_JSON)) { - long rangeBytes = bytesPerInstance - curInstanceBytes; - TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes, - columnsFromPath); - curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); - curFileOffset += rangeBytes; - } else { - TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, leftBytes, - columnsFromPath); - curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); - i++; - } + long rangeBytes = bytesPerInstance - curInstanceBytes; + TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes, + columnsFromPath); + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + curFileOffset += rangeBytes; // New one scan scanRangeLocations.add(curLocations); curLocations = newLocations(context.params, brokerDesc, backendPolicy); curInstanceBytes = 0; - } else { TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath); curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); From 9f6a294fbb4fd126155c5079cbdc905c86e53b5b Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 5 Nov 2024 16:58:19 +0800 Subject: [PATCH 2/3] abstract and UT --- .../doris/datasource/FileGroupInfo.java | 53 ++++++++++----- .../doris/datasource/FileGroupIntoTest.java | 66 +++++++++++++++++++ 2 files changed, 101 insertions(+), 18 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java index 5bf1c95bc034fb..54034ee0f650df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java @@ -57,6 +57,8 @@ import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * FileTable encapsulates a set of files to be scanned into a Table like structure, @@ -87,7 +89,7 @@ public enum JobType { private boolean strictMode; private int loadParallelism; // set by getFileStatusAndCalcInstance - private long numInstances = 1; + private int numInstances = 1; private long bytesPerInstance = 0; // used for stream load, FILE_LOCAL or FILE_STREAM private TFileType fileType; @@ -249,25 +251,40 @@ public void createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateCon FederationBackendPolicy backendPolicy, List scanRangeLocations) throws UserException { - PriorityQueue> pq = new PriorityQueue<>(Comparator.comparingLong(Pair::key)); - for (int i = 0; i < Math.min(fileStatuses.size(), numInstances); i++) { - pq.add(Pair.of(0L, newLocations(context.params, brokerDesc, backendPolicy))); + List fileSizes = fileStatuses.stream().map(x -> x.size).collect(Collectors.toList()); + List> groups = assignFilesToInstances(fileSizes, numInstances); + for (List group : groups) { + TScanRangeLocations locations = newLocations(context.params, brokerDesc, backendPolicy); + for (int i : group) { + TBrokerFileStatus fileStatus = fileStatuses.get(i); + TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); + context.params.setFormatType(formatType); + TFileCompressType compressType = + Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path); + context.params.setCompressType(compressType); + List columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path, + context.fileGroup.getColumnNamesFromPath()); + TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath); + locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + } + scanRangeLocations.add(locations); } - fileStatuses.sort((a, b) -> Long.compare(b.size, a.size)); - for (TBrokerFileStatus fileStatus : fileStatuses) { - TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); - context.params.setFormatType(formatType); - TFileCompressType compressType = - Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path); - context.params.setCompressType(compressType); - List columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path, - context.fileGroup.getColumnNamesFromPath()); - Pair p = pq.poll(); - TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath); - p.value().getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); - pq.add(Pair.of(p.key() + fileStatus.size, p.value())); + } + + public static List> assignFilesToInstances(List fileSizes, int instances) { + int n = Math.min(fileSizes.size(), instances); + PriorityQueue>> pq = new PriorityQueue<>(n, Comparator.comparingLong(Pair::key)); + for (int i = 0; i < n; i++) { + pq.add(Pair.of(0L, new ArrayList<>())); + } + List index = IntStream.range(0, fileSizes.size()).boxed().collect(Collectors.toList()); + index.sort((i, j) -> Long.compare(fileSizes.get(j), fileSizes.get(i))); + for (int i : index) { + Pair> p = pq.poll(); + p.value().add(i); + pq.add(Pair.of(p.key() + fileSizes.get(i), p.value())); } - pq.stream().map(Pair::value).forEach(scanRangeLocations::add); + return pq.stream().map(Pair::value).collect(Collectors.toList()); } public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateContext context, diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java new file mode 100644 index 00000000000000..71faa08597c393 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + + +public class FileGroupIntoTest { + + private static Stream provideParameters() { + return Stream.of( + // 6, 1+5, 2+4, 3, max=6 + Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 4, 6), + + // 6+1, 5+2, 4+3, max=7 + Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 3, 7), + + // 6+3+1, 5+4+2, max=11 + Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 2, 11), + + // 1 group, sum = 21 + Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 1, 21), + + // current algorithm is not perfect, + // perfect partition: 5+4, 3+3+3, max=9 + // current partition: 5+3, 4+3+3, max=10 + Arguments.of(Arrays.asList(3L, 3L, 3L, 4L, 5L), 2, 10), + + // current algorithm is not perfect, + // perfect partition: 3+3+6, 4+4+4, max=12 + // current partition: 6+4+3, 4+4+3, max=13 + Arguments.of(Arrays.asList(3L, 3L, 4L, 4L, 4L, 6L), 2, 13) + ); + } + + @ParameterizedTest + @MethodSource("provideParameters") + public void testAssignFilesToInstances(List fileSizes, int numInstances, long expected) { + List> groups = FileGroupInfo.assignFilesToInstances(fileSizes, numInstances); + long max = groups.stream().map(group -> group.stream().mapToLong(fileSizes::get).sum()) + .max(Long::compare).orElse(0L); + Assertions.assertEquals(expected, max); + } +} From 52c0e21e023dc39fd45fd88725b5fee659b43e82 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Thu, 14 Nov 2024 11:48:51 +0800 Subject: [PATCH 3/3] update comments Co-authored-by: Xin Liao --- .../java/org/apache/doris/datasource/FileGroupIntoTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java index 71faa08597c393..b4470075717062 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java @@ -31,7 +31,7 @@ public class FileGroupIntoTest { private static Stream provideParameters() { return Stream.of( - // 6, 1+5, 2+4, 3, max=6 + // 6, 5, 4+1, 3+2, max=6 Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 4, 6), // 6+1, 5+2, 4+3, max=7