Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
Expand Down Expand Up @@ -52,7 +53,11 @@

import java.net.URI;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* FileTable encapsulates a set of files to be scanned into a Table-like structure,
Expand Down Expand Up @@ -83,6 +88,7 @@ public enum JobType {
private boolean strictMode;
private int loadParallelism;
// set by getFileStatusAndCalcInstance
private int numInstances = 1;
private long bytesPerInstance = 0;
// used for stream load, FILE_LOCAL or FILE_STREAM
private TFileType fileType;
Expand Down Expand Up @@ -172,7 +178,6 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
throw new UserException("No source file in this table(" + targetTable.getName() + ").");
}

int numInstances = 1;
if (jobType == JobType.BULK_LOAD) {
long totalBytes = 0;
for (TBrokerFileStatus fileStatus : fileStatuses) {
Expand All @@ -191,6 +196,7 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
}
} else {
// stream load, not need to split
numInstances = 1;
bytesPerInstance = Long.MAX_VALUE;
}
LOG.info("number instance of file scan node is: {}, bytes per instance: {}", numInstances, bytesPerInstance);
Expand All @@ -199,6 +205,75 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
                                     FederationBackendPolicy backendPolicy,
                                     List<TScanRangeLocations> scanRangeLocations) throws UserException {
    // Mixed file (or compress) types are not supported: if any single file
    // cannot be split, the whole load takes the unsplittable path.
    boolean allSplittable = true;
    for (TBrokerFileStatus fileStatus : fileStatuses) {
        TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path);
        TFileCompressType compressType =
                Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path);
        // Currently only uncompressed (PLAIN) CSV/JSON files can be split.
        boolean splittable = compressType == TFileCompressType.PLAIN
                && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
                        || formatType == TFileFormatType.FORMAT_JSON);
        if (!splittable) {
            allSplittable = false;
            break;
        }
    }

    if (allSplittable) {
        createScanRangeLocationsSplittable(context, backendPolicy, scanRangeLocations);
    } else {
        createScanRangeLocationsUnsplittable(context, backendPolicy, scanRangeLocations);
    }
}

public void createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateContext context,
                                                 FederationBackendPolicy backendPolicy,
                                                 List<TScanRangeLocations> scanRangeLocations)
        throws UserException {
    // Build one scan-range-locations entry per instance; each instance gets a
    // whole-file range for every file assigned to it by the balancing step.
    List<Long> sizes = fileStatuses.stream().map(s -> s.size).collect(Collectors.toList());
    List<List<Integer>> assignment = assignFilesToInstances(sizes, numInstances);
    for (List<Integer> fileIndexes : assignment) {
        TScanRangeLocations locations = newLocations(context.params, brokerDesc, backendPolicy);
        for (int fileIdx : fileIndexes) {
            TBrokerFileStatus fileStatus = fileStatuses.get(fileIdx);
            context.params.setFormatType(formatType(context.fileGroup.getFileFormat(), fileStatus.path));
            context.params.setCompressType(
                    Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path));
            List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                    context.fileGroup.getColumnNamesFromPath());
            // Offset 0 with length == file size: the entire file in one range.
            TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath);
            locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
        }
        scanRangeLocations.add(locations);
    }
}

/**
 * Greedily partitions file indices into at most {@code instances} groups so
 * that the total bytes per group are balanced. This is the classic
 * longest-processing-time (LPT) heuristic: files are visited in descending
 * size order and each one is appended to the group whose current byte total
 * is smallest (tracked with a min-heap).
 *
 * @param fileSizes size in bytes of each file; index i identifies file i
 * @param instances maximum number of groups to produce
 * @return a partition of the indices [0, fileSizes.size()) into at most
 *         min(fileSizes.size(), instances) non-empty groups; an empty list
 *         when fileSizes is empty or instances is non-positive
 */
