Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 9 additions & 29 deletions core/src/main/scala/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -147,42 +147,22 @@ class AdminClient(val time: Time,
GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
}

def describeConsumerGroup(groupId: String): (Map[TopicPartition, String], Map[String, List[TopicPartition]]) = {
val group = describeGroup(groupId)
try {
val membersAndTopicPartitions: Map[String, List[TopicPartition]] = getMembersAndTopicPartitions(group)
val owners = getOwners(group)
(owners, membersAndTopicPartitions)
} catch {
case (ex: IllegalArgumentException) =>
throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group.")
}
}
case class ConsumerSummary(
memberId: String,
clientId: String,
clientHost: String,
assignment: List[TopicPartition])

def getMembersAndTopicPartitions(group: GroupSummary): Map[String, List[TopicPartition]] = {
def describeConsumerGroup(groupId: String): List[ConsumerSummary] = {
val group = describeGroup(groupId)
if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
throw new IllegalArgumentException(s"${group} is not a valid GroupSummary")

group.members.map {
case member =>
val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
member.memberId -> assignment.partitions().asScala.toList
}.toMap
}

def getOwners(groupSummary: GroupSummary): Map[TopicPartition, String] = {
if (groupSummary.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
throw new IllegalArgumentException(s"${groupSummary} is not a valid GroupSummary")

groupSummary.members.flatMap {
case member =>
val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
val partitions = assignment.partitions().asScala.toList
partitions.map {
case partition: TopicPartition =>
partition -> "%s_%s".format(member.memberId, member.clientHost)
}.toMap
}.toMap
new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList)
}
}
}

Expand Down
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ object ConsumerGroupCommand {
}
topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts))
} else {
val (owners, groupAndTopicPartitions) = createAndGetAdminClient(opts).describeConsumerGroup(group)
val consumers = createAndGetAdminClient(opts).describeConsumerGroup(group)

if (groupAndTopicPartitions.isEmpty)
if (consumers.isEmpty)
warnNoTopicsForGroupFound
groupAndTopicPartitions.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x._2.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), owners))
consumers.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x.assignment.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), Option("%s_%s".format(x.clientId, x.clientHost))))
}
}

Expand Down Expand Up @@ -186,12 +186,12 @@ object ConsumerGroupCommand {
describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, topicPartitions)
}

def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owners: Map[TopicPartition, String] = null): Unit = {
def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owner: Option[String] = None): Unit = {
val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
topicPartitions
.sortBy { case topicPartition => topicPartition.partition }
.foreach { topicPartition =>
describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts, owners)
describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts, owner)
}
}

Expand Down Expand Up @@ -241,11 +241,11 @@ object ConsumerGroupCommand {
partition: Int,
offsetOpt: Option[Long],
opts: ConsumerGroupCommandOptions,
owners: Map[TopicPartition, String] = null) {
ownerOpt: Option[String] = None) {
val topicPartition = new TopicPartition(topic, partition)
val groupDirs = new ZKGroupTopicDirs(group, topic)
val useNewConsumer: Boolean = opts.options.has(opts.newConsumerOpt)
val owner = if (useNewConsumer) owners.get(new TopicPartition(topic, partition)) else zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1
val owner: Option[String] = if (useNewConsumer) ownerOpt else zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1
def print(logEndOffset: Long): Unit = {
val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _)
println("%s, %s, %s, %s, %s, %s, %s"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,9 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
!consumers(0).assignment().isEmpty
}, "Expected non-empty assignment")

val (_, assignment) = client.describeConsumerGroup(groupId)
assertEquals(1, assignment.size)
for (partitions <- assignment.values)
assertEquals(Set(tp, tp2), partitions.toSet)
val consumerSummaries = client.describeConsumerGroup(groupId)
assertEquals(1, consumerSummaries.size)
assertEquals(Set(tp, tp2), consumerSummaries.head.assignment.toSet)
}

@Test
Expand Down