Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ private void updateLeaderEndOffsetAndTimestamp(
) {
final LogOffsetMetadata endOffsetMetadata = log.endOffset();

if (state.updateLocalState(endOffsetMetadata)) {
if (state.updateLocalState(endOffsetMetadata, partitionState.lastVoterSet().voterIds())) {
onUpdateLeaderHighWatermark(state, currentTimeMs);
}

Expand Down
45 changes: 36 additions & 9 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -243,8 +244,9 @@ private boolean maybeUpdateHighWatermark() {
);
return true;
} else if (highWatermarkUpdateOffset < currentHighWatermarkMetadata.offset) {
log.error("The latest computed high watermark {} is smaller than the current " +
"value {}, which suggests that one of the voters has lost committed data. " +
log.info("The latest computed high watermark {} is smaller than the current " +
"value {}, which should only happen when voter set membership changes. If the voter " +
"set has not changed this suggests that one of the voters has lost committed data. " +
"Full voter replication state: {}", highWatermarkUpdateOffset,
currentHighWatermarkMetadata.offset, voterStates.values());
return false;
Expand Down Expand Up @@ -296,10 +298,12 @@ private void logHighWatermarkUpdate(
* Update the local replica state.
*
* @param endOffsetMetadata updated log end offset of local replica
* @param lastVoterSet the up-to-date voter set
* @return true if the high watermark is updated as a result of this call
*/
public boolean updateLocalState(
LogOffsetMetadata endOffsetMetadata
LogOffsetMetadata endOffsetMetadata,
Set<Integer> lastVoterSet
) {
ReplicaState state = getOrCreateReplicaState(localId);
state.endOffset.ifPresent(currentEndOffset -> {
Expand All @@ -308,7 +312,8 @@ public boolean updateLocalState(
"end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset);
}
});
state.updateLeaderState(endOffsetMetadata);
state.updateLeaderEndOffset(endOffsetMetadata);
updateVoterAndObserverStates(lastVoterSet);
return maybeUpdateHighWatermark();
}

Expand Down Expand Up @@ -341,9 +346,7 @@ public boolean updateReplicaState(
state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset);
}
});

Optional<LogOffsetMetadata> leaderEndOffsetOpt =
voterStates.get(localId).endOffset;
Optional<LogOffsetMetadata> leaderEndOffsetOpt = getOrCreateReplicaState(localId).endOffset;

state.updateFollowerState(
currentTimeMs,
Expand Down Expand Up @@ -435,16 +438,40 @@ private DescribeQuorumResponseData.ReplicaState describeReplicaState(

}

/**
* Clear observer states that have not been active for a while and are not the leader.
*/
private void clearInactiveObservers(final long currentTimeMs) {
observerStates.entrySet().removeIf(integerReplicaStateEntry ->
currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS
currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS &&
integerReplicaStateEntry.getKey() != localId
);
}

/**
 * Checks whether the given replica is currently tracked as a voter.
 *
 * @param remoteNodeId id of the replica to look up
 * @return true if {@code voterStates} has an entry keyed by this id
 */
private boolean isVoter(int remoteNodeId) {
    final boolean trackedAsVoter = voterStates.containsKey(remoteNodeId);
    return trackedAsVoter;
}

private void updateVoterAndObserverStates(Set<Integer> lastVoterSet) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any unit tests for this. Could we add some to verify that observerStates/voterStates are changed as expected?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can test this indirectly by making sure observers cannot influence the HW until they are added back to the voter set (e.g. if we have two voters and two observers, all observers being up-to-date with the leader will not cause the HW to increase). This ensures nodes are correctly removed from and added to the sets.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, actually it seems we can test this pretty directly with describeQuorum. I'll add another test.

// Move any replica that is not in the last voter set from voterStates to observerStates
for (Iterator<Map.Entry<Integer, ReplicaState>> iter = voterStates.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<Integer, ReplicaState> replica = iter.next();
if (!lastVoterSet.contains(replica.getKey())) {
observerStates.put(replica.getKey(), replica.getValue());
iter.remove();
}
}

// Add replicas that are in the last voter set and not in voterStates to voterStates (from observerStates
// if they exist)
for (int voterId : lastVoterSet) {
if (!voterStates.containsKey(voterId)) {
Optional<ReplicaState> existingObserverState = Optional.ofNullable(observerStates.remove(voterId));
voterStates.put(voterId, existingObserverState.orElse(new ReplicaState(voterId, false)));
}
}
}

private static class ReplicaState implements Comparable<ReplicaState> {
final int nodeId;
Optional<LogOffsetMetadata> endOffset;
Expand All @@ -462,7 +489,7 @@ public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) {
this.hasAcknowledgedLeader = hasAcknowledgedLeader;
}

void updateLeaderState(
void updateLeaderEndOffset(
LogOffsetMetadata endOffsetMetadata
) {
// For the leader, we only update the end offset. The remaining fields
Expand Down
Loading