Skip to content

KAFKA-7862 & KIP-345 part-1: Add static membership logic to JoinGroup protocol#6177

Merged
guozhangwang merged 20 commits intoapache:trunkfrom
abbccdda:345_merge
Apr 26, 2019
Merged

KAFKA-7862 & KIP-345 part-1: Add static membership logic to JoinGroup protocol#6177
guozhangwang merged 20 commits intoapache:trunkfrom
abbccdda:345_merge

Conversation

@abbccdda
Copy link
Copy Markdown

@abbccdda abbccdda commented Jan 20, 2019

This is the first diff for the implementation of JoinGroup logic for static membership. The goal of this diff contains:

  1. Add group.instance.id to be unique identifier for consumer instances, provided by end user;
  2. Modify group coordinator to accept JoinGroupRequest with/without static membership, refactor the logic for readability and code reusability.
  3. Add client side support for incorporating static membership changes, including new config for group.instance.id, apply stream thread client id by default, and new join group exception handling.
  4. Remove internal.leave.on.close config by checking whether group.instance.id is defined. Effectively speaking, only dynamic member will send LeaveGroupRequest while static membership expiration is only controlled through session timeout.
  5. Increase max session timeout to 30 min for more user flexibility if they are inclined to tolerate partial unavailability than burdening rebalance.
  6. Unit tests for each module changes, especially on the group coordinator logic. Crossing the possibilities like:
    6.1 Dynamic/Static member
    6.2 Known/Unknown member id
    6.3 Group stable/unstable
    6.4 Leader/Follower

The hope here is to merge this logic before 2.2 code freeze so that we (as Pinterest) could start experimenting on the core logic ASAP.

The rest of the 345 change will be broken down to 4 separate diffs:

  1. Avoid kicking out members through rebalance.timeout, only do the kick out through session timeout.
  2. Changes around LeaveGroup logic, including version bumping, broker logic, client logic, etc.
  3. Admin client changes to add ability to batch remove static members
  4. Deprecate group.initial.rebalance.delay

Let me know your thoughts @guozhangwang @hachikuji @stanislavkozlovski @MayureshGharat @kkonstantine @lindong28 @Ishiihara @shawnsnguyen , thanks!

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@abbccdda abbccdda force-pushed the 345_merge branch 7 times, most recently from 24b5ac9 to 64c5d94 Compare January 22, 2019 03:59
Copy link
Copy Markdown
Contributor

@stanislavkozlovski stanislavkozlovski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gave some of the code a short skim. I'll check it again later.

Let me just say thanks for driving this change - this is a great improvement and obviously requires a lot of effort to implement and test.

Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/protocol/Errors.java Outdated
@abbccdda abbccdda changed the title KAFKA-7018 & KIP-345 part-one: Add static membership logic to JoinGroup protocol KAFKA-7862 & KIP-345 part-one: Add static membership logic to JoinGroup protocol Jan 23, 2019
@abbccdda
Copy link
Copy Markdown
Author

@guozhangwang @hachikuji Call for review folks!

Copy link
Copy Markdown
Contributor

@stanislavkozlovski stanislavkozlovski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave this PR a better pass now. I think this is sound functionality-wise. I left a couple of small comments about tests and style but otherwise LGTM!
Thanks a lot for the work on this @abbccdda! The tests are very exhaustive :)

*/
package org.apache.kafka.common.errors;

public class MemberIdMismatchException extends ApiException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a javadoc explaining when we expect to receive this exception and what could cause it?

My understanding is that a consumer that was part of the group used a group.instance.id-member.id pair and later that pair got updated with a new member.id by another consumer?

That is what I understand as a possibility from the explanation in the KIP:

For join group requests under static membership (with group.instance.id set),

If the member.id uses UNKNOWN_MEMBER_ID, we shall always generate a new member id and replace the one within current map. We also expect that after KIP-394, all the join group requests are requiring member.id to physically enter the consumer group, so the behavior of static member is consistent with that proposal.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Stanis, once I started the implementation, I realized that it's more clear to use the current member.id for static members instead of generating a new one, since we need member.id to track heartbeat & stuffs. I will update the KIP to reflect this change.

As for the MemberIdMismatchException, I put the explanation within Errors.java as error message to feedback end user.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abbccdda - judging by GroupMetadata#replace(), the current behavior is to generate a new member ID and return it, right?
What happens if a misconfigured consumer joins with an existing, duplicate consumer.instance.id? It essentially kicks out the old consumer using it (by invalidating its member.id)? Does the old consumer try to rejoin with its group.instance.id afterwards? We could get into a bad loop if that is the case

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s a very good question. Previously my thought was to use conflict member.id to shut down duplicate consumer instances. However, this probably won’t work because upon receiving UNKNOWN_MEMBER_ID exception in either SyncGroup, Heartbeat, OffsetCommit requests will immediately reset the generation info which includes the member.id. One approach I could think of is to restrict the caller of resetGeneration on client side to only JoinGroup logic, which means for any other types of requests after receiving UNKNOWN_MEMBER_ID will be rejoining the group with their current generation info (the conflicting member.id). This should be able to help us detect the id collision and shut down duplicate member with MEMBER_ID_MISMATCH exception.

