Skip to content

MINOR: Clean up group instance id handling in GroupCoordinator#9958

Merged
hachikuji merged 4 commits intoapache:trunkfrom
hachikuji:clean-up-static-member-handling
Feb 26, 2021
Merged

MINOR: Clean up group instance id handling in GroupCoordinator#9958
hachikuji merged 4 commits intoapache:trunkfrom
hachikuji:clean-up-static-member-handling

Conversation

@hachikuji
Copy link
Copy Markdown
Contributor

@hachikuji hachikuji commented Jan 24, 2021

This is a continuation of a refactor started in #9952. The logic in GroupCoordinator is loose and inconsistent in the handling of the groupInstanceId. In some cases, such as in the JoinGroup hander, we verify that the groupInstanceId from the request is mapped to the memberId precisely. In other cases, such as Heartbeat, we check the mapping, but only to validate fencing. The patch consolidates the member validation so that all handlers follow the same logic.

A second problem is that many of the APIs where a groupInstanceId is expected use optional arguments. For example:

def hasStaticMember(groupInstanceId: Option[String]): Boolean

def addStaticMember(groupInstanceId: Option[String], newMemberId: String): Unit

If groupInstanceId is None, then hasStaticMember is guaranteed to return false while addStaticMember raises an IllegalStateException. So the APIs suggest a generality which is not supported and does not make sense.

Finally, the patch attempts to introduce stronger internal invariants inside GroupMetadata. Currently it is possible for an inconsistent groupInstanceId to memberId mapping to exist because we expose separate APIs to modify members and staticMembers. We rely on the caller to ensure this doesn't happen. Similarly, it is possible for a member to be in the pendingMembers set as well as the stable members map. The patch fixes this by consolidating the paths to addition and removal from these collections and adding assertions to ensure that invariants are maintained.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@hachikuji
Copy link
Copy Markdown
Contributor Author

cc @dajac No hurry, but if you have time, please take a look.

Copy link
Copy Markdown
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji Great refactoring! It really improve the readability of the code. It seems to me that all the invariants are preserved so it looks good overall. I have left a couple of minor comments. I need to make a second pass over it.

Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala Outdated
@hachikuji hachikuji force-pushed the clean-up-static-member-handling branch from e668563 to 8bef3bb Compare February 22, 2021 23:44
Copy link
Copy Markdown
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji Thanks for the updates. I went through the logic again and it looks good to me. I have left few minor comments.

Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
Comment thread core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala Outdated
// get the set of protocols that are commonly supported by all members
val numMembers = members.size
supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet
supportedProtocols.filter(_._2 == numMembers).keys.toSet
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we directly use keysSet instead of .keys.toSet?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried this, but I think we still need the toSet because we are converting to the immutable Set type.

def remove(memberId: String): Unit = {
members.remove(memberId).foreach { member =>
member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 }
member.supportedProtocols.foreach { case (protocol, _) => supportedProtocols(protocol) -= 1 }
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have noticed that we update supportedProtocols in multiple places with the exact same line (modulo +/-). I think that we should consolidate them into one or two helper methods. We can address this separately though. I just wanted to point that out.

Copy link
Copy Markdown
Contributor Author

@hachikuji hachikuji Feb 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree. I tried a few options for this, but I could not find one I felt satisfied with. If you have a good suggestion, I'm happy to give it a shot here or to help review if you want to do a separate PR.

Copy link
Copy Markdown
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@hachikuji hachikuji merged commit 5dae364 into apache:trunk Feb 26, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants