From 9c3758879270f44ad432f5c6c15e153f0bb6600d Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 9 Dec 2024 21:39:46 +0800 Subject: [PATCH] KAFKA-17750: Extend kafka-consumer-groups command line tool to support new consumer group (part 2) Signed-off-by: PoAn Yang --- .../admin/ConsumerGroupDescription.java | 63 +++++++----- .../clients/admin/MemberDescription.java | 75 +++++++++++++- .../DescribeClassicGroupsHandler.java | 5 +- .../DescribeConsumerGroupsHandler.java | 17 +++- .../clients/admin/KafkaAdminClientTest.java | 23 ++++- .../clients/admin/MemberDescriptionTest.java | 33 +++++++ .../DescribeConsumerGroupsHandlerTest.java | 90 ++++++++++++----- .../api/PlaintextAdminIntegrationTest.scala | 99 ++++++++++++++++++- .../group/ConsumerGroupServiceTest.java | 13 ++- 9 files changed, 347 insertions(+), 71 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java index 4cbc5b4b43bb3..dd1b4b4cb5c7e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -42,9 +43,11 @@ public class ConsumerGroupDescription { private final GroupState groupState; private final Node coordinator; private final Set authorizedOperations; + private final Optional groupEpoch; + private final Optional targetAssignmentEpoch; /** - * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupState, Node)}. + * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, Optional)}. */ @Deprecated public ConsumerGroupDescription(String groupId, @@ -57,7 +60,7 @@ public ConsumerGroupDescription(String groupId, } /** - * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupState, Node, Set)}. + * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, Optional)}. */ @Deprecated public ConsumerGroupDescription(String groupId, @@ -71,7 +74,7 @@ public ConsumerGroupDescription(String groupId, } /** - * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set)}. + * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, Optional)}. */ @Deprecated public ConsumerGroupDescription(String groupId, @@ -90,25 +93,8 @@ public ConsumerGroupDescription(String groupId, this.groupState = GroupState.parse(state.name()); this.coordinator = coordinator; this.authorizedOperations = authorizedOperations; - } - - public ConsumerGroupDescription(String groupId, - boolean isSimpleConsumerGroup, - Collection members, - String partitionAssignor, - GroupState groupState, - Node coordinator) { - this(groupId, isSimpleConsumerGroup, members, partitionAssignor, groupState, coordinator, Collections.emptySet()); - } - - public ConsumerGroupDescription(String groupId, - boolean isSimpleConsumerGroup, - Collection members, - String partitionAssignor, - GroupState groupState, - Node coordinator, - Set authorizedOperations) { - this(groupId, isSimpleConsumerGroup, members, partitionAssignor, GroupType.CLASSIC, groupState, coordinator, authorizedOperations); + this.groupEpoch = Optional.empty(); + this.targetAssignmentEpoch = Optional.empty(); } public ConsumerGroupDescription(String groupId, @@ -118,7 +104,9 @@ public ConsumerGroupDescription(String groupId, GroupType type, GroupState groupState, Node coordinator, - Set authorizedOperations) { + Set authorizedOperations, + Optional groupEpoch, + Optional targetAssignmentEpoch) { this.groupId = groupId == null ? "" : groupId; this.isSimpleConsumerGroup = isSimpleConsumerGroup; this.members = members == null ? Collections.emptyList() : List.copyOf(members); @@ -127,6 +115,8 @@ public ConsumerGroupDescription(String groupId, this.groupState = groupState; this.coordinator = coordinator; this.authorizedOperations = authorizedOperations; + this.groupEpoch = groupEpoch; + this.targetAssignmentEpoch = targetAssignmentEpoch; } @Override @@ -141,12 +131,15 @@ public boolean equals(final Object o) { type == that.type && groupState == that.groupState && Objects.equals(coordinator, that.coordinator) && - Objects.equals(authorizedOperations, that.authorizedOperations); + Objects.equals(authorizedOperations, that.authorizedOperations) && + Objects.equals(groupEpoch, that.groupEpoch) && + Objects.equals(targetAssignmentEpoch, that.targetAssignmentEpoch); } @Override public int hashCode() { - return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, type, groupState, coordinator, authorizedOperations); + return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, type, groupState, coordinator, + authorizedOperations, groupEpoch, targetAssignmentEpoch); } /** @@ -215,6 +208,24 @@ public Set authorizedOperations() { return authorizedOperations; } + /** + * The epoch of the consumer group. + * The optional is set to an integer if it is a {@link GroupType#CONSUMER} group, and to empty if it + * is a {@link GroupType#CLASSIC} group. + */ + public Optional groupEpoch() { + return groupEpoch; + } + + /** + * The epoch of the target assignment. + * The optional is set to an integer if it is a {@link GroupType#CONSUMER} group, and to empty if it + * is a {@link GroupType#CLASSIC} group. + */ + public Optional targetAssignmentEpoch() { + return targetAssignmentEpoch; + } + @Override public String toString() { return "(groupId=" + groupId + @@ -225,6 +236,8 @@ public String toString() { ", groupState=" + groupState + ", coordinator=" + coordinator + ", authorizedOperations=" + authorizedOperations + + ", groupEpoch=" + groupEpoch.orElse(null) + + ", targetAssignmentEpoch=" + targetAssignmentEpoch.orElse(null) + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java index 5ca7dba86f8f4..0785f2e67155f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.clients.admin; +import org.apache.kafka.common.GroupType; + import java.util.Collections; import java.util.Objects; import java.util.Optional; @@ -30,13 +32,18 @@ public class MemberDescription { private final String host; private final MemberAssignment assignment; private final Optional targetAssignment; + private final Optional memberEpoch; + private final Optional upgraded; - public MemberDescription(String memberId, + public MemberDescription( + String memberId, Optional groupInstanceId, String clientId, String host, MemberAssignment assignment, - Optional targetAssignment + Optional targetAssignment, + Optional memberEpoch, + Optional upgraded ) { this.memberId = memberId == null ? "" : memberId; this.groupInstanceId = groupInstanceId; @@ -45,8 +52,38 @@ public MemberDescription(String memberId, this.assignment = assignment == null ? new MemberAssignment(Collections.emptySet()) : assignment; this.targetAssignment = targetAssignment; + this.memberEpoch = memberEpoch; + this.upgraded = upgraded; } + /** + * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}. + */ + @Deprecated + public MemberDescription( + String memberId, + Optional groupInstanceId, + String clientId, + String host, + MemberAssignment assignment, + Optional targetAssignment + ) { + this( + memberId, + groupInstanceId, + clientId, + host, + assignment, + targetAssignment, + Optional.empty(), + Optional.empty() + ); + } + + /** + * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}. + */ + @Deprecated public MemberDescription( String memberId, Optional groupInstanceId, @@ -64,6 +101,10 @@ public MemberDescription( ); } + /** + * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}. + */ + @Deprecated public MemberDescription(String memberId, String clientId, String host, @@ -81,12 +122,14 @@ public boolean equals(Object o) { clientId.equals(that.clientId) && host.equals(that.host) && assignment.equals(that.assignment) && - targetAssignment.equals(that.targetAssignment); + targetAssignment.equals(that.targetAssignment) && + memberEpoch.equals(that.memberEpoch) && + upgraded.equals(that.upgraded); } @Override public int hashCode() { - return Objects.hash(memberId, groupInstanceId, clientId, host, assignment, targetAssignment); + return Objects.hash(memberId, groupInstanceId, clientId, host, assignment, targetAssignment, memberEpoch, upgraded); } /** @@ -131,6 +174,25 @@ public Optional targetAssignment() { return targetAssignment; } + /** + * The epoch of the group member. + * The optional is set to an integer if the member is in a {@link GroupType#CONSUMER} group, and to empty if it + * is in a {@link GroupType#CLASSIC} group. + */ + public Optional memberEpoch() { + return memberEpoch; + } + + /** + * The flag indicating whether a member within a {@link GroupType#CONSUMER} group uses the + * {@link GroupType#CONSUMER} protocol. + * The optional is set to true if it does, to false if it does not, and to empty if it is unknown or if the group + * is a {@link GroupType#CLASSIC} group. + */ + public Optional upgraded() { + return upgraded; + } + @Override public String toString() { return "(memberId=" + memberId + @@ -138,6 +200,9 @@ public String toString() { ", clientId=" + clientId + ", host=" + host + ", assignment=" + assignment + - ", targetAssignment=" + targetAssignment + ")"; + ", targetAssignment=" + targetAssignment + + ", memberEpoch=" + memberEpoch.orElse(null) + + ", upgraded=" + upgraded.orElse(null) + + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java index 77c04c5d5f02e..686ee43a44b2b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java @@ -136,7 +136,10 @@ public ApiResult handleResponse( Optional.ofNullable(groupMember.groupInstanceId()), groupMember.clientId(), groupMember.clientHost(), - new MemberAssignment(partitions))); + new MemberAssignment(partitions), + Optional.empty(), + Optional.empty(), + Optional.empty())); }); final ClassicGroupDescription classicGroupDescription = diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java index 1d911e2f0c7f4..457675e92675a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java @@ -222,7 +222,9 @@ private ApiResult handledConsumerGroup groupMember.clientId(), groupMember.clientHost(), new MemberAssignment(convertAssignment(groupMember.assignment())), - Optional.of(new MemberAssignment(convertAssignment(groupMember.targetAssignment()))) + Optional.of(new MemberAssignment(convertAssignment(groupMember.targetAssignment()))), + Optional.of(groupMember.memberEpoch()), + groupMember.memberType() == -1 ? Optional.empty() : Optional.of(groupMember.memberType() == 1) )) ); @@ -235,7 +237,9 @@ private ApiResult handledConsumerGroup GroupType.CONSUMER, GroupState.parse(describedGroup.groupState()), coordinator, - authorizedOperations + authorizedOperations, + Optional.of(describedGroup.groupEpoch()), + Optional.of(describedGroup.assignmentEpoch()) ); completed.put(groupIdKey, consumerGroupDescription); } @@ -281,7 +285,10 @@ private ApiResult handledClassicGroupR Optional.ofNullable(groupMember.groupInstanceId()), groupMember.clientId(), groupMember.clientHost(), - new MemberAssignment(partitions))); + new MemberAssignment(partitions), + Optional.empty(), + Optional.empty(), + Optional.empty())); } final ConsumerGroupDescription consumerGroupDescription = new ConsumerGroupDescription(groupIdKey.idValue, protocolType.isEmpty(), @@ -290,7 +297,9 @@ private ApiResult handledClassicGroupR GroupType.CLASSIC, GroupState.parse(describedGroup.groupState()), coordinator, - authorizedOperations); + authorizedOperations, + Optional.empty(), + Optional.empty()); completed.put(groupIdKey, consumerGroupDescription); } else { failed.put(groupIdKey, new IllegalArgumentException( diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index b0b48e33c67ff..44f6e1f5a8891 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -4057,6 +4057,7 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception { .setTopicName("foo") .setPartitions(singletonList(1)) ))) + .setMemberType((byte) 1) )), new ConsumerGroupDescribeResponseData.DescribedGroup() .setGroupId("grp2") @@ -4110,14 +4111,18 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception { ), Optional.of(new MemberAssignment( Collections.singleton(new TopicPartition("foo", 1)) - )) + )), + Optional.of(10), + Optional.of(true) ) ), "range", GroupType.CONSUMER, GroupState.STABLE, env.cluster().controller(), - Collections.emptySet() + Collections.emptySet(), + Optional.of(10), + Optional.of(10) )); expectedResult.put("grp2", new ConsumerGroupDescription( "grp2", @@ -4130,14 +4135,19 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception { "clientHost", new MemberAssignment( Collections.singleton(new TopicPartition("bar", 0)) - ) + ), + Optional.empty(), + Optional.empty(), + Optional.empty() ) ), "range", GroupType.CLASSIC, GroupState.STABLE, env.cluster().controller(), - Collections.emptySet() + Collections.emptySet(), + Optional.empty(), + Optional.empty() )); assertEquals(expectedResult, result.all().get()); @@ -8674,7 +8684,10 @@ private static MemberDescription convertToMemberDescriptions(DescribedGroupMembe Optional.ofNullable(member.groupInstanceId()), member.clientId(), member.clientHost(), - assignment); + assignment, + Optional.empty(), + Optional.empty(), + Optional.empty()); } private static ShareMemberDescription convertToShareMemberDescriptions(ShareGroupDescribeResponseData.Member member, diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java index 0bddc618cfc03..16ce11d7361e5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java @@ -99,5 +99,38 @@ public void testNonEqual() { assertNotEquals(STATIC_MEMBER_DESCRIPTION, newInstanceDescription); assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), newInstanceDescription.hashCode()); + + MemberDescription newTargetAssignmentDescription = new MemberDescription(MEMBER_ID, + INSTANCE_ID, + CLIENT_ID, + HOST, + ASSIGNMENT, + Optional.of(ASSIGNMENT), + Optional.empty(), + Optional.empty()); + assertNotEquals(STATIC_MEMBER_DESCRIPTION, newTargetAssignmentDescription); + assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), newTargetAssignmentDescription.hashCode()); + + MemberDescription newMemberEpochDescription = new MemberDescription(MEMBER_ID, + INSTANCE_ID, + CLIENT_ID, + HOST, + ASSIGNMENT, + Optional.empty(), + Optional.of(1), + Optional.empty()); + assertNotEquals(STATIC_MEMBER_DESCRIPTION, newMemberEpochDescription); + assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), newMemberEpochDescription.hashCode()); + + MemberDescription newIsClassicDescription = new MemberDescription(MEMBER_ID, + INSTANCE_ID, + CLIENT_ID, + HOST, + ASSIGNMENT, + Optional.empty(), + Optional.empty(), + Optional.of(false)); + assertNotEquals(STATIC_MEMBER_DESCRIPTION, newIsClassicDescription); + assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), newIsClassicDescription.hashCode()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java index cfbf67e2090d8..20cf0b761e641 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -54,6 +55,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; +import java.util.List; import java.util.Optional; import java.util.Set; @@ -152,29 +154,46 @@ public void testInvalidBuildRequest() { @Test public void testSuccessfulHandleConsumerGroupResponse() { DescribeConsumerGroupsHandler handler = new DescribeConsumerGroupsHandler(false, logContext); - Collection members = singletonList(new MemberDescription( - "memberId", - Optional.of("instanceId"), - "clientId", - "host", - new MemberAssignment(Set.of( - new TopicPartition("foo", 0), - new TopicPartition("bar", 1)) + Collection members = List.of( + new MemberDescription( + "memberId", + Optional.of("instanceId"), + "clientId", + "host", + new MemberAssignment(Set.of( + new TopicPartition("foo", 0) + )), + Optional.of(new MemberAssignment(Set.of( + new TopicPartition("foo", 1) + ))), + Optional.of(10), + Optional.of(true) ), - Optional.of(new MemberAssignment(Set.of( - new TopicPartition("foo", 1), - new TopicPartition("bar", 2) - ))) - )); + new MemberDescription( + "memberId-classic", + Optional.of("instanceId-classic"), + "clientId-classic", + "host", + new MemberAssignment(Set.of( + new TopicPartition("bar", 0) + )), + Optional.of(new MemberAssignment(Set.of( + new TopicPartition("bar", 1) + ))), + Optional.of(9), + Optional.of(false) + )); ConsumerGroupDescription expected = new ConsumerGroupDescription( groupId1, false, members, "range", GroupType.CONSUMER, - ConsumerGroupState.STABLE, + GroupState.STABLE, coordinator, - Collections.emptySet() + Collections.emptySet(), + Optional.of(10), + Optional.of(10) ); AdminApiHandler.ApiResult result = handler.handleResponse( coordinator, @@ -189,7 +208,7 @@ public void testSuccessfulHandleConsumerGroupResponse() { .setAssignmentEpoch(10) .setAssignorName("range") .setAuthorizedOperations(Utils.to32BitField(emptySet())) - .setMembers(singletonList( + .setMembers(List.of( new ConsumerGroupDescribeResponseData.Member() .setMemberId("memberId") .setInstanceId("instanceId") @@ -200,27 +219,44 @@ public void testSuccessfulHandleConsumerGroupResponse() { .setSubscribedTopicNames(singletonList("foo")) .setSubscribedTopicRegex("regex") .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() - .setTopicPartitions(Arrays.asList( + .setTopicPartitions(List.of( new ConsumerGroupDescribeResponseData.TopicPartitions() .setTopicId(Uuid.randomUuid()) .setTopicName("foo") - .setPartitions(Collections.singletonList(0)), + .setPartitions(Collections.singletonList(0)) + ))) + .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List.of( new ConsumerGroupDescribeResponseData.TopicPartitions() .setTopicId(Uuid.randomUuid()) - .setTopicName("bar") + .setTopicName("foo") .setPartitions(Collections.singletonList(1)) ))) - .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment() - .setTopicPartitions(Arrays.asList( + .setMemberType((byte) 1), + new ConsumerGroupDescribeResponseData.Member() + .setMemberId("memberId-classic") + .setInstanceId("instanceId-classic") + .setClientHost("host") + .setClientId("clientId-classic") + .setMemberEpoch(9) + .setRackId("rackid") + .setSubscribedTopicNames(singletonList("bar")) + .setSubscribedTopicRegex("regex") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List.of( new ConsumerGroupDescribeResponseData.TopicPartitions() .setTopicId(Uuid.randomUuid()) - .setTopicName("foo") - .setPartitions(Collections.singletonList(1)), + .setTopicName("bar") + .setPartitions(Collections.singletonList(0)) + ))) + .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List.of( new ConsumerGroupDescribeResponseData.TopicPartitions() .setTopicId(Uuid.randomUuid()) .setTopicName("bar") - .setPartitions(Collections.singletonList(2)) + .setPartitions(Collections.singletonList(1)) ))) + .setMemberType((byte) 0) )) )) ) @@ -232,9 +268,13 @@ public void testSuccessfulHandleConsumerGroupResponse() { public void testSuccessfulHandleClassicGroupResponse() { Collection members = singletonList(new MemberDescription( "memberId", + Optional.empty(), "clientId", "host", - new MemberAssignment(tps))); + new MemberAssignment(tps), + Optional.empty(), + Optional.empty(), + Optional.empty())); ConsumerGroupDescription expected = new ConsumerGroupDescription( groupId1, true, diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 64d9cc94c2dda..bd381f0306ecc 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -1921,12 +1921,17 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // Test that we can get information about the test consumer group. assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId)) var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get() + assertEquals(groupType == GroupType.CLASSIC, testGroupDescription.groupEpoch.isEmpty) + assertEquals(groupType == GroupType.CLASSIC, testGroupDescription.targetAssignmentEpoch.isEmpty) assertEquals(testGroupId, testGroupDescription.groupId()) assertFalse(testGroupDescription.isSimpleConsumerGroup) assertEquals(groupInstanceSet.size, testGroupDescription.members().size()) val members = testGroupDescription.members() - members.asScala.foreach(member => assertEquals(testClientId, member.clientId())) + members.asScala.foreach { member => + assertEquals(testClientId, member.clientId) + assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty else Optional.of(true), member.upgraded) + } val topicPartitionsByTopic = members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic()) topicSet.foreach { topic => val topicPartitions = topicPartitionsByTopic.getOrElse(topic, List.empty) @@ -2058,6 +2063,89 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } + /** + * Test the consumer group APIs. + */ + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testConsumerGroupWithMemberMigration(quorum: String): Unit = { + val config = createConfig + client = Admin.create(config) + var classicConsumer: Consumer[Array[Byte], Array[Byte]] = null + var consumerConsumer: Consumer[Array[Byte], Array[Byte]] = null + try { + // Verify that initially there are no consumer groups to list. + val list1 = client.listConsumerGroups + assertEquals(0, list1.all.get.size) + assertEquals(0, list1.errors.get.size) + assertEquals(0, list1.valid.get.size) + val testTopicName = "test_topic" + val testNumPartitions = 2 + + client.createTopics(util.Arrays.asList( + new NewTopic(testTopicName, testNumPartitions, 1.toShort), + )).all.get + waitForTopics(client, List(testTopicName), List()) + + val producer = createProducer() + try { + producer.send(new ProducerRecord(testTopicName, 0, null, null)) + producer.send(new ProducerRecord(testTopicName, 1, null, null)) + producer.flush() + } finally { + Utils.closeQuietly(producer, "producer") + } + + val testGroupId = "test_group_id" + val testClassicClientId = "test_classic_client_id" + val testConsumerClientId = "test_consumer_client_id" + + val newConsumerConfig = new Properties(consumerConfig) + newConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) + newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, testClassicClientId) + consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name) + + classicConsumer = createConsumer(configOverrides = newConsumerConfig) + classicConsumer.subscribe(List(testTopicName).asJava) + classicConsumer.poll(JDuration.ofMillis(1000)) + + newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, testConsumerClientId) + consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name) + consumerConsumer = createConsumer(configOverrides = newConsumerConfig) + consumerConsumer.subscribe(List(testTopicName).asJava) + consumerConsumer.poll(JDuration.ofMillis(1000)) + + TestUtils.waitUntilTrue(() => { + classicConsumer.poll(JDuration.ofMillis(100)) + consumerConsumer.poll(JDuration.ofMillis(100)) + val describeConsumerGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava).all.get + describeConsumerGroupResult.containsKey(testGroupId) && + describeConsumerGroupResult.get(testGroupId).groupState == GroupState.STABLE && + describeConsumerGroupResult.get(testGroupId).members.size == 2 + }, s"Expected to find 2 members in a stable group $testGroupId") + + val describeConsumerGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava).all.get + val group = describeConsumerGroupResult.get(testGroupId) + assertNotNull(group) + assertEquals(Optional.of(2), group.groupEpoch) + assertEquals(Optional.of(2), group.targetAssignmentEpoch) + + val classicMember = group.members.asScala.find(_.clientId == testClassicClientId) + assertTrue(classicMember.isDefined) + assertEquals(Optional.of(2), classicMember.get.memberEpoch) + assertEquals(Optional.of(false), classicMember.get.upgraded) + + val consumerMember = group.members.asScala.find(_.clientId == testConsumerClientId) + assertTrue(consumerMember.isDefined) + assertEquals(Optional.of(2), consumerMember.get.memberEpoch) + assertEquals(Optional.of(true), consumerMember.get.upgraded) + } finally { + Utils.closeQuietly(classicConsumer, "classicConsumer") + Utils.closeQuietly(consumerConsumer, "consumerConsumer") + Utils.closeQuietly(client, "adminClient") + } + } + /** * Test the consumer group APIs. */ @@ -2546,9 +2634,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { }, "Expected to find all groups") val classicConsumers = client.describeClassicGroups(groupIds.asJavaCollection).all().get() - assertNotNull(classicConsumers.get(classicGroupId)) - assertEquals(classicGroupId, classicConsumers.get(classicGroupId).groupId()) - assertEquals("consumer", classicConsumers.get(classicGroupId).protocol()) + val classicConsumer = classicConsumers.get(classicGroupId) + assertNotNull(classicConsumer) + assertEquals(classicGroupId, classicConsumer.groupId) + assertEquals("consumer", classicConsumer.protocol) + assertFalse(classicConsumer.members.isEmpty) + classicConsumer.members.forEach(member => assertTrue(member.upgraded.isEmpty)) assertNotNull(classicConsumers.get(simpleGroupId)) assertEquals(simpleGroupId, classicConsumers.get(simpleGroupId).groupId()) diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java index dcb8fd054812d..7a134ac0c9610 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.GroupState; +import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -139,8 +140,12 @@ public void testAdminRequestsForDescribeNegativeOffsets() throws Exception { true, Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions))), RangeAssignor.class.getName(), + GroupType.CLASSIC, GroupState.STABLE, - new Node(1, "localhost", 9092)); + new Node(1, "localhost", 9092), + Set.of(), + Optional.empty(), + Optional.empty()); Function, ArgumentMatcher>> offsetsArgMatcher = expectedPartitions -> topicPartitionOffsets -> topicPartitionOffsets != null && topicPartitionOffsets.keySet().equals(expectedPartitions); @@ -233,8 +238,12 @@ private DescribeConsumerGroupsResult describeGroupsResult(GroupState groupState) true, Collections.singleton(member1), RangeAssignor.class.getName(), + GroupType.CLASSIC, groupState, - new Node(1, "localhost", 9092)); + new Node(1, "localhost", 9092), + Set.of(), + Optional.empty(), + Optional.empty()); KafkaFutureImpl future = new KafkaFutureImpl<>(); future.complete(description); return new DescribeConsumerGroupsResult(Collections.singletonMap(GROUP, future));