diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java index 4590c74d6f2de..32bd165817457 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java @@ -50,12 +50,12 @@ public ConsumerGroupDescription(String groupId, } ConsumerGroupDescription(String groupId, - boolean isSimpleConsumerGroup, - Collection members, - String partitionAssignor, - ConsumerGroupState state, - Node coordinator, - Set authorizedOperations) { + boolean isSimpleConsumerGroup, + Collection members, + String partitionAssignor, + ConsumerGroupState state, + Node coordinator, + Set authorizedOperations) { this.groupId = groupId == null ? "" : groupId; this.isSimpleConsumerGroup = isSimpleConsumerGroup; this.members = members == null ? Collections.emptyList() : diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e01ecf3c22fbc..d923bd75a337e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2716,8 +2716,9 @@ void handleResponse(AbstractResponse abstractResponse) { deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment())); partitions = new HashSet<>(assignment.partitions()); } - final MemberDescription memberDescription = - new MemberDescription(groupMember.memberId(), + final MemberDescription memberDescription = new MemberDescription( + groupMember.memberId(), + Optional.ofNullable(groupMember.groupInstanceId()), groupMember.clientId(), groupMember.clientHost(), new MemberAssignment(partitions)); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java index a8865bed0ee2d..5d6622791cb2d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java @@ -14,31 +14,36 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.clients.admin; -import org.apache.kafka.common.TopicPartition; - import java.util.Collections; import java.util.Objects; +import java.util.Optional; /** * A detailed description of a single group instance in the cluster. */ public class MemberDescription { private final String memberId; + private final Optional groupInstanceId; private final String clientId; private final String host; private final MemberAssignment assignment; - public MemberDescription(String memberId, String clientId, String host, MemberAssignment assignment) { + MemberDescription(String memberId, + Optional groupInstanceId, + String clientId, + String host, + MemberAssignment assignment) { this.memberId = memberId == null ? "" : memberId; + this.groupInstanceId = groupInstanceId; this.clientId = clientId == null ? "" : clientId; this.host = host == null ? "" : host; this.assignment = assignment == null ? - new MemberAssignment(Collections.emptySet()) : assignment; + new MemberAssignment(Collections.emptySet()) : assignment; } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -62,6 +67,13 @@ public String consumerId() { return memberId; } + /** + * The instance id of the group member. + */ + public Optional groupInstanceId() { + return groupInstanceId; + } + /** * The client id of the group member. */ diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java index 823512f857d97..cb369a0980867 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -54,12 +54,14 @@ public DescribeGroupsResponse(Struct struct, short version) { public static DescribedGroupMember groupMember( final String memberId, + final String groupInstanceId, final String clientId, final String clientHost, final byte[] assignment, final byte[] metadata) { return new DescribedGroupMember() .setMemberId(memberId) + .setGroupInstanceId(groupInstanceId) .setClientId(clientId) .setClientHost(clientHost) .setMemberAssignment(assignment) diff --git a/clients/src/main/resources/common/message/DescribeGroupsRequest.json b/clients/src/main/resources/common/message/DescribeGroupsRequest.json index b6f33ed4022df..87cad1ab9c037 100644 --- a/clients/src/main/resources/common/message/DescribeGroupsRequest.json +++ b/clients/src/main/resources/common/message/DescribeGroupsRequest.json @@ -19,7 +19,8 @@ "name": "DescribeGroupsRequest", // Versions 1 and 2 are the same as version 0. // Starting in version 3, authorized operations can be requested. - "validVersions": "0-3", + // Starting in version 4, the response will include group.instance.id info for members. + "validVersions": "0-4", "fields": [ { "name": "Groups", "type": "[]string", "versions": "0+", "entityType": "groupId", "about": "The names of the groups to describe" }, diff --git a/clients/src/main/resources/common/message/DescribeGroupsResponse.json b/clients/src/main/resources/common/message/DescribeGroupsResponse.json index dd8525a3069b5..9ec56e9c255c6 100644 --- a/clients/src/main/resources/common/message/DescribeGroupsResponse.json +++ b/clients/src/main/resources/common/message/DescribeGroupsResponse.json @@ -20,7 +20,8 @@ // Version 1 added throttle time. // Starting in version 2, on quota violation, brokers send out responses before throttling. // Starting in version 3, brokers can send authorized operations. - "validVersions": "0-3", + // Starting in version 4, the response will include group.instance.id info for members. + "validVersions": "0-4", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, @@ -42,6 +43,9 @@ "about": "The group members.", "fields": [ { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID assigned by the group coordinator." }, + { "name": "GroupInstanceId", "type": "string", "versions": "4+", + "nullableVersions": "4+", "default": "null", + "about": "The unique identifier of the consumer instance provided by end user." }, { "name": "ClientId", "type": "string", "versions": "0+", "about": "The client ID used in the member's latest join group request." }, { "name": "ClientHost", "type": "string", "versions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index aa8749cd7d67e..693538d9f1f2b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -57,6 +57,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; import org.apache.kafka.common.message.DeleteTopicsResponseData; import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult; @@ -1151,7 +1152,7 @@ public void testDescribeConsumerGroups() throws Exception { DescribeGroupsResponseData data = new DescribeGroupsResponseData(); - //Retriable errors should be retried + //Retriable errors should be retried data.groups().add(DescribeGroupsResponse.groupMetadata( "group-0", Errors.COORDINATOR_LOAD_IN_PROGRESS, @@ -1204,16 +1205,21 @@ public void testDescribeConsumerGroups() throws Exception { byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()]; memberAssignment.get(memberAssignmentBytes); + DescribedGroupMember memberOne = DescribeGroupsResponse.groupMember("0", "instance1", "clientId0", "clientHost", memberAssignmentBytes, null); + DescribedGroupMember memberTwo = DescribeGroupsResponse.groupMember("1", "instance2", "clientId1", "clientHost", memberAssignmentBytes, null); + + List expectedMemberDescriptions = new ArrayList<>(); + expectedMemberDescriptions.add(convertToMemberDescriptions(memberOne, + new MemberAssignment(new HashSet<>(topicPartitions)))); + expectedMemberDescriptions.add(convertToMemberDescriptions(memberTwo, + new MemberAssignment(new HashSet<>(topicPartitions)))); data.groups().add(DescribeGroupsResponse.groupMetadata( "group-0", Errors.NONE, "", ConsumerProtocol.PROTOCOL_TYPE, "", - asList( - DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null), - DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null) - ), + asList(memberOne, memberTwo), Collections.emptySet())); env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); @@ -1224,6 +1230,7 @@ public void testDescribeConsumerGroups() throws Exception { assertEquals(1, result.describedGroups().size()); assertEquals("group-0", groupDescription.groupId()); assertEquals(2, groupDescription.members().size()); + assertEquals(expectedMemberDescriptions, groupDescription.members()); } } @@ -1266,8 +1273,8 @@ public void testDescribeMultipleConsumerGroups() throws Exception { ConsumerProtocol.PROTOCOL_TYPE, "", asList( - DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null), - DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null) + DescribeGroupsResponse.groupMember("0", null, "clientId0", "clientHost", memberAssignmentBytes, null), + DescribeGroupsResponse.groupMember("1", null, "clientId1", "clientHost", memberAssignmentBytes, null) ), Collections.emptySet())); @@ -1279,8 +1286,8 @@ public void testDescribeMultipleConsumerGroups() throws Exception { "connect", "", asList( - DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null), - DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null) + DescribeGroupsResponse.groupMember("0", null, "clientId0", "clientHost", memberAssignmentBytes, null), + DescribeGroupsResponse.groupMember("1", null, "clientId1", "clientHost", memberAssignmentBytes, null) ), Collections.emptySet())); @@ -1511,6 +1518,15 @@ public void testIncrementalAlterConfigs() throws Exception { } } + private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member, + MemberAssignment assignment) { + return new MemberDescription(member.memberId(), + Optional.ofNullable(member.groupInstanceId()), + member.clientId(), + member.clientHost(), + assignment); + } + @SafeVarargs private static void assertCollectionIs(Collection collection, T... elements) { for (T element : elements) { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 11d85d6d7cf8a..23cbbd08bb331 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -889,7 +889,7 @@ private DescribeGroupsResponse createDescribeGroupResponse() { String clientId = "consumer-1"; String clientHost = "localhost"; DescribeGroupsResponseData describeGroupsResponseData = new DescribeGroupsResponseData(); - DescribeGroupsResponseData.DescribedGroupMember member = DescribeGroupsResponse.groupMember("memberId", + DescribeGroupsResponseData.DescribedGroupMember member = DescribeGroupsResponse.groupMember("memberId", null, clientId, clientHost, new byte[0], new byte[0]); DescribeGroupsResponseData.DescribedGroup metadata = DescribeGroupsResponse.groupMetadata("test-group", Errors.NONE, "STABLE", "consumer", "roundrobin", asList(member), Collections.emptySet()); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index fa45cf5308a84..037a310428863 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1278,6 +1278,7 @@ class KafkaApis(val requestChannel: RequestChannel, val members = summary.members.map { member => new DescribeGroupsResponseData.DescribedGroupMember() .setMemberId(member.memberId) + .setGroupInstanceId(member.groupInstanceId.orNull) .setClientId(member.clientId) .setClientHost(member.clientHost) .setMemberAssignment(member.assignment) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 2cf3e5db4093a..0a7bcc74e2804 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -2515,9 +2515,30 @@ class GroupCoordinatorTest { } @Test - def testDescribeGroupStable() { - val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID - val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) + def testDescribeGroupStableForDynamicMember() { + val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + val assignedMemberId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) + + val syncGroupError = syncGroupResult._2 + assertEquals(Errors.NONE, syncGroupError) + + EasyMock.reset(replicaManager) + val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) + assertEquals(Errors.NONE, error) + assertEquals(protocolType, summary.protocolType) + assertEquals("range", summary.protocol) + assertEquals(List(assignedMemberId), summary.members.map(_.memberId)) + } + + @Test + def testDescribeGroupStableForStaticMember() { + val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocols) val assignedMemberId = joinGroupResult.memberId val generationId = joinGroupResult.generationId val joinGroupError = joinGroupResult.error @@ -2535,6 +2556,7 @@ class GroupCoordinatorTest { assertEquals(protocolType, summary.protocolType) assertEquals("range", summary.protocol) assertEquals(List(assignedMemberId), summary.members.map(_.memberId)) + assertEquals(List(leaderInstanceId), summary.members.map(_.groupInstanceId)) } @Test