From 66383612a70c4f7f42e3d325bb6ac58f95b4aae4 Mon Sep 17 00:00:00 2001 From: kangkaisen Date: Sat, 9 Mar 2019 17:24:21 +0800 Subject: [PATCH 1/4] Clean timeout tablets channel in TabletWriterMgr --- be/src/common/config.h | 1 + be/src/runtime/exec_env_init.cpp | 4 ++- be/src/runtime/tablet_writer_mgr.cpp | 40 ++++++++++++++++++++++++++++ be/src/runtime/tablet_writer_mgr.h | 13 +++++++++ 4 files changed, 57 insertions(+), 1 deletion(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index a2dc8a16dc907d..eb9807e7199924 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -265,6 +265,7 @@ namespace config { CONF_Int32(number_tablet_writer_threads, "16"); CONF_Int64(streaming_load_max_mb, "10240"); + CONF_Int32(streaming_load_max_duration_time_sec, "3600"); // Fragment thread pool CONF_Int32(fragment_pool_thread_num, "64"); diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 930286e51993e0..6a04f0b0b5db17 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -114,7 +114,9 @@ Status ExecEnv::_init(const std::vector& store_paths) { exit(-1); } _broker_mgr->init(); - return _init_mem_tracker(); + _init_mem_tracker(); + RETURN_IF_ERROR(_tablet_writer_mgr->start_bg_worker()); + return Status::OK; } Status ExecEnv::_init_mem_tracker() { diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp index de678561054395..cac070bec4cabf 100644 --- a/be/src/runtime/tablet_writer_mgr.cpp +++ b/be/src/runtime/tablet_writer_mgr.cpp @@ -251,6 +251,7 @@ Status TabletWriterMgr::open(const PTabletWriterOpenRequest& params) { // create a new channel.reset(new TabletsChannel(key)); _tablets_channels.insert(key, channel); + _key_time_map.emplace(key, time(nullptr)); } } RETURN_IF_ERROR(channel->open(params)); @@ -296,6 +297,7 @@ Status TabletWriterMgr::add_batch( if (finished) { std::lock_guard l(_lock); _tablets_channels.erase(key); + _key_time_map.erase(key); if (st.ok()) { auto handle = _lastest_success_channel->insert( key.to_string(), nullptr, 1, dummy_deleter); @@ -311,6 +313,44 @@ Status TabletWriterMgr::cancel(const PTabletWriterCancelRequest& params) { { std::lock_guard l(_lock); _tablets_channels.erase(key); + _key_time_map.erase(key); + } + return Status::OK; +} + +Status TabletWriterMgr::start_bg_worker() { + _tablets_channel_clean_thread = std::thread([this] { + _tablets_channel_clean_thread_callback(nullptr); + }); + return Status::OK; +} + +void* TabletWriterMgr::_tablets_channel_clean_thread_callback(void* arg) { +#ifdef GOOGLE_PROFILER + ProfilerRegisterThread(); +#endif + uint32_t interval = 600; + while (true) { + _start_tablets_channel_clean(); + sleep(interval); + } +} + +Status TabletWriterMgr::_start_tablets_channel_clean() { + const int32_t duration_time = config::streaming_load_max_duration_time_sec; + time_t now = time(nullptr); + { + std::lock_guard l(_lock); + auto itr = _key_time_map.begin(); + while (itr != _key_time_map.end()) { + if (difftime(now, itr->second) >= duration_time) { + _tablets_channels.erase(itr->first); + LOG(INFO) << "erase timeout tablets channel: " << itr->first; + itr = _key_time_map.erase(itr); + } else { + ++itr; + } + } } return Status::OK; } diff --git a/be/src/runtime/tablet_writer_mgr.h b/be/src/runtime/tablet_writer_mgr.h index ed1cb44b878d6f..0c79c646fec353 100644 --- a/be/src/runtime/tablet_writer_mgr.h +++ b/be/src/runtime/tablet_writer_mgr.h @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include "common/status.h" #include "gen_cpp/Types_types.h" @@ -80,6 +82,8 @@ class TabletWriterMgr { // id: stream load's id Status cancel(const PTabletWriterCancelRequest& request); + Status start_bg_worker(); + private: ExecEnv* _exec_env; // lock protect the channel map @@ -92,6 +96,15 @@ class TabletWriterMgr { TabletsChannelKeyHasher> _tablets_channels; Cache* _lastest_success_channel = nullptr; + + std::unordered_map _key_time_map; + + // thread to clean timeout tablets_channel + std::thread _tablets_channel_clean_thread; + + Status _start_tablets_channel_clean(); + + void* _tablets_channel_clean_thread_callback(void* arg); }; std::ostream& operator<<(std::ostream& os, const TabletsChannelKey&); From 1bae3b11c6d82f160a63a95ee0c65d7605b41853 Mon Sep 17 00:00:00 2001 From: kangkaisen Date: Mon, 11 Mar 2019 16:11:41 +0800 Subject: [PATCH 2/4] Add last_updated_time field to TabletsChannel --- be/src/common/config.h | 2 +- be/src/runtime/tablet_writer_mgr.cpp | 43 +++++++++++++++++----------- be/src/runtime/tablet_writer_mgr.h | 2 -- 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index eb9807e7199924..3bbc83ce5ccefa 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -265,7 +265,7 @@ namespace config { CONF_Int32(number_tablet_writer_threads, "16"); CONF_Int64(streaming_load_max_mb, "10240"); - CONF_Int32(streaming_load_max_duration_time_sec, "3600"); + CONF_Int32(streaming_load_rpc_max_alive_time_sec, "600"); // Fragment thread pool CONF_Int32(fragment_pool_thread_num, "64"); diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp index cac070bec4cabf..faee88c04d6cfa 100644 --- a/be/src/runtime/tablet_writer_mgr.cpp +++ b/be/src/runtime/tablet_writer_mgr.cpp @@ -47,6 +47,10 @@ class TabletsChannel { const google::protobuf::RepeatedField& partition_ids, google::protobuf::RepeatedPtrField* tablet_vec); + time_t last_updated_time() { + return _last_updated_time; + } + private: // open all writer Status _open_all_writers(const PTabletWriterOpenRequest& params); @@ -80,6 +84,9 @@ class TabletsChannel { // TODO(zc): to add this tracker to somewhere MemTracker _mem_tracker; + + //use to erase timeout TabletsChannel in _tablets_channels + time_t _last_updated_time; }; TabletsChannel::~TabletsChannel() { @@ -110,6 +117,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) { RETURN_IF_ERROR(_open_all_writers(params)); _opened = true; + _last_updated_time = time(nullptr); return Status::OK; } @@ -148,6 +156,7 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { } } _next_seqs[params.sender_id()]++; + _last_updated_time = time(nullptr); return Status::OK; } @@ -251,7 +260,6 @@ Status TabletWriterMgr::open(const PTabletWriterOpenRequest& params) { // create a new channel.reset(new TabletsChannel(key)); _tablets_channels.insert(key, channel); - _key_time_map.emplace(key, time(nullptr)); } } RETURN_IF_ERROR(channel->open(params)); @@ -297,7 +305,6 @@ Status TabletWriterMgr::add_batch( if (finished) { std::lock_guard l(_lock); _tablets_channels.erase(key); - _key_time_map.erase(key); if (st.ok()) { auto handle = _lastest_success_channel->insert( key.to_string(), nullptr, 1, dummy_deleter); @@ -313,15 +320,16 @@ Status TabletWriterMgr::cancel(const PTabletWriterCancelRequest& params) { { std::lock_guard l(_lock); _tablets_channels.erase(key); - _key_time_map.erase(key); } return Status::OK; } Status TabletWriterMgr::start_bg_worker() { - _tablets_channel_clean_thread = std::thread([this] { - _tablets_channel_clean_thread_callback(nullptr); - }); + _tablets_channel_clean_thread = std::thread( + [this] { + _tablets_channel_clean_thread_callback(nullptr); + }); + _tablets_channel_clean_thread.detach(); return Status::OK; } @@ -329,7 +337,7 @@ void* TabletWriterMgr::_tablets_channel_clean_thread_callback(void* arg) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - uint32_t interval = 600; + uint32_t interval = 60; while (true) { _start_tablets_channel_clean(); sleep(interval); @@ -337,20 +345,23 @@ void* TabletWriterMgr::_tablets_channel_clean_thread_callback(void* arg) { } Status TabletWriterMgr::_start_tablets_channel_clean() { - const int32_t duration_time = config::streaming_load_max_duration_time_sec; + const int32_t max_alive_time = config::streaming_load_rpc_max_alive_time_sec; time_t now = time(nullptr); { std::lock_guard l(_lock); - auto itr = _key_time_map.begin(); - while (itr != _key_time_map.end()) { - if (difftime(now, itr->second) >= duration_time) { - _tablets_channels.erase(itr->first); - LOG(INFO) << "erase timeout tablets channel: " << itr->first; - itr = _key_time_map.erase(itr); - } else { - ++itr; + std::vector need_delete_keys; + + for (auto& kv : _tablets_channels) { + time_t last_updated_time = kv.second->last_updated_time(); + if (difftime(now, last_updated_time) >= max_alive_time) { + need_delete_keys.emplace_back(kv.first); } } + + for(auto key: need_delete_keys) { + _tablets_channels.erase(key); + LOG(INFO) << "erase timeout tablets channel: " << key; + } } return Status::OK; } diff --git a/be/src/runtime/tablet_writer_mgr.h b/be/src/runtime/tablet_writer_mgr.h index 0c79c646fec353..6b005c9ba49d3d 100644 --- a/be/src/runtime/tablet_writer_mgr.h +++ b/be/src/runtime/tablet_writer_mgr.h @@ -97,8 +97,6 @@ class TabletWriterMgr { Cache* _lastest_success_channel = nullptr; - std::unordered_map _key_time_map; - // thread to clean timeout tablets_channel std::thread _tablets_channel_clean_thread; From cfeb51c2250507bcd8ac3ad2396dee63985a67e6 Mon Sep 17 00:00:00 2001 From: kangkaisen Date: Mon, 11 Mar 2019 18:27:39 +0800 Subject: [PATCH 3/4] Use lambda --- be/src/runtime/tablet_writer_mgr.cpp | 25 +++++++++++-------------- be/src/runtime/tablet_writer_mgr.h | 2 -- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp index faee88c04d6cfa..4233795cb2115d 100644 --- a/be/src/runtime/tablet_writer_mgr.cpp +++ b/be/src/runtime/tablet_writer_mgr.cpp @@ -326,24 +326,21 @@ Status TabletWriterMgr::cancel(const PTabletWriterCancelRequest& params) { Status TabletWriterMgr::start_bg_worker() { _tablets_channel_clean_thread = std::thread( - [this] { - _tablets_channel_clean_thread_callback(nullptr); - }); + [this] { + #ifdef GOOGLE_PROFILER + ProfilerRegisterThread(); + #endif + + uint32_t interval = 60; + while (true) { + _start_tablets_channel_clean(); + sleep(interval); + } + }); _tablets_channel_clean_thread.detach(); return Status::OK; } -void* TabletWriterMgr::_tablets_channel_clean_thread_callback(void* arg) { -#ifdef GOOGLE_PROFILER - ProfilerRegisterThread(); -#endif - uint32_t interval = 60; - while (true) { - _start_tablets_channel_clean(); - sleep(interval); - } -} - Status TabletWriterMgr::_start_tablets_channel_clean() { const int32_t max_alive_time = config::streaming_load_rpc_max_alive_time_sec; time_t now = time(nullptr); diff --git a/be/src/runtime/tablet_writer_mgr.h b/be/src/runtime/tablet_writer_mgr.h index 6b005c9ba49d3d..a750258b668652 100644 --- a/be/src/runtime/tablet_writer_mgr.h +++ b/be/src/runtime/tablet_writer_mgr.h @@ -101,8 +101,6 @@ class TabletWriterMgr { std::thread _tablets_channel_clean_thread; Status _start_tablets_channel_clean(); - - void* _tablets_channel_clean_thread_callback(void* arg); }; std::ostream& operator<<(std::ostream& os, const TabletsChannelKey&); From 3ea9761dc54826f306135cfd10a2ba04453aad05 Mon Sep 17 00:00:00 2001 From: ZHAO Chun Date: Mon, 11 Mar 2019 18:48:15 +0800 Subject: [PATCH 4/4] Use const reference Co-Authored-By: kangkaisen --- be/src/runtime/tablet_writer_mgr.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp index 4233795cb2115d..ee0693eacd74a7 100644 --- a/be/src/runtime/tablet_writer_mgr.cpp +++ b/be/src/runtime/tablet_writer_mgr.cpp @@ -355,7 +355,7 @@ Status TabletWriterMgr::_start_tablets_channel_clean() { } } - for(auto key: need_delete_keys) { + for(auto& key: need_delete_keys) { _tablets_channels.erase(key); LOG(INFO) << "erase timeout tablets channel: " << key; }