diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 725ca95d5bfdff..8b6a6545426488 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -71,6 +71,8 @@ map> TaskWorkerPool::_s_running_task_user map> TaskWorkerPool::_s_total_task_user_count; map TaskWorkerPool::_s_total_task_count; FrontendServiceClientCache TaskWorkerPool::_master_service_client_cache; +boost::mutex TaskWorkerPool::_disk_broken_lock; +boost::posix_time::time_duration TaskWorkerPool::_wait_duration; TaskWorkerPool::TaskWorkerPool( const TaskWorkerType task_worker_type, @@ -146,10 +148,12 @@ void TaskWorkerPool::start() { _callback_function = _report_task_worker_thread_callback; break; case TaskWorkerType::REPORT_DISK_STATE: + _wait_duration = boost::posix_time::time_duration(0, 0, config::report_disk_state_interval_seconds, 0); _worker_count = REPORT_DISK_STATE_WORKER_COUNT; _callback_function = _report_disk_state_worker_thread_callback; break; case TaskWorkerType::REPORT_OLAP_TABLE: + _wait_duration = boost::posix_time::time_duration(0, 0, config::report_disk_state_interval_seconds, 0); _worker_count = REPORT_OLAP_TABLE_WORKER_COUNT; _callback_function = _report_olap_table_worker_thread_callback; break; @@ -1525,15 +1529,6 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this) OLAPStatus get_all_root_path_stat = worker_pool_this->_command_executor->get_all_root_path_stat(&root_paths_stat); - if (get_all_root_path_stat != OLAPStatus::OLAP_SUCCESS) { - OLAP_LOG_WARNING("fail to get all root path stat."); -#ifndef BE_TEST - sleep(config::report_disk_state_interval_seconds); - continue; -#else - return (void*)0; -#endif - } map disks; for (auto root_path_state : root_paths_stat) { @@ -1558,7 +1553,15 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this) } #ifndef BE_TEST - sleep(config::report_disk_state_interval_seconds); + { + // wait disk_broken_cv awaken + // if awaken, 
set is_report_disk_state_already to true, it will not notify again + // if overtime, while will go to next cycle + boost::unique_lock lk(_disk_broken_lock); + if (OLAPRootPath::get_instance()->disk_broken_cv.timed_wait(lk, _wait_duration)) { + OLAPRootPath::get_instance()->is_report_disk_state_already = true; + } + } } #endif @@ -1588,7 +1591,13 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this) OLAP_LOG_WARNING("report get all tablets info failed. status: %d", report_all_tablets_info_status); #ifndef BE_TEST - sleep(config::report_olap_table_interval_seconds); + // wait on disk_broken_cv for at most report_olap_table_interval_seconds (the interval the replaced sleep() used), not the shared _wait_duration that start() fills from the disk-state interval + // if woken before the timeout, set is_report_olap_table_already to true to acknowledge the notification + // on timeout, the while loop goes on to the next cycle + boost::unique_lock lk(_disk_broken_lock); + if (OLAPRootPath::get_instance()->disk_broken_cv.timed_wait(lk, boost::posix_time::time_duration(0, 0, config::report_olap_table_interval_seconds, 0))) { + OLAPRootPath::get_instance()->is_report_olap_table_already = true; + } continue; #else return (void*)0; @@ -1606,7 +1615,13 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this) } #ifndef BE_TEST - sleep(config::report_olap_table_interval_seconds); + // wait on disk_broken_cv for at most report_olap_table_interval_seconds (the interval the replaced sleep() used), not the shared _wait_duration that start() fills from the disk-state interval + // if woken before the timeout, set is_report_olap_table_already to true to acknowledge the notification + // on timeout, the while loop goes on to the next cycle + boost::unique_lock lk(_disk_broken_lock); + if (OLAPRootPath::get_instance()->disk_broken_cv.timed_wait(lk, boost::posix_time::time_duration(0, 0, config::report_olap_table_interval_seconds, 0))) { + OLAPRootPath::get_instance()->is_report_olap_table_already = true; + } } #endif diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index f013672b28daee..8f2ed61d5818c0 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -17,6 +17,9 @@ #define BDG_PALO_BE_SRC_TASK_WORKER_POOL_H #include +#include +#include +#include #include #include #include @@ -27,6 +30,7 @@ #include "gen_cpp/HeartbeatService_types.h" #include "olap/command_executor.h" #include "olap/olap_define.h" 
+#include "olap/olap_rootpath.h" #include "olap/utils.h" namespace palo { @@ -147,6 +151,9 @@ class TaskWorkerPool { static MutexLock _s_running_task_user_count_lock; static FrontendServiceClientCache _master_service_client_cache; + static boost::mutex _disk_broken_lock; + static boost::posix_time::time_duration _wait_duration; + DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool); }; // class TaskWorkerPool } // namespace palo diff --git a/be/src/olap/olap_rootpath.cpp b/be/src/olap/olap_rootpath.cpp index e38069d947408b..19fe70163b0694 100644 --- a/be/src/olap/olap_rootpath.cpp +++ b/be/src/olap/olap_rootpath.cpp @@ -76,7 +76,10 @@ OLAPRootPath::OLAPRootPath() : _total_storage_medium_type_count(0), _available_storage_medium_type_count(0), _effective_cluster_id(-1), - _is_all_cluster_id_exist(true) {} + _is_all_cluster_id_exist(true), + _is_drop_tables(false), + is_report_disk_state_already(false), + is_report_olap_table_already(false) {} OLAPRootPath::~OLAPRootPath() { clear(); @@ -435,8 +438,25 @@ void OLAPRootPath::start_disk_stat_monitor() { _start_check_disks(); _detect_unused_flag(); _delete_tables_on_unused_root_path(); + + // if drop tables + // notify disk_state_worker_thread and olap_table_worker_thread until they received + if (_is_drop_tables) { + disk_broken_cv.notify_all(); + + bool is_report_disk_state_expected = true; + bool is_report_olap_table_expected = true; + bool is_report_disk_state_exchanged = + is_report_disk_state_already.compare_exchange_strong(is_report_disk_state_expected, false); + bool is_report_olap_table_exchanged = + is_report_olap_table_already.compare_exchange_strong(is_report_olap_table_expected, false); + if (is_report_disk_state_exchanged && is_report_olap_table_exchanged) { + _is_drop_tables = false; + } + } } + void OLAPRootPath::_start_check_disks() { OLAPRootPath::RootPathVec all_available_root_path; get_all_available_root_path(&all_available_root_path); @@ -940,6 +960,10 @@ void OLAPRootPath::_delete_tables_on_unused_root_path() { 
exit(0); } + if (!table_info_vec.empty()) { + _is_drop_tables = true; + } + OLAPEngine::get_instance()->drop_tables_on_error_root_path(table_info_vec); } diff --git a/be/src/olap/olap_rootpath.h b/be/src/olap/olap_rootpath.h index 6ecc1cfd16e3ed..a7c16664689edf 100644 --- a/be/src/olap/olap_rootpath.h +++ b/be/src/olap/olap_rootpath.h @@ -16,6 +16,8 @@ #ifndef BDG_PALO_BE_SRC_OLAP_OLAP_ROOTPATH_H #define BDG_PALO_BE_SRC_OLAP_OLAP_ROOTPATH_H +#include +#include #include #include #include @@ -130,6 +132,10 @@ class OLAPRootPath { const std::vector& root_path_vec, const std::vector& is_accessable_vec); + boost::condition_variable disk_broken_cv; + std::atomic_bool is_report_disk_state_already; + std::atomic_bool is_report_olap_table_already; + private: struct RootPathInfo { RootPathInfo(): @@ -221,6 +227,7 @@ class OLAPRootPath { int32_t _effective_cluster_id; bool _is_all_cluster_id_exist; + bool _is_drop_tables; // 错误磁盘所在百分比,超过设定的值,则engine需要退出运行 uint32_t _min_percentage_of_error_disk;