Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 34 additions & 28 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ FrontendServiceClientCache TaskWorkerPool::_master_service_client_cache;

TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* env,
const TMasterInfo& master_info)
: _master_info(master_info),
: _name(strings::Substitute("TaskWorkerPool.$0", TYPE_STRING(_task_worker_type))),
_master_info(master_info),
_agent_utils(new AgentUtils()),
_master_client(new MasterServerClient(_master_info, &_master_service_client_cache)),
_env(env),
Expand Down Expand Up @@ -186,8 +187,7 @@ void TaskWorkerPool::start() {
}

#ifndef BE_TEST
// TODO(yingchun): need a better name
ThreadPoolBuilder(strings::Substitute("TaskWorkerPool.$0", _task_worker_type))
ThreadPoolBuilder(_name)
.set_min_threads(_worker_count)
.set_max_threads(_worker_count)
.build(&_thread_pool);
Expand Down Expand Up @@ -233,6 +233,11 @@ void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
}
}

// Calls notify_one() on this pool's worker condition variable, then writes
// an INFO log line containing the pool's name.
void TaskWorkerPool::notify_thread() {
    auto& worker_cv = _worker_thread_condition_variable;
    worker_cv.notify_one();

    LOG(INFO) << "notify task worker pool: " << _name;
}

bool TaskWorkerPool::_register_task_info(const TTaskType::type task_type, int64_t signature) {
lock_guard<Mutex> task_signatures_lock(_s_task_signatures_lock);
set<int64_t>& signature_set = _s_task_signatures[task_type];
Expand Down Expand Up @@ -970,7 +975,7 @@ void TaskWorkerPool::_check_consistency_worker_thread_callback() {
<< ", signature: " << agent_task_req.signature;
status_code = TStatusCode::RUNTIME_ERROR;
} else {
LOG(INFO) << "check consistency success. status:" << res
LOG(INFO) << "check consistency success. status: " << res
<< ", signature:" << agent_task_req.signature << ", checksum:" << checksum;
}

Expand Down Expand Up @@ -1007,13 +1012,18 @@ void TaskWorkerPool::_report_task_worker_thread_callback() {

if (status != DORIS_SUCCESS) {
DorisMetrics::instance()->report_task_requests_failed->increment(1);
LOG(WARNING) << "finish report task failed. status:" << status << ", master host:"
LOG(WARNING) << "report task failed. status: " << status << ", master host: "
<< _master_info.network_address.hostname
<< "port:" << _master_info.network_address.port;
<< "port: " << _master_info.network_address.port;
} else {
LOG(INFO) << "finish report task. master host: "
<< _master_info.network_address.hostname
<< "port: " << _master_info.network_address.port;
}
} while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(config::report_task_interval_seconds)));
}

/// The disk state report thread reports disk state at a configurable fixed interval.
void TaskWorkerPool::_report_disk_state_worker_thread_callback() {
StorageEngine::instance()->register_report_listener(this);

Expand All @@ -1029,15 +1039,11 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() {
continue;
}

lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
_worker_thread_condition_variable.wait();
}
// wait at most report_disk_state_interval_seconds, or being notified
_worker_thread_condition_variable.wait_for(MonoDelta::FromSeconds(config::report_disk_state_interval_seconds));
if (!_is_work) {
return;
break;
}
TAgentTaskRequest agent_task_req = _tasks.front();
_tasks.pop_front();

vector<DataDirInfo> data_dir_infos;
_env->storage_engine()->get_all_data_dir_info(&data_dir_infos, true /* update */);
Expand All @@ -1062,11 +1068,14 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() {

if (status != DORIS_SUCCESS) {
DorisMetrics::instance()->report_disk_requests_failed->increment(1);
LOG(WARNING) << "finish report disk state failed. status:" << status << ", master host:"
LOG(WARNING) << "report disk state failed. status: " << status << ", master host: "
<< _master_info.network_address.hostname
<< ", port:" << _master_info.network_address.port;
<< ", port: " << _master_info.network_address.port;
} else {
LOG(INFO) << "finish report disk state. master host: "
<< _master_info.network_address.hostname
<< ", port: " << _master_info.network_address.port;
}
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
StorageEngine::instance()->deregister_report_listener(this);
}
Expand All @@ -1088,17 +1097,12 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() {
continue;
}

lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
_worker_thread_condition_variable.wait();
}
// wait at most report_tablet_interval_seconds, or being notified
_worker_thread_condition_variable.wait_for(MonoDelta::FromSeconds(config::report_tablet_interval_seconds));
if (!_is_work) {
return;
break;
}

TAgentTaskRequest agent_task_req = _tasks.front();
_tasks.pop_front();

