Skip to content

KAFKA-17750: Extend kafka-consumer-groups command line tool to support new consumer group (part 2)#18034

Merged
dajac merged 1 commit intoapache:trunkfrom
FrankYang0529:KAFKA-17750-admin
Dec 10, 2024
Merged

KAFKA-17750: Extend kafka-consumer-groups command line tool to support new consumer group (part 2)#18034
dajac merged 1 commit intoapache:trunkfrom
FrankYang0529:KAFKA-17750-admin

Conversation

@FrankYang0529
Copy link
Copy Markdown
Member

@FrankYang0529 FrankYang0529 commented Dec 4, 2024

  • Add fields groupEpoch and targetAssignmentEpoch to ConsumerGroupDescription.java.
  • Add fields memberEpoch and upgraded to MemberDescription.java.
  • Add assertion to PlaintextAdminIntegrationTest#testDescribeClassicGroups to make sure member in classic group returns upgraded as Optional.empty.
  • Add new case testConsumerGroupWithMemberMigration to PlaintextAdminIntegrationTest to make sure migration member has correct upgraded value. Add assertion for groupEpoch, targetAssignmentEpoch, memberEpoch as well.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 Thanks for the patch. I left a few suggestions and questions for consideration.

Optional.empty(), Optional.empty());
}

public ConsumerGroupDescription(String groupId,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this constructor introduced in 4.0 only? It seems to be the case but we need to double check. If it is not, we would need to introduce a new one in order to not break existing usages.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor is introduced in 4.0. It is not existent in 3.9.

public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
String partitionAssignor,
ConsumerGroupState state,
Node coordinator) {
this(groupId, isSimpleConsumerGroup, members, partitionAssignor, state, coordinator, Collections.emptySet());
}
public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
String partitionAssignor,
ConsumerGroupState state,
Node coordinator,
Set<AclOperation> authorizedOperations) {
this(groupId, isSimpleConsumerGroup, members, partitionAssignor, GroupType.CLASSIC, state, coordinator, authorizedOperations);
}
public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
String partitionAssignor,
GroupType type,
ConsumerGroupState state,
Node coordinator,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.members = members == null ? Collections.emptyList() :
Collections.unmodifiableList(new ArrayList<>(members));
this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
this.type = type;
this.state = state;
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the two constructors directly, as doing so does not result in any compatibility issues.

private final Optional<Integer> memberEpoch;
private final Optional<Boolean> isClassic;

public MemberDescription(String memberId,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, I suppose that we must introduce a new overload of the constructor with the new arguments in order to not break compatibility of existing usages.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reminder. Updated it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we considered deprecating the other constructors? It seems odd to have so many constructors.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated both PR and KIP. Thanks for the suggestion.

) {
Optional<MemberAssignment> targetAssignment,
Optional<Integer> memberEpoch,
Optional<Boolean> isClassic) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's keep the closing ) on a new line as it was before.

}

/**
* The flag indicating whether a member is classic.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: How about The flag indicating whether a member with a consumer group uses the classic protocol. It indicates that the member was perhaps not upgraded?

/**
* The flag indicating whether a member is classic.
*/
public Optional<Boolean> isClassic() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for coming back on this one but I just thought about an alternative. How about calling it nonUpgraded? We could set it to true when the group is a consumer group and the member uses classic protocol. We could also only set it when the group is a consumer group vs a classic group. This may be even more explicit for users. What do you think? @chia7712 Do you have any thoughts too?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about calling it nonUpgraded? We could set it to true when the group is a consumer group and the member uses classic protocol.

How about using "upgraded"? it is true only if the group is a consumer group and the member uses the consumer protocol.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That works too. Something as follow?

  • Classic Group -> Optional.empty
  • Consumer Group -> Optional.empty if unknown
  • Consumer Group -> Optional.of(false) if classic
  • Consumer Group -> Optional.of(true) if consumer

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. Updated PR and KIP.

@FrankYang0529 FrankYang0529 force-pushed the KAFKA-17750-admin branch 2 times, most recently from bf9ea6b to b61e4b5 Compare December 5, 2024 15:07
Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 thanks for this patch

Optional.empty(), Optional.empty());
}

public ConsumerGroupDescription(String groupId,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the two constructors directly, as doing so does not result in any compatibility issues.

", targetAssignment=" + targetAssignment + ")";
", targetAssignment=" + targetAssignment +
", memberEpoch=" + memberEpoch.orElse(null) +
", isClassic=" + upgraded.orElse(null) +
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isClassic -> upgraded

private final Optional<Integer> memberEpoch;
private final Optional<Boolean> isClassic;

public MemberDescription(String memberId,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we considered deprecating the other constructors? It seems odd to have so many constructors.

@github-actions github-actions bot added the tools label Dec 6, 2024
@FrankYang0529 FrankYang0529 force-pushed the KAFKA-17750-admin branch 2 times, most recently from 1486622 to 9361113 Compare December 6, 2024 07:10
Copy link
Copy Markdown
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 Thanks for the update. I left some more comments for consideration.

Comment on lines +60 to +70
public MemberDescription(String memberId,
Optional<String> groupInstanceId,
String clientId,
String host,
MemberAssignment assignment,
Optional<MemberAssignment> targetAssignment
) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think that we are mixing two code styles here. We would usually use:

public MemberDescription(
    String memberId,
    Optional<String> groupInstanceId,
    String clientId,
    String host,
    MemberAssignment assignment,
    Optional<MemberAssignment> targetAssignment
) {

or

    public MemberDescription(String memberId,
                             Optional<String> groupInstanceId,
                             String clientId,
                             String host,
                             MemberAssignment assignment,
                             Optional<MemberAssignment> targetAssignment) {

I personally prefer the first one but it is up to you.

Comment on lines +182 to +184
* True for consumer member.
* False for classic member
* Empty for unknown or members in classic group.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I suggest to reference the group type in the java doc in order to be crystal clear. e.g.

The flag indicating whether a member within a {CONSUMER} group uses the {CONSUMER} protocol.
The optional is set to true if it does, to false if it does not, and to empty if it is unknown or if the group
is a {CLASSIC} group.

)))
))),
Optional.of(10),
Optional.of(true)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we cover the unknown and the classic cases in the tests too?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add classic case to testSuccessfulHandleConsumerGroupResponse. For unknown case, I think it's covered by testSuccessfulHandleClassicGroupResponse. Thanks.

