Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,8 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
}
}

ThreadPoolToken* thread_token = state->get_query_fragments_ctx()->get_token();

/*********************************
* 优先级调度基本策略:
* 1. 通过查询拆分的Range个数来确定初始nice值
Expand All @@ -1348,7 +1350,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
* nice值越大的,越优先获得的查询资源
* 4. 定期提高队列内残留任务的优先级,避免大查询完全饿死
*********************************/
PriorityThreadPool* thread_pool = state->exec_env()->thread_pool();
PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
_total_assign_num = 0;
_nice = 18 + std::max(0, 2 - (int)_olap_scanners.size() / 5);
std::list<OlapScanner*> olap_scanners;
Expand Down Expand Up @@ -1413,17 +1415,30 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
}

auto iter = olap_scanners.begin();
while (iter != olap_scanners.end()) {
PriorityThreadPool::Task task;
task.work_function = std::bind(&OlapScanNode::scanner_thread, this, *iter);
task.priority = _nice;
(*iter)->start_wait_worker_timer();
if (thread_pool->offer(task)) {
olap_scanners.erase(iter++);
} else {
LOG(FATAL) << "Failed to assign scanner task to thread pool!";
if (thread_token != nullptr) {
while (iter != olap_scanners.end()) {
auto s = thread_token->submit_func(std::bind(&OlapScanNode::scanner_thread, this, *iter));
if (s.ok()) {
(*iter)->start_wait_worker_timer();
olap_scanners.erase(iter++);
} else {
LOG(FATAL) << "Failed to assign scanner task to thread pool! " << s.get_error_msg();
}
++_total_assign_num;
}
} else {
while (iter != olap_scanners.end()) {
PriorityThreadPool::Task task;
task.work_function = std::bind(&OlapScanNode::scanner_thread, this, *iter);
task.priority = _nice;
(*iter)->start_wait_worker_timer();
if (thread_pool->offer(task)) {
olap_scanners.erase(iter++);
} else {
LOG(FATAL) << "Failed to assign scanner task to thread pool!";
}
++_total_assign_num;
}
++_total_assign_num;
}

RowBatchInterface* scan_batch = NULL;
Expand Down Expand Up @@ -1466,7 +1481,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
if (NULL != scan_batch) {
add_one_batch(scan_batch);
}
}
} // end of transfer while

state->resource_pool()->release_thread_token(true);
VLOG_CRITICAL << "TransferThread finish.";
Expand Down
19 changes: 17 additions & 2 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "common/status.h"
#include "olap/options.h"
#include "util/threadpool.h"

namespace doris {
namespace vectorized {
Expand Down Expand Up @@ -113,7 +114,8 @@ class ExecEnv {
std::shared_ptr<MemTracker> process_mem_tracker() { return _mem_tracker; }
PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; }
ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
PriorityThreadPool* thread_pool() { return _thread_pool; }
PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; }
ThreadPool* limited_scan_thread_pool() { return _limited_scan_thread_pool.get(); }
PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; }
CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; }
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
Expand Down Expand Up @@ -173,7 +175,20 @@ class ExecEnv {
std::shared_ptr<MemTracker> _mem_tracker;
PoolMemTrackerRegistry* _pool_mem_trackers = nullptr;
ThreadResourceMgr* _thread_mgr = nullptr;
PriorityThreadPool* _thread_pool = nullptr;

// The following two thread pools are used in different scenarios.
// _scan_thread_pool is a priority thread pool.
// Scanner threads for common queries will use this thread pool,
// and the priority of each scan task is set according to the size of the query.

// _limited_scan_thread_pool is also the thread pool used for scanner.
// The difference is that it is no longer a priority queue, but according to the concurrency
// set by the user to control the number of threads that can be used by a query.

// TODO(cmy): find a better way to unify these 2 pools.
PriorityThreadPool* _scan_thread_pool = nullptr;
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;

PriorityThreadPool* _etl_thread_pool = nullptr;
CgroupsMgr* _cgroups_mgr = nullptr;
FragmentMgr* _fragment_mgr = nullptr;
Expand Down
13 changes: 10 additions & 3 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,15 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
new ExtDataSourceServiceClientCache(config::max_client_cache_size_per_host);
_pool_mem_trackers = new PoolMemTrackerRegistry();
_thread_mgr = new ThreadResourceMgr();
_thread_pool = new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
_scan_thread_pool = new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size);

ThreadPoolBuilder("LimitedScanThreadPool")
.set_min_threads(1)
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
.set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
.build(&_limited_scan_thread_pool);

_etl_thread_pool = new PriorityThreadPool(config::etl_thread_pool_size,
config::etl_thread_pool_queue_size);
_cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups);
Expand Down Expand Up @@ -233,7 +240,7 @@ void ExecEnv::_init_buffer_pool(int64_t min_page_size, int64_t capacity,

void ExecEnv::_register_metrics() {
REGISTER_HOOK_METRIC(scanner_thread_pool_queue_size, [this]() {
return _thread_pool->get_queue_size();
return _scan_thread_pool->get_queue_size();
});

REGISTER_HOOK_METRIC(etl_thread_pool_queue_size, [this]() {
Expand Down Expand Up @@ -266,7 +273,7 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_fold_constant_mgr);
SAFE_DELETE(_cgroups_mgr);
SAFE_DELETE(_etl_thread_pool);
SAFE_DELETE(_thread_pool);
SAFE_DELETE(_scan_thread_pool);
SAFE_DELETE(_thread_mgr);
SAFE_DELETE(_pool_mem_trackers);
SAFE_DELETE(_broker_client_cache);
Expand Down
5 changes: 4 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
} else {
// This may be a first fragment request of the query.
// Create the query fragments context.
fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host));
fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env));
fragments_ctx->query_id = params.params.query_id;
RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
&(fragments_ctx->desc_tbl)));
Expand All @@ -577,6 +577,9 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi

if (params.__isset.query_options) {
fragments_ctx->timeout_second = params.query_options.query_timeout;
if (params.query_options.__isset.resource_limit) {
fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
}
}

{
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
}

Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
const QueryFragmentsCtx* fragments_ctx) {
QueryFragmentsCtx* fragments_ctx) {
const TPlanFragmentExecParams& params = request.params;
_query_id = params.query_id;

Expand All @@ -79,6 +79,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
const TQueryGlobals& query_globals =
fragments_ctx == nullptr ? request.query_globals : fragments_ctx->query_globals;
_runtime_state.reset(new RuntimeState(params, request.query_options, query_globals, _exec_env));
_runtime_state->set_query_fragments_ctx(fragments_ctx);

RETURN_IF_ERROR(_runtime_state->init_mem_trackers(_query_id));
_runtime_state->set_be_number(request.backend_num);
Expand Down
49 changes: 2 additions & 47 deletions be/src/runtime/plan_fragment_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/datetime_value.h"
#include "runtime/query_fragments_ctx.h"
#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
#include "util/hash_util.hpp"
Expand Down Expand Up @@ -97,7 +98,7 @@ class PlanFragmentExecutor {
// The query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit.
// If fragments_ctx is not null, some components will be got from fragments_ctx.
Status prepare(const TExecPlanFragmentParams& request,
const QueryFragmentsCtx* fragments_ctx = nullptr);
QueryFragmentsCtx* fragments_ctx = nullptr);

// Start execution. Call this prior to get_next().
// If this fragment has a sink, open() will send all rows produced
Expand Down Expand Up @@ -264,52 +265,6 @@ class PlanFragmentExecutor {
void _collect_query_statistics();
};

// Save the common components of fragments in a query.
// Some components like DescriptorTbl may be very large
// that will slow down each execution of fragments when DeSer them every time.
class QueryFragmentsCtx {
public:
QueryFragmentsCtx(int total_fragment_num)
: fragment_num(total_fragment_num), timeout_second(-1) {
_start_time = DateTimeValue::local_time();
}

bool countdown() { return fragment_num.fetch_sub(1) == 1; }

bool is_timeout(const DateTimeValue& now) const {
if (timeout_second <= 0) {
return false;
}
if (now.second_diff(_start_time) > timeout_second) {
return true;
}
return false;
}

public:
TUniqueId query_id;
DescriptorTbl* desc_tbl;
bool set_rsc_info = false;
std::string user;
std::string group;
TNetworkAddress coord_addr;
TQueryGlobals query_globals;

/// In the current implementation, for multiple fragments executed by a query on the same BE node,
/// we store some common components in QueryFragmentsCtx, and save QueryFragmentsCtx in FragmentMgr.
/// When all Fragments are executed, QueryFragmentsCtx needs to be deleted from FragmentMgr.
/// Here we use a counter to store the number of Fragments that have not yet been completed,
/// and after each Fragment is completed, this value will be reduced by one.
/// When the last Fragment is completed, the counter is cleared, and the worker thread of the last Fragment
/// will clean up QueryFragmentsCtx.
std::atomic<int> fragment_num;
int timeout_second;
ObjectPool obj_pool;

private:
DateTimeValue _start_time;
};

} // namespace doris

