
KAFKA-7610; Proactively timeout new group members if rebalance is delayed #5962

Merged
guozhangwang merged 2 commits into apache:trunk from hachikuji:KAFKA-7610-STATIC-TIMEOUT
Dec 10, 2018

Conversation

@hachikuji
Contributor

When a consumer first joins a group, it doesn't have an assigned memberId. If the rebalance is delayed for some reason, the client may disconnect after a request timeout and retry. Since the client never received its memberId, we have no way to detect the retry and expire the previously generated member id. This can lead to unbounded growth in the size of the group until the rebalance has completed.

This patch fixes the problem by proactively completing all JoinGroup requests for new members after a timeout of 5 minutes. If the client is still around, we expect it to retry.
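
To illustrate the idea, here is a minimal sketch of a static timeout for new members. It is not the coordinator code from this patch: `Group`, `PendingMember`, `joinAsNewMember`, and `expireStaleNewMembers` are hypothetical names, and only the 5-minute value and the name `NewMemberJoinTimeoutMs` come from this conversation.

```scala
import scala.collection.mutable

// Hypothetical, simplified model; not Kafka's GroupCoordinator.
object NewMemberTimeoutSketch {
  // Mirrors the 5-minute static timeout described above.
  val NewMemberJoinTimeoutMs: Long = 5 * 60 * 1000L

  final case class PendingMember(memberId: String, joinedAtMs: Long)

  final class Group {
    private val pending = mutable.LinkedHashMap.empty[String, PendingMember]
    private var nextId = 0

    // Every JoinGroup without a memberId gets a fresh id, so a retry after a
    // client-side request timeout looks exactly like a brand-new consumer.
    def joinAsNewMember(nowMs: Long): String = {
      nextId += 1
      val id = s"member-$nextId"
      pending.put(id, PendingMember(id, nowMs))
      id
    }

    // Drop every pending join that is at least as old as the static timeout
    // and return the expired ids. A live client is expected to retry.
    def expireStaleNewMembers(nowMs: Long): Seq[String] = {
      val expired = pending.values
        .filter(m => nowMs - m.joinedAtMs >= NewMemberJoinTimeoutMs)
        .map(_.memberId)
        .toList
      expired.foreach(pending.remove)
      expired
    }

    def size: Int = pending.size
  }
}
```

Without the expiry step, each retried join would add another entry to `pending` for as long as the rebalance stays delayed.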

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor

This is scheduled by an executor, right? I just want to make sure this test can't be flaky

Contributor Author

I think tasks run by MockTimer just run in the foreground. Execution should be deterministic since we rely on MockTime under the covers.
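
As a rough illustration of why a timer driven by a mock clock is deterministic, consider the sketch below. `ManualClock` and `ManualTimer` are hypothetical stand-ins written for this example; Kafka's own MockTime and MockTimer differ in detail.

```scala
import scala.collection.mutable

// Hypothetical stand-ins; not Kafka's MockTime or MockTimer.
object DeterministicTimerSketch {
  final class ManualClock(var nowMs: Long = 0L)

  final class ManualTimer(clock: ManualClock) {
    // Each entry is (due time in ms, task to run).
    private val tasks = mutable.ArrayBuffer.empty[(Long, () => Unit)]

    def schedule(delayMs: Long)(task: () => Unit): Unit =
      tasks += ((clock.nowMs + delayMs, task))

    // Advance the clock and run every due task on the caller's thread,
    // earliest due time first. No background executor is involved.
    def advance(ms: Long): Unit = {
      clock.nowMs += ms
      val (due, remaining) = tasks.partition(_._1 <= clock.nowMs)
      tasks.clear()
      tasks ++= remaining
      due.sortBy(_._1).foreach(_._2.apply())
    }
  }
}
```

Because tasks only run inside `advance`, a test controls exactly when an expiration fires and has no race with a scheduler thread.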

Contributor

@stanislavkozlovski stanislavkozlovski left a comment

LGTM!
Since this is an internal stability improvement, I'm wondering whether it would be worth backporting to some older versions as well. I'm not sure how we decide what gets backported in Kafka.

Why do we increase the max session timeout here?

Contributor Author

Note this is just a test case. I needed a more reasonable value in order to verify the behavior in this patch which depends on a static timeout.

@ijuma
Member

ijuma commented Dec 8, 2018

cc @mumrah

Contributor

@guozhangwang guozhangwang left a comment

Just a minor comment, otherwise LGTM.

Contributor

Should we just call invokeJoinCallback, which also does other things such as setting the callback to null and decrementing numMembersAwaitingJoin?

Contributor Author

The reason I invoked the callback directly is that the member has already been removed from the group. I think that's probably why I didn't bother setting the callback to null as well. I think we can just change the callback first. Then all paths go through invokeJoinCallback.
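
A minimal sketch of the pattern under discussion: one method fires the callback, clears it, and adjusts the counter. The `Member` and `Group` classes and the `String` result type are hypothetical simplifications; only the names invokeJoinCallback and numMembersAwaitingJoin come from this thread, and the real method may differ.

```scala
object JoinCallbackSketch {
  // Hypothetical simplifications of the coordinator's member and group state.
  final class Member(val memberId: String) {
    var awaitingJoinCallback: String => Unit = null
    def isAwaitingJoin: Boolean = awaitingJoinCallback != null
  }

  final class Group {
    var numMembersAwaitingJoin: Int = 0

    def awaitJoin(member: Member, callback: String => Unit): Unit = {
      if (!member.isAwaitingJoin) numMembersAwaitingJoin += 1
      member.awaitingJoinCallback = callback
    }

    // Single place that fires the callback, clears it, and fixes the counter,
    // so a second call for the same member is a no-op.
    def invokeJoinCallback(member: Member, result: String): Unit = {
      if (member.isAwaitingJoin) {
        val callback = member.awaitingJoinCallback
        member.awaitingJoinCallback = null
        numMembersAwaitingJoin -= 1
        callback(result)
      }
    }
  }
}
```

Routing every completion path through one method like this keeps the callback from firing twice, whether the join completes normally or through the new-member timeout.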

Contributor

Thumbs up.

Contributor

Hmm.. in the GroupCoordinator code, after we've triggered the callback we do not actually set it to null. This may not affect the correctness of the logic, but I'm a bit concerned that it is vulnerable to future bugs. Maybe we can just remove the callback after triggering it above (see my other comment)?

@hachikuji hachikuji force-pushed the KAFKA-7610-STATIC-TIMEOUT branch from cb752ae to 259f706 Compare December 10, 2018 18:22
@guozhangwang guozhangwang merged commit 20069b3 into apache:trunk Dec 10, 2018
guozhangwang pushed a commit that referenced this pull request Dec 11, 2018
…ayed (#5962)

Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Boyang Chen <bchen11@outlook.com>, Guozhang Wang <wangguoz@gmail.com>
@guozhangwang
Contributor

Also cherry-picked to 2.1

// timeout during a long rebalance), they may simply retry which will lead to a lot of defunct
// members in the rebalance. To prevent this going on indefinitely, we timeout JoinGroup requests
// for new members. If the new member is still there, we expect it to retry.
completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)

Why do we define a new NewMemberJoinTimeoutMs instead of using the session timeout that the member supplies?

Contributor

As stated above, new members do not have member ids yet and cannot start sending heartbeats, so the session timeout does not matter here. Thus we need a separate value just for this purpose.

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…ayed (apache#5962)