Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -1456,15 +1456,6 @@ private void transferToNonMaster(FrontendNodeType newType) {

// transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER

// add helper sockets
if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
for (Frontend fe : frontends.values()) {
if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) {
((BDBHA) getHaProtocol()).addHelperSocket(fe.getIp(), fe.getEditLogPort());
}
}
}

if (replayer == null) {
createReplayer();
replayer.start();
Expand Down Expand Up @@ -2468,7 +2459,6 @@ public void addFrontend(FrontendNodeType role, String ip, String hostname, int e
frontends.put(nodeName, fe);
BDBHA bdbha = (BDBHA) haProtocol;
if (role == FrontendNodeType.FOLLOWER || role == FrontendNodeType.REPLICA) {
bdbha.addHelperSocket(ip, editLogPort);
helperNodes.add(new HostInfo(ip, hostname, editLogPort));
bdbha.addUnReadyElectableNode(nodeName, getFollowerCount());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -142,7 +141,7 @@ public static void getFrontendsInfo(Env env, List<List<String>> infos) {
}

private static boolean isHelperNode(List<HostInfo> helperNodes, Frontend fe) {
return helperNodes.stream().anyMatch(p -> p.getIp().equals(fe.getIp()) && p.getPort() == fe.getEditLogPort());
return helperNodes.stream().anyMatch(p -> fe.toHostInfo().isSame(p));
}

private static boolean isJoin(List<InetSocketAddress> allFeHosts, Frontend fe) {
Expand All @@ -169,12 +168,4 @@ private static boolean isJoin(List<InetSocketAddress> allFeHosts, Frontend fe) {
}
return false;
}

private static List<Pair<String, Integer>> convertToHostPortPair(List<InetSocketAddress> addrs) {
List<Pair<String, Integer>> hostPortPair = Lists.newArrayList();
for (InetSocketAddress addr : addrs) {
hostPortPair.add(Pair.of(addr.getAddress().getHostAddress(), addr.getPort()));
}
return hostPortPair;
}
}
68 changes: 0 additions & 68 deletions fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.journal.bdbje.BDBEnvironment;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
Expand Down Expand Up @@ -68,11 +67,6 @@ public BDBHA(BDBEnvironment env, String nodeName) {
this.nodeName = nodeName;
}

@Override
public long getEpochNumber() {
return 0;
}

@Override
public boolean fencing() {
Database epochDb = environment.getEpochDB();
Expand Down Expand Up @@ -162,47 +156,6 @@ public InetSocketAddress getLeader() {
return rn.getSocketAddress();
}

@Override
public List<InetSocketAddress> getNoneLeaderNodes() {
ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
if (replicationGroupAdmin == null) {
return null;
}
List<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
try {
ReplicationGroup replicationGroup = replicationGroupAdmin.getGroup();
for (ReplicationNode replicationNode : replicationGroup.getSecondaryNodes()) {
ret.add(replicationNode.getSocketAddress());
}
for (ReplicationNode replicationNode : replicationGroup.getElectableNodes()) {
if (!replicationNode.getName().equals(replicationGroupAdmin.getMasterNodeName())) {
ret.add(replicationNode.getSocketAddress());
}
}
} catch (UnknownMasterException e) {
LOG.warn("Catch UnknownMasterException when calling getNoneLeaderNodes.", e);
return null;
}
return ret;
}

@Override
public void transferToMaster() {

}

@Override
public void transferToNonMaster() {

}

@Override
public boolean isLeader() {
ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
String leaderName = replicationGroupAdmin.getMasterNodeName();
return leaderName.equals(nodeName);
}

@Override
public boolean removeElectableNode(String nodeName) {
ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
Expand Down Expand Up @@ -238,27 +191,6 @@ public boolean updateNodeAddress(String nodeName, String newHostName, int port)
return true;
}

// When new Follower FE is added to the cluster, it should also be added to the
// helper sockets in
// ReplicationGroupAdmin, in order to fix the following case:
// 1. A Observer starts with helper of master FE.
// 2. Master FE is dead, new Master is elected.
// 3. Observer's helper sockets only contains the info of the dead master FE.
// So when you try to get frontends' info from this Observer, it will throw the
// Exception:
// "Could not determine master from helpers at:[/dead master FE host:port]"
public void addHelperSocket(String ip, Integer port) {
ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
Set<InetSocketAddress> helperSockets =
Sets.newHashSet(replicationGroupAdmin.getHelperSockets());
InetSocketAddress newHelperSocket = new InetSocketAddress(ip, port);
if (!helperSockets.contains(newHelperSocket)) {
helperSockets.add(newHelperSocket);
environment.setNewReplicationGroupAdmin(helperSockets);
LOG.info("add {}:{} to helper sockets", ip, port);
}
}

public void removeConflictNodeIfExist(String host, int port) {
ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
if (replicationGroupAdmin == null) {
Expand Down
25 changes: 5 additions & 20 deletions fe/fe-core/src/main/java/org/apache/doris/ha/HAProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,18 @@
import java.util.List;

public interface HAProtocol {
// get current epoch number
public long getEpochNumber();

// increase epoch number by one
public boolean fencing();
boolean fencing();

// get observer nodes in the current group
public List<InetSocketAddress> getObserverNodes();
List<InetSocketAddress> getObserverNodes();

// get replica nodes in the current group
public List<InetSocketAddress> getElectableNodes(boolean leaderIncluded);
List<InetSocketAddress> getElectableNodes(boolean leaderIncluded);

// get the leader of current group
public InetSocketAddress getLeader();

// get all the nodes except leader in the current group
public List<InetSocketAddress> getNoneLeaderNodes();

// transfer from nonMaster(unknown, follower or init) to master
public void transferToMaster();

// transfer to non-master
public void transferToNonMaster();

// check if the current node is leader
public boolean isLeader();
InetSocketAddress getLeader();

// remove a node from the group
public boolean removeElectableNode(String nodeName);
boolean removeElectableNode(String nodeName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.doris.common.Config;
import org.apache.doris.ha.BDBHA;
import org.apache.doris.ha.BDBStateChangeListener;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.ha.HAProtocol;
import org.apache.doris.system.Frontend;

import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
Expand Down Expand Up @@ -51,11 +53,11 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

/* this class contains the reference to bdb environment.
* including all the opened databases and the replicationGroupAdmin.
Expand All @@ -73,7 +75,6 @@ public class BDBEnvironment {
private ReplicationConfig replicationConfig;
private DatabaseConfig dbConfig;
private Database epochDB = null; // used for fencing
private ReplicationGroupAdmin replicationGroupAdmin = null;
private ReentrantReadWriteLock lock;
private List<Database> openedDatabases;

Expand Down Expand Up @@ -153,29 +154,6 @@ public void setup(File envHome, String selfNodeName, String selfNodeHostPort,
// open the environment
replicatedEnvironment = new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig);

// get replicationGroupAdmin object.
Set<InetSocketAddress> adminNodes = new HashSet<InetSocketAddress>();
// 1. add helper node
// If host is ipv6 address there will be more than one colon in host str
int helperColonIdx = helperHostPort.lastIndexOf(":");
String helperHost = helperHostPort.substring(0, helperColonIdx);
int helperPort = Integer.parseInt(helperHostPort.substring(helperColonIdx + 1));
InetSocketAddress helper = new InetSocketAddress(helperHost, helperPort);
adminNodes.add(helper);
LOG.info("add helper[{}] as ReplicationGroupAdmin", helperHostPort);

// 2. add self if is electable
if (!selfNodeHostPort.equals(helperHostPort) && Env.getCurrentEnv().isElectable()) {
int selfColonIdx = selfNodeHostPort.lastIndexOf(":");
String selfHost = selfNodeHostPort.substring(0, selfColonIdx);
int selfPort = Integer.parseInt(selfNodeHostPort.substring(selfColonIdx + 1));
InetSocketAddress self = new InetSocketAddress(selfHost, selfPort);
adminNodes.add(self);
LOG.info("add self[{}] as ReplicationGroupAdmin", selfNodeHostPort);
}

replicationGroupAdmin = new ReplicationGroupAdmin(PALO_JOURNAL_GROUP, adminNodes);

// get a BDBHA object and pass the reference to Catalog
HAProtocol protocol = new BDBHA(this, selfNodeName);
Env.getCurrentEnv().setHaProtocol(protocol);
Expand Down Expand Up @@ -211,11 +189,13 @@ public void setup(File envHome, String selfNodeName, String selfNodeHostPort,
}

public ReplicationGroupAdmin getReplicationGroupAdmin() {
return this.replicationGroupAdmin;
}

public void setNewReplicationGroupAdmin(Set<InetSocketAddress> newHelperNodes) {
this.replicationGroupAdmin = new ReplicationGroupAdmin(PALO_JOURNAL_GROUP, newHelperNodes);
Set<InetSocketAddress> addresses = Env.getCurrentEnv()
.getFrontends(FrontendNodeType.FOLLOWER)
.stream()
.filter(Frontend::isAlive)
.map(fe -> new InetSocketAddress(fe.getIp(), fe.getEditLogPort()))
.collect(Collectors.toSet());
return new ReplicationGroupAdmin(PALO_JOURNAL_GROUP, addresses);
}

// Return a handle to the epochDB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.system.HeartbeatResponse.HbStatus;
import org.apache.doris.system.SystemInfoService.HostInfo;

import com.google.gson.annotations.SerializedName;

Expand Down Expand Up @@ -195,4 +196,8 @@ public String toString() {
public void setIp(String ip) {
this.ip = ip;
}

public HostInfo toHostInfo() {
return new HostInfo(ip, hostName, editLogPort);
}
}