#endif
100 changes: 100 additions & 0 deletions be/src/runtime/query_fragments_ctx.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <atomic>
#include <string>

#include "common/object_pool.h"
#include "gen_cpp/PaloInternalService_types.h" // for TQueryOptions
#include "gen_cpp/Types_types.h" // for TUniqueId
#include "runtime/datetime_value.h"
#include "runtime/exec_env.h"
#include "util/threadpool.h"

namespace doris {

// Save the common components of fragments in a query.
// Some components like DescriptorTbl may be very large
// that will slow down each execution of fragments when DeSer them every time.
class DescriptorTbl;
class QueryFragmentsCtx {
public:
QueryFragmentsCtx(int total_fragment_num, ExecEnv* exec_env)
: fragment_num(total_fragment_num), timeout_second(-1), _exec_env(exec_env) {
_start_time = DateTimeValue::local_time();
}

bool countdown() { return fragment_num.fetch_sub(1) == 1; }

bool is_timeout(const DateTimeValue& now) const {
if (timeout_second <= 0) {
return false;
}
if (now.second_diff(_start_time) > timeout_second) {
return true;
}
return false;
}

void set_thread_token(int cpu_limit) {
if (cpu_limit > 0) {
// For now, cpu_limit will be the max concurrency of the scan thread pool token.
_thread_token = _exec_env->limited_scan_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT,
cpu_limit);
}
}

ThreadPoolToken* get_token() {
return _thread_token.get();
}

public:
TUniqueId query_id;
DescriptorTbl* desc_tbl;
bool set_rsc_info = false;
std::string user;
std::string group;
TNetworkAddress coord_addr;
TQueryGlobals query_globals;

/// In the current implementation, for multiple fragments executed by a query on the same BE node,
/// we store some common components in QueryFragmentsCtx, and save QueryFragmentsCtx in FragmentMgr.
/// When all Fragments are executed, QueryFragmentsCtx needs to be deleted from FragmentMgr.
/// Here we use a counter to store the number of Fragments that have not yet been completed,
/// and after each Fragment is completed, this value will be reduced by one.
/// When the last Fragment is completed, the counter is cleared, and the worker thread of the last Fragment
/// will clean up QueryFragmentsCtx.
std::atomic<int> fragment_num;
int timeout_second;
ObjectPool obj_pool;
private:
ExecEnv* _exec_env;
DateTimeValue _start_time;

// A token used to submit olap scanner to the "_limited_scan_thread_pool",
// This thread pool token is created from "_limited_scan_thread_pool" from exec env.
// And will be shared by all instances of this query.
// So that we can control the max thread that a query can be used to execute.
// If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env.
std::unique_ptr<ThreadPoolToken> _thread_token;
};

} // end of namespace

10 changes: 10 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "gen_cpp/PaloInternalService_types.h" // for TQueryOptions
#include "gen_cpp/Types_types.h" // for TUniqueId
#include "runtime/mem_pool.h"
#include "runtime/query_fragments_ctx.h"
#include "runtime/thread_resource_mgr.h"
#include "util/logging.h"
#include "util/runtime_profile.h"
Expand Down Expand Up @@ -371,6 +372,14 @@ class RuntimeState {
int64_t get_load_mem_limit();

RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }

void set_query_fragments_ctx(QueryFragmentsCtx* ctx) {
_query_ctx = ctx;
}

QueryFragmentsCtx* get_query_fragments_ctx() {
return _query_ctx;
}

private:
// Use a custom block manager for the query for testing purposes.
Expand Down Expand Up @@ -516,6 +525,7 @@ class RuntimeState {
/// TODO: not needed if we call ReleaseResources() in a timely manner (IMPALA-1575).
AtomicInt32 _initial_reservation_refcnt;

QueryFragmentsCtx* _query_ctx;
// prohibit copies
RuntimeState(const RuntimeState&);
};
Expand Down
2 changes: 1 addition & 1 deletion be/test/runtime/fragment_mgr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
PlanFragmentExecutor::~PlanFragmentExecutor() {}

Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
const QueryFragmentsCtx* batch_ctx) {
QueryFragmentsCtx* batch_ctx) {
return s_prepare_status;
}

Expand Down
Loading