Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,9 +631,9 @@ DEFINE_Bool(enable_metric_calculator, "true");
// max consumer num in one data consumer group, for routine load
DEFINE_mInt32(max_consumer_num_per_group, "3");

// the size of thread pool for routine load task.
// the max size of thread pool for routine load task.
// this should be larger than FE config 'max_routine_load_task_num_per_be' (default 5)
DEFINE_Int32(routine_load_thread_pool_size, "10");
DEFINE_Int32(max_routine_load_thread_pool_size, "1024");

// max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row
// default is 20, batch_size's default value is 1024 means 20 * 1024 rows will be cached
Expand Down
4 changes: 2 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -682,9 +682,9 @@ DECLARE_Bool(enable_metric_calculator);
// max consumer num in one data consumer group, for routine load
DECLARE_mInt32(max_consumer_num_per_group);

// the size of thread pool for routine load task.
// the max size of thread pool for routine load task.
// this should be larger than FE config 'max_routine_load_task_num_per_be' (default 5)
DECLARE_Int32(routine_load_thread_pool_size);
DECLARE_Int32(max_routine_load_thread_pool_size);

// max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row
// default is 20, batch_size's default value is 1024 means 20 * 1024 rows will be cached
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
_stream_load_executor = StreamLoadExecutor::create_shared(this);
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
RETURN_IF_ERROR(_routine_load_task_executor->init());
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
_block_spill_mgr = new BlockSpillManager(store_paths);
_group_commit_mgr = new GroupCommitMgr(this);
Expand Down
24 changes: 15 additions & 9 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ using namespace ErrorCode;
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(routine_load_task_count, MetricUnit::NOUNIT);

RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env)
: _exec_env(exec_env),
_thread_pool(config::routine_load_thread_pool_size, config::routine_load_thread_pool_size,
"routine_load"),
_data_consumer_pool(config::routine_load_consumer_pool_size) {
: _exec_env(exec_env), _data_consumer_pool(config::routine_load_consumer_pool_size) {
REGISTER_HOOK_METRIC(routine_load_task_count, [this]() {
// std::lock_guard<std::mutex> l(_lock);
return _task_map.size();
Expand All @@ -79,10 +76,19 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
_task_map.clear();
}

Status RoutineLoadTaskExecutor::init() {
return ThreadPoolBuilder("routine_load")
.set_min_threads(0)
.set_max_threads(config::max_routine_load_thread_pool_size)
.set_max_queue_size(config::max_routine_load_thread_pool_size)
.build(&_thread_pool);
}

void RoutineLoadTaskExecutor::stop() {
DEREGISTER_HOOK_METRIC(routine_load_task_count);
_thread_pool.shutdown();
_thread_pool.join();
if (_thread_pool) {
_thread_pool->shutdown();
}
_data_consumer_pool.stop();
}

Expand Down Expand Up @@ -180,10 +186,10 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
return Status::OK();
}

if (_task_map.size() >= config::routine_load_thread_pool_size) {
if (_task_map.size() >= config::max_routine_load_thread_pool_size) {
LOG(INFO) << "too many tasks in thread pool. reject task: " << UniqueId(task.id)
<< ", job id: " << task.job_id
<< ", queue size: " << _thread_pool.get_queue_size()
<< ", queue size: " << _thread_pool->get_queue_size()
<< ", current tasks num: " << _task_map.size();
return Status::TooManyTasks("{}_{}", UniqueId(task.id).to_string(),
BackendOptions::get_localhost());
Expand Down Expand Up @@ -253,7 +259,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
_task_map[ctx->id] = ctx;

// offer the task to thread pool
if (!_thread_pool.offer(std::bind<void>(
if (!_thread_pool->submit_func(std::bind<void>(
&RoutineLoadTaskExecutor::exec_task, this, ctx, &_data_consumer_pool,
[this](std::shared_ptr<StreamLoadContext> ctx) {
std::unique_lock<std::mutex> l(_lock);
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/routine_load/routine_load_task_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
#include <vector>

#include "runtime/routine_load/data_consumer_pool.h"
#include "util/threadpool.h"
#include "util/uid_util.h"
#include "util/work_thread_pool.hpp"

namespace doris {

Expand All @@ -51,6 +51,8 @@ class RoutineLoadTaskExecutor {

~RoutineLoadTaskExecutor();

Status init();

void stop();

// submit a routine load task
Expand Down Expand Up @@ -81,7 +83,7 @@ class RoutineLoadTaskExecutor {

private:
ExecEnv* _exec_env = nullptr;
PriorityThreadPool _thread_pool;
std::unique_ptr<ThreadPool> _thread_pool;
DataConsumerPool _data_consumer_pool;

std::mutex _lock;
Expand Down
6 changes: 4 additions & 2 deletions be/test/runtime/routine_load_task_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class RoutineLoadTaskExecutorTest : public testing::Test {
_env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env));

config::routine_load_thread_pool_size = 5;
config::max_routine_load_thread_pool_size = 1024;
config::max_consumer_num_per_group = 3;
}

Expand Down Expand Up @@ -93,8 +93,10 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
task.__set_kafka_load_info(k_info);

RoutineLoadTaskExecutor executor(&_env);
// submit task
Status st;
st = executor.init();
EXPECT_TRUE(st.ok());
// submit task
st = executor.submit_task(task);
EXPECT_TRUE(st.ok());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1143,8 +1143,8 @@ public class Config extends ConfigBase {
/**
* the max concurrent routine load task num per BE.
* This is to limit the num of routine load tasks sending to a BE, and it should also less
* than BE config 'routine_load_thread_pool_size'(default 10),
* which is the routine load task thread pool size on BE.
* than BE config 'max_routine_load_thread_pool_size'(default 1024),
* which is the routine load task thread pool max size on BE.
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_routine_load_task_num_per_be = 5;
Expand Down