Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
93 changes: 43 additions & 50 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

#include "agent/agent_server.h"

#include <string>

#include <boost/filesystem.hpp>
#include <string>

#include "agent/task_worker_pool.h"
#include "agent/topic_subscriber.h"
#include "agent/user_resource_listener.h"
#include "common/status.h"
#include "common/logging.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "olap/snapshot_manager.h"
#include "runtime/etl_job_mgr.h"
Expand All @@ -35,10 +34,8 @@ using std::vector;

namespace doris {

AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) :
_exec_env(exec_env),
_master_info(master_info),
_topic_subscriber(new TopicSubscriber()) {
AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
: _exec_env(exec_env), _master_info(master_info), _topic_subscriber(new TopicSubscriber()) {
for (auto& path : exec_env->store_paths()) {
try {
string dpp_download_path_str = path.path + DPP_PREFIX;
Expand All @@ -55,11 +52,9 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) :
// to make code to be more readable.

#ifndef BE_TEST
#define CREATE_AND_START_POOL(type, pool_name) \
pool_name.reset(new TaskWorkerPool( \
TaskWorkerPool::TaskWorkerType::type, \
_exec_env, \
master_info)); \
#define CREATE_AND_START_POOL(type, pool_name) \
pool_name.reset( \
new TaskWorkerPool(TaskWorkerPool::TaskWorkerType::type, _exec_env, master_info)); \
pool_name->start();
#else
#define CREATE_AND_START_POOL(type, pool_name)
Expand Down Expand Up @@ -95,11 +90,12 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) :
#endif
}

AgentServer::~AgentServer() { }
AgentServer::~AgentServer() {}

// TODO(lingbin): each task in the batch may have it own status or FE must check and
// resend request when something is wrong(BE may need some logic to guarantee idempotence.
void AgentServer::submit_tasks(TAgentResult& agent_result, const std::vector<TAgentTaskRequest>& tasks) {
void AgentServer::submit_tasks(TAgentResult& agent_result,
const std::vector<TAgentTaskRequest>& tasks) {
Status ret_st;

// TODO check master_info here if it is the same with that of heartbeat rpc
Expand All @@ -126,27 +122,24 @@ void AgentServer::submit_tasks(TAgentResult& agent_result, const std::vector<TAg

// TODO(lingbin): It still too long, divided these task types into several categories
switch (task_type) {
HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, create_tablet_req);
HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, drop_tablet_req);
HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, publish_version_req);
HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK,
_clear_transaction_task_workers,
clear_transaction_task_req);
HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE,
_storage_medium_migrate_workers,
storage_medium_migrate_req);
HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY,
_check_consistency_workers,
check_consistency_req);
HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, snapshot_req);
HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers, release_snapshot_req);
HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO,
_update_tablet_meta_info_workers,
update_tablet_meta_info_req);
HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, create_tablet_req);
HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, drop_tablet_req);
HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, publish_version_req);
HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers,
clear_transaction_task_req);
HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers,
storage_medium_migrate_req);
HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY, _check_consistency_workers,
check_consistency_req);
HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, snapshot_req);
HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers,
release_snapshot_req);
HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers,
update_tablet_meta_info_req);

