Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ CONF_Int32(be_port, "9060");
// port for brpc
CONF_Int32(brpc_port, "8060");

// the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
CONF_Int32(brpc_num_threads, "-1");

// port to brpc server for single replica load
Expand Down Expand Up @@ -385,8 +386,15 @@ CONF_Int32(single_replica_load_download_num_workers, "64");
CONF_Int64(load_data_reserve_hours, "4");
// load error logs will be removed after this time
CONF_mInt64(load_error_log_reserve_hours, "48");
CONF_Int32(number_tablet_writer_threads, "16");
CONF_Int32(number_slave_replica_download_threads, "64");

// BE brpc interfaces are classified into two categories: light and heavy.
// Each category has its own work pool with a different number of threads.
// Threads to handle heavy API interfaces, such as transmit_data/transmit_block, etc.
CONF_Int32(brpc_heavy_work_pool_threads, "192");
// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start
CONF_Int32(brpc_light_work_pool_threads, "32");
CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240");
CONF_Int32(brpc_light_work_pool_max_queue_size, "10240");

// The maximum amount of data that can be processed by a stream load
CONF_mInt64(streaming_load_max_mb, "10240");
Expand Down
953 changes: 597 additions & 356 deletions be/src/service/internal_service.cpp

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions be/src/service/internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,13 @@ class PInternalServiceImpl : public PBackendService {

private:
ExecEnv* _exec_env;
PriorityThreadPool _tablet_worker_pool;
PriorityThreadPool _slave_replica_worker_pool;

// Every brpc service request should be put into a thread pool;
// for the reason, see issue #16634.
// Interfaces that read or write data are defined as heavy interfaces;
// all other interfaces are light interfaces.
PriorityThreadPool _heavy_work_pool;
PriorityThreadPool _light_work_pool;
};

} // namespace doris
10 changes: 10 additions & 0 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ class DorisMetrics {
IntCounter* upload_rowset_count;
IntCounter* upload_fail_count;

UIntGauge* light_work_pool_queue_size;
UIntGauge* heavy_work_pool_queue_size;
UIntGauge* heavy_work_active_threads;
UIntGauge* light_work_active_threads;

UIntGauge* heavy_work_pool_max_queue_size;
UIntGauge* light_work_pool_max_queue_size;
UIntGauge* heavy_work_max_threads;
UIntGauge* light_work_max_threads;

static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;
Expand Down
6 changes: 5 additions & 1 deletion be/src/util/priority_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class PriorityThreadPool {
// queue exceeds this size, subsequent calls to Offer will block until there is
// capacity available.
PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name)
: _work_queue(queue_size), _shutdown(false), _name(name) {
: _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
for (int i = 0; i < num_threads; ++i) {
_threads.create_thread(
std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
Expand Down Expand Up @@ -101,6 +101,7 @@ class PriorityThreadPool {
// Joins every worker thread held in _threads by calling join_all() on it.
// NOTE(review): presumably blocks until the worker threads exit — confirm
// against the thread-group implementation used for _threads.
virtual void join() { _threads.join_all(); }

// Returns the size currently reported by the work queue (_work_queue.get_size()).
virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
// Returns the current value of _active_threads, the counter that worker threads
// increment just before running a task's work_function and decrement right after.
// The std::atomic<int> value is implicitly converted to uint32_t on return.
virtual uint32_t get_active_threads() const { return _active_threads; }

// Blocks until the work queue is empty, and then calls shutdown to stop the worker
// threads and Join to wait until they are finished.
Expand Down Expand Up @@ -136,7 +137,9 @@ class PriorityThreadPool {
while (!is_shutdown()) {
Task task;
if (_work_queue.blocking_get(&task)) {
_active_threads++;
task.work_function();
_active_threads--;
}
if (_work_queue.get_size() == 0) {
_empty_cv.notify_all();
Expand All @@ -151,6 +154,7 @@ class PriorityThreadPool {
// Set to true when threads should stop doing work and terminate.
std::atomic<bool> _shutdown;
std::string _name;
std::atomic<int> _active_threads;
};

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,12 @@ curl http://be_host:webserver_port/metrics?type=json
|`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 | 如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 |
|`doris_be_all_rowset_nums`| | Num | 当前所有 rowset 的个数 | | P0 |
|`doris_be_all_segment_nums`| | Num | 当前所有 segment 的个数 | | P0 |
|`doris_be_heavy_work_max_threads`| | Num | brpc heavy线程池线程个数| | P0 |
|`doris_be_light_work_max_threads`| | Num | brpc light线程池线程个数| | P0 |
|`doris_be_heavy_work_pool_queue_size`| | Num | brpc heavy线程池队列最大长度,超过则阻塞提交work| | P0 |
|`doris_be_light_work_pool_queue_size`| | Num | brpc light线程池队列最大长度,超过则阻塞提交work| | P0 |
|`doris_be_heavy_work_active_threads`| | Num | brpc heavy线程池活跃线程数| | P0 |
|`doris_be_light_work_active_threads`| | Num | brpc light线程池活跃线程数| | P0 |

### 机器监控

Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ enum PCacheStatus {
INVALID_KEY_RANGE = 6;
DATA_OVERDUE = 7;
EMPTY_DATA = 8;
CANCELED = 9;
};

enum CacheType {
Expand Down