diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 1c4aa52278e82..4618127c9ed4f 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -147,42 +147,22 @@ class AdminClient(val time: Time, GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members) } - def describeConsumerGroup(groupId: String): (Map[TopicPartition, String], Map[String, List[TopicPartition]]) = { - val group = describeGroup(groupId) - try { - val membersAndTopicPartitions: Map[String, List[TopicPartition]] = getMembersAndTopicPartitions(group) - val owners = getOwners(group) - (owners, membersAndTopicPartitions) - } catch { - case (ex: IllegalArgumentException) => - throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group.") - } - } + case class ConsumerSummary( + memberId: String, + clientId: String, + clientHost: String, + assignment: List[TopicPartition]) - def getMembersAndTopicPartitions(group: GroupSummary): Map[String, List[TopicPartition]] = { + def describeConsumerGroup(groupId: String): List[ConsumerSummary] = { + val group = describeGroup(groupId) if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE) throw new IllegalArgumentException(s"${group} is not a valid GroupSummary") group.members.map { case member => val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) - member.memberId -> assignment.partitions().asScala.toList - }.toMap - } - - def getOwners(groupSummary: GroupSummary): Map[TopicPartition, String] = { - if (groupSummary.protocolType != ConsumerProtocol.PROTOCOL_TYPE) - throw new IllegalArgumentException(s"${groupSummary} is not a valid GroupSummary") - - groupSummary.members.flatMap { - case member => - val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) - val partitions = assignment.partitions().asScala.toList - partitions.map { - case partition: TopicPartition => - partition -> "%s_%s".format(member.memberId, member.clientHost) - }.toMap - }.toMap + new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList) + } } } diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 91dc4e3e0414c..b682812a6bd38 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -106,11 +106,11 @@ object ConsumerGroupCommand { } topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts)) } else { - val (owners, groupAndTopicPartitions) = createAndGetAdminClient(opts).describeConsumerGroup(group) + val consumers = createAndGetAdminClient(opts).describeConsumerGroup(group) - if (groupAndTopicPartitions.isEmpty) + if (consumers.isEmpty) warnNoTopicsForGroupFound - groupAndTopicPartitions.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x._2.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), owners)) + consumers.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x.assignment.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), Option("%s_%s".format(x.clientId, x.clientHost)))) } } @@ -186,12 +186,12 @@ object ConsumerGroupCommand { describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, topicPartitions) } - def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owners: Map[TopicPartition, String] = null): Unit = { + def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owner: Option[String] = None): Unit = { val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs) topicPartitions .sortBy { case topicPartition => topicPartition.partition } .foreach { topicPartition => - describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts, owners) + describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts, owner) } } @@ -241,11 +241,11 @@ object ConsumerGroupCommand { partition: Int, offsetOpt: Option[Long], opts: ConsumerGroupCommandOptions, - owners: Map[TopicPartition, String] = null) { + ownerOpt: Option[String] = None) { val topicPartition = new TopicPartition(topic, partition) val groupDirs = new ZKGroupTopicDirs(group, topic) val useNewConsumer: Boolean = opts.options.has(opts.newConsumerOpt) - val owner = if (useNewConsumer) owners.get(new TopicPartition(topic, partition)) else zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1 + val owner: Option[String] = if (useNewConsumer) ownerOpt else zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1 def print(logEndOffset: Long): Unit = { val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _) println("%s, %s, %s, %s, %s, %s, %s" diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala index 5b8cbc25411a8..072f8eb981772 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala @@ -105,10 +105,9 @@ class AdminClientTest extends IntegrationTestHarness with Logging { !consumers(0).assignment().isEmpty }, "Expected non-empty assignment") - val (_, assignment) = client.describeConsumerGroup(groupId) - assertEquals(1, assignment.size) - for (partitions <- assignment.values) - assertEquals(Set(tp, tp2), partitions.toSet) + val consumerSummaries = client.describeConsumerGroup(groupId) + assertEquals(1, consumerSummaries.size) + assertEquals(Set(tp, tp2), consumerSummaries.head.assignment.toSet) } @Test