diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java index 0d5702f85ed82..99605a7794461 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java @@ -135,11 +135,15 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException, randomElectionTimeoutMs() ); } else if (election.isLeader(localId)) { - initialState = new LeaderState( - localId, + // If we were previously a leader, then we will start out as unattached + // in the same epoch. This protects the invariant that each record + // is uniquely identified by offset and epoch, which might otherwise + // be violated if unflushed data is lost after restarting. + initialState = new UnattachedState( + time, election.epoch, - logEndOffsetAndEpoch.offset, - voters + voters, + randomElectionTimeoutMs() ); } else if (election.isCandidate(localId)) { initialState = new CandidateState( diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 6a755588bb70d..7182ccd8aab66 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -140,22 +140,42 @@ private KafkaRaftClient buildClient(Set voters, Metrics metrics) throws @Test public void testInitializeSingleMemberQuorum() throws IOException { - KafkaRaftClient client = buildClient(Collections.singleton(localId)); - assertEquals(ElectionState.withElectedLeader(1, localId, Collections.singleton(localId)), quorumStateStore.readElectionState()); - client.poll(); - assertEquals(0, channel.drainSendQueue().size()); + buildClient(Collections.singleton(localId)); + assertEquals(ElectionState.withElectedLeader(1, localId, Collections.singleton(localId)), + quorumStateStore.readElectionState()); + } + + @Test + public void 
testInitializeAsLeaderFromStateStoreSingleMemberQuorum() throws Exception { + // Start off as leader. We should still bump the epoch after initialization + + int initialEpoch = 2; + Set voters = Collections.singleton(localId); + quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch, localId, voters)); + + KafkaRaftClient client = buildClient(voters); + assertEquals(1L, log.endOffset().offset); + assertEquals(initialEpoch + 1, log.lastFetchedEpoch()); + assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1), + client.currentLeaderAndEpoch()); + assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, localId, voters), + quorumStateStore.readElectionState()); } @Test - public void testInitializeAsLeaderFromStateStore() throws IOException { + public void testInitializeAsLeaderFromStateStore() throws Exception { Set voters = Utils.mkSet(localId, 1); int epoch = 2; + Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs); quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters)); KafkaRaftClient client = buildClient(voters); - assertEquals(1L, log.endOffset().offset); + assertEquals(0L, log.endOffset().offset); + assertEquals(ElectionState.withUnknownLeader(epoch, voters), quorumStateStore.readElectionState()); - // FIXME: Is this test useful? 
+ time.sleep(electionTimeoutMs); + pollUntilSend(client); + assertSentVoteRequest(epoch + 1, 0, 0L); } @Test @@ -293,8 +313,7 @@ public void testEndQuorumIgnoredIfAlreadyLeader() throws Exception { Mockito.doReturn(jitterMs).when(random).nextInt(Mockito.anyInt()); Set voters = Utils.mkSet(localId, voter2, voter3); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters)); - KafkaRaftClient client = buildClient(voters); + KafkaRaftClient client = initializeAsLeader(voters, epoch); // One of the voters may have sent EndEpoch as a candidate because it // had not yet been notified that the local node was the leader. @@ -833,12 +852,7 @@ public void testLeaderIgnoreVoteRequestOnSameEpoch() throws Exception { int leaderEpoch = 2; Set voters = Utils.mkSet(localId, otherNodeId); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(leaderEpoch, localId, voters)); - - KafkaRaftClient client = buildClient(voters); - - pollUntilSend(client); - + KafkaRaftClient client = initializeAsLeader(voters, leaderEpoch); deliverRequest(voteRequest(leaderEpoch, otherNodeId, leaderEpoch - 1, 1)); client.poll(); @@ -1317,11 +1331,41 @@ public void testPurgatoryFetchCompletedByFollowerTransition() throws Exception { assertEquals(0, fetchedRecords.sizeInBytes()); } + private void expectLeaderElection( + KafkaRaftClient client, + Set voters, + int epoch + ) throws Exception { + pollUntilSend(client); + + List voteRequests = collectVoteRequests(epoch, + log.lastFetchedEpoch(), log.endOffset().offset); + + for (RaftRequest.Outbound request : voteRequests) { + VoteResponseData voteResponse = voteResponse(true, Optional.empty(), epoch); + deliverResponse(request.correlationId, request.destinationId(), voteResponse); + } + + client.poll(); + assertEquals(ElectionState.withElectedLeader(epoch, localId, voters), + quorumStateStore.readElectionState()); + } + private KafkaRaftClient initializeAsLeader(Set voters, int epoch) throws Exception { - 
ElectionState leaderElectionState = ElectionState.withElectedLeader(epoch, localId, voters); - quorumStateStore.writeElectionState(leaderElectionState); + if (epoch <= 0) { + throw new IllegalArgumentException("Cannot become leader in epoch " + epoch); + } + + Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs); + + ElectionState electionState = ElectionState.withUnknownLeader(epoch - 1, voters); + quorumStateStore.writeElectionState(electionState); KafkaRaftClient client = buildClient(voters); - assertEquals(leaderElectionState, quorumStateStore.readElectionState()); + assertEquals(electionState, quorumStateStore.readElectionState()); + + // Advance the clock so that we become a candidate + time.sleep(electionTimeoutMs); + expectLeaderElection(client, voters, epoch); // Handle BeginEpoch pollUntilSend(client); @@ -1329,6 +1373,7 @@ private KafkaRaftClient initializeAsLeader(Set voters, int epoch) throw BeginQuorumEpochResponseData beginEpochResponse = beginEpochResponse(epoch, localId); deliverResponse(request.correlationId, request.destinationId(), beginEpochResponse); } + client.poll(); return client; } @@ -1821,9 +1866,12 @@ public void testFetchShouldBeTreatedAsLeaderEndorsement() throws Exception { int epoch = 5; Set voters = Utils.mkSet(localId, otherNodeId); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters)); + Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs); + quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch - 1, voters)); KafkaRaftClient client = buildClient(voters); - assertEquals(ElectionState.withElectedLeader(epoch, localId, voters), quorumStateStore.readElectionState()); + + time.sleep(electionTimeoutMs); + expectLeaderElection(client, voters, epoch); pollUntilSend(client); @@ -1945,7 +1993,6 @@ public void testFollowerLogReconciliation() throws Exception { public void testMetrics() throws Exception { Metrics metrics = new Metrics(time); int epoch = 1; - 
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, Collections.singleton(localId))); KafkaRaftClient client = buildClient(Collections.singleton(localId), metrics); assertNotNull(getMetric(metrics, "current-state")); @@ -2017,11 +2064,14 @@ public void testClusterAuthorizationFailedInFetch() throws Exception { public void testClusterAuthorizationFailedInBeginQuorumEpoch() throws Exception { int otherNodeId = 1; int epoch = 5; - Set voters = Utils.mkSet(localId, otherNodeId); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters)); + + Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs); + quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch - 1, voters)); KafkaRaftClient client = buildClient(voters); - assertEquals(ElectionState.withElectedLeader(epoch, localId, voters), quorumStateStore.readElectionState()); + + time.sleep(electionTimeoutMs); + expectLeaderElection(client, voters, epoch); pollUntilSend(client); int correlationId = assertSentBeginQuorumEpochRequest(epoch); diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index 1ea424d55b10a..5ecc168db3bd6 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -167,14 +167,23 @@ public void testInitializeAsLeader() throws IOException { Set voters = Utils.mkSet(localId, node1, node2); store.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters)); + // If we were previously a leader, we will start as unattached + // so that records are always uniquely defined by epoch and offset + // even accounting for the loss of unflushed data. 
+ + // The election timeout should include the random jitter when we start out as unattached + int jitterMs = 2500; + Mockito.doReturn(jitterMs).when(random).nextInt(Mockito.anyInt()); + QuorumState state = buildQuorumState(voters); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); - assertTrue(state.isLeader()); + assertFalse(state.isLeader()); assertEquals(epoch, state.epoch()); - LeaderState leaderState = state.leaderStateOrThrow(); - assertEquals(epoch, leaderState.epoch()); - assertEquals(Utils.mkSet(node1, node2), leaderState.nonEndorsingFollowers()); + UnattachedState unattachedState = state.unattachedStateOrThrow(); + assertEquals(epoch, unattachedState.epoch()); + assertEquals(electionTimeoutMs + jitterMs, + unattachedState.remainingElectionTimeMs(time.milliseconds())); } @Test diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index 7d1ef1ed5731e..7e3d99b35ee30 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -98,51 +98,9 @@ private void testInitialLeaderElection(QuorumConfig config) { cluster.startAll(); schedulePolling(scheduler, cluster, 3, 5); scheduler.schedule(router::deliverAll, 0, 2, 1); - scheduler.runUntil(cluster::hasConsistentLeader); - } - } - - @Test - public void testReplicationNoLeaderChangeQuorumSizeOne() { - testReplicationNoLeaderChange(new QuorumConfig(1)); - } - - @Test - public void testReplicationNoLeaderChangeQuorumSizeTwo() { - testReplicationNoLeaderChange(new QuorumConfig(2)); - } - - @Test - public void testReplicationNoLeaderChangeQuorumSizeThree() { - testReplicationNoLeaderChange(new QuorumConfig(3, 0)); - } - - @Test - public void testReplicationNoLeaderChangeQuorumSizeFour() { - testReplicationNoLeaderChange(new QuorumConfig(4)); - } - - @Test - public void testReplicationNoLeaderChangeQuorumSizeFive() { -
testReplicationNoLeaderChange(new QuorumConfig(5)); - } - - private void testReplicationNoLeaderChange(QuorumConfig config) { - for (int seed = 0; seed < 100; seed++) { - Cluster cluster = new Cluster(config, seed); - MessageRouter router = new MessageRouter(cluster); - EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - - Set voters = cluster.voters(); - // Start with node 0 as the leader - cluster.initializeElection(ElectionState.withElectedLeader(2, 0, voters)); - cluster.startAll(); - assertTrue(cluster.hasConsistentLeader()); - - schedulePolling(scheduler, cluster, 3, 5); - scheduler.schedule(router::deliverAll, 0, 2, 0); scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); - scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); + scheduler.runUntil(cluster::hasConsistentLeader); + scheduler.runUntil(() -> cluster.allReachedHighWatermark(10)); } } @@ -223,21 +181,17 @@ private void testElectionAfterLeaderShutdown(QuorumConfig config, boolean isGrac MessageRouter router = new MessageRouter(cluster); EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - // Start with node 0 as the leader - int leaderId = 0; - Set voters = cluster.voters(); - cluster.initializeElection(ElectionState.withElectedLeader(2, leaderId, voters)); - cluster.startAll(); - assertTrue(cluster.hasConsistentLeader()); - // Seed the cluster with some data + cluster.startAll(); schedulePolling(scheduler, cluster, 3, 5); scheduler.schedule(router::deliverAll, 0, 2, 1); scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); + scheduler.runUntil(cluster::hasConsistentLeader); scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); // Shutdown the leader and write some more data. We can verify the new leader has been elected // by verifying that the high watermark can still advance. 
+ int leaderId = cluster.latestLeader().getAsInt(); if (isGracefulShutdown) { cluster.shutdown(leaderId); } else { @@ -287,24 +241,21 @@ private void testElectionAfterLeaderNetworkPartition(QuorumConfig config) { MessageRouter router = new MessageRouter(cluster); EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - // Start with node 1 as the leader - Set voters = cluster.voters(); - cluster.initializeElection(ElectionState.withElectedLeader(2, 1, voters)); - cluster.startAll(); - assertTrue(cluster.hasConsistentLeader()); - // Seed the cluster with some data + cluster.startAll(); schedulePolling(scheduler, cluster, 3, 5); scheduler.schedule(router::deliverAll, 0, 2, 2); scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); + scheduler.runUntil(cluster::hasConsistentLeader); scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); // The leader gets partitioned off. We can verify the new leader has been elected // by writing some data and ensuring that it gets replicated - router.filter(1, new DropAllTraffic()); + int leaderId = cluster.latestLeader().getAsInt(); + router.filter(leaderId, new DropAllTraffic()); Set nonPartitionedNodes = new HashSet<>(cluster.nodes()); - nonPartitionedNodes.remove(1); + nonPartitionedNodes.remove(leaderId); scheduler.runUntil(() -> cluster.allReachedHighWatermark(20, nonPartitionedNodes)); } @@ -329,20 +280,17 @@ private void testElectionAfterMultiNodeNetworkPartition(QuorumConfig config) { MessageRouter router = new MessageRouter(cluster); EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - // Start with node 1 as the leader - Set voters = cluster.voters(); - cluster.initializeElection(ElectionState.withElectedLeader(2, 1, voters)); - cluster.startAll(); - assertTrue(cluster.hasConsistentLeader()); - // Seed the cluster with some data + cluster.startAll(); schedulePolling(scheduler, cluster, 3, 5); scheduler.schedule(router::deliverAll, 0, 2, 2); scheduler.schedule(new 
SequentialAppendAction(cluster), 0, 2, 3); + scheduler.runUntil(cluster::hasConsistentLeader); scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); - // Partition the nodes into two sets. Nodes are reachable within each set, but the - // two sets cannot communicate with each other. + // Partition the nodes into two sets. Nodes are reachable within each set, + // but the two sets cannot communicate with each other. We should be able + // to make progress even if an election is needed in the larger set. router.filter(0, new DropOutboundRequestsFrom(Utils.mkSet(2, 3, 4))); router.filter(1, new DropOutboundRequestsFrom(Utils.mkSet(2, 3, 4))); router.filter(2, new DropOutboundRequestsFrom(Utils.mkSet(0, 1))); @@ -383,25 +331,21 @@ private void testBackToBackLeaderFailures(QuorumConfig config) { MessageRouter router = new MessageRouter(cluster); EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - // Start with node 1 as the leader - Set voters = cluster.voters(); - cluster.initializeElection(ElectionState.withElectedLeader(2, 1, voters)); - cluster.startAll(); - assertTrue(cluster.hasConsistentLeader()); - // Seed the cluster with some data + cluster.startAll(); schedulePolling(scheduler, cluster, 3, 5); scheduler.schedule(router::deliverAll, 0, 2, 5); scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); + scheduler.runUntil(cluster::hasConsistentLeader); scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); - // Now partition off node 1 and wait for a new leader - router.filter(1, new DropAllTraffic()); - scheduler.runUntil(() -> cluster.latestLeader().isPresent() && cluster.latestLeader().getAsInt() != 1); + int leaderId = cluster.latestLeader().getAsInt(); + router.filter(leaderId, new DropAllTraffic()); + scheduler.runUntil(() -> cluster.latestLeader().isPresent() && cluster.latestLeader().getAsInt() != leaderId); - // As soon as we have a new leader, restore traffic to node 1 and partition the new leader + // As soon 
as we have a new leader, restore traffic to the old leader and partition the new leader int newLeaderId = cluster.latestLeader().getAsInt(); - router.filter(1, new PermitAllTraffic()); + router.filter(leaderId, new PermitAllTraffic()); router.filter(newLeaderId, new DropAllTraffic()); // Verify now that we can make progress @@ -656,6 +600,7 @@ OptionalInt latestLeader() { for (RaftNode node : running.values()) { if (node.quorum.epoch() > latestEpoch) { latestLeader = node.quorum.leaderId(); + latestEpoch = node.quorum.epoch(); } else if (node.quorum.epoch() == latestEpoch && node.quorum.leaderId().isPresent()) { latestLeader = node.quorum.leaderId(); } @@ -706,15 +651,6 @@ Collection running() { return running.values(); } - void initializeElection(ElectionState election) { - if (election.hasLeader() && !voters.contains(election.leaderId())) - throw new IllegalArgumentException("Illegal election of observer " + election.leaderId()); - - nodes.values().forEach(state -> { - state.store.writeElectionState(election); - }); - } - void ifRunning(int nodeId, Consumer action) { nodeIfRunning(nodeId).ifPresent(action); }