@stanislavkozlovski @hachikuji @guozhangwang Thoughts?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand, who would receive the UNKNOWN_MEMBER_ID?

If consumer A has member.id=1, instance.id=one and consumer B joins with member.id=2, instance.id=one, wouldn't A receive MEMBER_ID_MISMATCH and shut down?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stanislavkozlovski this won't happen automatically. The flow is like:

  1. consumer A with member.id=1, instance.id=one is working under stable group. The static member metadata map contains kv entry one=1.
  2. consumer B starts up, joining with same instance.id=one and member.id=unknown
  3. consumer B enters doUnknownJoinGroup block and successfully gets identity member.id=2. The static member metadata map now updates to one=2
  4. consumer A gets fenced by either SyncGroup, Heartbeat, OffsetCommit which informs A with UNKNOWN_MEMBER_ID error, which will trigger resetGeneration() on client side AbstractCoordinator.
  5. Now consumer A rejoins with instance.id=one and member.id=unknown, repeating step 2 like B.

So eventually A, B will bounce forever within the loop 2~5 unless one of them refuses to reset their assigned member.id. Otherwise MEMBER_ID_MISMATCH shall never trigger.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha, yeah. We can only raise MEMBER_ID_MISMATCH in the JoinGroup request because that's the only request that has the group instance id field, right?

As you proposed, I think making the consumer issue a new JoinGroup with the same member.id would be the better approach. Otherwise, we'd probably need to add the new field to all the requests. The old functionality of resetting the generation should continue to work just fine, we'd just be adding an extra hop.

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang Mar 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think restricting resetGeneration to only JoinGroup request is not the best approach since we do rely on, e.g. heartbeat response to notify consumers as early as possible. On the other hand, this issue would only raise if users mis configure their instance.id to have two running instances to have the same id, such issue is similar to producer client that two instances mistakenly configured with the same transactional.id and today it is handled by letting one of them to receive a fatal error (fenced) and either handle it themselves or die hard -- the bottom line is, brokers would not need to be responsible for abstracting such human errors from clients.

So I'd like to present an alternative proposal:

  1. when receiving a join group of null member.id, but existing instance.id, create a new member.id just instead of returning the associated member.id to the client (your PR already did this anyways)

  2. when receiving a join group of non-empty member.id, and existing instance.id, BUT is inconsistent with the static members map, return error MEMBER_ID_REQUIRED.

Now the only issue is what if two instances come with the same instance.id and the same member.id. I think it would not be possible for new members due to 1) since we always generate a new member.id.


EDIT: after thinking about this and discussing with @abbccdda a bit more, I am now inclined towards the original proposal now, i.e. for all responses other than join-group request, we let it client to not reset generation / member-id immediately, but try to re-join the group again. This logic is simpler because:

  1. for static members, not reseting the member-id and re-join, will then result in an fatal member-id-mismatch, and hence we can avoid the ping-pong scenario of two mis-configured clients keep kicking each other out by reseting the member id and re-join.

  2. for dynamic members, not resetting the member-id and then rejoin will likely to get the same unknown-member-id again, and then it can reset generation. The cons is that this requires one more round-trip. But to me, simpler logic that does not require much complexity worth the cost, compared to my proposal above that special handles static and dynamic members on client side much more.

  3. Moreover, as we move on to KIP-429 which will assume the assignors to be "sticky" somehow anyways, so even if somehow the member-id is still recognized by the group-coordinator when re-joining and the member happen to be the leader, this unnecessary rebalance triggered will be cheap.

cc @stanislavkozlovski @hachikuji

Comment thread clients/src/main/java/org/apache/kafka/common/protocol/Errors.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala Outdated
@abbccdda abbccdda force-pushed the 345_merge branch 4 times, most recently from 388049b to 54de121 Compare January 27, 2019 05:08
@abbccdda
Copy link
Copy Markdown
Author

Thanks Stanis for the great suggestions. Could I get some further review? @hachikuji @stanislavkozlovski @guozhangwang ?

Copy link
Copy Markdown
Contributor

