2 changes: 1 addition & 1 deletion be/src/common/config.cpp
@@ -239,7 +239,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> b
}
return true;
});
-DEFINE_Int32(remote_split_source_batch_size, "1024");
+DEFINE_Int32(remote_split_source_batch_size, "10240");
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
// number of olap scanner thread pool queue size
DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");
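Raising remote_split_source_batch_size from 1024 to 10240 lets each BE-to-coordinator fetch return up to ten times as many file splits, cutting the number of split-fetch RPCs per scan by roughly the same factor; the cost is a larger response message and a bigger batch held in BE memory, presumably an acceptable trade for the many-file scans this lazy-split path targets.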
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/split_source_connector.cpp
@@ -45,6 +45,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang
std::lock_guard<std::mutex> l(_range_lock);
*has_next = false;
if (_scan_index == _scan_ranges.size() && !_last_batch) {
+SCOPED_RAW_TIMER(&_get_split_timer);
Status coord_status;
FrontendServiceConnection coord(_state->exec_env()->frontend_client_cache(),
_state->get_query_ctx()->coord_addr, &coord_status);
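SCOPED_RAW_TIMER is Doris's RAII timing guard: when the scope exits, it adds the elapsed nanoseconds to the int64_t it points at. Because it sits inside the branch taken only when the current batch is exhausted, the timer accumulates just the coordinator round trips, not the fast path that serves ranges from the batch already in memory.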
6 changes: 6 additions & 0 deletions be/src/vec/exec/scan/split_source_connector.h
@@ -43,6 +43,8 @@ class SplitSourceConnector {
virtual int num_scan_ranges() = 0;

virtual TFileScanRangeParams* get_params() = 0;

+virtual int64_t get_split_time() { return 0; }
};

/**
@@ -95,6 +97,8 @@ class RemoteSplitSourceConnector : public SplitSourceConnector {
int _scan_index = 0;
int _range_index = 0;

+int64_t _get_split_timer = 0;

public:
RemoteSplitSourceConnector(RuntimeState* state, int64 split_source_id, int num_splits)
: _state(state), _split_source_id(split_source_id), _num_splits(num_splits) {}
@@ -110,6 +114,8 @@ class RemoteSplitSourceConnector : public SplitSourceConnector {
TFileScanRangeParams* get_params() override {
LOG(FATAL) << "Unreachable, params is got by file_scan_range_params_map";
}

+int64_t get_split_time() override { return _get_split_timer; }
};

} // namespace doris::vectorized
3 changes: 3 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -165,6 +165,7 @@ Status VFileScanner::prepare(
_file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT);
_has_fully_rf_file_counter =
ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber", TUnit::UNIT);
+_get_split_timer = ADD_TIMER(_parent->_scanner_profile, "GetSplitTime");
} else {
_get_block_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerGetBlockTime");
_open_reader_timer =
@@ -183,6 +184,7 @@
_file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT);
_has_fully_rf_file_counter =
ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT);
+_get_split_timer = ADD_TIMER(_local_state->scanner_profile(), "GetSplitTime");
}

_file_cache_statistics.reset(new io::FileCacheStatistics());
@@ -1195,6 +1197,7 @@ Status VFileScanner::close(RuntimeState* state) {
if (_cur_reader) {
RETURN_IF_ERROR(_cur_reader->close());
}
+COUNTER_UPDATE(_get_split_timer, _split_source->get_split_time());

RETURN_IF_ERROR(VScanner::close(state));
return Status::OK();
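Taken together, the backend changes follow an accumulate-then-publish profiling pattern: RemoteSplitSourceConnector privately sums raw elapsed time across all of its get_next batch fetches, the SplitSourceConnector base class defaults get_split_time() to 0 so local split sources report nothing, and VFileScanner publishes the total as the GetSplitTime profile counter once at close(). A minimal sketch of the same pattern, in Java to match the frontend code in this PR; every name in it is illustrative, not a Doris API:

    import java.util.Collections;
    import java.util.List;

    // Illustrative sketch of the accumulate-then-publish timing pattern used by the
    // C++ changes above (SCOPED_RAW_TIMER accumulating, COUNTER_UPDATE publishing).
    class TimedSplitSource {
        private long getSplitNanos = 0; // plays the role of _get_split_timer

        List<String> nextBatch() {
            long start = System.nanoTime();
            try {
                return fetchBatchFromCoordinator(); // the RPC being measured
            } finally {
                // SCOPED_RAW_TIMER equivalent: add elapsed time on scope exit
                getSplitNanos += System.nanoTime() - start;
            }
        }

        // read once when the scanner closes, like get_split_time()
        long totalSplitFetchNanos() {
            return getSplitNanos;
        }

        private List<String> fetchBatchFromCoordinator() {
            return Collections.emptyList(); // placeholder for the coordinator call
        }
    }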
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/vfile_scanner.h
@@ -184,6 +184,7 @@ class VFileScanner : public VScanner {
RuntimeProfile::Counter* _empty_file_counter = nullptr;
RuntimeProfile::Counter* _file_counter = nullptr;
RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;
+RuntimeProfile::Counter* _get_split_timer = nullptr;

const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
// single slot filter conjuncts
LocationPath.java
@@ -41,7 +41,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.InvalidPathException;
import java.nio.file.Paths;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.Map;
import java.util.UUID;

@@ -74,10 +74,14 @@ public enum LocationType {
}

private LocationPath(String location) {
-this(location, new HashMap<>());
+this(location, Collections.emptyMap(), true);
}

public LocationPath(String location, Map<String, String> props) {
+    this(location, props, true);
+}
+
+public LocationPath(String location, Map<String, String> props, boolean convertPath) {
String scheme = parseScheme(location).toLowerCase();
if (scheme.isEmpty()) {
locationType = LocationType.NOSCHEME;
@@ -88,38 +92,38 @@ public LocationPath(String location, Map<String, String> props) {
locationType = LocationType.HDFS;
// Need add hdfs host to location
String host = props.get(HdfsResource.DSF_NAMESERVICES);
-this.location = normalizedHdfsPath(location, host);
+this.location = convertPath ? normalizedHdfsPath(location, host) : location;
break;
case FeConstants.FS_PREFIX_S3:
locationType = LocationType.S3;
this.location = location;
break;
case FeConstants.FS_PREFIX_S3A:
locationType = LocationType.S3A;
-this.location = convertToS3(location);
+this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_S3N:
// include the check for multi locations and in a table, such as both s3 and hdfs are in a table.
locationType = LocationType.S3N;
-this.location = convertToS3(location);
+this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_BOS:
locationType = LocationType.BOS;
// use s3 client to access
-this.location = convertToS3(location);
+this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_GCS:
locationType = LocationType.GCS;
// use s3 client to access
-this.location = convertToS3(location);
+this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_OSS:
if (isHdfsOnOssEndpoint(location)) {
locationType = LocationType.OSS_HDFS;
this.location = location;
} else {
if (useS3EndPoint(props)) {
-this.location = convertToS3(location);
+this.location = convertPath ? convertToS3(location) : location;
} else {
this.location = location;
}
@@ -128,15 +132,15 @@ public LocationPath(String location, Map<String, String> props) {
break;
case FeConstants.FS_PREFIX_COS:
if (useS3EndPoint(props)) {
-this.location = convertToS3(location);
+this.location = convertPath ? convertToS3(location) : location;
} else {
this.location = location;
}
locationType = LocationType.COS;
break;
case FeConstants.FS_PREFIX_OBS:
if (useS3EndPoint(props)) {
-this.location = convertToS3(location);
+this.location = convertPath ? convertToS3(location) : location;
} else {
this.location = location;
}
@@ -331,7 +335,7 @@ public static TFileType getTFileTypeForBE(String location) {
if (location == null || location.isEmpty()) {
return null;
}
-LocationPath locationPath = new LocationPath(location);
+LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), false);
return locationPath.getTFileTypeForBE();
}

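The new convertPath flag separates two jobs that LocationPath previously conflated: normalizing a location for real file access, and merely classifying its scheme. getTFileTypeForBE(String) only needs the scheme, so it can now skip normalizedHdfsPath/convertToS3 entirely and pass no catalog properties. A usage sketch under those assumptions, with bucket and file names invented for illustration:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    // Usage sketch only; assumes LocationPath (from this PR) is on the classpath.
    public class LocationPathUsageSketch {
        public static void main(String[] args) {
            Map<String, String> catalogProps = new HashMap<>();

            // Full mode (convertPath = true): scheme-dependent rewriting applies,
            // e.g. an s3a:// location is converted to s3:// for the unified S3 client.
            LocationPath forAccess =
                    new LocationPath("s3a://my-bucket/warehouse/t/part-0.orc", catalogProps);

            // Probe mode (convertPath = false): only the scheme is inspected, so the
            // path passes through untouched; enough to answer getTFileTypeForBE().
            LocationPath forTypeOnly = new LocationPath(
                    "s3a://my-bucket/warehouse/t/part-0.orc", Collections.emptyMap(), false);
        }
    }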
ExternalCatalog.java
@@ -786,11 +786,7 @@ public Map<String, Boolean> getSpecifiedDatabaseMap(String catalogPropertyKey) {
}

public String bindBrokerName() {
-Map<String, String> properties = catalogProperty.getProperties();
-if (properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
-    return properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
-}
-return null;
+return catalogProperty.getProperties().get(HMSExternalCatalog.BIND_BROKER_NAME);
}

// ATTN: this method only return all cached databases.
Expand Down
ExternalMetaCacheMgr.java
@@ -77,6 +77,7 @@ public class ExternalMetaCacheMgr {
private ExecutorService rowCountRefreshExecutor;
private ExecutorService commonRefreshExecutor;
private ExecutorService fileListingExecutor;
+private ExecutorService scheduleExecutor;

// catalog id -> HiveMetaStoreCache
private final Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap();
@@ -109,6 +110,11 @@ public ExternalMetaCacheMgr() {
Config.max_external_cache_loader_thread_pool_size * 1000,
"FileListingExecutor", 10, true);

+scheduleExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+        Config.max_external_cache_loader_thread_pool_size,
+        Config.max_external_cache_loader_thread_pool_size * 1000,
+        "scheduleExecutor", 10, true);

fsCache = new FileSystemCache();
rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);

@@ -121,6 +127,10 @@ public ExecutorService getFileListingExecutor() {
return fileListingExecutor;
}

+public ExecutorService getScheduleExecutor() {
+    return scheduleExecutor;
+}

public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
HiveMetaStoreCache cache = cacheMap.get(catalog.getId());
if (cache == null) {
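The new scheduleExecutor mirrors the existing refresh and listing pools and is sized by the same max_external_cache_loader_thread_pool_size config. ThreadPoolManager.newDaemonFixedThreadPool is Doris's helper for a fixed-size pool of named daemon threads over a bounded queue; the stand-alone sketch below approximates those semantics with plain java.util.concurrent, since the helper itself is not shown in this diff:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class ScheduleExecutorSketch {
        public static void main(String[] args) {
            int threads = 10;               // stand-in for max_external_cache_loader_thread_pool_size
            int queueSize = threads * 1000; // matches the sizing used in the diff

            // Named daemon threads, so the pool never blocks JVM shutdown.
            ThreadFactory daemonFactory = r -> {
                Thread t = new Thread(r, "scheduleExecutor");
                t.setDaemon(true);
                return t;
            };

            // Fixed-size pool over a bounded queue, approximating
            // ThreadPoolManager.newDaemonFixedThreadPool.
            ExecutorService scheduleExecutor = new ThreadPoolExecutor(
                    threads, threads, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(queueSize), daemonFactory);

            scheduleExecutor.submit(() -> System.out.println("background metadata task"));
            scheduleExecutor.shutdown();
        }
    }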
FileQueryScanNode.java
@@ -317,18 +317,19 @@ public void createScanRangeLocations() throws UserException {
if (isBatchMode()) {
// File splits are generated lazily, and fetched by backends while scanning.
// Only provide the unique ID of split source to backend.
-SplitAssignment splitAssignment = new SplitAssignment(backendPolicy, this);
+splitAssignment = new SplitAssignment(
+        backendPolicy, this, this::splitToScanRange, locationProperties, pathPartitionKeys);
splitAssignment.init();
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
-if (splitAssignment.getCurrentAssignment().isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
+if (splitAssignment.getSampleSplit() == null && !(getLocationType() == TFileType.FILE_STREAM)) {
return;
}
-inputSplitsNum = splitAssignment.numApproximateSplits();
+inputSplitsNum = numApproximateSplits();

TFileType locationType;
-FileSplit fileSplit = (FileSplit) splitAssignment.getCurrentAssignment().values().iterator().next();
+FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
if (fileSplit instanceof IcebergSplit
&& ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
locationType = TFileType.FILE_BROKER;
@@ -337,10 +338,9 @@ public void createScanRangeLocations() throws UserException {
}
totalFileSize = fileSplit.getLength() * inputSplitsNum;
// Not accurate, only used to estimate concurrency.
-int numSplitsPerBE = splitAssignment.numApproximateSplits() / backendPolicy.numBackends();
+int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends();
for (Backend backend : backendPolicy.getBackends()) {
-SplitSource splitSource = new SplitSource(
-        this::splitToScanRange, backend, locationProperties, splitAssignment, pathPartitionKeys);
+SplitSource splitSource = new SplitSource(backend, splitAssignment);
splitSources.add(splitSource);
Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource);
TScanRangeLocations curLocations = newLocations();
@@ -583,4 +583,15 @@ protected TFileAttributes getFileAttributes() throws UserException {
protected abstract TableIf getTargetTable() throws UserException;

protected abstract Map<String, String> getLocationProperties() throws UserException;

+@Override
+public void stop() {
+    if (splitAssignment != null) {
+        splitAssignment.stop();
+        SplitSourceManager manager = Env.getCurrentEnv().getSplitSourceManager();
+        for (Long sourceId : splitAssignment.getSources()) {
+            manager.removeSplitSource(sourceId);
+        }
+    }
+}
}
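Two consequences of this restructuring are worth noting. First, splitAssignment becomes a field rather than a local, and the split-to-scan-range conversion plus location properties move from each per-backend SplitSource into the shared SplitAssignment, so every SplitSource reduces to a (backend, assignment) pair. Second, because split sources are registered globally in SplitSourceManager, the new stop() override is what keeps a cancelled or finished query from leaking them: it halts the assignment and unregisters every source ID it produced.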
FileScanNode.java
@@ -107,7 +107,11 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
output.append(getRuntimeFilterExplainString(false));
}

output.append(prefix).append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
output.append(prefix);
if (isBatchMode()) {
output.append("(approximate)");
}
output.append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
.append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n");
output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum)
.append("\n");
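With batch mode enabled, the explain output now flags the split statistics as estimates, since splits are still being generated while the plan is printed. The line reads, with invented values for illustration:

    (approximate)inputSplitNum=10240, totalFileSize=2147483648, scanRanges=3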