From fc6ac7f3b219b332a2b8377d17fa54a0d7a130db Mon Sep 17 00:00:00 2001
From: Kaijie Chen
Date: Thu, 14 Nov 2024 19:47:42 +0800
Subject: [PATCH] [performance](load) fix broker load scan ranges for
 unsplittable files (#43161)

---
 .../doris/datasource/FileGroupInfo.java       | 98 +++++++++++++++----
 .../doris/datasource/FileGroupInfoTest.java   | 66 +++++++++++++
 2 files changed, 147 insertions(+), 17 deletions(-)
 create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupInfoTest.java

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
index 4cea2e7e883fcb..b84a546e4f5b8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
@@ -24,6 +24,7 @@
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.Util;
@@ -52,7 +53,11 @@

 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;

 /**
  * FileTable encapsulates a set of files to be scanned into a Table like structure,
@@ -83,6 +88,7 @@ public enum JobType {
     private boolean strictMode;
     private int loadParallelism;
     // set by getFileStatusAndCalcInstance
+    private int numInstances = 1;
     private long bytesPerInstance = 0;
     // used for stream load, FILE_LOCAL or FILE_STREAM
     private TFileType fileType;
@@ -172,7 +178,6 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
             throw new UserException("No source file in this table(" + targetTable.getName() + ").");
         }

-        int numInstances = 1;
         if (jobType == JobType.BULK_LOAD) {
             long totalBytes = 0;
             for (TBrokerFileStatus fileStatus : fileStatuses) {
@@ -191,6 +196,7 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
             }
         } else {
             // stream load, not need to split
+            numInstances = 1;
             bytesPerInstance = Long.MAX_VALUE;
         }
         LOG.info("number instance of file scan node is: {}, bytes per instance: {}", numInstances, bytesPerInstance);
@@ -199,6 +205,75 @@
     public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
             FederationBackendPolicy backendPolicy,
             List<TScanRangeLocations> scanRangeLocations) throws UserException {
+        // Currently, we do not support mixed file types (or compress types).
+        // If any of the files is unsplittable, all files will be treated as unsplittable.
+        boolean isSplittable = true;
+        for (TBrokerFileStatus fileStatus : fileStatuses) {
+            TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path);
+            TFileCompressType compressType =
+                    Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path);
+            // Now only support split plain text
+            if (compressType == TFileCompressType.PLAIN
+                    && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
+                    || formatType == TFileFormatType.FORMAT_JSON)) {
+                // is splittable
+            } else {
+                isSplittable = false;
+                break;
+            }
+        }
+
+        if (isSplittable) {
+            createScanRangeLocationsSplittable(context, backendPolicy, scanRangeLocations);
+        } else {
+            createScanRangeLocationsUnsplittable(context, backendPolicy, scanRangeLocations);
+        }
+    }
+
+    public void createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateContext context,
+            FederationBackendPolicy backendPolicy,
+            List<TScanRangeLocations> scanRangeLocations)
+            throws UserException {
+        List<Long> fileSizes = fileStatuses.stream().map(x -> x.size).collect(Collectors.toList());
+        List<List<Integer>> groups = assignFilesToInstances(fileSizes, numInstances);
+        for (List<Integer> group : groups) {
+            TScanRangeLocations locations = newLocations(context.params, brokerDesc, backendPolicy);
+            for (int i : group) {
+                TBrokerFileStatus fileStatus = fileStatuses.get(i);
+                TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path);
+                context.params.setFormatType(formatType);
+                TFileCompressType compressType =
+                        Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path);
+                context.params.setCompressType(compressType);
+                List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
+                        context.fileGroup.getColumnNamesFromPath());
+                TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath);
+                locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+            }
+            scanRangeLocations.add(locations);
+        }
+    }
+
+    public static List<List<Integer>> assignFilesToInstances(List<Long> fileSizes, int instances) {
+        int n = Math.min(fileSizes.size(), instances);
+        PriorityQueue<Pair<Long, List<Integer>>> pq = new PriorityQueue<>(n, Comparator.comparingLong(Pair::key));
+        for (int i = 0; i < n; i++) {
+            pq.add(Pair.of(0L, new ArrayList<>()));
+        }
+        List<Integer> index = IntStream.range(0, fileSizes.size()).boxed().collect(Collectors.toList());
+        index.sort((i, j) -> Long.compare(fileSizes.get(j), fileSizes.get(i)));
+        for (int i : index) {
+            Pair<Long, List<Integer>> p = pq.poll();
+            p.value().add(i);
+            pq.add(Pair.of(p.key() + fileSizes.get(i), p.value()));
+        }
+        return pq.stream().map(Pair::value).collect(Collectors.toList());
+    }
+
+    public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateContext context,
+            FederationBackendPolicy backendPolicy,
+            List<TScanRangeLocations> scanRangeLocations) throws UserException {
+
         TScanRangeLocations curLocations = newLocations(context.params, brokerDesc, backendPolicy);
         long curInstanceBytes = 0;
         long curFileOffset = 0;
@@ -217,27 +292,16 @@ public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context
             // Assign scan range locations only for broker load.
             // stream load has only one file, and no need to set multi scan ranges.
             if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
-                // Now only support split plain text
-                if (compressType == TFileCompressType.PLAIN
-                        && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
-                        || formatType == TFileFormatType.FORMAT_JSON)) {
-                    long rangeBytes = bytesPerInstance - curInstanceBytes;
-                    TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
-                            columnsFromPath);
-                    curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
-                    curFileOffset += rangeBytes;
-                } else {
-                    TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, leftBytes,
-                            columnsFromPath);
-                    curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
-                    i++;
-                }
+                long rangeBytes = bytesPerInstance - curInstanceBytes;
+                TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
+                        columnsFromPath);
+                curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+                curFileOffset += rangeBytes;
                 // New one scan
                 scanRangeLocations.add(curLocations);
                 curLocations = newLocations(context.params, brokerDesc, backendPolicy);
                 curInstanceBytes = 0;
-
             } else {
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes,
                         columnsFromPath);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupInfoTest.java
new file mode 100644
index 00000000000000..b4470075717062
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupInfoTest.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+
+public class FileGroupInfoTest {
+
+    private static Stream<Arguments> provideParameters() {
+        return Stream.of(
+                // 6, 5, 4+1, 3+2, max=6
+                Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 4, 6),
+
+                // 6+1, 5+2, 4+3, max=7
+                Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 3, 7),
+
+                // 6+3+1, 5+4+2, max=11
+                Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 2, 11),
+
+                // 1 group, sum = 21
+                Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 1, 21),
+
+                // current algorithm is not perfect,
+                // perfect partition: 5+4, 3+3+3, max=9
+                // current partition: 5+3, 4+3+3, max=10
+                Arguments.of(Arrays.asList(3L, 3L, 3L, 4L, 5L), 2, 10),
+
+                // current algorithm is not perfect,
+                // perfect partition: 3+3+6, 4+4+4, max=12
+                // current partition: 6+4+3, 4+4+3, max=13
+                Arguments.of(Arrays.asList(3L, 3L, 4L, 4L, 4L, 6L), 2, 13)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideParameters")
+    public void testAssignFilesToInstances(List<Long> fileSizes, int numInstances, long expected) {
+        List<List<Integer>> groups = FileGroupInfo.assignFilesToInstances(fileSizes, numInstances);
+        long max = groups.stream().map(group -> group.stream().mapToLong(fileSizes::get).sum())
+                .max(Long::compare).orElse(0L);
+        Assertions.assertEquals(expected, max);
+    }
+}
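
Illustration (not part of the patch): the new assignFilesToInstances helper is a greedy largest-first (LPT-style) partition. It sorts file indexes by size descending and always hands the next file to the instance with the smallest byte total so far, tracked with a min-heap. The standalone sketch below restates that idea using only JDK types instead of Doris' Pair; the names GreedyAssignSketch, Bucket, and assign are illustrative only.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class GreedyAssignSketch {

    // One load instance: the file indexes assigned to it and their total size in bytes.
    static final class Bucket {
        long totalBytes = 0;
        final List<Integer> fileIndexes = new ArrayList<>();
    }

    // Same idea as FileGroupInfo.assignFilesToInstances: visit files from largest to
    // smallest and always give the next file to the currently lightest bucket.
    static List<List<Integer>> assign(List<Long> fileSizes, int instances) {
        int n = Math.min(fileSizes.size(), instances);
        PriorityQueue<Bucket> pq =
                new PriorityQueue<>(Math.max(n, 1), Comparator.comparingLong((Bucket b) -> b.totalBytes));
        for (int i = 0; i < n; i++) {
            pq.add(new Bucket());
        }
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < fileSizes.size(); i++) {
            order.add(i);
        }
        order.sort((i, j) -> Long.compare(fileSizes.get(j), fileSizes.get(i)));
        for (int i : order) {
            Bucket lightest = pq.poll();
            lightest.fileIndexes.add(i);
            lightest.totalBytes += fileSizes.get(i);
            pq.add(lightest);
        }
        List<List<Integer>> groups = new ArrayList<>();
        for (Bucket b : pq) {
            groups.add(b.fileIndexes);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Mirrors the first test case above: sizes 1..6 over 4 instances group as
        // {6}, {5}, {4,1}, {3,2} (printed here as file indexes), so the busiest
        // instance reads 6 bytes.
        System.out.println(assign(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 4));
    }
}

This greedy heuristic is not always optimal, as the last two test cases above show, but it is cheap (a sort plus heap updates) and its largest group is provably within a small constant factor of the optimal partition, which is good enough for spreading unsplittable files across load instances.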