Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ namespace config {
CONF_Int32(number_tablet_writer_threads, "16");

CONF_Int64(streaming_load_max_mb, "10240");
CONF_Int32(streaming_load_rpc_max_alive_time_sec, "600");

// Fragment thread pool
CONF_Int32(fragment_pool_thread_num, "64");
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
exit(-1);
}
_broker_mgr->init();
return _init_mem_tracker();
_init_mem_tracker();
RETURN_IF_ERROR(_tablet_writer_mgr->start_bg_worker());
return Status::OK;
}

Status ExecEnv::_init_mem_tracker() {
Expand Down
48 changes: 48 additions & 0 deletions be/src/runtime/tablet_writer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ class TabletsChannel {
const google::protobuf::RepeatedField<int64_t>& partition_ids,
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec);

time_t last_updated_time() {
return _last_updated_time;
}

private:
// open all writer
Status _open_all_writers(const PTabletWriterOpenRequest& params);
Expand Down Expand Up @@ -80,6 +84,9 @@ class TabletsChannel {

// TODO(zc): to add this tracker to somewhere
MemTracker _mem_tracker;

//use to erase timeout TabletsChannel in _tablets_channels
time_t _last_updated_time;
};

TabletsChannel::~TabletsChannel() {
Expand Down Expand Up @@ -110,6 +117,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) {
RETURN_IF_ERROR(_open_all_writers(params));

_opened = true;
_last_updated_time = time(nullptr);
return Status::OK;
}

Expand Down Expand Up @@ -148,6 +156,7 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
}
}
_next_seqs[params.sender_id()]++;
_last_updated_time = time(nullptr);
return Status::OK;
}

Expand Down Expand Up @@ -315,6 +324,45 @@ Status TabletWriterMgr::cancel(const PTabletWriterCancelRequest& params) {
return Status::OK;
}

Status TabletWriterMgr::start_bg_worker() {
_tablets_channel_clean_thread = std::thread(
[this] {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
#endif

uint32_t interval = 60;
while (true) {
_start_tablets_channel_clean();
sleep(interval);
}
});
_tablets_channel_clean_thread.detach();
return Status::OK;
}

Status TabletWriterMgr::_start_tablets_channel_clean() {
const int32_t max_alive_time = config::streaming_load_rpc_max_alive_time_sec;
time_t now = time(nullptr);
{
std::lock_guard<std::mutex> l(_lock);
std::vector<TabletsChannelKey> need_delete_keys;

for (auto& kv : _tablets_channels) {
time_t last_updated_time = kv.second->last_updated_time();
if (difftime(now, last_updated_time) >= max_alive_time) {
need_delete_keys.emplace_back(kv.first);
}
}

for(auto& key: need_delete_keys) {
_tablets_channels.erase(key);
LOG(INFO) << "erase timeout tablets channel: " << key;
}
}
return Status::OK;
}

std::string TabletsChannelKey::to_string() const {
std::stringstream ss;
ss << *this;
Expand Down
9 changes: 9 additions & 0 deletions be/src/runtime/tablet_writer_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <mutex>
#include <ostream>
#include <sstream>
#include <thread>
#include <ctime>

#include "common/status.h"
#include "gen_cpp/Types_types.h"
Expand Down Expand Up @@ -80,6 +82,8 @@ class TabletWriterMgr {
// id: stream load's id
Status cancel(const PTabletWriterCancelRequest& request);

Status start_bg_worker();

private:
ExecEnv* _exec_env;
// lock protect the channel map
Expand All @@ -92,6 +96,11 @@ class TabletWriterMgr {
TabletsChannelKeyHasher> _tablets_channels;

Cache* _lastest_success_channel = nullptr;

// thread to clean timeout tablets_channel
std::thread _tablets_channel_clean_thread;

Status _start_tablets_channel_clean();
};

std::ostream& operator<<(std::ostream& os, const TabletsChannelKey&);
Expand Down