From e5f41788f4743b6023df1ebea073a4cf8ceeab37 Mon Sep 17 00:00:00 2001 From: yintao <373141588@qq.com> Date: Mon, 6 Mar 2023 15:48:27 +0800 Subject: [PATCH] [Fix](FQDN) fix slow when ip changed --- .../java/org/apache/doris/catalog/Env.java | 10 --- .../doris/common/proc/FrontendsProcNode.java | 11 +-- .../main/java/org/apache/doris/ha/BDBHA.java | 68 ------------------- .../java/org/apache/doris/ha/HAProtocol.java | 25 ++----- .../doris/journal/bdbje/BDBEnvironment.java | 40 +++-------- .../org/apache/doris/system/Frontend.java | 5 ++ 6 files changed, 21 insertions(+), 138 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index af74e0f0c437b0..e253316feebeca 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1456,15 +1456,6 @@ private void transferToNonMaster(FrontendNodeType newType) { // transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER - // add helper sockets - if (Config.edit_log_type.equalsIgnoreCase("bdb")) { - for (Frontend fe : frontends.values()) { - if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) { - ((BDBHA) getHaProtocol()).addHelperSocket(fe.getIp(), fe.getEditLogPort()); - } - } - } - if (replayer == null) { createReplayer(); replayer.start(); @@ -2468,7 +2459,6 @@ public void addFrontend(FrontendNodeType role, String ip, String hostname, int e frontends.put(nodeName, fe); BDBHA bdbha = (BDBHA) haProtocol; if (role == FrontendNodeType.FOLLOWER || role == FrontendNodeType.REPLICA) { - bdbha.addHelperSocket(ip, editLogPort); helperNodes.add(new HostInfo(ip, hostname, editLogPort)); bdbha.addUnReadyElectableNode(nodeName, getFollowerCount()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java index 8a83d50bebe893..74354a05e6a05f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java @@ -19,7 +19,6 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; -import org.apache.doris.common.Pair; import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.qe.ConnectContext; @@ -142,7 +141,7 @@ public static void getFrontendsInfo(Env env, List> infos) { } private static boolean isHelperNode(List helperNodes, Frontend fe) { - return helperNodes.stream().anyMatch(p -> p.getIp().equals(fe.getIp()) && p.getPort() == fe.getEditLogPort()); + return helperNodes.stream().anyMatch(p -> fe.toHostInfo().isSame(p)); } private static boolean isJoin(List allFeHosts, Frontend fe) { @@ -169,12 +168,4 @@ private static boolean isJoin(List allFeHosts, Frontend fe) { } return false; } - - private static List> convertToHostPortPair(List addrs) { - List> hostPortPair = Lists.newArrayList(); - for (InetSocketAddress addr : addrs) { - hostPortPair.add(Pair.of(addr.getAddress().getHostAddress(), addr.getPort())); - } - return hostPortPair; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java index bb4d6020c78af4..3ce28aa59fba49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java +++ b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java @@ -21,7 +21,6 @@ import org.apache.doris.journal.bdbje.BDBEnvironment; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseEntry; @@ -68,11 +67,6 @@ public BDBHA(BDBEnvironment env, String nodeName) { this.nodeName = nodeName; } - @Override - public long getEpochNumber() { - return 0; - } - @Override public boolean fencing() { Database epochDb = environment.getEpochDB(); @@ -162,47 +156,6 @@ public InetSocketAddress getLeader() { return rn.getSocketAddress(); } - @Override - public List getNoneLeaderNodes() { - ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin(); - if (replicationGroupAdmin == null) { - return null; - } - List ret = new ArrayList(); - try { - ReplicationGroup replicationGroup = replicationGroupAdmin.getGroup(); - for (ReplicationNode replicationNode : replicationGroup.getSecondaryNodes()) { - ret.add(replicationNode.getSocketAddress()); - } - for (ReplicationNode replicationNode : replicationGroup.getElectableNodes()) { - if (!replicationNode.getName().equals(replicationGroupAdmin.getMasterNodeName())) { - ret.add(replicationNode.getSocketAddress()); - } - } - } catch (UnknownMasterException e) { - LOG.warn("Catch UnknownMasterException when calling getNoneLeaderNodes.", e); - return null; - } - return ret; - } - - @Override - public void transferToMaster() { - - } - - @Override - public void transferToNonMaster() { - - } - - @Override - public boolean isLeader() { - ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin(); - String leaderName = replicationGroupAdmin.getMasterNodeName(); - return leaderName.equals(nodeName); - } - @Override public boolean removeElectableNode(String nodeName) { ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin(); @@ -238,27 +191,6 @@ public boolean updateNodeAddress(String nodeName, String newHostName, int port) return true; } - // When new Follower FE is added to the cluster, it should also be added to the - // helper sockets in - // ReplicationGroupAdmin, in order to fix the following case: - // 1. A Observer starts with helper of master FE. - // 2. Master FE is dead, new Master is elected. - // 3. Observer's helper sockets only contains the info of the dead master FE. - // So when you try to get frontends' info from this Observer, it will throw the - // Exception: - // "Could not determine master from helpers at:[/dead master FE host:port]" - public void addHelperSocket(String ip, Integer port) { - ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin(); - Set helperSockets = - Sets.newHashSet(replicationGroupAdmin.getHelperSockets()); - InetSocketAddress newHelperSocket = new InetSocketAddress(ip, port); - if (!helperSockets.contains(newHelperSocket)) { - helperSockets.add(newHelperSocket); - environment.setNewReplicationGroupAdmin(helperSockets); - LOG.info("add {}:{} to helper sockets", ip, port); - } - } - public void removeConflictNodeIfExist(String host, int port) { ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin(); if (replicationGroupAdmin == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/ha/HAProtocol.java b/fe/fe-core/src/main/java/org/apache/doris/ha/HAProtocol.java index 8160c0b9f1344a..6f44f260a3c972 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/ha/HAProtocol.java +++ b/fe/fe-core/src/main/java/org/apache/doris/ha/HAProtocol.java @@ -21,33 +21,18 @@ import java.util.List; public interface HAProtocol { - // get current epoch number - public long getEpochNumber(); - // increase epoch number by one - public boolean fencing(); + boolean fencing(); // get observer nodes in the current group - public List getObserverNodes(); + List getObserverNodes(); // get replica nodes in the current group - public List getElectableNodes(boolean leaderIncluded); + List getElectableNodes(boolean leaderIncluded); // get the leader of current group - public InetSocketAddress getLeader(); - - // get all the nodes except leader in the current group - public List getNoneLeaderNodes(); - - // transfer from nonMaster(unknown, follower or init) to master - public void transferToMaster(); - - // transfer to non-master - public void transferToNonMaster(); - - // check if the current node is leader - public boolean isLeader(); + InetSocketAddress getLeader(); // remove a node from the group - public boolean removeElectableNode(String nodeName); + boolean removeElectableNode(String nodeName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java index a656b4196d7e1e..4f6faf1b13007c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java @@ -21,7 +21,9 @@ import org.apache.doris.common.Config; import org.apache.doris.ha.BDBHA; import org.apache.doris.ha.BDBStateChangeListener; +import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.ha.HAProtocol; +import org.apache.doris.system.Frontend; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -51,11 +53,11 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /* this class contains the reference to bdb environment. * including all the opened databases and the replicationGroupAdmin. @@ -73,7 +75,6 @@ public class BDBEnvironment { private ReplicationConfig replicationConfig; private DatabaseConfig dbConfig; private Database epochDB = null; // used for fencing - private ReplicationGroupAdmin replicationGroupAdmin = null; private ReentrantReadWriteLock lock; private List openedDatabases; @@ -153,29 +154,6 @@ public void setup(File envHome, String selfNodeName, String selfNodeHostPort, // open the environment replicatedEnvironment = new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig); - // get replicationGroupAdmin object. - Set adminNodes = new HashSet(); - // 1. add helper node - // If host is ipv6 address there will be more than one colon in host str - int helperColonIdx = helperHostPort.lastIndexOf(":"); - String helperHost = helperHostPort.substring(0, helperColonIdx); - int helperPort = Integer.parseInt(helperHostPort.substring(helperColonIdx + 1)); - InetSocketAddress helper = new InetSocketAddress(helperHost, helperPort); - adminNodes.add(helper); - LOG.info("add helper[{}] as ReplicationGroupAdmin", helperHostPort); - - // 2. add self if is electable - if (!selfNodeHostPort.equals(helperHostPort) && Env.getCurrentEnv().isElectable()) { - int selfColonIdx = selfNodeHostPort.lastIndexOf(":"); - String selfHost = selfNodeHostPort.substring(0, selfColonIdx); - int selfPort = Integer.parseInt(selfNodeHostPort.substring(selfColonIdx + 1)); - InetSocketAddress self = new InetSocketAddress(selfHost, selfPort); - adminNodes.add(self); - LOG.info("add self[{}] as ReplicationGroupAdmin", selfNodeHostPort); - } - - replicationGroupAdmin = new ReplicationGroupAdmin(PALO_JOURNAL_GROUP, adminNodes); - // get a BDBHA object and pass the reference to Catalog HAProtocol protocol = new BDBHA(this, selfNodeName); Env.getCurrentEnv().setHaProtocol(protocol); @@ -211,11 +189,13 @@ public void setup(File envHome, String selfNodeName, String selfNodeHostPort, } public ReplicationGroupAdmin getReplicationGroupAdmin() { - return this.replicationGroupAdmin; - } - - public void setNewReplicationGroupAdmin(Set newHelperNodes) { - this.replicationGroupAdmin = new ReplicationGroupAdmin(PALO_JOURNAL_GROUP, newHelperNodes); + Set addresses = Env.getCurrentEnv() + .getFrontends(FrontendNodeType.FOLLOWER) + .stream() + .filter(Frontend::isAlive) + .map(fe -> new InetSocketAddress(fe.getIp(), fe.getEditLogPort())) + .collect(Collectors.toSet()); + return new ReplicationGroupAdmin(PALO_JOURNAL_GROUP, addresses); } // Return a handle to the epochDB diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java index d1a7c55ae56e90..486a5a052ffbe9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java @@ -26,6 +26,7 @@ import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.system.HeartbeatResponse.HbStatus; +import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.gson.annotations.SerializedName; @@ -195,4 +196,8 @@ public String toString() { public void setIp(String ip) { this.ip = ip; } + + public HostInfo toHostInfo() { + return new HostInfo(ip, hostName, editLogPort); + } }