diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp
index 98cc91824f64c1..d73cfc405fd008 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -73,11 +73,13 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
             auto split_source = scan_range.split_source;
             RuntimeProfile::Counter* get_split_timer = ADD_TIMER(_runtime_profile, "GetSplitTime");
             _split_source = std::make_shared<vectorized::RemoteSplitSourceConnector>(
-                    state, get_split_timer, split_source.split_source_id, split_source.num_splits);
+                    state, get_split_timer, split_source.split_source_id, split_source.num_splits,
+                    _max_scanners);
         }
     }
     if (_split_source == nullptr) {
-        _split_source = std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges);
+        _split_source =
+                std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges, _max_scanners);
     }
     _max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges());
     if (scan_ranges.size() > 0 &&
diff --git a/be/src/vec/exec/scan/split_source_connector.cpp b/be/src/vec/exec/scan/split_source_connector.cpp
index 6533ae2bfe0d9c..478af522e76f52 100644
--- a/be/src/vec/exec/scan/split_source_connector.cpp
+++ b/be/src/vec/exec/scan/split_source_connector.cpp
@@ -60,7 +60,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* range) {
             return Status::IOError("Failed to get batch of split source: {}", e.what());
         }
         _last_batch = result.splits.empty();
-        _scan_ranges = result.splits;
+        _merge_ranges(_scan_ranges, result.splits);
         _scan_index = 0;
         _range_index = 0;
     }
diff --git a/be/src/vec/exec/scan/split_source_connector.h b/be/src/vec/exec/scan/split_source_connector.h
index f62b45612bf7c2..8f38cd4f17a18f 100644
--- a/be/src/vec/exec/scan/split_source_connector.h
+++ b/be/src/vec/exec/scan/split_source_connector.h
@@ -43,6 +43,49 @@ class SplitSourceConnector {
     virtual int num_scan_ranges() = 0;
 
     virtual TFileScanRangeParams* get_params() = 0;
+
+protected:
+    template <typename T>
+    void _merge_ranges(std::vector<T>& merged_ranges, const std::vector<T>& scan_ranges) {
+        if (scan_ranges.size() <= _max_scanners) {
+            merged_ranges = scan_ranges;
+            return;
+        }
+        // There is no need for the number of scanners to exceed the number of threads in the
+        // thread pool. scan_ranges is sorted by path (and therefore by partition path) in the FE,
+        // so merge adjacent scan ranges in order. For insert statements, reading data in
+        // partition order reduces BE memory usage and avoids generating many small files.
+        merged_ranges.resize(_max_scanners);
+        int num_ranges = scan_ranges.size() / _max_scanners;
+        int num_add_one = scan_ranges.size() - num_ranges * _max_scanners;
+        int scan_index = 0;
+        int range_index = 0;
+        for (int i = 0; i < num_add_one; ++i) {
+            merged_ranges[scan_index] = scan_ranges[range_index++];
+            auto& ranges =
+                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+            for (int j = 0; j < num_ranges; j++) {
+                auto& src_ranges =
+                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+                ranges.insert(ranges.end(), src_ranges.begin(), src_ranges.end());
+            }
+        }
+        for (int i = num_add_one; i < _max_scanners; ++i) {
+            merged_ranges[scan_index] = scan_ranges[range_index++];
+            auto& ranges =
+                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+            for (int j = 0; j < num_ranges - 1; j++) {
+                auto& src_ranges =
+                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+                ranges.insert(ranges.end(), src_ranges.begin(), src_ranges.end());
+            }
+        }
+        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << merged_ranges.size();
+    }
+
+protected:
+    int _max_scanners;
 };
 
 /**
@@ -59,8 +102,10 @@ class LocalSplitSourceConnector : public SplitSourceConnector {
     int _range_index = 0;
 
 public:
-    LocalSplitSourceConnector(const std::vector<TScanRangeParams>& scan_ranges)
-            : _scan_ranges(scan_ranges) {}
+    LocalSplitSourceConnector(const std::vector<TScanRangeParams>& scan_ranges, int max_scanners) {
+        _max_scanners = max_scanners;
+        _merge_ranges(_scan_ranges, scan_ranges);
+    }
 
     Status get_next(bool* has_next, TFileRangeDesc* range) override;
@@ -98,11 +143,13 @@ class RemoteSplitSourceConnector : public SplitSourceConnector {
 
 public:
     RemoteSplitSourceConnector(RuntimeState* state, RuntimeProfile::Counter* get_split_timer,
-                               int64 split_source_id, int num_splits)
+                               int64 split_source_id, int num_splits, int max_scanners)
             : _state(state),
               _get_split_timer(get_split_timer),
              _split_source_id(split_source_id),
-              _num_splits(num_splits) {}
+              _num_splits(num_splits) {
+        _max_scanners = max_scanners;
+    }
 
     Status get_next(bool* has_next, TFileRangeDesc* range) override;
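A note for reviewers on the arithmetic in _merge_ranges: with S source ranges and K = _max_scanners, the first S % K merged ranges each absorb floor(S / K) + 1 source ranges and the rest absorb floor(S / K), so every source range is consumed exactly once and the FE's path order is preserved. A minimal, self-contained sketch of that distribution (illustrative names, not code from this patch):

    import java.util.ArrayList;
    import java.util.List;

    public class MergeRangesSketch {
        // Distribute `ranges` over at most `maxScanners` buckets, preserving order.
        // Mirrors the index arithmetic in SplitSourceConnector::_merge_ranges.
        static <T> List<List<T>> merge(List<T> ranges, int maxScanners) {
            if (ranges.size() <= maxScanners) {
                List<List<T>> out = new ArrayList<>();
                for (T r : ranges) {
                    out.add(List.of(r)); // one scanner per range, as in the fast path
                }
                return out;
            }
            int numRanges = ranges.size() / maxScanners;             // floor(S / K)
            int numAddOne = ranges.size() - numRanges * maxScanners; // S % K
            List<List<T>> out = new ArrayList<>(maxScanners);
            int idx = 0;
            for (int i = 0; i < maxScanners; ++i) {
                int take = numRanges + (i < numAddOne ? 1 : 0);
                out.add(new ArrayList<>(ranges.subList(idx, idx + take)));
                idx += take;
            }
            return out;
        }

        public static void main(String[] args) {
            // 7 ranges over 3 scanners -> bucket sizes 3, 2, 2, order preserved.
            System.out.println(merge(List.of(1, 2, 3, 4, 5, 6, 7), 3));
        }
    }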
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java
index 552b701d029385..e832e193ab3dd9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java
@@ -80,7 +80,7 @@ public void analyze(Analyzer analyzer) throws UserException {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
         }
         if (quotaType == QuotaType.DATA) {
-            quota = ParseUtil.analyzeDataVolumn(quotaValue);
+            quota = ParseUtil.analyzeDataVolume(quotaValue);
         } else if (quotaType == QuotaType.REPLICA) {
             quota = ParseUtil.analyzeReplicaNumber(quotaValue);
         } else if (quotaType == QuotaType.TRANSACTION) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index ed775826f8a271..8f53a7a5a1aba2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -127,7 +127,7 @@ private static Map<String, String> maybeRewriteByAutoBucket(DistributionDesc distributionDesc,
             distributionDesc.setBuckets(FeConstants.default_bucket_num);
         } else {
             long partitionSize = ParseUtil
-                    .analyzeDataVolumn(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
+                    .analyzeDataVolume(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
             distributionDesc.setBuckets(AutoBucketUtils.getBucketsNum(partitionSize, Config.autobucket_min_buckets));
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 8dd91c3fd8a395..6debdca789f125 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -573,7 +573,7 @@ private void analyzeProperties() throws UserException {
         }
 
         if (properties.containsKey(PROP_MAX_FILE_SIZE)) {
-            maxFileSizeBytes = ParseUtil.analyzeDataVolumn(properties.get(PROP_MAX_FILE_SIZE));
+            maxFileSizeBytes = ParseUtil.analyzeDataVolume(properties.get(PROP_MAX_FILE_SIZE));
             if (maxFileSizeBytes > MAX_FILE_SIZE_BYTES || maxFileSizeBytes < MIN_FILE_SIZE_BYTES) {
                 throw new AnalysisException("max file size should between 5MB and 2GB. Given: " + maxFileSizeBytes);
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
index 6142feec895b53..e80860bf584392 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
@@ -179,12 +179,22 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
             this.result = (LiteralExpr) this.value;
         }
 
-        if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)) {
-            this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue())));
+        if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)
+                || getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) {
+            this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolume(getResult().getStringValue())));
             this.result = (LiteralExpr) this.value;
         }
-        if (getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) {
-            this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue())));
+        if (getVariable().equalsIgnoreCase(SessionVariable.FILE_SPLIT_SIZE)) {
+            try {
+                this.value = new StringLiteral(
+                        Long.toString(ParseUtil.analyzeDataVolume(getResult().getStringValue())));
+            } catch (Throwable t) {
+                // file_split_size should be handled the same way as exec_mem_limit and
+                // scan_queue_mem_limit, but ParseUtil.analyzeDataVolume() does not accept 0 as a
+                // valid value. For compatibility, keep the original value for file_split_size
+                // when it is 0 or otherwise invalid.
+                this.value = new StringLiteral(getResult().getStringValue());
+            }
             this.result = (LiteralExpr) this.value;
         }
         if (getVariable().equalsIgnoreCase("is_report_success")) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java
index c47753d2d42b99..649f03ffcc7fbe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java
@@ -41,7 +41,7 @@ public class ParseUtil {
     private static Pattern dataVolumnPattern = Pattern.compile("(\\d+)(\\D*)");
 
-    public static long analyzeDataVolumn(String dataVolumnStr) throws AnalysisException {
+    public static long analyzeDataVolume(String dataVolumnStr) throws AnalysisException {
         long dataVolumn = 0;
         Matcher m = dataVolumnPattern.matcher(dataVolumnStr);
         if (m.matches()) {
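Since several call sites above depend on the renamed analyzeDataVolume, a hedged sketch of its contract may help: the regex (\d+)(\D*) splits the number from the unit suffix, the result must be strictly positive (the SetVar.java comment above notes that 0 is rejected), and unknown suffixes fail. The unit table below is an assumption for illustration, not a copy of ParseUtil:

    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class DataVolumeSketch {
        private static final Pattern VOLUME_PATTERN = Pattern.compile("(\\d+)(\\D*)");
        // Assumed unit table; the real set of units lives in ParseUtil and is not shown here.
        private static final Map<String, Long> UNITS = Map.of(
                "B", 1L,
                "K", 1024L, "KB", 1024L,
                "M", 1024L * 1024, "MB", 1024L * 1024,
                "G", 1024L * 1024 * 1024, "GB", 1024L * 1024 * 1024);

        static long analyzeDataVolume(String s) {
            Matcher m = VOLUME_PATTERN.matcher(s);
            if (!m.matches()) {
                throw new IllegalArgumentException("invalid data volume: " + s);
            }
            long value = Long.parseLong(m.group(1));
            String unit = m.group(2).trim().toUpperCase();
            Long multiplier = unit.isEmpty() ? Long.valueOf(1L) : UNITS.get(unit);
            // Per the SetVar.java comment above, 0 is not a valid data volume.
            if (multiplier == null || value <= 0) {
                throw new IllegalArgumentException("invalid data volume: " + s);
            }
            return value * multiplier;
        }

        // Mirrors the SetVar fallback: normalize to bytes, keep the raw string if parsing fails.
        static String normalizeFileSplitSize(String raw) {
            try {
                return Long.toString(analyzeDataVolume(raw));
            } catch (Exception e) {
                return raw; // e.g. "0" means "derive the split size from the block size"
            }
        }

        public static void main(String[] args) {
            System.out.println(analyzeDataVolume("128MB"));  // 134217728
            System.out.println(normalizeFileSplitSize("0")); // 0 (kept verbatim)
        }
    }

This also shows why the file_split_size branch needs its try/catch: SET file_split_size = "0" must survive the parsing failure and keep the raw value.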
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 48c4a9ff3f476c..c8c4323d34d33c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -26,7 +26,6 @@
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
-import org.apache.doris.datasource.FileSplit.FileSplitCreator;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
@@ -64,12 +63,13 @@ public abstract class FileScanNode extends ExternalScanNode {
 
     private static final Logger LOG = LogManager.getLogger(FileScanNode.class);
 
-    public static final long DEFAULT_SPLIT_SIZE = 8 * 1024 * 1024; // 8MB
+    public static final long DEFAULT_SPLIT_SIZE = 64 * 1024 * 1024; // 64MB
 
     // For explain
     protected long totalFileSize = 0;
     protected long totalPartitionNum = 0;
     protected long fileSplitSize;
+    protected boolean isSplitSizeSetBySession = false;
 
     public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) {
@@ -79,7 +79,15 @@ public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) {
 
     @Override
     public void init() throws UserException {
+        initFileSplitSize();
+    }
+
+    private void initFileSplitSize() {
         this.fileSplitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
+        this.isSplitSizeSetBySession = this.fileSplitSize > 0;
+        if (this.fileSplitSize <= 0) {
+            this.fileSplitSize = DEFAULT_SPLIT_SIZE;
+        }
     }
 
     @Override
@@ -249,12 +257,6 @@ protected void setDefaultValueExprs(TableIf tbl,
         }
     }
 
-    protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
-            long modificationTime, boolean splittable, List<String> partitionValues) throws IOException {
-        return splitFile(path, blockSize, blockLocations, length, modificationTime, splittable, partitionValues,
-                FileSplitCreator.DEFAULT);
-    }
-
     protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
             long modificationTime, boolean splittable, List<String> partitionValues,
             SplitCreator splitCreator) throws IOException {
@@ -271,11 +273,11 @@ protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
             result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues));
             return result;
         }
-        if (fileSplitSize <= 0) {
-            fileSplitSize = blockSize;
+        // If the file split size is set by the session variable, use it directly.
+        // Otherwise, use max(default split size, block size).
+        if (!isSplitSizeSetBySession) {
+            fileSplitSize = Math.max(fileSplitSize, blockSize);
         }
-        // Min split size is DEFAULT_SPLIT_SIZE(128MB).
-        fileSplitSize = Math.max(fileSplitSize, DEFAULT_SPLIT_SIZE);
         long bytesRemaining;
         for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D;
                 bytesRemaining -= fileSplitSize) {
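Two rules interact in splitFile after this change: a session-provided split size wins outright, otherwise the block size can only grow the 64MB default; and the > 1.1D loop guard lets the final split absorb a tail of up to 10% of fileSplitSize instead of emitting a tiny extra split. A self-contained sketch of both (helper names are illustrative, not the patch's API):

    import java.util.ArrayList;
    import java.util.List;

    public class SplitSizeSketch {
        // Mirrors FileScanNode's sizing rule: a session-provided split size wins;
        // otherwise take max(default split size, block size).
        static long effectiveSplitSize(long sessionSplitSize, long blockSize, long defaultSize) {
            if (sessionSplitSize > 0) {
                return sessionSplitSize;
            }
            return Math.max(defaultSize, blockSize);
        }

        // Same loop shape as splitFile: the 1.1 factor lets the last split absorb
        // a small tail rather than producing one tiny extra split.
        static List<long[]> splitOffsets(long length, long splitSize) {
            List<long[]> splits = new ArrayList<>(); // each entry is {start, len}
            long start = 0;
            long bytesRemaining = length;
            while ((double) bytesRemaining / (double) splitSize > 1.1D) {
                splits.add(new long[] {start, splitSize});
                start += splitSize;
                bytesRemaining -= splitSize;
            }
            if (bytesRemaining > 0) {
                splits.add(new long[] {start, bytesRemaining});
            }
            return splits;
        }

        public static void main(String[] args) {
            long splitSize = effectiveSplitSize(0, 128 << 20, 64 << 20); // -> 128MB
            // A 260MB file with 128MB splits -> 128MB + 132MB (tail absorbed).
            for (long[] s : splitOffsets(260L << 20, splitSize)) {
                System.out.println(s[0] + " + " + s[1]);
            }
        }
    }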
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 7f23385d847886..da88a03f2ebabe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -35,7 +35,6 @@
 import org.apache.doris.common.util.CacheBulkLoader;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.CacheException;
-import org.apache.doris.datasource.FileSplit;
 import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.fs.FileSystemCache;
@@ -512,8 +511,7 @@ public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms",
-                    fileLists.stream().mapToInt(l -> l.getFiles() == null
-                            ? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(),
+                    fileLists.stream().mapToInt(l -> l.getFiles() == null ? 0 : l.getFiles().size()).sum(),
                     partitions.size(), catalog.getName(), (System.currentTimeMillis() - start));
         }
         return fileLists;
@@ -992,9 +990,6 @@ public String toString() {
     public static class FileCacheValue {
         // File Cache for self splitter.
         private final List<HiveFileStatus> files = Lists.newArrayList();
-        // File split cache for old splitter. This is a temp variable.
-        @Deprecated
-        private final List<FileSplit> splits = Lists.newArrayList();
         private boolean isSplittable;
         // The values of partitions.
         // e.g for file : hdfs://path/to/table/part1=a/part2=b/datafile
@@ -1015,13 +1010,6 @@ public void addFile(RemoteFile file) {
         }
     }
 
-    @Deprecated
-    public void addSplit(FileSplit split) {
-        if (isFileVisible(split.getPath())) {
-            splits.add(split);
-        }
-    }
-
     public int getValuesSize() {
         return partitionValues == null ? 0 : partitionValues.size();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 1bdb805f0fd96d..6ef551825e2afc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -320,10 +320,6 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ ... @@ public List<Split> doGetSplits() throws UserException {
         // get splits
         List<Split> splits = new ArrayList<>();
         int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
-        // Min split size is DEFAULT_SPLIT_SIZE(128MB).
-        long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE);
         HashSet<String> partitionPathSet = new HashSet<>();
         boolean isPartitionedTable = icebergTable.spec().isPartitioned();
-        CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
+        CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), fileSplitSize);
         try (CloseableIterable<CombinedScanTask> combinedScanTasks =
-                TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
+                TableScanUtil.planTasks(fileScanTasks, fileSplitSize, 1, 0)) {
             combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
                 String dataFilePath = normalizeLocation(splitTask.file().path().toString());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index 96e96d3cf19a26..26b90c26a46f0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -27,6 +27,7 @@
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.FileSplit;
+import org.apache.doris.datasource.FileSplit.FileSplitCreator;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -139,7 +140,8 @@ public List<Split> getSplits() throws UserException {
             Path path = new Path(fileStatus.getPath());
             try {
                 splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, fileStatus.getSize(),
-                        fileStatus.getModificationTime(), fileStatus.isSplitable, null));
+                        fileStatus.getModificationTime(), fileStatus.isSplitable, null,
+                        FileSplitCreator.DEFAULT));
             } catch (IOException e) {
                 LOG.warn("get file split failed for TVF: {}", path, e);
                 throw new UserException(e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index 38b576af17511d..29e60583751899 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -689,7 +689,7 @@ public static Map<String, String> maybeRewriteByAutoBucket(
         if (!newProperties.containsKey(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)) {
             distributionDesc.updateBucketNum(FeConstants.default_bucket_num);
         } else {
-            long partitionSize = ParseUtil.analyzeDataVolumn(
+            long partitionSize = ParseUtil.analyzeDataVolume(
                     newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
             distributionDesc.updateBucketNum(AutoBucketUtils.getBucketsNum(partitionSize,
                     Config.autobucket_min_buckets));
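For context on the maybeRewriteByAutoBucket call sites: the estimated partition size parsed by analyzeDataVolume is mapped to a bucket count by AutoBucketUtils.getBucketsNum, whose policy is not shown in this patch. A deliberately toy model of that mapping, with invented numbers, just to illustrate the shape of the computation:

    public class AutoBucketSketch {
        // Toy estimator: roughly one bucket per 5GB of estimated partition data,
        // clamped to a minimum. The 5GB target is invented for illustration; the
        // real policy lives in AutoBucketUtils.getBucketsNum.
        static int estimateBuckets(long partitionSizeBytes, int minBuckets) {
            long perBucket = 5L << 30; // assumed ~5GB per bucket
            int buckets = (int) ((partitionSizeBytes + perBucket - 1) / perBucket);
            return Math.max(buckets, minBuckets);
        }

        public static void main(String[] args) {
            // e.g. estimate_partition_size = "100G" -> 20 buckets under this toy model
            System.out.println(estimateBuckets(100L << 30, 1));
        }
    }

Only the call signature, a size in bytes plus Config.autobucket_min_buckets, is taken from the patch; the per-bucket target and clamping above are placeholders.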