public static List<List<Integer>> assignFilesToInstances(List<Long> fileSizes, int instances) {
    int n = Math.min(fileSizes.size(), instances);
    // Guard: PriorityQueue throws IllegalArgumentException for an initial
    // capacity < 1, so the previous code crashed on empty input or
    // instances <= 0. Return an empty partition instead.
    if (n <= 0) {
        return new ArrayList<>();
    }
    List<List<Integer>> groups = new ArrayList<>(n);
    long[] groupBytes = new long[n];
    // Min-heap of group ids keyed by the group's current byte total.
    PriorityQueue<Integer> lightestGroup =
            new PriorityQueue<>(n, Comparator.comparingLong(g -> groupBytes[g]));
    for (int g = 0; g < n; g++) {
        groups.add(new ArrayList<>());
        lightestGroup.add(g);
    }
    // Visit files from largest to smallest (LPT rule).
    List<Integer> order = IntStream.range(0, fileSizes.size()).boxed().collect(Collectors.toList());
    order.sort((a, b) -> Long.compare(fileSizes.get(b), fileSizes.get(a)));
    for (int fileIdx : order) {
        // Poll the lightest group, grow it, then re-insert so the heap
        // re-evaluates its key (a key must not change while queued).
        int g = lightestGroup.poll();
        groups.get(g).add(fileIdx);
        groupBytes[g] += fileSizes.get(fileIdx);
        lightestGroup.add(g);
    }
    return groups;
}

public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateContext context,
FederationBackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException {

TScanRangeLocations curLocations = newLocations(context.params, brokerDesc, backendPolicy);
long curInstanceBytes = 0;
long curFileOffset = 0;
Expand All @@ -217,27 +292,16 @@ public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context
// Assign scan range locations only for broker load.
// stream load has only one file, and no need to set multi scan ranges.
if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
// Now only support split plain text
if (compressType == TFileCompressType.PLAIN
&& ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
|| formatType == TFileFormatType.FORMAT_JSON)) {
long rangeBytes = bytesPerInstance - curInstanceBytes;
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
curFileOffset += rangeBytes;
} else {
TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, leftBytes,
columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
i++;
}
long rangeBytes = bytesPerInstance - curInstanceBytes;
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
curFileOffset += rangeBytes;

// New one scan
scanRangeLocations.add(curLocations);
curLocations = newLocations(context.params, brokerDesc, backendPolicy);
curInstanceBytes = 0;

} else {
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;


// NOTE(review): class name looks like a typo for FileGroupInfoTest — confirm
// before renaming, since the file name must match the public class name.
public class FileGroupIntoTest {

    // Each case: file sizes, number of instances, expected maximum byte total
    // over the resulting groups.
    private static Stream<Arguments> provideParameters() {
        return Stream.of(
                // groups: 6 | 5 | 4+1 | 3+2 -> max = 6
                Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 4, 6),

                // groups: 6+1 | 5+2 | 4+3 -> max = 7
                Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 3, 7),

                // groups: 6+3+1 | 5+4+2 -> max = 11
                Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 2, 11),

                // single group holding everything, sum = 21
                Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 1, 21),

                // the greedy heuristic is not optimal here:
                // optimal partition: 5+4 | 3+3+3, max = 9
                // greedy partition:  5+3 | 4+3+3, max = 10
                Arguments.of(Arrays.asList(3L, 3L, 3L, 4L, 5L), 2, 10),

                // the greedy heuristic is not optimal here:
                // optimal partition: 3+3+6 | 4+4+4, max = 12
                // greedy partition:  6+4+3 | 4+4+3, max = 13
                Arguments.of(Arrays.asList(3L, 3L, 4L, 4L, 4L, 6L), 2, 13)
        );
    }

    @ParameterizedTest
    @MethodSource("provideParameters")
    public void testAssignFilesToInstances(List<Long> fileSizes, int numInstances, long expected) {
        List<List<Integer>> groups = FileGroupInfo.assignFilesToInstances(fileSizes, numInstances);
        // The heaviest group determines the slowest instance; compare it to
        // the expected maximum for this case.
        long actualMax = 0L;
        for (List<Integer> group : groups) {
            long bytes = 0L;
            for (int idx : group) {
                bytes += fileSizes.get(idx);
            }
            actualMax = Math.max(actualMax, bytes);
        }
        Assertions.assertEquals(expected, actualMax);
    }
}