Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 59 additions & 36 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ using std::vector;

namespace palo {

// ---- Retry limits ---------------------------------------------------------
// NOTE(review): intent is inferred from the names; confirm against the call
// sites in the worker callbacks.
constexpr uint32_t DOWNLOAD_FILE_MAX_RETRY{3};
constexpr uint32_t TASK_FINISH_MAX_RETRY{3};
constexpr uint32_t PUSH_MAX_RETRY{1};

// ---- Report worker counts -------------------------------------------------
// NOTE(review): presumably the number of threads per report worker type;
// verify where the pools are started.
constexpr uint32_t REPORT_TASK_WORKER_COUNT{1};
constexpr uint32_t REPORT_DISK_STATE_WORKER_COUNT{1};
constexpr uint32_t REPORT_OLAP_TABLE_WORKER_COUNT{1};

// Assigned to FileDownloaderParam::curl_opt_timeout when listing the remote
// directory during clone. NOTE(review): presumably seconds -- confirm.
constexpr uint32_t LIST_REMOTE_FILE_TIMEOUT{15};

// Pieces of the tablet download URL. The clone code concatenates them as
//   <http_host> + PREFIX + TOKEN_PARAM + <token> + FILE_PARAM + <file path>
const std::string HTTP_REQUEST_PREFIX("/api/_tablet/_download?");
const std::string HTTP_REQUEST_TOKEN_PARAM("&token=");
const std::string HTTP_REQUEST_FILE_PARAM("&file=");

std::atomic_ulong TaskWorkerPool::_s_report_version(time(NULL) * 10000);
MutexLock TaskWorkerPool::_s_task_signatures_lock;
MutexLock TaskWorkerPool::_s_running_task_user_count_lock;
Expand Down Expand Up @@ -296,7 +307,7 @@ uint32_t TaskWorkerPool::_get_next_task_index(
if (task.__isset.resource_info) {
user = task.resource_info.user;
}

if (priority == TPriority::HIGH) {
if (task.__isset.priority && task.priority == TPriority::HIGH) {
index = i;
Expand All @@ -305,7 +316,7 @@ uint32_t TaskWorkerPool::_get_next_task_index(
continue;
}
}

if (improper_users.count(user) != 0) {
continue;
}
Expand Down Expand Up @@ -337,12 +348,12 @@ uint32_t TaskWorkerPool::_get_next_task_index(
improper_users.insert(user);
}
}

if (index == -1) {
if (priority == TPriority::HIGH) {
return index;
}

index = 0;
if (tasks[0].__isset.resource_info) {
user = tasks[0].resource_info.user;
Expand All @@ -367,7 +378,7 @@ void* TaskWorkerPool::_create_table_worker_thread_callback(void* arg_this) {
TAgentTaskRequest agent_task_req;
TCreateTabletReq create_tablet_req;
{
lock_guard<MutexLock> worker_thread_lock(worker_pool_this->_worker_thread_lock);
lock_guard<MutexLock> worker_thread_lock(worker_pool_this->_worker_thread_lock);
while (worker_pool_this->_tasks.empty()) {
worker_pool_this->_worker_thread_condition_lock.wait();
}
Expand Down Expand Up @@ -418,7 +429,7 @@ void* TaskWorkerPool::_drop_table_worker_thread_callback(void* arg_this) {
TAgentTaskRequest agent_task_req;
TDropTabletReq drop_tablet_req;
{
lock_guard<MutexLock> worker_thread_lock(worker_pool_this->_worker_thread_lock);
lock_guard<MutexLock> worker_thread_lock(worker_pool_this->_worker_thread_lock);
while (worker_pool_this->_tasks.empty()) {
worker_pool_this->_worker_thread_condition_lock.wait();
}
Expand Down Expand Up @@ -639,7 +650,7 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) {
// Try to register to cgroups_mgr
CgroupsMgr::apply_system_cgroup();
TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this;

// gen high priority worker thread
TPriority::type priority = TPriority::NORMAL;
int32_t push_worker_count_high_priority = config::push_worker_count_high_priority;
Expand Down Expand Up @@ -676,23 +687,23 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) {
worker_pool_this->_worker_thread_condition_lock.notify();
break;
}

agent_task_req = worker_pool_this->_tasks[index];
if (agent_task_req.__isset.resource_info) {
user = agent_task_req.resource_info.user;
}
push_req = agent_task_req.push_req;
worker_pool_this->_tasks.erase(worker_pool_this->_tasks.begin() + index);
} while (0);

#ifndef BE_TEST
if (index < 0) {
// there is no high priority task in queue
sleep(1);
continue;
}
#endif

OLAP_LOG_INFO("get push task. signature: %ld, user: %s, priority: %d",
agent_task_req.signature, user.c_str(), priority);

Expand Down Expand Up @@ -734,11 +745,11 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) {
} else {
status = PALO_TASK_REQUEST_ERROR;
}

// Return result to fe
vector<string> error_msgs;
TStatus task_status;

TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(worker_pool_this->_backend);
finish_task_request.__set_task_type(agent_task_req.task_type);
Expand Down Expand Up @@ -829,7 +840,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
agent_task_req.signature);
error_msgs.push_back("clone get local root path failed.");
status = PALO_ERROR;
}
}
}

string src_file_path;
Expand Down Expand Up @@ -861,8 +872,8 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
error_msgs.push_back("load header failed.");
status = PALO_ERROR;
}
}
}

