KAFKA-13888: Addition of Information in DescribeQuorumResponse#12508
KAFKA-13888: Addition of Information in DescribeQuorumResponse#12508hachikuji merged 17 commits intoapache:trunkfrom
Conversation
This commit adds in the implementation for two new fields: * LastFetchTimestamp * LastCaughtUpTimestamp as they are described in KIP-836
| convertToReplicaStates(leaderState.getVoterEndOffsets()), | ||
| convertToReplicaStates(leaderState.getObserverStates(currentTimeMs)) | ||
| convertToReplicaStates(leaderState.getVoterEndOffsets(), | ||
| leaderState.getVoterLastFetchTimes(), |
There was a problem hiding this comment.
I think it would be a bit simpler to push creation of the ReplicaState object into LeaderState:
class LeaderState {
...
List<ReplicaState> voterStates(long currentTimeMs);
List<ReplicaState> observerStates(long currentTimeMs);
}Then we wouldn't need all the boilerplate methods to create all the different Map variations. Also, then the sanity checks in convertToReplicaStates become unnecessary.
| state.updateFetchTimestamp(fetchTimestamp); | ||
| // Update the Last CaughtUp Time | ||
| if (logOffsetMetadata.offset >= leaderLogEndOffset) { | ||
| state.updateLastCaughtUpTimestamp(fetchTimestamp); |
There was a problem hiding this comment.
We do these updates before the call to updateEndOffset, which may raise an exception (same thing for updateFetchTimestamp which is an existing issue). I think it would be better to update state only after we have confirmed the update is valid.
| val quorumInfo = quorumState.quorumInfo.get() | ||
|
|
||
| assertEquals(0, quorumInfo.observers.size) | ||
| assertEquals(3, quorumInfo.voters.size) |
There was a problem hiding this comment.
A more straightforward way to assert the voter/observer sets is in DescribeQuorumRequestTest:
val voterData = partitionData.currentVoters.asScala
assertEquals(cluster.controllerIds().asScala, voterData.map(_.replicaId).toSet);
val observerData = partitionData.observers.asScala
assertEquals(cluster.brokerIds().asScala, observerData.map(_.replicaId).toSet);There was a problem hiding this comment.
That check already exists in the DescribeQuorumRequestTest. The count validation here is just a sanity check.
There was a problem hiding this comment.
We are checking the size here and all of the individual replicaIds below. For example:
assertTrue(-1 < observer.replicaId && 4 > observer.replicaId,
s"Observer ID ${observer.replicaId} was not within expected range.")
I am just offering a more concise and complete way to do that.
There was a problem hiding this comment.
You may have missed my comment above. The nice thing about asserting the set directly is that it ensures that all expected voters/observers are present in the result.
* Some code refactoring * Fixing error handling when updating LeaderState::ReplicaState
| val quorumInfo = quorumState.quorumInfo.get() | ||
|
|
||
| assertEquals(0, quorumInfo.observers.size) | ||
| assertEquals(3, quorumInfo.voters.size) |
There was a problem hiding this comment.
You may have missed my comment above. The nice thing about asserting the set directly is that it ensures that all expected voters/observers are present in the result.
| assertEquals(4, quorumInfo.observers.size) | ||
| assertEquals(3, quorumInfo.voters.size) | ||
| assertEquals(cluster.controllers.asScala.keySet, quorumInfo.voters.asScala.map(_.replicaId).toSet) | ||
| assertTrue(2999 < quorumInfo.leaderId && 3003 > quorumInfo.leaderId, |
There was a problem hiding this comment.
We could make this assertion a little more generic by asserting that quorumInfo.leaderId is contained in cluster.controllers.asScala.keySet.
| private boolean updateEndOffset( | ||
| ReplicaState state, | ||
| LogOffsetMetadata endOffsetMetadata, | ||
| boolean verifyUpdate |
There was a problem hiding this comment.
Why do we need this? Seems like false is the only value we ever pass.
There was a problem hiding this comment.
Wanted to retain the verification step in the method in case we want to use it elsewhere in the future.
hachikuji
left a comment
There was a problem hiding this comment.
Thanks, just a few more small comments.
|
Thanks for the comments @hachikuji . Have addressed them and pushed another version. :) |
hachikuji
left a comment
There was a problem hiding this comment.
LGTM. Just one more nit to fix.
| assertNotEquals(OptionalLong.empty(), observer.lastCaughtUpTimeMs) | ||
| } | ||
| } catch { | ||
| case t: Throwable => throw t |
There was a problem hiding this comment.
nit: if we're just re-throwing, we don't need to catch
This reverts commit a12013c.
38a0af7 to
d1936ab
Compare
hachikuji
left a comment
There was a problem hiding this comment.
Thanks, the updates LGTM as well. Hopefully we've nailed the cause of the test flakiness.
…tamp` for DescribeQuorumResponse [KIP-836] (#12508) This commit implements the newly added fields `LastFetchTimestamp` and `LastCaughtUpTimestamp` for KIP-836: https://cwiki.apache.org/confluence/display/KAFKA/KIP-836:+Addition+of+Information+in+DescribeQuorumResponse+about+Voter+Lag. Reviewers: Jason Gustafson <jason@confluent.io>
This commit adds in the implementation for two new fields:
as they are described in KIP-836