Comment on lines +1924 to +1925
assertEquals(groupType == GroupType.CLASSIC, testGroupDescription.groupEpoch.isEmpty)
assertEquals(groupType == GroupType.CLASSIC, testGroupDescription.targetAssignmentEpoch.isEmpty)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to actually verify the values too?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to verify values here, but the case is only for classic now. It's waiting for https://issues.apache.org/jira/browse/KAFKA-17960. Could we add it after we reopen consumer for this case? Thanks.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create a minor PR for this: #18203

Comment on lines +1931 to +1934
members.asScala.foreach(member => {
assertEquals(testClientId, member.clientId)
assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty else Optional.of(true), member.upgraded)
})
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: foreach(member => { -> foreach { member =>


val classicMember = group.members.asScala.find(_.clientId == testClassicClientId)
assertTrue(classicMember.isDefined)
assertTrue(classicMember.get.memberEpoch.isPresent && classicMember.get.memberEpoch.get > 0)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertEquals(Optional.of(2), classicMember.get.memberEpoch).

val classicMember = group.members.asScala.find(_.clientId == testClassicClientId)
assertTrue(classicMember.isDefined)
assertTrue(classicMember.get.memberEpoch.isPresent && classicMember.get.memberEpoch.get > 0)
assertTrue(classicMember.get.upgraded.isPresent && !classicMember.get.upgraded.get)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertEquals(Optional.of(false), classicMember.get.upgraded).

Comment on lines +2141 to +2142
assertTrue(classicMember.get.memberEpoch.isPresent && classicMember.get.memberEpoch.get > 0)
assertTrue(consumerMember.get.upgraded.isPresent && consumerMember.get.upgraded.get)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

}

/**
* The epoch of the group member.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's also mention that this is only provided in the CONSUMER group case.

Comment on lines +211 to +227
/**
* The epoch of the consumer group.
*/
public Optional<Integer> groupEpoch() {
return groupEpoch;
}

/**
* The epoch of the target assignment.
*/
public Optional<Integer> targetAssignmentEpoch() {
return targetAssignmentEpoch;
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also mention here that they are only provided when the type is consumer.

@FrankYang0529 FrankYang0529 force-pushed the KAFKA-17750-admin branch 3 times, most recently from 263c615 to 69bd0f3 Compare December 6, 2024 13:48
…t new consumer group (part 2)

Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529
Copy link
Copy Markdown
Member Author

Hi @dajac, could you please help me review this PR when you have time? I would like to keep working on last part of KIP-1099 (tool). Thanks.

Copy link
Copy Markdown
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm, thanks.

@dajac dajac merged commit c8380ae into apache:trunk Dec 10, 2024
@dajac
Copy link
Copy Markdown
Member

dajac commented Dec 10, 2024

@FrankYang0529 Done :)

@FrankYang0529 FrankYang0529 deleted the KAFKA-17750-admin branch December 10, 2024 13:11
@chia7712
Copy link
Copy Markdown
Member

@FrankYang0529 Could you please file a minor to fix the deprecation warning introduced by this PR?

> Task :tools:compileTestJava
Note: /home/chia7712/project/kafka/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.

@FrankYang0529
Copy link
Copy Markdown
Member Author

Could you please file a minor to fix the deprecation warning introduced by this PR?

Thanks for finding this. Create a PR for it.

#18139

peterxcli pushed a commit to peterxcli/kafka that referenced this pull request Dec 18, 2024
…t new consumer group (part 2) (apache#18034)

* Add fields `groupEpoch` and `targetAssignmentEpoch` to `ConsumerGroupDescription.java`.
* Add fields `memberEpoch` and `upgraded` to `MemberDescription.java`.
* Add assertion to `PlaintextAdminIntegrationTest#testDescribeClassicGroups` to make sure member in classic group returns `upgraded` as `Optional.empty`.
* Add new case `testConsumerGroupWithMemberMigration` to `PlaintextAdminIntegrationTest` to make sure migration member has correct `upgraded` value. Add assertion for `groupEpoch`, `targetAssignmentEpoch`, `memberEpoch` as well.

Reviewers: David Jacot <djacot@confluent.io>

Signed-off-by: PoAn Yang <payang@apache.org>
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
…t new consumer group (part 2) (apache#18034)

* Add fields `groupEpoch` and `targetAssignmentEpoch` to `ConsumerGroupDescription.java`.
* Add fields `memberEpoch` and `upgraded` to `MemberDescription.java`.
* Add assertion to `PlaintextAdminIntegrationTest#testDescribeClassicGroups` to make sure member in classic group returns `upgraded` as `Optional.empty`.
* Add new case `testConsumerGroupWithMemberMigration` to `PlaintextAdminIntegrationTest` to make sure migration member has correct `upgraded` value. Add assertion for `groupEpoch`, `targetAssignmentEpoch`, `memberEpoch` as well.

Reviewers: David Jacot <djacot@confluent.io>

Signed-off-by: PoAn Yang <payang@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants