From 372e02c94f423a679294a91956b71d02b17faace Mon Sep 17 00:00:00 2001 From: Alyssa Huang Date: Fri, 24 May 2024 14:48:03 -0700 Subject: [PATCH 1/8] additional tests in LeaderStateTest to enforce monotonically increasing HW --- .../org/apache/kafka/raft/LeaderState.java | 27 ++++++++-- .../apache/kafka/raft/LeaderStateTest.java | 54 +++++++++++++++++++ 2 files changed, 76 insertions(+), 5 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index df4cc1315ac52..50ef93a7c352a 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -243,8 +243,9 @@ private boolean maybeUpdateHighWatermark() { ); return true; } else if (highWatermarkUpdateOffset < currentHighWatermarkMetadata.offset) { - log.error("The latest computed high watermark {} is smaller than the current " + - "value {}, which suggests that one of the voters has lost committed data. " + + log.warn("The latest computed high watermark {} is smaller than the current " + + "value {}, which should only happen when voter set membership changes. If the voter " + + "set has not changed this suggests that one of the voters has lost committed data. " + "Full voter replication state: {}", highWatermarkUpdateOffset, currentHighWatermarkMetadata.offset, voterStates.values()); return false; @@ -341,9 +342,16 @@ public boolean updateReplicaState( state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset); } }); - - Optional leaderEndOffsetOpt = - voterStates.get(localId).endOffset; + Optional leaderEndOffsetOpt; + if (voterStates.containsKey(localId)) { + leaderEndOffsetOpt = voterStates.get(localId).endOffset; + } else if (observerStates.containsKey(localId)) { + // The leader is not guaranteed to be in the voter set when in the process of being removed from the quorum. + log.info("Updating end offset for leader {} which is also an observer.", localId); + leaderEndOffsetOpt = observerStates.get(localId).endOffset; + } else { + throw new IllegalStateException("Leader state not found for localId " + localId); + } state.updateFollowerState( currentTimeMs, @@ -445,6 +453,15 @@ private boolean isVoter(int remoteNodeId) { return voterStates.containsKey(remoteNodeId); } + // for testing purposes + boolean removeVoter(int nodeId) { + if (voterStates.containsKey(nodeId)) { + voterStates.remove(nodeId); + return true; + } + return false; + } + private static class ReplicaState implements Comparable { final int nodeId; Optional endOffset; diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index e8fd1bb9ff365..eca2c2ea2e4fe 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -280,6 +280,60 @@ public void testUpdateHighWatermarkQuorumSizeThree() { assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); } + @Test + public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() { + int node1 = 1; + int node2 = 2; + LeaderState state = newLeaderState(mkSet(localId, node1, node2), 10L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L))); + assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L))); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // removing node1 should not decrement HW to 10L + assertTrue(state.removeVoter(node1)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(17L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW cannot change until after node2 catches up to last HW + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(14L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW should update to 16L + assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); + } + + @Test + public void testUpdateHighWatermarkQuorumRemovingLeaderFromVoterStates() { + int node1 = 1; + int node2 = 2; + LeaderState state = newLeaderState(mkSet(localId, node1, node2), 10L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L))); + assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L))); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // removing leader should not decrement HW to 10L + assertTrue(state.removeVoter(localId)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(17L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW cannot change until node2 catches up to last HW + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(14L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW will not update to 16L until majority of remaining voterSet (node1, node2) are at least 16L + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); + } + @Test public void testNonMonotonicHighWatermarkUpdate() { MockTime time = new MockTime(); From c3a00c09f2fe7490fd13c1138530326ef028c46a Mon Sep 17 00:00:00 2001 From: Alyssa Huang Date: Wed, 29 May 2024 11:02:45 -0700 Subject: [PATCH 2/8] voter set updated from partitionstate --- .../apache/kafka/raft/KafkaRaftClient.java | 2 +- .../org/apache/kafka/raft/LeaderState.java | 54 +- .../apache/kafka/raft/LeaderStateTest.java | 607 +++++++++--------- .../raft/internals/KafkaRaftMetricsTest.java | 190 +++--- 4 files changed, 441 insertions(+), 412 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 10910c3db7906..7e926103fd336 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -284,7 +284,7 @@ private void updateLeaderEndOffsetAndTimestamp( ) { final LogOffsetMetadata endOffsetMetadata = log.endOffset(); - if (state.updateLocalState(endOffsetMetadata)) { + if (state.updateLocalState(endOffsetMetadata, partitionState.lastVoterSet().voterIds())) { onUpdateLeaderHighWatermark(state, currentTimeMs); } diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 50ef93a7c352a..9adda6a66e762 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -32,6 +32,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -243,7 +244,7 @@ private boolean maybeUpdateHighWatermark() { ); return true; } else if (highWatermarkUpdateOffset < currentHighWatermarkMetadata.offset) { - log.warn("The latest computed high watermark {} is smaller than the current " + + log.info("The latest computed high watermark {} is smaller than the current " + "value {}, which should only happen when voter set membership changes. If the voter " + "set has not changed this suggests that one of the voters has lost committed data. " + "Full voter replication state: {}", highWatermarkUpdateOffset, @@ -297,10 +298,12 @@ private void logHighWatermarkUpdate( * Update the local replica state. * * @param endOffsetMetadata updated log end offset of local replica + * @param lastVoterSet the up-to-date voter set * @return true if the high watermark is updated as a result of this call */ public boolean updateLocalState( - LogOffsetMetadata endOffsetMetadata + LogOffsetMetadata endOffsetMetadata, + Set lastVoterSet ) { ReplicaState state = getOrCreateReplicaState(localId); state.endOffset.ifPresent(currentEndOffset -> { @@ -309,7 +312,8 @@ public boolean updateLocalState( "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset); } }); - state.updateLeaderState(endOffsetMetadata); + state.updateLeaderEndOffset(endOffsetMetadata); + updateVoterSet(lastVoterSet); return maybeUpdateHighWatermark(); } @@ -343,12 +347,14 @@ public boolean updateReplicaState( } }); Optional leaderEndOffsetOpt; - if (voterStates.containsKey(localId)) { - leaderEndOffsetOpt = voterStates.get(localId).endOffset; - } else if (observerStates.containsKey(localId)) { + ReplicaState leaderVoterState = voterStates.get(localId); + ReplicaState leaderObserverState = observerStates.get(localId); + if (leaderVoterState != null) { + leaderEndOffsetOpt = leaderVoterState.endOffset; + } else if (leaderObserverState != null) { // The leader is not guaranteed to be in the voter set when in the process of being removed from the quorum. log.info("Updating end offset for leader {} which is also an observer.", localId); - leaderEndOffsetOpt = observerStates.get(localId).endOffset; + leaderEndOffsetOpt = leaderObserverState.endOffset; } else { throw new IllegalStateException("Leader state not found for localId " + localId); } @@ -395,12 +401,16 @@ public long epochStartOffset() { private ReplicaState getOrCreateReplicaState(int remoteNodeId) { ReplicaState state = voterStates.get(remoteNodeId); if (state == null) { - observerStates.putIfAbsent(remoteNodeId, new ReplicaState(remoteNodeId, false)); - return observerStates.get(remoteNodeId); + return createObserverState(remoteNodeId); } return state; } + private ReplicaState createObserverState(int remoteNodeId) { + observerStates.putIfAbsent(remoteNodeId, new ReplicaState(remoteNodeId, false)); + return observerStates.get(remoteNodeId); + } + public DescribeQuorumResponseData.PartitionData describeQuorum(long currentTimeMs) { clearInactiveObservers(currentTimeMs); @@ -453,13 +463,25 @@ private boolean isVoter(int remoteNodeId) { return voterStates.containsKey(remoteNodeId); } - // for testing purposes - boolean removeVoter(int nodeId) { - if (voterStates.containsKey(nodeId)) { - voterStates.remove(nodeId); - return true; + // with Jose's changes this will probably make more sense as VoterSet + private void updateVoterSet(Set lastVoterSet) { + // Remove any voter state that is not in the last voter set. They become observers. + for (Iterator> iter = voterStates.entrySet().iterator(); iter.hasNext(); ) { + Integer nodeId = iter.next().getKey(); + if (!lastVoterSet.contains(nodeId)) { + createObserverState(nodeId); + iter.remove(); + } + } + + // Add any voter state that is in the last voter set but not in the current voter set. They are removed from + // the observerStates if they exist. + for (int voterId : lastVoterSet) { + if (!voterStates.containsKey(voterId)) { + voterStates.put(voterId, new ReplicaState(voterId, false)); + observerStates.remove(voterId); + } } - return false; } private static class ReplicaState implements Comparable { @@ -479,7 +501,7 @@ public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) { this.hasAcknowledgedLeader = hasAcknowledgedLeader; } - void updateLeaderState( + void updateLeaderEndOffset( LogOffsetMetadata endOffsetMetadata ) { // For the leader, we only update the end offset. The remaining fields diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index eca2c2ea2e4fe..07565be589bb0 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.internals.BatchAccumulator; import org.apache.kafka.raft.internals.ReplicaKey; +//import org.apache.kafka.raft.internals.VoterSet; +//import org.apache.kafka.raft.internals.VoterSetTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -103,165 +105,166 @@ public void testNonFollowerAcknowledgement() { assertThrows(IllegalArgumentException.class, () -> state.addAcknowledgementFrom(nonVoterId)); } - @Test - public void testUpdateHighWatermarkQuorumSizeOne() { - LeaderState state = newLeaderState(singleton(localId), 15L); - assertEquals(Optional.empty(), state.highWatermark()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L))); - assertEquals(emptySet(), state.nonAcknowledgingVoters()); - assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); - assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(20))); - assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); - } - - @Test - public void testNonMonotonicLocalEndOffsetUpdate() { - LeaderState state = newLeaderState(singleton(localId), 15L); - assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); - assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); - assertThrows(IllegalStateException.class, - () -> state.updateLocalState(new LogOffsetMetadata(15L))); - } - - @Test - public void testLastCaughtUpTimeVoters() { - int node1 = 1; - int node2 = 2; - int currentTime = 1000; - int fetchTime = 0; - int caughtUpTime = -1; - LeaderState state = newLeaderState(mkSet(localId, node1, node2), 10L); - assertEquals(Optional.empty(), state.highWatermark()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(10L))); - assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters()); - assertEquals(Optional.empty(), state.highWatermark()); - - // Node 1 falls behind - assertFalse(state.updateLocalState(new LogOffsetMetadata(11L))); - assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); - assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); - - // Node 1 catches up to leader - assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L))); - caughtUpTime = fetchTime; - assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); - - // Node 1 falls behind - assertFalse(state.updateLocalState(new LogOffsetMetadata(100L))); - assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); - assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); - - // Node 1 catches up to the last fetch offset - int prevFetchTime = fetchTime; - assertFalse(state.updateLocalState(new LogOffsetMetadata(200L))); - assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(100L))); - caughtUpTime = prevFetchTime; - assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); - - // Node2 has never caught up to leader - assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(300L))); - assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(200L))); - assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); - assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(250L))); - assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); - } - - @Test - public void testLastCaughtUpTimeObserver() { - int node1 = 1; - int currentTime = 1000; - int fetchTime = 0; - int caughtUpTime = -1; - LeaderState state = newLeaderState(singleton(localId), 5L); - assertEquals(Optional.empty(), state.highWatermark()); - assertEquals(emptySet(), state.nonAcknowledgingVoters()); - - // Node 1 falls behind - assertTrue(state.updateLocalState(new LogOffsetMetadata(11L))); - assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); - assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); - - // Node 1 catches up to leader - assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L))); - caughtUpTime = fetchTime; - assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); - - // Node 1 falls behind - assertTrue(state.updateLocalState(new LogOffsetMetadata(100L))); - assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); - assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); - - // Node 1 catches up to the last fetch offset - int prevFetchTime = fetchTime; - assertTrue(state.updateLocalState(new LogOffsetMetadata(200L))); - assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(102L))); - caughtUpTime = prevFetchTime; - assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); - - // Node 1 catches up to leader - assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(200L))); - caughtUpTime = fetchTime; - assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); - } - - @Test - public void testIdempotentEndOffsetUpdate() { - LeaderState state = newLeaderState(singleton(localId), 15L); - assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); - assertFalse(state.updateLocalState(new LogOffsetMetadata(16L))); - assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); - } - - @Test - public void testUpdateHighWatermarkMetadata() { - LeaderState state = newLeaderState(singleton(localId), 15L); - assertEquals(Optional.empty(), state.highWatermark()); - - LogOffsetMetadata initialHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("bar"))); - assertTrue(state.updateLocalState(initialHw)); - assertEquals(Optional.of(initialHw), state.highWatermark()); - - LogOffsetMetadata updateHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("baz"))); - assertTrue(state.updateLocalState(updateHw)); - assertEquals(Optional.of(updateHw), state.highWatermark()); - } - - @Test - public void testUpdateHighWatermarkQuorumSizeTwo() { - int otherNodeId = 1; - LeaderState state = newLeaderState(mkSet(localId, otherNodeId), 10L); - assertFalse(state.updateLocalState(new LogOffsetMetadata(13L))); - assertEquals(singleton(otherNodeId), state.nonAcknowledgingVoters()); - assertEquals(Optional.empty(), state.highWatermark()); - assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L))); - assertEquals(emptySet(), state.nonAcknowledgingVoters()); - assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(11L))); - assertEquals(Optional.of(new LogOffsetMetadata(11L)), state.highWatermark()); - assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(13L))); - assertEquals(Optional.of(new LogOffsetMetadata(13L)), state.highWatermark()); - } +// @Test +// public void testUpdateHighWatermarkQuorumSizeOne() { +// LeaderState state = newLeaderState(singleton(localId), 15L); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertFalse(state.updateLocalState(new LogOffsetMetadata(15L))); +// assertEquals(emptySet(), state.nonAcknowledgingVoters()); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); +// assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); +// assertTrue(state.updateLocalState(new LogOffsetMetadata(20))); +// assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); +// } +// +// @Test +// public void testNonMonotonicLocalEndOffsetUpdate() { +// LeaderState state = newLeaderState(singleton(localId), 15L); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); +// assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); +// assertThrows(IllegalStateException.class, +// () -> state.updateLocalState(new LogOffsetMetadata(15L))); +// } +// +// @Test +// public void testLastCaughtUpTimeVoters() { +// int node1 = 1; +// int node2 = 2; +// int currentTime = 1000; +// int fetchTime = 0; +// int caughtUpTime = -1; +// LeaderState state = newLeaderState(mkSet(localId, node1, node2), 10L); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertFalse(state.updateLocalState(new LogOffsetMetadata(10L))); +// assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters()); +// assertEquals(Optional.empty(), state.highWatermark()); +// +// // Node 1 falls behind +// assertFalse(state.updateLocalState(new LogOffsetMetadata(11L))); +// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 catches up to leader +// assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L))); +// caughtUpTime = fetchTime; +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 falls behind +// assertFalse(state.updateLocalState(new LogOffsetMetadata(100L))); +// assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 catches up to the last fetch offset +// int prevFetchTime = fetchTime; +// assertFalse(state.updateLocalState(new LogOffsetMetadata(200L))); +// assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(100L))); +// caughtUpTime = prevFetchTime; +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node2 has never caught up to leader +// assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); +// assertFalse(state.updateLocalState(new LogOffsetMetadata(300L))); +// assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(200L))); +// assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); +// assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(250L))); +// assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); +// } +// +// @Test +// public void testLastCaughtUpTimeObserver() { +// int node1 = 1; +// int currentTime = 1000; +// int fetchTime = 0; +// int caughtUpTime = -1; +// LeaderState state = newLeaderState(singleton(localId), 5L); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertEquals(emptySet(), state.nonAcknowledgingVoters()); +// +// // Node 1 falls behind +// assertTrue(state.updateLocalState(new LogOffsetMetadata(11L))); +// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 catches up to leader +// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L))); +// caughtUpTime = fetchTime; +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 falls behind +// assertTrue(state.updateLocalState(new LogOffsetMetadata(100L))); +// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 catches up to the last fetch offset +// int prevFetchTime = fetchTime; +// assertTrue(state.updateLocalState(new LogOffsetMetadata(200L))); +// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(102L))); +// caughtUpTime = prevFetchTime; +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); +// +// // Node 1 catches up to leader +// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(200L))); +// caughtUpTime = fetchTime; +// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); +// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); +// } +// +// @Test +// public void testIdempotentEndOffsetUpdate() { +// LeaderState state = newLeaderState(singleton(localId), 15L); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); +// assertFalse(state.updateLocalState(new LogOffsetMetadata(16L))); +// assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); +// } +// +// @Test +// public void testUpdateHighWatermarkMetadata() { +// LeaderState state = newLeaderState(singleton(localId), 15L); +// assertEquals(Optional.empty(), state.highWatermark()); +// +// LogOffsetMetadata initialHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("bar"))); +// assertTrue(state.updateLocalState(initialHw)); +// assertEquals(Optional.of(initialHw), state.highWatermark()); +// +// LogOffsetMetadata updateHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("baz"))); +// assertTrue(state.updateLocalState(updateHw)); +// assertEquals(Optional.of(updateHw), state.highWatermark()); +// } +// +// @Test +// public void testUpdateHighWatermarkQuorumSizeTwo() { +// int otherNodeId = 1; +// LeaderState state = newLeaderState(mkSet(localId, otherNodeId), 10L); +// assertFalse(state.updateLocalState(new LogOffsetMetadata(13L))); +// assertEquals(singleton(otherNodeId), state.nonAcknowledgingVoters()); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L))); +// assertEquals(emptySet(), state.nonAcknowledgingVoters()); +// assertEquals(Optional.empty(), state.highWatermark()); +// assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(11L))); +// assertEquals(Optional.of(new LogOffsetMetadata(11L)), state.highWatermark()); +// assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(13L))); +// assertEquals(Optional.of(new LogOffsetMetadata(13L)), state.highWatermark()); +// } @Test public void testUpdateHighWatermarkQuorumSizeThree() { int node1 = 1; int node2 = 2; - LeaderState state = newLeaderState(mkSet(localId, node1, node2), 10L); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L))); + Set voterSet = mkSet(localId, node1, node2); + LeaderState state = newLeaderState(voterSet, 10L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), voterSet)); assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L))); @@ -272,7 +275,7 @@ public void testUpdateHighWatermarkQuorumSizeThree() { assertEquals(Optional.empty(), state.highWatermark()); assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(20L))); + assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), voterSet)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); @@ -284,15 +287,16 @@ public void testUpdateHighWatermarkQuorumSizeThree() { public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() { int node1 = 1; int node2 = 2; - LeaderState state = newLeaderState(mkSet(localId, node1, node2), 10L); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L))); + Set originalVoterSet = mkSet(localId, node1, node2); + LeaderState state = newLeaderState(originalVoterSet, 10L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet)); assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L))); assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); // removing node1 should not decrement HW to 10L - assertTrue(state.removeVoter(node1)); - assertFalse(state.updateLocalState(new LogOffsetMetadata(17L))); + Set voterSetWithoutNode1 = mkSet(localId, node2); + assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), voterSetWithoutNode1)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); // HW cannot change until after node2 catches up to last HW @@ -310,15 +314,16 @@ public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() { public void testUpdateHighWatermarkQuorumRemovingLeaderFromVoterStates() { int node1 = 1; int node2 = 2; - LeaderState state = newLeaderState(mkSet(localId, node1, node2), 10L); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L))); + Set originalVoterSet = mkSet(localId, node1, node2); + LeaderState state = newLeaderState(originalVoterSet, 10L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet)); assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L))); assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); // removing leader should not decrement HW to 10L - assertTrue(state.removeVoter(localId)); - assertFalse(state.updateLocalState(new LogOffsetMetadata(17L))); + Set voterSetWithoutLeader = mkSet(node1, node2); + assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), voterSetWithoutLeader)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); // HW cannot change until node2 catches up to last HW @@ -338,8 +343,9 @@ public void testUpdateHighWatermarkQuorumRemovingLeaderFromVoterStates() { public void testNonMonotonicHighWatermarkUpdate() { MockTime time = new MockTime(); int node1 = 1; - LeaderState state = newLeaderState(mkSet(localId, node1), 0L); - state.updateLocalState(new LogOffsetMetadata(10L)); + Set voterSet = mkSet(localId, node1); + LeaderState state = newLeaderState(voterSet, 0L); + state.updateLocalState(new LogOffsetMetadata(10L), voterSet); state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L)); assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); @@ -363,151 +369,152 @@ public void testGetNonLeaderFollowersByFetchOffsetDescending() { assertEquals(Arrays.asList(node2, node1), state.nonLeaderVotersByDescendingFetchOffset()); } - @Test - public void testDescribeQuorumWithSingleVoter() { - MockTime time = new MockTime(); - long leaderStartOffset = 10L; - long leaderEndOffset = 15L; - - LeaderState state = newLeaderState(mkSet(localId), leaderStartOffset); - - // Until we have updated local state, high watermark should be uninitialized - assertEquals(Optional.empty(), state.highWatermark()); - DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); - assertEquals(-1, partitionData.highWatermark()); - assertEquals(localId, partitionData.leaderId()); - assertEquals(epoch, partitionData.leaderEpoch()); - assertEquals(Collections.emptyList(), partitionData.observers()); - assertEquals(1, partitionData.currentVoters().size()); - assertEquals(new DescribeQuorumResponseData.ReplicaState() - .setReplicaId(localId) - .setLogEndOffset(-1) - .setLastFetchTimestamp(time.milliseconds()) - .setLastCaughtUpTimestamp(time.milliseconds()), - partitionData.currentVoters().get(0)); - - - // Now update the high watermark and verify the describe output - assertTrue(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset))); - assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark()); - - time.sleep(500); - - partitionData = state.describeQuorum(time.milliseconds()); - assertEquals(leaderEndOffset, partitionData.highWatermark()); - assertEquals(localId, partitionData.leaderId()); - assertEquals(epoch, partitionData.leaderEpoch()); - assertEquals(Collections.emptyList(), partitionData.observers()); - assertEquals(1, partitionData.currentVoters().size()); - assertEquals(new DescribeQuorumResponseData.ReplicaState() - .setReplicaId(localId) - .setLogEndOffset(leaderEndOffset) - .setLastFetchTimestamp(time.milliseconds()) - .setLastCaughtUpTimestamp(time.milliseconds()), - partitionData.currentVoters().get(0)); - } - - @Test - public void testDescribeQuorumWithMultipleVoters() { - MockTime time = new MockTime(); - int activeFollowerId = 1; - int inactiveFollowerId = 2; - long leaderStartOffset = 10L; - long leaderEndOffset = 15L; - - LeaderState state = newLeaderState(mkSet(localId, activeFollowerId, inactiveFollowerId), leaderStartOffset); - assertFalse(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset))); - assertEquals(Optional.empty(), state.highWatermark()); - - long activeFollowerFetchTimeMs = time.milliseconds(); - assertTrue(state.updateReplicaState(activeFollowerId, activeFollowerFetchTimeMs, new LogOffsetMetadata(leaderEndOffset))); - assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark()); - - time.sleep(500); - - DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); - assertEquals(leaderEndOffset, partitionData.highWatermark()); - assertEquals(localId, partitionData.leaderId()); - assertEquals(epoch, partitionData.leaderEpoch()); - assertEquals(Collections.emptyList(), partitionData.observers()); - - List voterStates = partitionData.currentVoters(); - assertEquals(3, voterStates.size()); - - DescribeQuorumResponseData.ReplicaState leaderState = - findReplicaOrFail(localId, partitionData.currentVoters()); - assertEquals(new DescribeQuorumResponseData.ReplicaState() - .setReplicaId(localId) - .setLogEndOffset(leaderEndOffset) - .setLastFetchTimestamp(time.milliseconds()) - .setLastCaughtUpTimestamp(time.milliseconds()), - leaderState); - - DescribeQuorumResponseData.ReplicaState activeFollowerState = - findReplicaOrFail(activeFollowerId, partitionData.currentVoters()); - assertEquals(new DescribeQuorumResponseData.ReplicaState() - .setReplicaId(activeFollowerId) - .setLogEndOffset(leaderEndOffset) - .setLastFetchTimestamp(activeFollowerFetchTimeMs) - .setLastCaughtUpTimestamp(activeFollowerFetchTimeMs), - activeFollowerState); - - DescribeQuorumResponseData.ReplicaState inactiveFollowerState = - findReplicaOrFail(inactiveFollowerId, partitionData.currentVoters()); - assertEquals(new DescribeQuorumResponseData.ReplicaState() - .setReplicaId(inactiveFollowerId) - .setLogEndOffset(-1) - .setLastFetchTimestamp(-1) - .setLastCaughtUpTimestamp(-1), - inactiveFollowerState); - } - +// @Test +// public void testDescribeQuorumWithSingleVoter() { +// MockTime time = new MockTime(); +// long leaderStartOffset = 10L; +// long leaderEndOffset = 15L; +// +// LeaderState state = newLeaderState(mkSet(localId), leaderStartOffset); +// +// // Until we have updated local state, high watermark should be uninitialized +// assertEquals(Optional.empty(), state.highWatermark()); +// DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); +// assertEquals(-1, partitionData.highWatermark()); +// assertEquals(localId, partitionData.leaderId()); +// assertEquals(epoch, partitionData.leaderEpoch()); +// assertEquals(Collections.emptyList(), partitionData.observers()); +// assertEquals(1, partitionData.currentVoters().size()); +// assertEquals(new DescribeQuorumResponseData.ReplicaState() +// .setReplicaId(localId) +// .setLogEndOffset(-1) +// .setLastFetchTimestamp(time.milliseconds()) +// .setLastCaughtUpTimestamp(time.milliseconds()), +// partitionData.currentVoters().get(0)); +// +// +// // Now update the high watermark and verify the describe output +// assertTrue(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset))); +// assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark()); +// +// time.sleep(500); +// +// partitionData = state.describeQuorum(time.milliseconds()); +// assertEquals(leaderEndOffset, partitionData.highWatermark()); +// assertEquals(localId, partitionData.leaderId()); +// assertEquals(epoch, partitionData.leaderEpoch()); +// assertEquals(Collections.emptyList(), partitionData.observers()); +// assertEquals(1, partitionData.currentVoters().size()); +// assertEquals(new DescribeQuorumResponseData.ReplicaState() +// .setReplicaId(localId) +// .setLogEndOffset(leaderEndOffset) +// .setLastFetchTimestamp(time.milliseconds()) +// .setLastCaughtUpTimestamp(time.milliseconds()), +// partitionData.currentVoters().get(0)); +// } +// +// @Test +// public void testDescribeQuorumWithMultipleVoters() { +// MockTime time = new MockTime(); +// int activeFollowerId = 1; +// int inactiveFollowerId = 2; +// long leaderStartOffset = 10L; +// long leaderEndOffset = 15L; +// +// LeaderState state = newLeaderState(mkSet(localId, activeFollowerId, inactiveFollowerId), leaderStartOffset); +// assertFalse(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset))); +// assertEquals(Optional.empty(), state.highWatermark()); +// +// long activeFollowerFetchTimeMs = time.milliseconds(); +// assertTrue(state.updateReplicaState(activeFollowerId, activeFollowerFetchTimeMs, new LogOffsetMetadata(leaderEndOffset))); +// assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark()); +// +// time.sleep(500); +// +// DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); +// assertEquals(leaderEndOffset, partitionData.highWatermark()); +// assertEquals(localId, partitionData.leaderId()); +// assertEquals(epoch, partitionData.leaderEpoch()); +// assertEquals(Collections.emptyList(), partitionData.observers()); +// +// List voterStates = partitionData.currentVoters(); +// assertEquals(3, voterStates.size()); +// +// DescribeQuorumResponseData.ReplicaState leaderState = +// findReplicaOrFail(localId, partitionData.currentVoters()); +// assertEquals(new DescribeQuorumResponseData.ReplicaState() +// .setReplicaId(localId) +// .setLogEndOffset(leaderEndOffset) +// .setLastFetchTimestamp(time.milliseconds()) +// .setLastCaughtUpTimestamp(time.milliseconds()), +// leaderState); +// +// DescribeQuorumResponseData.ReplicaState activeFollowerState = +// findReplicaOrFail(activeFollowerId, partitionData.currentVoters()); +// assertEquals(new DescribeQuorumResponseData.ReplicaState() +// .setReplicaId(activeFollowerId) +// .setLogEndOffset(leaderEndOffset) +// .setLastFetchTimestamp(activeFollowerFetchTimeMs) +// .setLastCaughtUpTimestamp(activeFollowerFetchTimeMs), +// activeFollowerState); +// +// DescribeQuorumResponseData.ReplicaState inactiveFollowerState = +// findReplicaOrFail(inactiveFollowerId, partitionData.currentVoters()); +// assertEquals(new DescribeQuorumResponseData.ReplicaState() +// .setReplicaId(inactiveFollowerId) +// .setLogEndOffset(-1) +// .setLastFetchTimestamp(-1) +// .setLastCaughtUpTimestamp(-1), +// inactiveFollowerState); +// } +// private LeaderState setUpLeaderAndFollowers(int follower1, int follower2, long leaderStartOffset, long leaderEndOffset) { - LeaderState state = newLeaderState(mkSet(localId, follower1, follower2), leaderStartOffset); - state.updateLocalState(new LogOffsetMetadata(leaderEndOffset)); + Set voterSet = mkSet(localId, follower1, follower2); + LeaderState state = newLeaderState(voterSet, leaderStartOffset); + state.updateLocalState(new LogOffsetMetadata(leaderEndOffset), voterSet); assertEquals(Optional.empty(), state.highWatermark()); state.updateReplicaState(follower1, 0, new LogOffsetMetadata(leaderStartOffset)); state.updateReplicaState(follower2, 0, new LogOffsetMetadata(leaderEndOffset)); return state; } - @Test - public void testDescribeQuorumWithObservers() { - MockTime time = new MockTime(); - int observerId = 10; - long epochStartOffset = 10L; - - LeaderState state = newLeaderState(mkSet(localId), epochStartOffset); - assertTrue(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1))); - assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark()); - - time.sleep(500); - long observerFetchTimeMs = time.milliseconds(); - assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1))); - - time.sleep(500); - DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); - assertEquals(epochStartOffset + 1, partitionData.highWatermark()); - assertEquals(localId, partitionData.leaderId()); - assertEquals(epoch, partitionData.leaderEpoch()); - - assertEquals(1, partitionData.currentVoters().size()); - assertEquals(localId, partitionData.currentVoters().get(0).replicaId()); - - List observerStates = partitionData.observers(); - assertEquals(1, observerStates.size()); - - DescribeQuorumResponseData.ReplicaState observerState = observerStates.get(0); - assertEquals(new DescribeQuorumResponseData.ReplicaState() - .setReplicaId(observerId) - .setLogEndOffset(epochStartOffset + 1) - .setLastFetchTimestamp(observerFetchTimeMs) - .setLastCaughtUpTimestamp(observerFetchTimeMs), - observerState); - } +// @Test +// public void testDescribeQuorumWithObservers() { +// MockTime time = new MockTime(); +// int observerId = 10; +// long epochStartOffset = 10L; +// +// LeaderState state = newLeaderState(mkSet(localId), epochStartOffset); +// assertTrue(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1))); +// assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark()); +// +// time.sleep(500); +// long observerFetchTimeMs = time.milliseconds(); +// assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1))); +// +// time.sleep(500); +// DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); +// assertEquals(epochStartOffset + 1, partitionData.highWatermark()); +// assertEquals(localId, partitionData.leaderId()); +// assertEquals(epoch, partitionData.leaderEpoch()); +// +// assertEquals(1, partitionData.currentVoters().size()); +// assertEquals(localId, partitionData.currentVoters().get(0).replicaId()); +// +// List observerStates = partitionData.observers(); +// assertEquals(1, observerStates.size()); +// +// DescribeQuorumResponseData.ReplicaState observerState = observerStates.get(0); +// assertEquals(new DescribeQuorumResponseData.ReplicaState() +// .setReplicaId(observerId) +// .setLogEndOffset(epochStartOffset + 1) +// .setLastFetchTimestamp(observerFetchTimeMs) +// .setLastCaughtUpTimestamp(observerFetchTimeMs), +// observerState); +// } @Test public void testCheckQuorum() { diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index 1b729e36d30b6..d656fddfc32f0 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.raft.LogOffsetMetadata; +//import org.apache.kafka.raft.LogOffsetMetadata; import org.apache.kafka.raft.MockQuorumStateStore; import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.QuorumState; @@ -32,9 +32,9 @@ import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; -import java.util.Map; +//import java.util.Map; import java.util.Collections; -import java.util.Optional; +//import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Random; @@ -88,98 +88,98 @@ private QuorumState buildQuorumState(VoterSet voterSet, short kraftVersion) { ); } - @ParameterizedTest - @ValueSource(shorts = {0, 1}) - public void shouldRecordVoterQuorumState(short kraftVersion) { - boolean withDirectoryId = kraftVersion > 0; - Map voterMap = VoterSetTest.voterMap(Utils.mkSet(1, 2), withDirectoryId); - voterMap.put( - localId, - VoterSetTest.voterNode( - ReplicaKey.of( - localId, - withDirectoryId ? Optional.of(localDirectoryId) : Optional.empty() - ) - ) - ); - QuorumState state = buildQuorumState(VoterSetTest.voterSet(voterMap), kraftVersion); - - state.initialize(new OffsetAndEpoch(0L, 0)); - raftMetrics = new KafkaRaftMetrics(metrics, "raft", state); - - assertEquals("unattached", getMetric(metrics, "current-state").metricValue()); - assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue()); - assertEquals((double) -1L, getMetric(metrics, "current-vote").metricValue()); - assertEquals( - Uuid.ZERO_UUID.toString(), - getMetric(metrics, "current-vote-directory-id").metricValue() - ); - assertEquals((double) 0, getMetric(metrics, "current-epoch").metricValue()); - assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); - - state.transitionToCandidate(); - assertEquals("candidate", getMetric(metrics, "current-state").metricValue()); - assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue()); - assertEquals((double) localId, getMetric(metrics, "current-vote").metricValue()); - assertEquals( - localDirectoryId.toString(), - getMetric(metrics, "current-vote-directory-id").metricValue() - ); - assertEquals((double) 1, getMetric(metrics, "current-epoch").metricValue()); - assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); - - state.candidateStateOrThrow().recordGrantedVote(1); - state.transitionToLeader(2L, accumulator); - assertEquals("leader", getMetric(metrics, "current-state").metricValue()); - assertEquals((double) localId, getMetric(metrics, "current-leader").metricValue()); - assertEquals((double) localId, getMetric(metrics, "current-vote").metricValue()); - assertEquals( - localDirectoryId.toString(), - getMetric(metrics, "current-vote-directory-id").metricValue() - ); - assertEquals((double) 1, getMetric(metrics, "current-epoch").metricValue()); - assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); - - state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L)); - state.leaderStateOrThrow().updateReplicaState(1, 0, new LogOffsetMetadata(5L)); - assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue()); - - state.transitionToFollower(2, 1); - assertEquals("follower", getMetric(metrics, "current-state").metricValue()); - assertEquals((double) 1, getMetric(metrics, "current-leader").metricValue()); - assertEquals((double) -1, getMetric(metrics, "current-vote").metricValue()); - assertEquals( - Uuid.ZERO_UUID.toString(), - getMetric(metrics, "current-vote-directory-id").metricValue() - ); - assertEquals((double) 2, getMetric(metrics, "current-epoch").metricValue()); - assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue()); - - state.followerStateOrThrow().updateHighWatermark(OptionalLong.of(10L)); - assertEquals((double) 10L, getMetric(metrics, "high-watermark").metricValue()); - - state.transitionToVoted(3, ReplicaKey.of(2, Optional.empty())); - assertEquals("voted", getMetric(metrics, "current-state").metricValue()); - assertEquals((double) -1, getMetric(metrics, "current-leader").metricValue()); - assertEquals((double) 2, getMetric(metrics, "current-vote").metricValue()); - assertEquals( - Uuid.ZERO_UUID.toString(), - getMetric(metrics, "current-vote-directory-id").metricValue() - ); - assertEquals((double) 3, getMetric(metrics, "current-epoch").metricValue()); - assertEquals((double) 10L, getMetric(metrics, "high-watermark").metricValue()); - - state.transitionToUnattached(4); - assertEquals("unattached", getMetric(metrics, "current-state").metricValue()); - assertEquals((double) -1, getMetric(metrics, "current-leader").metricValue()); - assertEquals((double) -1, getMetric(metrics, "current-vote").metricValue()); - assertEquals( - Uuid.ZERO_UUID.toString(), - getMetric(metrics, "current-vote-directory-id").metricValue() - ); - assertEquals((double) 4, getMetric(metrics, "current-epoch").metricValue()); - assertEquals((double) 10L, getMetric(metrics, "high-watermark").metricValue()); - } +// @ParameterizedTest +// @ValueSource(shorts = {0, 1}) +// public void shouldRecordVoterQuorumState(short kraftVersion) { +// boolean withDirectoryId = kraftVersion > 0; +// Map voterMap = VoterSetTest.voterMap(Utils.mkSet(1, 2), withDirectoryId); +// voterMap.put( +// localId, +// VoterSetTest.voterNode( +// ReplicaKey.of( +// localId, +// withDirectoryId ? Optional.of(localDirectoryId) : Optional.empty() +// ) +// ) +// ); +// QuorumState state = buildQuorumState(VoterSetTest.voterSet(voterMap), kraftVersion); +// +// state.initialize(new OffsetAndEpoch(0L, 0)); +// raftMetrics = new KafkaRaftMetrics(metrics, "raft", state); +// +// assertEquals("unattached", getMetric(metrics, "current-state").metricValue()); +// assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue()); +// assertEquals((double) -1L, getMetric(metrics, "current-vote").metricValue()); +// assertEquals( +// Uuid.ZERO_UUID.toString(), +// getMetric(metrics, "current-vote-directory-id").metricValue() +// ); +// assertEquals((double) 0, getMetric(metrics, "current-epoch").metricValue()); +// assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); +// +// state.transitionToCandidate(); +// assertEquals("candidate", getMetric(metrics, "current-state").metricValue()); +// assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue()); +// assertEquals((double) localId, getMetric(metrics, "current-vote").metricValue()); +// assertEquals( +// localDirectoryId.toString(), +// getMetric(metrics, "current-vote-directory-id").metricValue() +// ); +// assertEquals((double) 1, getMetric(metrics, "current-epoch").metricValue()); +// assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); +// +// state.candidateStateOrThrow().recordGrantedVote(1); +// state.transitionToLeader(2L, accumulator); +// assertEquals("leader", getMetric(metrics, "current-state").metricValue()); +// assertEquals((double) localId, getMetric(metrics, "current-leader").metricValue()); +// assertEquals((double) localId, getMetric(metrics, "current-vote").metricValue()); +// assertEquals( +// localDirectoryId.toString(), +// getMetric(metrics, "current-vote-directory-id").metricValue() +// ); +// assertEquals((double) 1, getMetric(metrics, "current-epoch").metricValue()); +// assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); +// +// state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L)); +// state.leaderStateOrThrow().updateReplicaState(1, 0, new LogOffsetMetadata(5L)); +// assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue()); +// +// state.transitionToFollower(2, 1); +// assertEquals("follower", getMetric(metrics, "current-state").metricValue()); +// assertEquals((double) 1, getMetric(metrics, "current-leader").metricValue()); +// assertEquals((double) -1, getMetric(metrics, "current-vote").metricValue()); +// assertEquals( +// Uuid.ZERO_UUID.toString(), +// getMetric(metrics, "current-vote-directory-id").metricValue() +// ); +// assertEquals((double) 2, getMetric(metrics, "current-epoch").metricValue()); +// assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue()); +// +// state.followerStateOrThrow().updateHighWatermark(OptionalLong.of(10L)); +// assertEquals((double) 10L, getMetric(metrics, "high-watermark").metricValue()); +// +// state.transitionToVoted(3, ReplicaKey.of(2, Optional.empty())); +// assertEquals("voted", getMetric(metrics, "current-state").metricValue()); +// assertEquals((double) -1, getMetric(metrics, "current-leader").metricValue()); +// assertEquals((double) 2, getMetric(metrics, "current-vote").metricValue()); +// assertEquals( +// Uuid.ZERO_UUID.toString(), +// getMetric(metrics, "current-vote-directory-id").metricValue() +// ); +// assertEquals((double) 3, getMetric(metrics, "current-epoch").metricValue()); +// assertEquals((double) 10L, getMetric(metrics, "high-watermark").metricValue()); +// +// state.transitionToUnattached(4); +// assertEquals("unattached", getMetric(metrics, "current-state").metricValue()); +// assertEquals((double) -1, getMetric(metrics, "current-leader").metricValue()); +// assertEquals((double) -1, getMetric(metrics, "current-vote").metricValue()); +// assertEquals( +// Uuid.ZERO_UUID.toString(), +// getMetric(metrics, "current-vote-directory-id").metricValue() +// ); +// assertEquals((double) 4, getMetric(metrics, "current-epoch").metricValue()); +// assertEquals((double) 10L, getMetric(metrics, "high-watermark").metricValue()); +// } @ParameterizedTest @ValueSource(shorts = {0, 1}) From 93fd95db8e137376f00cc85d4672f88bac0d73de Mon Sep 17 00:00:00 2001 From: Alyssa Huang Date: Mon, 3 Jun 2024 11:43:14 -0700 Subject: [PATCH 3/8] more comments --- .../org/apache/kafka/raft/LeaderState.java | 48 +++++++------------ 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 9adda6a66e762..13a6d106345d8 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -313,7 +313,7 @@ public boolean updateLocalState( } }); state.updateLeaderEndOffset(endOffsetMetadata); - updateVoterSet(lastVoterSet); + updateVoterAndObserverStates(lastVoterSet); return maybeUpdateHighWatermark(); } @@ -346,18 +346,7 @@ public boolean updateReplicaState( state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset); } }); - Optional leaderEndOffsetOpt; - ReplicaState leaderVoterState = voterStates.get(localId); - ReplicaState leaderObserverState = observerStates.get(localId); - if (leaderVoterState != null) { - leaderEndOffsetOpt = leaderVoterState.endOffset; - } else if (leaderObserverState != null) { - // The leader is not guaranteed to be in the voter set when in the process of being removed from the quorum. - log.info("Updating end offset for leader {} which is also an observer.", localId); - leaderEndOffsetOpt = leaderObserverState.endOffset; - } else { - throw new IllegalStateException("Leader state not found for localId " + localId); - } + Optional leaderEndOffsetOpt = getOrCreateReplicaState(localId).endOffset; state.updateFollowerState( currentTimeMs, @@ -401,16 +390,12 @@ public long epochStartOffset() { private ReplicaState getOrCreateReplicaState(int remoteNodeId) { ReplicaState state = voterStates.get(remoteNodeId); if (state == null) { - return createObserverState(remoteNodeId); + observerStates.putIfAbsent(remoteNodeId, new ReplicaState(remoteNodeId, false)); + return observerStates.get(remoteNodeId); } return state; } - private ReplicaState createObserverState(int remoteNodeId) { - observerStates.putIfAbsent(remoteNodeId, new ReplicaState(remoteNodeId, false)); - return observerStates.get(remoteNodeId); - } - public DescribeQuorumResponseData.PartitionData describeQuorum(long currentTimeMs) { clearInactiveObservers(currentTimeMs); @@ -453,9 +438,13 @@ private DescribeQuorumResponseData.ReplicaState describeReplicaState( } + /** + * Clear observer states that have not been active for a while and are not the leader. + */ private void clearInactiveObservers(final long currentTimeMs) { observerStates.entrySet().removeIf(integerReplicaStateEntry -> - currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS + currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS && + integerReplicaStateEntry.getKey() != localId ); } @@ -463,23 +452,22 @@ private boolean isVoter(int remoteNodeId) { return voterStates.containsKey(remoteNodeId); } - // with Jose's changes this will probably make more sense as VoterSet - private void updateVoterSet(Set lastVoterSet) { - // Remove any voter state that is not in the last voter set. They become observers. + private void updateVoterAndObserverStates(Set lastVoterSet) { + // Move any replica that is not in the last voter set from voterStates to observerStates for (Iterator> iter = voterStates.entrySet().iterator(); iter.hasNext(); ) { - Integer nodeId = iter.next().getKey(); - if (!lastVoterSet.contains(nodeId)) { - createObserverState(nodeId); + Map.Entry replica = iter.next(); + if (!lastVoterSet.contains(replica.getKey())) { + observerStates.put(replica.getKey(), replica.getValue()); iter.remove(); } } - // Add any voter state that is in the last voter set but not in the current voter set. They are removed from - // the observerStates if they exist. + // Add replicas that are in the last voter set and not in voterStates to voterStates (from observerStates + // if they exist) for (int voterId : lastVoterSet) { if (!voterStates.containsKey(voterId)) { - voterStates.put(voterId, new ReplicaState(voterId, false)); - observerStates.remove(voterId); + Optional existingObserverState = Optional.ofNullable(observerStates.remove(voterId)); + voterStates.put(voterId, existingObserverState.orElse(new ReplicaState(voterId, false))); } } } From 66e27d75d4f0b7e1aa04cdd55a2c75aae82c38a4 Mon Sep 17 00:00:00 2001 From: Alyssa Huang Date: Mon, 3 Jun 2024 14:00:22 -0700 Subject: [PATCH 4/8] refactoring tests --- .../apache/kafka/raft/LeaderStateTest.java | 582 +++++++++--------- .../raft/internals/KafkaRaftMetricsTest.java | 191 +++--- 2 files changed, 391 insertions(+), 382 deletions(-) diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 07565be589bb0..d7058eb76aada 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -22,8 +22,6 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.internals.BatchAccumulator; import org.apache.kafka.raft.internals.ReplicaKey; -//import org.apache.kafka.raft.internals.VoterSet; -//import org.apache.kafka.raft.internals.VoterSetTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -105,158 +103,165 @@ public void testNonFollowerAcknowledgement() { assertThrows(IllegalArgumentException.class, () -> state.addAcknowledgementFrom(nonVoterId)); } -// @Test -// public void testUpdateHighWatermarkQuorumSizeOne() { -// LeaderState state = newLeaderState(singleton(localId), 15L); -// assertEquals(Optional.empty(), state.highWatermark()); -// assertFalse(state.updateLocalState(new LogOffsetMetadata(15L))); -// assertEquals(emptySet(), state.nonAcknowledgingVoters()); -// assertEquals(Optional.empty(), state.highWatermark()); -// assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); -// assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); -// assertTrue(state.updateLocalState(new LogOffsetMetadata(20))); -// assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); -// } -// -// @Test -// public void testNonMonotonicLocalEndOffsetUpdate() { -// LeaderState state = newLeaderState(singleton(localId), 15L); -// assertEquals(Optional.empty(), state.highWatermark()); -// assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); -// assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); -// assertThrows(IllegalStateException.class, -// () -> state.updateLocalState(new LogOffsetMetadata(15L))); -// } -// -// @Test -// public void testLastCaughtUpTimeVoters() { -// int node1 = 1; -// int node2 = 2; -// int currentTime = 1000; -// int fetchTime = 0; -// int caughtUpTime = -1; -// LeaderState state = newLeaderState(mkSet(localId, node1, node2), 10L); -// assertEquals(Optional.empty(), state.highWatermark()); -// assertFalse(state.updateLocalState(new LogOffsetMetadata(10L))); -// assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters()); -// assertEquals(Optional.empty(), state.highWatermark()); -// -// // Node 1 falls behind -// assertFalse(state.updateLocalState(new LogOffsetMetadata(11L))); -// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); -// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); -// assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); -// -// // Node 1 catches up to leader -// assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L))); -// caughtUpTime = fetchTime; -// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); -// assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); -// -// // Node 1 falls behind -// assertFalse(state.updateLocalState(new LogOffsetMetadata(100L))); -// assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); -// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); -// assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); -// -// // Node 1 catches up to the last fetch offset -// int prevFetchTime = fetchTime; -// assertFalse(state.updateLocalState(new LogOffsetMetadata(200L))); -// assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(100L))); -// caughtUpTime = prevFetchTime; -// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); -// assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); -// -// // Node2 has never caught up to leader -// assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); -// assertFalse(state.updateLocalState(new LogOffsetMetadata(300L))); -// assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(200L))); -// assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); -// assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(250L))); -// assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); -// } -// -// @Test -// public void testLastCaughtUpTimeObserver() { -// int node1 = 1; -// int currentTime = 1000; -// int fetchTime = 0; -// int caughtUpTime = -1; -// LeaderState state = newLeaderState(singleton(localId), 5L); -// assertEquals(Optional.empty(), state.highWatermark()); -// assertEquals(emptySet(), state.nonAcknowledgingVoters()); -// -// // Node 1 falls behind -// assertTrue(state.updateLocalState(new LogOffsetMetadata(11L))); -// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); -// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); -// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); -// -// // Node 1 catches up to leader -// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L))); -// caughtUpTime = fetchTime; -// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); -// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); -// -// // Node 1 falls behind -// assertTrue(state.updateLocalState(new LogOffsetMetadata(100L))); -// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); -// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); -// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); -// -// // Node 1 catches up to the last fetch offset -// int prevFetchTime = fetchTime; -// assertTrue(state.updateLocalState(new LogOffsetMetadata(200L))); -// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(102L))); -// caughtUpTime = prevFetchTime; -// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); -// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); -// -// // Node 1 catches up to leader -// assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(200L))); -// caughtUpTime = fetchTime; -// assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); -// assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); -// } -// -// @Test -// public void testIdempotentEndOffsetUpdate() { -// LeaderState state = newLeaderState(singleton(localId), 15L); -// assertEquals(Optional.empty(), state.highWatermark()); -// assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); -// assertFalse(state.updateLocalState(new LogOffsetMetadata(16L))); -// assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); -// } -// -// @Test -// public void testUpdateHighWatermarkMetadata() { -// LeaderState state = newLeaderState(singleton(localId), 15L); -// assertEquals(Optional.empty(), state.highWatermark()); -// -// LogOffsetMetadata initialHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("bar"))); -// assertTrue(state.updateLocalState(initialHw)); -// assertEquals(Optional.of(initialHw), state.highWatermark()); -// -// LogOffsetMetadata updateHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("baz"))); -// assertTrue(state.updateLocalState(updateHw)); -// assertEquals(Optional.of(updateHw), state.highWatermark()); -// } -// -// @Test -// public void testUpdateHighWatermarkQuorumSizeTwo() { -// int otherNodeId = 1; -// LeaderState state = newLeaderState(mkSet(localId, otherNodeId), 10L); -// assertFalse(state.updateLocalState(new LogOffsetMetadata(13L))); -// assertEquals(singleton(otherNodeId), state.nonAcknowledgingVoters()); -// assertEquals(Optional.empty(), state.highWatermark()); -// assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L))); -// assertEquals(emptySet(), state.nonAcknowledgingVoters()); -// assertEquals(Optional.empty(), state.highWatermark()); -// assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(11L))); -// assertEquals(Optional.of(new LogOffsetMetadata(11L)), state.highWatermark()); -// assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(13L))); -// assertEquals(Optional.of(new LogOffsetMetadata(13L)), state.highWatermark()); -// } + @Test + public void testUpdateHighWatermarkQuorumSizeOne() { + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, 15L); + assertEquals(Optional.empty(), state.highWatermark()); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), voterSet)); + assertEquals(emptySet(), state.nonAcknowledgingVoters()); + assertEquals(Optional.empty(), state.highWatermark()); + assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voterSet)); + assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); + assertTrue(state.updateLocalState(new LogOffsetMetadata(20), voterSet)); + assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); + } + + @Test + public void testNonMonotonicLocalEndOffsetUpdate() { + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, 15L); + assertEquals(Optional.empty(), state.highWatermark()); + assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voterSet)); + assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); + assertThrows(IllegalStateException.class, + () -> state.updateLocalState(new LogOffsetMetadata(15L), voterSet)); + } + + @Test + public void testLastCaughtUpTimeVoters() { + int node1 = 1; + int node2 = 2; + int currentTime = 1000; + int fetchTime = 0; + int caughtUpTime = -1; + Set voterSet = mkSet(localId, node1, node2); + LeaderState state = newLeaderState(voterSet, 10L); + assertEquals(Optional.empty(), state.highWatermark()); + assertFalse(state.updateLocalState(new LogOffsetMetadata(10L), voterSet)); + assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters()); + assertEquals(Optional.empty(), state.highWatermark()); + + // Node 1 falls behind + assertFalse(state.updateLocalState(new LogOffsetMetadata(11L), voterSet)); + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); + assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); + + // Node 1 catches up to leader + assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L))); + caughtUpTime = fetchTime; + assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); + + // Node 1 falls behind + assertFalse(state.updateLocalState(new LogOffsetMetadata(100L), voterSet)); + assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); + assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); + + // Node 1 catches up to the last fetch offset + int prevFetchTime = fetchTime; + assertFalse(state.updateLocalState(new LogOffsetMetadata(200L), voterSet)); + assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(100L))); + caughtUpTime = prevFetchTime; + assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); + + // Node2 has never caught up to leader + assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); + assertFalse(state.updateLocalState(new LogOffsetMetadata(300L), voterSet)); + assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(200L))); + assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); + assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(250L))); + assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); + } + + @Test + public void testLastCaughtUpTimeObserver() { + int node1 = 1; + int currentTime = 1000; + int fetchTime = 0; + int caughtUpTime = -1; + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, 5L); + assertEquals(Optional.empty(), state.highWatermark()); + assertEquals(emptySet(), state.nonAcknowledgingVoters()); + + // Node 1 falls behind + assertTrue(state.updateLocalState(new LogOffsetMetadata(11L), voterSet)); + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); + assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); + + // Node 1 catches up to leader + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L))); + caughtUpTime = fetchTime; + assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); + + // Node 1 falls behind + assertTrue(state.updateLocalState(new LogOffsetMetadata(100L), voterSet)); + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); + assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); + + // Node 1 catches up to the last fetch offset + int prevFetchTime = fetchTime; + assertTrue(state.updateLocalState(new LogOffsetMetadata(200L), voterSet)); + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(102L))); + caughtUpTime = prevFetchTime; + assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); + + // Node 1 catches up to leader + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(200L))); + caughtUpTime = fetchTime; + assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); + } + + @Test + public void testIdempotentEndOffsetUpdate() { + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, 15L); + assertEquals(Optional.empty(), state.highWatermark()); + assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voterSet)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), voterSet)); + assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); + } + + @Test + public void testUpdateHighWatermarkMetadata() { + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, 15L); + assertEquals(Optional.empty(), state.highWatermark()); + + LogOffsetMetadata initialHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("bar"))); + assertTrue(state.updateLocalState(initialHw, voterSet)); + assertEquals(Optional.of(initialHw), state.highWatermark()); + + LogOffsetMetadata updateHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("baz"))); + assertTrue(state.updateLocalState(updateHw, voterSet)); + assertEquals(Optional.of(updateHw), state.highWatermark()); + } + + @Test + public void testUpdateHighWatermarkQuorumSizeTwo() { + int otherNodeId = 1; + Set voterSet = mkSet(localId, otherNodeId); + LeaderState state = newLeaderState(voterSet, 10L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(13L), voterSet)); + assertEquals(singleton(otherNodeId), state.nonAcknowledgingVoters()); + assertEquals(Optional.empty(), state.highWatermark()); + assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L))); + assertEquals(emptySet(), state.nonAcknowledgingVoters()); + assertEquals(Optional.empty(), state.highWatermark()); + assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(11L))); + assertEquals(Optional.of(new LogOffsetMetadata(11L)), state.highWatermark()); + assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(13L))); + assertEquals(Optional.of(new LogOffsetMetadata(13L)), state.highWatermark()); + } @Test public void testUpdateHighWatermarkQuorumSizeThree() { @@ -369,105 +374,107 @@ public void testGetNonLeaderFollowersByFetchOffsetDescending() { assertEquals(Arrays.asList(node2, node1), state.nonLeaderVotersByDescendingFetchOffset()); } -// @Test -// public void testDescribeQuorumWithSingleVoter() { -// MockTime time = new MockTime(); -// long leaderStartOffset = 10L; -// long leaderEndOffset = 15L; -// -// LeaderState state = newLeaderState(mkSet(localId), leaderStartOffset); -// -// // Until we have updated local state, high watermark should be uninitialized -// assertEquals(Optional.empty(), state.highWatermark()); -// DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); -// assertEquals(-1, partitionData.highWatermark()); -// assertEquals(localId, partitionData.leaderId()); -// assertEquals(epoch, partitionData.leaderEpoch()); -// assertEquals(Collections.emptyList(), partitionData.observers()); -// assertEquals(1, partitionData.currentVoters().size()); -// assertEquals(new DescribeQuorumResponseData.ReplicaState() -// .setReplicaId(localId) -// .setLogEndOffset(-1) -// .setLastFetchTimestamp(time.milliseconds()) -// .setLastCaughtUpTimestamp(time.milliseconds()), -// partitionData.currentVoters().get(0)); -// -// -// // Now update the high watermark and verify the describe output -// assertTrue(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset))); -// assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark()); -// -// time.sleep(500); -// -// partitionData = state.describeQuorum(time.milliseconds()); -// assertEquals(leaderEndOffset, partitionData.highWatermark()); -// assertEquals(localId, partitionData.leaderId()); -// assertEquals(epoch, partitionData.leaderEpoch()); -// assertEquals(Collections.emptyList(), partitionData.observers()); -// assertEquals(1, partitionData.currentVoters().size()); -// assertEquals(new DescribeQuorumResponseData.ReplicaState() -// .setReplicaId(localId) -// .setLogEndOffset(leaderEndOffset) -// .setLastFetchTimestamp(time.milliseconds()) -// .setLastCaughtUpTimestamp(time.milliseconds()), -// partitionData.currentVoters().get(0)); -// } -// -// @Test -// public void testDescribeQuorumWithMultipleVoters() { -// MockTime time = new MockTime(); -// int activeFollowerId = 1; -// int inactiveFollowerId = 2; -// long leaderStartOffset = 10L; -// long leaderEndOffset = 15L; -// -// LeaderState state = newLeaderState(mkSet(localId, activeFollowerId, inactiveFollowerId), leaderStartOffset); -// assertFalse(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset))); -// assertEquals(Optional.empty(), state.highWatermark()); -// -// long activeFollowerFetchTimeMs = time.milliseconds(); -// assertTrue(state.updateReplicaState(activeFollowerId, activeFollowerFetchTimeMs, new LogOffsetMetadata(leaderEndOffset))); -// assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark()); -// -// time.sleep(500); -// -// DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); -// assertEquals(leaderEndOffset, partitionData.highWatermark()); -// assertEquals(localId, partitionData.leaderId()); -// assertEquals(epoch, partitionData.leaderEpoch()); -// assertEquals(Collections.emptyList(), partitionData.observers()); -// -// List voterStates = partitionData.currentVoters(); -// assertEquals(3, voterStates.size()); -// -// DescribeQuorumResponseData.ReplicaState leaderState = -// findReplicaOrFail(localId, partitionData.currentVoters()); -// assertEquals(new DescribeQuorumResponseData.ReplicaState() -// .setReplicaId(localId) -// .setLogEndOffset(leaderEndOffset) -// .setLastFetchTimestamp(time.milliseconds()) -// .setLastCaughtUpTimestamp(time.milliseconds()), -// leaderState); -// -// DescribeQuorumResponseData.ReplicaState activeFollowerState = -// findReplicaOrFail(activeFollowerId, partitionData.currentVoters()); -// assertEquals(new DescribeQuorumResponseData.ReplicaState() -// .setReplicaId(activeFollowerId) -// .setLogEndOffset(leaderEndOffset) -// .setLastFetchTimestamp(activeFollowerFetchTimeMs) -// .setLastCaughtUpTimestamp(activeFollowerFetchTimeMs), -// activeFollowerState); -// -// DescribeQuorumResponseData.ReplicaState inactiveFollowerState = -// findReplicaOrFail(inactiveFollowerId, partitionData.currentVoters()); -// assertEquals(new DescribeQuorumResponseData.ReplicaState() -// .setReplicaId(inactiveFollowerId) -// .setLogEndOffset(-1) -// .setLastFetchTimestamp(-1) -// .setLastCaughtUpTimestamp(-1), -// inactiveFollowerState); -// } -// + @Test + public void testDescribeQuorumWithSingleVoter() { + MockTime time = new MockTime(); + long leaderStartOffset = 10L; + long leaderEndOffset = 15L; + + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, leaderStartOffset); + + // Until we have updated local state, high watermark should be uninitialized + assertEquals(Optional.empty(), state.highWatermark()); + DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(-1, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(epoch, partitionData.leaderEpoch()); + assertEquals(Collections.emptyList(), partitionData.observers()); + assertEquals(1, partitionData.currentVoters().size()); + assertEquals(new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(localId) + .setLogEndOffset(-1) + .setLastFetchTimestamp(time.milliseconds()) + .setLastCaughtUpTimestamp(time.milliseconds()), + partitionData.currentVoters().get(0)); + + + // Now update the high watermark and verify the describe output + assertTrue(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset), voterSet)); + assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark()); + + time.sleep(500); + + partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(leaderEndOffset, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(epoch, partitionData.leaderEpoch()); + assertEquals(Collections.emptyList(), partitionData.observers()); + assertEquals(1, partitionData.currentVoters().size()); + assertEquals(new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(localId) + .setLogEndOffset(leaderEndOffset) + .setLastFetchTimestamp(time.milliseconds()) + .setLastCaughtUpTimestamp(time.milliseconds()), + partitionData.currentVoters().get(0)); + } + + @Test + public void testDescribeQuorumWithMultipleVoters() { + MockTime time = new MockTime(); + int activeFollowerId = 1; + int inactiveFollowerId = 2; + long leaderStartOffset = 10L; + long leaderEndOffset = 15L; + + Set voterSet = mkSet(localId, activeFollowerId, inactiveFollowerId); + LeaderState state = newLeaderState(voterSet, leaderStartOffset); + assertFalse(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset), voterSet)); + assertEquals(Optional.empty(), state.highWatermark()); + + long activeFollowerFetchTimeMs = time.milliseconds(); + assertTrue(state.updateReplicaState(activeFollowerId, activeFollowerFetchTimeMs, new LogOffsetMetadata(leaderEndOffset))); + assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark()); + + time.sleep(500); + + DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(leaderEndOffset, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(epoch, partitionData.leaderEpoch()); + assertEquals(Collections.emptyList(), partitionData.observers()); + + List voterStates = partitionData.currentVoters(); + assertEquals(3, voterStates.size()); + + DescribeQuorumResponseData.ReplicaState leaderState = + findReplicaOrFail(localId, partitionData.currentVoters()); + assertEquals(new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(localId) + .setLogEndOffset(leaderEndOffset) + .setLastFetchTimestamp(time.milliseconds()) + .setLastCaughtUpTimestamp(time.milliseconds()), + leaderState); + + DescribeQuorumResponseData.ReplicaState activeFollowerState = + findReplicaOrFail(activeFollowerId, partitionData.currentVoters()); + assertEquals(new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(activeFollowerId) + .setLogEndOffset(leaderEndOffset) + .setLastFetchTimestamp(activeFollowerFetchTimeMs) + .setLastCaughtUpTimestamp(activeFollowerFetchTimeMs), + activeFollowerState); + + DescribeQuorumResponseData.ReplicaState inactiveFollowerState = + findReplicaOrFail(inactiveFollowerId, partitionData.currentVoters()); + assertEquals(new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(inactiveFollowerId) + .setLogEndOffset(-1) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1), + inactiveFollowerState); + } + private LeaderState setUpLeaderAndFollowers(int follower1, int follower2, long leaderStartOffset, @@ -481,40 +488,41 @@ private LeaderState setUpLeaderAndFollowers(int follower1, return state; } -// @Test -// public void testDescribeQuorumWithObservers() { -// MockTime time = new MockTime(); -// int observerId = 10; -// long epochStartOffset = 10L; -// -// LeaderState state = newLeaderState(mkSet(localId), epochStartOffset); -// assertTrue(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1))); -// assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark()); -// -// time.sleep(500); -// long observerFetchTimeMs = time.milliseconds(); -// assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1))); -// -// time.sleep(500); -// DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); -// assertEquals(epochStartOffset + 1, partitionData.highWatermark()); -// assertEquals(localId, partitionData.leaderId()); -// assertEquals(epoch, partitionData.leaderEpoch()); -// -// assertEquals(1, partitionData.currentVoters().size()); -// assertEquals(localId, partitionData.currentVoters().get(0).replicaId()); -// -// List observerStates = partitionData.observers(); -// assertEquals(1, observerStates.size()); -// -// DescribeQuorumResponseData.ReplicaState observerState = observerStates.get(0); -// assertEquals(new DescribeQuorumResponseData.ReplicaState() -// .setReplicaId(observerId) -// .setLogEndOffset(epochStartOffset + 1) -// .setLastFetchTimestamp(observerFetchTimeMs) -// .setLastCaughtUpTimestamp(observerFetchTimeMs), -// observerState); -// } + @Test + public void testDescribeQuorumWithObservers() { + MockTime time = new MockTime(); + int observerId = 10; + long epochStartOffset = 10L; + + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, epochStartOffset); + assertTrue(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1), voterSet)); + assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark()); + + time.sleep(500); + long observerFetchTimeMs = time.milliseconds(); + assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1))); + + time.sleep(500); + DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(epochStartOffset + 1, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(epoch, partitionData.leaderEpoch()); + + assertEquals(1, partitionData.currentVoters().size()); + assertEquals(localId, partitionData.currentVoters().get(0).replicaId()); + + List observerStates = partitionData.observers(); + assertEquals(1, observerStates.size()); + + DescribeQuorumResponseData.ReplicaState observerState = observerStates.get(0); + assertEquals(new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(observerId) + .setLogEndOffset(epochStartOffset + 1) + .setLastFetchTimestamp(observerFetchTimeMs) + .setLastCaughtUpTimestamp(observerFetchTimeMs), + observerState); + } @Test public void testCheckQuorum() { diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index d656fddfc32f0..ba701dee89ddb 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; -//import org.apache.kafka.raft.LogOffsetMetadata; +import org.apache.kafka.raft.LogOffsetMetadata; import org.apache.kafka.raft.MockQuorumStateStore; import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.QuorumState; @@ -32,9 +32,9 @@ import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; -//import java.util.Map; +import java.util.Map; import java.util.Collections; -//import java.util.Optional; +import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Random; @@ -88,98 +88,99 @@ private QuorumState buildQuorumState(VoterSet voterSet, short kraftVersion) { ); } -// @ParameterizedTest -// @ValueSource(shorts = {0, 1}) -// public void shouldRecordVoterQuorumState(short kraftVersion) { -// boolean withDirectoryId = kraftVersion > 0; -// Map voterMap = VoterSetTest.voterMap(Utils.mkSet(1, 2), withDirectoryId); -// voterMap.put( -// localId, -// VoterSetTest.voterNode( -// ReplicaKey.of( -// localId, -// withDirectoryId ? Optional.of(localDirectoryId) : Optional.empty() -// ) -// ) -// ); -// QuorumState state = buildQuorumState(VoterSetTest.voterSet(voterMap), kraftVersion); -// -// state.initialize(new OffsetAndEpoch(0L, 0)); -// raftMetrics = new KafkaRaftMetrics(metrics, "raft", state); -// -// assertEquals("unattached", getMetric(metrics, "current-state").metricValue()); -// assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue()); -// assertEquals((double) -1L, getMetric(metrics, "current-vote").metricValue()); -// assertEquals( -// Uuid.ZERO_UUID.toString(), -// getMetric(metrics, "current-vote-directory-id").metricValue() -// ); -// assertEquals((double) 0, getMetric(metrics, "current-epoch").metricValue()); -// assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); -// -// state.transitionToCandidate(); -// assertEquals("candidate", getMetric(metrics, "current-state").metricValue()); -// assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue()); -// assertEquals((double) localId, getMetric(metrics, "current-vote").metricValue()); -// assertEquals( -// localDirectoryId.toString(), -// getMetric(metrics, "current-vote-directory-id").metricValue() -// ); -// assertEquals((double) 1, getMetric(metrics, "current-epoch").metricValue()); -// assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); -// -// state.candidateStateOrThrow().recordGrantedVote(1); -// state.transitionToLeader(2L, accumulator); -// assertEquals("leader", getMetric(metrics, "current-state").metricValue()); -// assertEquals((double) localId, getMetric(metrics, "current-leader").metricValue()); -// assertEquals((double) localId, getMetric(metrics, "current-vote").metricValue()); -// assertEquals( -// localDirectoryId.toString(), -// getMetric(metrics, "current-vote-directory-id").metricValue() -// ); -// assertEquals((double) 1, getMetric(metrics, "current-epoch").metricValue()); -// assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); -// -// state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L)); -// state.leaderStateOrThrow().updateReplicaState(1, 0, new LogOffsetMetadata(5L)); -// assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue()); -// -// state.transitionToFollower(2, 1); -// assertEquals("follower", getMetric(metrics, "current-state").metricValue()); -// assertEquals((double) 1, getMetric(metrics, "current-leader").metricValue()); -// assertEquals((double) -1, getMetric(metrics, "current-vote").metricValue()); -// assertEquals( -// Uuid.ZERO_UUID.toString(), -// getMetric(metrics, "current-vote-directory-id").metricValue() -// ); -// assertEquals((double) 2, getMetric(metrics, "current-epoch").metricValue()); -// assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue()); -// -// state.followerStateOrThrow().updateHighWatermark(OptionalLong.of(10L)); -// assertEquals((double) 10L, getMetric(metrics, "high-watermark").metricValue()); -// -// state.transitionToVoted(3, ReplicaKey.of(2, Optional.empty())); -// assertEquals("voted", getMetric(metrics, "current-state").metricValue()); -// assertEquals((double) -1, getMetric(metrics, "current-leader").metricValue()); -// assertEquals((double) 2, getMetric(metrics, "current-vote").metricValue()); -// assertEquals( -// Uuid.ZERO_UUID.toString(), -// getMetric(metrics, "current-vote-directory-id").metricValue() -// ); -// assertEquals((double) 3, getMetric(metrics, "current-epoch").metricValue()); -// assertEquals((double) 10L, getMetric(metrics, "high-watermark").metricValue()); -// -// state.transitionToUnattached(4); -// assertEquals("unattached", getMetric(metrics, "current-state").metricValue()); -// assertEquals((double) -1, getMetric(metrics, "current-leader").metricValue()); -// assertEquals((double) -1, getMetric(metrics, "current-vote").metricValue()); -// assertEquals( -// Uuid.ZERO_UUID.toString(), -// getMetric(metrics, "current-vote-directory-id").metricValue() -// ); -// assertEquals((double) 4, getMetric(metrics, "current-epoch").metricValue()); -// assertEquals((double) 10L, getMetric(metrics, "high-watermark").metricValue()); -// } + @ParameterizedTest + @ValueSource(shorts = {0, 1}) + public void shouldRecordVoterQuorumState(short kraftVersion) { + boolean withDirectoryId = kraftVersion > 0; + Set voterSet = Utils.mkSet(localId, 1, 2); + Map voterMap = VoterSetTest.voterMap(voterSet, withDirectoryId); + voterMap.put( + localId, + VoterSetTest.voterNode( + ReplicaKey.of( + localId, + withDirectoryId ? Optional.of(localDirectoryId) : Optional.empty() + ) + ) + ); + QuorumState state = buildQuorumState(VoterSetTest.voterSet(voterMap), kraftVersion); + + state.initialize(new OffsetAndEpoch(0L, 0)); + raftMetrics = new KafkaRaftMetrics(metrics, "raft", state); + + assertEquals("unattached", getMetric(metrics, "current-state").metricValue()); + assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue()); + assertEquals((double) -1L, getMetric(metrics, "current-vote").metricValue()); + assertEquals( + Uuid.ZERO_UUID.toString(), + getMetric(metrics, "current-vote-directory-id").metricValue() + ); + assertEquals((double) 0, getMetric(metrics, "current-epoch").metricValue()); + assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); + + state.transitionToCandidate(); + assertEquals("candidate", getMetric(metrics, "current-state").metricValue()); + assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue()); + assertEquals((double) localId, getMetric(metrics, "current-vote").metricValue()); + assertEquals( + localDirectoryId.toString(), + getMetric(metrics, "current-vote-directory-id").metricValue() + ); + assertEquals((double) 1, getMetric(metrics, "current-epoch").metricValue()); + assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); + + state.candidateStateOrThrow().recordGrantedVote(1); + state.transitionToLeader(2L, accumulator); + assertEquals("leader", getMetric(metrics, "current-state").metricValue()); + assertEquals((double) localId, getMetric(metrics, "current-leader").metricValue()); + assertEquals((double) localId, getMetric(metrics, "current-vote").metricValue()); + assertEquals( + localDirectoryId.toString(), + getMetric(metrics, "current-vote-directory-id").metricValue() + ); + assertEquals((double) 1, getMetric(metrics, "current-epoch").metricValue()); + assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); + + state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L), voterSet); + state.leaderStateOrThrow().updateReplicaState(1, 0, new LogOffsetMetadata(5L)); + assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue()); + + state.transitionToFollower(2, 1); + assertEquals("follower", getMetric(metrics, "current-state").metricValue()); + assertEquals((double) 1, getMetric(metrics, "current-leader").metricValue()); + assertEquals((double) -1, getMetric(metrics, "current-vote").metricValue()); + assertEquals( + Uuid.ZERO_UUID.toString(), + getMetric(metrics, "current-vote-directory-id").metricValue() + ); + assertEquals((double) 2, getMetric(metrics, "current-epoch").metricValue()); + assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue()); + + state.followerStateOrThrow().updateHighWatermark(OptionalLong.of(10L)); + assertEquals((double) 10L, getMetric(metrics, "high-watermark").metricValue()); + + state.transitionToVoted(3, ReplicaKey.of(2, Optional.empty())); + assertEquals("voted", getMetric(metrics, "current-state").metricValue()); + assertEquals((double) -1, getMetric(metrics, "current-leader").metricValue()); + assertEquals((double) 2, getMetric(metrics, "current-vote").metricValue()); + assertEquals( + Uuid.ZERO_UUID.toString(), + getMetric(metrics, "current-vote-directory-id").metricValue() + ); + assertEquals((double) 3, getMetric(metrics, "current-epoch").metricValue()); + assertEquals((double) 10L, getMetric(metrics, "high-watermark").metricValue()); + + state.transitionToUnattached(4); + assertEquals("unattached", getMetric(metrics, "current-state").metricValue()); + assertEquals((double) -1, getMetric(metrics, "current-leader").metricValue()); + assertEquals((double) -1, getMetric(metrics, "current-vote").metricValue()); + assertEquals( + Uuid.ZERO_UUID.toString(), + getMetric(metrics, "current-vote-directory-id").metricValue() + ); + assertEquals((double) 4, getMetric(metrics, "current-epoch").metricValue()); + assertEquals((double) 10L, getMetric(metrics, "high-watermark").metricValue()); + } @ParameterizedTest @ValueSource(shorts = {0, 1}) From 9acdb0239b83efb4da052056f8b8366724aeb952 Mon Sep 17 00:00:00 2001 From: Alyssa Huang Date: Mon, 3 Jun 2024 14:52:59 -0700 Subject: [PATCH 5/8] test comments --- .../apache/kafka/raft/LeaderStateTest.java | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index d7058eb76aada..74798060488ff 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -288,6 +288,32 @@ public void testUpdateHighWatermarkQuorumSizeThree() { assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); } + @Test + public void testUpdateHighWatermarkAddingFollowerToVoterStates() { + int node1 = 1; + int node2 = 2; + Set originalVoterSet = mkSet(localId, node1); + LeaderState state = newLeaderState(originalVoterSet, 10L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet)); + assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L))); + assertEquals(Optional.empty(), state.highWatermark()); + + // updating replica state of 2 before it joins voterSet should not increase HW to 15L + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); + assertEquals(Optional.empty(), state.highWatermark()); + + // adding node2 to voterSet will cause HW to increase to 15L + Set voterSetWithNode2 = mkSet(localId, node1, node2); + assertTrue(state.updateLocalState(new LogOffsetMetadata(15L), voterSetWithNode2)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW will not update to 16L until a majority reaches it + assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), voterSetWithNode2)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); + } + @Test public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() { int node1 = 1; @@ -332,15 +358,15 @@ public void testUpdateHighWatermarkQuorumRemovingLeaderFromVoterStates() { assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); // HW cannot change until node2 catches up to last HW + assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(14L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); // HW will not update to 16L until majority of remaining voterSet (node1, node2) are at least 16L - assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(16L))); - assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); - assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(16L))); + assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(16L))); assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); } From d33f923c1991417103dd80c6df093ad64ae69ea5 Mon Sep 17 00:00:00 2001 From: Alyssa Huang Date: Tue, 4 Jun 2024 09:48:49 -0700 Subject: [PATCH 6/8] more test improvements --- .../test/java/org/apache/kafka/raft/LeaderStateTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 74798060488ff..fb4879483508d 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -333,6 +333,10 @@ public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() { // HW cannot change until after node2 catches up to last HW assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(14L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), voterSetWithoutNode1)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(18L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); @@ -360,6 +364,8 @@ public void testUpdateHighWatermarkQuorumRemovingLeaderFromVoterStates() { // HW cannot change until node2 catches up to last HW assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(16L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), voterSetWithoutLeader)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(14L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); From 4eb796186ac5d911e69ab06ea117f732d25feba2 Mon Sep 17 00:00:00 2001 From: Alyssa Huang Date: Tue, 4 Jun 2024 11:35:55 -0700 Subject: [PATCH 7/8] more tests --- .../apache/kafka/raft/LeaderStateTest.java | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index fb4879483508d..22fbfcf1b80a8 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -556,6 +556,110 @@ public void testDescribeQuorumWithObservers() { observerState); } + @Test + public void testDescribeQuorumWithVotersAndObservers() { + MockTime time = new MockTime(); + int leader = localId; + int node1 = 1; + int node2 = 2; + long epochStartOffset = 10L; + + Set voterSet = mkSet(leader, node1, node2); + LeaderState state = newLeaderState(voterSet, epochStartOffset); + assertFalse(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1), voterSet)); + assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(epochStartOffset + 1))); + assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark()); + + // node1 becomes an observer + long fetchTimeMs = time.milliseconds(); + assertFalse(state.updateReplicaState(node1, fetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1))); + Set voterSetWithoutNode1 = mkSet(leader, node2); + state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 5), voterSetWithoutNode1); + + + time.sleep(500); + DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(epochStartOffset + 1, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(epoch, partitionData.leaderEpoch()); + DescribeQuorumResponseData.ReplicaState observer = partitionData.observers().get(0); + assertEquals(node1, observer.replicaId()); + assertEquals(epochStartOffset + 1, observer.logEndOffset()); + assertEquals(2, partitionData.currentVoters().size()); + + // node1 catches up with leader, HW should not change + time.sleep(500); + fetchTimeMs = time.milliseconds(); + assertFalse(state.updateReplicaState(node1, fetchTimeMs, new LogOffsetMetadata(epochStartOffset + 5))); + assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark()); + + // node1 becomes a voter again, HW should change + assertTrue(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 10), voterSet)); + + time.sleep(500); + partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(epochStartOffset + 5, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(epoch, partitionData.leaderEpoch()); + assertEquals(Collections.emptyList(), partitionData.observers()); + assertEquals(3, partitionData.currentVoters().size()); + DescribeQuorumResponseData.ReplicaState node1State = partitionData.currentVoters().stream() + .filter(replicaState -> replicaState.replicaId() == node1) + .findFirst().get(); + assertEquals(epochStartOffset + 5, node1State.logEndOffset()); + assertEquals(fetchTimeMs, node1State.lastFetchTimestamp()); + } + + @Test + public void testClearInactiveObserversIgnoresLeader() { + MockTime time = new MockTime(); + int followerId = 1; + int observerId = 10; + long epochStartOffset = 10L; + + Set voterSet = mkSet(localId, followerId); + LeaderState state = newLeaderState(voterSet, epochStartOffset); + assertFalse(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1), voterSet)); + assertTrue(state.updateReplicaState(followerId, time.milliseconds(), new LogOffsetMetadata(epochStartOffset + 1))); + + // observer is returned since its lastFetchTimestamp is within OBSERVER_SESSION_TIMEOUT_MS + time.sleep(500); + long observerFetchTimeMs = time.milliseconds(); + assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1))); + + time.sleep(500); + DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(epochStartOffset + 1, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(2, partitionData.currentVoters().size()); + assertEquals(1, partitionData.observers().size()); + assertEquals(observerId, partitionData.observers().get(0).replicaId()); + + // observer is not returned once its lastFetchTimestamp surpasses OBSERVER_SESSION_TIMEOUT_MS + time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS); + partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(epochStartOffset + 1, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(2, partitionData.currentVoters().size()); + assertEquals(0, partitionData.observers().size()); + + // leader becomes observer + Set voterSetWithoutLeader = singleton(followerId); + assertFalse(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 10), voterSetWithoutLeader)); + + // leader should be returned in describe quorum output + time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS); + long describeQuorumCalledTime = time.milliseconds(); + partitionData = state.describeQuorum(describeQuorumCalledTime); + assertEquals(epochStartOffset + 1, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(1, partitionData.currentVoters().size()); + assertEquals(1, partitionData.observers().size()); + DescribeQuorumResponseData.ReplicaState observer = partitionData.observers().get(0); + assertEquals(localId, observer.replicaId()); + assertEquals(describeQuorumCalledTime, observer.lastFetchTimestamp()); + } + @Test public void testCheckQuorum() { int node1 = 1; From 5d736245443a486b3a35d4ae83d19e2c99f34c51 Mon Sep 17 00:00:00 2001 From: Alyssa Huang Date: Wed, 5 Jun 2024 10:04:24 -0700 Subject: [PATCH 8/8] comments --- .../apache/kafka/raft/LeaderStateTest.java | 46 ++++++++++++++++--- .../raft/internals/KafkaRaftMetricsTest.java | 5 +- 2 files changed, 41 insertions(+), 10 deletions(-) diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 22fbfcf1b80a8..6d31154ccf233 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -289,18 +289,18 @@ public void testUpdateHighWatermarkQuorumSizeThree() { } @Test - public void testUpdateHighWatermarkAddingFollowerToVoterStates() { + public void testHighWatermarkDoesIncreaseFromNewVoter() { int node1 = 1; int node2 = 2; Set originalVoterSet = mkSet(localId, node1); - LeaderState state = newLeaderState(originalVoterSet, 10L); + LeaderState state = newLeaderState(originalVoterSet, 5L); assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet)); - assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L))); - assertEquals(Optional.empty(), state.highWatermark()); + assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L))); + assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); - // updating replica state of 2 before it joins voterSet should not increase HW to 15L + // updating replica state of node2 before it joins voterSet should not increase HW to 15L assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); - assertEquals(Optional.empty(), state.highWatermark()); + assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); // adding node2 to voterSet will cause HW to increase to 15L Set voterSetWithNode2 = mkSet(localId, node1, node2); @@ -314,6 +314,40 @@ public void testUpdateHighWatermarkAddingFollowerToVoterStates() { assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); } + @Test + public void testHighWatermarkDoesNotDecreaseFromNewVoter() { + int node1 = 1; + int node2 = 2; + int node3 = 3; + // start with three voters with HW at 15L + Set originalVoterSet = mkSet(localId, node1, node2); + LeaderState state = newLeaderState(originalVoterSet, 5L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet)); + assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); + + // updating replica state of node3 before it joins voterSet + assertFalse(state.updateReplicaState(node3, 0, new LogOffsetMetadata(10L))); + + // adding node3 to voterSet should not cause HW to decrease even if majority is < HW + Set voterSetWithNode3 = mkSet(localId, node1, node2, node3); + assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), voterSetWithNode3)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW will not decrease if calculated HW is anything lower than the last HW + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(13L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node3, 0, new LogOffsetMetadata(13L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW will update to 16L once a majority of the voterSet is at least 16L + assertTrue(state.updateReplicaState(node3, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); + } + @Test public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() { int node1 = 1; diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index 825e199aa98d6..ce59d587b9dd5 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.LogOffsetMetadata; import org.apache.kafka.raft.MockQuorumStateStore; import org.apache.kafka.raft.OffsetAndEpoch; @@ -38,7 +37,6 @@ import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Random; -import java.util.Set; import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -100,7 +98,6 @@ private VoterSet localStandaloneVoterSet(short kraftVersion) { @ValueSource(shorts = {0, 1}) public void shouldRecordVoterQuorumState(short kraftVersion) { boolean withDirectoryId = kraftVersion > 0; - Set voterSet = Utils.mkSet(localId, 1, 2); Map voterMap = VoterSetTest.voterMap(IntStream.of(1, 2), withDirectoryId); voterMap.put( localId, @@ -150,7 +147,7 @@ public void shouldRecordVoterQuorumState(short kraftVersion) { assertEquals((double) 1, getMetric(metrics, "current-epoch").metricValue()); assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); - state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L), voterSet); + state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L), voters.voterIds()); state.leaderStateOrThrow().updateReplicaState(1, 0, new LogOffsetMetadata(5L)); assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue());