Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ enum Phase {
ELECTION
}

enum Result {PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN, NOT_IN_CONF}
enum Result {PASSED, SINGLE_MODE_PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN, NOT_IN_CONF}

private static class ResultAndTerm {
private final Result result;
Expand Down Expand Up @@ -331,6 +331,7 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException,

switch (r.getResult()) {
case PASSED:
case SINGLE_MODE_PASSED:
return true;
case NOT_IN_CONF:
case SHUTDOWN:
Expand Down Expand Up @@ -379,6 +380,7 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt
Collection<RaftPeerId> votedPeers = new ArrayList<>();
Collection<RaftPeerId> rejectedPeers = new ArrayList<>();
Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf);
final boolean singleMode = conf.isSingleMode(server.getId());

while (waitForNum > 0 && shouldRun(electionTerm)) {
final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
Expand All @@ -387,6 +389,9 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt
// if some higher priority peer did not response when timeout, but candidate get majority,
// candidate pass vote
return logAndReturn(phase, Result.PASSED, responses, exceptions);
} else if (singleMode) {
// if candidate is in single mode, candidate pass vote.
return logAndReturn(phase, Result.SINGLE_MODE_PASSED, responses, exceptions);
} else {
return logAndReturn(phase, Result.TIMEOUT, responses, exceptions);
}
Expand Down Expand Up @@ -418,7 +423,7 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt
}

// If any peer with higher priority rejects vote, candidate can not pass vote
if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId)) {
if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId) && !singleMode) {
return logAndReturn(phase, Result.REJECTED, responses, exceptions);
}

Expand Down Expand Up @@ -447,6 +452,8 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt
// received all the responses
if (conf.hasMajority(votedPeers, server.getId())) {
return logAndReturn(phase, Result.PASSED, responses, exceptions);
} else if (singleMode) {
return logAndReturn(phase, Result.SINGLE_MODE_PASSED, responses, exceptions);
} else {
return logAndReturn(phase, Result.REJECTED, responses, exceptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,34 @@ Collection<RaftPeer> getOtherPeers(RaftPeerId selfId) {
return others;
}

/**
 * Check whether switching the current conf to the given members would bring in
 * at least as many new peers as the number of peers retained from the current conf.
 * <p>
 * This method requires that there is no old conf, i.e. it asserts that oldConf is null.
 * <p>
 * Special case: when the current conf has exactly one peer and the new members are
 * exactly two peers with one of them being new (single peer to HA mode),
 * the majority verification is skipped and this method returns false.
 * <p>
 * If the new peers reach a majority of the new conf, the cluster may end with infinite
 * elections. See https://issues.apache.org/jira/browse/RATIS-1912 for more details.
 *
 * @param newMembers the peers of the requested new conf.
 * @return true if the number of peers not found in the current conf
 *         is greater than or equal to the number of the remaining peers in newMembers;
 *         false otherwise, including the single-peer-to-HA special case.
 */
boolean changeMajority(Collection<RaftPeer> newMembers) {
  Preconditions.assertNull(oldConf, "oldConf");

  // Count the members that the current conf does not know about.
  long added = 0;
  for (RaftPeer member : newMembers) {
    if (conf.getPeer(member.getId()) == null) {
      added++;
    }
  }

  // Changing from a single peer to two peers is always allowed.
  final boolean singleToHa = conf.size() == 1 && newMembers.size() == 2 && added == 1;
  if (singleToHa) {
    return false;
  }

  final long retained = newMembers.size() - added;
  return added >= retained;
}

/**
 * Tell whether the given server is in single mode.
 * <p>
 * When {@link #isStable()} returns true, the result depends only on the current conf
 * having exactly one peer; the selfId is not consulted in this case.
 * Otherwise, the old conf must have exactly one peer, the current conf must have
 * exactly two peers, and both of them must contain the selfId.
 *
 * @param selfId the id of the server to check.
 * @return true if the selfId is in single mode.
 */
boolean isSingleMode(RaftPeerId selfId) {
  if (isStable()) {
    return conf.size() == 1;
  }

  final boolean oldConfIsSelfOnly = oldConf.size() == 1 && oldConf.contains(selfId);
  if (!oldConfIsSelfOnly) {
    return false;
  }
  return conf.size() == 2 && conf.contains(selfId);
}

/** @return true if the self id together with the others are in the majority. */
boolean hasMajority(Collection<RaftPeerId> others, RaftPeerId selfId) {
Preconditions.assertTrue(!others.contains(selfId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,10 @@ public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfiguration
pending.setReply(newSuccessReply(request));
return pending.getFuture();
}
if (current.changeMajority(serversInNewConf)) {
throw new SetConfigurationException("Failed to set configuration: request " + request
+ " changes a majority set of the current configuration " + current);
}

getRaftServer().addRaftPeers(serversInNewConf);
getRaftServer().addRaftPeers(listenersInNewConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
Expand Down Expand Up @@ -111,7 +112,8 @@ private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
true);
// trigger setConfiguration
cluster.setConfiguration(change.allPeersInNewConf);
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));

RaftServerTestUtil
.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -242,7 +243,8 @@ private void testAddNewFollowers(CLUSTER cluster, int numRequests) throws Except
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
true);
// trigger setConfiguration
cluster.setConfiguration(change.allPeersInNewConf);
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));

