diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 747607f633..0f2515cbc9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -43,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; @@ -50,8 +51,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; -import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.ratis.util.LifeCycle.State.NEW; import static org.apache.ratis.util.LifeCycle.State.RUNNING; @@ -104,7 +105,7 @@ enum Phase { ELECTION } - enum Result {PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN} + enum Result {PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN, NOT_IN_CONF} private static class ResultAndTerm { private final Result result; @@ -271,6 +272,9 @@ private boolean shouldRun(long electionTerm) { private ResultAndTerm submitRequestAndWaitResult(Phase phase, RaftConfigurationImpl conf, long electionTerm) throws InterruptedException { + if (!conf.containsInConf(server.getId())) { + return new ResultAndTerm(Result.NOT_IN_CONF, electionTerm); + } final ResultAndTerm r; final Collection others = conf.getOtherPeers(server.getId()); if (others.isEmpty()) { @@ -315,6 +319,7 @@ private boolean askForVotes(Phase phase) throws InterruptedException, IOExceptio switch (r.getResult()) { case PASSED: return true; + case NOT_IN_CONF: case SHUTDOWN: server.getRaftServer().close(); return false; @@ -345,18 +350,12 @@ private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry, } private Set getHigherPriorityPeers(RaftConfiguration conf) { - Set higherPriorityPeers = new HashSet<>(); - - int currPriority = conf.getPeer(server.getId()).getPriority(); - final Collection peers = conf.getAllPeers(); - - for (RaftPeer peer : peers) { - if (peer.getPriority() > currPriority) { - higherPriorityPeers.add(peer.getId()); - } - } - - return higherPriorityPeers; + final Optional priority = Optional.ofNullable(conf.getPeer(server.getId())) + .map(RaftPeer::getPriority); + return conf.getAllPeers().stream() + .filter(peer -> priority.filter(p -> peer.getPriority() > p).isPresent()) + .map(RaftPeer::getId) + .collect(Collectors.toSet()); } private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitted, diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 03850114eb..3181625c97 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -912,17 +912,25 @@ private void yieldLeaderToHigherPriorityPeer() { } final RaftConfigurationImpl conf = server.getRaftConf(); - int leaderPriority = conf.getPeer(server.getId()).getPriority(); + final RaftPeer leader = conf.getPeer(server.getId()); + if (leader == null) { + LOG.error("{} the leader {} is not in the conf {}", this, server.getId(), conf); + return; + } + int leaderPriority = leader.getPriority(); for (LogAppender logAppender : senders.getSenders()) { - FollowerInfo followerInfo = logAppender.getFollower(); - RaftPeerId followerID = followerInfo.getPeer().getId(); - int followerPriority = conf.getPeer(followerID).getPriority(); - + final FollowerInfo followerInfo = logAppender.getFollower(); + final RaftPeerId followerID = followerInfo.getPeer().getId(); + final RaftPeer follower = conf.getPeer(followerID); + if (follower == null) { + LOG.error("{} the follower {} is not in the conf {}", this, server.getId(), conf); + continue; + } + final int followerPriority = follower.getPriority(); if (followerPriority <= leaderPriority) { continue; } - final TermIndex leaderLastEntry = server.getState().getLastEntry(); if (leaderLastEntry == null) { LOG.info("{} send StartLeaderElectionRequest to follower:{} on term:{} because follower's priority:{} " + diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/VoteContext.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/VoteContext.java index 1ef721aadc..2b7d96975f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/VoteContext.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/VoteContext.java @@ -144,7 +144,11 @@ boolean decideVote(RaftPeer candidate, TermIndex candidateLastEntry) { } // Check priority - final int priority = impl.getRaftConf().getPeer(impl.getId()).getPriority(); + final RaftPeer peer = conf.getPeer(impl.getId()); + if (peer == null) { + return reject("our server " + impl.getId() + " is not in the conf " + conf); + } + final int priority = peer.getPriority(); if (priority <= candidate.getPriority()) { return log(true, "our priority " + priority + " <= candidate's priority " + candidate.getPriority()); } else {