request.tablets.clear();
OLAPStatus report_all_tablets_info_status =
StorageEngine::instance()->tablet_manager()->report_all_tablets_info(
Expand All @@ -1117,12 +1121,14 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() {
AgentStatus status = _master_client->report(request, &result);
if (status != DORIS_SUCCESS) {
DorisMetrics::instance()->report_all_tablets_requests_failed->increment(1);
LOG(WARNING) << "finish report olap table state failed. status:" << status
<< ", master host:"
<< _master_info.network_address.hostname
LOG(WARNING) << "report tablets failed. status: " << status
<< ", master host: " << _master_info.network_address.hostname
<< ", port:" << _master_info.network_address.port;
} else {
LOG(INFO) << "finish report tablets. master host: "
<< _master_info.network_address.hostname
<< ", port: " << _master_info.network_address.port;
}
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
StorageEngine::instance()->deregister_report_listener(this);
}
Expand Down
36 changes: 36 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ThreadPool;

class TaskWorkerPool {
public:
// When you add, remove, or rename a value in this enum, update TYPE_STRING to match.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be hard to keep the order consistent over time. How about using a function to get the name of the type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

enum TaskWorkerType {
CREATE_TABLE,
DROP_TABLE,
Expand Down Expand Up @@ -71,6 +72,35 @@ class TaskWorkerPool {
UPDATE_TABLET_META_INFO
};

// Returns the symbolic name of a TaskWorkerType value (for example
// "CREATE_TABLE" for CREATE_TABLE). Any value not listed in the switch
// yields "Unknown".
//
// The function is static because it reads no instance state, so it can be
// called safely while the object is still being constructed. It returns a
// plain std::string because a const-qualified by-value return would stop
// callers from moving out of the result.
//
// Keep the case list in sync with the TaskWorkerType enum.
static std::string TYPE_STRING(TaskWorkerType type) {
    switch (type) {
    case CREATE_TABLE: return "CREATE_TABLE";
    case DROP_TABLE: return "DROP_TABLE";
    case PUSH: return "PUSH";
    case REALTIME_PUSH: return "REALTIME_PUSH";
    case PUBLISH_VERSION: return "PUBLISH_VERSION";
    case CLEAR_ALTER_TASK: return "CLEAR_ALTER_TASK";
    case CLEAR_TRANSACTION_TASK: return "CLEAR_TRANSACTION_TASK";
    case DELETE: return "DELETE";
    case ALTER_TABLE: return "ALTER_TABLE";
    case QUERY_SPLIT_KEY: return "QUERY_SPLIT_KEY";
    case CLONE: return "CLONE";
    case STORAGE_MEDIUM_MIGRATE: return "STORAGE_MEDIUM_MIGRATE";
    case CHECK_CONSISTENCY: return "CHECK_CONSISTENCY";
    case REPORT_TASK: return "REPORT_TASK";
    case REPORT_DISK_STATE: return "REPORT_DISK_STATE";
    case REPORT_OLAP_TABLE: return "REPORT_OLAP_TABLE";
    case UPLOAD: return "UPLOAD";
    case DOWNLOAD: return "DOWNLOAD";
    case MAKE_SNAPSHOT: return "MAKE_SNAPSHOT";
    case RELEASE_SNAPSHOT: return "RELEASE_SNAPSHOT";
    case MOVE: return "MOVE";
    case RECOVER_TABLET: return "RECOVER_TABLET";
    case UPDATE_TABLET_META_INFO: return "UPDATE_TABLET_META_INFO";
    // Fallback for values outside the list above.
    default: return "Unknown";
    }
}

TaskWorkerPool(
const TaskWorkerType task_worker_type,
ExecEnv* env,
Expand All @@ -89,6 +119,9 @@ class TaskWorkerPool {
// * task: the task need callback thread to do
virtual void submit_task(const TAgentTaskRequest& task);

// Notify the worker thread. Currently used for the task/disk/tablet report threads.
void notify_thread();

private:
bool _register_task_info(const TTaskType::type task_type, int64_t signature);
void _remove_task_info(const TTaskType::type task_type, int64_t signature);
Expand Down Expand Up @@ -135,6 +168,9 @@ class TaskWorkerPool {
bool overwrite,
std::vector<std::string>* error_msgs);

private:
std::string _name;

// Reference to the ExecEnv::_master_info
const TMasterInfo& _master_info;
TBackend _backend;
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -978,8 +978,7 @@ void StorageEngine::deregister_report_listener(TaskWorkerPool* listener) {
// Calls notify_thread() on every registered report listener.
//
// _report_mtx is held for the whole loop. Listeners are only notified;
// no task is submitted to them, so no empty TAgentTaskRequest is created
// per notification.
void StorageEngine::notify_listeners() {
    std::lock_guard<std::mutex> l(_report_mtx);
    for (auto& listener : _report_listeners) {
        listener->notify_thread();
    }
}

Expand Down