RaftServerTestUtil
.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
Expand Down Expand Up @@ -391,7 +393,8 @@ private void testInstallSnapshotNotificationCount(CLUSTER cluster) throws Except
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
true);
// trigger setConfiguration
cluster.setConfiguration(change.allPeersInNewConf);
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
RaftServerTestUtil
.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);

Expand Down Expand Up @@ -478,7 +481,8 @@ private void testInstallSnapshotInstalledEvent(CLUSTER cluster) throws Exception
// add one new peer
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(1, true, true);
// trigger setConfiguration
cluster.setConfiguration(change.allPeersInNewConf);
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));

RaftServerTestUtil
.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
Expand Down Expand Up @@ -556,7 +560,8 @@ private void testInstallSnapshotDuringBootstrap(CLUSTER cluster) throws Exceptio
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
true);
// trigger setConfiguration
cluster.setConfiguration(change.allPeersInNewConf);
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));

RaftServerTestUtil.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);

Expand All @@ -567,8 +572,8 @@ private void testInstallSnapshotDuringBootstrap(CLUSTER cluster) throws Exceptio
RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower));
}

// Make sure each new peer got one snapshot notification.
Assert.assertEquals(2, numSnapshotRequests.get());
// Make sure each new peer got at least one snapshot notification.
Assert.assertTrue(2 <= numSnapshotRequests.get());
} finally {
cluster.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
Expand Down Expand Up @@ -140,7 +141,8 @@ void runTestRetryOnNewLeader(CLUSTER cluster) throws Exception {
RaftPeer[] allPeers = cluster.removePeers(2, true,
asList(change.newPeers)).allPeersInNewConf;
// trigger setConfiguration
cluster.setConfiguration(allPeers);
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(allPeers),
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));

final RaftPeerId newLeaderId = JavaUtils.attemptRepeatedly(() -> {
final RaftPeerId id = RaftTestUtil.waitForLeader(cluster).getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ public static <T extends Throwable> void runMultiGroupTest(
}
LOG.info(chosen + ") setConfiguration: " + cluster.printServers(groups[chosen].getGroupId()));
try (final RaftClient client = cluster.createClient(groups[chosen])) {
client.admin().setConfiguration(allPeers.toArray(RaftPeer.emptyArray()));
RaftServerTestUtil.runWithMinorityPeers(cluster, allPeers,
peers -> client.admin().setConfiguration(peers.toArray(RaftPeer.emptyArray())));
}

Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@

import static java.util.Arrays.asList;
import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
import static org.junit.Assert.assertThrows;

public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluster>
extends BaseTest
Expand Down Expand Up @@ -145,6 +146,58 @@ public void testAddPeers() throws Exception {
});
}

