From 9754dfdc4aa633269f46364727784574edb20f37 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 24 Jul 2024 18:38:24 +0800 Subject: [PATCH 1/7] 1 --- .../vec/exec/scan/split_source_connector.cpp | 2 +- be/src/vec/exec/scan/split_source_connector.h | 54 +++++++++++++++++-- 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/be/src/vec/exec/scan/split_source_connector.cpp b/be/src/vec/exec/scan/split_source_connector.cpp index 6533ae2bfe0d9c..ae42660be9341b 100644 --- a/be/src/vec/exec/scan/split_source_connector.cpp +++ b/be/src/vec/exec/scan/split_source_connector.cpp @@ -60,7 +60,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang return Status::IOError("Failed to get batch of split source: {}", e.what()); } _last_batch = result.splits.empty(); - _scan_ranges = result.splits; + _merge_ranges(result.splits); _scan_index = 0; _range_index = 0; } diff --git a/be/src/vec/exec/scan/split_source_connector.h b/be/src/vec/exec/scan/split_source_connector.h index f62b45612bf7c2..f757d03439e05a 100644 --- a/be/src/vec/exec/scan/split_source_connector.h +++ b/be/src/vec/exec/scan/split_source_connector.h @@ -43,6 +43,48 @@ class SplitSourceConnector { virtual int num_scan_ranges() = 0; virtual TFileScanRangeParams* get_params() = 0; + +protected: + void _merge_ranges(const std::vector& scan_ranges) { + if (scan_ranges.size() <= _max_scanners) { + _scan_ranges = scan_ranges; + return; + } + + // There is no need for the number of scanners to exceed the number of threads in thread pool. + // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order. + // In the insert statement, reading data in partition order can reduce the memory usage of BE + // and prevent the generation of smaller tables. + _scan_ranges.resize(_max_scanners); + int num_ranges = scan_ranges.size() / _max_scanners; + int num_add_one = scan_ranges.size() - num_ranges * _max_scanners; + int scan_index = 0; + int range_index = 0; + for (int i = 0; i < num_add_one; ++i) { + _scan_ranges[scan_index] = scan_ranges[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges; j++) { + auto& merged_ranges = + scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); + } + } + for (int i = num_add_one; i < _max_scanners; ++i) { + _scan_ranges[scan_index] = scan_ranges[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges - 1; j++) { + auto& merged_ranges = + scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); + } + } + LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size(); + } + +protected: + int _max_scanners; }; /** @@ -59,8 +101,11 @@ class LocalSplitSourceConnector : public SplitSourceConnector { int _range_index = 0; public: - LocalSplitSourceConnector(const std::vector& scan_ranges) - : _scan_ranges(scan_ranges) {} + LocalSplitSourceConnector(const std::vector& scan_ranges, int max_scanners) + : _max_scanners(max_scanners) { + _merge_ranges(scan_ranges); + , + } Status get_next(bool* has_next, TFileRangeDesc* range) override; @@ -98,11 +143,12 @@ class RemoteSplitSourceConnector : public SplitSourceConnector { public: RemoteSplitSourceConnector(RuntimeState* state, RuntimeProfile::Counter* get_split_timer, - int64 split_source_id, int num_splits) + int64 split_source_id, int num_splits, int max_scanners) : _state(state), _get_split_timer(get_split_timer), _split_source_id(split_source_id), - _num_splits(num_splits) {} + _num_splits(num_splits), + _max_scanners(max_scanners) {} Status get_next(bool* has_next, TFileRangeDesc* range) override; From 69d691613feefe16e13d5fac1f0c345a4247aabc Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 30 Jul 2024 19:01:14 +0800 Subject: [PATCH 2/7] 1 --- be/src/pipeline/exec/file_scan_operator.cpp | 6 ++-- .../vec/exec/scan/split_source_connector.cpp | 2 +- be/src/vec/exec/scan/split_source_connector.h | 29 ++++++++++--------- 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 98cc91824f64c1..d73cfc405fd008 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -73,11 +73,13 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state, auto split_source = scan_range.split_source; RuntimeProfile::Counter* get_split_timer = ADD_TIMER(_runtime_profile, "GetSplitTime"); _split_source = std::make_shared( - state, get_split_timer, split_source.split_source_id, split_source.num_splits); + state, get_split_timer, split_source.split_source_id, split_source.num_splits, + _max_scanners); } } if (_split_source == nullptr) { - _split_source = std::make_shared(scan_ranges); + _split_source = + std::make_shared(scan_ranges, _max_scanners); } _max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges()); if (scan_ranges.size() > 0 && diff --git a/be/src/vec/exec/scan/split_source_connector.cpp b/be/src/vec/exec/scan/split_source_connector.cpp index ae42660be9341b..478af522e76f52 100644 --- a/be/src/vec/exec/scan/split_source_connector.cpp +++ b/be/src/vec/exec/scan/split_source_connector.cpp @@ -60,7 +60,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang return Status::IOError("Failed to get batch of split source: {}", e.what()); } _last_batch = result.splits.empty(); - _merge_ranges(result.splits); + _merge_ranges(_scan_ranges, result.splits); _scan_index = 0; _range_index = 0; } diff --git a/be/src/vec/exec/scan/split_source_connector.h b/be/src/vec/exec/scan/split_source_connector.h index f757d03439e05a..8f38cd4f17a18f 100644 --- a/be/src/vec/exec/scan/split_source_connector.h +++ b/be/src/vec/exec/scan/split_source_connector.h @@ -45,9 +45,10 @@ class SplitSourceConnector { virtual TFileScanRangeParams* get_params() = 0; protected: - void _merge_ranges(const std::vector& scan_ranges) { + template + void _merge_ranges(std::vector& merged_ranges, const std::vector& scan_ranges) { if (scan_ranges.size() <= _max_scanners) { - _scan_ranges = scan_ranges; + merged_ranges = scan_ranges; return; } @@ -55,15 +56,15 @@ class SplitSourceConnector { // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order. // In the insert statement, reading data in partition order can reduce the memory usage of BE // and prevent the generation of smaller tables. - _scan_ranges.resize(_max_scanners); + merged_ranges.resize(_max_scanners); int num_ranges = scan_ranges.size() / _max_scanners; int num_add_one = scan_ranges.size() - num_ranges * _max_scanners; int scan_index = 0; int range_index = 0; for (int i = 0; i < num_add_one; ++i) { - _scan_ranges[scan_index] = scan_ranges[range_index++]; + merged_ranges[scan_index] = scan_ranges[range_index++]; auto& ranges = - _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; for (int j = 0; j < num_ranges; j++) { auto& merged_ranges = scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; @@ -71,16 +72,16 @@ class SplitSourceConnector { } } for (int i = num_add_one; i < _max_scanners; ++i) { - _scan_ranges[scan_index] = scan_ranges[range_index++]; + merged_ranges[scan_index] = scan_ranges[range_index++]; auto& ranges = - _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; for (int j = 0; j < num_ranges - 1; j++) { auto& merged_ranges = scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); } } - LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size(); + LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << merged_ranges.size(); } protected: @@ -101,10 +102,9 @@ class LocalSplitSourceConnector : public SplitSourceConnector { int _range_index = 0; public: - LocalSplitSourceConnector(const std::vector& scan_ranges, int max_scanners) - : _max_scanners(max_scanners) { - _merge_ranges(scan_ranges); - , + LocalSplitSourceConnector(const std::vector& scan_ranges, int max_scanners) { + _max_scanners = max_scanners; + _merge_ranges(_scan_ranges, scan_ranges); } Status get_next(bool* has_next, TFileRangeDesc* range) override; @@ -147,8 +147,9 @@ class RemoteSplitSourceConnector : public SplitSourceConnector { : _state(state), _get_split_timer(get_split_timer), _split_source_id(split_source_id), - _num_splits(num_splits), - _max_scanners(max_scanners) {} + _num_splits(num_splits) { + _max_scanners = max_scanners; + } Status get_next(bool* has_next, TFileRangeDesc* range) override; From afa3a1de7532488c97f249bddaa6381a65a8c7f3 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 30 Jul 2024 23:38:53 +0800 Subject: [PATCH 3/7] fix fe --- .../org/apache/doris/analysis/SetVar.java | 8 +++--- .../apache/doris/datasource/FileScanNode.java | 26 +++++++++++-------- .../datasource/hive/HiveMetaStoreCache.java | 10 ------- .../datasource/hive/source/HiveScanNode.java | 4 --- .../iceberg/source/IcebergScanNode.java | 6 ++--- .../datasource/tvf/source/TVFScanNode.java | 4 ++- 6 files changed, 23 insertions(+), 35 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java index 6142feec895b53..ce8a97a16c4edb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java @@ -179,11 +179,9 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { this.result = (LiteralExpr) this.value; } - if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)) { - this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue()))); - this.result = (LiteralExpr) this.value; - } - if (getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) { + if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT) + ||getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT) + ||getVariable().equalsIgnoreCase(SessionVariable.FILE_SPLIT_SIZE)) { this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue()))); this.result = (LiteralExpr) this.value; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index 48c4a9ff3f476c..756d7376a4f122 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -64,12 +64,14 @@ public abstract class FileScanNode extends ExternalScanNode { private static final Logger LOG = LogManager.getLogger(FileScanNode.class); - public static final long DEFAULT_SPLIT_SIZE = 8 * 1024 * 1024; // 8MB + public static final long DEFAULT_SPLIT_SIZE = 64 * 1024 * 1024; // 64MB // For explain protected long totalFileSize = 0; protected long totalPartitionNum = 0; protected long fileSplitSize; + protected boolean isSplitSizeSetBySession = false; + public long rowCount = 0; public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { @@ -79,7 +81,15 @@ public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, St @Override public void init() throws UserException { + initFileSplitSize(); + } + + private void initFileSplitSize() { this.fileSplitSize = ConnectContext.get().getSessionVariable().getFileSplitSize(); + this.isSplitSizeSetBySession = this.fileSplitSize > 0; + if (this.fileSplitSize <= 0) { + this.fileSplitSize = DEFAULT_SPLIT_SIZE; + } } @Override @@ -249,12 +259,6 @@ protected void setDefaultValueExprs(TableIf tbl, } } - protected List splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length, - long modificationTime, boolean splittable, List partitionValues) throws IOException { - return splitFile(path, blockSize, blockLocations, length, modificationTime, splittable, partitionValues, - FileSplitCreator.DEFAULT); - } - protected List splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length, long modificationTime, boolean splittable, List partitionValues, SplitCreator splitCreator) throws IOException { @@ -271,11 +275,11 @@ protected List splitFile(Path path, long blockSize, BlockLocation[] block result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues)); return result; } - if (fileSplitSize <= 0) { - fileSplitSize = blockSize; + // if file split size is set by session variable, use session variable. + // Otherwise, use max(file split size, block size) + if (!isSplitSizeSetBySession) { + fileSplitSize = Math.max(fileSplitSize, blockSize); } - // Min split size is DEFAULT_SPLIT_SIZE(128MB). - fileSplitSize = Math.max(fileSplitSize, DEFAULT_SPLIT_SIZE); long bytesRemaining; for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D; bytesRemaining -= fileSplitSize) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 7f23385d847886..ebfe2b205f253f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -992,9 +992,6 @@ public String toString() { public static class FileCacheValue { // File Cache for self splitter. private final List files = Lists.newArrayList(); - // File split cache for old splitter. This is a temp variable. - @Deprecated - private final List splits = Lists.newArrayList(); private boolean isSplittable; // The values of partitions. // e.g for file : hdfs://path/to/table/part1=a/part2=b/datafile @@ -1015,13 +1012,6 @@ public void addFile(RemoteFile file) { } } - @Deprecated - public void addSplit(FileSplit split) { - if (isFileVisible(split.getPath())) { - splits.add(split); - } - } - public int getValuesSize() { return partitionValues == null ? 0 : partitionValues.size(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 1bdb805f0fd96d..6ef551825e2afc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -320,10 +320,6 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List doGetSplits() throws UserException { // get splits List splits = new ArrayList<>(); int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); - // Min split size is DEFAULT_SPLIT_SIZE(128MB). - long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE); HashSet partitionPathSet = new HashSet<>(); boolean isPartitionedTable = icebergTable.spec().isPartitioned(); - CloseableIterable fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize); + CloseableIterable fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), fileSplitSize); try (CloseableIterable combinedScanTasks = - TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) { + TableScanUtil.planTasks(fileScanTasks, fileSplitSize, 1, 0)) { combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> { String dataFilePath = normalizeLocation(splitTask.file().path().toString()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java index 96e96d3cf19a26..26b90c26a46f0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java @@ -27,6 +27,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.FileSplit; +import org.apache.doris.datasource.FileSplit.FileSplitCreator; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; @@ -139,7 +140,8 @@ public List getSplits() throws UserException { Path path = new Path(fileStatus.getPath()); try { splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, fileStatus.getSize(), - fileStatus.getModificationTime(), fileStatus.isSplitable, null)); + fileStatus.getModificationTime(), fileStatus.isSplitable, null, + FileSplitCreator.DEFAULT)); } catch (IOException e) { LOG.warn("get file split failed for TVF: {}", path, e); throw new UserException(e); From d2178b59b73fdc9307eea1c2dfbee219698c0c91 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 30 Jul 2024 23:44:17 +0800 Subject: [PATCH 4/7] 1 --- .../src/main/java/org/apache/doris/analysis/SetVar.java | 4 ++-- .../main/java/org/apache/doris/datasource/FileScanNode.java | 1 - .../org/apache/doris/datasource/hive/HiveMetaStoreCache.java | 1 - .../doris/datasource/iceberg/source/IcebergScanNode.java | 1 - 4 files changed, 2 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java index ce8a97a16c4edb..0f2a82a5ef5699 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java @@ -180,8 +180,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { } if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT) - ||getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT) - ||getVariable().equalsIgnoreCase(SessionVariable.FILE_SPLIT_SIZE)) { + || getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT) + || getVariable().equalsIgnoreCase(SessionVariable.FILE_SPLIT_SIZE)) { this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue()))); this.result = (LiteralExpr) this.value; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index 756d7376a4f122..4b7dba1f53a50e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -26,7 +26,6 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; import org.apache.doris.common.util.Util; -import org.apache.doris.datasource.FileSplit.FileSplitCreator; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index ebfe2b205f253f..518f43df5e78fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -35,7 +35,6 @@ import org.apache.doris.common.util.CacheBulkLoader; import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.CacheException; -import org.apache.doris.datasource.FileSplit; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.fs.FileSystemCache; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 4504ce2a497398..56222d8495543f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -37,7 +37,6 @@ import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; From daee013bf8d8dca0bbfdf215e32b16b4deecf5a4 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 31 Jul 2024 11:35:38 +0800 Subject: [PATCH 5/7] 2 --- .../org/apache/doris/datasource/hive/HiveMetaStoreCache.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 518f43df5e78fe..da88a03f2ebabe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -511,8 +511,7 @@ public List getFilesByPartitions(List partitions, if (LOG.isDebugEnabled()) { LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms", - fileLists.stream().mapToInt(l -> l.getFiles() == null - ? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(), + fileLists.stream().mapToInt(l -> l.getFiles() == null ? 0 : l.getFiles().size()).sum(), partitions.size(), catalog.getName(), (System.currentTimeMillis() - start)); } return fileLists; From 75af39adc24732bc2435d61fdeb5326aa31fb117 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 31 Jul 2024 15:10:44 +0800 Subject: [PATCH 6/7] 1 --- .../doris/analysis/AlterDatabaseQuotaStmt.java | 2 +- .../apache/doris/analysis/CreateTableStmt.java | 2 +- .../apache/doris/analysis/OutFileClause.java | 2 +- .../java/org/apache/doris/analysis/SetVar.java | 18 +++++++++++++++--- .../apache/doris/common/util/ParseUtil.java | 2 +- .../plans/commands/info/CreateTableInfo.java | 2 +- 6 files changed, 20 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java index 552b701d029385..e832e193ab3dd9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java @@ -80,7 +80,7 @@ public void analyze(Analyzer analyzer) throws UserException { ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); } if (quotaType == QuotaType.DATA) { - quota = ParseUtil.analyzeDataVolumn(quotaValue); + quota = ParseUtil.analyzeDataVolume(quotaValue); } else if (quotaType == QuotaType.REPLICA) { quota = ParseUtil.analyzeReplicaNumber(quotaValue); } else if (quotaType == QuotaType.TRANSACTION) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index ed775826f8a271..8f53a7a5a1aba2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -127,7 +127,7 @@ private static Map maybeRewriteByAutoBucket(DistributionDesc dis distributionDesc.setBuckets(FeConstants.default_bucket_num); } else { long partitionSize = ParseUtil - .analyzeDataVolumn(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)); + .analyzeDataVolume(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)); distributionDesc.setBuckets(AutoBucketUtils.getBucketsNum(partitionSize, Config.autobucket_min_buckets)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 8dd91c3fd8a395..6debdca789f125 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -573,7 +573,7 @@ private void analyzeProperties() throws UserException { } if (properties.containsKey(PROP_MAX_FILE_SIZE)) { - maxFileSizeBytes = ParseUtil.analyzeDataVolumn(properties.get(PROP_MAX_FILE_SIZE)); + maxFileSizeBytes = ParseUtil.analyzeDataVolume(properties.get(PROP_MAX_FILE_SIZE)); if (maxFileSizeBytes > MAX_FILE_SIZE_BYTES || maxFileSizeBytes < MIN_FILE_SIZE_BYTES) { throw new AnalysisException("max file size should between 5MB and 2GB. Given: " + maxFileSizeBytes); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java index 0f2a82a5ef5699..e80860bf584392 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java @@ -180,9 +180,21 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { } if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT) - || getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT) - || getVariable().equalsIgnoreCase(SessionVariable.FILE_SPLIT_SIZE)) { - this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue()))); + || getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) { + this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolume(getResult().getStringValue()))); + this.result = (LiteralExpr) this.value; + } + if (getVariable().equalsIgnoreCase(SessionVariable.FILE_SPLIT_SIZE)) { + try { + this.value = new StringLiteral( + Long.toString(ParseUtil.analyzeDataVolume(getResult().getStringValue()))); + } catch (Throwable t) { + // The way of handling file_split_size should be same as exec_mem_limit or scan_queue_mem_limit. + // But ParseUtil.analyzeDataVolume() does not accept 0 as a valid value. + // So for compatibility, we set origin value to file_split_size + // when the value is 0 or other invalid value. + this.value = new StringLiteral(getResult().getStringValue()); + } this.result = (LiteralExpr) this.value; } if (getVariable().equalsIgnoreCase("is_report_success")) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java index c47753d2d42b99..649f03ffcc7fbe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java @@ -41,7 +41,7 @@ public class ParseUtil { private static Pattern dataVolumnPattern = Pattern.compile("(\\d+)(\\D*)"); - public static long analyzeDataVolumn(String dataVolumnStr) throws AnalysisException { + public static long analyzeDataVolume(String dataVolumnStr) throws AnalysisException { long dataVolumn = 0; Matcher m = dataVolumnPattern.matcher(dataVolumnStr); if (m.matches()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java index 38b576af17511d..29e60583751899 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java @@ -689,7 +689,7 @@ public static Map maybeRewriteByAutoBucket( if (!newProperties.containsKey(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)) { distributionDesc.updateBucketNum(FeConstants.default_bucket_num); } else { - long partitionSize = ParseUtil.analyzeDataVolumn( + long partitionSize = ParseUtil.analyzeDataVolume( newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)); distributionDesc.updateBucketNum(AutoBucketUtils.getBucketsNum(partitionSize, Config.autobucket_min_buckets)); From 340f80494f1d5f1b50465cb5fac95ebcc7cf0b8d Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 5 Aug 2024 10:46:44 +0800 Subject: [PATCH 7/7] rebase --- .../src/main/java/org/apache/doris/datasource/FileScanNode.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index 4b7dba1f53a50e..c8c4323d34d33c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -70,7 +70,6 @@ public abstract class FileScanNode extends ExternalScanNode { protected long totalPartitionNum = 0; protected long fileSplitSize; protected boolean isSplitSizeSetBySession = false; - public long rowCount = 0; public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) {