KAFKA-9232: Coordinator new member heartbeat completion does not work for JoinGroup v3#7753
Conversation
| def shouldKeepAlive(deadlineMs: Long): Boolean = { | ||
| if (isAwaitingJoin) | ||
| !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs | ||
| else awaitingSyncCallback != null || |
There was a problem hiding this comment.
replaced awaitingSyncCallback != null with isAwaitingSync
| if (isAwaitingJoin) | ||
| !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs | ||
| else awaitingSyncCallback != null || | ||
| else if (isNew || isAwaitingSync) |
There was a problem hiding this comment.
In onCompleteJoin, we call
- group.maybeInvokeJoinCallback(member, joinResult)
- completeAndScheduleNextHeartbeatExpiration(group, member)
- member.isNew = false
In 1. we ultimately set member.isAwaitingJoin to false, so we never reach the first branch in this case.
I refactored this a bit to reflect what (I think) makes sense: we want to always return true in the special cases of a) awaiting Sync, and b) between awaiting the Join and awaiting the Sync, ie, when isNew = true -- otherwise, we fall back to the usual case, ie, just return whether we've seen a heartbeat within the deadline or not
There was a problem hiding this comment.
Alternatively we could leave this method alone and change the ordering in onCompleteJoin to
member.isNew = false
completeAndScheduleNextHeartbeatExpiration(group, member)
group.maybeInvokeJoinCallback(member, joinResult)
I think this also makes sense, and might be a bit more clear, but I'd want to verify that it's ok to move the heartbeat completion to before the group.maybeInvokeJoinCallback
There was a problem hiding this comment.
I'm for simplifying if possible. What I am thinking is the following:
if (isNew)
// New members are expired after the static join timeout
latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
else if (isAwaitingJoin || isAwaitingSync)
// Don't remove members as long as they have a request in purgatory
true
else
// Otherwise check for session expiration
latestHeartbeat + sessionTimeoutMs > deadlineMs
Then I think this works with the reordering you suggested above:
member.isNew = false
completeAndScheduleNextHeartbeatExpiration(group, member)
group.maybeInvokeJoinCallback(member, joinResult)
Upon rebalance completion, the first check will fail since we set isNew to false and the second check will succeed because we haven't cleared the join callback yet. What do you think?
There was a problem hiding this comment.
Ok yeah I like that, the shouldKeepAlive logic is much easier to follow
|
@ableegoldman do you know when we regressed? |
|
@hachikuji updated the PR, let me know if we want to test this beyond just the usual unit/integration tests or if it's good as is |
|
@ableegoldman I'd be happy with unit tests. |
This reverts commit 74a9fe9.
| !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs | ||
| else awaitingSyncCallback != null || | ||
| if (isNew) | ||
| // New members are expired after the static join timeout |
There was a problem hiding this comment.
nit: can we fix the indentation? i think braces are fine
hachikuji
left a comment
There was a problem hiding this comment.
Thanks, looks good. Left some small suggestions on the test case.
| @Test | ||
| def testCompleteNewMemberTimeoutHeartbeat(): Unit = { | ||
| // New member joins the group | ||
| var responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, None, DefaultSessionTimeout, DefaultRebalanceTimeout, false) |
| val joinResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 100, TimeUnit.MILLISECONDS)) | ||
| val group = groupCoordinator.groupManager.getGroup(groupId).get | ||
| assertEquals(Errors.NONE, joinResult.error) | ||
| assertEquals(0, group.allMemberMetadata.count(_.isNew)) |
There was a problem hiding this comment.
Might be worth asserting latestHeartbeat as well?
| assertEquals(0, group.allMemberMetadata.count(_.isNew)) | ||
|
|
||
| // Make sure the delayed heartbeat based on the new member timeout has been completed | ||
| assertEquals(1, groupCoordinator.heartbeatPurgatory.numDelayed) |
There was a problem hiding this comment.
If we wanted to go a tad further, we could advance the clock by the session timeout and assert the member gets kicked out. That ensures that the heartbeat left behind has the right timeout.
hachikuji
left a comment
There was a problem hiding this comment.
LGTM. Thanks for the fix!
|
retest this please |
2 similar comments
|
retest this please |
|
retest this please |
…p v3 and below (#7753) The v3 JoinGroup logic does not properly complete the initial heartbeat for new members, which then expires after the static 5 minute timeout if the member does not rejoin. The core issue is in the `shouldKeepAlive` method, which returns false when it should return true because of an inconsistent timeout check. Reviewers: Jason Gustafson <jason@confluent.io>
…p v3 and below (#7753) The v3 JoinGroup logic does not properly complete the initial heartbeat for new members, which then expires after the static 5 minute timeout if the member does not rejoin. The core issue is in the `shouldKeepAlive` method, which returns false when it should return true because of an inconsistent timeout check. Reviewers: Jason Gustafson <jason@confluent.io>
…p v3 and below (#7753) The v3 JoinGroup logic does not properly complete the initial heartbeat for new members, which then expires after the static 5 minute timeout if the member does not rejoin. The core issue is in the `shouldKeepAlive` method, which returns false when it should return true because of an inconsistent timeout check. Reviewers: Jason Gustafson <jason@confluent.io>
…p v3 and below (#7753) The v3 JoinGroup logic does not properly complete the initial heartbeat for new members, which then expires after the static 5 minute timeout if the member does not rejoin. The core issue is in the `shouldKeepAlive` method, which returns false when it should return true because of an inconsistent timeout check. Reviewers: Jason Gustafson <jason@confluent.io>
…p v3 and below (apache#7753) The v3 JoinGroup logic does not properly complete the initial heartbeat for new members, which then expires after the static 5 minute timeout if the member does not rejoin. The core issue is in the `shouldKeepAlive` method, which returns false when it should return true because of an inconsistent timeout check. Reviewers: Jason Gustafson <jason@confluent.io>
…p v3 and below (apache#7753) The v3 JoinGroup logic does not properly complete the initial heartbeat for new members, which then expires after the static 5 minute timeout if the member does not rejoin. The core issue is in the `shouldKeepAlive` method, which returns false when it should return true because of an inconsistent timeout check. Reviewers: Jason Gustafson <jason@confluent.io>
The v3 JoinGroup logic does not properly complete the initial heartbeat for new members, which then expires after the static 5 minute timeout. The core issue is in the
shouldKeepAlivemethod, which returns false when it should return true.