From 352f35a808831bd64b4a1639ce4aff128b6ac191 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Thu, 13 Jun 2019 15:57:37 -0700 Subject: [PATCH 1/5] add group.instance.id to DescribeGroup --- .../admin/ConsumerGroupDescription.java | 12 +++---- .../kafka/clients/admin/KafkaAdminClient.java | 5 +-- .../clients/admin/MemberDescription.java | 25 +++++++++++++- .../requests/DescribeGroupsResponse.java | 2 ++ .../common/message/DescribeGroupsRequest.json | 3 +- .../message/DescribeGroupsResponse.json | 6 +++- .../clients/admin/KafkaAdminClientTest.java | 34 ++++++++++++++----- .../common/requests/RequestResponseTest.java | 2 +- .../kafka/admin/ConsumerGroupCommand.scala | 27 ++++++++++----- .../main/scala/kafka/server/KafkaApis.scala | 1 + .../api/AdminClientIntegrationTest.scala | 4 +++ .../group/GroupCoordinatorTest.scala | 3 +- 12 files changed, 94 insertions(+), 30 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java index 4590c74d6f2de..32bd165817457 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java @@ -50,12 +50,12 @@ public ConsumerGroupDescription(String groupId, } ConsumerGroupDescription(String groupId, - boolean isSimpleConsumerGroup, - Collection members, - String partitionAssignor, - ConsumerGroupState state, - Node coordinator, - Set authorizedOperations) { + boolean isSimpleConsumerGroup, + Collection members, + String partitionAssignor, + ConsumerGroupState state, + Node coordinator, + Set authorizedOperations) { this.groupId = groupId == null ? "" : groupId; this.isSimpleConsumerGroup = isSimpleConsumerGroup; this.members = members == null ? Collections.emptyList() : diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e01ecf3c22fbc..d923bd75a337e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2716,8 +2716,9 @@ void handleResponse(AbstractResponse abstractResponse) { deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment())); partitions = new HashSet<>(assignment.partitions()); } - final MemberDescription memberDescription = - new MemberDescription(groupMember.memberId(), + final MemberDescription memberDescription = new MemberDescription( + groupMember.memberId(), + Optional.ofNullable(groupMember.groupInstanceId()), groupMember.clientId(), groupMember.clientHost(), new MemberAssignment(partitions)); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java index a8865bed0ee2d..0fa8ba90679de 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java @@ -21,24 +21,40 @@ import java.util.Collections; import java.util.Objects; +import java.util.Optional; /** * A detailed description of a single group instance in the cluster. */ public class MemberDescription { private final String memberId; + private final Optional groupInstanceId; private final String clientId; private final String host; private final MemberAssignment assignment; - public MemberDescription(String memberId, String clientId, String host, MemberAssignment assignment) { + @Deprecated + public MemberDescription(String memberId, + String clientId, + String host, + MemberAssignment assignment) { + this(memberId, Optional.empty(), clientId, host, assignment); + } + + public MemberDescription(String memberId, + Optional groupInstanceId, + String clientId, + String host, + MemberAssignment assignment) { this.memberId = memberId == null ? "" : memberId; + this.groupInstanceId = groupInstanceId; this.clientId = clientId == null ? "" : clientId; this.host = host == null ? "" : host; this.assignment = assignment == null ? new MemberAssignment(Collections.emptySet()) : assignment; } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -62,6 +78,13 @@ public String consumerId() { return memberId; } + /** + * The instance id of the group member. + */ + public Optional groupInstanceId() { + return groupInstanceId; + } + /** * The client id of the group member. */ diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java index 823512f857d97..cb369a0980867 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -54,12 +54,14 @@ public DescribeGroupsResponse(Struct struct, short version) { public static DescribedGroupMember groupMember( final String memberId, + final String groupInstanceId, final String clientId, final String clientHost, final byte[] assignment, final byte[] metadata) { return new DescribedGroupMember() .setMemberId(memberId) + .setGroupInstanceId(groupInstanceId) .setClientId(clientId) .setClientHost(clientHost) .setMemberAssignment(assignment) diff --git a/clients/src/main/resources/common/message/DescribeGroupsRequest.json b/clients/src/main/resources/common/message/DescribeGroupsRequest.json index b6f33ed4022df..87cad1ab9c037 100644 --- a/clients/src/main/resources/common/message/DescribeGroupsRequest.json +++ b/clients/src/main/resources/common/message/DescribeGroupsRequest.json @@ -19,7 +19,8 @@ "name": "DescribeGroupsRequest", // Versions 1 and 2 are the same as version 0. // Starting in version 3, authorized operations can be requested. - "validVersions": "0-3", + // Starting in version 4, the response will include group.instance.id info for members. + "validVersions": "0-4", "fields": [ { "name": "Groups", "type": "[]string", "versions": "0+", "entityType": "groupId", "about": "The names of the groups to describe" }, diff --git a/clients/src/main/resources/common/message/DescribeGroupsResponse.json b/clients/src/main/resources/common/message/DescribeGroupsResponse.json index dd8525a3069b5..9ec56e9c255c6 100644 --- a/clients/src/main/resources/common/message/DescribeGroupsResponse.json +++ b/clients/src/main/resources/common/message/DescribeGroupsResponse.json @@ -20,7 +20,8 @@ // Version 1 added throttle time. // Starting in version 2, on quota violation, brokers send out responses before throttling. // Starting in version 3, brokers can send authorized operations. - "validVersions": "0-3", + // Starting in version 4, the response will include group.instance.id info for members. + "validVersions": "0-4", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, @@ -42,6 +43,9 @@ "about": "The group members.", "fields": [ { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID assigned by the group coordinator." }, + { "name": "GroupInstanceId", "type": "string", "versions": "4+", + "nullableVersions": "4+", "default": "null", + "about": "The unique identifier of the consumer instance provided by end user." }, { "name": "ClientId", "type": "string", "versions": "0+", "about": "The client ID used in the member's latest join group request." }, { "name": "ClientHost", "type": "string", "versions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index aa8749cd7d67e..693538d9f1f2b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -57,6 +57,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; import org.apache.kafka.common.message.DeleteTopicsResponseData; import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult; @@ -1151,7 +1152,7 @@ public void testDescribeConsumerGroups() throws Exception { DescribeGroupsResponseData data = new DescribeGroupsResponseData(); - //Retriable errors should be retried + //Retriable errors should be retried data.groups().add(DescribeGroupsResponse.groupMetadata( "group-0", Errors.COORDINATOR_LOAD_IN_PROGRESS, @@ -1204,16 +1205,21 @@ public void testDescribeConsumerGroups() throws Exception { byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()]; memberAssignment.get(memberAssignmentBytes); + DescribedGroupMember memberOne = DescribeGroupsResponse.groupMember("0", "instance1", "clientId0", "clientHost", memberAssignmentBytes, null); + DescribedGroupMember memberTwo = DescribeGroupsResponse.groupMember("1", "instance2", "clientId1", "clientHost", memberAssignmentBytes, null); + + List expectedMemberDescriptions = new ArrayList<>(); + expectedMemberDescriptions.add(convertToMemberDescriptions(memberOne, + new MemberAssignment(new HashSet<>(topicPartitions)))); + expectedMemberDescriptions.add(convertToMemberDescriptions(memberTwo, + new MemberAssignment(new HashSet<>(topicPartitions)))); data.groups().add(DescribeGroupsResponse.groupMetadata( "group-0", Errors.NONE, "", ConsumerProtocol.PROTOCOL_TYPE, "", - asList( - DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null), - DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null) - ), + asList(memberOne, memberTwo), Collections.emptySet())); env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); @@ -1224,6 +1230,7 @@ public void testDescribeConsumerGroups() throws Exception { assertEquals(1, result.describedGroups().size()); assertEquals("group-0", groupDescription.groupId()); assertEquals(2, groupDescription.members().size()); + assertEquals(expectedMemberDescriptions, groupDescription.members()); } } @@ -1266,8 +1273,8 @@ public void testDescribeMultipleConsumerGroups() throws Exception { ConsumerProtocol.PROTOCOL_TYPE, "", asList( - DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null), - DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null) + DescribeGroupsResponse.groupMember("0", null, "clientId0", "clientHost", memberAssignmentBytes, null), + DescribeGroupsResponse.groupMember("1", null, "clientId1", "clientHost", memberAssignmentBytes, null) ), Collections.emptySet())); @@ -1279,8 +1286,8 @@ public void testDescribeMultipleConsumerGroups() throws Exception { "connect", "", asList( - DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null), - DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null) + DescribeGroupsResponse.groupMember("0", null, "clientId0", "clientHost", memberAssignmentBytes, null), + DescribeGroupsResponse.groupMember("1", null, "clientId1", "clientHost", memberAssignmentBytes, null) ), Collections.emptySet())); @@ -1511,6 +1518,15 @@ public void testIncrementalAlterConfigs() throws Exception { } } + private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member, + MemberAssignment assignment) { + return new MemberDescription(member.memberId(), + Optional.ofNullable(member.groupInstanceId()), + member.clientId(), + member.clientHost(), + assignment); + } + @SafeVarargs private static void assertCollectionIs(Collection collection, T... elements) { for (T element : elements) { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 11d85d6d7cf8a..23cbbd08bb331 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -889,7 +889,7 @@ private DescribeGroupsResponse createDescribeGroupResponse() { String clientId = "consumer-1"; String clientHost = "localhost"; DescribeGroupsResponseData describeGroupsResponseData = new DescribeGroupsResponseData(); - DescribeGroupsResponseData.DescribedGroupMember member = DescribeGroupsResponse.groupMember("memberId", + DescribeGroupsResponseData.DescribedGroupMember member = DescribeGroupsResponse.groupMember("memberId", null, clientId, clientHost, new byte[0], new byte[0]); DescribeGroupsResponseData.DescribedGroup metadata = DescribeGroupsResponse.groupMetadata("test-group", Errors.NONE, "STABLE", "consumer", "roundrobin", asList(member), Collections.emptySet()); diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 1ca351565330c..4f7c99b0fd75c 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -116,11 +116,11 @@ object ConsumerGroupCommand extends Logging { private[admin] case class PartitionAssignmentState(group: String, coordinator: Option[Node], topic: Option[String], partition: Option[Int], offset: Option[Long], lag: Option[Long], - consumerId: Option[String], host: Option[String], + consumerId: Option[String], groupInstanceId: Option[String], host: Option[String], clientId: Option[String], logEndOffset: Option[Long]) - private[admin] case class MemberAssignmentState(group: String, consumerId: String, host: String, clientId: String, - numPartitions: Int, assignment: List[TopicPartition]) + private[admin] case class MemberAssignmentState(group: String, consumerId: String, groupInstanceId: Option[String], + host: String, clientId: String, numPartitions: Int, assignment: List[TopicPartition]) private[admin] case class GroupState(group: String, coordinator: Node, assignmentStrategy: String, state: String, numMembers: Int) @@ -325,15 +325,18 @@ object ConsumerGroupCommand extends Logging { topicPartitions: Seq[TopicPartition], getPartitionOffset: TopicPartition => Option[Long], consumerIdOpt: Option[String], + groupInstanceId: Option[String], hostOpt: Option[String], clientIdOpt: Option[String]): Array[PartitionAssignmentState] = { if (topicPartitions.isEmpty) { Array[PartitionAssignmentState]( - PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None), consumerIdOpt, hostOpt, clientIdOpt, None) + PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None), + consumerIdOpt, groupInstanceId, hostOpt, clientIdOpt, None) ) } else - describePartitions(group, coordinator, topicPartitions.sortBy(_.partition), getPartitionOffset, consumerIdOpt, hostOpt, clientIdOpt) + describePartitions(group, coordinator, topicPartitions.sortBy(_.partition), getPartitionOffset, + consumerIdOpt, groupInstanceId, hostOpt, clientIdOpt) } private def getLag(offset: Option[Long], logEndOffset: Option[Long]): Option[Long] = @@ -344,13 +347,14 @@ object ConsumerGroupCommand extends Logging { topicPartitions: Seq[TopicPartition], getPartitionOffset: TopicPartition => Option[Long], consumerIdOpt: Option[String], + groupInstanceId: Option[String], hostOpt: Option[String], clientIdOpt: Option[String]): Array[PartitionAssignmentState] = { def getDescribePartitionResult(topicPartition: TopicPartition, logEndOffsetOpt: Option[Long]): PartitionAssignmentState = { val offset = getPartitionOffset(topicPartition) PartitionAssignmentState(group, coordinator, Option(topicPartition.topic), Option(topicPartition.partition), offset, - getLag(offset, logEndOffsetOpt), consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt) + getLag(offset, logEndOffsetOpt), consumerIdOpt, groupInstanceId, hostOpt, clientIdOpt, logEndOffsetOpt) } getLogEndOffsets(group, topicPartitions).map { @@ -428,8 +432,13 @@ object ConsumerGroupCommand extends Logging { .map { topicPartition => topicPartition -> committedOffsets.get(topicPartition).map(_.offset) }.toMap - collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList, - partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"), + collectConsumerAssignment(groupId, + Option(consumerGroup.coordinator), + topicPartitions.toList, + partitionOffsets, + Some(s"${consumerSummary.consumerId}"), + Option(consumerSummary.groupInstanceId.orElse(null)), + Some(s"${consumerSummary.host}"), Some(s"${consumerSummary.clientId}")) } val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap { @@ -441,6 +450,7 @@ object ConsumerGroupCommand extends Logging { Map(topicPartition -> Some(offset.offset)), Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE), + Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE)) } groupId -> (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer)) @@ -461,6 +471,7 @@ object ConsumerGroupCommand extends Logging { MemberAssignmentState( groupId, consumer.consumerId, + Option(consumer.groupInstanceId.orElse(null)), consumer.host, consumer.clientId, consumer.assignment.topicPartitions.size(), diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index fa45cf5308a84..037a310428863 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1278,6 +1278,7 @@ class KafkaApis(val requestChannel: RequestChannel, val members = summary.members.map { member => new DescribeGroupsResponseData.DescribedGroupMember() .setMemberId(member.memberId) + .setGroupInstanceId(member.groupInstanceId.orNull) .setClientId(member.clientId) .setClientHost(member.clientHost) .setMemberAssignment(member.assignment) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 306bcb5fb91d6..c5d5eafcbfa8f 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -1160,10 +1160,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { } val testGroupId = "test_group_id" val testClientId = "test_client_id" + val testGroupInstanceId = "test_group_instance_id" val fakeGroupId = "fake_group_id" val newConsumerConfig = new Properties(consumerConfig) newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, testGroupInstanceId) val consumer = createConsumer(configOverrides = newConsumerConfig) val latch = new CountDownLatch(1) try { @@ -1206,6 +1208,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertEquals(1, testGroupDescription.members().size()) val member = testGroupDescription.members().iterator().next() assertEquals(testClientId, member.clientId()) + assertTrue(member.groupInstanceId().isPresent) + assertEquals(testGroupInstanceId, member.groupInstanceId().get()) val topicPartitions = member.assignment().topicPartitions() assertEquals(testNumPartitions, topicPartitions.size()) assertEquals(testNumPartitions, topicPartitions.asScala. diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 2cf3e5db4093a..d7439b03b6bff 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -2517,7 +2517,7 @@ class GroupCoordinatorTest { @Test def testDescribeGroupStable() { val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID - val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols) + val joinGroupResult = staticJoinGroup(groupId, memberId, leaderInstanceId, protocolType, protocols) val assignedMemberId = joinGroupResult.memberId val generationId = joinGroupResult.generationId val joinGroupError = joinGroupResult.error @@ -2535,6 +2535,7 @@ class GroupCoordinatorTest { assertEquals(protocolType, summary.protocolType) assertEquals("range", summary.protocol) assertEquals(List(assignedMemberId), summary.members.map(_.memberId)) + assertEquals(List(leaderInstanceId), summary.members.map(_.groupInstanceId)) } @Test From 3a7aee65dead771bbf85c96f82b0dd5662f4f2e7 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Fri, 21 Jun 2019 09:06:26 -0700 Subject: [PATCH 2/5] remove unused constructor --- .../org/apache/kafka/clients/admin/MemberDescription.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java index 0fa8ba90679de..89489105f0f31 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java @@ -33,14 +33,6 @@ public class MemberDescription { private final String host; private final MemberAssignment assignment; - @Deprecated - public MemberDescription(String memberId, - String clientId, - String host, - MemberAssignment assignment) { - this(memberId, Optional.empty(), clientId, host, assignment); - } - public MemberDescription(String memberId, Optional groupInstanceId, String clientId, From eea7cf5939bcdabe92bb10e2922cc0972f352450 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Fri, 21 Jun 2019 11:52:12 -0700 Subject: [PATCH 3/5] reset deprecated admin client changes --- .../kafka/admin/ConsumerGroupCommand.scala | 27 ++++++------------- .../api/AdminClientIntegrationTest.scala | 4 --- 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 4f7c99b0fd75c..1ca351565330c 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -116,11 +116,11 @@ object ConsumerGroupCommand extends Logging { private[admin] case class PartitionAssignmentState(group: String, coordinator: Option[Node], topic: Option[String], partition: Option[Int], offset: Option[Long], lag: Option[Long], - consumerId: Option[String], groupInstanceId: Option[String], host: Option[String], + consumerId: Option[String], host: Option[String], clientId: Option[String], logEndOffset: Option[Long]) - private[admin] case class MemberAssignmentState(group: String, consumerId: String, groupInstanceId: Option[String], - host: String, clientId: String, numPartitions: Int, assignment: List[TopicPartition]) + private[admin] case class MemberAssignmentState(group: String, consumerId: String, host: String, clientId: String, + numPartitions: Int, assignment: List[TopicPartition]) private[admin] case class GroupState(group: String, coordinator: Node, assignmentStrategy: String, state: String, numMembers: Int) @@ -325,18 +325,15 @@ object ConsumerGroupCommand extends Logging { topicPartitions: Seq[TopicPartition], getPartitionOffset: TopicPartition => Option[Long], consumerIdOpt: Option[String], - groupInstanceId: Option[String], hostOpt: Option[String], clientIdOpt: Option[String]): Array[PartitionAssignmentState] = { if (topicPartitions.isEmpty) { Array[PartitionAssignmentState]( - PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None), - consumerIdOpt, groupInstanceId, hostOpt, clientIdOpt, None) + PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None), consumerIdOpt, hostOpt, clientIdOpt, None) ) } else - describePartitions(group, coordinator, topicPartitions.sortBy(_.partition), getPartitionOffset, - consumerIdOpt, groupInstanceId, hostOpt, clientIdOpt) + describePartitions(group, coordinator, topicPartitions.sortBy(_.partition), getPartitionOffset, consumerIdOpt, hostOpt, clientIdOpt) } private def getLag(offset: Option[Long], logEndOffset: Option[Long]): Option[Long] = @@ -347,14 +344,13 @@ object ConsumerGroupCommand extends Logging { topicPartitions: Seq[TopicPartition], getPartitionOffset: TopicPartition => Option[Long], consumerIdOpt: Option[String], - groupInstanceId: Option[String], hostOpt: Option[String], clientIdOpt: Option[String]): Array[PartitionAssignmentState] = { def getDescribePartitionResult(topicPartition: TopicPartition, logEndOffsetOpt: Option[Long]): PartitionAssignmentState = { val offset = getPartitionOffset(topicPartition) PartitionAssignmentState(group, coordinator, Option(topicPartition.topic), Option(topicPartition.partition), offset, - getLag(offset, logEndOffsetOpt), consumerIdOpt, groupInstanceId, hostOpt, clientIdOpt, logEndOffsetOpt) + getLag(offset, logEndOffsetOpt), consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt) } getLogEndOffsets(group, topicPartitions).map { @@ -432,13 +428,8 @@ object ConsumerGroupCommand extends Logging { .map { topicPartition => topicPartition -> committedOffsets.get(topicPartition).map(_.offset) }.toMap - collectConsumerAssignment(groupId, - Option(consumerGroup.coordinator), - topicPartitions.toList, - partitionOffsets, - Some(s"${consumerSummary.consumerId}"), - Option(consumerSummary.groupInstanceId.orElse(null)), - Some(s"${consumerSummary.host}"), + collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList, + partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"), Some(s"${consumerSummary.clientId}")) } val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap { @@ -450,7 +441,6 @@ object ConsumerGroupCommand extends Logging { Map(topicPartition -> Some(offset.offset)), Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE), - Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE)) } groupId -> (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer)) @@ -471,7 +461,6 @@ object ConsumerGroupCommand extends Logging { MemberAssignmentState( groupId, consumer.consumerId, - Option(consumer.groupInstanceId.orElse(null)), consumer.host, consumer.clientId, consumer.assignment.topicPartitions.size(), diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index c5d5eafcbfa8f..306bcb5fb91d6 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -1160,12 +1160,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { } val testGroupId = "test_group_id" val testClientId = "test_client_id" - val testGroupInstanceId = "test_group_instance_id" val fakeGroupId = "fake_group_id" val newConsumerConfig = new Properties(consumerConfig) newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) - newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, testGroupInstanceId) val consumer = createConsumer(configOverrides = newConsumerConfig) val latch = new CountDownLatch(1) try { @@ -1208,8 +1206,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertEquals(1, testGroupDescription.members().size()) val member = testGroupDescription.members().iterator().next() assertEquals(testClientId, member.clientId()) - assertTrue(member.groupInstanceId().isPresent) - assertEquals(testGroupInstanceId, member.groupInstanceId().get()) val topicPartitions = member.assignment().topicPartitions() assertEquals(testNumPartitions, topicPartitions.size()) assertEquals(testNumPartitions, topicPartitions.asScala. From e4e6c22b81439e5e44ca433cd6bc86ecc0e9e0e5 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Fri, 21 Jun 2019 12:28:12 -0700 Subject: [PATCH 4/5] separate static and dynamic member test --- .../group/GroupCoordinatorTest.scala | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index d7439b03b6bff..0a7bcc74e2804 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -2515,9 +2515,30 @@ class GroupCoordinatorTest { } @Test - def testDescribeGroupStable() { - val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID - val joinGroupResult = staticJoinGroup(groupId, memberId, leaderInstanceId, protocolType, protocols) + def testDescribeGroupStableForDynamicMember() { + val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + val assignedMemberId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) + + val syncGroupError = syncGroupResult._2 + assertEquals(Errors.NONE, syncGroupError) + + EasyMock.reset(replicaManager) + val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) + assertEquals(Errors.NONE, error) + assertEquals(protocolType, summary.protocolType) + assertEquals("range", summary.protocol) + assertEquals(List(assignedMemberId), summary.members.map(_.memberId)) + } + + @Test + def testDescribeGroupStableForStaticMember() { + val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocols) val assignedMemberId = joinGroupResult.memberId val generationId = joinGroupResult.generationId val joinGroupError = joinGroupResult.error From 7543ca384cfdbfe5416e42db00e921efc86a50f6 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Sat, 22 Jun 2019 19:23:10 -0700 Subject: [PATCH 5/5] address nit --- .../kafka/clients/admin/MemberDescription.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java index 89489105f0f31..5d6622791cb2d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java @@ -14,11 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.clients.admin; -import org.apache.kafka.common.TopicPartition; - import java.util.Collections; import java.util.Objects; import java.util.Optional; @@ -33,17 +30,17 @@ public class MemberDescription { private final String host; private final MemberAssignment assignment; - public MemberDescription(String memberId, - Optional groupInstanceId, - String clientId, - String host, - MemberAssignment assignment) { + MemberDescription(String memberId, + Optional groupInstanceId, + String clientId, + String host, + MemberAssignment assignment) { this.memberId = memberId == null ? "" : memberId; this.groupInstanceId = groupInstanceId; this.clientId = clientId == null ? "" : clientId; this.host = host == null ? "" : host; this.assignment = assignment == null ? - new MemberAssignment(Collections.emptySet()) : assignment; + new MemberAssignment(Collections.emptySet()) : assignment; }