Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,15 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException,
randomElectionTimeoutMs()
);
} else if (election.isLeader(localId)) {
initialState = new LeaderState(
localId,
// If we were previously a leader, then we will start out as unattached
// in the same epoch. This protects the invariant that each record
// is uniquely identified by offset and epoch, which might otherwise
// be violated if unflushed data is lost after restarting.
initialState = new UnattachedState(
time,
election.epoch,
logEndOffsetAndEpoch.offset,
voters
voters,
randomElectionTimeoutMs()
);
} else if (election.isCandidate(localId)) {
initialState = new CandidateState(
Expand Down
98 changes: 74 additions & 24 deletions raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,22 +140,42 @@ private KafkaRaftClient buildClient(Set<Integer> voters, Metrics metrics) throws

@Test
public void testInitializeSingleMemberQuorum() throws IOException {
KafkaRaftClient client = buildClient(Collections.singleton(localId));
assertEquals(ElectionState.withElectedLeader(1, localId, Collections.singleton(localId)), quorumStateStore.readElectionState());
client.poll();
assertEquals(0, channel.drainSendQueue().size());
buildClient(Collections.singleton(localId));
assertEquals(ElectionState.withElectedLeader(1, localId, Collections.singleton(localId)),
quorumStateStore.readElectionState());
}

@Test
public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() throws Exception {
// Start off as leader. We should still bump the epoch after initialization

int initialEpoch = 2;
Set<Integer> voters = Collections.singleton(localId);
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch, localId, voters));

KafkaRaftClient client = buildClient(voters);
assertEquals(1L, log.endOffset().offset);
assertEquals(initialEpoch + 1, log.lastFetchedEpoch());
assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1),
client.currentLeaderAndEpoch());
assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, localId, voters),
quorumStateStore.readElectionState());
}

@Test
public void testInitializeAsLeaderFromStateStore() throws IOException {
public void testInitializeAsLeaderFromStateStore() throws Exception {
Set<Integer> voters = Utils.mkSet(localId, 1);
int epoch = 2;

Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs);
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters));
KafkaRaftClient client = buildClient(voters);
assertEquals(1L, log.endOffset().offset);
assertEquals(0L, log.endOffset().offset);
assertEquals(ElectionState.withUnknownLeader(epoch, voters), quorumStateStore.readElectionState());

// FIXME: Is this test useful?
time.sleep(electionTimeoutMs);
pollUntilSend(client);
assertSentVoteRequest(epoch + 1, 0, 0L);
}

@Test
Expand Down Expand Up @@ -293,8 +313,7 @@ public void testEndQuorumIgnoredIfAlreadyLeader() throws Exception {
Mockito.doReturn(jitterMs).when(random).nextInt(Mockito.anyInt());

Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters));
KafkaRaftClient client = buildClient(voters);
KafkaRaftClient client = initializeAsLeader(voters, epoch);

// One of the voters may have sent EndEpoch as a candidate because it
// had not yet been notified that the local node was the leader.
Expand Down Expand Up @@ -833,12 +852,7 @@ public void testLeaderIgnoreVoteRequestOnSameEpoch() throws Exception {
int leaderEpoch = 2;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

quorumStateStore.writeElectionState(ElectionState.withElectedLeader(leaderEpoch, localId, voters));

KafkaRaftClient client = buildClient(voters);

pollUntilSend(client);

KafkaRaftClient client = initializeAsLeader(voters, leaderEpoch);
deliverRequest(voteRequest(leaderEpoch, otherNodeId, leaderEpoch - 1, 1));

client.poll();
Expand Down Expand Up @@ -1317,18 +1331,49 @@ public void testPurgatoryFetchCompletedByFollowerTransition() throws Exception {
assertEquals(0, fetchedRecords.sizeInBytes());
}

private void expectLeaderElection(
KafkaRaftClient client,
Set<Integer> voters,
int epoch
) throws Exception {
pollUntilSend(client);

List<RaftRequest.Outbound> voteRequests = collectVoteRequests(epoch,
log.lastFetchedEpoch(), log.endOffset().offset);

for (RaftRequest.Outbound request : voteRequests) {
VoteResponseData voteResponse = voteResponse(true, Optional.empty(), epoch);
deliverResponse(request.correlationId, request.destinationId(), voteResponse);
}

client.poll();
assertEquals(ElectionState.withElectedLeader(epoch, localId, voters),
quorumStateStore.readElectionState());
}

private KafkaRaftClient initializeAsLeader(Set<Integer> voters, int epoch) throws Exception {
ElectionState leaderElectionState = ElectionState.withElectedLeader(epoch, localId, voters);
quorumStateStore.writeElectionState(leaderElectionState);
if (epoch <= 0) {
throw new IllegalArgumentException("Cannot become leader in epoch " + epoch);
}

Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs);

ElectionState electionState = ElectionState.withUnknownLeader(epoch - 1, voters);
quorumStateStore.writeElectionState(electionState);
KafkaRaftClient client = buildClient(voters);
assertEquals(leaderElectionState, quorumStateStore.readElectionState());
assertEquals(electionState, quorumStateStore.readElectionState());

// Advance the clock so that we become a candidate
time.sleep(electionTimeoutMs);
expectLeaderElection(client, voters, epoch);

// Handle BeginEpoch
pollUntilSend(client);
for (RaftRequest.Outbound request : collectBeginEpochRequests(epoch)) {
BeginQuorumEpochResponseData beginEpochResponse = beginEpochResponse(epoch, localId);
deliverResponse(request.correlationId, request.destinationId(), beginEpochResponse);
}

client.poll();
return client;
}
Expand Down Expand Up @@ -1821,9 +1866,12 @@ public void testFetchShouldBeTreatedAsLeaderEndorsement() throws Exception {
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters));
Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs);
quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch - 1, voters));
KafkaRaftClient client = buildClient(voters);
assertEquals(ElectionState.withElectedLeader(epoch, localId, voters), quorumStateStore.readElectionState());

time.sleep(electionTimeoutMs);
expectLeaderElection(client, voters, epoch);

pollUntilSend(client);

Expand Down Expand Up @@ -1945,7 +1993,6 @@ public void testFollowerLogReconciliation() throws Exception {
public void testMetrics() throws Exception {
Metrics metrics = new Metrics(time);
int epoch = 1;
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, Collections.singleton(localId)));
KafkaRaftClient client = buildClient(Collections.singleton(localId), metrics);

assertNotNull(getMetric(metrics, "current-state"));
Expand Down Expand Up @@ -2017,11 +2064,14 @@ public void testClusterAuthorizationFailedInFetch() throws Exception {
public void testClusterAuthorizationFailedInBeginQuorumEpoch() throws Exception {
int otherNodeId = 1;
int epoch = 5;

Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters));

Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs);
quorumStateStore.writeElectionState(ElectionState.withUnknownLeader(epoch - 1, voters));
KafkaRaftClient client = buildClient(voters);
assertEquals(ElectionState.withElectedLeader(epoch, localId, voters), quorumStateStore.readElectionState());

time.sleep(electionTimeoutMs);
expectLeaderElection(client, voters, epoch);

pollUntilSend(client);
int correlationId = assertSentBeginQuorumEpochRequest(epoch);
Expand Down
17 changes: 13 additions & 4 deletions raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,23 @@ public void testInitializeAsLeader() throws IOException {
Set<Integer> voters = Utils.mkSet(localId, node1, node2);
store.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters));

// If we were previously a leader, we will start as unattached
// so that records are always uniquely defined by epoch and offset
// even accounting for the loss of unflushed data.

// The election timeout should be reset after we become a candidate again
int jitterMs = 2500;
Mockito.doReturn(jitterMs).when(random).nextInt(Mockito.anyInt());

QuorumState state = buildQuorumState(voters);
state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
assertTrue(state.isLeader());
assertFalse(state.isLeader());
assertEquals(epoch, state.epoch());

LeaderState leaderState = state.leaderStateOrThrow();
assertEquals(epoch, leaderState.epoch());
assertEquals(Utils.mkSet(node1, node2), leaderState.nonEndorsingFollowers());
UnattachedState unattachedState = state.unattachedStateOrThrow();
assertEquals(epoch, unattachedState.epoch());
assertEquals(electionTimeoutMs + jitterMs,
unattachedState.remainingElectionTimeMs(time.milliseconds()));
}

@Test
Expand Down
Loading