Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ using std::vector;

namespace doris {

AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
: _master_info(master_info), _topic_subscriber(new TopicSubscriber()) {
MasterServerClient::create(master_info);
AgentServer::AgentServer(ExecEnv* exec_env, const ClusterInfo* cluster_info)
: _cluster_info(cluster_info), _topic_subscriber(new TopicSubscriber()) {
MasterServerClient::create(cluster_info);

#if !defined(BE_TEST) && !defined(__APPLE__)
// Add subscriber here and register listeners
Expand Down Expand Up @@ -170,7 +170,7 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
"ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&& task) { return alter_tablet_callback(engine, task); });

_workers[TTaskType::CLONE] = std::make_unique<TaskWorkerPool>(
"CLONE", config::clone_worker_count, [&engine, &master_info = _master_info](auto&& task) { return clone_callback(engine, master_info, task); });
"CLONE", config::clone_worker_count, [&engine, &cluster_info = _cluster_info](auto&& task) { return clone_callback(engine, cluster_info, task); });

_workers[TTaskType::STORAGE_MEDIUM_MIGRATE] = std::make_unique<TaskWorkerPool>(
"STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count, [&engine](auto&& task) { return storage_medium_migrate_callback(engine, task); });
Expand All @@ -188,13 +188,13 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
"UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); });

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds, [&master_info = _master_info] { report_task_callback(master_info); }));
"REPORT_TASK", _cluster_info, config::report_task_interval_seconds, [&cluster_info = _cluster_info] { report_task_callback(cluster_info); }));

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds, [&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); }));
"REPORT_DISK_STATE", _cluster_info, config::report_disk_state_interval_seconds, [&engine, &cluster_info = _cluster_info] { report_disk_callback(engine, cluster_info); }));

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_OLAP_TABLET", _master_info, config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { report_tablet_callback(engine, master_info); }));
"REPORT_OLAP_TABLET", _cluster_info, config::report_tablet_interval_seconds,[&engine, &cluster_info = _cluster_info] { report_tablet_callback(engine, cluster_info); }));
// clang-format on
}

Expand All @@ -217,18 +217,20 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_
"DROP_TABLE", 1, [&engine](auto&& task) { return drop_tablet_callback(engine, task); });

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds,
[&master_info = _master_info] { report_task_callback(master_info); }));
"REPORT_TASK", _cluster_info, config::report_task_interval_seconds,
[&cluster_info = _cluster_info] { report_task_callback(cluster_info); }));

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds,
[&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); }));
"REPORT_DISK_STATE", _cluster_info, config::report_disk_state_interval_seconds,
[&engine, &cluster_info = _cluster_info] {
report_disk_callback(engine, cluster_info);
}));

