From e351be762afc968683d71ff1b3652500076953b9 Mon Sep 17 00:00:00 2001 From: caiconghui1 Date: Sun, 23 Oct 2022 10:17:32 +0800 Subject: [PATCH 1/4] Support fqdn mode for doris be node --- docs/en/docs/admin-manual/config/fe-config.md | 12 +- .../docs/admin-manual/config/fe-config.md | 17 +- .../org/apache/doris/alter/SystemHandler.java | 34 ++-- .../apache/doris/analysis/BackendClause.java | 19 +-- .../doris/analysis/CancelAlterSystemStmt.java | 21 +-- .../doris/analysis/DropBackendClause.java | 21 +++ .../java/org/apache/doris/catalog/Env.java | 7 + .../java/org/apache/doris/common/Config.java | 9 + .../org/apache/doris/common/FeConstants.java | 1 + .../doris/common/proc/BackendsProcDir.java | 6 +- .../apache/doris/deploy/DeployManager.java | 12 +- .../httpv2/rest/CheckDecommissionAction.java | 11 +- .../java/org/apache/doris/system/Backend.java | 63 ++----- .../org/apache/doris/system/FQDNManager.java | 73 ++++++++ .../org/apache/doris/system/HeartbeatMgr.java | 1 - .../doris/system/SystemInfoService.java | 159 +++++++++++------- .../doris/cluster/SystemInfoServiceTest.java | 12 +- .../apache/doris/system/FQDNManagerTest.java | 87 ++++++++++ 18 files changed, 397 insertions(+), 168 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/system/FQDNManager.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/system/FQDNManagerTest.java diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index 339e4d836b500a..62ae0023d6e6b4 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -2346,8 +2346,6 @@ Default: 3 Is it possible to dynamically configure: true -Is it a configuration item unique to the Master FE node: true - ### `enable_storage_policy` Whether to enable the Storage Policy feature. This feature allows users to separate hot and cold data. This feature is still under development. Recommended for test environments only. @@ -2358,3 +2356,13 @@ Is it possible to dynamically configure: true Is it a configuration item unique to the Master FE node: true +### `enable_fqdn_mode` + +This configuration is mainly used in the k8s cluster environment. When enable_fqdn_mode is true, the name of the pod where the be is located will remain unchanged after reconstruction, while the ip can be changed. + +Default: false + +Is it possible to dynamically configure: false + +Is it a configuration item unique to the Master FE node: true + diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index bf211caa1d0813..164746bb39f1bd 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -2259,7 +2259,7 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清 ### backend_rpc_timeout_ms - FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。 +FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。 默认值:60000 @@ -2278,10 +2278,11 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清 是否为 Master FE 节点独有的配置项:false +### enable_fqdn_mode - FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。 +此配置用于 k8s 部署环境。当 enable_k8s_detect_container_drift_mode 为 true 时,将允许更改 be 或 broker 的重建 pod的 ip。 -默认值:60000 +默认值: false 是否可以动态配置:false @@ -2340,7 +2341,6 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清 是否为 Master FE 节点独有的配置项:true - ### `max_replica_count_when_schema_change` OlapTable在做schema change时,允许的最大副本数,副本数过大会导致FE OOM。 @@ -2414,3 +2414,12 @@ hive partition 的最大缓存数量。 是否为 Master FE 节点独有的配置项:true +### `enable_fqdn_mode` + +此配置用于 k8s 部署环境。当 enable_fqdn_mode 为 true 时,将允许更改 be 的重建 pod的 ip。 + +默认值: false + +是否可以动态配置:false + +是否为 Master FE 节点独有的配置项:true diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index 8b6dcd70bad6a5..35083983facb5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -38,7 +38,6 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.system.Backend; @@ -48,6 +47,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -125,7 +125,7 @@ public synchronized void process(List alterClauses, String clusterN if (!Strings.isNullOrEmpty(destClusterName) && Env.getCurrentEnv().getCluster(destClusterName) == null) { throw new DdlException("Cluster: " + destClusterName + " does not exist."); } - Env.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(), addBackendClause.isFree(), + Env.getCurrentSystemInfo().addBackends(addBackendClause.getIpHostPortTriples(), addBackendClause.isFree(), addBackendClause.getDestCluster(), addBackendClause.getTagMap()); } else if (alterClause instanceof DropBackendClause) { // drop backend @@ -136,7 +136,7 @@ public synchronized void process(List alterClauses, String clusterN + "All data on this backend will be discarded permanently. " + "If you insist, use DROPP instead of DROP"); } - Env.getCurrentSystemInfo().dropBackends(dropBackendClause.getHostPortPairs()); + Env.getCurrentSystemInfo().dropBackends(dropBackendClause.getIpHostPortTriples()); } else if (alterClause instanceof DecommissionBackendClause) { // decommission DecommissionBackendClause decommissionBackendClause = (DecommissionBackendClause) alterClause; @@ -197,7 +197,7 @@ private boolean checkTablets(Long beId, List backendTabletIds) { private List checkDecommission(DecommissionBackendClause decommissionBackendClause) throws DdlException { - return checkDecommission(decommissionBackendClause.getHostPortPairs()); + return checkDecommission(decommissionBackendClause.getIpHostPortTriples()); } /* @@ -206,15 +206,18 @@ private List checkDecommission(DecommissionBackendClause decommissionBa * 2. after decommission, the remaining backend num should meet the replication num. * 3. after decommission, The remaining space capacity can store data on decommissioned backends. */ - public static List checkDecommission(List> hostPortPairs) + public static List checkDecommission(List> ipHostPortTriples) throws DdlException { SystemInfoService infoService = Env.getCurrentSystemInfo(); List decommissionBackends = Lists.newArrayList(); // check if exist - for (Pair pair : hostPortPairs) { - Backend backend = infoService.getBackendWithHeartbeatPort(pair.first, pair.second); + for (Triple triple : ipHostPortTriples) { + Backend backend = infoService.getBackendWithHeartbeatPort(triple.getLeft(), triple.getMiddle(), + triple.getRight()); if (backend == null) { - throw new DdlException("Backend does not exist[" + pair.first + ":" + pair.second + "]"); + throw new DdlException("Backend does not exist[" + + (Config.enable_fqdn_mode ? triple.getMiddle() : triple.getLeft()) + + ":" + triple.getRight() + "]"); } if (backend.isDecommissioned()) { // already under decommission, ignore it @@ -232,22 +235,23 @@ public static List checkDecommission(List> hostPo @Override public synchronized void cancel(CancelStmt stmt) throws DdlException { CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) stmt; - cancelAlterSystemStmt.getHostPortPairs(); - SystemInfoService infoService = Env.getCurrentSystemInfo(); // check if backends is under decommission List backends = Lists.newArrayList(); - List> hostPortPairs = cancelAlterSystemStmt.getHostPortPairs(); - for (Pair pair : hostPortPairs) { + List> ipHostPortTriples = cancelAlterSystemStmt.getIpHostPortTriples(); + for (Triple triple : ipHostPortTriples) { // check if exist - Backend backend = infoService.getBackendWithHeartbeatPort(pair.first, pair.second); + Backend backend = infoService.getBackendWithHeartbeatPort(triple.getLeft(), triple.getMiddle(), + triple.getRight()); if (backend == null) { - throw new DdlException("Backend does not exists[" + pair.first + "]"); + throw new DdlException("Backend does not exist[" + + (Config.enable_fqdn_mode ? triple.getMiddle() : triple.getLeft()) + + ":" + triple.getRight() + "]"); } if (!backend.isDecommissioned()) { // it's ok. just log - LOG.info("backend is not decommissioned[{}]", pair.first); + LOG.info("backend is not decommissioned[{}]", backend.getId()); continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BackendClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BackendClause.java index 86f6380c7d7a05..a89d0a86c4d6d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BackendClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BackendClause.java @@ -19,19 +19,19 @@ import org.apache.doris.alter.AlterOpType; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Pair; import org.apache.doris.system.SystemInfoService; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.lang3.tuple.Triple; -import java.util.LinkedList; import java.util.List; import java.util.Map; public class BackendClause extends AlterClause { protected List hostPorts; - protected List> hostPortPairs; + protected List> ipHostPortTriples; public static final String MUTLI_TAG_DISABLED_MSG = "Not support multi tags for Backend now. " + "You can set 'enable_multi_tags=true' in fe.conf to enable this feature."; @@ -41,21 +41,20 @@ public class BackendClause extends AlterClause { protected BackendClause(List hostPorts) { super(AlterOpType.ALTER_OTHER); this.hostPorts = hostPorts; - this.hostPortPairs = new LinkedList>(); + this.ipHostPortTriples = Lists.newArrayList(); } - public List> getHostPortPairs() { - return hostPortPairs; + public List> getIpHostPortTriples() { + return ipHostPortTriples; } @Override public void analyze(Analyzer analyzer) throws AnalysisException { for (String hostPort : hostPorts) { - Pair pair = SystemInfoService.validateHostAndPort(hostPort); - hostPortPairs.add(pair); + Triple triple = SystemInfoService.getIpHostAndPort(hostPort, true); + ipHostPortTriples.add(triple); } - - Preconditions.checkState(!hostPortPairs.isEmpty()); + Preconditions.checkState(!ipHostPortTriples.isEmpty()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java index 4e548c72f8d43f..29ec348baa111a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java @@ -18,36 +18,37 @@ package org.apache.doris.analysis; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Pair; +import org.apache.doris.common.Config; import org.apache.doris.system.SystemInfoService; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.tuple.Triple; -import java.util.LinkedList; import java.util.List; public class CancelAlterSystemStmt extends CancelStmt { protected List hostPorts; - private List> hostPortPairs; + private List> ipHostPortTriples; public CancelAlterSystemStmt(List hostPorts) { this.hostPorts = hostPorts; - this.hostPortPairs = new LinkedList>(); + this.ipHostPortTriples = Lists.newArrayList(); } - public List> getHostPortPairs() { - return hostPortPairs; + public List> getIpHostPortTriples() { + return ipHostPortTriples; } @Override public void analyze(Analyzer analyzer) throws AnalysisException { for (String hostPort : hostPorts) { - Pair pair = SystemInfoService.validateHostAndPort(hostPort); - this.hostPortPairs.add(pair); + Triple triple = SystemInfoService.getIpHostAndPort(hostPort, + !Config.enable_fqdn_mode); + this.ipHostPortTriples.add(triple); } - - Preconditions.checkState(!this.hostPortPairs.isEmpty()); + Preconditions.checkState(!this.ipHostPortTriples.isEmpty()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropBackendClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropBackendClause.java index 146b4f88ab621b..87b2f642e89dbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropBackendClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropBackendClause.java @@ -17,6 +17,13 @@ package org.apache.doris.analysis; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.system.SystemInfoService; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.tuple.Triple; + import java.util.List; public class DropBackendClause extends BackendClause { @@ -36,6 +43,20 @@ public boolean isForce() { return force; } + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + if (Config.enable_fqdn_mode) { + for (String hostPort : hostPorts) { + Triple triple = SystemInfoService.getIpHostAndPort(hostPort, + !Config.enable_fqdn_mode); + ipHostPortTriples.add(triple); + } + Preconditions.checkState(!ipHostPortTriples.isEmpty()); + } else { + super.analyze(analyzer); + } + } + @Override public String toSql() { StringBuilder sb = new StringBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index df08131e4229a8..f57f262526e67a 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -216,6 +216,7 @@ import org.apache.doris.statistics.StatisticsRepository; import org.apache.doris.statistics.StatisticsTaskScheduler; import org.apache.doris.system.Backend; +import org.apache.doris.system.FQDNManager; import org.apache.doris.system.Frontend; import org.apache.doris.system.HeartbeatMgr; import org.apache.doris.system.SystemInfoService; @@ -446,6 +447,8 @@ public class Env { private ExternalMetaCacheMgr extMetaCacheMgr; + private FQDNManager fqdnManager; + public List getFrontends(FrontendNodeType nodeType) { if (nodeType == null) { // get all @@ -647,6 +650,7 @@ private Env(boolean isCheckpointCatalog) { this.analysisJobScheduler = new AnalysisJobScheduler(); this.statisticsCache = new StatisticsCache(); this.extMetaCacheMgr = new ExternalMetaCacheMgr(); + this.fqdnManager = new FQDNManager(systemInfo); } public static void destroyCheckpoint() { @@ -1431,6 +1435,9 @@ private void startMasterOnlyDaemonThreads() { this.statisticsJobScheduler.start(); this.statisticsTaskScheduler.start(); new InternalSchemaInitializer().start(); + if (Config.enable_fqdn_mode) { + fqdnManager.start(); + } } // start threads that should running on all FE diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index c424350946daf5..08b919b05105c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1925,5 +1925,14 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static boolean enable_storage_policy = false; + + /** + * This config is mainly used in the k8s cluster environment. + * When enable_fqdn_mode is true, the name of the pod where be is located will remain unchanged + * after reconstruction, while the ip can be changed. + */ + @ConfField(mutable = false, masterOnly = true) + public static boolean enable_fqdn_mode = false; + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index ee5b43284432ff..1f53f699e29c4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -35,6 +35,7 @@ public class FeConstants { public static int heartbeat_interval_second = 5; public static int checkpoint_interval_second = 60; // 1 minutes + public static int ip_check_interval_second = 5; // dpp version public static String dpp_version = "3_2_0"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java index 0056d2297fbe93..d02a74e06fa00b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java @@ -119,7 +119,11 @@ public static List> getClusterBackendInfos(String clusterName) { backendInfo.add(backend.getOwnerClusterName()); backendInfo.add(backend.getHost()); if (Strings.isNullOrEmpty(clusterName)) { - backendInfo.add(NetUtils.getHostnameByIp(backend.getHost())); + if (backend.getHostName() != null) { + backendInfo.add(backend.getHostName()); + } else { + backendInfo.add(NetUtils.getHostnameByIp(backend.getHost())); + } backendInfo.add(String.valueOf(backend.getHeartbeatPort())); backendInfo.add(String.valueOf(backend.getBePort())); backendInfo.add(String.valueOf(backend.getHttpPort())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java index 33806e0e594e31..1adc65acafc928 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java @@ -24,6 +24,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.common.util.NetUtils; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; @@ -34,6 +35,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -552,7 +554,7 @@ private boolean inspectNodeChange(List> remoteHosts, env.dropFrontend(FrontendNodeType.OBSERVER, localIp, localPort); break; case BACKEND: - Env.getCurrentSystemInfo().dropBackend(localIp, localPort); + Env.getCurrentSystemInfo().dropBackend(localIp, null, localPort); break; default: break; @@ -585,8 +587,12 @@ private boolean inspectNodeChange(List> remoteHosts, env.addFrontend(FrontendNodeType.OBSERVER, remoteIp, remotePort); break; case BACKEND: - List> newBackends = Lists.newArrayList(); - newBackends.add(Pair.of(remoteIp, remotePort)); + List> newBackends = Lists.newArrayList(); + String hostName = NetUtils.getHostnameByIp(remoteIp); + if (hostName.equals(remoteIp)) { + hostName = null; + } + newBackends.add(Triple.of(remoteIp, hostName, remotePort)); Env.getCurrentSystemInfo().addBackends(newBackends, false); break; default: diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckDecommissionAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckDecommissionAction.java index a2fded94816ca7..25421154870937 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckDecommissionAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckDecommissionAction.java @@ -20,7 +20,6 @@ import org.apache.doris.alter.SystemHandler; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; -import org.apache.doris.common.Pair; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -29,6 +28,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; +import org.apache.commons.lang3.tuple.Triple; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; @@ -70,19 +70,18 @@ public Object execute(HttpServletRequest request, HttpServletResponse response) return ResponseEntityBuilder.badRequest("No host:port specified"); } - List> hostPortPairs = Lists.newArrayList(); + List> ipHostPortTriples = Lists.newArrayList(); for (String hostPort : hostPortArr) { - Pair pair; try { - pair = SystemInfoService.validateHostAndPort(hostPort); + Triple triple = SystemInfoService.getIpHostAndPort(hostPort, true); + ipHostPortTriples.add(triple); } catch (AnalysisException e) { return ResponseEntityBuilder.badRequest(e.getMessage()); } - hostPortPairs.add(pair); } try { - List backends = SystemHandler.checkDecommission(hostPortPairs); + List backends = SystemHandler.checkDecommission(ipHostPortTriples); List backendsList = backends.stream().map(b -> b.getHost() + ":" + b.getHeartbeatPort()).collect(Collectors.toList()); return ResponseEntityBuilder.ok(backendsList); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index b102f3ed88813c..a38c755baad1a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -47,7 +47,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -70,7 +69,9 @@ public enum BackendState { @SerializedName("id") private long id; @SerializedName("host") - private String host; + private volatile String host; + @SerializedName("hostName") + private String hostName; private String version; @SerializedName("heartbeatPort") @@ -160,9 +161,14 @@ public Backend() { this.tagMap.put(locationTag.type, locationTag.value); } - public Backend(long id, String host, int heartbeatPort) { + public Backend(long id, String ip, int heartbeatPort) { + this(id, ip, null, heartbeatPort); + } + + public Backend(long id, String ip, String hostName, int heartbeatPort) { this.id = id; - this.host = host; + this.host = ip; + this.hostName = hostName; this.version = ""; this.heartbeatPort = heartbeatPort; this.bePort = -1; @@ -189,6 +195,10 @@ public String getHost() { return host; } + public String getHostName() { + return hostName; + } + public String getVersion() { return version; } @@ -278,6 +288,10 @@ public void setBackendState(BackendState state) { this.backendState = state.ordinal(); } + public void setHost(String host) { + this.host = host; + } + public void setAlive(boolean isAlive) { this.isAlive.set(isAlive); } @@ -350,15 +364,6 @@ public int getHeartbeatFailureCounter() { return heartbeatFailureCounter; } - /** - * backend belong to some cluster - * - * @return - */ - public boolean isUsedByCluster() { - return this.backendState == BackendState.using.ordinal(); - } - /** * backend is free, and it isn't belong to any cluster * @@ -368,16 +373,6 @@ public boolean isFreeFromCluster() { return this.backendState == BackendState.free.ordinal(); } - /** - * backend execute discommission in cluster , and backendState will be free - * finally - * - * @return - */ - public boolean isOffLineFromCluster() { - return this.backendState == BackendState.offline.ordinal(); - } - public ImmutableMap getDisks() { return this.disksRef; } @@ -480,15 +475,6 @@ public boolean diskExceedLimit() { return exceedLimit; } - public String getPathByPathHash(long pathHash) { - for (DiskInfo diskInfo : disksRef.values()) { - if (diskInfo.getPathHash() == pathHash) { - return diskInfo.getRootPath(); - } - } - return null; - } - public void updateDisks(Map backendDisks) { ImmutableMap disks = disksRef; // The very first time to init the path info @@ -800,17 +786,4 @@ public Map getTagMap() { public String getTagMapString() { return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}"; } - - /** - * Get Tag by type, return Optional.empty if no such tag with given type - * - * @param type - * @return - */ - public Optional getTagByType(String type) { - if (!tagMap.containsKey(type)) { - return Optional.empty(); - } - return Optional.of(Tag.createNotCheck(type, tagMap.get(type))); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/FQDNManager.java b/fe/fe-core/src/main/java/org/apache/doris/system/FQDNManager.java new file mode 100644 index 00000000000000..7bf8bc43a14d02 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/system/FQDNManager.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.system; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.thrift.TNetworkAddress; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class FQDNManager extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(FQDNManager.class); + + private SystemInfoService nodeMgr; + + public FQDNManager(SystemInfoService nodeMgr) { + super("FQDN mgr", FeConstants.ip_check_interval_second * 1000L); + this.nodeMgr = nodeMgr; + } + + /** + * At each round: check if ip of be has already been changed + */ + @Override + protected void runAfterCatalogReady() { + for (Backend be : nodeMgr.getIdToBackend().values()) { + if (be.getHostName() != null) { + try { + InetAddress inetAddress = InetAddress.getByName(be.getHostName()); + if (!be.getHost().equalsIgnoreCase(inetAddress.getHostAddress())) { + String ip = be.getHost(); + if (!ip.equalsIgnoreCase("unknown")) { + ClientPool.backendPool.clearPool(new TNetworkAddress(ip, be.getBePort())); + } + be.setHost(inetAddress.getHostAddress()); + Env.getCurrentEnv().getEditLog().logBackendStateChange(be); + LOG.warn("ip for {} of be has been changed from {} to {}", be.getHostName(), ip, be.getHost()); + } + } catch (UnknownHostException e) { + LOG.warn("unknown host name for be, {}", be.getHostName(), e); + if (!be.isAlive() && !be.getHost().equalsIgnoreCase("unknown")) { + String ip = be.getHost(); + ClientPool.backendPool.clearPool(new TNetworkAddress(ip, be.getBePort())); + be.setHost("unknown"); + Env.getCurrentEnv().getEditLog().logBackendStateChange(be); + LOG.warn("ip for {} of be has been changed from {} to {}", be.getHostName(), ip, "unknown"); + } + } + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 234ebf3eec3971..a938488aa7a726 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -145,7 +145,6 @@ protected void runAfterCatalogReady() { } } catch (InterruptedException | ExecutionException e) { LOG.warn("got exception when doing heartbeat", e); - continue; } } // end for all results diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 17e121eefb0795..8c3af9f5d785cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -31,6 +31,7 @@ import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; +import org.apache.doris.common.util.NetUtils; import org.apache.doris.metric.MetricRepo; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend.BackendState; @@ -44,6 +45,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import org.apache.commons.lang3.tuple.Triple; import org.apache.commons.validator.routines.InetAddressValidator; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -95,27 +97,33 @@ public int compare(List list1, List list2) { }; // for deploy manager - public void addBackends(List> hostPortPairs, boolean isFree) throws UserException { - addBackends(hostPortPairs, isFree, "", Tag.DEFAULT_BACKEND_TAG.toMap()); + public void addBackends(List> ipHostPortTriples, boolean isFree) + throws UserException { + addBackends(ipHostPortTriples, isFree, "", Tag.DEFAULT_BACKEND_TAG.toMap()); } /** - * @param hostPortPairs : backend's host and port + * @param ipHostPortTriples : backend's ip, hostName and port * @param isFree : if true the backend is not owned by any cluster * @param destCluster : if not null or empty backend will be added to destCluster * @throws DdlException */ - public void addBackends(List> hostPortPairs, boolean isFree, String destCluster, + public void addBackends(List> ipHostPortTriples, boolean isFree, String destCluster, Map tagMap) throws UserException { - for (Pair pair : hostPortPairs) { + for (Triple triple : ipHostPortTriples) { + if (Config.enable_fqdn_mode && triple.getMiddle() == null) { + throw new DdlException("backend's hostName should not be null while enable_fqdn_mode is true"); + } // check is already exist - if (getBackendWithHeartbeatPort(pair.first, pair.second) != null) { - throw new DdlException("Same backend already exists[" + pair.first + ":" + pair.second + "]"); + if (getBackendWithHeartbeatPort(triple.getLeft(), triple.getMiddle(), triple.getRight()) != null) { + String backendIdentifier = (Config.enable_fqdn_mode ? triple.getMiddle() : triple.getLeft()) + ":" + + triple.getRight(); + throw new DdlException("Same backend already exists[" + backendIdentifier + "]"); } } - for (Pair pair : hostPortPairs) { - addBackend(pair.first, pair.second, isFree, destCluster, tagMap); + for (Triple triple : ipHostPortTriples) { + addBackend(triple.getLeft(), triple.getMiddle(), triple.getRight(), isFree, destCluster, tagMap); } } @@ -136,9 +144,9 @@ private void setBackendOwner(Backend backend, String clusterName) { } // Final entry of adding backend - private void addBackend(String host, int heartbeatPort, boolean isFree, String destCluster, + private void addBackend(String ip, String hostName, int heartbeatPort, boolean isFree, String destCluster, Map tagMap) { - Backend newBackend = new Backend(Env.getCurrentEnv().getNextId(), host, heartbeatPort); + Backend newBackend = new Backend(Env.getCurrentEnv().getNextId(), ip, hostName, heartbeatPort); // update idToBackend Map copiedBackends = Maps.newHashMap(idToBackendRef); copiedBackends.put(newBackend.getId(), newBackend); @@ -172,16 +180,17 @@ private void addBackend(String host, int heartbeatPort, boolean isFree, String d MetricRepo.generateBackendsTabletMetrics(); } - public void dropBackends(List> hostPortPairs) throws DdlException { - for (Pair pair : hostPortPairs) { + public void dropBackends(List> ipHostPortTriples) throws DdlException { + for (Triple triple : ipHostPortTriples) { // check is already exist - if (getBackendWithHeartbeatPort(pair.first, pair.second) == null) { - throw new DdlException("backend does not exists[" + pair.first + ":" + pair.second + "]"); + if (getBackendWithHeartbeatPort(triple.getLeft(), triple.getMiddle(), triple.getRight()) == null) { + String backendIdentifier = (triple.getLeft() == null ? triple.getMiddle() : triple.getLeft()) + ":" + + triple.getRight(); + throw new DdlException("backend does not exists[" + backendIdentifier + "]"); } } - - for (Pair pair : hostPortPairs) { - dropBackend(pair.first, pair.second); + for (Triple triple : ipHostPortTriples) { + dropBackend(triple.getLeft(), triple.getMiddle(), triple.getRight()); } } @@ -192,17 +201,16 @@ public void dropBackend(long backendId) throws DdlException { throw new DdlException("Backend[" + backendId + "] does not exist"); } - dropBackend(backend.getHost(), backend.getHeartbeatPort()); + dropBackend(backend.getHost(), backend.getHostName(), backend.getHeartbeatPort()); } // final entry of dropping backend - public void dropBackend(String host, int heartbeatPort) throws DdlException { - if (getBackendWithHeartbeatPort(host, heartbeatPort) == null) { - throw new DdlException("backend does not exists[" + host + ":" + heartbeatPort + "]"); + public void dropBackend(String ip, String hostName, int heartbeatPort) throws DdlException { + Backend droppedBackend = getBackendWithHeartbeatPort(ip, hostName, heartbeatPort); + if (droppedBackend == null) { + throw new DdlException("backend does not exists[" + (ip == null ? hostName : ip) + + ":" + heartbeatPort + "]"); } - - Backend droppedBackend = getBackendWithHeartbeatPort(host, heartbeatPort); - // update idToBackend Map copiedBackends = Maps.newHashMap(idToBackendRef); copiedBackends.remove(droppedBackend.getId()); @@ -274,10 +282,16 @@ public boolean checkBackendAlive(long backendId) { return true; } - public Backend getBackendWithHeartbeatPort(String host, int heartPort) { + public Backend getBackendWithHeartbeatPort(String ip, String hostName, int heartPort) { ImmutableMap idToBackend = idToBackendRef; for (Backend backend : idToBackend.values()) { - if (backend.getHost().equals(host) && backend.getHeartbeatPort() == heartPort) { + if (hostName != null) { + if (hostName.equals(backend.getHostName()) && backend.getHeartbeatPort() == heartPort) { + return backend; + } + } + + if (backend.getHost().equals(ip) && backend.getHeartbeatPort() == heartPort) { return backend; } } @@ -901,7 +915,8 @@ public void clear() { this.idToReportVersionRef = null; } - public static Pair validateHostAndPort(String hostPort) throws AnalysisException { + public static Triple getIpHostAndPort(String hostPort, boolean strictCheck) + throws AnalysisException { hostPort = hostPort.replaceAll("\\s+", ""); if (hostPort.isEmpty()) { throw new AnalysisException("Invalid host port: " + hostPort); @@ -912,38 +927,50 @@ public static Pair validateHostAndPort(String hostPort) throws throw new AnalysisException("Invalid host port: " + hostPort); } - String host = pair[0]; - if (Strings.isNullOrEmpty(host)) { + String hostName = pair[0]; + String ip = hostName; + if (Strings.isNullOrEmpty(hostName)) { throw new AnalysisException("Host is null"); } int heartbeatPort = -1; try { - // validate host - if (!InetAddressValidator.getInstance().isValid(host)) { - // maybe this is a hostname - // if no IP address for the host could be found, 'getByName' - // will throw - // UnknownHostException - InetAddress inetAddress = InetAddress.getByName(host); - host = inetAddress.getHostAddress(); - } - // validate port heartbeatPort = Integer.parseInt(pair[1]); - if (heartbeatPort <= 0 || heartbeatPort >= 65536) { throw new AnalysisException("Port is out of range: " + heartbeatPort); } - return Pair.of(host, heartbeatPort); + // validate host + if (!InetAddressValidator.getInstance().isValid(ip)) { + // maybe this is a hostname + // if no IP address for the host could be found, 'getByName' + // will throw UnknownHostException + InetAddress inetAddress = InetAddress.getByName(hostName); + ip = inetAddress.getHostAddress(); + } else { + hostName = NetUtils.getHostnameByIp(ip); + if (hostName.equals(ip)) { + hostName = null; + } + } + return Triple.of(ip, hostName, heartbeatPort); } catch (UnknownHostException e) { + if (!strictCheck) { + return Triple.of(null, hostName, heartbeatPort); + } throw new AnalysisException("Unknown host: " + e.getMessage()); } catch (Exception e) { throw new AnalysisException("Encounter unknown exception: " + e.getMessage()); } } + + public static Pair validateHostAndPort(String hostPort) throws AnalysisException { + Triple ipHostPortTriple = getIpHostAndPort(hostPort, true); + return Pair.of(ipHostPortTriple.getLeft(), ipHostPortTriple.getRight()); + } + public void replayAddBackend(Backend newBackend) { // update idToBackend Map copiedBackends = Maps.newHashMap(idToBackendRef); @@ -996,25 +1023,25 @@ public void replayDropBackend(Backend backend) { public void updateBackendState(Backend be) { long id = be.getId(); Backend memoryBe = getBackend(id); - if (memoryBe == null) { - // backend may already be dropped. this may happen when - // 1. SystemHandler drop the decommission backend - // 2. at same time, user try to cancel the decommission of that backend. - // These two operations do not guarantee the order. - return; - } - memoryBe.setBePort(be.getBePort()); - memoryBe.setAlive(be.isAlive()); - memoryBe.setDecommissioned(be.isDecommissioned()); - memoryBe.setHttpPort(be.getHttpPort()); - memoryBe.setBeRpcPort(be.getBeRpcPort()); - memoryBe.setBrpcPort(be.getBrpcPort()); - memoryBe.setLastUpdateMs(be.getLastUpdateMs()); - memoryBe.setLastStartTime(be.getLastStartTime()); - memoryBe.setDisks(be.getDisks()); - memoryBe.setBackendState(be.getBackendState()); - memoryBe.setOwnerClusterName(be.getOwnerClusterName()); - memoryBe.setDecommissionType(be.getDecommissionType()); + // backend may already be dropped. this may happen when + // drop and modify operations do not guarantee the order. + if (memoryBe != null) { + if (be.getHostName() != null && !be.getHost().equalsIgnoreCase(memoryBe.getHost())) { + memoryBe.setHost(be.getHost()); + } + memoryBe.setBePort(be.getBePort()); + memoryBe.setAlive(be.isAlive()); + memoryBe.setDecommissioned(be.isDecommissioned()); + memoryBe.setHttpPort(be.getHttpPort()); + memoryBe.setBeRpcPort(be.getBeRpcPort()); + memoryBe.setBrpcPort(be.getBrpcPort()); + memoryBe.setLastUpdateMs(be.getLastUpdateMs()); + memoryBe.setLastStartTime(be.getLastStartTime()); + memoryBe.setDisks(be.getDisks()); + memoryBe.setBackendState(be.getBackendState()); + memoryBe.setOwnerClusterName(be.getOwnerClusterName()); + memoryBe.setDecommissionType(be.getDecommissionType()); + } } private long getClusterAvailableCapacityB(String clusterName) { @@ -1110,12 +1137,14 @@ public void updatePathInfo(List addedDisks, List removedDisk } public void modifyBackends(ModifyBackendClause alterClause) throws UserException { - List> hostPortPairs = alterClause.getHostPortPairs(); + List> ipHostPortTriples = alterClause.getIpHostPortTriples(); List backends = Lists.newArrayList(); - for (Pair pair : hostPortPairs) { - Backend be = getBackendWithHeartbeatPort(pair.first, pair.second); + for (Triple triple : ipHostPortTriples) { + Backend be = getBackendWithHeartbeatPort(triple.getLeft(), triple.getMiddle(), triple.getRight()); if (be == null) { - throw new DdlException("backend does not exists[" + pair.first + ":" + pair.second + "]"); + throw new DdlException("backend does not exists[" + + (Config.enable_fqdn_mode ? triple.getMiddle() : triple.getLeft()) + + ":" + triple.getRight() + "]"); } backends.add(be); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java index a9f9a1e2113616..87d4d50a1991c2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java @@ -218,19 +218,19 @@ public void addBackendTest() throws UserException { AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234")); stmt.analyze(analyzer); try { - Env.getCurrentSystemInfo().addBackends(stmt.getHostPortPairs(), true); + Env.getCurrentSystemInfo().addBackends(stmt.getIpHostPortTriples(), true); } catch (DdlException e) { Assert.fail(); } try { - Env.getCurrentSystemInfo().addBackends(stmt.getHostPortPairs(), true); + Env.getCurrentSystemInfo().addBackends(stmt.getIpHostPortTriples(), true); } catch (DdlException e) { Assert.assertTrue(e.getMessage().contains("already exists")); } Assert.assertNotNull(Env.getCurrentSystemInfo().getBackend(backendId)); - Assert.assertNotNull(Env.getCurrentSystemInfo().getBackendWithHeartbeatPort("192.168.0.1", 1234)); + Assert.assertNotNull(Env.getCurrentSystemInfo().getBackendWithHeartbeatPort("192.168.0.1", null, 1234)); Assert.assertTrue(Env.getCurrentSystemInfo().getBackendIds(false).size() == 1); Assert.assertTrue(Env.getCurrentSystemInfo().getBackendIds(false).get(0) == backendId); @@ -247,7 +247,7 @@ public void removeBackendTest() throws UserException { AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234")); stmt.analyze(analyzer); try { - Env.getCurrentSystemInfo().addBackends(stmt.getHostPortPairs(), true); + Env.getCurrentSystemInfo().addBackends(stmt.getIpHostPortTriples(), true); } catch (DdlException e) { e.printStackTrace(); } @@ -255,14 +255,14 @@ public void removeBackendTest() throws UserException { DropBackendClause dropStmt = new DropBackendClause(Lists.newArrayList("192.168.0.1:1234")); dropStmt.analyze(analyzer); try { - Env.getCurrentSystemInfo().dropBackends(dropStmt.getHostPortPairs()); + Env.getCurrentSystemInfo().dropBackends(dropStmt.getIpHostPortTriples()); } catch (DdlException e) { e.printStackTrace(); Assert.fail(); } try { - Env.getCurrentSystemInfo().dropBackends(dropStmt.getHostPortPairs()); + Env.getCurrentSystemInfo().dropBackends(dropStmt.getIpHostPortTriples()); } catch (DdlException e) { Assert.assertTrue(e.getMessage().contains("does not exist")); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/FQDNManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/FQDNManagerTest.java new file mode 100644 index 00000000000000..4de2a615242507 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/system/FQDNManagerTest.java @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.system; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; + +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class FQDNManagerTest { + @Mocked + private InetAddress inetAddress; + + @Mocked + private Env env; + + private FQDNManager fdqnManager; + + private SystemInfoService systemInfoService; + + @Before + public void setUp() throws UnknownHostException { + new MockUp(InetAddress.class) { + @Mock + public InetAddress getByName(String hostName) { + return inetAddress; + } + }; + + new MockUp(Env.class) { + @Mock + public Env getServingEnv() { + return env; + } + }; + + new Expectations() { + { + env.isReady(); + minTimes = 0; + result = true; + + inetAddress.getHostAddress(); + minTimes = 0; + result = "193.88.67.99"; + } + }; + + Config.enable_fqdn_mode = true; + systemInfoService = new SystemInfoService(); + systemInfoService.addBackend(new Backend(1, "193.88.67.98", "doris.test.domain", 9090)); + fdqnManager = new FQDNManager(systemInfoService); + } + + @Test + public void testBackendIpChanged() throws InterruptedException { + Assert.assertEquals("193.88.67.98", systemInfoService.getBackend(1).getHost()); + fdqnManager.start(); + Thread.sleep(1000); + Assert.assertEquals("193.88.67.99", systemInfoService.getBackend(1).getHost()); + fdqnManager.exit(); + } +} From 1b62ea0a779dbadaae00e4eeb40f196397296f80 Mon Sep 17 00:00:00 2001 From: caiconghui1 Date: Sun, 27 Nov 2022 19:04:11 +0800 Subject: [PATCH 2/4] fix by review --- .../src/main/java/org/apache/doris/common/Config.java | 1 - .../main/java/org/apache/doris/system/FQDNManager.java | 9 ++++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 08b919b05105c5..6c15ec979634dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1933,6 +1933,5 @@ public class Config extends ConfigBase { */ @ConfField(mutable = false, masterOnly = true) public static boolean enable_fqdn_mode = false; - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/FQDNManager.java b/fe/fe-core/src/main/java/org/apache/doris/system/FQDNManager.java index 7bf8bc43a14d02..6c1f48f4de5d10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/FQDNManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/FQDNManager.java @@ -32,6 +32,8 @@ public class FQDNManager extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(FQDNManager.class); + public static final String UNKNOWN_HOST_IP = "unknown"; + private SystemInfoService nodeMgr; public FQDNManager(SystemInfoService nodeMgr) { @@ -50,7 +52,7 @@ protected void runAfterCatalogReady() { InetAddress inetAddress = InetAddress.getByName(be.getHostName()); if (!be.getHost().equalsIgnoreCase(inetAddress.getHostAddress())) { String ip = be.getHost(); - if (!ip.equalsIgnoreCase("unknown")) { + if (!ip.equalsIgnoreCase(UNKNOWN_HOST_IP)) { ClientPool.backendPool.clearPool(new TNetworkAddress(ip, be.getBePort())); } be.setHost(inetAddress.getHostAddress()); @@ -59,10 +61,11 @@ protected void runAfterCatalogReady() { } } catch (UnknownHostException e) { LOG.warn("unknown host name for be, {}", be.getHostName(), e); - if (!be.isAlive() && !be.getHost().equalsIgnoreCase("unknown")) { + // add be alive check to make ip work when be is still alive and dns has some problem. + if (!be.isAlive() && !be.getHost().equalsIgnoreCase(UNKNOWN_HOST_IP)) { String ip = be.getHost(); ClientPool.backendPool.clearPool(new TNetworkAddress(ip, be.getBePort())); - be.setHost("unknown"); + be.setHost(UNKNOWN_HOST_IP); Env.getCurrentEnv().getEditLog().logBackendStateChange(be); LOG.warn("ip for {} of be has been changed from {} to {}", be.getHostName(), ip, "unknown"); } From 84744516cbaae8b8c49e1375254cf49f44570f47 Mon Sep 17 00:00:00 2001 From: caiconghui1 Date: Wed, 30 Nov 2022 15:28:47 +0800 Subject: [PATCH 3/4] fix by review --- .../org/apache/doris/alter/SystemHandler.java | 32 +++---- .../apache/doris/analysis/BackendClause.java | 16 ++-- .../doris/analysis/CancelAlterSystemStmt.java | 16 ++-- .../doris/analysis/DropBackendClause.java | 8 +- .../apache/doris/deploy/DeployManager.java | 12 +-- .../httpv2/rest/CheckDecommissionAction.java | 10 +-- .../java/org/apache/doris/system/Backend.java | 18 ++-- .../doris/system/SystemInfoService.java | 86 +++++++++++-------- .../doris/cluster/SystemInfoServiceTest.java | 10 +-- 9 files changed, 112 insertions(+), 96 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index 35083983facb5c..0211ff59aee1a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -42,12 +42,12 @@ import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.apache.commons.lang.NotImplementedException; -import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -125,7 +125,7 @@ public synchronized void process(List alterClauses, String clusterN if (!Strings.isNullOrEmpty(destClusterName) && Env.getCurrentEnv().getCluster(destClusterName) == null) { throw new DdlException("Cluster: " + destClusterName + " does not exist."); } - Env.getCurrentSystemInfo().addBackends(addBackendClause.getIpHostPortTriples(), addBackendClause.isFree(), + Env.getCurrentSystemInfo().addBackends(addBackendClause.getHostInfos(), addBackendClause.isFree(), addBackendClause.getDestCluster(), addBackendClause.getTagMap()); } else if (alterClause instanceof DropBackendClause) { // drop backend @@ -136,7 +136,7 @@ public synchronized void process(List alterClauses, String clusterN + "All data on this backend will be discarded permanently. " + "If you insist, use DROPP instead of DROP"); } - Env.getCurrentSystemInfo().dropBackends(dropBackendClause.getIpHostPortTriples()); + Env.getCurrentSystemInfo().dropBackends(dropBackendClause.getHostInfos()); } else if (alterClause instanceof DecommissionBackendClause) { // decommission DecommissionBackendClause decommissionBackendClause = (DecommissionBackendClause) alterClause; @@ -197,7 +197,7 @@ private boolean checkTablets(Long beId, List backendTabletIds) { private List checkDecommission(DecommissionBackendClause decommissionBackendClause) throws DdlException { - return checkDecommission(decommissionBackendClause.getIpHostPortTriples()); + return checkDecommission(decommissionBackendClause.getHostInfos()); } /* @@ -206,18 +206,18 @@ private List checkDecommission(DecommissionBackendClause decommissionBa * 2. after decommission, the remaining backend num should meet the replication num. * 3. after decommission, The remaining space capacity can store data on decommissioned backends. */ - public static List checkDecommission(List> ipHostPortTriples) + public static List checkDecommission(List hostInfos) throws DdlException { SystemInfoService infoService = Env.getCurrentSystemInfo(); List decommissionBackends = Lists.newArrayList(); // check if exist - for (Triple triple : ipHostPortTriples) { - Backend backend = infoService.getBackendWithHeartbeatPort(triple.getLeft(), triple.getMiddle(), - triple.getRight()); + for (HostInfo hostInfo : hostInfos) { + Backend backend = infoService.getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(), + hostInfo.getPort()); if (backend == null) { throw new DdlException("Backend does not exist[" - + (Config.enable_fqdn_mode ? triple.getMiddle() : triple.getLeft()) - + ":" + triple.getRight() + "]"); + + (Config.enable_fqdn_mode ? hostInfo.getHostName() : hostInfo.getIp()) + + ":" + hostInfo.getPort() + "]"); } if (backend.isDecommissioned()) { // already under decommission, ignore it @@ -238,15 +238,15 @@ public synchronized void cancel(CancelStmt stmt) throws DdlException { SystemInfoService infoService = Env.getCurrentSystemInfo(); // check if backends is under decommission List backends = Lists.newArrayList(); - List> ipHostPortTriples = cancelAlterSystemStmt.getIpHostPortTriples(); - for (Triple triple : ipHostPortTriples) { + List hostInfos = cancelAlterSystemStmt.getHostInfos(); + for (HostInfo hostInfo : hostInfos) { // check if exist - Backend backend = infoService.getBackendWithHeartbeatPort(triple.getLeft(), triple.getMiddle(), - triple.getRight()); + Backend backend = infoService.getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(), + hostInfo.getPort()); if (backend == null) { throw new DdlException("Backend does not exist[" - + (Config.enable_fqdn_mode ? triple.getMiddle() : triple.getLeft()) - + ":" + triple.getRight() + "]"); + + (Config.enable_fqdn_mode && hostInfo.getHostName() != null ? hostInfo.getHostName() : + hostInfo.getIp()) + ":" + hostInfo.getPort() + "]"); } if (!backend.isDecommissioned()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BackendClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BackendClause.java index a89d0a86c4d6d0..252cbc8dfbca13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BackendClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BackendClause.java @@ -20,18 +20,18 @@ import org.apache.doris.alter.AlterOpType; import org.apache.doris.common.AnalysisException; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.commons.lang.NotImplementedException; -import org.apache.commons.lang3.tuple.Triple; import java.util.List; import java.util.Map; public class BackendClause extends AlterClause { protected List hostPorts; - protected List> ipHostPortTriples; + protected List hostInfos; public static final String MUTLI_TAG_DISABLED_MSG = "Not support multi tags for Backend now. " + "You can set 'enable_multi_tags=true' in fe.conf to enable this feature."; @@ -41,20 +41,20 @@ public class BackendClause extends AlterClause { protected BackendClause(List hostPorts) { super(AlterOpType.ALTER_OTHER); this.hostPorts = hostPorts; - this.ipHostPortTriples = Lists.newArrayList(); + this.hostInfos = Lists.newArrayList(); } - public List> getIpHostPortTriples() { - return ipHostPortTriples; + public List getHostInfos() { + return hostInfos; } @Override public void analyze(Analyzer analyzer) throws AnalysisException { for (String hostPort : hostPorts) { - Triple triple = SystemInfoService.getIpHostAndPort(hostPort, true); - ipHostPortTriples.add(triple); + HostInfo hostInfo = SystemInfoService.getIpHostAndPort(hostPort, true); + hostInfos.add(hostInfo); } - Preconditions.checkState(!ipHostPortTriples.isEmpty()); + Preconditions.checkState(!hostInfos.isEmpty()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java index 29ec348baa111a..b5c251706e075f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelAlterSystemStmt.java @@ -20,35 +20,35 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import org.apache.commons.lang3.tuple.Triple; import java.util.List; public class CancelAlterSystemStmt extends CancelStmt { protected List hostPorts; - private List> ipHostPortTriples; + private List hostInfos; public CancelAlterSystemStmt(List hostPorts) { this.hostPorts = hostPorts; - this.ipHostPortTriples = Lists.newArrayList(); + this.hostInfos = Lists.newArrayList(); } - public List> getIpHostPortTriples() { - return ipHostPortTriples; + public List getHostInfos() { + return hostInfos; } @Override public void analyze(Analyzer analyzer) throws AnalysisException { for (String hostPort : hostPorts) { - Triple triple = SystemInfoService.getIpHostAndPort(hostPort, + HostInfo hostInfo = SystemInfoService.getIpHostAndPort(hostPort, !Config.enable_fqdn_mode); - this.ipHostPortTriples.add(triple); + this.hostInfos.add(hostInfo); } - Preconditions.checkState(!this.ipHostPortTriples.isEmpty()); + Preconditions.checkState(!this.hostInfos.isEmpty()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropBackendClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropBackendClause.java index 87b2f642e89dbc..c869a236b1abc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropBackendClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropBackendClause.java @@ -20,9 +20,9 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Preconditions; -import org.apache.commons.lang3.tuple.Triple; import java.util.List; @@ -47,11 +47,11 @@ public boolean isForce() { public void analyze(Analyzer analyzer) throws AnalysisException { if (Config.enable_fqdn_mode) { for (String hostPort : hostPorts) { - Triple triple = SystemInfoService.getIpHostAndPort(hostPort, + HostInfo hostInfo = SystemInfoService.getIpHostAndPort(hostPort, !Config.enable_fqdn_mode); - ipHostPortTriples.add(triple); + hostInfos.add(hostInfo); } - Preconditions.checkState(!ipHostPortTriples.isEmpty()); + Preconditions.checkState(!hostInfos.isEmpty()); } else { super.analyze(analyzer); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java index 1adc65acafc928..ab5f253a9f2e23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java @@ -29,13 +29,13 @@ import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang.NotImplementedException; -import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -587,12 +587,12 @@ private boolean inspectNodeChange(List> remoteHosts, env.addFrontend(FrontendNodeType.OBSERVER, remoteIp, remotePort); break; case BACKEND: - List> newBackends = Lists.newArrayList(); - String hostName = NetUtils.getHostnameByIp(remoteIp); - if (hostName.equals(remoteIp)) { - hostName = null; + List newBackends = Lists.newArrayList(); + String remoteHostName = NetUtils.getHostnameByIp(remoteIp); + if (remoteHostName.equals(remoteIp)) { + remoteHostName = null; } - newBackends.add(Triple.of(remoteIp, hostName, remotePort)); + newBackends.add(new HostInfo(remoteIp, remoteHostName, remotePort)); Env.getCurrentSystemInfo().addBackends(newBackends, false); break; default: diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckDecommissionAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckDecommissionAction.java index 25421154870937..44fcba133ecd80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckDecommissionAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckDecommissionAction.java @@ -25,10 +25,10 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import org.apache.commons.lang3.tuple.Triple; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; @@ -70,18 +70,18 @@ public Object execute(HttpServletRequest request, HttpServletResponse response) return ResponseEntityBuilder.badRequest("No host:port specified"); } - List> ipHostPortTriples = Lists.newArrayList(); + List hostInfos = Lists.newArrayList(); for (String hostPort : hostPortArr) { try { - Triple triple = SystemInfoService.getIpHostAndPort(hostPort, true); - ipHostPortTriples.add(triple); + HostInfo hostInfo = SystemInfoService.getIpHostAndPort(hostPort, true); + hostInfos.add(hostInfo); } catch (AnalysisException e) { return ResponseEntityBuilder.badRequest(e.getMessage()); } } try { - List backends = SystemHandler.checkDecommission(ipHostPortTriples); + List backends = SystemHandler.checkDecommission(hostInfos); List backendsList = backends.stream().map(b -> b.getHost() + ":" + b.getHeartbeatPort()).collect(Collectors.toList()); return ResponseEntityBuilder.ok(backendsList); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index a38c755baad1a4..d1c036b33e426b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -69,7 +69,7 @@ public enum BackendState { @SerializedName("id") private long id; @SerializedName("host") - private volatile String host; + private volatile String ip; @SerializedName("hostName") private String hostName; private String version; @@ -143,7 +143,7 @@ public enum BackendState { private int heartbeatFailureCounter = 0; public Backend() { - this.host = ""; + this.ip = ""; this.version = ""; this.lastUpdateMs = 0; this.lastStartTime = 0; @@ -167,7 +167,7 @@ public Backend(long id, String ip, int heartbeatPort) { public Backend(long id, String ip, String hostName, int heartbeatPort) { this.id = id; - this.host = ip; + this.ip = ip; this.hostName = hostName; this.version = ""; this.heartbeatPort = heartbeatPort; @@ -192,7 +192,7 @@ public long getId() { } public String getHost() { - return host; + return ip; } public String getHostName() { @@ -288,8 +288,8 @@ public void setBackendState(BackendState state) { this.backendState = state.ordinal(); } - public void setHost(String host) { - this.host = host; + public void setHost(String ip) { + this.ip = ip; } public void setAlive(boolean isAlive) { @@ -597,7 +597,7 @@ public void write(DataOutput out) throws IOException { @Override public int hashCode() { - return Objects.hash(id, host, heartbeatPort, bePort, isAlive); + return Objects.hash(id, ip, heartbeatPort, bePort, isAlive); } @Override @@ -611,13 +611,13 @@ public boolean equals(Object obj) { Backend backend = (Backend) obj; - return (id == backend.id) && (host.equals(backend.host)) && (heartbeatPort == backend.heartbeatPort) + return (id == backend.id) && (ip.equals(backend.ip)) && (heartbeatPort == backend.heartbeatPort) && (bePort == backend.bePort) && (isAlive.get() == backend.isAlive.get()); } @Override public String toString() { - return "Backend [id=" + id + ", host=" + host + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get() + return "Backend [id=" + id + ", ip=" + ip + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get() + ", tags: " + tagMap + "]"; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 8c3af9f5d785cb..22b27880d6a02b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -45,7 +45,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; -import org.apache.commons.lang3.tuple.Triple; import org.apache.commons.validator.routines.InetAddressValidator; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -76,13 +75,31 @@ public class SystemInfoService { private volatile ImmutableMap idToBackendRef = ImmutableMap.of(); private volatile ImmutableMap idToReportVersionRef = ImmutableMap.of(); - // last backend id used by round robin for sequential selecting backends for replica creation - private Map lastBackendIdForReplicaCreation = Maps.newConcurrentMap(); + private volatile ImmutableMap pathHashToDishInfoRef = ImmutableMap.of(); - private long lastBackendIdForCreation = -1; - private long lastBackendIdForOther = -1; + public static class HostInfo { + public String ip; + public String hostName; + public int port; - private volatile ImmutableMap pathHashToDishInfoRef = ImmutableMap.of(); + public HostInfo(String ip, String hostName, int port) { + this.ip = ip; + this.hostName = hostName; + this.port = port; + } + + public String getIp() { + return ip; + } + + public String getHostName() { + return hostName; + } + + public int getPort() { + return port; + } + } // sort host backends list by num of backends, descending private static final Comparator> hostBackendsListComparator = new Comparator>() { @@ -97,33 +114,33 @@ public int compare(List list1, List list2) { }; // for deploy manager - public void addBackends(List> ipHostPortTriples, boolean isFree) + public void addBackends(List hostInfos, boolean isFree) throws UserException { - addBackends(ipHostPortTriples, isFree, "", Tag.DEFAULT_BACKEND_TAG.toMap()); + addBackends(hostInfos, isFree, "", Tag.DEFAULT_BACKEND_TAG.toMap()); } /** - * @param ipHostPortTriples : backend's ip, hostName and port + * @param hostInfos : backend's ip, hostName and port * @param isFree : if true the backend is not owned by any cluster * @param destCluster : if not null or empty backend will be added to destCluster * @throws DdlException */ - public void addBackends(List> ipHostPortTriples, boolean isFree, String destCluster, + public void addBackends(List hostInfos, boolean isFree, String destCluster, Map tagMap) throws UserException { - for (Triple triple : ipHostPortTriples) { - if (Config.enable_fqdn_mode && triple.getMiddle() == null) { + for (HostInfo hostInfo : hostInfos) { + if (Config.enable_fqdn_mode && hostInfo.getHostName() == null) { throw new DdlException("backend's hostName should not be null while enable_fqdn_mode is true"); } // check is already exist - if (getBackendWithHeartbeatPort(triple.getLeft(), triple.getMiddle(), triple.getRight()) != null) { - String backendIdentifier = (Config.enable_fqdn_mode ? triple.getMiddle() : triple.getLeft()) + ":" - + triple.getRight(); + if (getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(), hostInfo.getPort()) != null) { + String backendIdentifier = (Config.enable_fqdn_mode ? hostInfo.getHostName() : hostInfo.getIp()) + ":" + + hostInfo.getPort(); throw new DdlException("Same backend already exists[" + backendIdentifier + "]"); } } - for (Triple triple : ipHostPortTriples) { - addBackend(triple.getLeft(), triple.getMiddle(), triple.getRight(), isFree, destCluster, tagMap); + for (HostInfo hostInfo : hostInfos) { + addBackend(hostInfo.getIp(), hostInfo.getHostName(), hostInfo.getPort(), isFree, destCluster, tagMap); } } @@ -180,17 +197,17 @@ private void addBackend(String ip, String hostName, int heartbeatPort, boolean i MetricRepo.generateBackendsTabletMetrics(); } - public void dropBackends(List> ipHostPortTriples) throws DdlException { - for (Triple triple : ipHostPortTriples) { + public void dropBackends(List hostInfos) throws DdlException { + for (HostInfo hostInfo : hostInfos) { // check is already exist - if (getBackendWithHeartbeatPort(triple.getLeft(), triple.getMiddle(), triple.getRight()) == null) { - String backendIdentifier = (triple.getLeft() == null ? triple.getMiddle() : triple.getLeft()) + ":" - + triple.getRight(); + if (getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(), hostInfo.getPort()) == null) { + String backendIdentifier = Config.enable_fqdn_mode && hostInfo.getHostName() != null + ? hostInfo.getHostName() : hostInfo.getIp() + ":" + hostInfo.getPort(); throw new DdlException("backend does not exists[" + backendIdentifier + "]"); } } - for (Triple triple : ipHostPortTriples) { - dropBackend(triple.getLeft(), triple.getMiddle(), triple.getRight()); + for (HostInfo hostInfo : hostInfos) { + dropBackend(hostInfo.getIp(), hostInfo.getHostName(), hostInfo.getPort()); } } @@ -915,7 +932,7 @@ public void clear() { this.idToReportVersionRef = null; } - public static Triple getIpHostAndPort(String hostPort, boolean strictCheck) + public static HostInfo getIpHostAndPort(String hostPort, boolean strictCheck) throws AnalysisException { hostPort = hostPort.replaceAll("\\s+", ""); if (hostPort.isEmpty()) { @@ -954,10 +971,10 @@ public static Triple getIpHostAndPort(String hostPort, hostName = null; } } - return Triple.of(ip, hostName, heartbeatPort); + return new HostInfo(ip, hostName, heartbeatPort); } catch (UnknownHostException e) { if (!strictCheck) { - return Triple.of(null, hostName, heartbeatPort); + return new HostInfo(null, hostName, heartbeatPort); } throw new AnalysisException("Unknown host: " + e.getMessage()); } catch (Exception e) { @@ -967,8 +984,8 @@ public static Triple getIpHostAndPort(String hostPort, public static Pair validateHostAndPort(String hostPort) throws AnalysisException { - Triple ipHostPortTriple = getIpHostAndPort(hostPort, true); - return Pair.of(ipHostPortTriple.getLeft(), ipHostPortTriple.getRight()); + HostInfo hostInfo = getIpHostAndPort(hostPort, true); + return Pair.of(hostInfo.getIp(), hostInfo.getPort()); } public void replayAddBackend(Backend newBackend) { @@ -1137,14 +1154,13 @@ public void updatePathInfo(List addedDisks, List removedDisk } public void modifyBackends(ModifyBackendClause alterClause) throws UserException { - List> ipHostPortTriples = alterClause.getIpHostPortTriples(); + List hostInfos = alterClause.getHostInfos(); List backends = Lists.newArrayList(); - for (Triple triple : ipHostPortTriples) { - Backend be = getBackendWithHeartbeatPort(triple.getLeft(), triple.getMiddle(), triple.getRight()); + for (HostInfo hostInfo : hostInfos) { + Backend be = getBackendWithHeartbeatPort(hostInfo.getIp(), hostInfo.getHostName(), hostInfo.getPort()); if (be == null) { - throw new DdlException("backend does not exists[" - + (Config.enable_fqdn_mode ? triple.getMiddle() : triple.getLeft()) - + ":" + triple.getRight() + "]"); + throw new DdlException("backend does not exists[" + (Config.enable_fqdn_mode && hostInfo.getHostName() + != null ? hostInfo.getHostName() : hostInfo.getIp()) + ":" + hostInfo.getPort() + "]"); } backends.add(be); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java index 87d4d50a1991c2..69a041ba0d4f5e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java @@ -218,13 +218,13 @@ public void addBackendTest() throws UserException { AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234")); stmt.analyze(analyzer); try { - Env.getCurrentSystemInfo().addBackends(stmt.getIpHostPortTriples(), true); + Env.getCurrentSystemInfo().addBackends(stmt.getHostInfos(), true); } catch (DdlException e) { Assert.fail(); } try { - Env.getCurrentSystemInfo().addBackends(stmt.getIpHostPortTriples(), true); + Env.getCurrentSystemInfo().addBackends(stmt.getHostInfos(), true); } catch (DdlException e) { Assert.assertTrue(e.getMessage().contains("already exists")); } @@ -247,7 +247,7 @@ public void removeBackendTest() throws UserException { AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234")); stmt.analyze(analyzer); try { - Env.getCurrentSystemInfo().addBackends(stmt.getIpHostPortTriples(), true); + Env.getCurrentSystemInfo().addBackends(stmt.getHostInfos(), true); } catch (DdlException e) { e.printStackTrace(); } @@ -255,14 +255,14 @@ public void removeBackendTest() throws UserException { DropBackendClause dropStmt = new DropBackendClause(Lists.newArrayList("192.168.0.1:1234")); dropStmt.analyze(analyzer); try { - Env.getCurrentSystemInfo().dropBackends(dropStmt.getIpHostPortTriples()); + Env.getCurrentSystemInfo().dropBackends(dropStmt.getHostInfos()); } catch (DdlException e) { e.printStackTrace(); Assert.fail(); } try { - Env.getCurrentSystemInfo().dropBackends(dropStmt.getIpHostPortTriples()); + Env.getCurrentSystemInfo().dropBackends(dropStmt.getHostInfos()); } catch (DdlException e) { Assert.assertTrue(e.getMessage().contains("does not exist")); } From c4f4bac1ef195563a195fe427ad4db6cce0e2b60 Mon Sep 17 00:00:00 2001 From: caiconghui1 Date: Wed, 30 Nov 2022 16:11:12 +0800 Subject: [PATCH 4/4] fix ut failed --- fe/fe-core/src/main/java/org/apache/doris/system/Backend.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index d1c036b33e426b..20c6a3785a050a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -617,7 +617,7 @@ public boolean equals(Object obj) { @Override public String toString() { - return "Backend [id=" + id + ", ip=" + ip + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get() + return "Backend [id=" + id + ", host=" + ip + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get() + ", tags: " + tagMap + "]"; }