From 97f7c6a443589fca80c434bbc141b776d2ade2dc Mon Sep 17 00:00:00 2001
From: wangbo <506340561@qq.com>
Date: Sat, 29 Jun 2024 15:09:06 +0800
Subject: [PATCH] add set thread num config for wg flush pool

---
 be/src/common/config.cpp                      |  3 ++
 be/src/common/config.h                        |  4 ++
 be/src/olap/delta_writer_v2.cpp               |  2 +-
 be/src/olap/storage_engine.cpp                |  3 +-
 be/src/olap/storage_engine.h                  |  4 ++
 be/src/runtime/query_context.cpp              |  6 +--
 be/src/runtime/query_context.h                |  4 +-
 .../runtime/workload_group/workload_group.cpp | 53 ++++++++++++-------
 .../runtime/workload_group/workload_group.h   |  4 +-
 .../vec/sink/writer/async_result_writer.cpp   | 27 +++-------
 10 files changed, 64 insertions(+), 46 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c2274fd169bb75..9e46d3328fc66e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -691,6 +691,9 @@ DEFINE_Int32(high_priority_flush_thread_num_per_store, "6");
 // max_flush_thread_num_per_cpu * num_cpu)
 DEFINE_Int32(max_flush_thread_num_per_cpu, "4");
 
+DEFINE_mInt32(wg_flush_thread_num_per_store, "6");
+DEFINE_mInt32(wg_flush_thread_num_per_cpu, "4");
+
 // config for tablet meta checkpoint
 DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
 DEFINE_mInt32(tablet_meta_checkpoint_min_interval_secs, "600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9920b65fe52ddf..90e95824d3dd59 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -743,6 +743,10 @@ DECLARE_Int32(high_priority_flush_thread_num_per_store);
 // max_flush_thread_num_per_cpu * num_cpu)
 DECLARE_Int32(max_flush_thread_num_per_cpu);
 
+// workload group flush pool params
+DECLARE_mInt32(wg_flush_thread_num_per_store);
+DECLARE_mInt32(wg_flush_thread_num_per_cpu);
+
 // config for tablet meta checkpoint
 DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);
 DECLARE_mInt32(tablet_meta_checkpoint_min_interval_secs);
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 80978280b9236b..3f2f7bf99fa834 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -128,7 +128,7 @@ Status DeltaWriterV2::init() {
     RETURN_IF_ERROR(_rowset_writer->init(context));
     ThreadPool* wg_thread_pool_ptr = nullptr;
     if (_state->get_query_ctx()) {
-        wg_thread_pool_ptr = _state->get_query_ctx()->get_non_pipe_exec_thread_pool();
+        wg_thread_pool_ptr = _state->get_query_ctx()->get_memtable_flush_pool();
     }
     RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
                                            wg_thread_pool_ptr,
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 5d50bb5f4dfdb6..90093241ad274c 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -233,8 +233,9 @@ Status StorageEngine::_open() {
     auto dirs = get_stores();
     RETURN_IF_ERROR(load_data_dirs(dirs));
+    _disk_num = dirs.size();
 
     _memtable_flush_executor = std::make_unique<MemTableFlushExecutor>();
-    _memtable_flush_executor->init(dirs.size());
+    _memtable_flush_executor->init(_disk_num);
 
     _calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>();
     _calc_delete_bitmap_executor->init();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 94cf142a8c1ec2..5ddd888db6d243 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -131,6 +131,8 @@ class BaseStorageEngine {
 
     int64_t memory_limitation_bytes_per_thread_for_schema_change() const;
 
+    int get_disk_num() { return _disk_num; }
+
 protected:
     void _evict_querying_rowset();
     void _evict_quring_rowset_thread_callback();
@@ -153,6 +155,8 @@ class BaseStorageEngine {
     scoped_refptr<Thread> _evict_quering_rowset_thread;
 
     int64_t _memory_limitation_bytes_for_schema_change;
+
+    int _disk_num {-1};
 };
 
 class StorageEngine final : public BaseStorageEngine {
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 18d565dcfef439..dd7cf4f55b8706 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -326,9 +326,9 @@ doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
     return _exec_env->pipeline_task_scheduler();
 }
 
-ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() {
+ThreadPool* QueryContext::get_memtable_flush_pool() {
     if (_workload_group) {
-        return _non_pipe_thread_pool;
+        return _memtable_flush_pool;
     } else {
         return nullptr;
     }
@@ -340,7 +340,7 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
     // see task_group_manager::delete_workload_group_by_ids
     _workload_group->add_mem_tracker_limiter(query_mem_tracker);
     _workload_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
-                                         &_non_pipe_thread_pool, &_remote_scan_task_scheduler);
+                                         &_memtable_flush_pool, &_remote_scan_task_scheduler);
     return Status::OK();
 }
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index aee05ed3185327..b565214ef22082 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -205,7 +205,7 @@ class QueryContext {
 
     doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
 
-    ThreadPool* get_non_pipe_exec_thread_pool();
+    ThreadPool* get_memtable_flush_pool();
 
    std::vector<TUniqueId> get_fragment_instance_ids() const { return fragment_instance_ids; }
 
@@ -298,7 +298,7 @@ class QueryContext {
 
     doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
     vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
-    ThreadPool* _non_pipe_thread_pool = nullptr;
+    ThreadPool* _memtable_flush_pool = nullptr;
     vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
 
     std::unique_ptr _execution_dependency;
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index b7a460902303f0..843e06440d2e95 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -27,6 +27,7 @@
 #include
 
 #include "common/logging.h"
+#include "olap/storage_engine.h"
 #include "pipeline/task_queue.h"
 #include "pipeline/task_scheduler.h"
 #include "runtime/exec_env.h"
@@ -430,19 +431,35 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                                               min_remote_scan_thread_num);
     }
 
-    if (_non_pipe_thread_pool == nullptr) {
-        std::unique_ptr<ThreadPool> thread_pool = nullptr;
-        auto ret = ThreadPoolBuilder("nonPip_" + tg_name)
-                           .set_min_threads(1)
-                           .set_max_threads(config::fragment_pool_thread_num_max)
-                           .set_max_queue_size(config::fragment_pool_queue_size)
-                           .set_cgroup_cpu_ctl(cg_cpu_ctl_ptr)
-                           .build(&thread_pool);
-        if (!ret.ok()) {
-            LOG(INFO) << "[upsert wg thread pool] create non-pipline thread pool failed, gid="
-                      << tg_id;
-        } else {
-            _non_pipe_thread_pool = std::move(thread_pool);
+    if (_memtable_flush_pool == nullptr) {
+        int num_disk = ExecEnv::GetInstance()->storage_engine().get_disk_num();
+        // -1 means disk num may not be inited, so not create flush pool
+        if (num_disk != -1) {
+            std::unique_ptr<ThreadPool> thread_pool = nullptr;
+            num_disk = std::max(1, num_disk);
+            int num_cpus = std::thread::hardware_concurrency();
+
+            int min_threads = std::max(1, config::wg_flush_thread_num_per_store);
+            int max_threads = num_cpus == 0
+                                      ? num_disk * min_threads
+                                      : std::min(num_disk * min_threads,
+                                                 num_cpus * config::wg_flush_thread_num_per_cpu);
+
+            std::string pool_name = "wg_flush_" + tg_name;
+            auto ret = ThreadPoolBuilder(pool_name)
+                               .set_min_threads(min_threads)
+                               .set_max_threads(max_threads)
+                               .set_cgroup_cpu_ctl(cg_cpu_ctl_ptr)
+                               .build(&thread_pool);
+            if (!ret.ok()) {
+                LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " failed, gid="
+                          << tg_id;
+            } else {
+                _memtable_flush_pool = std::move(thread_pool);
+                LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " succ, gid=" << tg_id
+                          << ", max thread num=" << max_threads
+                          << ", min thread num=" << min_threads;
+            }
         }
     }
 
@@ -470,13 +487,13 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
 
 void WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
                                         vectorized::SimplifiedScanScheduler** scan_sched,
-                                        ThreadPool** non_pipe_thread_pool,
+                                        ThreadPool** memtable_flush_pool,
                                         vectorized::SimplifiedScanScheduler** remote_scan_sched) {
     std::shared_lock rlock(_task_sched_lock);
     *exec_sched = _task_sched.get();
     *scan_sched = _scan_task_sched.get();
     *remote_scan_sched = _remote_scan_task_sched.get();
-    *non_pipe_thread_pool = _non_pipe_thread_pool.get();
+    *memtable_flush_pool = _memtable_flush_pool.get();
 }
 
 void WorkloadGroup::try_stop_schedulers() {
@@ -490,9 +507,9 @@ void WorkloadGroup::try_stop_schedulers() {
     if (_remote_scan_task_sched) {
         _remote_scan_task_sched->stop();
     }
-    if (_non_pipe_thread_pool) {
-        _non_pipe_thread_pool->shutdown();
-        _non_pipe_thread_pool->wait();
+    if (_memtable_flush_pool) {
+        _memtable_flush_pool->shutdown();
+        _memtable_flush_pool->wait();
     }
 }
 
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 8386d778aece6a..786e297bc293ef 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -151,7 +151,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
 
     void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
                              vectorized::SimplifiedScanScheduler** scan_sched,
-                             ThreadPool** non_pipe_thread_pool,
+                             ThreadPool** memtable_flush_pool,
                              vectorized::SimplifiedScanScheduler** remote_scan_sched);
 
     void try_stop_schedulers();
@@ -189,7 +189,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
     std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr};
-    std::unique_ptr<ThreadPool> _non_pipe_thread_pool = nullptr;
+    std::unique_ptr<ThreadPool> _memtable_flush_pool = nullptr;
 };
 
 using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 42fd8468e86987..82c5f4ab2883cd 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -96,25 +96,14 @@ Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* prof
     // This is a async thread, should lock the task ctx, to make sure runtimestate and profile
     // not deconstructed before the thread exit.
     auto task_ctx = state->get_task_execution_context();
-    if (state->get_query_ctx() && state->get_query_ctx()->get_non_pipe_exec_thread_pool()) {
-        ThreadPool* pool_ptr = state->get_query_ctx()->get_non_pipe_exec_thread_pool();
-        RETURN_IF_ERROR(pool_ptr->submit_func([this, state, profile, task_ctx]() {
-            auto task_lock = task_ctx.lock();
-            if (task_lock == nullptr) {
-                return;
-            }
-            this->process_block(state, profile);
-        }));
-    } else {
-        RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
-                [this, state, profile, task_ctx]() {
-                    auto task_lock = task_ctx.lock();
-                    if (task_lock == nullptr) {
-                        return;
-                    }
-                    this->process_block(state, profile);
-                }));
-    }
+    RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
+            [this, state, profile, task_ctx]() {
+                auto task_lock = task_ctx.lock();
+                if (task_lock == nullptr) {
+                    return;
+                }
+                this->process_block(state, profile);
+            }));
     return Status::OK();
 }
 
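Reviewer note (not part of the patch): below is a minimal standalone sketch of the pool-sizing arithmetic that WorkloadGroup::upsert_task_scheduler now applies when it builds the per-workload-group flush pool. The disk count, CPU count, and config values are illustrative assumptions; at runtime the patch takes them from StorageEngine::get_disk_num(), std::thread::hardware_concurrency(), and the new wg_flush_thread_num_per_store / wg_flush_thread_num_per_cpu configs.

#include <algorithm>
#include <iostream>

int main() {
    // Assumed example inputs (not taken from the patch itself).
    int num_disk = 4;                 // stands in for StorageEngine::get_disk_num()
    int num_cpus = 16;                // stands in for std::thread::hardware_concurrency(); 0 = unknown
    int flush_threads_per_store = 6;  // default of config::wg_flush_thread_num_per_store
    int flush_threads_per_cpu = 4;    // default of config::wg_flush_thread_num_per_cpu

    // Same arithmetic as the new code path in workload_group.cpp.
    num_disk = std::max(1, num_disk);
    int min_threads = std::max(1, flush_threads_per_store);
    int max_threads = num_cpus == 0
                              ? num_disk * min_threads
                              : std::min(num_disk * min_threads,
                                         num_cpus * flush_threads_per_cpu);

    // With 4 disks and 16 CPUs: min_threads = 6, max_threads = min(24, 64) = 24.
    std::cout << "min_threads=" << min_threads << ", max_threads=" << max_threads << '\n';
    return 0;
}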