diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index fcb4a5950bc7f..15dbb4ec48370 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -117,9 +117,10 @@ public long timeUntilCheckQuorumExpires(long currentTimeMs) { long remainingMs = checkQuorumTimer.remainingMs(); if (remainingMs == 0) { log.info( - "Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.", + "Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}, and voters are {}", checkQuorumTimeoutMs, - fetchedVoters); + fetchedVoters, + voterStates.keySet()); } return remainingMs; } @@ -132,8 +133,16 @@ public long timeUntilCheckQuorumExpires(long currentTimeMs) { */ public void updateCheckQuorumForFollowingVoter(int id, long currentTimeMs) { updateFetchedVoters(id); - // The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1 - int majority = voterStates.size() / 2; + // The majority number of the voters. Ex: 2 for 3 voters, 3 for 4 voters... etc. + int majority = (voterStates.size() / 2) + 1; + // If the leader is in the voter set, it should be implicitly counted as part of the + // majority, but the leader will never be a member of the fetchedVoters. + // If the leader is not in the voter set, it is not in the majority. Then, the + // majority can only be composed of fetched voters. + if (voterStates.containsKey(localId)) { + majority = majority - 1; + } + if (fetchedVoters.size() >= majority) { fetchedVoters.clear(); checkQuorumTimer.update(currentTimeMs); diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 20b9e0b3f29c7..4f93e51372bd6 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -761,6 +761,50 @@ public void testCheckQuorum() { assertEquals(0, state.timeUntilCheckQuorumExpires(time.milliseconds())); } + @Test + public void testCheckQuorumAfterVoterSetChanges() { + int node1 = 1; + int node2 = 2; + int node3 = 3; + Set originalVoterSet = mkSet(localId, node1, node2); + LeaderState state = newLeaderState(originalVoterSet, 0L); + assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // checkQuorum timeout not exceeded, should not expire the timer + time.sleep(checkQuorumTimeoutMs / 2); + assertEquals(checkQuorumTimeoutMs / 2, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // received fetch request from 1 voter node, the timer should be reset + state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); + assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // Adding 1 new voter to the voter set + Set voterSetWithNode3 = mkSet(localId, node1, node2, node3); + state.updateLocalState(new LogOffsetMetadata(1L), toMap(voterSetWithNode3)); + + time.sleep(checkQuorumTimeoutMs / 2); + // received fetch request from 1 voter node, the timer should not be reset because the majority should be 3 + state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); + assertEquals(checkQuorumTimeoutMs / 2, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // Timer should be reset after receiving another voter's fetch request + state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds()); + assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // removing leader from the voter set + Set voterSetWithoutLeader = mkSet(node1, node2, node3); + state.updateLocalState(new LogOffsetMetadata(1L), toMap(voterSetWithoutLeader)); + + time.sleep(checkQuorumTimeoutMs / 2); + // received fetch request from 1 voter, the timer should not be reset. + state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds()); + assertEquals(checkQuorumTimeoutMs / 2, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // received fetch request from another voter, the timer should be reset since the current quorum majority is 2. + state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); + assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); + } + @Test public void testCheckQuorumWithOneVoter() { int observer = 1;