/**
 * Test leader election when changing cluster from single mode to HA mode.
 *
 * The test builds a configuration whose old conf contains only the current peer
 * and whose conf contains the current peer plus one newly added peer,
 * installs it directly on the current leader, triggers a leadership transfer request,
 * and then verifies that the same server is the leader again with the installed configuration.
 */
@Test
public void testLeaderElectionWhenChangeFromSingleToHA() throws Exception {
runWithNewCluster(1, cluster -> {
RaftTestUtil.waitForLeader(cluster);

RaftGroupId groupId = cluster.getGroup().getGroupId();
// The only peer of the group; it is used below to look up the leader server.
RaftPeer curPeer = cluster.getGroup().getPeers().iterator().next();
RaftPeer newPeer = cluster.addNewPeers(1, true, true).newPeers[0];

RaftServerProxy leaderServer = cluster.getServer(curPeer.getId());

// Update leader conf to transitional single mode.
// NOTE(review): the large log entry index is presumably chosen so that this conf
// is not superseded by later entries -- confirm.
RaftConfigurationImpl oldNewConf = RaftConfigurationImpl.newBuilder()
.setOldConf(new PeerConfiguration(Arrays.asList(curPeer)))
.setConf(new PeerConfiguration(Arrays.asList(curPeer, newPeer)))
.setLogEntryIndex(Long.MAX_VALUE / 2)
.build();
// Sanity check: (1 peer) -> (2 peers) containing the current peer is single mode.
Assert.assertTrue(oldNewConf.isSingleMode(curPeer.getId()));
// Bypass the normal setConfiguration path and install the conf directly.
RaftServerTestUtil.setRaftConf(leaderServer, groupId, oldNewConf);
try(RaftClient client = cluster.createClient()) {
// NOTE(review): passing null as the new leader presumably makes the current leader
// step down so that a new election is run -- confirm against the admin API.
client.admin().transferLeadership(null, leaderServer.getId(), 1000);
}

// The original server must be the leader again, with the installed conf unchanged.
final RaftServer.Division newLeader = RaftTestUtil.waitForLeader(cluster);
Assert.assertEquals(leaderServer.getId(), newLeader.getId());
Assert.assertEquals(oldNewConf, newLeader.getRaftConf());
});
}

/**
 * Test that a setConfiguration request is rejected with a {@link SetConfigurationException}
 * when two new peers are added in one request to a cluster created with one peer.
 */
@Test
public void testChangeMajority() throws Exception {
runWithNewCluster(1, cluster -> {
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();

try (final RaftClient client = cluster.createClient(leaderId)) {
// Create two new peers; the conf is not changed until setConfiguration is called.
final PeerChanges c1 = cluster.addNewPeers(2, true);

// Note that cluster.getPeers() is evaluated after the new peers are added.
SetConfigurationRequest.Arguments arguments = SetConfigurationRequest.Arguments.newBuilder()
.setServersInCurrentConf(cluster.getPeers())
.setServersInNewConf(c1.allPeersInNewConf)
.setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET)
.build();
// The request must fail instead of changing the conf.
assertThrows("Expect change majority error.", SetConfigurationException.class,
() -> client.admin().setConfiguration(arguments));
}
});
}

