diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index d6ce141ca0aff0..cb1d9b428fe89e 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1011,7 +1011,7 @@ private void getClusterIdAndRole() throws IOException { Thread.sleep(5000); continue; } catch (InterruptedException e) { - e.printStackTrace(); + LOG.warn("", e); System.exit(-1); } } @@ -1464,15 +1464,6 @@ private void transferToNonMaster(FrontendNodeType newType) { // transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER - // add helper sockets - if (Config.edit_log_type.equalsIgnoreCase("bdb")) { - for (Frontend fe : frontends.values()) { - if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) { - ((BDBHA) getHaProtocol()).addHelperSocket(fe.getHost(), fe.getEditLogPort()); - } - } - } - if (replayer == null) { createReplayer(); replayer.start(); @@ -2566,8 +2557,7 @@ public void addFrontend(FrontendNodeType role, String host, int editLogPort) thr frontends.put(nodeName, fe); BDBHA bdbha = (BDBHA) haProtocol; if (role == FrontendNodeType.FOLLOWER || role == FrontendNodeType.REPLICA) { - bdbha.addHelperSocket(host, editLogPort); - helperNodes.add(Pair.of(host, editLogPort)); + helperNodes.add(new HostInfo(ip, hostname, editLogPort)); bdbha.addUnReadyElectableNode(nodeName, getFollowerCount()); } bdbha.removeConflictNodeIfExist(host, editLogPort); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java index 1371fa598a4a12..9016f8c1ecfcaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java @@ -19,7 +19,6 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; -import org.apache.doris.common.Pair; import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.qe.ConnectContext; @@ -140,24 +139,32 @@ public static void getFrontendsInfo(Env env, List> infos) { } } - private static boolean isHelperNode(List> helperNodes, Frontend fe) { - return helperNodes.stream().anyMatch(p -> p.first.equals(fe.getHost()) && p.second == fe.getEditLogPort()); + private static boolean isHelperNode(List helperNodes, Frontend fe) { + return helperNodes.stream().anyMatch(p -> p.getIp().equals(fe.getIp()) && p.getPort() == fe.getEditLogPort()); } - private static boolean isJoin(List> allFeHosts, Frontend fe) { - for (Pair pair : allFeHosts) { - if (fe.getHost().equals(pair.first) && fe.getEditLogPort() == pair.second) { + private static boolean isJoin(List allFeHosts, Frontend fe) { + for (InetSocketAddress addr : allFeHosts) { + if (fe.getEditLogPort() != addr.getPort()) { + continue; + } + if (!Strings.isNullOrEmpty(addr.getHostName())) { + if (addr.getHostName().equals(fe.getHostName()) + || addr.getHostName().equals(fe.getIp())) { + return true; + } + } + // if hostname of InetSocketAddress is ip, addr.getHostName() may be not equal to fe.getIp() + // so we need to compare fe.getIp() with address.getHostAddress() + InetAddress address = addr.getAddress(); + if (null == address) { + LOG.warn("Failed to get InetAddress {}", addr); + continue; + } + if (fe.getIp().equals(address.getHostAddress())) { return true; } } return false; } - - private static List> convertToHostPortPair(List addrs) { - List> hostPortPair = Lists.newArrayList(); - for (InetSocketAddress addr : addrs) { - hostPortPair.add(Pair.of(addr.getAddress().getHostAddress(), addr.getPort())); - } - return hostPortPair; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java index 1c3a99c1dce804..b72c6ac689d6b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java +++ b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java @@ -21,7 +21,6 @@ import org.apache.doris.journal.bdbje.BDBEnvironment; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseEntry; @@ -68,11 +67,6 @@ public BDBHA(BDBEnvironment env, String nodeName) { this.nodeName = nodeName; } - @Override - public long getEpochNumber() { - return 0; - } - @Override public boolean fencing() { Database epochDb = environment.getEpochDB(); @@ -101,7 +95,7 @@ public boolean fencing() { try { Thread.sleep(2000); } catch (InterruptedException e1) { - e1.printStackTrace(); + LOG.warn("", e1); } } } @@ -162,47 +156,6 @@ public InetSocketAddress getLeader() { return rn.getSocketAddress(); } - @Override - public List getNoneLeaderNodes() { - ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin(); - if (replicationGroupAdmin == null) { - return null; - } - List ret = new ArrayList(); - try { - ReplicationGroup replicationGroup = replicationGroupAdmin.getGroup(); - for (ReplicationNode replicationNode : replicationGroup.getSecondaryNodes()) { - ret.add(replicationNode.getSocketAddress()); - } - for (ReplicationNode replicationNode : replicationGroup.getElectableNodes()) { - if (!replicationNode.getName().equals(replicationGroupAdmin.getMasterNodeName())) { - ret.add(replicationNode.getSocketAddress()); - } - } - } catch (UnknownMasterException e) { - LOG.warn("Catch UnknownMasterException when calling getNoneLeaderNodes.", e); - return null; - } - return ret; - } - - @Override - public void transferToMaster() { - - } - - @Override - public void transferToNonMaster() { - - } - - @Override - public boolean isLeader() { - ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin(); - String leaderName = replicationGroupAdmin.getMasterNodeName(); - return leaderName.equals(nodeName); - } - @Override public boolean removeElectableNode(String nodeName) { ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin(); @@ -221,27 +174,6 @@ public boolean removeElectableNode(String nodeName) { return true; } - // When new Follower FE is added to the cluster, it should also be added to the - // helper sockets in - // ReplicationGroupAdmin, in order to fix the following case: - // 1. A Observer starts with helper of master FE. - // 2. Master FE is dead, new Master is elected. - // 3. Observer's helper sockets only contains the info of the dead master FE. - // So when you try to get frontends' info from this Observer, it will throw the - // Exception: - // "Could not determine master from helpers at:[/dead master FE host:port]" - public void addHelperSocket(String ip, Integer port) { - ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin(); - Set helperSockets = - Sets.newHashSet(replicationGroupAdmin.getHelperSockets()); - InetSocketAddress newHelperSocket = new InetSocketAddress(ip, port); - if (!helperSockets.contains(newHelperSocket)) { - helperSockets.add(newHelperSocket); - environment.setNewReplicationGroupAdmin(helperSockets); - LOG.info("add {}:{} to helper sockets", ip, port); - } - } - public void removeConflictNodeIfExist(String host, int port) { ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin(); if (replicationGroupAdmin == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/ha/HAProtocol.java b/fe/fe-core/src/main/java/org/apache/doris/ha/HAProtocol.java index 8160c0b9f1344a..6f44f260a3c972 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/ha/HAProtocol.java +++ b/fe/fe-core/src/main/java/org/apache/doris/ha/HAProtocol.java @@ -21,33 +21,18 @@ import java.util.List; public interface HAProtocol { - // get current epoch number - public long getEpochNumber(); - // increase epoch number by one - public boolean fencing(); + boolean fencing(); // get observer nodes in the current group - public List getObserverNodes(); + List getObserverNodes(); // get replica nodes in the current group - public List getElectableNodes(boolean leaderIncluded); + List getElectableNodes(boolean leaderIncluded); // get the leader of current group - public InetSocketAddress getLeader(); - - // get all the nodes except leader in the current group - public List getNoneLeaderNodes(); - - // transfer from nonMaster(unknown, follower or init) to master - public void transferToMaster(); - - // transfer to non-master - public void transferToNonMaster(); - - // check if the current node is leader - public boolean isLeader(); + InetSocketAddress getLeader(); // remove a node from the group - public boolean removeElectableNode(String nodeName); + boolean removeElectableNode(String nodeName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java index 14e28a8539989c..b7bf0e8b66b61a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java @@ -21,7 +21,9 @@ import org.apache.doris.common.Config; import org.apache.doris.ha.BDBHA; import org.apache.doris.ha.BDBStateChangeListener; +import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.ha.HAProtocol; +import org.apache.doris.system.Frontend; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -51,11 +53,11 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /* this class contains the reference to bdb environment. * including all the opened databases and the replicationGroupAdmin. @@ -73,7 +75,6 @@ public class BDBEnvironment { private ReplicationConfig replicationConfig; private DatabaseConfig dbConfig; private Database epochDB = null; // used for fencing - private ReplicationGroupAdmin replicationGroupAdmin = null; private ReentrantReadWriteLock lock; private List openedDatabases; @@ -153,23 +154,6 @@ public void setup(File envHome, String selfNodeName, String selfNodeHostPort, // open the environment replicatedEnvironment = new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig); - // get replicationGroupAdmin object. - Set adminNodes = new HashSet(); - // 1. add helper node - InetSocketAddress helper = new InetSocketAddress(helperHostPort.split(":")[0], - Integer.parseInt(helperHostPort.split(":")[1])); - adminNodes.add(helper); - LOG.info("add helper[{}] as ReplicationGroupAdmin", helperHostPort); - // 2. add self if is electable - if (!selfNodeHostPort.equals(helperHostPort) && Env.getCurrentEnv().isElectable()) { - InetSocketAddress self = new InetSocketAddress(selfNodeHostPort.split(":")[0], - Integer.parseInt(selfNodeHostPort.split(":")[1])); - adminNodes.add(self); - LOG.info("add self[{}] as ReplicationGroupAdmin", selfNodeHostPort); - } - - replicationGroupAdmin = new ReplicationGroupAdmin(PALO_JOURNAL_GROUP, adminNodes); - // get a BDBHA object and pass the reference to Catalog HAProtocol protocol = new BDBHA(this, selfNodeName); Env.getCurrentEnv().setHaProtocol(protocol); @@ -177,7 +161,6 @@ public void setup(File envHome, String selfNodeName, String selfNodeHostPort, // start state change listener StateChangeListener listener = new BDBStateChangeListener(); replicatedEnvironment.setStateChangeListener(listener); - // open epochDB. the first parameter null means auto-commit epochDB = replicatedEnvironment.openDatabase(null, "epochDB", dbConfig); break; @@ -195,7 +178,7 @@ public void setup(File envHome, String selfNodeName, String selfNodeHostPort, try { Thread.sleep(5 * 1000); } catch (InterruptedException e1) { - e1.printStackTrace(); + LOG.warn("", e1); } } else { LOG.error("error to open replicated environment. will exit.", e); @@ -206,11 +189,13 @@ public void setup(File envHome, String selfNodeName, String selfNodeHostPort, } public ReplicationGroupAdmin getReplicationGroupAdmin() { - return this.replicationGroupAdmin; - } - - public void setNewReplicationGroupAdmin(Set newHelperNodes) { - this.replicationGroupAdmin = new ReplicationGroupAdmin(PALO_JOURNAL_GROUP, newHelperNodes); + Set addresses = Env.getCurrentEnv() + .getFrontends(FrontendNodeType.FOLLOWER) + .stream() + .filter(Frontend::isAlive) + .map(fe -> new InetSocketAddress(fe.getIp(), fe.getEditLogPort())) + .collect(Collectors.toSet()); + return new ReplicationGroupAdmin(PALO_JOURNAL_GROUP, addresses); } // Return a handle to the epochDB diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java index 88448e49c6e72c..dd04f817f264a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java @@ -24,6 +24,7 @@ import org.apache.doris.ha.BDBHA; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.system.HeartbeatResponse.HbStatus; +import org.apache.doris.system.SystemInfoService.HostInfo; import java.io.DataInput; import java.io.DataOutput; @@ -162,4 +163,8 @@ public String toString() { sb.append(", ").append(host).append(":").append(editLogPort); return sb.toString(); } + + public HostInfo toHostInfo() { + return new HostInfo(ip, hostName, editLogPort); + } }