KAFKA-14500; [2/N] Rewrite GroupMetadata in Java#13663
Conversation
dajac
left a comment
There was a problem hiding this comment.
@jeffkbkim Thanks for the patch. I made a first pass on it and left some small comments.
|
|
||
| @BeforeEach | ||
| public void initialize() { | ||
| group = new GenericGroup(new LogContext(), "groupId", Empty, Time.SYSTEM); |
There was a problem hiding this comment.
nit: I usually prefer to avoid global variable like this in tests but I leave it up to you.
There was a problem hiding this comment.
i'll keep this as is since this variable is used for all tests.
| private GenericGroupState state; | ||
|
|
||
| /** | ||
| * The timestamp of when the group transitioned |
There was a problem hiding this comment.
this looks grammatically correct
| private final Map<String, String> staticMembers = new HashMap<>(); | ||
|
|
||
| /** | ||
| * Members who have yet to (re)join the group |
There was a problem hiding this comment.
this looks grammatically correct
dajac
left a comment
There was a problem hiding this comment.
@jeffkbkim Thanks for the update. I left some comments for consideration.
| public boolean isGenericGroup() { | ||
| return protocolType.map(type -> | ||
| type.equals(ConsumerProtocol.PROTOCOL_TYPE) | ||
| ).orElse(false); | ||
| } |
There was a problem hiding this comment.
Was this method in the old implementation? The name is a bit weird because a generic group could use different protocol types (e.g. connect). Should it be named useConsumerGroupProtocol?
There was a problem hiding this comment.
got it. this is from
def isConsumerGroup: Boolean = protocolType.contains(ConsumerProtocol.PROTOCOL_TYPE)
the naming is confusing because we both consumer and generic groups but a generic group can expect a group using the consumer protocol
| * The timestamp of when the group transitioned | ||
| * to its current state. | ||
| */ | ||
| private Optional<Long> currentStateTimestamp; |
There was a problem hiding this comment.
Is this necessary to be an Optional? As far as I see we immediately define it in the constructor and we never set it equal to something which is empty. Am I missing something obvious?
There was a problem hiding this comment.
This is a bit awkward as the existing GroupMetadata updates this field when we read the group metadata record (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1225).
So we should expect the new group metadata manager introduced in #13639 to perform this.
There was a problem hiding this comment.
Sorry, I do not understand. This field is currently private so the only way we can set it now is either in the constructor or via a setter. Do you mean that in the near future either we will make this field public or we will add a setter which will then be called by the GroupMetadataManager to possibly set this to an empty Optional? If so, then okay, this makes sense.
Rewrites GroupMetadata as GenericGroup that will be used with the new group coordinator. Offset related fields, classes, and methods will not be included as they will be reworked as a separate offset manager component.
Committer Checklist (excluded from commit message)