/**
* remove 2 peers (5 peers -> 3 peers), no leader change
*/
Expand Down Expand Up @@ -380,7 +433,17 @@ public void testBootstrapReconfWithSingleNodeAddOne() throws Exception {
@Test
public void testBootstrapReconfWithSingleNodeAddTwo() throws Exception {
// originally 1 peer, add 2 more
runWithNewCluster(1, cluster -> runTestBootstrapReconf(2, true, cluster));
runWithNewCluster(1, cluster -> {
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();

try (final RaftClient client = cluster.createClient(leaderId)) {
final PeerChanges c1 = cluster.addNewPeers(2, true);

assertThrows("Expect change majority error.", SetConfigurationException.class,
() -> client.admin().setConfiguration(c1.allPeersInNewConf));
}
});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
Expand All @@ -35,16 +36,22 @@
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedConsumer;
import org.junit.Assert;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RaftServerTestUtil {
Expand Down Expand Up @@ -135,6 +142,10 @@ public static RaftConfiguration newRaftConfiguration(Collection<RaftPeer> peers)
return RaftConfigurationImpl.newBuilder().setConf(peers).build();
}

/**
 * Set the given configuration directly to the state of the division
 * looked up from the given server and group id.
 *
 * @param proxy the server containing the division.
 * @param groupId the group id of the division.
 * @param conf the configuration to set.
 */
public static void setRaftConf(RaftServer proxy, RaftGroupId groupId, RaftConfiguration conf) {
  final RaftServerImpl division = (RaftServerImpl) getDivision(proxy, groupId);
  division.getState().setRaftConf(conf);
}

public static RaftServerRpc getServerRpc(RaftServer.Division server) {
return ((RaftServerImpl)server).getRaftServer().getServerRpc();
}
Expand Down Expand Up @@ -196,4 +207,40 @@ public static SegmentedRaftLog newSegmentedRaftLog(RaftGroupMemberId memberId, D
public static boolean isHighestPriority(RaftConfiguration config, RaftPeerId peerId) {
return ((RaftConfigurationImpl)config).isHighestPriority(peerId);
}

/**
 * Move the cluster towards the given new conf step by step:
 * repeatedly compute the next peers by {@link #parseMinorityPeers(MiniRaftCluster, Collection)}
 * and pass them to the given consumer until no more step is needed,
 * i.e. until the computed peers become null.
 *
 * @param cluster the cluster whose leader conf is used to compute each step.
 * @param peersInNewConf the peers of the final conf.
 * @param consumer the action, typically a setConfiguration call, to run for each step.
 * @throws IOException if the consumer throws it.
 */
public static void runWithMinorityPeers(MiniRaftCluster cluster, Collection<RaftPeer> peersInNewConf,
    CheckedConsumer<Collection<RaftPeer>, IOException> consumer) throws IOException {
  for (Collection<RaftPeer> step = parseMinorityPeers(cluster, peersInNewConf);
       step != null;
       step = parseMinorityPeers(cluster, peersInNewConf)) {
    consumer.accept(step);
  }
}

/**
 * Compute the peers for the next setConfiguration step towards the given new conf,
 * based on the conf of the current leader.
 * <ul>
 * <li>If some peers in the new conf are not yet followers in the leader conf,
 *     return the current peers plus as many of those peers as can be added
 *     before {@code changeMajority} reports true.</li>
 * <li>Otherwise, if some current peers are not in the new conf, return the new conf.</li>
 * <li>Otherwise, return null.</li>
 * </ul>
 *
 * @param cluster the cluster providing the leader conf.
 * @param peersInNewConf the peers of the final conf.
 * @return the peers to use in the next step, or null if there is nothing left to change.
 */
private static Collection<RaftPeer> parseMinorityPeers(MiniRaftCluster cluster, Collection<RaftPeer> peersInNewConf) {
  final RaftConfigurationImpl leaderConf = (RaftConfigurationImpl) cluster.getLeader().getRaftConf();
  final Set<RaftPeer> next = new HashSet<>(leaderConf.getCurrentPeers());

  // The peers in the new conf which are not yet followers in the leader conf.
  final List<RaftPeer> missing = peersInNewConf.stream()
      .filter(p -> !leaderConf.containsInConf(p.getId(), RaftProtos.RaftPeerRole.FOLLOWER))
      .collect(Collectors.toList());

  if (missing.isEmpty()) {
    // All new peers have been added. Handle the removed peers:
    // if every current peer is also in the new conf, the two confs are the same and we are done;
    // otherwise, request the new conf in order to drop the extra peers.
    return peersInNewConf.containsAll(next) ? null : peersInNewConf;
  }

  // Add new peers one by one to construct minority conf peers;
  // stop, and undo the last addition, once the change would affect the majority.
  for (RaftPeer candidate : missing) {
    if (!next.add(candidate)) {
      continue;
    }
    if (leaderConf.changeMajority(next)) {
      next.remove(candidate);
      break;
    }
  }
  return next;
}
}
Loading