From fd9d7698fff884c020a6b26d2481d7116ccd2ceb Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 5 Jun 2024 19:58:26 +0800 Subject: [PATCH 1/5] KAFKA-16531: calculate check-quorum when leader is not in voter set --- .../org/apache/kafka/raft/LeaderState.java | 9 ++++-- .../apache/kafka/raft/LeaderStateTest.java | 32 +++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 13a6d106345d8..862480fe884aa 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -126,8 +126,13 @@ public long timeUntilCheckQuorumExpires(long currentTimeMs) { */ public void updateCheckQuorumForFollowingVoter(int id, long currentTimeMs) { updateFetchedVoters(id); - // The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1 - int majority = voterStates.size() / 2; + // The majority number of the voters. 
Ex: 3 voters, the value will be 2 + int majority = (int) ((double) (voterStates.size() + 1) / 2); + // Check if the leader is removed from the voter set + if (voterStates.containsKey(localId)) { + majority = majority - 1; + } + if (fetchedVoters.size() >= majority) { fetchedVoters.clear(); checkQuorumTimer.update(currentTimeMs); diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 6d31154ccf233..766eb498d3bb0 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -729,6 +729,38 @@ public void testCheckQuorum() { assertEquals(0, state.timeUntilCheckQuorumExpires(time.milliseconds())); } + @Test + public void testCheckQuorumWithoutLeader() { + int node1 = 1; + int node2 = 2; + Set originalVoterSet = mkSet(localId, node1, node2); + LeaderState state = newLeaderState(originalVoterSet, 0L); + assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); + int resignLeadershipTimeout = checkQuorumTimeoutMs; + + // checkQuorum timeout not exceeded, should not expire the timer + time.sleep(resignLeadershipTimeout / 2); + assertTrue(state.timeUntilCheckQuorumExpires(time.milliseconds()) > 0); + + // received fetch request from 1 voter node, the timer should be reset + state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); + assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // removing leader from the voter set + Set voterSetWithoutLeader = mkSet(node1, node2); + state.updateLocalState(new LogOffsetMetadata(1L), voterSetWithoutLeader); + + // received fetch request from 1 voter node1, the timer should not be reset + state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds()); + // Since the timer was reset, it won't expire this time. 
+ time.sleep(resignLeadershipTimeout); + assertEquals(0, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // received fetch request from another voter, the timer should not be reset. + state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); + assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); + } + @Test public void testCheckQuorumWithOneVoter() { int observer = 1; From 7eae1256e053f4578e1a033cf58d445bbf68e371 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 6 Jun 2024 16:32:37 +0800 Subject: [PATCH 2/5] KAFKA-16531: add tests --- .../org/apache/kafka/raft/LeaderState.java | 5 ++-- .../apache/kafka/raft/LeaderStateTest.java | 28 ++++++++++++++----- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 862480fe884aa..d9e8720aca607 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -111,9 +111,10 @@ public long timeUntilCheckQuorumExpires(long currentTimeMs) { long remainingMs = checkQuorumTimer.remainingMs(); if (remainingMs == 0) { log.info( - "Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.", + "Did not receive fetch request from the majority of the voters within {}ms. 
Current fetched voters are {}, and voters are {}", checkQuorumTimeoutMs, - fetchedVoters); + fetchedVoters, + voterStates); } return remainingMs; } diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 766eb498d3bb0..0154239d84d32 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -730,33 +730,47 @@ public void testCheckQuorum() { } @Test - public void testCheckQuorumWithoutLeader() { + public void testCheckQuorumAfterVoterSetChanges() { int node1 = 1; int node2 = 2; + int node3 = 3; Set originalVoterSet = mkSet(localId, node1, node2); LeaderState state = newLeaderState(originalVoterSet, 0L); assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); - int resignLeadershipTimeout = checkQuorumTimeoutMs; // checkQuorum timeout not exceeded, should not expire the timer - time.sleep(resignLeadershipTimeout / 2); + time.sleep(checkQuorumTimeoutMs / 2); assertTrue(state.timeUntilCheckQuorumExpires(time.milliseconds()) > 0); // received fetch request from 1 voter node, the timer should be reset state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); + // Adding 1 new voter to the voter set + Set voterSetWithNode3 = mkSet(localId, node1, node2, node3); + state.updateLocalState(new LogOffsetMetadata(1L), voterSetWithNode3); + + // received fetch request from 1 voter node, the timer should not be reset because the majority should be 3 + state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); + // Since the timer was not reset, it will be expired. 
+ time.sleep(checkQuorumTimeoutMs); + assertEquals(0, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // Timer should be reset after receiving another voter's fetch request + state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds()); + assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); + // removing leader from the voter set - Set voterSetWithoutLeader = mkSet(node1, node2); + Set voterSetWithoutLeader = mkSet(node1, node2, node3); state.updateLocalState(new LogOffsetMetadata(1L), voterSetWithoutLeader); - // received fetch request from 1 voter node1, the timer should not be reset + // received fetch request from 1 voter, the timer should not be reset. state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds()); // Since the timer was reset, it won't expire this time. - time.sleep(resignLeadershipTimeout); + time.sleep(checkQuorumTimeoutMs); assertEquals(0, state.timeUntilCheckQuorumExpires(time.milliseconds())); - // received fetch request from another voter, the timer should not be reset. + // received fetch request from another voter, the timer should be reset since the current quorum majority is 2. 
state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); assertEquals(checkQuorumTimeoutMs, state.timeUntilCheckQuorumExpires(time.milliseconds())); } From 7db45c2e8f08b4a381717b4c6b7f466bd116cd4e Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 13 Jun 2024 11:28:14 +0800 Subject: [PATCH 3/5] KAFKA-16531: fix wrong majority calculation --- .../main/java/org/apache/kafka/raft/LeaderState.java | 6 +++--- .../java/org/apache/kafka/raft/LeaderStateTest.java | 12 +++++------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index d9e8720aca607..2ab2f85937dc7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -114,7 +114,7 @@ public long timeUntilCheckQuorumExpires(long currentTimeMs) { "Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}, and voters are {}", checkQuorumTimeoutMs, fetchedVoters, - voterStates); + voterStates.keySet()); } return remainingMs; } @@ -127,8 +127,8 @@ public long timeUntilCheckQuorumExpires(long currentTimeMs) { */ public void updateCheckQuorumForFollowingVoter(int id, long currentTimeMs) { updateFetchedVoters(id); - // The majority number of the voters. Ex: 3 voters, the value will be 2 - int majority = (int) ((double) (voterStates.size() + 1) / 2); + // The majority number of the voters. Ex: 2 for 3 voters, 3 for 4 voters... etc. 
+ int majority = (int) Math.ceil((double) (voterStates.size() + 1) / 2); // Check if the leader is removed from the voter set if (voterStates.containsKey(localId)) { majority = majority - 1; diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 0154239d84d32..74286912f2a2c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -740,7 +740,7 @@ public void testCheckQuorumAfterVoterSetChanges() { // checkQuorum timeout not exceeded, should not expire the timer time.sleep(checkQuorumTimeoutMs / 2); - assertTrue(state.timeUntilCheckQuorumExpires(time.milliseconds()) > 0); + assertEquals(checkQuorumTimeoutMs / 2, state.timeUntilCheckQuorumExpires(time.milliseconds())); // received fetch request from 1 voter node, the timer should be reset state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); @@ -750,11 +750,10 @@ public void testCheckQuorumAfterVoterSetChanges() { Set voterSetWithNode3 = mkSet(localId, node1, node2, node3); state.updateLocalState(new LogOffsetMetadata(1L), voterSetWithNode3); + time.sleep(checkQuorumTimeoutMs / 2); // received fetch request from 1 voter node, the timer should not be reset because the majority should be 3 state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); - // Since the timer was not reset, it will be expired. 
- time.sleep(checkQuorumTimeoutMs); - assertEquals(0, state.timeUntilCheckQuorumExpires(time.milliseconds())); + assertEquals(checkQuorumTimeoutMs / 2, state.timeUntilCheckQuorumExpires(time.milliseconds())); // Timer should be reset after receiving another voter's fetch request state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds()); @@ -764,11 +763,10 @@ public void testCheckQuorumAfterVoterSetChanges() { Set voterSetWithoutLeader = mkSet(node1, node2, node3); state.updateLocalState(new LogOffsetMetadata(1L), voterSetWithoutLeader); + time.sleep(checkQuorumTimeoutMs / 2); // received fetch request from 1 voter, the timer should not be reset. state.updateCheckQuorumForFollowingVoter(node2, time.milliseconds()); - // Since the timer was reset, it won't expire this time. - time.sleep(checkQuorumTimeoutMs); - assertEquals(0, state.timeUntilCheckQuorumExpires(time.milliseconds())); + assertEquals(checkQuorumTimeoutMs / 2, state.timeUntilCheckQuorumExpires(time.milliseconds())); // received fetch request from another voter, the timer should be reset since the current quorum majority is 2. 
state.updateCheckQuorumForFollowingVoter(node1, time.milliseconds()); From e829e83ddcb062959e124441163d09ac489c8561 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Fri, 14 Jun 2024 15:09:55 +0800 Subject: [PATCH 4/5] KAFKA-16531: fix broken tests --- raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 8a0e6b458fd8c..4f93e51372bd6 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -780,7 +780,7 @@ public void testCheckQuorumAfterVoterSetChanges() { // Adding 1 new voter to the voter set Set voterSetWithNode3 = mkSet(localId, node1, node2, node3); - state.updateLocalState(new LogOffsetMetadata(1L), voterSetWithNode3); + state.updateLocalState(new LogOffsetMetadata(1L), toMap(voterSetWithNode3)); time.sleep(checkQuorumTimeoutMs / 2); // received fetch request from 1 voter node, the timer should not be reset because the majority should be 3 @@ -793,7 +793,7 @@ public void testCheckQuorumAfterVoterSetChanges() { // removing leader from the voter set Set voterSetWithoutLeader = mkSet(node1, node2, node3); - state.updateLocalState(new LogOffsetMetadata(1L), voterSetWithoutLeader); + state.updateLocalState(new LogOffsetMetadata(1L), toMap(voterSetWithoutLeader)); time.sleep(checkQuorumTimeoutMs / 2); // received fetch request from 1 voter, the timer should not be reset. From 22f748027b9d9aab19ca1f6f51d259916911630b Mon Sep 17 00:00:00 2001 From: "Colin P. 
McCabe" Date: Thu, 20 Jun 2024 10:01:47 -0700 Subject: [PATCH 5/5] address reviews --- raft/src/main/java/org/apache/kafka/raft/LeaderState.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 87ccf70ea9ecb..15dbb4ec48370 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -134,8 +134,11 @@ public long timeUntilCheckQuorumExpires(long currentTimeMs) { public void updateCheckQuorumForFollowingVoter(int id, long currentTimeMs) { updateFetchedVoters(id); // The majority number of the voters. Ex: 2 for 3 voters, 3 for 4 voters... etc. - int majority = (int) Math.ceil((double) (voterStates.size() + 1) / 2); - // Check if the leader is removed from the voter set + int majority = (voterStates.size() / 2) + 1; + // If the leader is in the voter set, it should be implicitly counted as part of the + // majority, but the leader will never be a member of the fetchedVoters. + // If the leader is not in the voter set, it is not in the majority. Then, the + // majority can only be composed of fetched voters. if (voterStates.containsKey(localId)) { majority = majority - 1; }