From 0e1b997200a35acd28bb50956e3a221acafb5044 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 14 Sep 2020 13:20:27 +0800 Subject: [PATCH 1/2] [Bug] Tablet and Disk report thread not work The tablet and disk information reporting threads need to report to the FE periodically. At the same time these two reporting threads will also be triggered by certain events. The modification in PR #4440 caused these two threads to be triggered only by events, and could not report regularly. --- be/src/agent/heartbeat_server.cpp | 2 +- be/src/agent/task_worker_pool.cpp | 79 ++++++++++++++++++++++--------- be/src/agent/task_worker_pool.h | 10 ++++ be/src/olap/storage_engine.cpp | 16 +++++-- be/src/olap/storage_engine.h | 2 +- 5 files changed, 79 insertions(+), 30 deletions(-) diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 2157c42bb642d9..c2ac2ceb7f6dc9 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -159,7 +159,7 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { if (need_report) { LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately"; - _olap_engine->notify_listeners(); + _olap_engine->notify_listeners(false); } return Status::OK(); diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 411e294772c783..85e6f6c978714c 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -77,9 +77,36 @@ Mutex TaskWorkerPool::_s_task_signatures_lock; map> TaskWorkerPool::_s_task_signatures; FrontendServiceClientCache TaskWorkerPool::_master_service_client_cache; +const char* TaskWorkerPool::TYPE_STRING[] = { + "CREATE_TABLE", + "DROP_TABLE", + "PUSH", + "REALTIME_PUSH", + "PUBLISH_VERSION", + "CLEAR_ALTER_TASK", + "CLEAR_TRANSACTION_TASK", + "DELETE", + "ALTER_TABLE", + "QUERY_SPLIT_KEY", + "CLONE", + "STORAGE_MEDIUM_MIGRATE", + "CHECK_CONSISTENCY", + "REPORT_TASK", + "REPORT_DISK_STATE", + "REPORT_OLAP_TABLE", + "UPLOAD", + "DOWNLOAD", + "MAKE_SNAPSHOT", + "RELEASE_SNAPSHOT", + "MOVE", + "RECOVER_TABLET", + "UPDATE_TABLET_META_INFO" +}; + TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* env, const TMasterInfo& master_info) - : _master_info(master_info), + : _name(strings::Substitute("TaskWorkerPool.$0", TYPE_STRING[_task_worker_type])), + _master_info(master_info), _agent_utils(new AgentUtils()), _master_client(new MasterServerClient(_master_info, &_master_service_client_cache)), _env(env), @@ -186,8 +213,7 @@ void TaskWorkerPool::start() { } #ifndef BE_TEST - // TODO(yingchun): need a better name - ThreadPoolBuilder(strings::Substitute("TaskWorkerPool.$0", _task_worker_type)) + ThreadPoolBuilder(_name) .set_min_threads(_worker_count) .set_max_threads(_worker_count) .build(&_thread_pool); @@ -233,6 +259,11 @@ void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) { } } +void TaskWorkerPool::notify_thread() { + _worker_thread_condition_variable.notify_one(); + LOG(INFO) << "notify task worker pool: " << _name; +} + bool TaskWorkerPool::_register_task_info(const TTaskType::type task_type, int64_t signature) { lock_guard task_signatures_lock(_s_task_signatures_lock); set& signature_set = _s_task_signatures[task_type]; @@ -1007,13 +1038,18 @@ void TaskWorkerPool::_report_task_worker_thread_callback() { if (status != DORIS_SUCCESS) { DorisMetrics::instance()->report_task_requests_failed->increment(1); - LOG(WARNING) << "finish report task failed. status:" << status << ", master host:" + LOG(WARNING) << "report task failed. status:" << status << ", master host:" << _master_info.network_address.hostname << "port:" << _master_info.network_address.port; + } else { + LOG(INFO) << "finish report task. master host: " + << _master_info.network_address.hostname + << "port:" << _master_info.network_address.port; } } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(config::report_task_interval_seconds))); } +/// disk state report thread will report disk state at a configurable fix interval. void TaskWorkerPool::_report_disk_state_worker_thread_callback() { StorageEngine::instance()->register_report_listener(this); @@ -1029,15 +1065,11 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() { continue; } - lock_guard worker_thread_lock(_worker_thread_lock); - while (_is_work && _tasks.empty()) { - _worker_thread_condition_variable.wait(); - } + // wait at most report_disk_state_interval_seconds, or being notified + _worker_thread_condition_variable.wait_for(MonoDelta::FromSeconds(config::report_disk_state_interval_seconds)); if (!_is_work) { - return; + break; } - TAgentTaskRequest agent_task_req = _tasks.front(); - _tasks.pop_front(); vector data_dir_infos; _env->storage_engine()->get_all_data_dir_info(&data_dir_infos, true /* update */); @@ -1062,11 +1094,14 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() { if (status != DORIS_SUCCESS) { DorisMetrics::instance()->report_disk_requests_failed->increment(1); - LOG(WARNING) << "finish report disk state failed. status:" << status << ", master host:" + LOG(WARNING) << "report disk state failed. status:" << status << ", master host:" << _master_info.network_address.hostname << ", port:" << _master_info.network_address.port; + } else { + LOG(INFO) << "finish report disk state. master host:" + << _master_info.network_address.hostname + << ", port:" << _master_info.network_address.port; } - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } StorageEngine::instance()->deregister_report_listener(this); } @@ -1088,17 +1123,12 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() { continue; } - lock_guard worker_thread_lock(_worker_thread_lock); - while (_is_work && _tasks.empty()) { - _worker_thread_condition_variable.wait(); - } + // wait at most report_tablet_interval_seconds, or being notified + _worker_thread_condition_variable.wait_for(MonoDelta::FromSeconds(config::report_tablet_interval_seconds)); if (!_is_work) { - return; + break; } - TAgentTaskRequest agent_task_req = _tasks.front(); - _tasks.pop_front(); - request.tablets.clear(); OLAPStatus report_all_tablets_info_status = StorageEngine::instance()->tablet_manager()->report_all_tablets_info( @@ -1117,12 +1147,15 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() { AgentStatus status = _master_client->report(request, &result); if (status != DORIS_SUCCESS) { DorisMetrics::instance()->report_all_tablets_requests_failed->increment(1); - LOG(WARNING) << "finish report olap table state failed. status:" << status + LOG(WARNING) << "report tablets failed. status:" << status << ", master host:" << _master_info.network_address.hostname << ", port:" << _master_info.network_address.port; + } else { + LOG(INFO) << "finish report tablets. master host:" + << _master_info.network_address.hostname + << ", port:" << _master_info.network_address.port; } - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } StorageEngine::instance()->deregister_report_listener(this); } diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index d7536d7a861600..fc1b64a3e6aaf6 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -43,6 +43,8 @@ class ThreadPool; class TaskWorkerPool { public: + // You need to modify the content in TYPE_STRING at the same time, + // and pay attention to ensure that the order is consistent. enum TaskWorkerType { CREATE_TABLE, DROP_TABLE, @@ -89,6 +91,9 @@ class TaskWorkerPool { // * task: the task need callback thread to do virtual void submit_task(const TAgentTaskRequest& task); + // notify the worker. currently for task/disk/tablet report thread + void notify_thread(); + private: bool _register_task_info(const TTaskType::type task_type, int64_t signature); void _remove_task_info(const TTaskType::type task_type, int64_t signature); @@ -135,6 +140,9 @@ class TaskWorkerPool { bool overwrite, std::vector* error_msgs); +private: + std::string _name; + // Reference to the ExecEnv::_master_info const TMasterInfo& _master_info; TBackend _backend; @@ -159,6 +167,8 @@ class TaskWorkerPool { static Mutex _s_task_signatures_lock; static std::map> _s_task_signatures; + static const char *TYPE_STRING[]; + DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool); }; // class TaskWorkerPool } // namespace doris diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index a07d6b9eb93558..7ebbbbf828fda1 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -351,7 +351,7 @@ void StorageEngine::_start_disk_stat_monitor() { // If some tablets were dropped, we should notify disk_state_worker_thread and // tablet_worker_thread (see TaskWorkerPool) to make them report to FE ASAP. if (some_tablets_were_dropped) { - notify_listeners(); + notify_listeners(false); } } @@ -493,7 +493,7 @@ bool StorageEngine::_delete_tablets_on_unused_root_path() { void StorageEngine::stop() { // trigger the waitting threads - notify_listeners(); + notify_listeners(false); std::lock_guard l(_store_lock); for (auto& store_pair : _store_map) { @@ -975,11 +975,17 @@ void StorageEngine::deregister_report_listener(TaskWorkerPool* listener) { _report_listeners.erase(listener); } -void StorageEngine::notify_listeners() { +/// if submit_task is true, it will notify the listeners by submmiting a task. +/// otherwise, it just notify the thread +void StorageEngine::notify_listeners(bool submit_task) { std::lock_guard l(_report_mtx); for (auto& listener : _report_listeners) { - TAgentTaskRequest task; - listener->submit_task(task); + if (submit_task) { + TAgentTaskRequest task; + listener->submit_task(task); + } else { + listener->notify_thread(); + } } } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 0b3aacfee3e81e..60e62d920f43aa 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -141,7 +141,7 @@ class StorageEngine { void register_report_listener(TaskWorkerPool* listener); void deregister_report_listener(TaskWorkerPool* listener); - void notify_listeners(); + void notify_listeners(bool submit_task); OLAPStatus execute_task(EngineTask* task); From e28c782079fecbde8746955d25712450ffff2523 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 14 Sep 2020 14:53:03 +0800 Subject: [PATCH 2/2] fix by review --- be/src/agent/heartbeat_server.cpp | 2 +- be/src/agent/task_worker_pool.cpp | 53 ++++++++----------------------- be/src/agent/task_worker_pool.h | 32 +++++++++++++++++-- be/src/olap/storage_engine.cpp | 15 +++------ be/src/olap/storage_engine.h | 2 +- 5 files changed, 48 insertions(+), 56 deletions(-) diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index c2ac2ceb7f6dc9..2157c42bb642d9 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -159,7 +159,7 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { if (need_report) { LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately"; - _olap_engine->notify_listeners(false); + _olap_engine->notify_listeners(); } return Status::OK(); diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 85e6f6c978714c..2aacd4c39ecbf3 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -77,35 +77,9 @@ Mutex TaskWorkerPool::_s_task_signatures_lock; map> TaskWorkerPool::_s_task_signatures; FrontendServiceClientCache TaskWorkerPool::_master_service_client_cache; -const char* TaskWorkerPool::TYPE_STRING[] = { - "CREATE_TABLE", - "DROP_TABLE", - "PUSH", - "REALTIME_PUSH", - "PUBLISH_VERSION", - "CLEAR_ALTER_TASK", - "CLEAR_TRANSACTION_TASK", - "DELETE", - "ALTER_TABLE", - "QUERY_SPLIT_KEY", - "CLONE", - "STORAGE_MEDIUM_MIGRATE", - "CHECK_CONSISTENCY", - "REPORT_TASK", - "REPORT_DISK_STATE", - "REPORT_OLAP_TABLE", - "UPLOAD", - "DOWNLOAD", - "MAKE_SNAPSHOT", - "RELEASE_SNAPSHOT", - "MOVE", - "RECOVER_TABLET", - "UPDATE_TABLET_META_INFO" -}; - TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* env, const TMasterInfo& master_info) - : _name(strings::Substitute("TaskWorkerPool.$0", TYPE_STRING[_task_worker_type])), + : _name(strings::Substitute("TaskWorkerPool.$0", TYPE_STRING(_task_worker_type))), _master_info(master_info), _agent_utils(new AgentUtils()), _master_client(new MasterServerClient(_master_info, &_master_service_client_cache)), @@ -1001,7 +975,7 @@ void TaskWorkerPool::_check_consistency_worker_thread_callback() { << ", signature: " << agent_task_req.signature; status_code = TStatusCode::RUNTIME_ERROR; } else { - LOG(INFO) << "check consistency success. status:" << res + LOG(INFO) << "check consistency success. status: " << res << ", signature:" << agent_task_req.signature << ", checksum:" << checksum; } @@ -1038,13 +1012,13 @@ void TaskWorkerPool::_report_task_worker_thread_callback() { if (status != DORIS_SUCCESS) { DorisMetrics::instance()->report_task_requests_failed->increment(1); - LOG(WARNING) << "report task failed. status:" << status << ", master host:" + LOG(WARNING) << "report task failed. status: " << status << ", master host: " << _master_info.network_address.hostname - << "port:" << _master_info.network_address.port; + << "port: " << _master_info.network_address.port; } else { LOG(INFO) << "finish report task. master host: " << _master_info.network_address.hostname - << "port:" << _master_info.network_address.port; + << "port: " << _master_info.network_address.port; } } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(config::report_task_interval_seconds))); } @@ -1094,13 +1068,13 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() { if (status != DORIS_SUCCESS) { DorisMetrics::instance()->report_disk_requests_failed->increment(1); - LOG(WARNING) << "report disk state failed. status:" << status << ", master host:" + LOG(WARNING) << "report disk state failed. status: " << status << ", master host: " << _master_info.network_address.hostname - << ", port:" << _master_info.network_address.port; + << ", port: " << _master_info.network_address.port; } else { - LOG(INFO) << "finish report disk state. master host:" + LOG(INFO) << "finish report disk state. master host: " << _master_info.network_address.hostname - << ", port:" << _master_info.network_address.port; + << ", port: " << _master_info.network_address.port; } } StorageEngine::instance()->deregister_report_listener(this); @@ -1147,14 +1121,13 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() { AgentStatus status = _master_client->report(request, &result); if (status != DORIS_SUCCESS) { DorisMetrics::instance()->report_all_tablets_requests_failed->increment(1); - LOG(WARNING) << "report tablets failed. status:" << status - << ", master host:" - << _master_info.network_address.hostname + LOG(WARNING) << "report tablets failed. status: " << status + << ", master host: " << _master_info.network_address.hostname << ", port:" << _master_info.network_address.port; } else { - LOG(INFO) << "finish report tablets. master host:" + LOG(INFO) << "finish report tablets. master host: " << _master_info.network_address.hostname - << ", port:" << _master_info.network_address.port; + << ", port: " << _master_info.network_address.port; } } StorageEngine::instance()->deregister_report_listener(this); diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index fc1b64a3e6aaf6..7cb1d0aca18acb 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -44,7 +44,6 @@ class ThreadPool; class TaskWorkerPool { public: // You need to modify the content in TYPE_STRING at the same time, - // and pay attention to ensure that the order is consistent. enum TaskWorkerType { CREATE_TABLE, DROP_TABLE, @@ -73,6 +72,35 @@ class TaskWorkerPool { UPDATE_TABLET_META_INFO }; + inline const std::string TYPE_STRING(TaskWorkerType type) { + switch(type) { + case CREATE_TABLE: return "CREATE_TABLE"; + case DROP_TABLE: return "DROP_TABLE"; + case PUSH: return "PUSH"; + case REALTIME_PUSH: return "REALTIME_PUSH"; + case PUBLISH_VERSION: return "PUBLISH_VERSION"; + case CLEAR_ALTER_TASK: return "CLEAR_ALTER_TASK"; + case CLEAR_TRANSACTION_TASK: return "CLEAR_TRANSACTION_TASK"; + case DELETE: return "DELETE"; + case ALTER_TABLE: return "ALTER_TABLE"; + case QUERY_SPLIT_KEY: return "QUERY_SPLIT_KEY"; + case CLONE: return "CLONE"; + case STORAGE_MEDIUM_MIGRATE: return "STORAGE_MEDIUM_MIGRATE"; + case CHECK_CONSISTENCY: return "CHECK_CONSISTENCY"; + case REPORT_TASK: return "REPORT_TASK"; + case REPORT_DISK_STATE: return "REPORT_DISK_STATE"; + case REPORT_OLAP_TABLE: return "REPORT_OLAP_TABLE"; + case UPLOAD: return "UPLOAD"; + case DOWNLOAD: return "DOWNLOAD"; + case MAKE_SNAPSHOT: return "MAKE_SNAPSHOT"; + case RELEASE_SNAPSHOT: return "RELEASE_SNAPSHOT"; + case MOVE: return "MOVE"; + case RECOVER_TABLET: return "RECOVER_TABLET"; + case UPDATE_TABLET_META_INFO: return "UPDATE_TABLET_META_INFO"; + default: return "Unknown"; + } + } + TaskWorkerPool( const TaskWorkerType task_worker_type, ExecEnv* env, @@ -167,8 +195,6 @@ class TaskWorkerPool { static Mutex _s_task_signatures_lock; static std::map> _s_task_signatures; - static const char *TYPE_STRING[]; - DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool); }; // class TaskWorkerPool } // namespace doris diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 7ebbbbf828fda1..689fa8a90bd01b 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -351,7 +351,7 @@ void StorageEngine::_start_disk_stat_monitor() { // If some tablets were dropped, we should notify disk_state_worker_thread and // tablet_worker_thread (see TaskWorkerPool) to make them report to FE ASAP. if (some_tablets_were_dropped) { - notify_listeners(false); + notify_listeners(); } } @@ -493,7 +493,7 @@ bool StorageEngine::_delete_tablets_on_unused_root_path() { void StorageEngine::stop() { // trigger the waitting threads - notify_listeners(false); + notify_listeners(); std::lock_guard l(_store_lock); for (auto& store_pair : _store_map) { @@ -975,17 +975,10 @@ void StorageEngine::deregister_report_listener(TaskWorkerPool* listener) { _report_listeners.erase(listener); } -/// if submit_task is true, it will notify the listeners by submmiting a task. -/// otherwise, it just notify the thread -void StorageEngine::notify_listeners(bool submit_task) { +void StorageEngine::notify_listeners() { std::lock_guard l(_report_mtx); for (auto& listener : _report_listeners) { - if (submit_task) { - TAgentTaskRequest task; - listener->submit_task(task); - } else { - listener->notify_thread(); - } + listener->notify_thread(); } } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 60e62d920f43aa..0b3aacfee3e81e 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -141,7 +141,7 @@ class StorageEngine { void register_report_listener(TaskWorkerPool* listener); void deregister_report_listener(TaskWorkerPool* listener); - void notify_listeners(bool submit_task); + void notify_listeners(); OLAPStatus execute_task(EngineTask* task);