diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 411e294772c783..2aacd4c39ecbf3 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -79,7 +79,8 @@ FrontendServiceClientCache TaskWorkerPool::_master_service_client_cache; TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* env, const TMasterInfo& master_info) - : _master_info(master_info), + : _name(strings::Substitute("TaskWorkerPool.$0", TYPE_STRING(_task_worker_type))), + _master_info(master_info), _agent_utils(new AgentUtils()), _master_client(new MasterServerClient(_master_info, &_master_service_client_cache)), _env(env), @@ -186,8 +187,7 @@ void TaskWorkerPool::start() { } #ifndef BE_TEST - // TODO(yingchun): need a better name - ThreadPoolBuilder(strings::Substitute("TaskWorkerPool.$0", _task_worker_type)) + ThreadPoolBuilder(_name) .set_min_threads(_worker_count) .set_max_threads(_worker_count) .build(&_thread_pool); @@ -233,6 +233,11 @@ void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) { } } +void TaskWorkerPool::notify_thread() { + _worker_thread_condition_variable.notify_one(); + LOG(INFO) << "notify task worker pool: " << _name; +} + bool TaskWorkerPool::_register_task_info(const TTaskType::type task_type, int64_t signature) { lock_guard task_signatures_lock(_s_task_signatures_lock); set& signature_set = _s_task_signatures[task_type]; @@ -970,7 +975,7 @@ void TaskWorkerPool::_check_consistency_worker_thread_callback() { << ", signature: " << agent_task_req.signature; status_code = TStatusCode::RUNTIME_ERROR; } else { - LOG(INFO) << "check consistency success. status:" << res + LOG(INFO) << "check consistency success. status: " << res << ", signature:" << agent_task_req.signature << ", checksum:" << checksum; } @@ -1007,13 +1012,18 @@ void TaskWorkerPool::_report_task_worker_thread_callback() { if (status != DORIS_SUCCESS) { DorisMetrics::instance()->report_task_requests_failed->increment(1); - LOG(WARNING) << "finish report task failed. status:" << status << ", master host:" + LOG(WARNING) << "report task failed. status: " << status << ", master host: " << _master_info.network_address.hostname - << "port:" << _master_info.network_address.port; + << "port: " << _master_info.network_address.port; + } else { + LOG(INFO) << "finish report task. master host: " + << _master_info.network_address.hostname + << "port: " << _master_info.network_address.port; } } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(config::report_task_interval_seconds))); } +/// disk state report thread will report disk state at a configurable fix interval. void TaskWorkerPool::_report_disk_state_worker_thread_callback() { StorageEngine::instance()->register_report_listener(this); @@ -1029,15 +1039,11 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() { continue; } - lock_guard worker_thread_lock(_worker_thread_lock); - while (_is_work && _tasks.empty()) { - _worker_thread_condition_variable.wait(); - } + // wait at most report_disk_state_interval_seconds, or being notified + _worker_thread_condition_variable.wait_for(MonoDelta::FromSeconds(config::report_disk_state_interval_seconds)); if (!_is_work) { - return; + break; } - TAgentTaskRequest agent_task_req = _tasks.front(); - _tasks.pop_front(); vector data_dir_infos; _env->storage_engine()->get_all_data_dir_info(&data_dir_infos, true /* update */); @@ -1062,11 +1068,14 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() { if (status != DORIS_SUCCESS) { DorisMetrics::instance()->report_disk_requests_failed->increment(1); - LOG(WARNING) << "finish report disk state failed. status:" << status << ", master host:" + LOG(WARNING) << "report disk state failed. status: " << status << ", master host: " << _master_info.network_address.hostname - << ", port:" << _master_info.network_address.port; + << ", port: " << _master_info.network_address.port; + } else { + LOG(INFO) << "finish report disk state. master host: " + << _master_info.network_address.hostname + << ", port: " << _master_info.network_address.port; } - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } StorageEngine::instance()->deregister_report_listener(this); } @@ -1088,17 +1097,12 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() { continue; } - lock_guard worker_thread_lock(_worker_thread_lock); - while (_is_work && _tasks.empty()) { - _worker_thread_condition_variable.wait(); - } + // wait at most report_tablet_interval_seconds, or being notified + _worker_thread_condition_variable.wait_for(MonoDelta::FromSeconds(config::report_tablet_interval_seconds)); if (!_is_work) { - return; + break; } - TAgentTaskRequest agent_task_req = _tasks.front(); - _tasks.pop_front(); - request.tablets.clear(); OLAPStatus report_all_tablets_info_status = StorageEngine::instance()->tablet_manager()->report_all_tablets_info( @@ -1117,12 +1121,14 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() { AgentStatus status = _master_client->report(request, &result); if (status != DORIS_SUCCESS) { DorisMetrics::instance()->report_all_tablets_requests_failed->increment(1); - LOG(WARNING) << "finish report olap table state failed. status:" << status - << ", master host:" - << _master_info.network_address.hostname + LOG(WARNING) << "report tablets failed. status: " << status + << ", master host: " << _master_info.network_address.hostname << ", port:" << _master_info.network_address.port; + } else { + LOG(INFO) << "finish report tablets. master host: " + << _master_info.network_address.hostname + << ", port: " << _master_info.network_address.port; } - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } StorageEngine::instance()->deregister_report_listener(this); } diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index d7536d7a861600..7cb1d0aca18acb 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -43,6 +43,7 @@ class ThreadPool; class TaskWorkerPool { public: + // You need to modify the content in TYPE_STRING at the same time, enum TaskWorkerType { CREATE_TABLE, DROP_TABLE, @@ -71,6 +72,35 @@ class TaskWorkerPool { UPDATE_TABLET_META_INFO }; + inline const std::string TYPE_STRING(TaskWorkerType type) { + switch(type) { + case CREATE_TABLE: return "CREATE_TABLE"; + case DROP_TABLE: return "DROP_TABLE"; + case PUSH: return "PUSH"; + case REALTIME_PUSH: return "REALTIME_PUSH"; + case PUBLISH_VERSION: return "PUBLISH_VERSION"; + case CLEAR_ALTER_TASK: return "CLEAR_ALTER_TASK"; + case CLEAR_TRANSACTION_TASK: return "CLEAR_TRANSACTION_TASK"; + case DELETE: return "DELETE"; + case ALTER_TABLE: return "ALTER_TABLE"; + case QUERY_SPLIT_KEY: return "QUERY_SPLIT_KEY"; + case CLONE: return "CLONE"; + case STORAGE_MEDIUM_MIGRATE: return "STORAGE_MEDIUM_MIGRATE"; + case CHECK_CONSISTENCY: return "CHECK_CONSISTENCY"; + case REPORT_TASK: return "REPORT_TASK"; + case REPORT_DISK_STATE: return "REPORT_DISK_STATE"; + case REPORT_OLAP_TABLE: return "REPORT_OLAP_TABLE"; + case UPLOAD: return "UPLOAD"; + case DOWNLOAD: return "DOWNLOAD"; + case MAKE_SNAPSHOT: return "MAKE_SNAPSHOT"; + case RELEASE_SNAPSHOT: return "RELEASE_SNAPSHOT"; + case MOVE: return "MOVE"; + case RECOVER_TABLET: return "RECOVER_TABLET"; + case UPDATE_TABLET_META_INFO: return "UPDATE_TABLET_META_INFO"; + default: return "Unknown"; + } + } + TaskWorkerPool( const TaskWorkerType task_worker_type, ExecEnv* env, @@ -89,6 +119,9 @@ class TaskWorkerPool { // * task: the task need callback thread to do virtual void submit_task(const TAgentTaskRequest& task); + // notify the worker. currently for task/disk/tablet report thread + void notify_thread(); + private: bool _register_task_info(const TTaskType::type task_type, int64_t signature); void _remove_task_info(const TTaskType::type task_type, int64_t signature); @@ -135,6 +168,9 @@ class TaskWorkerPool { bool overwrite, std::vector* error_msgs); +private: + std::string _name; + // Reference to the ExecEnv::_master_info const TMasterInfo& _master_info; TBackend _backend; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index a07d6b9eb93558..689fa8a90bd01b 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -978,8 +978,7 @@ void StorageEngine::deregister_report_listener(TaskWorkerPool* listener) { void StorageEngine::notify_listeners() { std::lock_guard l(_report_mtx); for (auto& listener : _report_listeners) { - TAgentTaskRequest task; - listener->submit_task(task); + listener->notify_thread(); } }