Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 55 additions & 60 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,66 +82,6 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
std::lock_guard<std::mutex> lk(_hb_mtx);

if (master_info.__isset.backend_ip) {
// master_info.backend_ip may be an IP or domain name, and it should be renamed 'backend_host', as it requires compatibility with historical versions, the name is still 'backend_ ip'
if (master_info.backend_ip != BackendOptions::get_localhost()) {
LOG(INFO) << master_info.backend_ip << " not equal to to backend localhost "
<< BackendOptions::get_localhost();
// step1: check master_info.backend_ip is IP or FQDN
if (is_valid_ip(master_info.backend_ip)) {
LOG(WARNING) << "backend ip saved in master does not equal to backend local ip"
<< master_info.backend_ip << " vs. "
<< BackendOptions::get_localhost();
std::stringstream ss;
ss << "actual backend local ip: " << BackendOptions::get_localhost();
// if master_info.backend_ip is IP,and not equal with BackendOptions::get_localhost(),return error
return Status::InternalError(ss.str());
}

//step2: resolve FQDN to IP
std::string ip;
Status status = hostname_to_ip(master_info.backend_ip, ip);
if (!status.ok()) {
std::stringstream ss;
ss << "can not get ip from fqdn: " << status.to_string();
LOG(WARNING) << ss.str();
return status;
}

//step3: get all ips of the interfaces on this machine
std::vector<InetAddress> hosts;
status = get_hosts(&hosts);
if (!status.ok() || hosts.empty()) {
std::stringstream ss;
ss << "the status was not ok when get_hosts_v4, error is " << status.to_string();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}

//step4: check if the IP of FQDN belongs to the current machine and update BackendOptions._s_localhost
bool set_new_localhost = false;
for (auto& addr : hosts) {
if (addr.get_host_address() == ip) {
BackendOptions::set_localhost(master_info.backend_ip);
set_new_localhost = true;
break;
}
}

if (!set_new_localhost) {
std::stringstream ss;
ss << "the host recorded in master is " << master_info.backend_ip
<< ", but we cannot found the local ip that mapped to that host."
<< BackendOptions::get_localhost();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}

LOG(WARNING) << "update localhost done, the new localhost is "
<< BackendOptions::get_localhost();
}
}

// Check cluster id
if (_master_info->cluster_id == -1) {
LOG(INFO) << "get first heartbeat. update cluster id";
Expand All @@ -163,6 +103,61 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
}
}

if (master_info.__isset.backend_ip) {
// master_info.backend_ip may be an IP or domain name, and it should be renamed 'backend_host', as it requires compatibility with historical versions, the name is still 'backend_ ip'
if (master_info.backend_ip != BackendOptions::get_localhost()) {
LOG(INFO) << master_info.backend_ip << " not equal to to backend localhost "
<< BackendOptions::get_localhost();
// step1: check master_info.backend_ip is IP or FQDN
if (!is_valid_ip(master_info.backend_ip)) {
//step2: resolve FQDN to IP
std::string ip;
Status status = hostname_to_ip(master_info.backend_ip, ip);
if (!status.ok()) {
std::stringstream ss;
ss << "can not get ip from fqdn: " << status.to_string();
LOG(WARNING) << ss.str();
return status;
}

//step3: get all ips of the interfaces on this machine
std::vector<InetAddress> hosts;
status = get_hosts(&hosts);
if (!status.ok() || hosts.empty()) {
std::stringstream ss;
ss << "the status was not ok when get_hosts, error is " << status.to_string();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}

//step4: check if the IP of FQDN belongs to the current machine and update BackendOptions._s_localhost
bool set_new_localhost = false;
for (auto& addr : hosts) {
if (addr.get_host_address() == ip) {
BackendOptions::set_localhost(master_info.backend_ip);
set_new_localhost = true;
break;
}
}

if (!set_new_localhost) {
std::stringstream ss;
ss << "the host recorded in master is " << master_info.backend_ip
<< ", but we cannot found the local ip that mapped to that host."
<< BackendOptions::get_localhost();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
} else {
// if is ip,not check anything,use it
BackendOptions::set_localhost(master_info.backend_ip);
}

LOG(WARNING) << "update localhost done, the new localhost is "
<< BackendOptions::get_localhost();
}
}

bool need_report = false;
if (_master_info->network_address.hostname != master_info.network_address.hostname ||
_master_info->network_address.port != master_info.network_address.port) {
Expand Down