From 861cbdf33cd481886088a9de2db6c82e664773c3 Mon Sep 17 00:00:00 2001 From: lijinglun Date: Thu, 26 Oct 2023 16:28:51 +0800 Subject: [PATCH 01/10] v0.1 --- .../ratis/server/impl/LeaderElection.java | 3 + .../server/impl/RaftConfigurationImpl.java | 34 ++++++ .../ratis/server/impl/RaftServerImpl.java | 4 + .../ratis/server/impl/RaftServerProxy.java | 18 ++- .../impl/RaftReconfigurationBaseTest.java | 103 +++++++++++++----- .../server/impl/TestRaftConfiguration.java | 94 +++++++++++++++- 6 files changed, 221 insertions(+), 35 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 302f848719..b2d0778300 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -387,6 +387,9 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt // if some higher priority peer did not response when timeout, but candidate get majority, // candidate pass vote return logAndReturn(phase, Result.PASSED, responses, exceptions); + } else if (conf.isSingleMode(server.getId())) { + // if candidate is in single mode, candidate pass vote. 
+ return logAndReturn(phase, Result.PASSED, responses, exceptions); } else { return logAndReturn(phase, Result.TIMEOUT, responses, exceptions); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java index aba5ae176e..837dafcef9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java @@ -232,6 +232,40 @@ Collection getOtherPeers(RaftPeerId selfId) { return others; } + /** + * @return true if the changed peers number reaches majority or the group is changing from single + * mode to HA mode. + */ + boolean changeMajority(Collection newMembers) { + Preconditions.assertNull(oldConf, "Conf must be stable."); + int newPeersCount = 0; + for (RaftPeer peer : newMembers) { + final RaftPeer inConf = conf.getPeer(peer.getId()); + if (inConf == null) { + newPeersCount++; + } + } + + int removedPeersCount = conf.size() - newMembers.size() + newPeersCount; + int changed = Math.max(newPeersCount, removedPeersCount); + + if (conf.size() == 1 && newMembers.size() == 2 && changed == 1) { + // Change from single peer to HA mode. This is a special case, skip majority verification. + return false; + } + return changed > (conf.size() / 2); + } + + /** @return True if the selfId is in single mode. */ + boolean isSingleMode(RaftPeerId selfId) { + if (isStable()) { + return conf.size() == 1; + } else { + return oldConf.size() == 1 && oldConf.contains(selfId) && conf.size() == 2 && conf.contains( + selfId); + } + } + /** @return true if the self id together with the others are in the majority. 
*/ boolean hasMajority(Collection others, RaftPeerId selfId) { Preconditions.assertTrue(!others.contains(selfId)); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 667e611b42..bc0c443884 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1311,6 +1311,10 @@ public CompletableFuture setConfigurationAsync(SetConfiguration pending.setReply(newSuccessReply(request)); return pending.getFuture(); } + if (current.changeMajority(serversInNewConf)) { + throw new SetConfigurationException("Failed to set configuration: request " + request + + " changes a majority set of the current configuration " + current); + } getRaftServer().addRaftPeers(serversInNewConf); getRaftServer().addRaftPeers(listenersInNewConf); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index fd80d6938f..8d32f55f23 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -41,6 +41,7 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.ServerFactory; import org.apache.ratis.server.storage.RaftStorage.StartupOption; +import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.util.ConcurrentUtils; import org.apache.ratis.util.JvmPauseMonitor; import org.apache.ratis.server.RaftServerConfigKeys; @@ -368,7 +369,7 @@ private RaftServerImpl getImpl(RaftRpcRequestProto proto) throws IOException { return getImpl(ProtoUtils.toRaftGroupId(proto.getRaftGroupId())); } - private RaftServerImpl getImpl(RaftGroupId groupId) throws IOException { + public RaftServerImpl 
getImpl(RaftGroupId groupId) throws IOException { Objects.requireNonNull(groupId, "groupId == null"); return IOUtils.getFromFuture(getImplFuture(groupId), this::getId); } @@ -391,6 +392,21 @@ public LifeCycle.State getLifeCycleState() { return lifeCycle.getCurrentState(); } + @VisibleForTesting + void setRaftConf(RaftGroupId groupId, RaftConfigurationImpl conf) throws IOException { + getImpl(groupId).getState().setRaftConf(conf); + } + + @VisibleForTesting + void triggerElection(RaftGroupId groupId) throws IOException { + RaftServerImpl impl = getImpl(groupId); + Preconditions.assertTrue(impl.getInfo().isLeader(), + "Only leader can perform trigger election."); + + long newTerm = impl.getState().getCurrentTerm() + 1; + impl.changeToFollowerAndPersistMetadata(newTerm, true, "Trigger election for test."); + } + ThreadGroup getThreadGroup() { return threadGroup; } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index b43c696c73..a3d253d44d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -61,6 +61,8 @@ import static java.util.Arrays.asList; import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; public abstract class RaftReconfigurationBaseTest extends BaseTest @@ -127,7 +129,7 @@ public void testRestorePriority() throws Exception { } /** - * add 2 new peers (3 peers -> 5 peers), no leader change + * add 1 new peers (3 peers -> 4 peers), no leader change */ @Test public void testAddPeers() throws Exception { @@ -135,7 +137,7 @@ public void testAddPeers() throws Exception { RaftTestUtil.waitForLeader(cluster); // add new peers - RaftPeer[] allPeers = 
cluster.addNewPeers(2, true).allPeersInNewConf; + RaftPeer[] allPeers = cluster.addNewPeers(1, true).allPeersInNewConf; // trigger setConfiguration cluster.setConfiguration(allPeers); @@ -145,6 +147,37 @@ public void testAddPeers() throws Exception { }); } + /** + * Test leader election when changing cluster from single mode to HA mode. + */ + @Test + public void testLeaderElectionWhenChangeFromSingleToHA() throws Exception { + runWithNewCluster(1, cluster -> { + RaftTestUtil.waitForLeader(cluster); + + RaftGroup group = cluster.getGroup(); + RaftGroupId groupId = group.getGroupId(); + + RaftPeer curPeer = cluster.getGroup().getPeers().iterator().next(); + RaftPeer newPeer = RaftPeer.newBuilder() + .setId(RaftPeerId.valueOf("testChangeFromSingleToHA")).build(); + + RaftServerProxy leaderServer = cluster.getServer(curPeer.getId()); + + // Update leader conf to transitional single mode. + RaftConfigurationImpl oldNewConf = RaftConfigurationImpl.newBuilder() + .setOldConf(new PeerConfiguration(Arrays.asList(curPeer))) + .setConf(new PeerConfiguration(Arrays.asList(curPeer, newPeer))) + .setLogEntryIndex(Long.MAX_VALUE / 2) + .build(); + assertTrue(oldNewConf.isSingleMode(curPeer.getId())); + leaderServer.setRaftConf(groupId, oldNewConf); + leaderServer.triggerElection(groupId); + + RaftTestUtil.waitForLeader(cluster); + }); + } + /** * remove 2 peers (5 peers -> 3 peers), no leader change */ @@ -206,13 +239,13 @@ private void runTestSetConfigurationInAddMode(CLUSTER cluster) throws Exception try (final RaftClient client = cluster.createClient(leader.getId())) { for (int i = 0; i < 10; i++) { RaftClientReply reply = client.io().send(new SimpleMessage("m" + i)); - Assert.assertTrue(reply.isSuccess()); + assertTrue(reply.isSuccess()); } RaftClientReply reply = client.admin().setConfiguration( SetConfigurationRequest.Arguments.newBuilder() .setServersInNewConf(peers) .setMode(SetConfigurationRequest.Mode.ADD).build()); - Assert.assertTrue(reply.isSuccess()); + 
assertTrue(reply.isSuccess()); waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); } cluster.close(); @@ -233,7 +266,7 @@ private void runTestSetConfigurationInCasMode(CLUSTER cluster) throws Exception try (final RaftClient client = cluster.createClient(leader.getId())) { for (int i = 0; i < 10; i++) { RaftClientReply reply = client.io().send(new SimpleMessage("m" + i)); - Assert.assertTrue(reply.isSuccess()); + assertTrue(reply.isSuccess()); } testFailureCase("Can't set configuration in CAS mode ", @@ -250,7 +283,7 @@ private void runTestSetConfigurationInCasMode(CLUSTER cluster) throws Exception .setServersInCurrentConf(oldPeers) .setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET) .build()); - Assert.assertTrue(reply.isSuccess()); + assertTrue(reply.isSuccess()); waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); } cluster.close(); @@ -260,7 +293,7 @@ private void runTestSetConfigurationInCasMode(CLUSTER cluster) throws Exception @Test(timeout = 30000) public void testReconfTwice() throws Exception { - runWithNewCluster(3, this::runTestReconfTwice); + runWithNewCluster(5, this::runTestReconfTwice); } void runTestReconfTwice(CLUSTER cluster) throws Exception { @@ -270,7 +303,7 @@ void runTestReconfTwice(CLUSTER cluster) throws Exception { // submit some msgs before reconf for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) { RaftClientReply reply = client.io().send(new SimpleMessage("m" + i)); - Assert.assertTrue(reply.isSuccess()); + assertTrue(reply.isSuccess()); } final AtomicBoolean reconf1 = new AtomicBoolean(false); @@ -304,8 +337,8 @@ void runTestReconfTwice(CLUSTER cluster) throws Exception { clientThread.start(); latch.await(); - Assert.assertTrue(reconf1.get()); - Assert.assertTrue(reconf2.get()); + assertTrue(reconf1.get()); + assertTrue(reconf2.get()); waitAndCheckNewConf(cluster, finalPeers.get(), 2, null); final RaftPeerId leader2 = RaftTestUtil.waitForLeader(cluster).getId(); @@ -325,7 +358,7 @@ void 
runTestReconfTwice(CLUSTER cluster) throws Exception { @Test public void testReconfTimeout() throws Exception { // originally 3 peers - runWithNewCluster(3, this::runTestReconfTimeout); + runWithNewCluster(5, this::runTestReconfTimeout); } void runTestReconfTimeout(CLUSTER cluster) throws Exception { @@ -345,7 +378,7 @@ void runTestReconfTimeout(CLUSTER cluster) throws Exception { RaftClientReply reply = sender.sendRequest(request); Assert.fail("did not get expected exception " + reply.toString()); } catch (IOException e) { - Assert.assertTrue("Got exception " + e, + assertTrue("Got exception " + e, e instanceof ReconfigurationTimeoutException); } @@ -358,7 +391,7 @@ void runTestReconfTimeout(CLUSTER cluster) throws Exception { sender.sendRequest(request); Assert.fail("did not get expected exception"); } catch (IOException e) { - Assert.assertTrue("Got exception " + e, + assertTrue("Got exception " + e, e instanceof ReconfigurationTimeoutException); } @@ -367,7 +400,7 @@ void runTestReconfTimeout(CLUSTER cluster) throws Exception { for (RaftPeer np : c1.newPeers) { cluster.restartServer(np.getId(), false); } - Assert.assertTrue(client.admin().setConfiguration(c1.allPeersInNewConf).isSuccess()); + assertTrue(client.admin().setConfiguration(c1.allPeersInNewConf).isSuccess()); } } @@ -380,13 +413,23 @@ public void testBootstrapReconfWithSingleNodeAddOne() throws Exception { @Test public void testBootstrapReconfWithSingleNodeAddTwo() throws Exception { // originally 1 peer, add 2 more - runWithNewCluster(1, cluster -> runTestBootstrapReconf(2, true, cluster)); + runWithNewCluster(1, cluster -> { + RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = cluster.getLeader().getId(); + + try (final RaftClient client = cluster.createClient(leaderId)) { + final PeerChanges c1 = cluster.addNewPeers(2, true); + + assertThrows("Expect change majority error.", SetConfigurationException.class, + () -> client.admin().setConfiguration(c1.allPeersInNewConf)); + } + }); } 
@Test public void testBootstrapReconf() throws Exception { // originally 3 peers, add 2 more - runWithNewCluster(3, cluster -> runTestBootstrapReconf(2, true, cluster)); + runWithNewCluster(3, cluster -> runTestBootstrapReconf(1, true, cluster)); } void runTestBootstrapReconf(int numNewPeer, boolean startNewPeer, CLUSTER cluster) throws Exception { @@ -399,7 +442,7 @@ void runTestBootstrapReconf(int numNewPeer, boolean startNewPeer, CLUSTER cluste // submit some msgs before reconf for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) { RaftClientReply reply = client.io().send(new SimpleMessage("m" + i)); - Assert.assertTrue(reply.isSuccess()); + assertTrue(reply.isSuccess()); } final PeerChanges c1 = cluster.addNewPeers(numNewPeer, startNewPeer); @@ -493,7 +536,7 @@ void runTestKillLeaderDuringReconf(CLUSTER cluster) throws Exception { LOG.info("{}", LogProtoUtils.toLogEntryString(e)); } final long commitIndex = leaderLog.getLastCommittedIndex(); - Assert.assertTrue("commitIndex = " + commitIndex + " > 2", commitIndex <= 2); + assertTrue("commitIndex = " + commitIndex + " > 2", commitIndex <= 2); } final RaftPeerId killed = RaftTestUtil.waitAndKillLeader(cluster); @@ -549,11 +592,11 @@ void runTestNoChangeRequest(CLUSTER cluster) throws Exception { // no real configuration change in the request final RaftClientReply reply = client.admin().setConfiguration(cluster.getPeers().toArray(RaftPeer.emptyArray())); - Assert.assertTrue(reply.isSuccess()); + assertTrue(reply.isSuccess()); final long newCommittedIndex = leaderLog.getLastCommittedIndex(); for(long i = committedIndex + 1; i <= newCommittedIndex; i++) { final LogEntryProto e = leaderLog.get(i); - Assert.assertTrue(e.hasMetadataEntry()); + assertTrue(e.hasMetadataEntry()); } Assert.assertSame(confBefore, cluster.getLeader().getRaftConf()); } @@ -587,7 +630,7 @@ void runTestOverlappedSetConfRequests(CLUSTER cluster) throws Exception { final RaftPeerId leaderId = cluster.getLeader().getId(); - RaftPeer[] newPeers = 
cluster.addNewPeers(2, true).allPeersInNewConf; + RaftPeer[] newPeers = cluster.addNewPeers(1, true).allPeersInNewConf; // delay every peer's logSync so that the setConf request is delayed cluster.getPeers() @@ -625,8 +668,8 @@ void runTestOverlappedSetConfRequests(CLUSTER cluster) throws Exception { for (int i = 0; i < 10 && !confChanged.get(); i++) { Thread.sleep(1000); } - Assert.assertTrue(confChanged.get()); - Assert.assertTrue(caughtException.get()); + assertTrue(confChanged.get()); + assertTrue(caughtException.get()); } finally { logSyncDelay.clear(); } @@ -692,11 +735,11 @@ void runTestRevertConfigurationChange(CLUSTER cluster) throws Exception { // wait till the old leader persist the new conf JavaUtils.attemptRepeatedly(() -> { - Assert.assertTrue(log.getFlushIndex() >= confIndex); + assertTrue(log.getFlushIndex() >= confIndex); return null; }, 10, sleepTime, "FLUSH", LOG); final long committed = log.getLastCommittedIndex(); - Assert.assertTrue(committed < confIndex); + assertTrue(committed < confIndex); // unblock the old leader BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId.toString()); @@ -704,14 +747,14 @@ void runTestRevertConfigurationChange(CLUSTER cluster) throws Exception { // the client should get NotLeaderException clientThread.join(5000); - Assert.assertTrue(gotNotLeader.get()); + assertTrue(gotNotLeader.get()); // the old leader should have truncated the setConf from the log JavaUtils.attemptRepeatedly(() -> { - Assert.assertTrue(log.getLastCommittedIndex() >= confIndex); + assertTrue(log.getLastCommittedIndex() >= confIndex); return null; }, 10, ONE_SECOND, "COMMIT", LOG); - Assert.assertTrue(log.get(confIndex).hasConfigurationEntry()); + assertTrue(log.get(confIndex).hasConfigurationEntry()); log2 = null; } finally { RaftStorageTestUtils.printLog(log2, s -> LOG.info(s)); @@ -767,8 +810,8 @@ public void testLeaderNotReadyException() throws Exception { for (int i = 0; !success.get() && i < 5; i++) { Thread.sleep(1000); } 
- Assert.assertTrue(success.get()); - Assert.assertTrue(caughtNotReady.get()); + assertTrue(success.get()); + assertTrue(caughtNotReady.get()); } finally { leaderPlaceHolderDelay.clear(); cluster.shutdown(); diff --git a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestRaftConfiguration.java b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestRaftConfiguration.java index a5470787c0..003b812ecc 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestRaftConfiguration.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestRaftConfiguration.java @@ -22,13 +22,14 @@ import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftConfiguration; -import org.junit.Assert; import org.junit.Test; import java.util.Arrays; import java.util.Collection; import java.util.stream.Collectors; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class TestRaftConfiguration extends BaseTest { @Test @@ -42,15 +43,15 @@ public void testIsHighestPriority() { RaftProtos.RaftPeerRole.FOLLOWER).size()]); // First member should not have highest priority - Assert.assertFalse(RaftServerTestUtil.isHighestPriority(config, + assertFalse(RaftServerTestUtil.isHighestPriority(config, allRaftPeers[0].getId())); // Last member should have highest priority - Assert.assertTrue(RaftServerTestUtil.isHighestPriority(config, + assertTrue(RaftServerTestUtil.isHighestPriority(config, allRaftPeers[allRaftPeers.length - 1].getId())); // Should return false for non existent peer id - Assert.assertFalse(RaftServerTestUtil.isHighestPriority(config, RaftPeerId.valueOf("123456789"))); + assertFalse(RaftServerTestUtil.isHighestPriority(config, RaftPeerId.valueOf("123456789"))); } private Collection raftPeersWithPriority(Integer... voters) { @@ -58,4 +59,89 @@ private Collection raftPeersWithPriority(Integer... 
voters) { .map(id -> RaftPeer.newBuilder().setPriority(id).setId(id.toString()).build()) .collect(Collectors.toSet()); } + + @Test + public void testSingleMode() { + RaftConfigurationImpl config = RaftConfigurationImpl.newBuilder() + .setConf(new PeerConfiguration(raftPeersWithPriority(1))) + .build(); + assertTrue("Peer is in single mode.", config.isSingleMode(RaftPeerId.valueOf("1"))); + + config = RaftConfigurationImpl.newBuilder() + .setConf(new PeerConfiguration(raftPeersWithPriority(0, 1))) + .setOldConf(new PeerConfiguration(raftPeersWithPriority(0))) + .build(); + assertTrue("Peer is in single mode.", config.isSingleMode(RaftPeerId.valueOf("0"))); + assertFalse("Peer is a new peer.", config.isSingleMode(RaftPeerId.valueOf("1"))); + + config = RaftConfigurationImpl.newBuilder() + .setConf(new PeerConfiguration(raftPeersWithPriority(0, 1))) + .build(); + assertFalse("Peer is in ha mode.", config.isSingleMode(RaftPeerId.valueOf("0"))); + assertFalse("Peer is in ha mode.", config.isSingleMode(RaftPeerId.valueOf("1"))); + + config = RaftConfigurationImpl.newBuilder() + .setConf(new PeerConfiguration(raftPeersWithPriority(0, 1))) + .setOldConf(new PeerConfiguration(raftPeersWithPriority(2, 3))) + .build(); + assertFalse("Peer is in ha mode.", config.isSingleMode(RaftPeerId.valueOf("0"))); + assertFalse("Peer is in ha mode.", config.isSingleMode(RaftPeerId.valueOf("1"))); + assertFalse("Peer is in ha mode.", config.isSingleMode(RaftPeerId.valueOf("3"))); + assertFalse("Peer is in ha mode.", config.isSingleMode(RaftPeerId.valueOf("4"))); + } + + @Test + public void testChangeMajority() { + // Case 1: {1} --> {1, 2}. + RaftConfigurationImpl config = RaftConfigurationImpl.newBuilder() + .setConf(new PeerConfiguration(raftPeersWithPriority(1))) + .build(); + assertFalse("Change from single mode to ha mode is not considered as changing majority.", + config.changeMajority(raftPeersWithPriority(1, 2))); + + // Case 2: {1} --> {2}. 
+ assertTrue(config.changeMajority(raftPeersWithPriority(2))); + + // Case 3: {1, 2, 3} --> {1, 2, 4, 5}. + config = RaftConfigurationImpl.newBuilder() + .setConf(new PeerConfiguration(raftPeersWithPriority(1, 2, 3))) + .build(); + assertTrue(config.changeMajority(raftPeersWithPriority(1, 2, 4, 5))); + + // Case 4: {1, 2, 3} --> {1, 4, 5}. + config = RaftConfigurationImpl.newBuilder() + .setConf(new PeerConfiguration(raftPeersWithPriority(1, 2, 3))) + .build(); + assertTrue(config.changeMajority(raftPeersWithPriority(1, 4, 5))); + + // Case 5: {1, 2, 3} --> {1, 2, 4, 5}. + config = RaftConfigurationImpl.newBuilder() + .setConf(new PeerConfiguration(raftPeersWithPriority(1, 2, 3))) + .build(); + assertTrue(config.changeMajority(raftPeersWithPriority(1, 2, 4, 5))); + + // Case 6: {1, 2, 3, 4, 5} --> {1, 2}. + config = RaftConfigurationImpl.newBuilder() + .setConf(new PeerConfiguration(raftPeersWithPriority(1, 2, 3, 4, 5))) + .build(); + assertTrue(config.changeMajority(raftPeersWithPriority(1, 2))); + + // Case 7: {1, 2, 3, 4, 5} --> {1, 2, 3}. + config = RaftConfigurationImpl.newBuilder() + .setConf(new PeerConfiguration(raftPeersWithPriority(1, 2, 3, 4, 5))) + .build(); + assertFalse(config.changeMajority(raftPeersWithPriority(1, 2, 3))); + + // Case 8: {1, 2, 3} --> {1, 2, 3, 4}. + config = RaftConfigurationImpl.newBuilder() + .setConf(new PeerConfiguration(raftPeersWithPriority(1, 2, 3))) + .build(); + assertFalse(config.changeMajority(raftPeersWithPriority(1, 2, 3, 4))); + + // Case 9: {1, 2, 3, 4, 5} --> {1, 2, 3, 4, 6, 7}. 
+ config = RaftConfigurationImpl.newBuilder() + .setConf(new PeerConfiguration(raftPeersWithPriority(1, 2, 3, 4, 5))) + .build(); + assertFalse(config.changeMajority(raftPeersWithPriority(1, 2, 3, 4, 6, 7))); + } } \ No newline at end of file From 8c0076089aaa564f134ccd83bb174eda985f333a Mon Sep 17 00:00:00 2001 From: lijinglun Date: Fri, 27 Oct 2023 14:31:40 +0800 Subject: [PATCH 02/10] v0.2 --- .../ratis/server/impl/LeaderElection.java | 9 ++- .../server/impl/RaftConfigurationImpl.java | 12 ++-- .../impl/RaftReconfigurationBaseTest.java | 68 +++++++++---------- .../server/impl/TestRaftConfiguration.java | 6 +- 4 files changed, 47 insertions(+), 48 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index b2d0778300..197b21a72c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -51,6 +51,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.Set; import java.util.stream.Collectors; @@ -379,6 +380,7 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt Collection votedPeers = new ArrayList<>(); Collection rejectedPeers = new ArrayList<>(); Set higherPriorityPeers = getHigherPriorityPeers(conf); + final boolean singleMode = conf.isSingleMode(server.getId()); while (waitForNum > 0 && shouldRun(electionTerm)) { final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n); @@ -387,7 +389,7 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt // if some higher priority peer did not response when timeout, but candidate get majority, // candidate pass vote return 
logAndReturn(phase, Result.PASSED, responses, exceptions); - } else if (conf.isSingleMode(server.getId())) { + } else if (singleMode) { // if candidate is in single mode, candidate pass vote. return logAndReturn(phase, Result.PASSED, responses, exceptions); } else { @@ -421,7 +423,8 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt } // If any peer with higher priority rejects vote, candidate can not pass vote - if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId)) { + if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId) + && !singleMode) { return logAndReturn(phase, Result.REJECTED, responses, exceptions); } @@ -450,6 +453,8 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt // received all the responses if (conf.hasMajority(votedPeers, server.getId())) { return logAndReturn(phase, Result.PASSED, responses, exceptions); + } else if (singleMode) { + return logAndReturn(phase, Result.PASSED, responses, exceptions); } else { return logAndReturn(phase, Result.REJECTED, responses, exceptions); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java index 837dafcef9..d0a0224ba0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java @@ -233,8 +233,8 @@ Collection getOtherPeers(RaftPeerId selfId) { } /** - * @return true if the changed peers number reaches majority or the group is changing from single - * mode to HA mode. + * @return true if the new peers number reaches half of new peers number or the group is changing + * from single mode to HA mode. 
*/ boolean changeMajority(Collection newMembers) { Preconditions.assertNull(oldConf, "Conf must be stable."); @@ -246,14 +246,12 @@ boolean changeMajority(Collection newMembers) { } } - int removedPeersCount = conf.size() - newMembers.size() + newPeersCount; - int changed = Math.max(newPeersCount, removedPeersCount); - - if (conf.size() == 1 && newMembers.size() == 2 && changed == 1) { + if (conf.size() == 1 && newMembers.size() == 2 && newPeersCount == 1) { // Change from single peer to HA mode. This is a special case, skip majority verification. return false; } - return changed > (conf.size() / 2); + // Make sure peers in conf don't need new peer's vote to reach majority. + return newPeersCount >= newMembers.size() / 2 + newMembers.size() % 2; } /** @return True if the selfId is in single mode. */ diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index a3d253d44d..f1ac5777db 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -62,7 +62,6 @@ import static java.util.Arrays.asList; import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf; import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; public abstract class RaftReconfigurationBaseTest extends BaseTest @@ -129,7 +128,7 @@ public void testRestorePriority() throws Exception { } /** - * add 1 new peers (3 peers -> 4 peers), no leader change + * add 2 new peers (3 peers -> 5 peers), no leader change */ @Test public void testAddPeers() throws Exception { @@ -137,7 +136,7 @@ public void testAddPeers() throws Exception { RaftTestUtil.waitForLeader(cluster); // add new peers - RaftPeer[] allPeers = cluster.addNewPeers(1, true).allPeersInNewConf; + RaftPeer[] allPeers = 
cluster.addNewPeers(2, true).allPeersInNewConf; // trigger setConfiguration cluster.setConfiguration(allPeers); @@ -155,12 +154,9 @@ public void testLeaderElectionWhenChangeFromSingleToHA() throws Exception { runWithNewCluster(1, cluster -> { RaftTestUtil.waitForLeader(cluster); - RaftGroup group = cluster.getGroup(); - RaftGroupId groupId = group.getGroupId(); - + RaftGroupId groupId = cluster.getGroup().getGroupId(); RaftPeer curPeer = cluster.getGroup().getPeers().iterator().next(); - RaftPeer newPeer = RaftPeer.newBuilder() - .setId(RaftPeerId.valueOf("testChangeFromSingleToHA")).build(); + RaftPeer newPeer = cluster.addNewPeers(1, true, true).newPeers[0]; RaftServerProxy leaderServer = cluster.getServer(curPeer.getId()); @@ -170,7 +166,7 @@ public void testLeaderElectionWhenChangeFromSingleToHA() throws Exception { .setConf(new PeerConfiguration(Arrays.asList(curPeer, newPeer))) .setLogEntryIndex(Long.MAX_VALUE / 2) .build(); - assertTrue(oldNewConf.isSingleMode(curPeer.getId())); + Assert.assertTrue(oldNewConf.isSingleMode(curPeer.getId())); leaderServer.setRaftConf(groupId, oldNewConf); leaderServer.triggerElection(groupId); @@ -239,13 +235,13 @@ private void runTestSetConfigurationInAddMode(CLUSTER cluster) throws Exception try (final RaftClient client = cluster.createClient(leader.getId())) { for (int i = 0; i < 10; i++) { RaftClientReply reply = client.io().send(new SimpleMessage("m" + i)); - assertTrue(reply.isSuccess()); + Assert.assertTrue(reply.isSuccess()); } RaftClientReply reply = client.admin().setConfiguration( SetConfigurationRequest.Arguments.newBuilder() .setServersInNewConf(peers) .setMode(SetConfigurationRequest.Mode.ADD).build()); - assertTrue(reply.isSuccess()); + Assert.assertTrue(reply.isSuccess()); waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); } cluster.close(); @@ -266,7 +262,7 @@ private void runTestSetConfigurationInCasMode(CLUSTER cluster) throws Exception try (final RaftClient client = 
cluster.createClient(leader.getId())) { for (int i = 0; i < 10; i++) { RaftClientReply reply = client.io().send(new SimpleMessage("m" + i)); - assertTrue(reply.isSuccess()); + Assert.assertTrue(reply.isSuccess()); } testFailureCase("Can't set configuration in CAS mode ", @@ -283,7 +279,7 @@ private void runTestSetConfigurationInCasMode(CLUSTER cluster) throws Exception .setServersInCurrentConf(oldPeers) .setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET) .build()); - assertTrue(reply.isSuccess()); + Assert.assertTrue(reply.isSuccess()); waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); } cluster.close(); @@ -293,7 +289,7 @@ private void runTestSetConfigurationInCasMode(CLUSTER cluster) throws Exception @Test(timeout = 30000) public void testReconfTwice() throws Exception { - runWithNewCluster(5, this::runTestReconfTwice); + runWithNewCluster(3, this::runTestReconfTwice); } void runTestReconfTwice(CLUSTER cluster) throws Exception { @@ -303,7 +299,7 @@ void runTestReconfTwice(CLUSTER cluster) throws Exception { // submit some msgs before reconf for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) { RaftClientReply reply = client.io().send(new SimpleMessage("m" + i)); - assertTrue(reply.isSuccess()); + Assert.assertTrue(reply.isSuccess()); } final AtomicBoolean reconf1 = new AtomicBoolean(false); @@ -337,8 +333,8 @@ void runTestReconfTwice(CLUSTER cluster) throws Exception { clientThread.start(); latch.await(); - assertTrue(reconf1.get()); - assertTrue(reconf2.get()); + Assert.assertTrue(reconf1.get()); + Assert.assertTrue(reconf2.get()); waitAndCheckNewConf(cluster, finalPeers.get(), 2, null); final RaftPeerId leader2 = RaftTestUtil.waitForLeader(cluster).getId(); @@ -358,7 +354,7 @@ void runTestReconfTwice(CLUSTER cluster) throws Exception { @Test public void testReconfTimeout() throws Exception { // originally 3 peers - runWithNewCluster(5, this::runTestReconfTimeout); + runWithNewCluster(3, this::runTestReconfTimeout); } void 
runTestReconfTimeout(CLUSTER cluster) throws Exception { @@ -378,7 +374,7 @@ void runTestReconfTimeout(CLUSTER cluster) throws Exception { RaftClientReply reply = sender.sendRequest(request); Assert.fail("did not get expected exception " + reply.toString()); } catch (IOException e) { - assertTrue("Got exception " + e, + Assert.assertTrue("Got exception " + e, e instanceof ReconfigurationTimeoutException); } @@ -391,7 +387,7 @@ void runTestReconfTimeout(CLUSTER cluster) throws Exception { sender.sendRequest(request); Assert.fail("did not get expected exception"); } catch (IOException e) { - assertTrue("Got exception " + e, + Assert.assertTrue("Got exception " + e, e instanceof ReconfigurationTimeoutException); } @@ -400,7 +396,7 @@ void runTestReconfTimeout(CLUSTER cluster) throws Exception { for (RaftPeer np : c1.newPeers) { cluster.restartServer(np.getId(), false); } - assertTrue(client.admin().setConfiguration(c1.allPeersInNewConf).isSuccess()); + Assert.assertTrue(client.admin().setConfiguration(c1.allPeersInNewConf).isSuccess()); } } @@ -429,7 +425,7 @@ public void testBootstrapReconfWithSingleNodeAddTwo() throws Exception { @Test public void testBootstrapReconf() throws Exception { // originally 3 peers, add 2 more - runWithNewCluster(3, cluster -> runTestBootstrapReconf(1, true, cluster)); + runWithNewCluster(3, cluster -> runTestBootstrapReconf(2, true, cluster)); } void runTestBootstrapReconf(int numNewPeer, boolean startNewPeer, CLUSTER cluster) throws Exception { @@ -442,7 +438,7 @@ void runTestBootstrapReconf(int numNewPeer, boolean startNewPeer, CLUSTER cluste // submit some msgs before reconf for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) { RaftClientReply reply = client.io().send(new SimpleMessage("m" + i)); - assertTrue(reply.isSuccess()); + Assert.assertTrue(reply.isSuccess()); } final PeerChanges c1 = cluster.addNewPeers(numNewPeer, startNewPeer); @@ -536,7 +532,7 @@ void runTestKillLeaderDuringReconf(CLUSTER cluster) throws Exception { 
LOG.info("{}", LogProtoUtils.toLogEntryString(e)); } final long commitIndex = leaderLog.getLastCommittedIndex(); - assertTrue("commitIndex = " + commitIndex + " > 2", commitIndex <= 2); + Assert.assertTrue("commitIndex = " + commitIndex + " > 2", commitIndex <= 2); } final RaftPeerId killed = RaftTestUtil.waitAndKillLeader(cluster); @@ -592,11 +588,11 @@ void runTestNoChangeRequest(CLUSTER cluster) throws Exception { // no real configuration change in the request final RaftClientReply reply = client.admin().setConfiguration(cluster.getPeers().toArray(RaftPeer.emptyArray())); - assertTrue(reply.isSuccess()); + Assert.assertTrue(reply.isSuccess()); final long newCommittedIndex = leaderLog.getLastCommittedIndex(); for(long i = committedIndex + 1; i <= newCommittedIndex; i++) { final LogEntryProto e = leaderLog.get(i); - assertTrue(e.hasMetadataEntry()); + Assert.assertTrue(e.hasMetadataEntry()); } Assert.assertSame(confBefore, cluster.getLeader().getRaftConf()); } @@ -630,7 +626,7 @@ void runTestOverlappedSetConfRequests(CLUSTER cluster) throws Exception { final RaftPeerId leaderId = cluster.getLeader().getId(); - RaftPeer[] newPeers = cluster.addNewPeers(1, true).allPeersInNewConf; + RaftPeer[] newPeers = cluster.addNewPeers(2, true).allPeersInNewConf; // delay every peer's logSync so that the setConf request is delayed cluster.getPeers() @@ -668,8 +664,8 @@ void runTestOverlappedSetConfRequests(CLUSTER cluster) throws Exception { for (int i = 0; i < 10 && !confChanged.get(); i++) { Thread.sleep(1000); } - assertTrue(confChanged.get()); - assertTrue(caughtException.get()); + Assert.assertTrue(confChanged.get()); + Assert.assertTrue(caughtException.get()); } finally { logSyncDelay.clear(); } @@ -735,11 +731,11 @@ void runTestRevertConfigurationChange(CLUSTER cluster) throws Exception { // wait till the old leader persist the new conf JavaUtils.attemptRepeatedly(() -> { - assertTrue(log.getFlushIndex() >= confIndex); + Assert.assertTrue(log.getFlushIndex() >= 
confIndex); return null; }, 10, sleepTime, "FLUSH", LOG); final long committed = log.getLastCommittedIndex(); - assertTrue(committed < confIndex); + Assert.assertTrue(committed < confIndex); // unblock the old leader BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId.toString()); @@ -747,14 +743,14 @@ void runTestRevertConfigurationChange(CLUSTER cluster) throws Exception { // the client should get NotLeaderException clientThread.join(5000); - assertTrue(gotNotLeader.get()); + Assert.assertTrue(gotNotLeader.get()); // the old leader should have truncated the setConf from the log JavaUtils.attemptRepeatedly(() -> { - assertTrue(log.getLastCommittedIndex() >= confIndex); + Assert.assertTrue(log.getLastCommittedIndex() >= confIndex); return null; }, 10, ONE_SECOND, "COMMIT", LOG); - assertTrue(log.get(confIndex).hasConfigurationEntry()); + Assert.assertTrue(log.get(confIndex).hasConfigurationEntry()); log2 = null; } finally { RaftStorageTestUtils.printLog(log2, s -> LOG.info(s)); @@ -810,8 +806,8 @@ public void testLeaderNotReadyException() throws Exception { for (int i = 0; !success.get() && i < 5; i++) { Thread.sleep(1000); } - assertTrue(success.get()); - assertTrue(caughtNotReady.get()); + Assert.assertTrue(success.get()); + Assert.assertTrue(caughtNotReady.get()); } finally { leaderPlaceHolderDelay.clear(); cluster.shutdown(); diff --git a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestRaftConfiguration.java b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestRaftConfiguration.java index 003b812ecc..14e0030e6d 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestRaftConfiguration.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestRaftConfiguration.java @@ -114,17 +114,17 @@ public void testChangeMajority() { .build(); assertTrue(config.changeMajority(raftPeersWithPriority(1, 4, 5))); - // Case 5: {1, 2, 3} --> {1, 2, 4, 5}. + // Case 5: {1, 2, 3} --> {1, 2, 3, 4, 5}. 
config = RaftConfigurationImpl.newBuilder() .setConf(new PeerConfiguration(raftPeersWithPriority(1, 2, 3))) .build(); - assertTrue(config.changeMajority(raftPeersWithPriority(1, 2, 4, 5))); + assertFalse(config.changeMajority(raftPeersWithPriority(1, 2, 3, 4, 5))); // Case 6: {1, 2, 3, 4, 5} --> {1, 2}. config = RaftConfigurationImpl.newBuilder() .setConf(new PeerConfiguration(raftPeersWithPriority(1, 2, 3, 4, 5))) .build(); - assertTrue(config.changeMajority(raftPeersWithPriority(1, 2))); + assertFalse(config.changeMajority(raftPeersWithPriority(1, 2))); // Case 7: {1, 2, 3, 4, 5} --> {1, 2, 3}. config = RaftConfigurationImpl.newBuilder() From ac6de8fa23f91151db715abaec96a05adfa954e4 Mon Sep 17 00:00:00 2001 From: lijinglun Date: Mon, 30 Oct 2023 11:13:15 +0800 Subject: [PATCH 03/10] v0.3 --- .../apache/ratis/server/impl/RaftConfigurationImpl.java | 7 ++++--- .../java/org/apache/ratis/server/impl/RaftServerImpl.java | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java index d0a0224ba0..40f8abfeff 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java @@ -233,8 +233,8 @@ Collection getOtherPeers(RaftPeerId selfId) { } /** - * @return true if the new peers number reaches half of new peers number or the group is changing - * from single mode to HA mode. + * @return true if the new peers number reaches half of new conf peers number or the group is + * changing from single mode to HA mode. */ boolean changeMajority(Collection newMembers) { Preconditions.assertNull(oldConf, "Conf must be stable."); @@ -250,7 +250,8 @@ boolean changeMajority(Collection newMembers) { // Change from single peer to HA mode. This is a special case, skip majority verification. 
return false; } - // Make sure peers in conf don't need new peer's vote to reach majority. + // If newPeersCount reaches majority number of new conf size, the cluster may end with infinity + // election. See https://issues.apache.org/jira/browse/RATIS-1912 for more details. return newPeersCount >= newMembers.size() / 2 + newMembers.size() % 2; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index bc0c443884..44e254f3bf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1311,7 +1311,8 @@ public CompletableFuture setConfigurationAsync(SetConfiguration pending.setReply(newSuccessReply(request)); return pending.getFuture(); } - if (current.changeMajority(serversInNewConf)) { + if (arguments.getMode() != SetConfigurationRequest.Mode.SET_UNCONDITIONALLY + && current.changeMajority(serversInNewConf)) { throw new SetConfigurationException("Failed to set configuration: request " + request + " changes a majority set of the current configuration " + current); } From 88aa0af1d1c19ceaaa8db86fc5d4f3e6cd42e9c7 Mon Sep 17 00:00:00 2001 From: lijinglun Date: Mon, 30 Oct 2023 13:46:33 +0800 Subject: [PATCH 04/10] v0.4 --- .../impl/RaftReconfigurationBaseTest.java | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index f1ac5777db..c68a82a450 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -174,6 +174,26 @@ public void testLeaderElectionWhenChangeFromSingleToHA() throws Exception { 
}); } + @Test + public void testChangeMajority() throws Exception { + runWithNewCluster(1, cluster -> { + RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = cluster.getLeader().getId(); + + try (final RaftClient client = cluster.createClient(leaderId)) { + final PeerChanges c1 = cluster.addNewPeers(2, true); + + SetConfigurationRequest.Arguments arguments = SetConfigurationRequest.Arguments.newBuilder() + .setServersInCurrentConf(cluster.getPeers()) + .setServersInNewConf(c1.allPeersInNewConf) + .setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET) + .build(); + assertThrows("Expect change majority error.", SetConfigurationException.class, + () -> client.admin().setConfiguration(arguments)); + } + }); + } + /** * remove 2 peers (5 peers -> 3 peers), no leader change */ @@ -409,17 +429,7 @@ public void testBootstrapReconfWithSingleNodeAddOne() throws Exception { @Test public void testBootstrapReconfWithSingleNodeAddTwo() throws Exception { // originally 1 peer, add 2 more - runWithNewCluster(1, cluster -> { - RaftTestUtil.waitForLeader(cluster); - final RaftPeerId leaderId = cluster.getLeader().getId(); - - try (final RaftClient client = cluster.createClient(leaderId)) { - final PeerChanges c1 = cluster.addNewPeers(2, true); - - assertThrows("Expect change majority error.", SetConfigurationException.class, - () -> client.admin().setConfiguration(c1.allPeersInNewConf)); - } - }); + runWithNewCluster(1, cluster -> runTestBootstrapReconf(2, true, cluster)); } @Test From feeac8dfd45337b5e68ecf9c4c445f0f75535997 Mon Sep 17 00:00:00 2001 From: lijinglun Date: Tue, 31 Oct 2023 13:53:13 +0800 Subject: [PATCH 05/10] v0.5 remove unused import --- .../main/java/org/apache/ratis/server/impl/LeaderElection.java | 1 - 1 file changed, 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 197b21a72c..2b62927dcc 100644 --- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -51,7 +51,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.Set; import java.util.stream.Collectors; From d3b013415a7a4e6835485c6451b0719ee4894213 Mon Sep 17 00:00:00 2001 From: lijinglun Date: Wed, 1 Nov 2023 20:51:49 +0800 Subject: [PATCH 06/10] v0.6 --- .../ratis/server/impl/LeaderElection.java | 10 +++++----- .../server/impl/RaftConfigurationImpl.java | 17 ++++++----------- .../ratis/server/impl/RaftServerImpl.java | 3 +-- .../ratis/server/impl/RaftServerProxy.java | 18 +----------------- .../impl/RaftReconfigurationBaseTest.java | 10 +++++++--- .../ratis/server/impl/RaftServerTestUtil.java | 6 ++++++ 6 files changed, 26 insertions(+), 38 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 2b62927dcc..43b7780578 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -104,7 +104,7 @@ enum Phase { ELECTION } - enum Result {PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN, NOT_IN_CONF} + enum Result {PASSED, SINGLE_MODE_PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN, NOT_IN_CONF} private static class ResultAndTerm { private final Result result; @@ -331,6 +331,7 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException, switch (r.getResult()) { case PASSED: + case SINGLE_MODE_PASSED: return true; case NOT_IN_CONF: case SHUTDOWN: @@ -390,7 +391,7 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int 
submitt return logAndReturn(phase, Result.PASSED, responses, exceptions); } else if (singleMode) { // if candidate is in single mode, candidate pass vote. - return logAndReturn(phase, Result.PASSED, responses, exceptions); + return logAndReturn(phase, Result.SINGLE_MODE_PASSED, responses, exceptions); } else { return logAndReturn(phase, Result.TIMEOUT, responses, exceptions); } @@ -422,8 +423,7 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt } // If any peer with higher priority rejects vote, candidate can not pass vote - if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId) - && !singleMode) { + if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId) && !singleMode) { return logAndReturn(phase, Result.REJECTED, responses, exceptions); } @@ -453,7 +453,7 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt if (conf.hasMajority(votedPeers, server.getId())) { return logAndReturn(phase, Result.PASSED, responses, exceptions); } else if (singleMode) { - return logAndReturn(phase, Result.PASSED, responses, exceptions); + return logAndReturn(phase, Result.SINGLE_MODE_PASSED, responses, exceptions); } else { return logAndReturn(phase, Result.REJECTED, responses, exceptions); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java index 40f8abfeff..d609264af5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java @@ -237,22 +237,18 @@ Collection getOtherPeers(RaftPeerId selfId) { * changing from single mode to HA mode. 
*/ boolean changeMajority(Collection newMembers) { - Preconditions.assertNull(oldConf, "Conf must be stable."); - int newPeersCount = 0; - for (RaftPeer peer : newMembers) { - final RaftPeer inConf = conf.getPeer(peer.getId()); - if (inConf == null) { - newPeersCount++; - } - } + Preconditions.assertNull(oldConf, "oldConf"); + final long newPeersCount = newMembers.stream().map(RaftPeer::getId).filter(id -> conf.getPeer(id) == null).count(); if (conf.size() == 1 && newMembers.size() == 2 && newPeersCount == 1) { // Change from single peer to HA mode. This is a special case, skip majority verification. return false; } + // If newPeersCount reaches majority number of new conf size, the cluster may end with infinity // election. See https://issues.apache.org/jira/browse/RATIS-1912 for more details. - return newPeersCount >= newMembers.size() / 2 + newMembers.size() % 2; + final long oldPeersCount = newMembers.size() - newPeersCount; + return newPeersCount >= oldPeersCount; } /** @return True if the selfId is in single mode. 
*/ @@ -260,8 +256,7 @@ boolean isSingleMode(RaftPeerId selfId) { if (isStable()) { return conf.size() == 1; } else { - return oldConf.size() == 1 && oldConf.contains(selfId) && conf.size() == 2 && conf.contains( - selfId); + return oldConf.size() == 1 && oldConf.contains(selfId) && conf.size() == 2 && conf.contains(selfId); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 44e254f3bf..bc0c443884 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1311,8 +1311,7 @@ public CompletableFuture setConfigurationAsync(SetConfiguration pending.setReply(newSuccessReply(request)); return pending.getFuture(); } - if (arguments.getMode() != SetConfigurationRequest.Mode.SET_UNCONDITIONALLY - && current.changeMajority(serversInNewConf)) { + if (current.changeMajority(serversInNewConf)) { throw new SetConfigurationException("Failed to set configuration: request " + request + " changes a majority set of the current configuration " + current); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 8d32f55f23..fd80d6938f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -41,7 +41,6 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.ServerFactory; import org.apache.ratis.server.storage.RaftStorage.StartupOption; -import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.util.ConcurrentUtils; import org.apache.ratis.util.JvmPauseMonitor; import org.apache.ratis.server.RaftServerConfigKeys; @@ -369,7 +368,7 @@ private 
RaftServerImpl getImpl(RaftRpcRequestProto proto) throws IOException { return getImpl(ProtoUtils.toRaftGroupId(proto.getRaftGroupId())); } - public RaftServerImpl getImpl(RaftGroupId groupId) throws IOException { + private RaftServerImpl getImpl(RaftGroupId groupId) throws IOException { Objects.requireNonNull(groupId, "groupId == null"); return IOUtils.getFromFuture(getImplFuture(groupId), this::getId); } @@ -392,21 +391,6 @@ public LifeCycle.State getLifeCycleState() { return lifeCycle.getCurrentState(); } - @VisibleForTesting - void setRaftConf(RaftGroupId groupId, RaftConfigurationImpl conf) throws IOException { - getImpl(groupId).getState().setRaftConf(conf); - } - - @VisibleForTesting - void triggerElection(RaftGroupId groupId) throws IOException { - RaftServerImpl impl = getImpl(groupId); - Preconditions.assertTrue(impl.getInfo().isLeader(), - "Only leader can perform trigger election."); - - long newTerm = impl.getState().getCurrentTerm() + 1; - impl.changeToFollowerAndPersistMetadata(newTerm, true, "Trigger election for test."); - } - ThreadGroup getThreadGroup() { return threadGroup; } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index c68a82a450..45cf50c40f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -167,10 +167,14 @@ public void testLeaderElectionWhenChangeFromSingleToHA() throws Exception { .setLogEntryIndex(Long.MAX_VALUE / 2) .build(); Assert.assertTrue(oldNewConf.isSingleMode(curPeer.getId())); - leaderServer.setRaftConf(groupId, oldNewConf); - leaderServer.triggerElection(groupId); + RaftServerTestUtil.setRaftConf(leaderServer, groupId, oldNewConf); + try(RaftClient client = cluster.createClient()) { + client.admin().transferLeadership(null, 
leaderServer.getId(), 1000); + } - RaftTestUtil.waitForLeader(cluster); + final RaftServer.Division newLeader = RaftTestUtil.waitForLeader(cluster); + Assert.assertEquals(leaderServer.getId(), newLeader.getId()); + Assert.assertEquals(oldNewConf, newLeader.getRaftConf()); }); } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index 73482dcf8d..7b9bc07b21 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -32,6 +32,7 @@ import org.apache.ratis.server.leader.LogAppender; import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog; import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.TimeDuration; @@ -41,6 +42,7 @@ import org.slf4j.LoggerFactory; import org.slf4j.event.Level; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -135,6 +137,10 @@ public static RaftConfiguration newRaftConfiguration(Collection peers) return RaftConfigurationImpl.newBuilder().setConf(peers).build(); } + public static void setRaftConf(RaftServer proxy, RaftGroupId groupId, RaftConfiguration conf) { + ((RaftServerImpl)getDivision(proxy, groupId)).getState().setRaftConf(conf); + } + public static RaftServerRpc getServerRpc(RaftServer.Division server) { return ((RaftServerImpl)server).getRaftServer().getServerRpc(); } From 61744a7106fb4a1a24abae0bbf34cc9d35827a9d Mon Sep 17 00:00:00 2001 From: lijinglun Date: Wed, 1 Nov 2023 21:07:34 +0800 Subject: [PATCH 07/10] v0.7 --- .../server/impl/RaftReconfigurationBaseTest.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 
deletion(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 45cf50c40f..3b8e206deb 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -433,7 +433,17 @@ public void testBootstrapReconfWithSingleNodeAddOne() throws Exception { @Test public void testBootstrapReconfWithSingleNodeAddTwo() throws Exception { // originally 1 peer, add 2 more - runWithNewCluster(1, cluster -> runTestBootstrapReconf(2, true, cluster)); + runWithNewCluster(1, cluster -> { + RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = cluster.getLeader().getId(); + + try (final RaftClient client = cluster.createClient(leaderId)) { + final PeerChanges c1 = cluster.addNewPeers(2, true); + + assertThrows("Expect change majority error.", SetConfigurationException.class, + () -> client.admin().setConfiguration(c1.allPeersInNewConf)); + } + }); } @Test From 1080e9935ded89fd8de47fd65c0587f92e2a9392 Mon Sep 17 00:00:00 2001 From: lijinglun Date: Thu, 2 Nov 2023 11:48:37 +0800 Subject: [PATCH 08/10] v0.8 fix unit tests. 
--- .../ratis/InstallSnapshotFromLeaderTests.java | 4 +- .../InstallSnapshotNotificationTests.java | 13 ++++-- .../org/apache/ratis/RetryCacheTests.java | 4 +- .../server/impl/GroupManagementBaseTest.java | 3 +- .../ratis/server/impl/RaftServerTestUtil.java | 46 ++++++++++++++++++- .../statemachine/RaftSnapshotBaseTest.java | 8 +++- 6 files changed, 68 insertions(+), 10 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java index b7eb752040..e51c98548e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java @@ -21,6 +21,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; @@ -111,7 +112,8 @@ private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception { final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true, true); // trigger setConfiguration - cluster.setConfiguration(change.allPeersInNewConf); + RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf), + peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray()))); RaftServerTestUtil .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java index 2d77519803..28c75852bc 100644 --- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java +++ 
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java @@ -49,6 +49,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -242,7 +243,8 @@ private void testAddNewFollowers(CLUSTER cluster, int numRequests) throws Except final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true, true); // trigger setConfiguration - cluster.setConfiguration(change.allPeersInNewConf); + RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf), + peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray()))); RaftServerTestUtil .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); @@ -391,7 +393,8 @@ private void testInstallSnapshotNotificationCount(CLUSTER cluster) throws Except final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true, true); // trigger setConfiguration - cluster.setConfiguration(change.allPeersInNewConf); + RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf), + peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray()))); RaftServerTestUtil .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); @@ -478,7 +481,8 @@ private void testInstallSnapshotInstalledEvent(CLUSTER cluster) throws Exception // add one new peer final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(1, true, true); // trigger setConfiguration - cluster.setConfiguration(change.allPeersInNewConf); + RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf), + peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray()))); RaftServerTestUtil .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); @@ -556,7 +560,8 @@ private void testInstallSnapshotDuringBootstrap(CLUSTER cluster) throws Exceptio final 
MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true, true); // trigger setConfiguration - cluster.setConfiguration(change.allPeersInNewConf); + RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf), + peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray()))); RaftServerTestUtil.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java index f729dcd2d5..ba36d7078e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java @@ -28,6 +28,7 @@ import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.impl.RetryCacheTestUtil; import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.server.raftlog.RaftLogIOException; @@ -140,7 +141,8 @@ void runTestRetryOnNewLeader(CLUSTER cluster) throws Exception { RaftPeer[] allPeers = cluster.removePeers(2, true, asList(change.newPeers)).allPeersInNewConf; // trigger setConfiguration - cluster.setConfiguration(allPeers); + RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(allPeers), + peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray()))); final RaftPeerId newLeaderId = JavaUtils.attemptRepeatedly(() -> { final RaftPeerId id = RaftTestUtil.waitForLeader(cluster).getId(); diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java index f40afa0884..311a2150d6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java +++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java @@ -329,7 +329,8 @@ public static void runMultiGroupTest( } LOG.info(chosen + ") setConfiguration: " + cluster.printServers(groups[chosen].getGroupId())); try (final RaftClient client = cluster.createClient(groups[chosen])) { - client.admin().setConfiguration(allPeers.toArray(RaftPeer.emptyArray())); + RaftServerTestUtil.runWithMinorityPeers(cluster, allPeers, + peers -> client.admin().setConfiguration(peers.toArray(RaftPeer.emptyArray()))); } Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster)); diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index 7b9bc07b21..198e33fd77 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -19,6 +19,7 @@ import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftGroupMemberId; import org.apache.ratis.protocol.RaftPeer; @@ -32,7 +33,6 @@ import org.apache.ratis.server.leader.LogAppender; import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog; import org.apache.ratis.server.storage.RaftStorage; -import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.TimeDuration; @@ -46,7 +46,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.Stream; public class RaftServerTestUtil { @@ -202,4 +206,44 @@ public 
static SegmentedRaftLog newSegmentedRaftLog(RaftGroupMemberId memberId, D public static boolean isHighestPriority(RaftConfiguration config, RaftPeerId peerId) { return ((RaftConfigurationImpl)config).isHighestPriority(peerId); } + + public interface ConsumerWithIOException { + void accept(Collection peesToSetConf) throws IOException; + } + + public static void runWithMinorityPeers(MiniRaftCluster cluster, Collection peersInNewConf, + ConsumerWithIOException consumer) throws IOException { + Collection peers = parseMinorityPeers(cluster, peersInNewConf); + while (peers != null) { + consumer.accept(peers); + peers = parseMinorityPeers(cluster, peersInNewConf); + } + } + + private static Collection parseMinorityPeers(MiniRaftCluster cluster, Collection peersInNewConf) { + RaftConfigurationImpl conf = (RaftConfigurationImpl) cluster.getLeader().getRaftConf(); + Set peers = new HashSet<>(conf.getCurrentPeers()); + + // Add new peers to construct minority conf peers. + List peersToAdd = peersInNewConf.stream().filter( + peer -> !conf.containsInConf(peer.getId(), RaftProtos.RaftPeerRole.FOLLOWER)).collect(Collectors.toList()); + if (!peersToAdd.isEmpty()) { + for (RaftPeer peer : peersToAdd) { + if (peers.add(peer) && conf.changeMajority(peers)) { + peers.remove(peer); + break; + } + } + return peers; + } + + // All new peers have been added. Handle the removed peers. + List peersToRemove = peers.stream().filter(peer -> !peersInNewConf.contains(peer)).collect(Collectors.toList()); + if (!peersToRemove.isEmpty()) { + return peersInNewConf; + } + + // The peers in new conf are the same as current conf, return null. 
+ return null; + } } diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java index 9cce49a646..fe1a97ddca 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java @@ -25,6 +25,7 @@ import org.apache.ratis.BaseTest; import org.apache.ratis.metrics.LongCounter; import org.apache.ratis.metrics.impl.DefaultTimekeeperImpl; +import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleMessage; @@ -57,6 +58,7 @@ import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -239,7 +241,8 @@ public void testBasicInstallSnapshot() throws Exception { MiniRaftCluster.PeerChanges change = cluster.addNewPeers( newPeers, true, false); // trigger setConfiguration - cluster.setConfiguration(change.allPeersInNewConf); + RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf), + peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray()))); for (String newPeer : newPeers) { final RaftServer.Division s = cluster.getDivision(RaftPeerId.valueOf(newPeer)); @@ -301,7 +304,8 @@ public void testInstallSnapshotDuringBootstrap() throws Exception { MiniRaftCluster.PeerChanges change = cluster.addNewPeers( newPeers, true, false); // trigger setConfiguration - cluster.setConfiguration(change.allPeersInNewConf); + RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf), + peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray()))); for (String newPeer : newPeers) { final RaftServer.Division s = cluster.getDivision(RaftPeerId.valueOf(newPeer)); 
From 449d4859ec2b10cab9809e620174d263cb7a4442 Mon Sep 17 00:00:00 2001 From: lijinglun Date: Fri, 3 Nov 2023 11:09:00 +0800 Subject: [PATCH 09/10] v0.9 --- .../org/apache/ratis/server/impl/RaftServerTestUtil.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index 198e33fd77..2927ec3495 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -36,6 +36,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.function.CheckedConsumer; import org.junit.Assert; import org.mockito.Mockito; import org.slf4j.Logger; @@ -207,12 +208,8 @@ public static boolean isHighestPriority(RaftConfiguration config, RaftPeerId pee return ((RaftConfigurationImpl)config).isHighestPriority(peerId); } - public interface ConsumerWithIOException { - void accept(Collection peesToSetConf) throws IOException; - } - public static void runWithMinorityPeers(MiniRaftCluster cluster, Collection peersInNewConf, - ConsumerWithIOException consumer) throws IOException { + CheckedConsumer<Collection<RaftPeer>, IOException> consumer) throws IOException { Collection peers = parseMinorityPeers(cluster, peersInNewConf); while (peers != null) { consumer.accept(peers); From f93e9cc4e5f93113b60b6fd22654ca35c357602a Mon Sep 17 00:00:00 2001 From: lijinglun Date: Fri, 3 Nov 2023 14:53:30 +0800 Subject: [PATCH 10/10] v0.10 --- .../org/apache/ratis/InstallSnapshotNotificationTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java index 
28c75852bc..72ddd06f26 100644 --- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java @@ -572,8 +572,8 @@ private void testInstallSnapshotDuringBootstrap(CLUSTER cluster) throws Exceptio RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower)); } - // Make sure each new peer got one snapshot notification. - Assert.assertEquals(2, numSnapshotRequests.get()); + // Make sure each new peer got at least one snapshot notification. + Assert.assertTrue(2 <= numSnapshotRequests.get()); } finally { cluster.shutdown(); }