if (config::enable_cloud_tablet_report) {
_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_OLAP_TABLET", _master_info, config::report_tablet_interval_seconds,
[&engine, &master_info = _master_info] {
report_tablet_callback(engine, master_info);
"REPORT_OLAP_TABLET", _cluster_info, config::report_tablet_interval_seconds,
[&engine, &cluster_info = _cluster_info] {
report_tablet_callback(engine, cluster_info);
}));
}
}
Expand All @@ -239,8 +241,8 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
const std::vector<TAgentTaskRequest>& tasks) {
Status ret_st;

// TODO check master_info here if it is the same with that of heartbeat rpc
if (_master_info.network_address.hostname.empty() || _master_info.network_address.port == 0) {
// TODO check cluster_info here if it is the same with that of heartbeat rpc
if (_cluster_info->master_fe_addr.hostname.empty() || _cluster_info->master_fe_addr.port == 0) {
Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
ret_st.to_thrift(&agent_result.status);
return;
Expand Down
8 changes: 4 additions & 4 deletions be/src/agent/agent_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ class ExecEnv;
class TAgentPublishRequest;
class TAgentResult;
class TAgentTaskRequest;
class TMasterInfo;
class ClusterInfo;
class TSnapshotRequest;
class StorageEngine;
class CloudStorageEngine;

// Each method corresponds to one RPC from FE Master, see BackendService.
class AgentServer {
public:
explicit AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info);
explicit AgentServer(ExecEnv* exec_env, const ClusterInfo* cluster_info);
~AgentServer();

void start_workers(StorageEngine& engine, ExecEnv* exec_env);
Expand All @@ -63,8 +63,8 @@ class AgentServer {
void stop_report_workers();

private:
// Reference to the ExecEnv::_master_info
const TMasterInfo& _master_info;
// Reference to the ExecEnv::_cluster_info
const ClusterInfo* _cluster_info;

std::unordered_map<int64_t /* TTaskType */, std::unique_ptr<TaskWorkerPoolIf>> _workers;

Expand Down
63 changes: 39 additions & 24 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "common/config.h"
#include "common/status.h"
#include "olap/storage_engine.h"
#include "runtime/cluster_info.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/heartbeat_flags.h"
Expand All @@ -49,23 +50,23 @@ class TProcessor;

namespace doris {

HeartbeatServer::HeartbeatServer(TMasterInfo* master_info)
HeartbeatServer::HeartbeatServer(ClusterInfo* cluster_info)
: _engine(ExecEnv::GetInstance()->storage_engine()),
_master_info(master_info),
_cluster_info(cluster_info),
_fe_epoch(0) {
_be_epoch = GetCurrentTimeMicros() / 1000;
}

void HeartbeatServer::init_cluster_id() {
_master_info->cluster_id = _engine.effective_cluster_id();
_cluster_info->cluster_id = _engine.effective_cluster_id();
}

void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
const TMasterInfo& master_info) {
//print heartbeat in every minute
LOG_EVERY_N(INFO, 12) << "get heartbeat from FE."
<< "host:" << master_info.network_address.hostname
<< ", port:" << master_info.network_address.port
<< ", rpc port:" << master_info.network_address.port
<< ", cluster id:" << master_info.cluster_id
<< ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos)
<< ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch;
Expand Down Expand Up @@ -108,22 +109,23 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
std::lock_guard<std::mutex> lk(_hb_mtx);

// Check cluster id
if (_master_info->cluster_id == -1) {
if (_cluster_info->cluster_id == -1) {
LOG(INFO) << "get first heartbeat. update cluster id";
// write and update cluster id
RETURN_IF_ERROR(_engine.set_cluster_id(master_info.cluster_id));

_master_info->cluster_id = master_info.cluster_id;
_cluster_info->cluster_id = master_info.cluster_id;
LOG(INFO) << "record cluster id. host: " << master_info.network_address.hostname
<< ". port: " << master_info.network_address.port
<< ". cluster id: " << master_info.cluster_id
<< ". frontend_infos: " << PrintFrontendInfos(master_info.frontend_infos);
} else {
if (_master_info->cluster_id != master_info.cluster_id) {
if (_cluster_info->cluster_id != master_info.cluster_id) {
return Status::InternalError(
"invalid cluster id. ignore. Record cluster id ={}, record frontend info {}. "
"Invalid cluster_id={}, invalid frontend info {}",
_master_info->cluster_id, PrintFrontendInfos(_master_info->frontend_infos),
_cluster_info->cluster_id,
PrintFrontendInfos(ExecEnv::GetInstance()->get_frontends()),
master_info.cluster_id, PrintFrontendInfos(master_info.frontend_infos));
}
}
Expand Down Expand Up @@ -183,22 +185,22 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
}

bool need_report = false;
if (_master_info->network_address.hostname != master_info.network_address.hostname ||
_master_info->network_address.port != master_info.network_address.port) {
if (_cluster_info->master_fe_addr.hostname != master_info.network_address.hostname ||
_cluster_info->master_fe_addr.port != master_info.network_address.port) {
if (master_info.epoch > _fe_epoch) {
_master_info->network_address.hostname = master_info.network_address.hostname;
_master_info->network_address.port = master_info.network_address.port;
_cluster_info->master_fe_addr.hostname = master_info.network_address.hostname;
_cluster_info->master_fe_addr.port = master_info.network_address.port;
_fe_epoch = master_info.epoch;
need_report = true;
LOG(INFO) << "master change. new master host: "
<< _master_info->network_address.hostname
<< ". port: " << _master_info->network_address.port
<< _cluster_info->master_fe_addr.hostname
<< ". port: " << _cluster_info->master_fe_addr.port
<< ". epoch: " << _fe_epoch;
} else {
return Status::InternalError(
"epoch is not greater than local. ignore heartbeat. host: {}, port: {}, local "
"epoch: {}, received epoch: {}",
_master_info->network_address.hostname, _master_info->network_address.port,
_cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port,
_fe_epoch, master_info.epoch);
}
} else {
Expand All @@ -211,16 +213,17 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
}

if (master_info.__isset.token) {
if (!_master_info->__isset.token) {
_master_info->__set_token(master_info.token);
LOG(INFO) << "get token. token: " << _master_info->token;
} else if (_master_info->token != master_info.token) {
return Status::InternalError("invalid token");
if (_cluster_info->token == "") {
_cluster_info->token = master_info.token;
LOG(INFO) << "get token. token: " << _cluster_info->token;
} else if (_cluster_info->token != master_info.token) {
return Status::InternalError("invalid token. local: {}, master: {}",
_cluster_info->token, master_info.token);
}
}

if (master_info.__isset.http_port) {
_master_info->__set_http_port(master_info.http_port);
_cluster_info->master_fe_http_port = master_info.http_port;
}

if (master_info.__isset.heartbeat_flags) {
Expand All @@ -229,7 +232,7 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
}

if (master_info.__isset.backend_id) {
_master_info->__set_backend_id(master_info.backend_id);
_cluster_info->backend_id = master_info.backend_id;
BackendOptions::set_backend_id(master_info.backend_id);
}
if (master_info.__isset.frontend_infos) {
Expand Down Expand Up @@ -281,6 +284,18 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
master_info.tablet_report_inactive_duration_ms;
}

if (master_info.__isset.auth_token) {
if (_cluster_info->curr_auth_token == "") {
_cluster_info->curr_auth_token = master_info.auth_token;
LOG(INFO) << "set new auth token: " << master_info.auth_token;
} else if (_cluster_info->curr_auth_token != master_info.auth_token) {
LOG(INFO) << "last auth token: " << _cluster_info->last_auth_token
<< "set new auth token: " << master_info.auth_token;
_cluster_info->last_auth_token = _cluster_info->curr_auth_token;
_cluster_info->curr_auth_token = master_info.auth_token;
}
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_engine.notify_listeners();
Expand All @@ -291,8 +306,8 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {

Status create_heartbeat_server(ExecEnv* exec_env, uint32_t server_port,
std::unique_ptr<ThriftServer>* thrift_server,
uint32_t worker_thread_num, TMasterInfo* local_master_info) {
HeartbeatServer* heartbeat_server = new HeartbeatServer(local_master_info);
uint32_t worker_thread_num, ClusterInfo* cluster_info) {
HeartbeatServer* heartbeat_server = new HeartbeatServer(cluster_info);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: use auto when initializing with new to avoid duplicating the type name [modernize-use-auto]

Suggested change
HeartbeatServer* heartbeat_server = new HeartbeatServer(cluster_info);
auto* heartbeat_server = new HeartbeatServer(cluster_info);

if (heartbeat_server == nullptr) {
return Status::InternalError("Get heartbeat server failed");
}
Expand Down
13 changes: 7 additions & 6 deletions be/src/agent/heartbeat_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/status.h"

namespace doris {
class ClusterInfo;
class ExecEnv;
class THeartbeatResult;
class TMasterInfo;
Expand All @@ -36,15 +37,15 @@ class ThriftServer;

class HeartbeatServer : public HeartbeatServiceIf {
public:
explicit HeartbeatServer(TMasterInfo* master_info);
explicit HeartbeatServer(ClusterInfo* cluster_info);
~HeartbeatServer() override = default;

void init_cluster_id();

// Master send heartbeat to this server
//
// Input parameters:
// * master_info: The struct of master info, contains host ip and port
// * master_info: The struct of master info, contains cluster info from Master FE
//
// Output parameters:
// * heartbeat_result: The result of heartbeat set
Expand All @@ -56,16 +57,16 @@ class HeartbeatServer : public HeartbeatServiceIf {
BaseStorageEngine& _engine;
int64_t _be_epoch;

// mutex to protect master_info and _epoch
// mutex to protect cluster_info and _epoch
std::mutex _hb_mtx;
// Not owned. Point to the ExecEnv::_master_info
TMasterInfo* _master_info = nullptr;
// Not owned. Point to the ExecEnv::_cluster_info
ClusterInfo* _cluster_info = nullptr;
int64_t _fe_epoch;

DISALLOW_COPY_AND_ASSIGN(HeartbeatServer);
}; // class HeartBeatServer

Status create_heartbeat_server(ExecEnv* exec_env, uint32_t heartbeat_server_port,
std::unique_ptr<ThriftServer>* heart_beat_server,
uint32_t worker_thread_num, TMasterInfo* local_master_info);
uint32_t worker_thread_num, ClusterInfo* cluster_info);
} // namespace doris
Loading