#ifndef BE_TEST
// Clean useless dir, if failed, ignore it.
if (status != PALO_SUCCESS && status != PALO_CREATE_TABLE_EXIST) {
Expand Down Expand Up @@ -910,7 +921,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
&& (tablet_info.version < clone_req.committed_version ||
(tablet_info.version == clone_req.committed_version
&& tablet_info.version_hash != clone_req.committed_version_hash))) {

// we need to check if this cloned table's version is what we expect.
// if not, maybe this is a stale remaining table which is waiting for drop.
// we drop it.
Expand All @@ -922,7 +933,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
agent_task_req.signature,
tablet_info.version, tablet_info.version_hash,
clone_req.committed_version, clone_req.committed_version_hash);

TDropTabletReq drop_req;
drop_req.tablet_id = clone_req.tablet_id;
drop_req.schema_hash = clone_req.schema_hash;
Expand All @@ -932,7 +943,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
OLAP_LOG_WARNING(
"drop stale cloned table failed! tabelt id: %ld", clone_req.tablet_id);
}

status = PALO_ERROR;
} else {
OLAP_LOG_INFO("clone get tablet info success. "
Expand Down Expand Up @@ -971,7 +982,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature, "");
#ifndef BE_TEST
}
#endif
#endif

return (void*)0;
}
Expand All @@ -985,6 +996,15 @@ AgentStatus TaskWorkerPool::_clone_copy(
vector<string>* error_msgs) {
AgentStatus status = PALO_SUCCESS;


std::string token;
{
uint32_t cluster_id = OLAPRootPath::get_instance()->effective_cluster_id();
stringstream token_stream;
token_stream << cluster_id;
token = token_stream.str();
}

for (auto src_backend : clone_req.src_backends) {
stringstream http_host_stream;
http_host_stream << "http://" << src_backend.host << ":" << src_backend.http_port;
Expand Down Expand Up @@ -1067,7 +1087,9 @@ AgentStatus TaskWorkerPool::_clone_copy(

// Get remote dir file list
FileDownloader::FileDownloaderParam downloader_param;
downloader_param.remote_file_path = http_host + HTTP_REQUEST_PREFIX + src_file_full_path;
downloader_param.remote_file_path = http_host + HTTP_REQUEST_PREFIX
+ HTTP_REQUEST_TOKEN_PARAM + token
+ HTTP_REQUEST_FILE_PARAM + src_file_full_path;
downloader_param.curl_opt_timeout = LIST_REMOTE_FILE_TIMEOUT;

#ifndef BE_TEST
Expand Down Expand Up @@ -1149,8 +1171,9 @@ AgentStatus TaskWorkerPool::_clone_copy(
// Get copy from remote
for (auto file_name : file_name_list) {
download_retry_time = 0;
downloader_param.remote_file_path =
http_host + HTTP_REQUEST_PREFIX + src_file_full_path + file_name;
downloader_param.remote_file_path = http_host + HTTP_REQUEST_PREFIX
+ HTTP_REQUEST_TOKEN_PARAM + token
+ HTTP_REQUEST_FILE_PARAM + src_file_full_path + file_name;
downloader_param.local_file_path = local_file_full_path + file_name;

// Get file length
Expand Down Expand Up @@ -1200,7 +1223,7 @@ AgentStatus TaskWorkerPool::_clone_copy(
status = PALO_ERROR;
break;
}

estimate_time_out = file_size / config::download_low_speed_limit_kbps / 1024;
if (estimate_time_out < config::download_low_speed_time) {
estimate_time_out = config::download_low_speed_time;
Expand Down Expand Up @@ -1240,7 +1263,7 @@ AgentStatus TaskWorkerPool::_clone_copy(
src_host->host.c_str(),
downloader_param.remote_file_path.c_str(),
signature, file_size, local_file_size);
download_status = PALO_FILE_DOWNLOAD_FAILED;
download_status = PALO_FILE_DOWNLOAD_FAILED;
} else {
chmod(downloader_param.local_file_path.c_str(), S_IRUSR | S_IWUSR);
break;
Expand Down Expand Up @@ -1287,7 +1310,7 @@ AgentStatus TaskWorkerPool::_clone_copy(
if (status == PALO_SUCCESS) {
break;
}
} // clone copy from one backend
} // clone copy from one backend
return status;
}

Expand All @@ -1302,7 +1325,7 @@ void* TaskWorkerPool::_storage_medium_migrate_worker_thread_callback(void* arg_t
TAgentTaskRequest agent_task_req;
TStorageMediumMigrateReq storage_medium_migrate_req;
{
lock_guard<MutexLock> worker_thread_lock(worker_pool_this->_worker_thread_lock);
lock_guard<MutexLock> worker_thread_lock(worker_pool_this->_worker_thread_lock);
while (worker_pool_this->_tasks.empty()) {
worker_pool_this->_worker_thread_condition_lock.wait();
}
Expand Down Expand Up @@ -1410,7 +1433,7 @@ void* TaskWorkerPool::_check_consistency_worker_thread_callback(void* arg_this)
TAgentTaskRequest agent_task_req;
TCheckConsistencyReq check_consistency_req;
{
lock_guard<MutexLock> worker_thread_lock(worker_pool_this->_worker_thread_lock);
lock_guard<MutexLock> worker_thread_lock(worker_pool_this->_worker_thread_lock);
while (worker_pool_this->_tasks.empty()) {
worker_pool_this->_worker_thread_condition_lock.wait();
}
Expand Down Expand Up @@ -1479,7 +1502,7 @@ void* TaskWorkerPool::_report_task_worker_thread_callback(void* arg_this) {
worker_pool_this->_master_info.network_address.port);
TMasterResult result;
AgentStatus status = worker_pool_this->_master_client->report(request, &result);

if (status == PALO_SUCCESS) {
OLAP_LOG_INFO("finish report task success. return code: %d",
result.status.status_code);
Expand Down Expand Up @@ -1677,7 +1700,7 @@ void* TaskWorkerPool::_upload_worker_thread_callback(void* arg_this) {
worker_pool_this->_finish_task(finish_task_request);
worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature, "");
#ifndef BE_TEST
}
}
#endif
return (void*)0;
}
Expand Down Expand Up @@ -1725,7 +1748,7 @@ void* TaskWorkerPool::_restore_worker_thread_callback(void* arg_this) {
OLAP_LOG_WARNING("Write remote source info to file failed. Path: %s",
info_file_path.c_str());
}

// Get local disk to restore from olap
string local_shard_root_path;
if (status_code == TStatusCode::OK) {
Expand All @@ -1742,7 +1765,7 @@ void* TaskWorkerPool::_restore_worker_thread_callback(void* arg_this) {
stringstream local_file_path_stream;
local_file_path_stream << local_shard_root_path << "/" << restore_request.tablet_id << "/";
string local_file_path(local_file_path_stream.str());

// Download files from remote source
if (status_code == TStatusCode::OK) {
string command = "sh " + config::trans_file_tool_path + " " + label + " download " +
Expand All @@ -1757,7 +1780,7 @@ void* TaskWorkerPool::_restore_worker_thread_callback(void* arg_this) {
OLAP_LOG_WARNING("Download file failed. Error: %s", errmsg.c_str());
}
}

// Delete tmp file
boost::filesystem::path file_path(info_file_path);
if (boost::filesystem::exists(file_path)) {
Expand Down Expand Up @@ -1790,7 +1813,7 @@ void* TaskWorkerPool::_restore_worker_thread_callback(void* arg_this) {
file_path_suffix != ".dat") {
continue;
}

// Get new file name
stringstream new_file_name_stream;
char sperator = '_';
Expand Down Expand Up @@ -1837,7 +1860,7 @@ void* TaskWorkerPool::_restore_worker_thread_callback(void* arg_this) {
restore_request.schema_hash,
agent_task_req.signature,
&tablet_info);

if (get_tablet_info_status != PALO_SUCCESS) {
OLAP_LOG_WARNING("Restore success, but get new tablet info failed."
"tablet_id: %ld, schema_hash: %ld, signature: %ld.",
Expand All @@ -1861,7 +1884,7 @@ void* TaskWorkerPool::_restore_worker_thread_callback(void* arg_this) {
worker_pool_this->_finish_task(finish_task_request);
worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature, "");
#ifndef BE_TEST
}
}
#endif
return (void*)0;
}
Expand Down Expand Up @@ -1969,7 +1992,7 @@ void* TaskWorkerPool::_release_snapshot_thread_callback(void* arg_this) {
OLAP_LOG_INFO("release_snapshot success. snapshot_path: %s, status: %d",
snapshot_path.c_str(), release_snapshot_status);
}

task_status.__set_status_code(status_code);
task_status.__set_error_msgs(error_msgs);

Expand Down
9 changes: 0 additions & 9 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,6 @@

namespace palo {

const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
const uint32_t TASK_FINISH_MAX_RETRY = 3;
const uint32_t PUSH_MAX_RETRY = 1;
const uint32_t REPORT_TASK_WORKER_COUNT = 1;
const uint32_t REPORT_DISK_STATE_WORKER_COUNT = 1;
const uint32_t REPORT_OLAP_TABLE_WORKER_COUNT = 1;
const uint32_t LIST_REMOTE_FILE_TIMEOUT = 15;
const std::string HTTP_REQUEST_PREFIX = "/api/_tablet/_download?file=";

class TaskWorkerPool {
public:
enum TaskWorkerType {
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ namespace config {
// "If true, Kudu features will be disabled."
CONF_Bool(disable_kudu, "false")

// Kept for forward compatibility; will be removed later.
CONF_Bool(enable_token_check, "true");

} // namespace config

} // namespace palo
Expand Down
Loading