case TTaskType::REALTIME_PUSH:
case TTaskType::PUSH:
Expand All @@ -155,16 +148,16 @@ void AgentServer::submit_tasks(TAgentResult& agent_result, const std::vector<TAg
"task(signature=$0) has wrong request member", signature));
break;
}
if (task.push_req.push_type == TPushType::LOAD
|| task.push_req.push_type == TPushType::LOAD_DELETE
|| task.push_req.push_type == TPushType::LOAD_V2) {
if (task.push_req.push_type == TPushType::LOAD ||
task.push_req.push_type == TPushType::LOAD_DELETE ||
task.push_req.push_type == TPushType::LOAD_V2) {
_push_workers->submit_task(task);
} else if (task.push_req.push_type == TPushType::DELETE) {
_delete_workers->submit_task(task);
} else {
ret_st = Status::InvalidArgument(strings::Substitute(
"task(signature=$0, type=$1, push_type=$2) has wrong push_type",
signature, task_type, task.push_req.push_type));
"task(signature=$0, type=$1, push_type=$2) has wrong push_type", signature,
task_type, task.push_req.push_type));
}
break;
case TTaskType::ALTER:
Expand All @@ -184,7 +177,7 @@ void AgentServer::submit_tasks(TAgentResult& agent_result, const std::vector<TAg

if (!ret_st.ok()) {
LOG(WARNING) << "fail to submit task. reason: " << ret_st.get_error_msg()
<< ", task: " << task;
<< ", task: " << task;
// For now, all tasks in the batch share one status, so if any task
// was failed to submit, we can only return error to FE(even when some
// tasks have already been successfully submitted).
Expand All @@ -211,8 +204,8 @@ void AgentServer::make_snapshot(TAgentResult& t_agent_result,
LOG(WARNING) << "fail to make_snapshot. tablet_id=" << snapshot_request.tablet_id
<< ", schema_hash=" << snapshot_request.schema_hash
<< ", error_code=" << err_code;
ret_st = Status::RuntimeError(strings::Substitute(
"fail to make_snapshot. err_code=$0", err_code));
ret_st = Status::RuntimeError(
strings::Substitute("fail to make_snapshot. err_code=$0", err_code));
} else {
LOG(INFO) << "success to make_snapshot. tablet_id=" << snapshot_request.tablet_id
<< ", schema_hash=" << snapshot_request.schema_hash
Expand All @@ -233,8 +226,8 @@ void AgentServer::release_snapshot(TAgentResult& t_agent_result, const std::stri
if (err_code != OLAP_SUCCESS) {
LOG(WARNING) << "failed to release_snapshot. snapshot_path: " << snapshot_path
<< ", err_code: " << err_code;
ret_st = Status::RuntimeError(strings::Substitute(
"fail to release_snapshot. err_code=$0", err_code));
ret_st = Status::RuntimeError(
strings::Substitute("fail to release_snapshot. err_code=$0", err_code));
} else {
LOG(INFO) << "success to release_snapshot. snapshot_path=" << snapshot_path
<< ", err_code=" << err_code;
Expand Down Expand Up @@ -268,9 +261,9 @@ void AgentServer::get_etl_status(TMiniLoadEtlStatusResult& t_agent_result,
LOG(WARNING) << "fail to get job state. [id=" << request.mini_load_id << "]";
}

VLOG_RPC << "success to get job state. [id=" << request.mini_load_id << ", status="
<< t_agent_result.status.status_code << ", etl_state=" << t_agent_result.etl_state
<< ", files=";
VLOG_RPC << "success to get job state. [id=" << request.mini_load_id
<< ", status=" << t_agent_result.status.status_code
<< ", etl_state=" << t_agent_result.etl_state << ", files=";
for (auto& item : t_agent_result.file_map) {
VLOG_RPC << item.first << ":" << item.second << ";";
}
Expand All @@ -282,11 +275,11 @@ void AgentServer::delete_etl_files(TAgentResult& t_agent_result,
Status status = _exec_env->etl_job_mgr()->erase_job(request);
if (!status.ok()) {
LOG(WARNING) << "fail to delete etl files. because " << status.get_error_msg()
<< " with request " << request;
<< " with request " << request;
}

VLOG_RPC << "success to delete etl files. request=" << request;
status.to_thrift(&t_agent_result.status);
}

} // namespace doris
} // namespace doris
5 changes: 2 additions & 3 deletions be/src/agent/agent_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ class AgentServer {
std::unique_ptr<TopicSubscriber> _topic_subscriber;
};

} // end namespace doris

#endif // DORIS_BE_SRC_AGENT_AGENT_SERVER_H
} // end namespace doris

#endif // DORIS_BE_SRC_AGENT_AGENT_SERVER_H
Loading