diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 2157c42bb642d9..e17f2726a4e47f 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -30,6 +30,7 @@ #include "service/backend_options.h" #include "util/debug_util.h" #include "util/thrift_server.h" +#include "util/time.h" #include "runtime/heartbeat_flags.h" using std::fstream; @@ -42,8 +43,9 @@ namespace doris { HeartbeatServer::HeartbeatServer(TMasterInfo* master_info) : _master_info(master_info), - _epoch(0) { + _fe_epoch(0) { _olap_engine = StorageEngine::instance(); + _be_epoch = GetCurrentTimeMicros() / 1000; } void HeartbeatServer::init_cluster_id() { @@ -71,6 +73,7 @@ void HeartbeatServer::heartbeat( heartbeat_result.backend_info.__set_be_rpc_port(-1); heartbeat_result.backend_info.__set_brpc_port(config::brpc_port); heartbeat_result.backend_info.__set_version(get_short_version()); + heartbeat_result.backend_info.__set_be_start_time(_be_epoch); } } @@ -110,33 +113,34 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { bool need_report = false; if (_master_info->network_address.hostname != master_info.network_address.hostname || _master_info->network_address.port != master_info.network_address.port) { - if (master_info.epoch > _epoch) { + if (master_info.epoch > _fe_epoch) { _master_info->network_address.hostname = master_info.network_address.hostname; _master_info->network_address.port = master_info.network_address.port; - _epoch = master_info.epoch; + _fe_epoch = master_info.epoch; need_report = true; LOG(INFO) << "master change. new master host: " << _master_info->network_address.hostname - << ". port: " << _master_info->network_address.port << ". epoch: " << _epoch; + << ". port: " << _master_info->network_address.port << ". epoch: " << _fe_epoch; } else { LOG(WARNING) << "epoch is not greater than local. ignore heartbeat. 
host: " << _master_info->network_address.hostname << " port: " << _master_info->network_address.port - << " local epoch: " << _epoch << " received epoch: " << master_info.epoch; + << " local epoch: " << _fe_epoch + << " received epoch: " << master_info.epoch; return Status::InternalError("epoch is not greater than local. ignore heartbeat."); } } else { // when Master FE restarted, host and port remains the same, but epoch will be increased. - if (master_info.epoch > _epoch) { - _epoch = master_info.epoch; + if (master_info.epoch > _fe_epoch) { + _fe_epoch = master_info.epoch; need_report = true; - LOG(INFO) << "master restarted. epoch: " << _epoch; + LOG(INFO) << "master restarted. epoch: " << _fe_epoch; } } if (master_info.__isset.token) { if (!_master_info->__isset.token) { _master_info->__set_token(master_info.token); - LOG(INFO) << "get token. token: " << _master_info->token; + LOG(INFO) << "get token. token: " << _master_info->token; } else if (_master_info->token != master_info.token) { LOG(WARNING) << "invalid token. local_token:" << _master_info->token << ". token:" << master_info.token; diff --git a/be/src/agent/heartbeat_server.h b/be/src/agent/heartbeat_server.h index c43dea0560c084..67816e45c74467 100644 --- a/be/src/agent/heartbeat_server.h +++ b/be/src/agent/heartbeat_server.h @@ -55,12 +55,13 @@ class HeartbeatServer : public HeartbeatServiceIf { Status _heartbeat(const TMasterInfo& master_info); StorageEngine* _olap_engine; + int64_t _be_epoch; - // mutex to protect master_info and _epoch + // mutex to protect master_info and _fe_epoch std::mutex _hb_mtx; // Not owned. 
Point to the ExecEnv::_master_info TMasterInfo* _master_info; - int64_t _epoch; + int64_t _fe_epoch; DISALLOW_COPY_AND_ASSIGN(HeartbeatServer); }; // class HeartBeatServer diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index 1d735eecdaf31a..b47506ea23b9e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -491,7 +491,6 @@ public void write(DataOutput out) throws IOException { out.writeBoolean(isAlive.get()); out.writeBoolean(isDecommissioned.get()); out.writeLong(lastUpdateMs); - out.writeLong(lastStartTime); ImmutableMap disks = disksRef; @@ -638,11 +637,11 @@ public boolean handleHbResponse(BackendHbResponse hbResponse) { this.lastUpdateMs = hbResponse.getHbTime(); if (!isAlive.get()) { isChanged = true; - this.lastStartTime = hbResponse.getHbTime(); - LOG.info("{} is alive, last start time: {}", this.toString(), hbResponse.getHbTime()); + this.lastStartTime = hbResponse.getBeStartTime(); + LOG.info("{} is alive, last start time: {}", this.toString(), hbResponse.getBeStartTime()); this.isAlive.set(true); } else if (this.lastStartTime <= 0) { - this.lastStartTime = hbResponse.getHbTime(); + this.lastStartTime = hbResponse.getBeStartTime(); } heartbeatErrMsg = ""; diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java index 8b0c39ca29e98f..9db879012e4ae6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java @@ -31,13 +31,14 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { private int bePort; private int httpPort; private int brpcPort; + private long beStartTime; private String version = ""; public BackendHbResponse() { 
super(HeartbeatResponse.Type.BACKEND); } - public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long hbTime, String version) { + public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long hbTime, long beStartTime, String version) { super(HeartbeatResponse.Type.BACKEND); this.beId = beId; this.status = HbStatus.OK; @@ -45,6 +46,7 @@ public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long this.httpPort = httpPort; this.brpcPort = brpcPort; this.hbTime = hbTime; + this.beStartTime = beStartTime; this.version = version; } @@ -71,6 +73,10 @@ public int getBrpcPort() { return brpcPort; } + public long getBeStartTime() { + return beStartTime; + } + public String getVersion() { return version; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index cfcd5f7cd29c14..cb63632034a874 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -242,9 +242,9 @@ public HeartbeatResponse call() { if (tBackendInfo.isSetVersion()) { version = tBackendInfo.getVersion(); } - + long beStartTime = tBackendInfo.isSetBeStartTime() ? tBackendInfo.getBeStartTime() : System.currentTimeMillis(); // backend.updateOnce(bePort, httpPort, beRpcPort, brpcPort); - return new BackendHbResponse(backendId, bePort, httpPort, brpcPort, System.currentTimeMillis(), version); + return new BackendHbResponse(backendId, bePort, httpPort, brpcPort, System.currentTimeMillis(), beStartTime, version); } else { return new BackendHbResponse(backendId, result.getStatus().getErrorMsgs().isEmpty() ? 
"Unknown error" : result.getStatus().getErrorMsgs().get(0)); diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index 339d8a0a9b04b7..ef0eb3c740bb0e 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -37,6 +37,7 @@ struct TBackendInfo { 3: optional Types.TPort be_rpc_port 4: optional Types.TPort brpc_port 5: optional string version + 6: optional i64 be_start_time } struct THeartbeatResult {