From b89493226ff5d633b7cee6216dd3cc395cac94fb Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Fri, 15 Dec 2023 16:16:25 +0800 Subject: [PATCH 1/2] 1 --- be/src/common/config.cpp | 2 + be/src/common/config.h | 4 ++ be/src/runtime/client_cache.cpp | 2 +- .../apache/doris/common/util/NetUtils.java | 16 ++++-- .../apache/doris/rpc/BackendServiceProxy.java | 49 +++++++++++++++---- .../java/org/apache/doris/system/Backend.java | 18 +++++-- .../org/apache/doris/system/HeartbeatMgr.java | 9 +++- 7 files changed, 82 insertions(+), 18 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 40335a3195620f..25a43a8a2bc169 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -585,6 +585,8 @@ DEFINE_mInt32(memtable_flush_running_count_limit, "2"); DEFINE_Int32(load_process_max_memory_limit_percent, "50"); // 50% +DEFINE_mInt32(thrift_client_open_num_tries, "3"); + // If the memory consumption of load jobs exceed load_process_max_memory_limit, // all load jobs will hang there to wait for memtable flush. We should have a // soft limit which can trigger the memtable flush for the load channel who diff --git a/be/src/common/config.h b/be/src/common/config.h index b7c0d315c51e65..749bfd394d6c68 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -635,6 +635,10 @@ DECLARE_mInt32(memtable_flush_running_count_limit); DECLARE_Int32(load_process_max_memory_limit_percent); // 50% +// Number of attempts to open a thrift client connection; the default of 1 means open is tried only once. +// When set higher, a failed open is retried, waiting 100 milliseconds between attempts. +DECLARE_mInt32(thrift_client_open_num_tries); + // If the memory consumption of load jobs exceed load_process_max_memory_limit, // all load jobs will hang there to wait for memtable flush. 
We should have a // soft limit which can trigger the memtable flush for the load channel who diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp index 3da31caf5c8922..ea7b43b6102123 100644 --- a/be/src/runtime/client_cache.cpp +++ b/be/src/runtime/client_cache.cpp @@ -114,7 +114,7 @@ Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport, client_impl->set_conn_timeout(config::thrift_connect_timeout_seconds * 1000); - Status status = client_impl->open(); + Status status = client_impl->open_with_retry(config::thrift_client_open_num_tries, 100); if (!status.ok()) { *client_key = nullptr; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java index 0c1ac130cdea0a..bfcd7fb6a930d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java @@ -95,9 +95,19 @@ public static String getHostnameByIp(String ip) { return hostName; } - public static String getIpByHost(String host) throws UnknownHostException { - InetAddress inetAddress = InetAddress.getByName(host); - return inetAddress.getHostAddress(); + public static String getIpByHost(String host, int retryTimes) throws UnknownHostException { + InetAddress inetAddress; + while (true) { + try { + inetAddress = InetAddress.getByName(host); + return inetAddress.getHostAddress(); + } catch (UnknownHostException e) { + LOG.warn("NetUtils.getIpByHost failed, hostanme: {}, remaining retryTimes: {}", host, retryTimes); + if (retryTimes-- <= 0) { + throw e; + } + } + } } // This is the implementation is inspired by Apache camel project: diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index d78e055a1aba29..426bf4c99e0901 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -112,28 +112,23 @@ public void removeProxy(TNetworkAddress address) { } private BackendServiceClient getProxy(TNetworkAddress address) throws UnknownHostException { - String realIp = NetUtils.getIpByHost(address.getHostname()); BackendServiceClientExtIp serviceClientExtIp = serviceMap.get(address); - if (serviceClientExtIp != null && serviceClientExtIp.realIp.equals(realIp) - && serviceClientExtIp.client.isNormalState()) { + if (serviceClientExtIp != null && serviceClientExtIp.client.isNormalState()) { return serviceClientExtIp.client; } - // not exist, create one and return. + // not exist or not normal state, create one and return. BackendServiceClient removedClient = null; + String realIp = NetUtils.getIpByHost(address.getHostname(), 2); lock.lock(); try { serviceClientExtIp = serviceMap.get(address); - if (serviceClientExtIp != null && !serviceClientExtIp.realIp.equals(realIp)) { - LOG.warn("Cached ip changed ,before ip: {}, curIp: {}", serviceClientExtIp.realIp, realIp); - serviceMap.remove(address); - removedClient = serviceClientExtIp.client; - serviceClientExtIp = null; - } if (serviceClientExtIp != null && !serviceClientExtIp.client.isNormalState()) { // At this point we cannot judge the progress of reconnecting the underlying channel. // In the worst case, it may take two minutes. But we can't stand the connection refused // for two minutes, so rebuild the channel directly. 
+ LOG.warn("BackendServiceClient is not normal state, hostname: {}, before ip: {}, curIp: {}", + address.getHostname(), serviceClientExtIp.realIp, realIp); serviceMap.remove(address); removedClient = serviceClientExtIp.client; serviceClientExtIp = null; @@ -151,6 +146,40 @@ private BackendServiceClient getProxy(TNetworkAddress address) throws UnknownHos } } + public String checkProxyIP(TNetworkAddress address) { + String realIp; + try { + realIp = NetUtils.getIpByHost(address.getHostname(), 0); + } catch (UnknownHostException e) { + return null; // skip + } + + BackendServiceClientExtIp serviceClientExtIp = serviceMap.get(address); + if (serviceClientExtIp != null) { + if (!serviceClientExtIp.realIp.equals(realIp)) { + BackendServiceClient removedClient = null; + lock.lock(); + try { + serviceClientExtIp = serviceMap.get(address); + if (serviceClientExtIp != null && !serviceClientExtIp.realIp.equals(realIp)) { + LOG.warn("Cached ip changed ,before ip: {}, curIp: {}", serviceClientExtIp.realIp, realIp); + serviceMap.remove(address); + removedClient = serviceClientExtIp.client; + BackendServiceClient client = new BackendServiceClient(address, grpcThreadPool); + serviceMap.put(address, new BackendServiceClientExtIp(realIp, client)); + } + } finally { + lock.unlock(); + if (removedClient != null) { + removedClient.shutdown(); + } + } + } + } + + return realIp; + } + public Future execPlanFragmentsAsync(TNetworkAddress address, TExecPlanFragmentParamsList paramsList, boolean twoPhaseExecution) throws TException, RpcException { InternalService.PExecPlanFragmentRequest.Builder builder = diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index 6e6268020c8ca9..eb13eb675b3b3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -64,6 +64,8 @@ public class Backend implements Writable { private long 
id; @SerializedName("host") private volatile String host; + @SerializedName("ip") + private volatile String ip; private String version; @SerializedName("heartbeatPort") @@ -145,6 +147,7 @@ public class Backend implements Writable { public Backend() { this.host = ""; + this.ip = ""; this.version = ""; this.lastUpdateMs = 0; this.lastStartTime = 0; @@ -162,6 +165,7 @@ public Backend() { public Backend(long id, String host, int heartbeatPort) { this.id = id; this.host = host; + this.ip = ""; this.version = ""; this.heartbeatPort = heartbeatPort; this.bePort = -1; @@ -221,6 +225,10 @@ public String getHost() { return host; } + public String getIP() { + return ip; + } + public String getVersion() { return version; } @@ -326,6 +334,10 @@ public void setHost(String host) { this.host = host; } + public void setIP(String ip) { + this.ip = ip; + } + public void setAlive(boolean isAlive) { this.isAlive.set(isAlive); } @@ -715,9 +727,9 @@ public boolean equals(Object obj) { @Override public String toString() { - return "Backend [id=" + id + ", host=" + host + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get() - + ", lastStartTime=" + TimeUtils.longToTimeString(lastStartTime) + ", process epoch=" + lastStartTime - + ", tags: " + tagMap + "]"; + return "Backend [id=" + id + ", host=" + host + ", ip=" + ip + ", heartbeatPort=" + heartbeatPort + ", alive=" + + isAlive.get() + ", lastStartTime=" + TimeUtils.longToTimeString(lastStartTime) + ", process epoch=" + + lastStartTime + ", tags: " + tagMap + "]"; } public String getHealthyStatus() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index e09c7f2b991a09..8fd3ebf486a686 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -27,6 +27,7 @@ import org.apache.doris.common.util.MasterDaemon; import 
org.apache.doris.persist.HbPackage; import org.apache.doris.resource.Tag; +import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FeDiskInfo; import org.apache.doris.service.FrontendOptions; @@ -106,7 +107,12 @@ protected void runAfterCatalogReady() { List feInfos = Env.getCurrentEnv().getFrontendInfos(); List> hbResponses = Lists.newArrayList(); // send backend heartbeat + BackendServiceProxy proxy = BackendServiceProxy.getInstance(); for (Backend backend : nodeMgr.getIdToBackend().values()) { + String ip = proxy.checkProxyIP(new TNetworkAddress(backend.getHost(), backend.getBrpcPort())); + if (ip != null) { + backend.setIP(ip); + } BackendHeartbeatHandler handler = new BackendHeartbeatHandler(backend, feInfos); hbResponses.add(executor.submit(handler)); } @@ -219,7 +225,8 @@ public HeartbeatResponse call() { long backendId = backend.getId(); HeartbeatService.Client client = null; - TNetworkAddress beAddr = new TNetworkAddress(backend.getHost(), backend.getHeartbeatPort()); + TNetworkAddress beAddr = new TNetworkAddress( + backend.getIP().equals("") ? 
backend.getHost() : backend.getIP(), backend.getHeartbeatPort()); boolean ok = false; try { TMasterInfo copiedMasterInfo = new TMasterInfo(masterInfo.get()); From 47d3411962993e32856a9099e31b9e12279df495 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Fri, 15 Dec 2023 16:23:36 +0800 Subject: [PATCH 2/2] 2 --- be/src/common/config.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 25a43a8a2bc169..c9a7de71ab8923 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -585,7 +585,7 @@ DEFINE_mInt32(memtable_flush_running_count_limit, "2"); DEFINE_Int32(load_process_max_memory_limit_percent, "50"); // 50% -DEFINE_mInt32(thrift_client_open_num_tries, "3"); +DEFINE_mInt32(thrift_client_open_num_tries, "1"); // If the memory consumption of load jobs exceed load_process_max_memory_limit, // all load jobs will hang there to wait for memtable flush. We should have a