@Ishiihara Ishiihara left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The high level logic looks good to me. I left some minor comments to clarify my understanding.

Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
val shouldRebalance = group.isLeader(member.memberId) || !member.matches(protocols) || !group.is(Stable)
if (shouldRebalance) {
// force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
// The latter allows the leader to trigger rebalances for changes affecting assignment
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add comments to the case when the member doe not have a valid protocol, why do we want to force rebalance? Are we handling the case of rolling upgrades?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually a change of protocol indicates that the group needs to use a different strategy to allocate topic partitions. Current logic is to trigger rebalance anyway to find a common agreed strategy for all current members. This diff doesn't change this part of the logic, however this is a good thing to discuss in a separate JIRA!

Comment thread core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
@abbccdda
Copy link
Copy Markdown
Author

abbccdda commented Feb 1, 2019

Pinging for more reviews @hachikuji @stanislavkozlovski @guozhangwang thanks a lot!

@abbccdda abbccdda force-pushed the 345_merge branch 2 times, most recently from 7d7ae3c to 4a9c479 Compare February 4, 2019 03:47
@stanislavkozlovski
Copy link
Copy Markdown
Contributor

stanislavkozlovski commented Feb 4, 2019

@abbccdda for future reference, could we not force-push changes after review comments? We will squash everything in the end anyway. It makes subsequent reviews much easier

@abbccdda
Copy link
Copy Markdown
Author

abbccdda commented Feb 4, 2019

@stanislavkozlovski Sounds good, I will try to see if I could work around that when I rebase to trunk.

@hachikuji hachikuji self-assigned this Feb 20, 2019
@abbccdda abbccdda force-pushed the 345_merge branch 2 times, most recently from 5516658 to fb50b51 Compare February 21, 2019 18:53
@abbccdda
Copy link
Copy Markdown
Author

Retest this please

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few initial comments. I'll try to look over the coordinator logic later this week.

Comment thread checkstyle/suppressions.xml Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java Outdated
@guozhangwang
Copy link
Copy Markdown
Contributor

I've taken another look at the added commits, and besides this minor edb7ba4#r278739994 it lgtm.

@abbccdda
Copy link
Copy Markdown
Author

https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2540/ This shows that the system tests are fixed now.

@guozhangwang
Copy link
Copy Markdown
Contributor

Triggered https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2540/parameters/ (345_system_test is on top of 345_merge) and passed.

@abbccdda
Copy link
Copy Markdown
Author

https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2541/ triggered and passed exactly on top of current 345_merge branch.

@abbccdda
Copy link
Copy Markdown
Author

Retest this please

@guozhangwang guozhangwang merged commit 0f995ba into apache:trunk Apr 26, 2019
@guozhangwang
Copy link
Copy Markdown
Contributor

Thanks for the great patience and great work on the first step towards KIP-345, @abbccdda.

dhruvilshah3 added a commit to confluentinc/kafka that referenced this pull request Apr 29, 2019
* ak/trunk: (42 commits)
  KAFKA-8134: `linger.ms` must be a long
  KAFKA-7779; Avoid unnecessary loop iteration in leastLoadedNode (apache#6081)
  MINOR: Update Gradle to 5.4.1 and update its plugins  (apache#6436)
  MINOR: improve Session expiration notice (apache#6618)
  KAFKA-8029: In memory session store (apache#6525)
  MINOR: In-memory stores cleanup (apache#6595)
  KAFKA-7862 & KIP-345 part-one: Add static membership logic to JoinGroup protocol (apache#6177)
  KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (apache#6602)
  KAFKA-7903: automatically generate OffsetCommitRequest (apache#6583)
  KAFKA-8291 : System test fix (apache#6637)
  MINOR: Do not log retriable offset commit exceptions as errors (apache#5904)
  MINOR: Fix log message error of loadTransactionMetadata (apache#6571)
  MINOR: Fix 404 security features links (apache#6634)
  MINOR: Remove an unnecessary character from broker's startup log
  MINOR: Make LogCleaner.shouldRetainRecord more readable (apache#6590)
  MINOR: Remove implicit return statement (apache#6629)
  KAFKA-8237; Untangle TopicDeleteManager and add test cases (apache#6588)
  KAFKA-8227 DOCS Fixed missing links duality of streams tables (apache#6625)
  MINOR: reformat settings.gradle to be more readable (apache#6621)
  MINOR: Correct RestServerTest formatting
  ...

 Conflicts:
	build.gradle
	settings.gradle
@abbccdda abbccdda changed the title KAFKA-7862 & KIP-345 part-one: Add static membership logic to JoinGroup protocol KAFKA-7862 & KIP-345 part-1: Add static membership logic to JoinGroup protocol May 13, 2019
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…up protocol (apache#6177)

This is the first diff for the implementation of JoinGroup logic for static membership. The goal of this diff contains:

* Add group.instance.id to be unique identifier for consumer instances, provided by end user;
Modify group coordinator to accept JoinGroupRequest with/without static membership, refactor the logic for readability and code reusability.
* Add client side support for incorporating static membership changes, including new config for group.instance.id, apply stream thread client id by default, and new join group exception handling.
* Increase max session timeout to 30 min for more user flexibility if they are inclined to tolerate partial unavailability than burdening rebalance.
* Unit tests for each module changes, especially on the group coordinator logic. Crossing the possibilities like:
6.1 Dynamic/Static member
6.2 Known/Unknown member id
6.3 Group stable/unstable
6.4 Leader/Follower

The rest of the 345 change will be broken down to 4 separate diffs:

* Avoid kicking out members through rebalance.timeout, only do the kick out through session timeout.
* Changes around LeaveGroup logic, including version bumping, broker logic, client logic, etc.
* Admin client changes to add ability to batch remove static members
* Deprecate group.initial.rebalance.delay

Reviewers: Liquan Pei <liquanpei@gmail.com>, Stanislav Kozlovski <familyguyuser192@windowslive.com>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants