From cec81e334313cb45e31bce2806c266348974554d Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Tue, 17 Mar 2020 22:48:05 +0800 Subject: [PATCH 1/2] [Enhanced] Make 'LastStartTime' in backends list as the actual BE start time We use 'LastStartTime' in backends list to check whether there is an unexpected restart of BE, but it will be changed as BE's first heartbeat time after FE restarted, it would be better to set it to BE's actual start time. --- be/src/agent/heartbeat_server.cpp | 22 +++++++++++-------- be/src/agent/heartbeat_server.h | 3 ++- .../java/org/apache/doris/system/Backend.java | 7 +++--- .../doris/system/BackendHbResponse.java | 8 ++++++- .../org/apache/doris/system/HeartbeatMgr.java | 7 ++++-- gensrc/thrift/HeartbeatService.thrift | 1 + 6 files changed, 31 insertions(+), 17 deletions(-) diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 2157c42bb642d9..e17f2726a4e47f 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -30,6 +30,7 @@ #include "service/backend_options.h" #include "util/debug_util.h" #include "util/thrift_server.h" +#include "util/time.h" #include "runtime/heartbeat_flags.h" using std::fstream; @@ -42,8 +43,9 @@ namespace doris { HeartbeatServer::HeartbeatServer(TMasterInfo* master_info) : _master_info(master_info), - _epoch(0) { + _fe_epoch(0) { _olap_engine = StorageEngine::instance(); + _be_epoch = GetCurrentTimeMicros() / 1000; } void HeartbeatServer::init_cluster_id() { @@ -71,6 +73,7 @@ void HeartbeatServer::heartbeat( heartbeat_result.backend_info.__set_be_rpc_port(-1); heartbeat_result.backend_info.__set_brpc_port(config::brpc_port); heartbeat_result.backend_info.__set_version(get_short_version()); + heartbeat_result.backend_info.__set_be_start_time(_be_epoch); } } @@ -110,33 +113,34 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { bool need_report = false; if (_master_info->network_address.hostname != master_info.network_address.hostname || _master_info->network_address.port != master_info.network_address.port) { - if (master_info.epoch > _epoch) { + if (master_info.epoch > _fe_epoch) { _master_info->network_address.hostname = master_info.network_address.hostname; _master_info->network_address.port = master_info.network_address.port; - _epoch = master_info.epoch; + _fe_epoch = master_info.epoch; need_report = true; LOG(INFO) << "master change. new master host: " << _master_info->network_address.hostname - << ". port: " << _master_info->network_address.port << ". epoch: " << _epoch; + << ". port: " << _master_info->network_address.port << ". epoch: " << _fe_epoch; } else { LOG(WARNING) << "epoch is not greater than local. ignore heartbeat. host: " << _master_info->network_address.hostname << " port: " << _master_info->network_address.port - << " local epoch: " << _epoch << " received epoch: " << master_info.epoch; + << " local epoch: " << _fe_epoch + << " received epoch: " << master_info.epoch; return Status::InternalError("epoch is not greater than local. ignore heartbeat."); } } else { // when Master FE restarted, host and port remains the same, but epoch will be increased. - if (master_info.epoch > _epoch) { - _epoch = master_info.epoch; + if (master_info.epoch > _fe_epoch) { + _fe_epoch = master_info.epoch; need_report = true; - LOG(INFO) << "master restarted. epoch: " << _epoch; + LOG(INFO) << "master restarted. epoch: " << _fe_epoch; } } if (master_info.__isset.token) { if (!_master_info->__isset.token) { _master_info->__set_token(master_info.token); - LOG(INFO) << "get token. token: " << _master_info->token; + LOG(INFO) << "get token. token: " << _master_info->token; } else if (_master_info->token != master_info.token) { LOG(WARNING) << "invalid token. local_token:" << _master_info->token << ". token:" << master_info.token; diff --git a/be/src/agent/heartbeat_server.h b/be/src/agent/heartbeat_server.h index c43dea0560c084..67816e45c74467 100644 --- a/be/src/agent/heartbeat_server.h +++ b/be/src/agent/heartbeat_server.h @@ -55,12 +55,13 @@ class HeartbeatServer : public HeartbeatServiceIf { Status _heartbeat(const TMasterInfo& master_info); StorageEngine* _olap_engine; + int64_t _be_epoch; // mutex to protect master_info and _epoch std::mutex _hb_mtx; // Not owned. Point to the ExecEnv::_master_info TMasterInfo* _master_info; - int64_t _epoch; + int64_t _fe_epoch; DISALLOW_COPY_AND_ASSIGN(HeartbeatServer); }; // class HeartBeatServer diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index 1d735eecdaf31a..b47506ea23b9e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -491,7 +491,6 @@ public void write(DataOutput out) throws IOException { out.writeBoolean(isAlive.get()); out.writeBoolean(isDecommissioned.get()); out.writeLong(lastUpdateMs); - out.writeLong(lastStartTime); ImmutableMap disks = disksRef; @@ -638,11 +637,11 @@ public boolean handleHbResponse(BackendHbResponse hbResponse) { this.lastUpdateMs = hbResponse.getHbTime(); if (!isAlive.get()) { isChanged = true; - this.lastStartTime = hbResponse.getHbTime(); - LOG.info("{} is alive, last start time: {}", this.toString(), hbResponse.getHbTime()); + this.lastStartTime = hbResponse.getBeStartTime(); + LOG.info("{} is alive, last start time: {}", this.toString(), hbResponse.getBeStartTime()); this.isAlive.set(true); } else if (this.lastStartTime <= 0) { - this.lastStartTime = hbResponse.getHbTime(); + this.lastStartTime = hbResponse.getBeStartTime(); } heartbeatErrMsg = ""; diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java index 8b0c39ca29e98f..9db879012e4ae6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java @@ -31,13 +31,14 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { private int bePort; private int httpPort; private int brpcPort; + private long beStartTime; private String version = ""; public BackendHbResponse() { super(HeartbeatResponse.Type.BACKEND); } - public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long hbTime, String version) { + public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long hbTime, long beStartTime, String version) { super(HeartbeatResponse.Type.BACKEND); this.beId = beId; this.status = HbStatus.OK; @@ -45,6 +46,7 @@ public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long this.httpPort = httpPort; this.brpcPort = brpcPort; this.hbTime = hbTime; + this.beStartTime = beStartTime; this.version = version; } @@ -71,6 +73,10 @@ public int getBrpcPort() { return brpcPort; } + public long getBeStartTime() { + return beStartTime; + } + public String getVersion() { return version; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index cfcd5f7cd29c14..fab1451a967436 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -242,9 +242,12 @@ public HeartbeatResponse call() { if (tBackendInfo.isSetVersion()) { version = tBackendInfo.getVersion(); } - + long beStartTime = System.currentTimeMillis(); + if (tBackendInfo.isSetBeStartTime()) { + beStartTime = tBackendInfo.getBeStartTime(); + } // backend.updateOnce(bePort, httpPort, beRpcPort, brpcPort); - return new BackendHbResponse(backendId, bePort, httpPort, brpcPort, System.currentTimeMillis(), version); + return new BackendHbResponse(backendId, bePort, httpPort, brpcPort, System.currentTimeMillis(), beStartTime, version); } else { return new BackendHbResponse(backendId, result.getStatus().getErrorMsgs().isEmpty() ? "Unknown error" : result.getStatus().getErrorMsgs().get(0)); diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index 339d8a0a9b04b7..ef0eb3c740bb0e 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -37,6 +37,7 @@ struct TBackendInfo { 3: optional Types.TPort be_rpc_port 4: optional Types.TPort brpc_port 5: optional string version + 6: optional i64 be_start_time } struct THeartbeatResult { From 0363fd424793c2cdbeed49f0e18bc8dfdb498730 Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Tue, 10 Nov 2020 14:53:03 +0800 Subject: [PATCH 2/2] fix --- .../src/main/java/org/apache/doris/system/HeartbeatMgr.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index fab1451a967436..cb63632034a874 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -242,10 +242,7 @@ public HeartbeatResponse call() { if (tBackendInfo.isSetVersion()) { version = tBackendInfo.getVersion(); } - long beStartTime = System.currentTimeMillis(); - if (tBackendInfo.isSetBeStartTime()) { - beStartTime = tBackendInfo.getBeStartTime(); - } + long beStartTime = tBackendInfo.isSetBeStartTime() ? tBackendInfo.getBeStartTime() : System.currentTimeMillis(); // backend.updateOnce(bePort, httpPort, beRpcPort, brpcPort); return new BackendHbResponse(backendId, bePort, httpPort, brpcPort, System.currentTimeMillis(), beStartTime, version); } else {