From 250e9b30b769dc81650b3b186e22ace6c2311950 Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Fri, 24 Jul 2020 08:07:34 +0000 Subject: [PATCH 1/3] [refactor] Optimize threads usage mode in BE BE cannot exit gracefully because some threads run in endless loops. This patch makes the following optimizations: - Use the well-encapsulated Thread and ThreadPool instead of std::thread and std::vector - Use CountDownLatch in each thread's loop condition to avoid endless loops - Introduce a new class Daemon for daemon work, like tcmalloc_gc, memory_maintenance and calculate_metrics - Decouple statistics type TaskWorkerPool and StorageEngine notification by submitting tasks to TaskWorkerPool's queue - Reorder the stopping and destruction of objects in main(), i.e. stop network services first, then internal services - Use libevent in pthreads mode, by calling evthread_use_pthreads(), so that EvHttpServer can exit gracefully when running with multiple threads - Call brpc::Server's Stop() and ClearServices() explicitly --- be/CMakeLists.txt | 6 + be/src/agent/heartbeat_server.cpp | 2 +- be/src/agent/task_worker_pool.cpp | 708 ++++++++---------- be/src/agent/task_worker_pool.h | 53 +- be/src/common/daemon.cpp | 77 +- be/src/common/daemon.h | 39 +- be/src/exec/tablet_sink.cpp | 17 +- be/src/exec/tablet_sink.h | 5 +- be/src/http/ev_http_server.cpp | 54 +- be/src/http/ev_http_server.h | 6 +- be/src/olap/memtable_flush_executor.h | 6 +- be/src/olap/olap_meta.cpp | 4 +- be/src/olap/olap_server.cpp | 239 +++--- be/src/olap/storage_engine.cpp | 63 +- be/src/olap/storage_engine.h | 77 +- be/src/runtime/broker_mgr.cpp | 19 +- be/src/runtime/broker_mgr.h | 8 +- be/src/runtime/disk_io_mgr.cc | 4 +- be/src/runtime/exec_env_init.cpp | 54 +- be/src/runtime/external_scan_context_mgr.cpp | 20 +- be/src/runtime/external_scan_context_mgr.h | 12 +- be/src/runtime/fragment_mgr.cpp | 23 +- be/src/runtime/fragment_mgr.h | 8 +- be/src/runtime/load_channel_mgr.cpp | 30 +- be/src/runtime/load_channel_mgr.h | 7 +- 
be/src/runtime/load_path_mgr.cpp | 28 +- be/src/runtime/load_path_mgr.h | 12 +- be/src/runtime/result_buffer_mgr.cpp | 20 +- be/src/runtime/result_buffer_mgr.h | 7 +- .../routine_load/data_consumer_pool.cpp | 23 +- .../runtime/routine_load/data_consumer_pool.h | 17 +- be/src/service/brpc_service.cpp | 2 + be/src/service/doris_main.cpp | 14 +- be/src/service/http_service.cpp | 6 +- be/src/service/http_service.h | 2 + .../external_scan_context_mgr_test.cpp | 4 - 36 files changed, 853 insertions(+), 823 deletions(-) diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index b2b3b7904fc199..f0eee6694f7dc8 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -87,6 +87,7 @@ if(PIC_LIB_PATH) set(LIBBZ2 ${PIC_LIB_PATH}/lib/libbz2.a) set(LIBZ ${PIC_LIB_PATH}/lib/libz.a) set(LIBEVENT ${PIC_LIB_PATH}/lib/libevent.a) + set(LIBEVENT_PTHREADS ${PIC_LIB_PATH}/lib/libevent_pthreads.a) else() message(STATUS "undefined PIC_LIB_PATH") set(Boost_USE_STATIC_LIBS ON) @@ -94,6 +95,7 @@ else() set(LIBBZ2 -lbz2) set(LIBZ -lz) set(LIBEVENT event) + set(LIBEVENT_PTHREADS libevent_pthreads) endif() @@ -181,6 +183,9 @@ endif() add_library(libevent STATIC IMPORTED) set_target_properties(libevent PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libevent.a) +add_library(libevent_pthreads STATIC IMPORTED) +set_target_properties(libevent_pthreads PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libevent_pthreads.a) + add_library(crypto STATIC IMPORTED) set_target_properties(crypto PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libcrypto.a) @@ -399,6 +404,7 @@ set(DORIS_DEPENDENCIES pprof lz4 libevent + libevent_pthreads curl ${LIBZ} ${LIBBZ2} diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 6725c1e9ec83e6..2157c42bb642d9 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -159,7 +159,7 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { if (need_report) { LOG(INFO) << "Master FE is changed 
or restarted. report tablet and disk info immediately"; - _olap_engine->trigger_report(); + _olap_engine->notify_listeners(); } return Status::OK(); diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index b557f3eeafedd5..b30790cd688eab 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -52,6 +52,7 @@ #include "util/file_utils.h" #include "util/monotime.h" #include "util/stopwatch.hpp" +#include "util/threadpool.h" using std::deque; using std::list; @@ -83,94 +84,101 @@ TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* e _master_client(new MasterServerClient(_master_info, &_master_service_client_cache)), _env(env), _worker_thread_condition_variable(&_worker_thread_lock), + _stop_background_threads_latch(1), + _is_work(false), _task_worker_type(task_worker_type) { _backend.__set_host(BackendOptions::get_localhost()); _backend.__set_be_port(config::be_port); _backend.__set_http_port(config::webserver_port); } -TaskWorkerPool::~TaskWorkerPool() {} +TaskWorkerPool::~TaskWorkerPool() { + _stop_background_threads_latch.count_down(); + stop(); +} void TaskWorkerPool::start() { // Init task pool and task workers + _is_work = true; + std::function cb; switch (_task_worker_type) { case TaskWorkerType::CREATE_TABLE: _worker_count = config::create_tablet_worker_count; - _callback_function = _create_tablet_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_create_tablet_worker_thread_callback, this); break; case TaskWorkerType::DROP_TABLE: _worker_count = config::drop_tablet_worker_count; - _callback_function = _drop_tablet_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_drop_tablet_worker_thread_callback, this); break; case TaskWorkerType::PUSH: case TaskWorkerType::REALTIME_PUSH: _worker_count = config::push_worker_count_normal_priority + config::push_worker_count_high_priority; - _callback_function = _push_worker_thread_callback; + cb = 
std::bind(&TaskWorkerPool::_push_worker_thread_callback, this); break; case TaskWorkerType::PUBLISH_VERSION: _worker_count = config::publish_version_worker_count; - _callback_function = _publish_version_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_publish_version_worker_thread_callback, this); break; case TaskWorkerType::CLEAR_TRANSACTION_TASK: _worker_count = config::clear_transaction_task_worker_count; - _callback_function = _clear_transaction_task_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback, this); break; case TaskWorkerType::DELETE: _worker_count = config::delete_worker_count; - _callback_function = _push_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_push_worker_thread_callback, this); break; case TaskWorkerType::ALTER_TABLE: _worker_count = config::alter_tablet_worker_count; - _callback_function = _alter_tablet_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this); break; case TaskWorkerType::CLONE: _worker_count = config::clone_worker_count; - _callback_function = _clone_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_clone_worker_thread_callback, this); break; case TaskWorkerType::STORAGE_MEDIUM_MIGRATE: _worker_count = config::storage_medium_migrate_count; - _callback_function = _storage_medium_migrate_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback, this); break; case TaskWorkerType::CHECK_CONSISTENCY: _worker_count = config::check_consistency_worker_count; - _callback_function = _check_consistency_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_check_consistency_worker_thread_callback, this); break; case TaskWorkerType::REPORT_TASK: _worker_count = REPORT_TASK_WORKER_COUNT; - _callback_function = _report_task_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_report_task_worker_thread_callback, this); break; case 
TaskWorkerType::REPORT_DISK_STATE: _worker_count = REPORT_DISK_STATE_WORKER_COUNT; - _callback_function = _report_disk_state_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_report_disk_state_worker_thread_callback, this); break; case TaskWorkerType::REPORT_OLAP_TABLE: _worker_count = REPORT_OLAP_TABLE_WORKER_COUNT; - _callback_function = _report_tablet_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_report_tablet_worker_thread_callback, this); break; case TaskWorkerType::UPLOAD: _worker_count = config::upload_worker_count; - _callback_function = _upload_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_upload_worker_thread_callback, this); break; case TaskWorkerType::DOWNLOAD: _worker_count = config::download_worker_count; - _callback_function = _download_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_download_worker_thread_callback, this); break; case TaskWorkerType::MAKE_SNAPSHOT: _worker_count = config::make_snapshot_worker_count; - _callback_function = _make_snapshot_thread_callback; + cb = std::bind(&TaskWorkerPool::_make_snapshot_thread_callback, this); break; case TaskWorkerType::RELEASE_SNAPSHOT: _worker_count = config::release_snapshot_worker_count; - _callback_function = _release_snapshot_thread_callback; + cb = std::bind(&TaskWorkerPool::_release_snapshot_thread_callback, this); break; case TaskWorkerType::MOVE: _worker_count = 1; - _callback_function = _move_dir_thread_callback; + cb = std::bind(&TaskWorkerPool::_move_dir_thread_callback, this); break; case TaskWorkerType::UPDATE_TABLET_META_INFO: _worker_count = 1; - _callback_function = _update_tablet_meta_worker_thread_callback; + cb = std::bind(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback, this); break; default: // pass @@ -178,12 +186,26 @@ void TaskWorkerPool::start() { } #ifndef BE_TEST - for (uint32_t i = 0; i < _worker_count; i++) { - _spawn_callback_worker_thread(_callback_function); - } + // TODO(yingchun): need a better name + 
ThreadPoolBuilder(strings::Substitute("TaskWorkerPool.$0", _task_worker_type)) + .set_min_threads(_worker_count) + .set_max_threads(_worker_count) + .build(&_thread_pool); + + auto st = _thread_pool->submit_func(cb); + CHECK(st.ok()) << st.to_string(); #endif } +void TaskWorkerPool::stop() { + { + lock_guard worker_thread_lock(_worker_thread_lock); + _is_work = false; + _worker_thread_condition_variable.notify_all(); + } + _thread_pool->shutdown(); +} + void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) { const TTaskType::type task_type = task.task_type; int64_t signature = task.signature; @@ -230,33 +252,6 @@ void TaskWorkerPool::_remove_task_info(const TTaskType::type task_type, int64_t << ", queue_size=" << queue_size; } -void TaskWorkerPool::_spawn_callback_worker_thread(CALLBACK_FUNCTION callback_func) { - pthread_t thread; - sigset_t mask; - sigset_t omask; - int err = 0; - - // TODO: why need to catch these signals, should leave a comment - sigemptyset(&mask); - sigaddset(&mask, SIGCHLD); - sigaddset(&mask, SIGHUP); - sigaddset(&mask, SIGPIPE); - pthread_sigmask(SIG_SETMASK, &mask, &omask); - - while (true) { - err = pthread_create(&thread, NULL, callback_func, this); - if (err != 0) { - LOG(WARNING) << "failed to spawn a thread. error: " << err; -#ifndef BE_TEST - sleep(config::sleep_one_second); -#endif - } else { - pthread_detach(thread); - break; - } - } -} - void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request) { // Return result to FE TMasterResult result; @@ -274,9 +269,7 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request) LOG(WARNING) << "finish task failed. 
status_code=" << result.status.status_code; try_time += 1; } -#ifndef BE_TEST sleep(config::sleep_one_second); -#endif } } @@ -306,23 +299,22 @@ uint32_t TaskWorkerPool::_get_next_task_index(int32_t thread_count, return index; } -void* TaskWorkerPool::_create_tablet_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - -#ifndef BE_TEST - while (true) { -#endif +void TaskWorkerPool::_create_tablet_worker_thread_callback() { + while (_is_work) { TAgentTaskRequest agent_task_req; TCreateTabletReq create_tablet_req; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = worker_pool_this->_tasks.front(); + agent_task_req = _tasks.front(); create_tablet_req = agent_task_req.create_tablet_req; - worker_pool_this->_tasks.pop_front(); + _tasks.pop_front(); } TStatusCode::type status_code = TStatusCode::OK; @@ -331,7 +323,7 @@ void* TaskWorkerPool::_create_tablet_worker_thread_callback(void* arg_this) { std::vector finish_tablet_infos; OLAPStatus create_status = - worker_pool_this->_env->storage_engine()->create_tablet(create_tablet_req); + _env->storage_engine()->create_tablet(create_tablet_req); if (create_status != OLAPStatus::OLAP_SUCCESS) { LOG(WARNING) << "create table failed. 
status: " << create_status << ", signature: " << agent_task_req.signature; @@ -359,37 +351,33 @@ void* TaskWorkerPool::_create_tablet_worker_thread_callback(void* arg_this) { TFinishTaskRequest finish_task_request; finish_task_request.__set_finish_tablet_infos(finish_tablet_infos); - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_report_version(_s_report_version); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); finish_task_request.__set_task_status(task_status); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); -#ifndef BE_TEST + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } -#endif - return (void*)0; } -void* TaskWorkerPool::_drop_tablet_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - -#ifndef BE_TEST - while (true) { -#endif +void TaskWorkerPool::_drop_tablet_worker_thread_callback() { + while (_is_work) { TAgentTaskRequest agent_task_req; TDropTabletReq drop_tablet_req; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = worker_pool_this->_tasks.front(); + agent_task_req = _tasks.front(); drop_tablet_req = agent_task_req.drop_tablet_req; - worker_pool_this->_tasks.pop_front(); + _tasks.pop_front(); } TStatusCode::type status_code = TStatusCode::OK; @@ -414,34 +402,30 @@ void* TaskWorkerPool::_drop_tablet_worker_thread_callback(void* arg_this) { 
task_status.__set_error_msgs(error_msgs); TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); finish_task_request.__set_task_status(task_status); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); -#ifndef BE_TEST + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } -#endif - return (void*)0; } -void* TaskWorkerPool::_alter_tablet_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - -#ifndef BE_TEST - while (true) { -#endif +void TaskWorkerPool::_alter_tablet_worker_thread_callback() { + while (_is_work) { TAgentTaskRequest agent_task_req; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = worker_pool_this->_tasks.front(); - worker_pool_this->_tasks.pop_front(); + agent_task_req = _tasks.front(); + _tasks.pop_front(); } int64_t signatrue = agent_task_req.signature; LOG(INFO) << "get alter table task, signature: " << agent_task_req.signature; @@ -459,24 +443,20 @@ void* TaskWorkerPool::_alter_tablet_worker_thread_callback(void* arg_this) { TTaskType::type task_type = agent_task_req.task_type; switch (task_type) { case TTaskType::ALTER: - worker_pool_this->_alter_tablet(worker_pool_this, agent_task_req, signatrue, - task_type, &finish_task_request); + _alter_tablet(agent_task_req, signatrue, + task_type, &finish_task_request); 
break; default: // pass break; } - worker_pool_this->_finish_task(finish_task_request); + _finish_task(finish_task_request); } - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); -#ifndef BE_TEST + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } -#endif - return (void*)0; } -void TaskWorkerPool::_alter_tablet(TaskWorkerPool* worker_pool_this, - const TAgentTaskRequest& agent_task_req, int64_t signature, +void TaskWorkerPool::_alter_tablet(const TAgentTaskRequest& agent_task_req, int64_t signature, const TTaskType::type task_type, TFinishTaskRequest* finish_task_request) { AgentStatus status = DORIS_SUCCESS; @@ -507,7 +487,7 @@ void TaskWorkerPool::_alter_tablet(TaskWorkerPool* worker_pool_this, new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash; EngineAlterTabletTask engine_task(agent_task_req.alter_tablet_req_v2, signature, task_type, &error_msgs, process_name); - OLAPStatus sc_status = worker_pool_this->_env->storage_engine()->execute_task(&engine_task); + OLAPStatus sc_status = _env->storage_engine()->execute_task(&engine_task); if (sc_status != OLAP_SUCCESS) { if (sc_status == OLAP_ERR_DATA_QUALITY_ERR) { error_msgs.push_back("The data quality does not satisfy, please check your data. 
"); @@ -564,78 +544,73 @@ void TaskWorkerPool::_alter_tablet(TaskWorkerPool* worker_pool_this, finish_task_request->__set_task_status(task_status); } -void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - +void TaskWorkerPool::_push_worker_thread_callback() { // gen high priority worker thread TPriority::type priority = TPriority::NORMAL; int32_t push_worker_count_high_priority = config::push_worker_count_high_priority; static uint32_t s_worker_count = 0; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); + lock_guard worker_thread_lock(_worker_thread_lock); if (s_worker_count < push_worker_count_high_priority) { ++s_worker_count; priority = TPriority::HIGH; } } -#ifndef BE_TEST - while (true) { -#endif + while (_is_work) { AgentStatus status = DORIS_SUCCESS; TAgentTaskRequest agent_task_req; TPushReq push_req; int32_t index = 0; do { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - index = worker_pool_this->_get_next_task_index( + index = _get_next_task_index( config::push_worker_count_normal_priority + config::push_worker_count_high_priority, - worker_pool_this->_tasks, priority); + _tasks, priority); if (index < 0) { // there is no high priority task. 
notify other thread to handle normal task - worker_pool_this->_worker_thread_condition_variable.notify_one(); + _worker_thread_condition_variable.notify_one(); break; } - agent_task_req = worker_pool_this->_tasks[index]; + agent_task_req = _tasks[index]; push_req = agent_task_req.push_req; - worker_pool_this->_tasks.erase(worker_pool_this->_tasks.begin() + index); + _tasks.erase(_tasks.begin() + index); } while (0); -#ifndef BE_TEST if (index < 0) { // there is no high priority task in queue sleep(1); continue; } -#endif LOG(INFO) << "get push task. signature: " << agent_task_req.signature - << " priority: " << priority << " push_type: " << push_req.push_type; + << " priority: " << priority << " push_type: " << push_req.push_type; vector tablet_infos; EngineBatchLoadTask engine_task(push_req, &tablet_infos, agent_task_req.signature, &status); - worker_pool_this->_env->storage_engine()->execute_task(&engine_task); + _env->storage_engine()->execute_task(&engine_task); -#ifndef BE_TEST if (status == DORIS_PUSH_HAD_LOADED) { // remove the task and not return to fe - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); continue; } -#endif // Return result to fe vector error_msgs; TStatus task_status; TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); if (push_req.push_type == TPushType::DELETE) { @@ -667,31 +642,27 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { finish_task_request.__set_task_status(task_status); finish_task_request.__set_report_version(_s_report_version); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); 
-#ifndef BE_TEST + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } -#endif - - return (void*)0; } -void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; -#ifndef BE_TEST - while (true) { -#endif +void TaskWorkerPool::_publish_version_worker_thread_callback() { + while (_is_work) { TAgentTaskRequest agent_task_req; TPublishVersionRequest publish_version_req; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = worker_pool_this->_tasks.front(); + agent_task_req = _tasks.front(); publish_version_req = agent_task_req.publish_version_req; - worker_pool_this->_tasks.pop_front(); + _tasks.pop_front(); } DorisMetrics::instance()->publish_task_request_total.increment(1); @@ -704,7 +675,7 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) { while (retry_time < PUBLISH_VERSION_MAX_RETRY) { error_tablet_ids.clear(); EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids); - res = worker_pool_this->_env->storage_engine()->execute_task(&engine_task); + res = _env->storage_engine()->execute_task(&engine_task); if (res == OLAP_SUCCESS) { break; } else { @@ -730,35 +701,32 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) { } st.to_thrift(&finish_task_request.task_status); - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); 
finish_task_request.__set_report_version(_s_report_version); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); -#ifndef BE_TEST + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } -#endif - return (void*)0; } -void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; -#ifndef BE_TEST - while (true) { -#endif +void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() { + while (_is_work) { TAgentTaskRequest agent_task_req; TClearTransactionTaskRequest clear_transaction_task_req; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = worker_pool_this->_tasks.front(); + agent_task_req = _tasks.front(); clear_transaction_task_req = agent_task_req.clear_transaction_task_req; - worker_pool_this->_tasks.pop_front(); + _tasks.pop_front(); } LOG(INFO) << "get clear transaction task task, signature:" << agent_task_req.signature << ", transaction_id: " << clear_transaction_task_req.transaction_id @@ -773,11 +741,11 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t // If it is not greater than zero, no need to execute // the following clear_transaction_task() function. 
if (!clear_transaction_task_req.partition_id.empty()) { - worker_pool_this->_env->storage_engine()->clear_transaction_task( + _env->storage_engine()->clear_transaction_task( clear_transaction_task_req.transaction_id, clear_transaction_task_req.partition_id); } else { - worker_pool_this->_env->storage_engine()->clear_transaction_task( + _env->storage_engine()->clear_transaction_task( clear_transaction_task_req.transaction_id); } LOG(INFO) << "finish to clear transaction task. signature:" << agent_task_req.signature @@ -792,32 +760,31 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t TFinishTaskRequest finish_task_request; finish_task_request.__set_task_status(task_status); - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); -#ifndef BE_TEST + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } -#endif - return (void*)0; } -void* TaskWorkerPool::_update_tablet_meta_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - while (true) { +void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { + while (_is_work) { TAgentTaskRequest agent_task_req; TUpdateTabletMetaInfoReq update_tablet_meta_req; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = 
worker_pool_this->_tasks.front(); + agent_task_req = _tasks.front(); update_tablet_meta_req = agent_task_req.update_tablet_meta_info_req; - worker_pool_this->_tasks.pop_front(); + _tasks.pop_front(); } LOG(INFO) << "get update tablet meta task, signature:" << agent_task_req.signature; @@ -859,35 +826,33 @@ void* TaskWorkerPool::_update_tablet_meta_worker_thread_callback(void* arg_this) TFinishTaskRequest finish_task_request; finish_task_request.__set_task_status(task_status); - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } - return (void*)0; } -void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - -#ifndef BE_TEST - while (true) { -#endif +void TaskWorkerPool::_clone_worker_thread_callback() { + while (_is_work) { AgentStatus status = DORIS_SUCCESS; TAgentTaskRequest agent_task_req; TCloneReq clone_req; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = worker_pool_this->_tasks.front(); + agent_task_req = _tasks.front(); clone_req = agent_task_req.clone_req; - worker_pool_this->_tasks.pop_front(); + _tasks.pop_front(); } DorisMetrics::instance()->clone_requests_total.increment(1); @@ -895,13 +860,13 @@ void* 
TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { vector error_msgs; vector tablet_infos; - EngineCloneTask engine_task(clone_req, worker_pool_this->_master_info, + EngineCloneTask engine_task(clone_req, _master_info, agent_task_req.signature, &error_msgs, &tablet_infos, &status); - worker_pool_this->_env->storage_engine()->execute_task(&engine_task); + _env->storage_engine()->execute_task(&engine_task); // Return result to fe TStatus task_status; TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); @@ -920,39 +885,34 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { task_status.__set_error_msgs(error_msgs); finish_task_request.__set_task_status(task_status); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); -#ifndef BE_TEST + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } -#endif - - return (void*)0; } -void* TaskWorkerPool::_storage_medium_migrate_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - -#ifndef BE_TEST - while (true) { -#endif +void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() { + while (_is_work) { TAgentTaskRequest agent_task_req; TStorageMediumMigrateReq storage_medium_migrate_req; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = 
worker_pool_this->_tasks.front(); + agent_task_req = _tasks.front(); storage_medium_migrate_req = agent_task_req.storage_medium_migrate_req; - worker_pool_this->_tasks.pop_front(); + _tasks.pop_front(); } TStatusCode::type status_code = TStatusCode::OK; vector error_msgs; TStatus task_status; EngineStorageMigrationTask engine_task(storage_medium_migrate_req); - OLAPStatus res = worker_pool_this->_env->storage_engine()->execute_task(&engine_task); + OLAPStatus res = _env->storage_engine()->execute_task(&engine_task); if (res != OLAP_SUCCESS) { LOG(WARNING) << "storage media migrate failed. status: " << res << ", signature: " << agent_task_req.signature; @@ -966,36 +926,32 @@ void* TaskWorkerPool::_storage_medium_migrate_worker_thread_callback(void* arg_t task_status.__set_error_msgs(error_msgs); TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); finish_task_request.__set_task_status(task_status); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); -#ifndef BE_TEST + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } -#endif - return (void*)0; } -void* TaskWorkerPool::_check_consistency_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - -#ifndef BE_TEST - while (true) { -#endif +void TaskWorkerPool::_check_consistency_worker_thread_callback() { + while (_is_work) { TAgentTaskRequest agent_task_req; TCheckConsistencyReq check_consistency_req; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard 
worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = worker_pool_this->_tasks.front(); + agent_task_req = _tasks.front(); check_consistency_req = agent_task_req.check_consistency_req; - worker_pool_this->_tasks.pop_front(); + _tasks.pop_front(); } TStatusCode::type status_code = TStatusCode::OK; @@ -1006,7 +962,7 @@ void* TaskWorkerPool::_check_consistency_worker_thread_callback(void* arg_this) EngineChecksumTask engine_task( check_consistency_req.tablet_id, check_consistency_req.schema_hash, check_consistency_req.version, check_consistency_req.version_hash, &checksum); - OLAPStatus res = worker_pool_this->_env->storage_engine()->execute_task(&engine_task); + OLAPStatus res = _env->storage_engine()->execute_task(&engine_task); if (res != OLAP_SUCCESS) { LOG(WARNING) << "check consistency failed. status: " << res << ", signature: " << agent_task_req.signature; @@ -1020,7 +976,7 @@ void* TaskWorkerPool::_check_consistency_worker_thread_callback(void* arg_this) task_status.__set_error_msgs(error_msgs); TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); finish_task_request.__set_task_status(task_status); @@ -1028,23 +984,16 @@ void* TaskWorkerPool::_check_consistency_worker_thread_callback(void* arg_this) finish_task_request.__set_request_version(check_consistency_req.version); finish_task_request.__set_request_version_hash(check_consistency_req.version_hash); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); -#ifndef BE_TEST + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, 
agent_task_req.signature); } -#endif - return (void*)0; } -void* TaskWorkerPool::_report_task_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - +void TaskWorkerPool::_report_task_worker_thread_callback() { TReportRequest request; - request.__set_backend(worker_pool_this->_backend); + request.__set_backend(_backend); -#ifndef BE_TEST - while (true) { -#endif + do { { lock_guard task_signatures_lock(_s_task_signatures_lock); request.__set_tasks(_s_task_signatures); @@ -1052,42 +1001,44 @@ void* TaskWorkerPool::_report_task_worker_thread_callback(void* arg_this) { DorisMetrics::instance()->report_task_requests_total.increment(1); TMasterResult result; - AgentStatus status = worker_pool_this->_master_client->report(request, &result); + AgentStatus status = _master_client->report(request, &result); if (status != DORIS_SUCCESS) { DorisMetrics::instance()->report_task_requests_failed.increment(1); LOG(WARNING) << "finish report task failed. 
status:" << status << ", master host:" - << worker_pool_this->_master_info.network_address.hostname - << "port:" << worker_pool_this->_master_info.network_address.port; + << _master_info.network_address.hostname + << "port:" << _master_info.network_address.port; } - -#ifndef BE_TEST - sleep(config::report_task_interval_seconds); - } -#endif - - return (void*)0; + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(config::report_task_interval_seconds))); } -void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; +void TaskWorkerPool::_report_disk_state_worker_thread_callback() { + StorageEngine::instance()->register_report_listener(this); TReportRequest request; - request.__set_backend(worker_pool_this->_backend); + request.__set_backend(_backend); -#ifndef BE_TEST - while (true) { - if (worker_pool_this->_master_info.network_address.port == 0) { + while (_is_work) { + if (_master_info.network_address.port == 0) { // port == 0 means not received heartbeat yet // sleep a short time and try again LOG(INFO) << "waiting to receive first heartbeat from frontend"; sleep(config::sleep_one_second); continue; } -#endif + + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; + } + TAgentTaskRequest agent_task_req = _tasks.front(); + _tasks.pop_front(); + vector data_dir_infos; - worker_pool_this->_env->storage_engine()->get_all_data_dir_info(&data_dir_infos, - true /* update */); + _env->storage_engine()->get_all_data_dir_info(&data_dir_infos, true /* update */); map disks; for (auto& root_path_info : data_dir_infos) { @@ -1105,60 +1056,55 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this) DorisMetrics::instance()->report_disk_requests_total.increment(1); TMasterResult result; - AgentStatus status = 
worker_pool_this->_master_client->report(request, &result); + AgentStatus status = _master_client->report(request, &result); if (status != DORIS_SUCCESS) { DorisMetrics::instance()->report_disk_requests_failed.increment(1); LOG(WARNING) << "finish report disk state failed. status:" << status << ", master host:" - << worker_pool_this->_master_info.network_address.hostname - << ", port:" << worker_pool_this->_master_info.network_address.port; + << _master_info.network_address.hostname + << ", port:" << _master_info.network_address.port; } - -#ifndef BE_TEST - // wait for notifying until timeout - StorageEngine::instance()->wait_for_report_notify( - config::report_disk_state_interval_seconds, false); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } -#endif - - return (void*)0; + StorageEngine::instance()->deregister_report_listener(this); } -void* TaskWorkerPool::_report_tablet_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; +void TaskWorkerPool::_report_tablet_worker_thread_callback() { + StorageEngine::instance()->register_report_listener(this); TReportRequest request; - request.__set_backend(worker_pool_this->_backend); + request.__set_backend(_backend); request.__isset.tablets = true; - AgentStatus status = DORIS_SUCCESS; + request.__set_report_version(_s_report_version); -#ifndef BE_TEST - while (true) { - if (worker_pool_this->_master_info.network_address.port == 0) { + while (_is_work) { + if (_master_info.network_address.port == 0) { // port == 0 means not received heartbeat yet // sleep a short time and try again LOG(INFO) << "waiting to receive first heartbeat from frontend"; sleep(config::sleep_one_second); continue; } -#endif - request.tablets.clear(); - request.__set_report_version(_s_report_version); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; + } + 
+ TAgentTaskRequest agent_task_req = _tasks.front(); + _tasks.pop_front(); + + request.tablets.clear(); OLAPStatus report_all_tablets_info_status = StorageEngine::instance()->tablet_manager()->report_all_tablets_info( &request.tablets); if (report_all_tablets_info_status != OLAP_SUCCESS) { LOG(WARNING) << "report get all tablets info failed. status: " << report_all_tablets_info_status; -#ifndef BE_TEST - // wait for notifying until timeout - StorageEngine::instance()->wait_for_report_notify( - config::report_tablet_interval_seconds, true); continue; -#else - return (void*)0; -#endif } int64_t max_compaction_score = std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score.value(), @@ -1166,50 +1112,42 @@ void* TaskWorkerPool::_report_tablet_worker_thread_callback(void* arg_this) { request.__set_tablet_max_compaction_score(max_compaction_score); TMasterResult result; - status = worker_pool_this->_master_client->report(request, &result); - + AgentStatus status = _master_client->report(request, &result); if (status != DORIS_SUCCESS) { DorisMetrics::instance()->report_all_tablets_requests_failed.increment(1); LOG(WARNING) << "finish report olap table state failed. 
status:" << status << ", master host:" - << worker_pool_this->_master_info.network_address.hostname - << ", port:" << worker_pool_this->_master_info.network_address.port; + << _master_info.network_address.hostname + << ", port:" << _master_info.network_address.port; } - -#ifndef BE_TEST - // wait for notifying until timeout - StorageEngine::instance()->wait_for_report_notify(config::report_tablet_interval_seconds, - true); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } -#endif - - return (void*)0; + StorageEngine::instance()->deregister_report_listener(this); } -void* TaskWorkerPool::_upload_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - -#ifndef BE_TEST - while (true) { -#endif +void TaskWorkerPool::_upload_worker_thread_callback() { + while (_is_work) { TAgentTaskRequest agent_task_req; TUploadReq upload_request; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = worker_pool_this->_tasks.front(); + agent_task_req = _tasks.front(); upload_request = agent_task_req.upload_req; - worker_pool_this->_tasks.pop_front(); + _tasks.pop_front(); } LOG(INFO) << "get upload task, signature:" << agent_task_req.signature << ", job id:" << upload_request.job_id; std::map> tablet_files; - SnapshotLoader loader(worker_pool_this->_env, upload_request.job_id, + SnapshotLoader loader(_env, upload_request.job_id, agent_task_req.signature); Status status = loader.upload(upload_request.src_dest_map, upload_request.broker_addr, upload_request.broker_prop, &tablet_files); @@ -1228,40 +1166,36 @@ void* TaskWorkerPool::_upload_worker_thread_callback(void* arg_this) { 
task_status.__set_error_msgs(error_msgs); TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); finish_task_request.__set_task_status(task_status); finish_task_request.__set_tablet_files(tablet_files); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); LOG(INFO) << "finished upload task, signature: " << agent_task_req.signature << ", job id:" << upload_request.job_id; -#ifndef BE_TEST } -#endif - return (void*)0; } -void* TaskWorkerPool::_download_worker_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - -#ifndef BE_TEST - while (true) { -#endif +void TaskWorkerPool::_download_worker_thread_callback() { + while (_is_work) { TAgentTaskRequest agent_task_req; TDownloadReq download_request; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = worker_pool_this->_tasks.front(); + agent_task_req = _tasks.front(); download_request = agent_task_req.download_req; - worker_pool_this->_tasks.pop_front(); + _tasks.pop_front(); } LOG(INFO) << "get download task, signature: " << agent_task_req.signature << ", job id:" << download_request.job_id; @@ -1272,7 +1206,7 @@ void* TaskWorkerPool::_download_worker_thread_callback(void* arg_this) { // TODO: download std::vector downloaded_tablet_ids; - 
SnapshotLoader loader(worker_pool_this->_env, download_request.job_id, + SnapshotLoader loader(_env, download_request.job_id, agent_task_req.signature); Status status = loader.download(download_request.src_dest_map, download_request.broker_addr, download_request.broker_prop, &downloaded_tablet_ids); @@ -1288,40 +1222,36 @@ void* TaskWorkerPool::_download_worker_thread_callback(void* arg_this) { task_status.__set_error_msgs(error_msgs); TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); finish_task_request.__set_task_status(task_status); finish_task_request.__set_downloaded_tablet_ids(downloaded_tablet_ids); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); LOG(INFO) << "finished download task, signature: " << agent_task_req.signature << ", job id:" << download_request.job_id; -#ifndef BE_TEST } -#endif - return (void*)0; } -void* TaskWorkerPool::_make_snapshot_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - -#ifndef BE_TEST - while (true) { -#endif +void TaskWorkerPool::_make_snapshot_thread_callback() { + while (_is_work) { TAgentTaskRequest agent_task_req; TSnapshotRequest snapshot_request; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = 
worker_pool_this->_tasks.front(); + agent_task_req = _tasks.front(); snapshot_request = agent_task_req.snapshot_req; - worker_pool_this->_tasks.pop_front(); + _tasks.pop_front(); } LOG(INFO) << "get snapshot task, signature:" << agent_task_req.signature; @@ -1375,38 +1305,34 @@ void* TaskWorkerPool::_make_snapshot_thread_callback(void* arg_this) { task_status.__set_error_msgs(error_msgs); TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); finish_task_request.__set_snapshot_path(snapshot_path); finish_task_request.__set_snapshot_files(snapshot_files); finish_task_request.__set_task_status(task_status); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); -#ifndef BE_TEST + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } -#endif - return (void*)0; } -void* TaskWorkerPool::_release_snapshot_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - -#ifndef BE_TEST - while (true) { -#endif +void TaskWorkerPool::_release_snapshot_thread_callback() { + while (_is_work) { TAgentTaskRequest agent_task_req; TReleaseSnapshotRequest release_snapshot_request; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + lock_guard worker_thread_lock(_worker_thread_lock); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = worker_pool_this->_tasks.front(); + agent_task_req = _tasks.front(); release_snapshot_request = agent_task_req.release_snapshot_req; - 
worker_pool_this->_tasks.pop_front(); + _tasks.pop_front(); } LOG(INFO) << "get release snapshot task, signature:" << agent_task_req.signature; @@ -1432,17 +1358,14 @@ void* TaskWorkerPool::_release_snapshot_thread_callback(void* arg_this) { task_status.__set_error_msgs(error_msgs); TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); finish_task_request.__set_task_status(task_status); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); -#ifndef BE_TEST + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } -#endif - return (void*)0; } AgentStatus TaskWorkerPool::_get_tablet_info(const TTabletId tablet_id, @@ -1462,23 +1385,22 @@ AgentStatus TaskWorkerPool::_get_tablet_info(const TTabletId tablet_id, return status; } -void* TaskWorkerPool::_move_dir_thread_callback(void* arg_this) { - TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - -#ifndef BE_TEST - while (true) { -#endif +void TaskWorkerPool::_move_dir_thread_callback() { + while (_is_work) { TAgentTaskRequest agent_task_req; TMoveDirReq move_dir_req; { - MutexLock worker_thread_lock(&(worker_pool_this->_worker_thread_lock)); - while (worker_pool_this->_tasks.empty()) { - worker_pool_this->_worker_thread_condition_variable.wait(); + MutexLock worker_thread_lock(&(_worker_thread_lock)); + while (_is_work && _tasks.empty()) { + _worker_thread_condition_variable.wait(); + } + if (!_is_work) { + return; } - agent_task_req = worker_pool_this->_tasks.front(); + agent_task_req = _tasks.front(); move_dir_req = agent_task_req.move_dir_req; - worker_pool_this->_tasks.pop_front(); + _tasks.pop_front(); } LOG(INFO) << "get move dir task, 
signature:" << agent_task_req.signature << ", job id:" << move_dir_req.job_id; @@ -1488,7 +1410,7 @@ void* TaskWorkerPool::_move_dir_thread_callback(void* arg_this) { TStatus task_status; // TODO: move dir - AgentStatus status = worker_pool_this->_move_dir( + AgentStatus status = _move_dir( move_dir_req.tablet_id, move_dir_req.schema_hash, move_dir_req.src, move_dir_req.job_id, true /* TODO */, &error_msgs); @@ -1509,18 +1431,14 @@ void* TaskWorkerPool::_move_dir_thread_callback(void* arg_this) { task_status.__set_error_msgs(error_msgs); TFinishTaskRequest finish_task_request; - finish_task_request.__set_backend(worker_pool_this->_backend); + finish_task_request.__set_backend(_backend); finish_task_request.__set_task_type(agent_task_req.task_type); finish_task_request.__set_signature(agent_task_req.signature); finish_task_request.__set_task_status(task_status); - worker_pool_this->_finish_task(finish_task_request); - worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature); - -#ifndef BE_TEST + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } -#endif - return (void*)0; } AgentStatus TaskWorkerPool::_move_dir(const TTabletId tablet_id, const TSchemaHash schema_hash, diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index c7500b5f6adde9..d7536d7a861600 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -28,14 +28,18 @@ #include "agent/utils.h" #include "gen_cpp/AgentService_types.h" #include "gen_cpp/HeartbeatService_types.h" +#include "gutil/ref_counted.h" #include "olap/olap_define.h" #include "olap/storage_engine.h" #include "util/condition_variable.h" #include "util/mutex.h" +#include "util/countdown_latch.h" +#include "util/thread.h" namespace doris { class ExecEnv; +class ThreadPool; class TaskWorkerPool { public: @@ -67,17 +71,18 @@ class TaskWorkerPool { UPDATE_TABLET_META_INFO }; - typedef void* 
(*CALLBACK_FUNCTION)(void*); - TaskWorkerPool( const TaskWorkerType task_worker_type, ExecEnv* env, const TMasterInfo& master_info); virtual ~TaskWorkerPool(); - // Start the task worker callback thread + // Start the task worker thread pool virtual void start(); + // Stop the task worker + virtual void stop(); + // Submit task to task pool // // Input parameters: @@ -87,32 +92,30 @@ class TaskWorkerPool { private: bool _register_task_info(const TTaskType::type task_type, int64_t signature); void _remove_task_info(const TTaskType::type task_type, int64_t signature); - void _spawn_callback_worker_thread(CALLBACK_FUNCTION callback_func); void _finish_task(const TFinishTaskRequest& finish_task_request); uint32_t _get_next_task_index(int32_t thread_count, std::deque& tasks, TPriority::type priority); - static void* _create_tablet_worker_thread_callback(void* arg_this); - static void* _drop_tablet_worker_thread_callback(void* arg_this); - static void* _push_worker_thread_callback(void* arg_this); - static void* _publish_version_worker_thread_callback(void* arg_this); - static void* _clear_transaction_task_worker_thread_callback(void* arg_this); - static void* _alter_tablet_worker_thread_callback(void* arg_this); - static void* _clone_worker_thread_callback(void* arg_this); - static void* _storage_medium_migrate_worker_thread_callback(void* arg_this); - static void* _check_consistency_worker_thread_callback(void* arg_this); - static void* _report_task_worker_thread_callback(void* arg_this); - static void* _report_disk_state_worker_thread_callback(void* arg_this); - static void* _report_tablet_worker_thread_callback(void* arg_this); - static void* _upload_worker_thread_callback(void* arg_this); - static void* _download_worker_thread_callback(void* arg_this); - static void* _make_snapshot_thread_callback(void* arg_this); - static void* _release_snapshot_thread_callback(void* arg_this); - static void* _move_dir_thread_callback(void* arg_this); - static void* 
_update_tablet_meta_worker_thread_callback(void* arg_this); + void _create_tablet_worker_thread_callback(); + void _drop_tablet_worker_thread_callback(); + void _push_worker_thread_callback(); + void _publish_version_worker_thread_callback(); + void _clear_transaction_task_worker_thread_callback(); + void _alter_tablet_worker_thread_callback(); + void _clone_worker_thread_callback(); + void _storage_medium_migrate_worker_thread_callback(); + void _check_consistency_worker_thread_callback(); + void _report_task_worker_thread_callback(); + void _report_disk_state_worker_thread_callback(); + void _report_tablet_worker_thread_callback(); + void _upload_worker_thread_callback(); + void _download_worker_thread_callback(); + void _make_snapshot_thread_callback(); + void _release_snapshot_thread_callback(); + void _move_dir_thread_callback(); + void _update_tablet_meta_worker_thread_callback(); void _alter_tablet( - TaskWorkerPool* worker_pool_this, const TAgentTaskRequest& alter_tablet_request, int64_t signature, const TTaskType::type task_type, @@ -142,11 +145,13 @@ class TaskWorkerPool { // Protect task queue Mutex _worker_thread_lock; ConditionVariable _worker_thread_condition_variable; + CountDownLatch _stop_background_threads_latch; + bool _is_work; + std::unique_ptr _thread_pool; std::deque _tasks; uint32_t _worker_count; TaskWorkerType _task_worker_type; - CALLBACK_FUNCTION _callback_function; static FrontendServiceClientCache _master_service_client_cache; static std::atomic_ulong _s_report_version; diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index f3516f308ccb20..aa3afd575cfee0 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -64,34 +64,26 @@ namespace doris { bool k_doris_exit = false; -void* tcmalloc_gc_thread(void* dummy) { - while (1) { - sleep(10); +void Daemon::tcmalloc_gc_thread() { + while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(10))) { size_t used_size = 0; size_t free_size = 0; -#if 
!defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) MallocExtension::instance()->GetNumericProperty("generic.current_allocated_bytes", &used_size); MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_free_bytes", &free_size); -#endif size_t alloc_size = used_size + free_size; if (alloc_size > config::tc_use_memory_min) { -#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) size_t max_free_size = alloc_size * config::tc_free_memory_rate / 100; if (free_size > max_free_size) { MallocExtension::instance()->ReleaseToSystem(free_size - max_free_size); } -#endif } } - - return NULL; } -void* memory_maintenance_thread(void* dummy) { - while (true) { - sleep(config::memory_maintenance_sleep_time_s); +void Daemon::memory_maintenance_thread() { + while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(config::memory_maintenance_sleep_time_s))) { ExecEnv* env = ExecEnv::GetInstance(); // ExecEnv may not have been created yet or this may be the catalogd or statestored, // which don't have ExecEnvs. @@ -111,8 +103,6 @@ void* memory_maintenance_thread(void* dummy) { } } } - - return NULL; } /* @@ -123,7 +113,7 @@ void* memory_maintenance_thread(void* dummy) { * 4. max network send bytes rate * 5. 
max network receive bytes rate */ -void* calculate_metrics(void* dummy) { +void Daemon::calculate_metrics_thread() { int64_t last_ts = -1L; int64_t lst_push_bytes = -1; int64_t lst_query_bytes = -1; @@ -132,7 +122,7 @@ void* calculate_metrics(void* dummy) { std::map lst_net_send_bytes; std::map lst_net_receive_bytes; - while (true) { + do { DorisMetrics::instance()->metric_registry()->trigger_all_hooks(true); if (last_ts == -1L) { @@ -176,11 +166,7 @@ void* calculate_metrics(void* dummy) { // update lst map DorisMetrics::instance()->system_metrics()->get_network_traffic(&lst_net_send_bytes, &lst_net_receive_bytes); } - - sleep(15); // 15 seconds - } - - return NULL; + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(15))); } static void init_doris_metrics(const std::vector& store_paths) { @@ -205,11 +191,6 @@ static void init_doris_metrics(const std::vector& store_paths) { } DorisMetrics::instance()->initialize( init_system_metrics, disk_devices, network_interfaces); - - if (config::enable_metric_calculator) { - pthread_t calculator_pid; - pthread_create(&calculator_pid, NULL, calculate_metrics, NULL); - } } void sigterm_handler(int signo) { @@ -242,7 +223,7 @@ void init_signals() { } } -void init_daemon(int argc, char** argv, const std::vector& paths) { +void Daemon::init(int argc, char** argv, const std::vector& paths) { // google::SetVersionString(get_build_version(false)); // google::ParseCommandLineFlags(&argc, &argv, true); google::ParseCommandLineFlags(&argc, &argv, true); @@ -278,19 +259,49 @@ void init_daemon(int argc, char** argv, const std::vector& paths) { HllFunctions::init(); HashFunctions::init(); - pthread_t tc_malloc_pid; - pthread_create(&tc_malloc_pid, NULL, tcmalloc_gc_thread, NULL); - - pthread_t buffer_pool_pid; - pthread_create(&buffer_pool_pid, NULL, memory_maintenance_thread, NULL); - LOG(INFO) << CpuInfo::debug_string(); LOG(INFO) << DiskInfo::debug_string(); LOG(INFO) << MemInfo::debug_string(); + 
init_doris_metrics(paths); init_signals(); ChunkAllocator::init_instance(config::chunk_reserved_bytes_limit); } +void Daemon::start() { + Status st; +#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) + st = Thread::create("Daemon", "tcmalloc_gc_thread", + [this]() { this->tcmalloc_gc_thread(); }, + &_tcmalloc_gc_thread); + CHECK(st.ok()) << st.to_string(); +#endif + st = Thread::create("Daemon", "memory_maintenance_thread", + [this]() { this->memory_maintenance_thread(); }, + &_memory_maintenance_thread); + CHECK(st.ok()) << st.to_string(); + + if (config::enable_metric_calculator) { + st = Thread::create("Daemon", "calculate_metrics_thread", + [this]() { this->calculate_metrics_thread(); }, + &_calculate_metrics_thread); + CHECK(st.ok()) << st.to_string(); + } } + +void Daemon::stop() { + _stop_background_threads_latch.count_down(); + + if (_tcmalloc_gc_thread) { + _tcmalloc_gc_thread->join(); + } + if (_memory_maintenance_thread) { + _memory_maintenance_thread->join(); + } + if (_calculate_metrics_thread) { + _calculate_metrics_thread->join(); + } +} + +} // namespace doris diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index 4215541db57dbd..c11d99f441d584 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -15,20 +15,43 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_COMMON_COMMON_DAEMON_H -#define DORIS_BE_SRC_COMMON_COMMON_DAEMON_H +#pragma once +#include #include +#include "gutil/ref_counted.h" +#include "util/countdown_latch.h" +#include "util/thread.h" + namespace doris { class StorePath; +class Thread; + +class Daemon { +public: + Daemon() : _stop_background_threads_latch(1) {} + + // Initialises logging, flags etc. Callers that want to override default gflags + // variables should do so before calling this method; no logging should be + // performed until after this method returns. 
+ void init(int argc, char** argv, const std::vector& paths); + + // Start background threads + void start(); -// Initialises logging, flags etc. Callers that want to override default gflags -// variables should do so before calling this method; no logging should be -// performed until after this method returns. -void init_daemon(int argc, char** argv, const std::vector& paths); + // Stop background threads + void stop(); -} +private: + void tcmalloc_gc_thread(); + void memory_maintenance_thread(); + void calculate_metrics_thread(); -#endif + CountDownLatch _stop_background_threads_latch; + scoped_refptr _tcmalloc_gc_thread; + scoped_refptr _memory_maintenance_thread; + scoped_refptr _calculate_metrics_thread; +}; +} // namespace doris diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 2845127525fa49..2aa280720add9a 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -411,7 +411,8 @@ bool IndexChannel::has_intolerable_failure() { OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector& texprs, Status* status) - : _pool(pool), _input_row_desc(row_desc), _filter_bitmap(1024) { + : _pool(pool), _input_row_desc(row_desc), _filter_bitmap(1024), + _stop_background_threads_latch(1) { if (!texprs.empty()) { *status = Expr::create_expr_trees(_pool, texprs, &_output_expr_ctxs); } @@ -592,7 +593,9 @@ Status OlapTableSink::open(RuntimeState* state) { } } - _sender_thread = std::thread(&OlapTableSink::_send_batch_process, this); + RETURN_IF_ERROR(Thread::create("OlapTableSink", "send_batch_process", + [this]() { this->_send_batch_process(); }, + &_sender_thread)); return Status::OK(); } @@ -722,8 +725,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { // Sender join() must put after node channels mark_close/cancel. // But there is no specific sequence required between sender join() & close_wait(). 
- if (_sender_thread.joinable()) { - _sender_thread.join(); + _stop_background_threads_latch.count_down(); + if (_sender_thread) { + _sender_thread->join(); } Expr::close(_output_expr_ctxs, state); @@ -899,7 +903,7 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* void OlapTableSink::_send_batch_process() { SCOPED_RAW_TIMER(&_non_blocking_send_ns); - while (true) { + do { int running_channels_num = 0; for (auto index_channel : _channels) { index_channel->for_each_node_channel([&running_channels_num](NodeChannel* ch) { @@ -912,8 +916,7 @@ void OlapTableSink::_send_batch_process() { "consumer thread exit."; return; } - SleepFor(MonoDelta::FromMilliseconds(config::olap_table_sink_send_interval_ms)); - } + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromMilliseconds(config::olap_table_sink_send_interval_ms))); } } // namespace stream_load diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 5db4ca3640aceb..9143f9799cedd3 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -34,6 +34,8 @@ #include "util/bitmap.h" #include "util/ref_count_closure.h" #include "util/thrift_util.h" +#include "util/countdown_latch.h" +#include "util/thread.h" namespace doris { @@ -360,7 +362,8 @@ class OlapTableSink : public DataSink { // index_channel std::vector _channels; - std::thread _sender_thread; + CountDownLatch _stop_background_threads_latch; + scoped_refptr _sender_thread; std::vector _max_decimal_val; std::vector _min_decimal_val; diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp index ee8684da245920..3e59e567ef469e 100644 --- a/be/src/http/ev_http_server.cpp +++ b/be/src/http/ev_http_server.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include "common/logging.h" #include "service/brpc.h" @@ -34,6 +35,7 @@ #include "http/http_headers.h" #include "http/http_channel.h" #include "util/debug_util.h" +#include "util/threadpool.h" namespace doris { @@ 
-84,46 +86,50 @@ EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers) } EvHttpServer::~EvHttpServer() { + stop(); pthread_rwlock_destroy(&_rw_lock); } -Status EvHttpServer::start() { +void EvHttpServer::start() { // bind to - RETURN_IF_ERROR(_bind()); + CHECK(_bind().ok()); + ThreadPoolBuilder("EvHttpServer") + .set_min_threads(_num_workers) + .set_max_threads(_num_workers) + .build(&_workers); + + evthread_use_pthreads(); + event_bases.resize(_num_workers); for (int i = 0; i < _num_workers; ++i) { - auto worker = [this, i] () { - LOG(INFO) << "EvHttpServer worker start, id=" << i; - std::shared_ptr base( - event_base_new(), [] (event_base* base) { event_base_free(base); }); - if (base == nullptr) { - LOG(WARNING) << "Couldn't create an event_base."; - return; - } + CHECK(_workers->submit_func([this, i]() { + std::shared_ptr base(event_base_new(), [](event_base *base) { + event_base_free(base); + }); + CHECK(base != nullptr) << "Couldn't create an event_base."; + event_bases[i] = base; + /* Create a new evhttp object to handle requests. 
*/ - std::shared_ptr http( - evhttp_new(base.get()), [] (evhttp* http) { evhttp_free(http); }); - if (http == nullptr) { - LOG(WARNING) << "Couldn't create an evhttp."; - return; - } + std::shared_ptr http(evhttp_new(base.get()), [](evhttp *http) { + evhttp_free(http); + }); + CHECK(http != nullptr) << "Couldn't create an evhttp."; + auto res = evhttp_accept_socket(http.get(), _server_fd); - if (res < 0) { - LOG(WARNING) << "evhttp accept socket failed"; - return; - } + CHECK(res >= 0) << "evhttp accept socket failed, res=" << res; evhttp_set_newreqcb(http.get(), on_connection, this); evhttp_set_gencb(http.get(), on_request, this); event_base_dispatch(base.get()); - }; - _workers.emplace_back(worker); - _workers[i].detach(); + }).ok()); } - return Status::OK(); } void EvHttpServer::stop() { + for (int i = 0; i < _num_workers; ++i) { + LOG(WARNING) << "event_base_loopexit ret: " << event_base_loopexit(event_bases[i].get(), nullptr); + } + _workers->shutdown(); close(_server_fd); } diff --git a/be/src/http/ev_http_server.h b/be/src/http/ev_http_server.h index 6ef632ed8be5c5..055456bc627c1f 100644 --- a/be/src/http/ev_http_server.h +++ b/be/src/http/ev_http_server.h @@ -29,6 +29,7 @@ namespace doris { class HttpHandler; class HttpRequest; +class ThreadPool; class EvHttpServer { public: @@ -42,7 +43,7 @@ class EvHttpServer { void register_static_file_handler(HttpHandler* handler); - Status start(); + void start(); void stop(); void join(); @@ -67,7 +68,8 @@ class EvHttpServer { int _real_port; int _server_fd = -1; - std::vector _workers; + std::unique_ptr _workers; + std::vector> event_bases; pthread_rwlock_t _rw_lock; diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index b3d0cbeb47d7c6..8af09d1bfa3e4f 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -83,14 +83,16 @@ class FlushToken { // Usage Example: // ... 
// std::shared_ptr flush_handler; -// memTableFlushExecutor.create_flush_token(path_hash, &flush_handler); +// memTableFlushExecutor.create_flush_token(&flush_handler); // ... // flush_token->submit(memtable) // ... class MemTableFlushExecutor { public: MemTableFlushExecutor() {} - ~MemTableFlushExecutor() {} + ~MemTableFlushExecutor() { + _flush_pool->shutdown(); + } // init should be called after storage engine is opened, // because it needs path hash of each data dir. diff --git a/be/src/olap/olap_meta.cpp b/be/src/olap/olap_meta.cpp index a912f4fc07fa56..0be7626b9dec3f 100755 --- a/be/src/olap/olap_meta.cpp +++ b/be/src/olap/olap_meta.cpp @@ -52,10 +52,10 @@ OlapMeta::OlapMeta(const std::string& root_path) OlapMeta::~OlapMeta() { for (auto handle : _handles) { - delete handle; + _db->DestroyColumnFamilyHandle(handle); + handle = nullptr; } if (_db != nullptr) { - _db->Close(); delete _db; _db= nullptr; } diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 8948e7ab986257..50935f06a672f2 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -39,42 +39,28 @@ using std::string; namespace doris { -// TODO(yingchun): should be more graceful in the future refactor. 
-#define SLEEP_IN_BG_WORKER(seconds) \ - int64_t left_seconds = (seconds); \ - while (!_stop_bg_worker && left_seconds > 0) { \ - sleep(1); \ - --left_seconds; \ - } \ - if (_stop_bg_worker) { \ - break; \ - } - // number of running SCHEMA-CHANGE threads volatile uint32_t g_schema_change_active_threads = 0; Status StorageEngine::start_bg_threads() { - _unused_rowset_monitor_thread = std::thread( - [this] { - _unused_rowset_monitor_thread_callback(nullptr); - }); - _unused_rowset_monitor_thread.detach(); + RETURN_IF_ERROR( + Thread::create("StorageEngine", "unused_rowset_monitor_thread", + [this]() { this->_unused_rowset_monitor_thread_callback(); }, + &_unused_rowset_monitor_thread)); LOG(INFO) << "unused rowset monitor thread started"; // start thread for monitoring the snapshot and trash folder - _garbage_sweeper_thread = std::thread( - [this] { - _garbage_sweeper_thread_callback(nullptr); - }); - _garbage_sweeper_thread.detach(); + RETURN_IF_ERROR( + Thread::create("StorageEngine", "garbage_sweeper_thread", + [this]() { this->_garbage_sweeper_thread_callback(); }, + &_garbage_sweeper_thread)); LOG(INFO) << "garbage sweeper thread started"; // start thread for monitoring the tablet with io error - _disk_stat_monitor_thread = std::thread( - [this] { - _disk_stat_monitor_thread_callback(nullptr); - }); - _disk_stat_monitor_thread.detach(); + RETURN_IF_ERROR( + Thread::create("StorageEngine", "disk_stat_monitor_thread", + [this]() { this->_disk_stat_monitor_thread_callback(); }, + &_disk_stat_monitor_thread)); LOG(INFO) << "disk stat monitor thread started"; @@ -87,6 +73,7 @@ Status StorageEngine::start_bg_threads() { // check cumulative compaction config _check_cumulative_compaction_config(); + // base and cumulative compaction threads int32_t base_compaction_num_threads_per_disk = std::max(1, config::base_compaction_num_threads_per_disk); int32_t cumulative_compaction_num_threads_per_disk = std::max(1, config::cumulative_compaction_num_threads_per_disk); @@ 
-103,66 +90,60 @@ Status StorageEngine::start_bg_threads() { _base_compaction_threads.reserve(base_compaction_num_threads); for (uint32_t i = 0; i < base_compaction_num_threads; ++i) { - _base_compaction_threads.emplace_back( - [this, data_dir_num, data_dirs, i] { - _base_compaction_thread_callback(nullptr, data_dirs[i % data_dir_num]); - }); - } - for (auto& thread : _base_compaction_threads) { - thread.detach(); + scoped_refptr base_compaction_thread; + RETURN_IF_ERROR( + Thread::create("StorageEngine", "base_compaction_thread", + [this, i, data_dir_num, data_dirs]() { this->_base_compaction_thread_callback(data_dirs[i % data_dir_num]); }, + &base_compaction_thread)); + _base_compaction_threads.emplace_back(base_compaction_thread); } LOG(INFO) << "base compaction threads started. number: " << base_compaction_num_threads; _cumulative_compaction_threads.reserve(cumulative_compaction_num_threads); for (uint32_t i = 0; i < cumulative_compaction_num_threads; ++i) { - _cumulative_compaction_threads.emplace_back( - [this, data_dir_num, data_dirs, i] { - _cumulative_compaction_thread_callback(nullptr, data_dirs[i % data_dir_num]); - }); - } - for (auto& thread : _cumulative_compaction_threads) { - thread.detach(); + scoped_refptr cumulative_compaction_thread; + RETURN_IF_ERROR( + Thread::create("StorageEngine", "cumulative_compaction_thread", + [this, i, data_dir_num, data_dirs]() { this->_cumulative_compaction_thread_callback(data_dirs[i % data_dir_num]); }, + &cumulative_compaction_thread)); + _cumulative_compaction_threads.emplace_back(cumulative_compaction_thread); } LOG(INFO) << "cumulative compaction threads started. 
number: " << cumulative_compaction_num_threads; // tablet checkpoint thread for (auto data_dir : data_dirs) { - _tablet_checkpoint_threads.emplace_back( - [this, data_dir] { - _tablet_checkpoint_callback((void*)data_dir); - }); - } - for (auto& thread : _tablet_checkpoint_threads) { - thread.detach(); + scoped_refptr tablet_checkpoint_thread; + RETURN_IF_ERROR( + Thread::create("StorageEngine", "tablet_checkpoint_thread", + [this, data_dir]() { this->_tablet_checkpoint_callback(data_dir); }, + &tablet_checkpoint_thread)); + _tablet_checkpoint_threads.emplace_back(tablet_checkpoint_thread); } LOG(INFO) << "tablet checkpint thread started"; // fd cache clean thread - _fd_cache_clean_thread = std::thread( - [this] { - _fd_cache_clean_callback(nullptr); - }); - _fd_cache_clean_thread.detach(); + RETURN_IF_ERROR( + Thread::create("StorageEngine", "fd_cache_clean_thread", + [this]() { this->_fd_cache_clean_callback(); }, + &_fd_cache_clean_thread)); LOG(INFO) << "fd cache clean thread started"; // path scan and gc thread if (config::path_gc_check) { for (auto data_dir : get_stores()) { - _path_scan_threads.emplace_back( - [this, data_dir] { - _path_scan_thread_callback((void*)data_dir); - }); - - _path_gc_threads.emplace_back( - [this, data_dir] { - _path_gc_thread_callback((void*)data_dir); - }); - } - for (auto& thread : _path_scan_threads) { - thread.detach(); - } - for (auto& thread : _path_gc_threads) { - thread.detach(); + scoped_refptr path_scan_thread; + RETURN_IF_ERROR( + Thread::create("StorageEngine", "path_scan_thread", + [this, data_dir]() { this->_path_scan_thread_callback(data_dir); }, + &path_scan_thread)); + _path_scan_threads.emplace_back(path_scan_thread); + + scoped_refptr path_gc_thread; + RETURN_IF_ERROR( + Thread::create("StorageEngine", "path_gc_thread", + [this, data_dir]() { this->_path_gc_thread_callback(data_dir); }, + &path_gc_thread)); + _path_gc_threads.emplace_back(path_gc_thread); } LOG(INFO) << "path scan/gc threads started. 
number:" << get_stores().size(); } @@ -171,33 +152,30 @@ Status StorageEngine::start_bg_threads() { return Status::OK(); } -void* StorageEngine::_fd_cache_clean_callback(void* arg) { +void StorageEngine::_fd_cache_clean_callback() { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - while (!_stop_bg_worker) { - int32_t interval = config::file_descriptor_cache_clean_interval; + int32_t interval = 600; + while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))) { + interval = config::file_descriptor_cache_clean_interval; if (interval <= 0) { OLAP_LOG_WARNING("config of file descriptor clean interval is illegal: [%d], " "force set to 3600", interval); interval = 3600; } - SLEEP_IN_BG_WORKER(interval); _start_clean_fd_cache(); } - - return nullptr; } -void* StorageEngine::_base_compaction_thread_callback(void* arg, DataDir* data_dir) { +void StorageEngine::_base_compaction_thread_callback(DataDir* data_dir) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - //string last_base_compaction_fs; - //TTabletId last_base_compaction_tablet_id = -1; - while (!_stop_bg_worker) { + int32_t interval = config::base_compaction_check_interval_seconds; + do { if (!config::disable_auto_compaction) { // must be here, because this thread is start on start and // cgroup is not initialized at this time @@ -208,20 +186,17 @@ void* StorageEngine::_base_compaction_thread_callback(void* arg, DataDir* data_d } } - int32_t interval = config::base_compaction_check_interval_seconds; + interval = config::base_compaction_check_interval_seconds; if (interval <= 0) { OLAP_LOG_WARNING("base compaction check interval config is illegal: [%d], " "force set to 1", interval); interval = 1; } - SLEEP_IN_BG_WORKER(interval); - } - - return nullptr; + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); } -void* StorageEngine::_garbage_sweeper_thread_callback(void* arg) { +void StorageEngine::_garbage_sweeper_thread_callback() { #ifdef 
GOOGLE_PROFILER ProfilerRegisterThread(); #endif @@ -241,7 +216,8 @@ void* StorageEngine::_garbage_sweeper_thread_callback(void* arg) { const double pi = 4 * std::atan(1); double usage = 1.0; // 程序启动后经过min_interval后触发第一轮扫描 - while (!_stop_bg_worker) { + uint32_t curr_interval = min_interval; + while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(curr_interval))) { usage *= 100.0; // 该函数特性:当磁盘使用率<60%的时候,ratio接近于1; // 当使用率介于[60%, 75%]之间时,ratio急速从0.87降到0.27; @@ -253,7 +229,6 @@ void* StorageEngine::_garbage_sweeper_thread_callback(void* arg) { // 此时的特性,当usage<60%时,curr_interval的时间接近max_interval, // 当usage > 80%时,curr_interval接近min_interval curr_interval = curr_interval > min_interval ? curr_interval : min_interval; - SLEEP_IN_BG_WORKER(curr_interval); // 开始清理,并得到清理后的磁盘使用率 OLAPStatus res = _start_trash_sweep(&usage); @@ -263,27 +238,24 @@ void* StorageEngine::_garbage_sweeper_thread_callback(void* arg) { // do nothing. continue next loop. } } - - return nullptr; } -void* StorageEngine::_disk_stat_monitor_thread_callback(void* arg) { +void StorageEngine::_disk_stat_monitor_thread_callback() { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - while (!_stop_bg_worker) { + + int32_t interval = config::disk_stat_monitor_interval; + do { _start_disk_stat_monitor(); - int32_t interval = config::disk_stat_monitor_interval; + interval = config::disk_stat_monitor_interval; if (interval <= 0) { LOG(WARNING) << "disk_stat_monitor_interval config is illegal: " << interval << ", force set to 1"; interval = 1; } - SLEEP_IN_BG_WORKER(interval); - } - - return nullptr; + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); } void StorageEngine::_check_cumulative_compaction_config() { @@ -314,13 +286,14 @@ void StorageEngine::_check_cumulative_compaction_config() { } } -void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir* data_dir) { +void StorageEngine::_cumulative_compaction_thread_callback(DataDir* 
data_dir) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif LOG(INFO) << "try to start cumulative compaction process!"; - while (!_stop_bg_worker) { + int32_t interval = config::cumulative_compaction_check_interval_seconds; + do { if (!config::disable_auto_compaction) { // must be here, because this thread is start on start and // cgroup is not initialized at this time @@ -330,76 +303,65 @@ void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir* _perform_cumulative_compaction(data_dir); } } - int32_t interval = config::cumulative_compaction_check_interval_seconds; + + interval = config::cumulative_compaction_check_interval_seconds; if (interval <= 0) { LOG(WARNING) << "cumulative compaction check interval config is illegal:" << interval << "will be forced set to one"; interval = 1; } - SLEEP_IN_BG_WORKER(interval); - } - - return nullptr; + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); } -void* StorageEngine::_unused_rowset_monitor_thread_callback(void* arg) { +void StorageEngine::_unused_rowset_monitor_thread_callback() { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - while (!_stop_bg_worker) { + int32_t interval = config::unused_rowset_monitor_interval; + do { start_delete_unused_rowset(); - int32_t interval = config::unused_rowset_monitor_interval; + interval = config::unused_rowset_monitor_interval; if (interval <= 0) { LOG(WARNING) << "unused_rowset_monitor_interval config is illegal: " << interval << ", force set to 1"; interval = 1; } - SLEEP_IN_BG_WORKER(interval); - } - - return nullptr; + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); } - - -void* StorageEngine::_path_gc_thread_callback(void* arg) { +void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif LOG(INFO) << "try to start path gc thread!"; - - while (!_stop_bg_worker) { + int32_t interval = 
config::path_gc_check_interval_second; + do { LOG(INFO) << "try to perform path gc by tablet!"; - ((DataDir*)arg)->perform_path_gc_by_tablet(); + data_dir->perform_path_gc_by_tablet(); LOG(INFO) << "try to perform path gc by rowsetid!"; - // perform path gc by rowset id - ((DataDir*)arg)->perform_path_gc_by_rowsetid(); + data_dir->perform_path_gc_by_rowsetid(); - int32_t interval = config::path_gc_check_interval_second; + interval = config::path_gc_check_interval_second; if (interval <= 0) { LOG(WARNING) << "path gc thread check interval config is illegal:" << interval << "will be forced set to half hour"; interval = 1800; // 0.5 hour } - SLEEP_IN_BG_WORKER(interval); - } - - return nullptr; + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); } -void* StorageEngine::_path_scan_thread_callback(void* arg) { +void StorageEngine::_path_scan_thread_callback(DataDir* data_dir) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - LOG(INFO) << "try to start path scan thread!"; - - while (!_stop_bg_worker) { + int32_t interval = config::path_scan_interval_second; + do { LOG(INFO) << "try to perform path scan!"; - ((DataDir*)arg)->perform_path_scan(); + data_dir->perform_path_scan(); int32_t interval = config::path_scan_interval_second; if (interval <= 0) { @@ -407,31 +369,26 @@ void* StorageEngine::_path_scan_thread_callback(void* arg) { << "will be forced set to one day"; interval = 24 * 3600; // one day } - SLEEP_IN_BG_WORKER(interval); - } - - return nullptr; + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); } -void* StorageEngine::_tablet_checkpoint_callback(void* arg) { +void StorageEngine::_tablet_checkpoint_callback(DataDir* data_dir) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - LOG(INFO) << "try to start tablet meta checkpoint thread!"; - while (!_stop_bg_worker) { - LOG(INFO) << "begin to do tablet meta checkpoint:" << ((DataDir*)arg)->path(); + + int64_t interval = 
config::tablet_meta_checkpoint_min_interval_secs; + do { + LOG(INFO) << "begin to do tablet meta checkpoint:" << data_dir->path(); int64_t start_time = UnixMillis(); - _tablet_manager->do_tablet_meta_checkpoint((DataDir*)arg); + _tablet_manager->do_tablet_meta_checkpoint(data_dir); int64_t used_time = (UnixMillis() - start_time) / 1000; if (used_time < config::tablet_meta_checkpoint_min_interval_secs) { - int64_t interval = config::tablet_meta_checkpoint_min_interval_secs - used_time; - SLEEP_IN_BG_WORKER(interval); + interval = config::tablet_meta_checkpoint_min_interval_secs - used_time; } else { - sleep(1); + interval = 1; } - } - - return nullptr; + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))); } } // namespace doris diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 1a22e2cadf9209..fa72c9f85ba5e9 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -34,6 +34,7 @@ #include #include +#include "agent/task_worker_pool.h" #include "env/env.h" #include "olap/base_compaction.h" #include "olap/cumulative_compaction.h" @@ -113,6 +114,7 @@ StorageEngine::StorageEngine(const EngineOptions& options) _index_stream_lru_cache(NULL), _file_cache(nullptr), _compaction_mem_tracker(MemTracker::CreateTracker(-1, "compaction mem tracker(unlimited)")), + _stop_background_threads_latch(1), _tablet_manager(new TabletManager(config::tablet_map_shard_size)), _txn_manager(new TxnManager(config::txn_map_shard_size, config::txn_shard_size)), _rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)), @@ -349,7 +351,7 @@ void StorageEngine::_start_disk_stat_monitor() { // If some tablets were dropped, we should notify disk_state_worker_thread and // tablet_worker_thread (see TaskWorkerPool) to make them report to FE ASAP. 
if (some_tablets_were_dropped) { - trigger_report(); + notify_listeners(); } } @@ -489,19 +491,53 @@ bool StorageEngine::_delete_tablets_on_unused_root_path() { return !tablet_info_vec.empty(); } +void StorageEngine::stop() { + // trigger the waiting threads + notify_listeners(); + + std::lock_guard l(_store_lock); + for (auto& store_pair : _store_map) { + // TODO(yingchun): is it needed? + store_pair.second->stop_bg_worker(); + } + + _stop_background_threads_latch.count_down(); +#define THREAD_JOIN(thread) \ + if (thread) { \ + thread->join(); \ + } + + THREAD_JOIN(_unused_rowset_monitor_thread); + THREAD_JOIN(_garbage_sweeper_thread); + THREAD_JOIN(_disk_stat_monitor_thread); + THREAD_JOIN(_fd_cache_clean_thread); +#undef THREAD_JOIN + +#define THREADS_JOIN(threads) \ + for (const auto thread : threads) { \ + if (thread) { \ + thread->join(); \ + } \ + } + + THREADS_JOIN(_base_compaction_threads); + THREADS_JOIN(_cumulative_compaction_threads); + THREADS_JOIN(_path_gc_threads); + THREADS_JOIN(_path_scan_threads); + THREADS_JOIN(_tablet_checkpoint_threads); +#undef THREADS_JOIN +} + void StorageEngine::_clear() { SAFE_DELETE(_index_stream_lru_cache); _file_cache.reset(); std::lock_guard l(_store_lock); for (auto& store_pair : _store_map) { - store_pair.second->stop_bg_worker(); delete store_pair.second; store_pair.second = nullptr; } _store_map.clear(); - - _stop_bg_worker = true; } void StorageEngine::clear_transaction_task(const TTransactionId transaction_id) { @@ -929,6 +965,25 @@ OLAPStatus StorageEngine::load_header( return res; } +void StorageEngine::register_report_listener(TaskWorkerPool* listener) { + std::lock_guard l(_report_mtx); + CHECK(_report_listeners.find(listener) == _report_listeners.end()); + _report_listeners.insert(listener); +} + +void StorageEngine::deregister_report_listener(TaskWorkerPool* listener) { + std::lock_guard l(_report_mtx); + _report_listeners.erase(listener); +} + +void StorageEngine::notify_listeners() { + std::lock_guard
l(_report_mtx); + for (auto& listener : _report_listeners) { + TAgentTaskRequest task; + listener->submit_task(task); + } +} + OLAPStatus StorageEngine::execute_task(EngineTask* task) { // 1. add wlock to related tablets // 2. do prepare work diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index ed7bb5d0a040ff..0b3aacfee3e81e 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -36,6 +36,7 @@ #include "gen_cpp/AgentService_types.h" #include "gen_cpp/BackendService_types.h" #include "gen_cpp/MasterService_types.h" +#include "gutil/ref_counted.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/tablet.h" @@ -48,6 +49,8 @@ #include "olap/rowset/rowset_id_generator.h" #include "olap/fs/fs_util.h" #include "runtime/heartbeat_flags.h" +#include "util/countdown_latch.h" +#include "util/thread.h" namespace doris { @@ -56,6 +59,7 @@ class EngineTask; class BlockManager; class MemTableFlushExecutor; class Tablet; +class TaskWorkerPool; // StorageEngine singleton to manage all Table pointers. // Providing add/drop/get operations. 
@@ -135,30 +139,9 @@ class StorageEngine { // @return OLAP_SUCCESS if load tablet success OLAPStatus load_header(const std::string& shard_path, const TCloneReq& request, bool restore = false); - // To trigger a disk-stat and tablet report - void trigger_report() { - std::lock_guard l(_report_mtx); - _need_report_tablet = true; - _need_report_disk_stat = true; - _report_cv.notify_all(); - } - - // call this to wait for a report notification until timeout - void wait_for_report_notify(int64_t timeout_sec, bool from_report_tablet_thread) { - auto wait_timeout_sec = std::chrono::seconds(timeout_sec); - std::unique_lock l(_report_mtx); - // When wait_for() returns, regardless of the return-result(possibly a timeout - // error), the report_tablet_thread and report_disk_stat_thread(see TaskWorkerPool) - // immediately begin the next round of reporting, so there is no need to check - // the return-value of wait_for(). - if (from_report_tablet_thread) { - _report_cv.wait_for(l, wait_timeout_sec, [this] { return _need_report_tablet; }); - _need_report_tablet = false; - } else { - _report_cv.wait_for(l, wait_timeout_sec, [this] { return _need_report_disk_stat; }); - _need_report_disk_stat = false; - } - } + void register_report_listener(TaskWorkerPool* listener); + void deregister_report_listener(TaskWorkerPool* listener); + void notify_listeners(); OLAPStatus execute_task(EngineTask* task); @@ -196,6 +179,8 @@ class StorageEngine { // start all backgroud threads. This should be call after env is ready. Status start_bg_threads(); + void stop(); + private: // Instance should be inited from `static open()` // MUST NOT be called in other circumstances. 
@@ -224,30 +209,30 @@ class StorageEngine { // All these xxx_callback() functions are for Background threads // unused rowset monitor thread - void* _unused_rowset_monitor_thread_callback(void* arg); + void _unused_rowset_monitor_thread_callback(); // base compaction thread process function - void* _base_compaction_thread_callback(void* arg, DataDir* data_dir); - // cumulative process function - void* _cumulative_compaction_thread_callback(void* arg, DataDir* data_dir); + void _base_compaction_thread_callback(DataDir* data_dir); // check cumulative compaction config void _check_cumulative_compaction_config(); + // cumulative process function + void _cumulative_compaction_thread_callback(DataDir* data_dir); // garbage sweep thread process function. clear snapshot and trash folder - void* _garbage_sweeper_thread_callback(void* arg); + void _garbage_sweeper_thread_callback(); // delete tablet with io error process function - void* _disk_stat_monitor_thread_callback(void* arg); + void _disk_stat_monitor_thread_callback(); // clean file descriptors cache - void* _fd_cache_clean_callback(void* arg); + void _fd_cache_clean_callback(); // path gc process function - void* _path_gc_thread_callback(void* arg); + void _path_gc_thread_callback(DataDir* data_dir); - void* _path_scan_thread_callback(void* arg); + void _path_scan_thread_callback(DataDir* data_dir); - void* _tablet_checkpoint_callback(void* arg); + void _tablet_checkpoint_callback(DataDir* data_dir); // parse the default rowset type config to RowsetTypePB void _parse_default_rowset_type(); @@ -318,29 +303,27 @@ class StorageEngine { std::shared_ptr _compaction_mem_tracker; - bool _stop_bg_worker = false; - std::thread _unused_rowset_monitor_thread; + CountDownLatch _stop_background_threads_latch; + scoped_refptr _unused_rowset_monitor_thread; // thread to monitor snapshot expiry - std::thread _garbage_sweeper_thread; + scoped_refptr _garbage_sweeper_thread; // thread to monitor disk stat - std::thread 
_disk_stat_monitor_thread; + scoped_refptr _disk_stat_monitor_thread; // threads to run base compaction - std::vector _base_compaction_threads; + std::vector> _base_compaction_threads; // threads to check cumulative - std::vector _cumulative_compaction_threads; + std::vector> _cumulative_compaction_threads; + scoped_refptr _fd_cache_clean_thread; // threads to clean all file descriptor not actively in use - std::thread _fd_cache_clean_thread; - std::vector _path_gc_threads; + std::vector> _path_gc_threads; // threads to scan disk paths - std::vector _path_scan_threads; + std::vector> _path_scan_threads; // threads to run tablet checkpoint - std::vector _tablet_checkpoint_threads; + std::vector> _tablet_checkpoint_threads; // For tablet and disk-stat report std::mutex _report_mtx; - std::condition_variable _report_cv; - bool _need_report_tablet = false; - bool _need_report_disk_stat = false; + std::set _report_listeners; Mutex _engine_task_mutex; diff --git a/be/src/runtime/broker_mgr.cpp b/be/src/runtime/broker_mgr.cpp index 8dead28a7b75df..21499524537570 100644 --- a/be/src/runtime/broker_mgr.cpp +++ b/be/src/runtime/broker_mgr.cpp @@ -33,7 +33,13 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(broker_count, MetricUnit::NOUNIT); BrokerMgr::BrokerMgr(ExecEnv* exec_env) : - _exec_env(exec_env), _thread_stop(false), _ping_thread(&BrokerMgr::ping_worker, this) { + _exec_env(exec_env), _stop_background_threads_latch(1) { + CHECK(Thread::create("BrokerMgr", "ping_worker", + [this]() { + this->ping_worker(); + }, + &_ping_thread).ok()); + REGISTER_HOOK_METRIC(broker_count, [this]() { std::lock_guard l(_mutex); return _broker_set.size(); @@ -42,8 +48,10 @@ BrokerMgr::BrokerMgr(ExecEnv* exec_env) : BrokerMgr::~BrokerMgr() { DEREGISTER_HOOK_METRIC(broker_count); - _thread_stop = true; - _ping_thread.join(); + _stop_background_threads_latch.count_down(); + if (_ping_thread) { + _ping_thread->join(); + } } void BrokerMgr::init() { @@ -93,7 +101,7 @@ void 
BrokerMgr::ping(const TNetworkAddress& addr) { } void BrokerMgr::ping_worker() { - while (!_thread_stop) { + do { std::vector addresses; { std::lock_guard l(_mutex); @@ -104,8 +112,7 @@ void BrokerMgr::ping_worker() { for (auto& addr : addresses) { ping(addr); } - sleep(5); - } + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(5))); } } diff --git a/be/src/runtime/broker_mgr.h b/be/src/runtime/broker_mgr.h index 6446d94faf0211..7842ebb5585edf 100644 --- a/be/src/runtime/broker_mgr.h +++ b/be/src/runtime/broker_mgr.h @@ -23,7 +23,10 @@ #include #include "gen_cpp/Types_types.h" +#include "gutil/ref_counted.h" #include "util/hash_util.hpp" +#include "util/countdown_latch.h" +#include "util/thread.h" namespace doris { @@ -43,8 +46,9 @@ class BrokerMgr { std::string _client_id; std::mutex _mutex; std::unordered_set _broker_set; - bool _thread_stop; - std::thread _ping_thread; + + CountDownLatch _stop_background_threads_latch; + scoped_refptr _ping_thread; }; } diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc index 2372048cc1aae1..29a282b041cbc1 100644 --- a/be/src/runtime/disk_io_mgr.cc +++ b/be/src/runtime/disk_io_mgr.cc @@ -783,7 +783,7 @@ bool DiskIoMgr::get_next_request_range(DiskQueue* disk_queue, RequestRange** ran *range = NULL; // This loops returns either with work to do or when the disk IoMgr shuts down. - while (true) { + while (!_shut_down) { *request_context = NULL; RequestContext::PerDiskState* request_disk_state = NULL; { @@ -989,7 +989,7 @@ void DiskIoMgr::work_loop(DiskQueue* disk_queue) { // re-enqueues the request. // 3. Perform the read or write as specified. // Cancellation checking needs to happen in both steps 1 and 3. 
- while (true) { + while (!_shut_down) { RequestContext* worker_context = NULL;; RequestRange* range = NULL; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index a429cc679ce011..fb84bb114a3735 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -207,33 +207,33 @@ void ExecEnv::_init_buffer_pool(int64_t min_page_size, } void ExecEnv::_destory() { - delete _brpc_stub_cache; - delete _load_stream_mgr; - delete _load_channel_mgr; - delete _broker_mgr; - delete _bfd_parser; - delete _tmp_file_mgr; - delete _disk_io_mgr; - delete _load_path_mgr; - delete _etl_job_mgr; - delete _master_info; - delete _fragment_mgr; - delete _cgroups_mgr; - delete _etl_thread_pool; - delete _thread_pool; - delete _thread_mgr; - delete _pool_mem_trackers; - delete _broker_client_cache; - delete _extdatasource_client_cache; - delete _frontend_client_cache; - delete _backend_client_cache; - delete _result_mgr; - delete _result_queue_mgr; - delete _stream_mgr; - delete _stream_load_executor; - delete _routine_load_task_executor; - delete _external_scan_context_mgr; - delete _heartbeat_flags; + SAFE_DELETE(_brpc_stub_cache); + SAFE_DELETE(_load_stream_mgr); + SAFE_DELETE(_load_channel_mgr); + SAFE_DELETE(_broker_mgr); + SAFE_DELETE(_bfd_parser); + SAFE_DELETE(_tmp_file_mgr); + SAFE_DELETE(_disk_io_mgr); + SAFE_DELETE(_load_path_mgr); + SAFE_DELETE(_etl_job_mgr); + SAFE_DELETE(_master_info); + SAFE_DELETE(_fragment_mgr); + SAFE_DELETE(_cgroups_mgr); + SAFE_DELETE(_etl_thread_pool); + SAFE_DELETE(_thread_pool); + SAFE_DELETE(_thread_mgr); + SAFE_DELETE(_pool_mem_trackers); + SAFE_DELETE(_broker_client_cache); + SAFE_DELETE(_extdatasource_client_cache); + SAFE_DELETE(_frontend_client_cache); + SAFE_DELETE(_backend_client_cache); + SAFE_DELETE(_result_mgr); + SAFE_DELETE(_result_queue_mgr); + SAFE_DELETE(_stream_mgr); + SAFE_DELETE(_stream_load_executor); + SAFE_DELETE(_routine_load_task_executor); + 
SAFE_DELETE(_external_scan_context_mgr); + SAFE_DELETE(_heartbeat_flags); } void ExecEnv::destroy(ExecEnv* env) { diff --git a/be/src/runtime/external_scan_context_mgr.cpp b/be/src/runtime/external_scan_context_mgr.cpp index 03e1feefece599..9a9e4886e66207 100644 --- a/be/src/runtime/external_scan_context_mgr.cpp +++ b/be/src/runtime/external_scan_context_mgr.cpp @@ -30,11 +30,14 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(active_scan_context_count, MetricUnit::NOUNIT); -ExternalScanContextMgr::ExternalScanContextMgr(ExecEnv* exec_env) : _exec_env(exec_env), _is_stop(false) { +ExternalScanContextMgr::ExternalScanContextMgr(ExecEnv* exec_env) : _exec_env(exec_env), _stop_background_threads_latch(1) { // start the reaper thread for gc the expired context - _keep_alive_reaper.reset( - new std::thread( - std::bind(std::mem_fn(&ExternalScanContextMgr::gc_expired_context), this))); + CHECK(Thread::create("ExternalScanContextMgr", "gc_expired_context", + [this]() { + this->gc_expired_context(); + }, + &_keep_alive_reaper).ok()); + REGISTER_HOOK_METRIC(active_scan_context_count, [this]() { std::lock_guard l(_lock); return _active_contexts.size(); @@ -43,8 +46,10 @@ ExternalScanContextMgr::ExternalScanContextMgr(ExecEnv* exec_env) : _exec_env(ex ExternalScanContextMgr::~ExternalScanContextMgr() { DEREGISTER_HOOK_METRIC(active_scan_context_count); - _is_stop = true; - _keep_alive_reaper->join(); + _stop_background_threads_latch.count_down(); + if (_keep_alive_reaper) { + _keep_alive_reaper->join(); + } } Status ExternalScanContextMgr::create_scan_context(std::shared_ptr* p_context) { @@ -101,8 +106,7 @@ Status ExternalScanContextMgr::clear_scan_context(const std::string& context_id) void ExternalScanContextMgr::gc_expired_context() { #ifndef BE_TEST - while (!_is_stop) { - std::this_thread::sleep_for(std::chrono::seconds(doris::config::scan_context_gc_interval_min * 60)); + while 
(!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(doris::config::scan_context_gc_interval_min * 60))) { time_t current_time = time(NULL); std::vector> expired_contexts; { diff --git a/be/src/runtime/external_scan_context_mgr.h b/be/src/runtime/external_scan_context_mgr.h index 6aa2893375620f..bc8665c9a79d7c 100644 --- a/be/src/runtime/external_scan_context_mgr.h +++ b/be/src/runtime/external_scan_context_mgr.h @@ -28,6 +28,9 @@ #include "common/status.h" #include "gen_cpp/Types_types.h" #include "runtime/exec_env.h" +#include "gutil/ref_counted.h" +#include "util/countdown_latch.h" +#include "util/thread.h" namespace doris { @@ -49,7 +52,6 @@ class ExternalScanContextMgr { public: ExternalScanContextMgr(ExecEnv* exec_env); - ~ExternalScanContextMgr(); Status create_scan_context(std::shared_ptr* p_context); @@ -58,14 +60,14 @@ class ExternalScanContextMgr { Status clear_scan_context(const std::string& context_id); - private: - ExecEnv* _exec_env; std::map> _active_contexts; void gc_expired_context(); - bool _is_stop; - std::unique_ptr _keep_alive_reaper; + + CountDownLatch _stop_background_threads_latch; + scoped_refptr _keep_alive_reaper; + std::mutex _lock; }; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c3bf83bb86863d..df0ff5f05255d7 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -380,8 +380,13 @@ void FragmentExecState::coordinator_callback( FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _exec_env(exec_env), _fragment_map(), - _stop(false), - _cancel_thread(std::bind(&FragmentMgr::cancel_worker, this)) { + _stop_background_threads_latch(1) { + CHECK(Thread::create("FragmentMgr", "cancel_timeout_plan_fragment", + [this]() { + this->cancel_worker(); + }, + &_cancel_thread).ok()); + REGISTER_HOOK_METRIC(plan_fragment_count, [this]() { std::lock_guard lock(_lock); return _fragment_map.size(); @@ -397,9 +402,10 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) 
FragmentMgr::~FragmentMgr() { DEREGISTER_HOOK_METRIC(plan_fragment_count); - // stop thread - _stop = true; - _cancel_thread.join(); + _stop_background_threads_latch.count_down(); + if (_cancel_thread) { + _cancel_thread->join(); + } // Stop all the worker, should wait for a while? // _thread_pool->wait_for(); _thread_pool->shutdown(); @@ -506,7 +512,7 @@ Status FragmentMgr::cancel(const TUniqueId& id, const PPlanFragmentCancelReason& void FragmentMgr::cancel_worker() { LOG(INFO) << "FragmentMgr cancel worker start working."; - while (!_stop) { + do { std::vector to_delete; DateTimeValue now = DateTimeValue::local_time(); { @@ -521,10 +527,7 @@ void FragmentMgr::cancel_worker() { cancel(id, PPlanFragmentCancelReason::TIMEOUT); LOG(INFO) << "FragmentMgr cancel worker going to cancel timouet fragment " << print_id(id); } - - // check every 1 seconds - sleep(1); - } + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(1))); LOG(INFO) << "FragmentMgr cancel worker is going to exit."; } diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 70c430cb9f3e20..287664e63f0b95 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -31,6 +31,9 @@ #include "gen_cpp/internal_service.pb.h" #include "util/hash_util.hpp" #include "http/rest_monitor_iface.h" +#include "gutil/ref_counted.h" +#include "util/countdown_latch.h" +#include "util/thread.h" namespace doris { @@ -86,9 +89,8 @@ class FragmentMgr : public RestMonitorIface { // Make sure that remove this before no data reference FragmentExecState std::unordered_map> _fragment_map; - // Cancel thread - bool _stop; - std::thread _cancel_thread; + CountDownLatch _stop_background_threads_latch; + scoped_refptr _cancel_thread; // every job is a pool std::unique_ptr _thread_pool; }; diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index b02cb14b58d41a..5a20ec7d6ffd3e 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ 
b/be/src/runtime/load_channel_mgr.cpp @@ -63,7 +63,7 @@ static int64_t calc_job_timeout_s(int64_t timeout_in_req_s) { return load_channel_timeout_s; } -LoadChannelMgr::LoadChannelMgr() : _is_stopped(false) { +LoadChannelMgr::LoadChannelMgr() : _stop_background_threads_latch(1) { REGISTER_HOOK_METRIC(load_channel_count, [this]() { std::lock_guard l(_lock); return _load_channels.size(); @@ -73,9 +73,9 @@ LoadChannelMgr::LoadChannelMgr() : _is_stopped(false) { LoadChannelMgr::~LoadChannelMgr() { DEREGISTER_HOOK_METRIC(load_channel_count); - _is_stopped.store(true); - if (_load_channels_clean_thread.joinable()) { - _load_channels_clean_thread.join(); + _stop_background_threads_latch.count_down(); + if (_load_channels_clean_thread) { + _load_channels_clean_thread->join(); } delete _lastest_success_channel; } @@ -215,22 +215,22 @@ Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) { } Status LoadChannelMgr::_start_bg_worker() { - _load_channels_clean_thread = std::thread( - [this] { + RETURN_IF_ERROR( + Thread::create("LoadChannelMgr", "cancel_timeout_load_channels", + [this]() { #ifdef GOOGLE_PROFILER - ProfilerRegisterThread(); + ProfilerRegisterThread(); #endif - #ifndef BE_TEST - uint32_t interval = 60; + uint32_t interval = 60; #else - uint32_t interval = 1; + uint32_t interval = 1; #endif - while (!_is_stopped.load()) { - _start_load_channels_clean(); - sleep(interval); - } - }); + while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))) { + _start_load_channels_clean(); + }}, + &_load_channels_clean_thread)); + return Status::OK(); } diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 9af843561872dc..e506e192bb5f85 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -28,6 +28,9 @@ #include "gen_cpp/PaloInternalService_types.h" #include "gen_cpp/internal_service.pb.h" #include "runtime/tablets_channel.h" +#include "util/countdown_latch.h" 
+#include "gutil/ref_counted.h" +#include "util/thread.h" #include "util/uid_util.h" namespace doris { @@ -72,10 +75,10 @@ class LoadChannelMgr { // check the total load mem consumption of this Backend std::shared_ptr _mem_tracker; + CountDownLatch _stop_background_threads_latch; // thread to clean timeout load channels - std::thread _load_channels_clean_thread; + scoped_refptr _load_channels_clean_thread; Status _start_load_channels_clean(); - std::atomic _is_stopped; }; } diff --git a/be/src/runtime/load_path_mgr.cpp b/be/src/runtime/load_path_mgr.cpp index a794ae792d97de..d7e637cdd5d237 100644 --- a/be/src/runtime/load_path_mgr.cpp +++ b/be/src/runtime/load_path_mgr.cpp @@ -37,7 +37,14 @@ static const uint32_t MAX_SHARD_NUM = 1024; static const std::string SHARD_PREFIX = "__shard_"; LoadPathMgr::LoadPathMgr(ExecEnv* exec_env) : _exec_env(exec_env), - _idx(0), _next_shard(0), _error_path_next_shard(0) { } + _idx(0), _next_shard(0), _error_path_next_shard(0), _stop_background_threads_latch(1) { } + +LoadPathMgr::~LoadPathMgr() { + _stop_background_threads_latch.count_down(); + if (_clean_thread) { + _clean_thread->join(); + } +} Status LoadPathMgr::init() { _path_vec.clear(); @@ -53,20 +60,17 @@ Status LoadPathMgr::init() { _idx = 0; _reserved_hours = std::max(config::load_data_reserve_hours, 1L); - pthread_create(&_cleaner_id, nullptr, LoadPathMgr::cleaner, this); + RETURN_IF_ERROR( + Thread::create("LoadPathMgr", "clean_expired_temp_path", + [this]() { + // TODO(zc): add this thread to cgroup for control resource it use + while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(3600))) { + this->clean(); + }}, + &_clean_thread)); return Status::OK(); } -void* LoadPathMgr::cleaner(void* param) { - // TODO(zc): add this thread to cgroup for control resource it use - LoadPathMgr* mgr = (LoadPathMgr*)param; - while (true) { - sleep(3600); // clean every one hour - mgr->clean(); - } - return nullptr; -} - Status LoadPathMgr::allocate_dir( const 
std::string& db, const std::string& label, diff --git a/be/src/runtime/load_path_mgr.h b/be/src/runtime/load_path_mgr.h index d03e71cd837db2..fd7fe6f4bc45e1 100644 --- a/be/src/runtime/load_path_mgr.h +++ b/be/src/runtime/load_path_mgr.h @@ -22,7 +22,11 @@ #include #include #include + #include "common/status.h" +#include "gutil/ref_counted.h" +#include "util/thread.h" +#include "util/uid_util.h" namespace doris { @@ -34,9 +38,7 @@ class ExecEnv; class LoadPathMgr { public: LoadPathMgr(ExecEnv* env); - - ~LoadPathMgr() { - } + ~LoadPathMgr(); Status init(); @@ -61,8 +63,6 @@ class LoadPathMgr { void clean(); void process_path(time_t now, const std::string& path, int64_t reserve_hours); - static void* cleaner(void* param); - ExecEnv* _exec_env; std::mutex _lock; std::vector _path_vec; @@ -72,6 +72,8 @@ class LoadPathMgr { std::string _error_log_dir; uint32_t _next_shard; uint32_t _error_path_next_shard; + CountDownLatch _stop_background_threads_latch; + scoped_refptr _clean_thread; }; } diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index 2d9c84ad3284ef..8983298be288a1 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -35,7 +35,7 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(result_buffer_block_count, MetricUnit::NOUNIT //} ResultBufferMgr::ResultBufferMgr() - : _is_stop(false) { + : _stop_background_threads_latch(1) { // Each BufferControlBlock has a limited queue size of 1024, it's not needed to count the // actual size of all BufferControlBlock. 
REGISTER_HOOK_METRIC(result_buffer_block_count, [this]() { @@ -46,14 +46,16 @@ ResultBufferMgr::ResultBufferMgr() ResultBufferMgr::~ResultBufferMgr() { DEREGISTER_HOOK_METRIC(result_buffer_block_count); - _is_stop = true; - _cancel_thread->join(); + _stop_background_threads_latch.count_down(); + if (_clean_thread) { + _clean_thread->join(); + } } Status ResultBufferMgr::init() { - _cancel_thread.reset( - new boost::thread( - boost::bind(boost::mem_fn(&ResultBufferMgr::cancel_thread), this))); + RETURN_IF_ERROR(Thread::create("ResultBufferMgr", "cancel_timeout_result", + [this]() { this->cancel_thread(); }, + &_clean_thread)); return Status::OK(); } @@ -144,7 +146,7 @@ Status ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& quer void ResultBufferMgr::cancel_thread() { LOG(INFO) << "result buffer manager cancel thread begin."; - while (!_is_stop) { + do { // get query std::vector query_to_cancel; time_t now_time = time(NULL); @@ -165,9 +167,7 @@ void ResultBufferMgr::cancel_thread() { for (int i = 0; i < query_to_cancel.size(); ++i) { cancel(query_to_cancel[i]); } - - sleep(1); - } + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(1))); LOG(INFO) << "result buffer manager cancel thread finish."; } diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index c38476a35c41ef..07b69bdc586a18 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -28,6 +28,9 @@ #include "common/status.h" #include "gen_cpp/Types_types.h" #include "util/uid_util.h" +#include "gutil/ref_counted.h" +#include "util/thread.h" +#include "util/uid_util.h" namespace doris { @@ -69,7 +72,6 @@ class ResultBufferMgr { // when fe crush, this thread clear the buffer avoid memory leak in this backend void cancel_thread(); - bool _is_stop; // lock for buffer map boost::mutex _lock; // buffer block map @@ -82,7 +84,8 @@ class ResultBufferMgr { // cancel time maybe equal, so use one list 
TimeoutMap _timeout_map; - boost::scoped_ptr _cancel_thread; + CountDownLatch _stop_background_threads_latch; + scoped_refptr _clean_thread; }; // TUniqueId hash function used for boost::unordered_map diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp b/be/src/runtime/routine_load/data_consumer_pool.cpp index 2810cd68f78f15..3fb80459453f45 100644 --- a/be/src/runtime/routine_load/data_consumer_pool.cpp +++ b/be/src/runtime/routine_load/data_consumer_pool.cpp @@ -111,19 +111,16 @@ void DataConsumerPool::return_consumers(DataConsumerGroup* grp) { } Status DataConsumerPool::start_bg_worker() { - _clean_idle_consumer_thread = std::thread( - [this] { - #ifdef GOOGLE_PROFILER - ProfilerRegisterThread(); - #endif - - uint32_t interval = 60; - while (true) { - _clean_idle_consumer_bg(); - sleep(interval); - } - }); - _clean_idle_consumer_thread.detach(); + RETURN_IF_ERROR(Thread::create("ResultBufferMgr", "clean_idle_consumer", + [this]() { +#ifdef GOOGLE_PROFILER + ProfilerRegisterThread(); +#endif + do { + _clean_idle_consumer_bg(); + } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(60))); + }, + &_clean_idle_consumer_thread)); return Status::OK(); } diff --git a/be/src/runtime/routine_load/data_consumer_pool.h b/be/src/runtime/routine_load/data_consumer_pool.h index 16808dbb2332fc..05dc0153804272 100644 --- a/be/src/runtime/routine_load/data_consumer_pool.h +++ b/be/src/runtime/routine_load/data_consumer_pool.h @@ -20,10 +20,13 @@ #include #include #include -#include +#include "gutil/ref_counted.h" #include "runtime/routine_load/data_consumer.h" +#include "util/countdown_latch.h" #include "util/lru_cache.hpp" +#include "util/thread.h" + namespace doris { @@ -35,11 +38,16 @@ class Status; // to be reused class DataConsumerPool { public: - DataConsumerPool(int64_t max_pool_size): - _max_pool_size(max_pool_size) { + DataConsumerPool(int64_t max_pool_size) + : _max_pool_size(max_pool_size), + _stop_background_threads_latch(1) { } 
~DataConsumerPool() { + _stop_background_threads_latch.count_down(); + if (_clean_idle_consumer_thread) { + _clean_idle_consumer_thread->join(); + } } // get a already initialized consumer from cache, @@ -68,7 +76,8 @@ class DataConsumerPool { std::list> _pool; int64_t _max_pool_size; - std::thread _clean_idle_consumer_thread; + CountDownLatch _stop_background_threads_latch; + scoped_refptr _clean_idle_consumer_thread; }; } // end namespace doris diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp index 2684aa86fd4906..a388ed85b465b6 100644 --- a/be/src/service/brpc_service.cpp +++ b/be/src/service/brpc_service.cpp @@ -66,7 +66,9 @@ Status BRpcService::start(int port) { } void BRpcService::join() { + _server->Stop(1000); _server->Join(); + _server->ClearServices(); } } diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 5fdb29e07f7ba7..1734528cb78a83 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -172,7 +172,9 @@ int main(int argc, char** argv) { // add logger for thrift internal apache::thrift::GlobalOutput.setOutputFunction(doris::thrift_output); - doris::init_daemon(argc, argv, paths); + doris::Daemon daemon; + daemon.init(argc, argv, paths); + daemon.start(); doris::ResourceTls::init(); if (!doris::BackendOptions::init()) { @@ -258,12 +260,22 @@ int main(int argc, char** argv) { #endif sleep(10); } + http_service.stop(); + brpc_service.join(); + daemon.stop(); heartbeat_thrift_server->stop(); heartbeat_thrift_server->join(); be_server->stop(); be_server->join(); + engine->stop(); delete be_server; + be_server = nullptr; + delete engine; + engine = nullptr; + delete heartbeat_thrift_server; + heartbeat_thrift_server = nullptr; + doris::ExecEnv::destroy(exec_env); return 0; } diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 191d40aac4033d..c902c0e954f814 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ 
-130,8 +130,12 @@ Status HttpService::start() { UpdateConfigAction* update_config_action = new UpdateConfigAction(); _ev_http_server->register_handler(HttpMethod::POST, "/api/update_config", update_config_action); - RETURN_IF_ERROR(_ev_http_server->start()); + _ev_http_server->start(); return Status::OK(); } +void HttpService::stop() { + _ev_http_server->stop(); +} + } diff --git a/be/src/service/http_service.h b/be/src/service/http_service.h index f79208b0277709..feb82e6fa8b9b1 100644 --- a/be/src/service/http_service.h +++ b/be/src/service/http_service.h @@ -34,6 +34,8 @@ class HttpService { ~HttpService(); Status start(); + void stop(); + private: ExecEnv* _env; diff --git a/be/test/runtime/external_scan_context_mgr_test.cpp b/be/test/runtime/external_scan_context_mgr_test.cpp index 73dc4feaeb937f..9d6ed82783b9cc 100644 --- a/be/test/runtime/external_scan_context_mgr_test.cpp +++ b/be/test/runtime/external_scan_context_mgr_test.cpp @@ -53,7 +53,6 @@ class ExternalScanContextMgrTest : public testing::Test { TEST_F(ExternalScanContextMgrTest, create_normal) { std::shared_ptr context; ExternalScanContextMgr context_mgr(&_exec_env); - context_mgr._is_stop = true; Status st = context_mgr.create_scan_context(&context); ASSERT_TRUE(st.ok()); ASSERT_TRUE(context != nullptr); @@ -62,7 +61,6 @@ TEST_F(ExternalScanContextMgrTest, create_normal) { TEST_F(ExternalScanContextMgrTest, get_normal) { std::shared_ptr context; ExternalScanContextMgr context_mgr(&_exec_env); - context_mgr._is_stop = true; Status st = context_mgr.create_scan_context(&context); ASSERT_TRUE(st.ok()); ASSERT_TRUE(context != nullptr); @@ -78,7 +76,6 @@ TEST_F(ExternalScanContextMgrTest, get_abnormal) { std::string context_id = "not_exist"; std::shared_ptr result; ExternalScanContextMgr context_mgr(&_exec_env); - context_mgr._is_stop = true; Status st = context_mgr.get_scan_context(context_id, &result); ASSERT_TRUE(!st.ok()); ASSERT_TRUE(result == nullptr); @@ -87,7 +84,6 @@ 
TEST_F(ExternalScanContextMgrTest, get_abnormal) { TEST_F(ExternalScanContextMgrTest, clear_context) { std::shared_ptr context; ExternalScanContextMgr context_mgr(&_exec_env); - context_mgr._is_stop = true; Status st = context_mgr.create_scan_context(&context); ASSERT_TRUE(st.ok()); ASSERT_TRUE(context != nullptr); From 9413ab409579cd8c20f5c1831d242b31ecb3e153 Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Mon, 24 Aug 2020 23:12:40 +0800 Subject: [PATCH 2/3] remove comments --- be/src/olap/storage_engine.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index fa72c9f85ba5e9..1b470d10671f55 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -497,7 +497,6 @@ void StorageEngine::stop() { std::lock_guard l(_store_lock); for (auto& store_pair : _store_map) { - // TODO(yingchun): is it needed? store_pair.second->stop_bg_worker(); } From 2f878abbf846b9359086b6a4315ce102087cad32 Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Wed, 2 Sep 2020 14:51:56 +0800 Subject: [PATCH 3/3] fix redundant assignment --- be/src/olap/olap_server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 50935f06a672f2..3f860f4f5971ec 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -363,7 +363,7 @@ void StorageEngine::_path_scan_thread_callback(DataDir* data_dir) { LOG(INFO) << "try to perform path scan!"; data_dir->perform_path_scan(); - int32_t interval = config::path_scan_interval_second; + interval = config::path_scan_interval_second; if (interval <= 0) { LOG(WARNING) << "path gc thread check interval config is illegal:" << interval << "will be forced set to one day";