diff --git a/be/src/common/config.h b/be/src/common/config.h index a2dc8a16dc907d..3bbc83ce5ccefa 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -265,6 +265,7 @@ namespace config { CONF_Int32(number_tablet_writer_threads, "16"); CONF_Int64(streaming_load_max_mb, "10240"); + CONF_Int32(streaming_load_rpc_max_alive_time_sec, "600"); // Fragment thread pool CONF_Int32(fragment_pool_thread_num, "64"); diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 930286e51993e0..6a04f0b0b5db17 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -114,7 +114,9 @@ Status ExecEnv::_init(const std::vector& store_paths) { exit(-1); } _broker_mgr->init(); - return _init_mem_tracker(); + _init_mem_tracker(); + RETURN_IF_ERROR(_tablet_writer_mgr->start_bg_worker()); + return Status::OK; } Status ExecEnv::_init_mem_tracker() { diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp index de678561054395..ee0693eacd74a7 100644 --- a/be/src/runtime/tablet_writer_mgr.cpp +++ b/be/src/runtime/tablet_writer_mgr.cpp @@ -47,6 +47,10 @@ class TabletsChannel { const google::protobuf::RepeatedField& partition_ids, google::protobuf::RepeatedPtrField* tablet_vec); + time_t last_updated_time() { + return _last_updated_time; + } + private: // open all writer Status _open_all_writers(const PTabletWriterOpenRequest& params); @@ -80,6 +84,9 @@ class TabletsChannel { // TODO(zc): to add this tracker to somewhere MemTracker _mem_tracker; + + //use to erase timeout TabletsChannel in _tablets_channels + time_t _last_updated_time; }; TabletsChannel::~TabletsChannel() { @@ -110,6 +117,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) { RETURN_IF_ERROR(_open_all_writers(params)); _opened = true; + _last_updated_time = time(nullptr); return Status::OK; } @@ -148,6 +156,7 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { } } _next_seqs[params.sender_id()]++; + _last_updated_time = time(nullptr); return Status::OK; } @@ -315,6 +324,45 @@ Status TabletWriterMgr::cancel(const PTabletWriterCancelRequest& params) { return Status::OK; } +Status TabletWriterMgr::start_bg_worker() { + _tablets_channel_clean_thread = std::thread( + [this] { + #ifdef GOOGLE_PROFILER + ProfilerRegisterThread(); + #endif + + uint32_t interval = 60; + while (true) { + _start_tablets_channel_clean(); + sleep(interval); + } + }); + _tablets_channel_clean_thread.detach(); + return Status::OK; +} + +Status TabletWriterMgr::_start_tablets_channel_clean() { + const int32_t max_alive_time = config::streaming_load_rpc_max_alive_time_sec; + time_t now = time(nullptr); + { + std::lock_guard l(_lock); + std::vector need_delete_keys; + + for (auto& kv : _tablets_channels) { + time_t last_updated_time = kv.second->last_updated_time(); + if (difftime(now, last_updated_time) >= max_alive_time) { + need_delete_keys.emplace_back(kv.first); + } + } + + for(auto& key: need_delete_keys) { + _tablets_channels.erase(key); + LOG(INFO) << "erase timeout tablets channel: " << key; + } + } + return Status::OK; +} + std::string TabletsChannelKey::to_string() const { std::stringstream ss; ss << *this; diff --git a/be/src/runtime/tablet_writer_mgr.h b/be/src/runtime/tablet_writer_mgr.h index ed1cb44b878d6f..a750258b668652 100644 --- a/be/src/runtime/tablet_writer_mgr.h +++ b/be/src/runtime/tablet_writer_mgr.h @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include "common/status.h" #include "gen_cpp/Types_types.h" @@ -80,6 +82,8 @@ class TabletWriterMgr { // id: stream load's id Status cancel(const PTabletWriterCancelRequest& request); + Status start_bg_worker(); + private: ExecEnv* _exec_env; // lock protect the channel map @@ -92,6 +96,11 @@ class TabletWriterMgr { TabletsChannelKeyHasher> _tablets_channels; Cache* _lastest_success_channel = nullptr; + + // thread to clean timeout tablets_channel + std::thread _tablets_channel_clean_thread; + + Status _start_tablets_channel_clean(); }; std::ostream& operator<<(std::ostream& os, const TabletsChannelKey&);