diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index f96b24cc942aa7..88c19348e92111 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -133,6 +133,8 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) { } } + _last_visit_time = time(nullptr); + int64_t left_time = ctx->max_interval_s; int64_t left_rows = ctx->max_batch_rows; int64_t left_bytes = ctx->max_batch_size; @@ -141,7 +143,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) { LOG(INFO) << "start consumer" << ". max time(s): " << left_time - << ", bath rows: " << left_rows + << ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". " << ctx->brief(); diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h index d71aa0d24038d7..7c4cdad0a44abb 100644 --- a/be/src/runtime/routine_load/data_consumer.h +++ b/be/src/runtime/routine_load/data_consumer.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include "librdkafka/rdkafkacpp.h" @@ -35,7 +36,8 @@ class DataConsumer { DataConsumer(StreamLoadContext* ctx): _init(false), _finished(false), - _cancelled(false) { + _cancelled(false), + _last_visit_time(0) { } virtual ~DataConsumer() { @@ -56,6 +58,7 @@ class DataConsumer { virtual bool match(StreamLoadContext* ctx) = 0; const UniqueId& id() { return _id; } + time_t last_visit_time() { return _last_visit_time; } protected: UniqueId _id; @@ -65,6 +68,7 @@ class DataConsumer { bool _init; bool _finished; bool _cancelled; + time_t _last_visit_time; }; class KafkaEventCb : public RdKafka::EventCb { diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp b/be/src/runtime/routine_load/data_consumer_pool.cpp index f7ee3af1a79259..6d6de3777b36b4 100644 --- a/be/src/runtime/routine_load/data_consumer_pool.cpp +++ b/be/src/runtime/routine_load/data_consumer_pool.cpp @@ -64,7 +64,7 @@ void DataConsumerPool::return_consumer(std::shared_ptr consumer) { std::unique_lock l(_lock); if (_pool.size() == _max_pool_size) { - VLOG(3) << "data consumer pool is full: " << _pool.size() + VLOG(3) << "data consumer pool is full: " << _pool.size() << "-" << _max_pool_size << ", discard the returned consumer: " << consumer->id(); return; @@ -77,4 +77,40 @@ void DataConsumerPool::return_consumer(std::shared_ptr consumer) { return; } +Status DataConsumerPool::start_bg_worker() { + _clean_idle_consumer_thread = std::thread( + [this] { + #ifdef GOOGLE_PROFILER + ProfilerRegisterThread(); + #endif + + uint32_t interval = 60; + while (true) { + _clean_idle_consumer_bg(); + sleep(interval); + } + }); + _clean_idle_consumer_thread.detach(); + return Status::OK; +} + +void DataConsumerPool::_clean_idle_consumer_bg() { + const static int32_t max_idle_time_second = 600; + + std::unique_lock l(_lock); + time_t now = time(nullptr); + + auto iter = std::begin(_pool); + while (iter != std::end(_pool)) { + if (difftime(now, (*iter)->last_visit_time()) >= max_idle_time_second) { + LOG(INFO) << "remove data consumer " << (*iter)->id() + << ", since it last visit: " << (*iter)->last_visit_time() + << ", now: " << now; + iter = _pool.erase(iter); + } else { + ++iter; + } + } +} + } // end namespace doris diff --git a/be/src/runtime/routine_load/data_consumer_pool.h b/be/src/runtime/routine_load/data_consumer_pool.h index 05b63e3da73210..1d74002db6c275 100644 --- a/be/src/runtime/routine_load/data_consumer_pool.h +++ b/be/src/runtime/routine_load/data_consumer_pool.h @@ -17,8 +17,10 @@ #pragma once +#include #include #include +#include #include "runtime/routine_load/data_consumer.h" #include "util/lru_cache.hpp" @@ -48,10 +50,17 @@ class DataConsumerPool { // erase the specified cache void return_consumer(std::shared_ptr consumer); -protected: + Status start_bg_worker(); + +private: + void _clean_idle_consumer_bg(); + +private: std::mutex _lock; std::list> _pool; int64_t _max_pool_size; + + std::thread _clean_idle_consumer_thread; }; } // end namespace doris diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index b8dd206ccaa64b..0424b98cadc2b8 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -174,12 +174,13 @@ void RoutineLoadTaskExecutor::exec_task( ctx->load_cost_nanos = MonotonicNanos() - ctx->start_nanos; - // commit txn - HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx), "commit failed"); - // return the consumer back to pool + // call this before commit txn, in case the next task can come very fast consumer_pool->return_consumer(consumer); + // commit txn + HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx), "commit failed"); + cb(ctx); } diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index 3f78e2a26996a6..11a54ae155b0d3 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -45,6 +45,8 @@ class RoutineLoadTaskExecutor { _exec_env(exec_env), _thread_pool(10, 1000), _data_consumer_pool(10) { + + _data_consumer_pool.start_bg_worker(); } ~RoutineLoadTaskExecutor() {