Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/runtime/routine_load/data_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
}
}

_last_visit_time = time(nullptr);

int64_t left_time = ctx->max_interval_s;
int64_t left_rows = ctx->max_batch_rows;
int64_t left_bytes = ctx->max_batch_size;
Expand All @@ -141,7 +143,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {

LOG(INFO) << "start consumer"
<< ". max time(s): " << left_time
<< ", bath rows: " << left_rows
<< ", batch rows: " << left_rows
<< ", batch size: " << left_bytes
<< ". " << ctx->brief();

Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/routine_load/data_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <ctime>
#include <mutex>

#include "librdkafka/rdkafkacpp.h"
Expand All @@ -35,7 +36,8 @@ class DataConsumer {
DataConsumer(StreamLoadContext* ctx):
_init(false),
_finished(false),
_cancelled(false) {
_cancelled(false),
_last_visit_time(0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial last_visit_time is 0? Will the consumer be cleaned up immediately at the beginning?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. The last_visit_time will be set when the consumer's start() is called. And the clean thread cannot see any active consumer, because active consumers are not in the pool.

}

virtual ~DataConsumer() {
Expand All @@ -56,6 +58,7 @@ class DataConsumer {
virtual bool match(StreamLoadContext* ctx) = 0;

const UniqueId& id() { return _id; }
time_t last_visit_time() { return _last_visit_time; }

protected:
UniqueId _id;
Expand All @@ -65,6 +68,7 @@ class DataConsumer {
bool _init;
bool _finished;
bool _cancelled;
time_t _last_visit_time;
};

class KafkaEventCb : public RdKafka::EventCb {
Expand Down
38 changes: 37 additions & 1 deletion be/src/runtime/routine_load/data_consumer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void DataConsumerPool::return_consumer(std::shared_ptr<DataConsumer> consumer) {
std::unique_lock<std::mutex> l(_lock);

if (_pool.size() == _max_pool_size) {
VLOG(3) << "data consumer pool is full: " << _pool.size()
VLOG(3) << "data consumer pool is full: " << _pool.size()
<< "-" << _max_pool_size << ", discard the returned consumer: "
<< consumer->id();
return;
Expand All @@ -77,4 +77,40 @@ void DataConsumerPool::return_consumer(std::shared_ptr<DataConsumer> consumer) {
return;
}

// Start the background worker that periodically evicts idle consumers
// from the pool. Always returns Status::OK.
//
// NOTE(review): the thread loops forever and is detached while capturing
// `this`; this assumes the DataConsumerPool outlives the thread (i.e. lives
// for the whole process) — confirm the pool is never destroyed before exit.
Status DataConsumerPool::start_bg_worker() {
    _clean_idle_consumer_thread = std::thread(
        [this] {
#ifdef GOOGLE_PROFILER
            ProfilerRegisterThread();
#endif

            // Scan for idle consumers once per minute.
            uint32_t interval = 60;
            while (true) {
                _clean_idle_consumer_bg();
                sleep(interval);
            }
        });
    // Detached on purpose: the loop above never exits, so the thread
    // can never be joined.
    _clean_idle_consumer_thread.detach();
    return Status::OK;
}

void DataConsumerPool::_clean_idle_consumer_bg() {
const static int32_t max_idle_time_second = 600;

std::unique_lock<std::mutex> l(_lock);
time_t now = time(nullptr);

auto iter = std::begin(_pool);
while (iter != std::end(_pool)) {
if (difftime(now, (*iter)->last_visit_time()) >= max_idle_time_second) {
LOG(INFO) << "remove data consumer " << (*iter)->id()
<< ", since it last visit: " << (*iter)->last_visit_time()
<< ", now: " << now;
iter = _pool.erase(iter);
} else {
++iter;
}
}
}

} // end namespace doris
11 changes: 10 additions & 1 deletion be/src/runtime/routine_load/data_consumer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

#pragma once

#include <ctime>
#include <memory>
#include <mutex>
#include <thread>

#include "runtime/routine_load/data_consumer.h"
#include "util/lru_cache.hpp"
Expand Down Expand Up @@ -48,10 +50,17 @@ class DataConsumerPool {
// erase the specified cache
void return_consumer(std::shared_ptr<DataConsumer> consumer);

protected:
Status start_bg_worker();

private:
void _clean_idle_consumer_bg();

private:
std::mutex _lock;
std::list<std::shared_ptr<DataConsumer>> _pool;
int64_t _max_pool_size;

std::thread _clean_idle_consumer_thread;
};

} // end namespace doris
7 changes: 4 additions & 3 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,13 @@ void RoutineLoadTaskExecutor::exec_task(

ctx->load_cost_nanos = MonotonicNanos() - ctx->start_nanos;

// commit txn
HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx), "commit failed");

// return the consumer back to pool
// call this before commit txn, in case the next task can come very fast
consumer_pool->return_consumer(consumer);

// commit txn
HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx), "commit failed");

cb(ctx);
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/routine_load/routine_load_task_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class RoutineLoadTaskExecutor {
_exec_env(exec_env),
_thread_pool(10, 1000),
_data_consumer_pool(10) {

_data_consumer_pool.start_bg_worker();
}

~RoutineLoadTaskExecutor() {
Expand Down