From 64707c8ed1d8a98370eeaf697ae35851aa85212e Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 21 Sep 2015 10:46:47 -0700 Subject: [PATCH 1/6] KAFKA-2490: support new consumer in ConsumerGroupCommand --- .../kafka/admin/ConsumerGroupCommand.scala | 106 +++++++++++++----- 1 file changed, 79 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index a30c12ddbb7e2..125ac7e16a615 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -18,21 +18,22 @@ package kafka.admin -import kafka.utils._ -import org.I0Itec.zkclient.ZkClient -import kafka.common._ import java.util.Properties + +import joptsimple.{OptionParser, OptionSpec} +import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, OffsetFetchRequest, OffsetFetchResponse} import kafka.client.ClientUtils -import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, OffsetFetchResponse, OffsetFetchRequest} -import org.I0Itec.zkclient.exception.ZkNoNodeException -import kafka.common.TopicAndPartition -import joptsimple.{OptionSpec, OptionParser} -import scala.collection.{Set, mutable} +import kafka.common.{TopicAndPartition, _} import kafka.consumer.SimpleConsumer -import collection.JavaConversions._ +import kafka.utils._ +import org.I0Itec.zkclient.exception.ZkNoNodeException +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.security.JaasUtils +import scala.collection.JavaConversions._ +import scala.collection.{Set, mutable} object ConsumerGroupCommand { @@ -83,7 +84,7 @@ object ConsumerGroupCommand { if (topics.isEmpty) { println("No topic available for consumer group provided") } - topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs)) + topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts)) } def delete(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) { @@ -152,7 +153,8 @@ object ConsumerGroupCommand { group: String, topic: String, channelSocketTimeoutMs: Int, - channelRetryBackoffMs: Int) { + channelRetryBackoffMs: Int, + opts: ConsumerGroupCommandOptions) { val topicPartitions = getTopicPartitions(zkUtils, topic) val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs) println("%s, %s, %s, %s, %s, %s, %s" @@ -160,7 +162,7 @@ object ConsumerGroupCommand { topicPartitions .sortBy { case topicPartition => topicPartition.partition } .foreach { topicPartition => - describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition)) + describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts) } } @@ -208,30 +210,78 @@ object ConsumerGroupCommand { group: String, topic: String, partition: Int, - offsetOpt: Option[Long]) { - val topicAndPartition = TopicAndPartition(topic, partition) + offsetOpt: Option[Long], + opts: ConsumerGroupCommandOptions) { + val topicPartition = new TopicPartition(topic, partition) val groupDirs = new ZKGroupTopicDirs(group, topic) val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1 + def print(logEndOffset: Long): Unit = { + val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _) + println("%s, %s, %s, %s, %s, %s, %s" + .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none"))) + } zkUtils.getLeaderForPartition(topic, partition) match { case Some(-1) => println("%s, %s, %s, %s, %s, %s, %s" .format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none"))) case Some(brokerId) => - val consumerOpt = getConsumer(zkUtils, brokerId) - consumerOpt match { - case Some(consumer) => - val request = - OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) - val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head - consumer.close() - - val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _) - println("%s, %s, %s, %s, %s, %s, %s" - .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none"))) - case None => // ignore + if (opts.options.has(opts.newConsumerOpt)) { + val consumerOpt = getNewConsumer(zkUtils, brokerId) + consumerOpt match { + case Some(consumer) => + consumer.assign(List(topicPartition)) + consumer.seekToEnd(topicPartition) + val logEndOffset = consumer.position(topicPartition) + consumer.close() + print(logEndOffset) + case None => // ignore + } + } else { + val consumerOpt = getConsumer(zkUtils, brokerId) + consumerOpt match { + case Some(consumer) => + val topicAndPartition: TopicAndPartition = new TopicAndPartition(topicPartition.topic(), topicPartition.partition()) + val request = + OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) + val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head + consumer.close() + print(logEndOffset) + case None => // ignore + } } case None => - println("No broker for partition %s".format(topicAndPartition)) + println("No broker for partition %s".format(topicPartition)) + } + } + + private def getNewConsumer(zkUtils: ZkUtils, brokerId: Int): Option[KafkaConsumer[String, String]] = { + try { + zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match { + case Some(brokerInfoString) => + Json.parseFull(brokerInfoString) match { + case Some(m) => + val brokerInfo = m.asInstanceOf[Map[String, Any]] + val host = brokerInfo.get("host").get.asInstanceOf[String] + val port = brokerInfo.get("port").get.asInstanceOf[Int] + val properties: Properties = new Properties() + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host + ":" + port) + properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroupCommand") + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") + properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000") + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") + Some(new KafkaConsumer[String, String](properties)) + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) + } + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) + } + } catch { + case t: Throwable => + println("Could not parse broker info due to " + t.getMessage) + None } } @@ -274,6 +324,7 @@ object ConsumerGroupCommand { "Pass in just a topic to delete the given topic's partition offsets and ownership information " + "for every consumer group. For instance --topic t1" + nl + "WARNING: Only does deletions on consumer groups that are not active." + val NewConsumerDoc = "Use new consumer." val parser = new OptionParser val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc) .withRequiredArg @@ -294,6 +345,7 @@ object ConsumerGroupCommand { val listOpt = parser.accepts("list", ListDoc) val describeOpt = parser.accepts("describe", DescribeDoc) val deleteOpt = parser.accepts("delete", DeleteDoc) + val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc) val options = parser.parse(args : _*) val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt) From 822471bfb2f0b6d38de409ed9388542aa51ff67b Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 4 Nov 2015 12:26:34 -0800 Subject: [PATCH 2/6] Use AdminUtils to get group info on new-consumer --- .../kafka/admin/ConsumerGroupCommand.scala | 56 ++++++++++++++++--- 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 125ac7e16a615..c0e0825a9c5b2 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -57,7 +57,7 @@ object ConsumerGroupCommand { try { if (opts.options.has(opts.listOpt)) - list(zkUtils) + list(zkUtils, opts) else if (opts.options.has(opts.describeOpt)) describe(zkUtils, opts) else if (opts.options.has(opts.deleteOpt)) @@ -71,20 +71,43 @@ object ConsumerGroupCommand { } } - def list(zkUtils: ZkUtils) { - zkUtils.getConsumerGroups().foreach(println) + def list(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) { + val useNewConsumer = opts.options.has(opts.newConsumerOpt) + if (!useNewConsumer) + zkUtils.getConsumerGroups().foreach(println) + else { + val adminClient = createAndGetAdminClient(opts) + adminClient.listAllGroupsFlattened().foreach(x => println(x.groupId)) + } + } + + def createAndGetAdminClient(opts: ConsumerGroupCommandOptions): AdminClient = { + AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt)) } def describe(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) { + val useNewConsumer = opts.options.has(opts.newConsumerOpt) + val group = opts.options.valueOf(opts.groupOpt) val configs = parseConfigs(opts) val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt - val group = opts.options.valueOf(opts.groupOpt) - val topics = zkUtils.getTopicsByConsumerGroup(group) - if (topics.isEmpty) { + def warnNoTopicsForGroupFound: Unit = { println("No topic available for consumer group provided") } - topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts)) + if (!useNewConsumer) { + val topics = zkUtils.getTopicsByConsumerGroup(group) + if (topics.isEmpty) { + warnNoTopicsForGroupFound + } + topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts)) + } else { + val groupAndTopicPartitions: Map[String, List[TopicPartition]] = createAndGetAdminClient(opts).describeConsumerGroup(group) + if (groupAndTopicPartitions.isEmpty) + warnNoTopicsForGroupFound + groupAndTopicPartitions.foreach(x => + describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x._2.map(tp => new TopicAndPartition(tp.topic(), tp.partition())))) + + } } def delete(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) { @@ -156,6 +179,10 @@ object ConsumerGroupCommand { channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions) { val topicPartitions = getTopicPartitions(zkUtils, topic) + describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, topicPartitions) + } + + def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition]): Unit = { val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs) println("%s, %s, %s, %s, %s, %s, %s" .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER")) @@ -311,6 +338,7 @@ object ConsumerGroupCommand { class ConsumerGroupCommandOptions(args: Array[String]) { val ZkConnectDoc = "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + "Multiple URLS can be given to allow fail-over." + val BootstrapServerDoc = "REQUIRED (only when using new-consumer): The server to connect to." val GroupDoc = "The consumer group we wish to act on." val TopicDoc = "The topic whose consumer group information should be deleted." val ConfigDoc = "Configuration for timeouts. For instance --config channelSocketTimeoutMs=600" @@ -323,13 +351,18 @@ object ConsumerGroupCommand { "information for the given consumer groups. For instance --group g1 --group g2 --topic t1" + nl + "Pass in just a topic to delete the given topic's partition offsets and ownership information " + "for every consumer group. For instance --topic t1" + nl + - "WARNING: Only does deletions on consumer groups that are not active." + "WARNING: Only does deletions on consumer groups that are not active." + nl + + "WARNING: Does not work for new-consumer, as group deletion happens automatically once a group has no active member left in new-consumer." val NewConsumerDoc = "Use new consumer." val parser = new OptionParser val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc) .withRequiredArg .describedAs("urls") .ofType(classOf[String]) + val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc) + .withRequiredArg + .describedAs("server to connect to") + .ofType(classOf[String]) val groupOpt = parser.accepts("group", GroupDoc) .withRequiredArg .describedAs("consumer group") @@ -352,7 +385,12 @@ object ConsumerGroupCommand { def checkArgs() { // check required args - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + if (options.has(newConsumerOpt)) { + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) + if (options.has(deleteOpt)) + CommandLineUtils.printUsageAndDie(parser, "Option %s does not work with %s".format(deleteOpt, newConsumerOpt)) + } else + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) if (options.has(describeOpt)) CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt)) From 044230aca548f57472c5d7088488d03c01256ac6 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 4 Nov 2015 15:43:40 -0800 Subject: [PATCH 3/6] Get owner information from MemberMetadata for new-consumers. Address a few reciew comments. --- .../main/scala/kafka/admin/AdminClient.scala | 6 +- .../kafka/admin/ConsumerGroupCommand.scala | 61 +++++++++++++------ 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index ddd3114934e00..e8bb96733103b 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -19,9 +19,9 @@ import kafka.common.KafkaException import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary} import kafka.utils.Logging import org.apache.kafka.clients._ -import org.apache.kafka.clients.consumer.internals.{SendFailedException, ConsumerProtocol, ConsumerNetworkClient, RequestFuture} +import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture, SendFailedException} import org.apache.kafka.common.config.ConfigDef.{Importance, Type} -import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SaslConfigs, SslConfigs} +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} import org.apache.kafka.common.errors.DisconnectException import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.Selector @@ -29,7 +29,7 @@ import org.apache.kafka.common.protocol.types.Struct import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.{SystemTime, Time, Utils} -import org.apache.kafka.common.{TopicPartition, Cluster, Node} +import org.apache.kafka.common.{Cluster, Node, TopicPartition} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index c0e0825a9c5b2..fed16761aa6aa 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -18,21 +18,26 @@ package kafka.admin +import java.nio.ByteBuffer import java.util.Properties import joptsimple.{OptionParser, OptionSpec} -import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, OffsetFetchRequest, OffsetFetchResponse} +import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo} import kafka.client.ClientUtils import kafka.common.{TopicAndPartition, _} import kafka.consumer.SimpleConsumer +import kafka.coordinator.{GroupSummary, MemberSummary} import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNoNodeException +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.security.JaasUtils +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.utils.Utils import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.{Set, mutable} object ConsumerGroupCommand { @@ -101,12 +106,31 @@ object ConsumerGroupCommand { } topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts)) } else { - val groupAndTopicPartitions: Map[String, List[TopicPartition]] = createAndGetAdminClient(opts).describeConsumerGroup(group) + val groupSummary: GroupSummary = createAndGetAdminClient(opts).describeGroup(group) + if (groupSummary.protocolType != ConsumerProtocol.PROTOCOL_TYPE) + println(s"Group ${group} is not a consumer group") + + val members: List[MemberSummary] = groupSummary.members + val owners: Map[TopicPartition, String] = members.flatMap { + case member => + val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) + val partitions = assignment.partitions().asScala.toList + partitions.map { + case partition: TopicPartition => + partition -> "%s_%s".format(member.memberId, member.clientHost) + }.toMap + }.toMap + + val groupAndTopicPartitions: Map[String, List[TopicPartition]] = members.map { + case member => + val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) + member.memberId -> assignment.partitions().toList + }.toMap + if (groupAndTopicPartitions.isEmpty) warnNoTopicsForGroupFound groupAndTopicPartitions.foreach(x => - describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x._2.map(tp => new TopicAndPartition(tp.topic(), tp.partition())))) - + describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x._2.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), owners)) } } @@ -182,14 +206,14 @@ object ConsumerGroupCommand { describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, topicPartitions) } - def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition]): Unit = { + def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owners: Map[TopicPartition, String] = null): Unit = { val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs) println("%s, %s, %s, %s, %s, %s, %s" .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER")) topicPartitions .sortBy { case topicPartition => topicPartition.partition } .foreach { topicPartition => - describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts) + describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts, owners) } } @@ -238,10 +262,12 @@ object ConsumerGroupCommand { topic: String, partition: Int, offsetOpt: Option[Long], - opts: ConsumerGroupCommandOptions) { + opts: ConsumerGroupCommandOptions, + owners: Map[TopicPartition, String] = null) { val topicPartition = new TopicPartition(topic, partition) val groupDirs = new ZKGroupTopicDirs(group, topic) - val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1 + val useNewConsumer: Boolean = opts.options.has(opts.newConsumerOpt) + val owner = if (useNewConsumer) owners.get(new TopicPartition(topic, partition)) else zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1 def print(logEndOffset: Long): Unit = { val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _) println("%s, %s, %s, %s, %s, %s, %s" @@ -252,7 +278,7 @@ object ConsumerGroupCommand { println("%s, %s, %s, %s, %s, %s, %s" .format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none"))) case Some(brokerId) => - if (opts.options.has(opts.newConsumerOpt)) { + if (useNewConsumer) { val consumerOpt = getNewConsumer(zkUtils, brokerId) consumerOpt match { case Some(consumer) => @@ -264,7 +290,7 @@ object ConsumerGroupCommand { case None => // ignore } } else { - val consumerOpt = getConsumer(zkUtils, brokerId) + val consumerOpt = getZkConsumer(zkUtils, brokerId) consumerOpt match { case Some(consumer) => val topicAndPartition: TopicAndPartition = new TopicAndPartition(topicPartition.topic(), topicPartition.partition()) @@ -290,14 +316,14 @@ object ConsumerGroupCommand { val brokerInfo = m.asInstanceOf[Map[String, Any]] val host = brokerInfo.get("host").get.asInstanceOf[String] val port = brokerInfo.get("port").get.asInstanceOf[Int] + val deserializer: String = (new StringDeserializer).getClass.getName val properties: Properties = new Properties() properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host + ":" + port) properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroupCommand") - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") - properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000") + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer) + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer) Some(new KafkaConsumer[String, String](properties)) case None => throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) @@ -312,7 +338,7 @@ object ConsumerGroupCommand { } } - private def getConsumer(zkUtils: ZkUtils, brokerId: Int): Option[SimpleConsumer] = { + private def getZkConsumer(zkUtils: ZkUtils, brokerId: Int): Option[SimpleConsumer] = { try { zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match { case Some(brokerInfoString) => @@ -351,8 +377,7 @@ object ConsumerGroupCommand { "information for the given consumer groups. For instance --group g1 --group g2 --topic t1" + nl + "Pass in just a topic to delete the given topic's partition offsets and ownership information " + "for every consumer group. For instance --topic t1" + nl + - "WARNING: Only does deletions on consumer groups that are not active." + nl + - "WARNING: Does not work for new-consumer, as group deletion happens automatically once a group has no active member left in new-consumer." + "WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active." val NewConsumerDoc = "Use new consumer." val parser = new OptionParser val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc) From 3ddcd91264ad54c8c5b546363c03306d6dbb895d Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 4 Nov 2015 16:32:51 -0800 Subject: [PATCH 4/6] Avoid printing header for group description in case of new consumer --- core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index fed16761aa6aa..454c78b963ff5 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -99,6 +99,10 @@ object ConsumerGroupCommand { def warnNoTopicsForGroupFound: Unit = { println("No topic available for consumer group provided") } + + println("%s, %s, %s, %s, %s, %s, %s" + .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER")) + if (!useNewConsumer) { val topics = zkUtils.getTopicsByConsumerGroup(group) if (topics.isEmpty) { @@ -208,8 +212,6 @@ object ConsumerGroupCommand { def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owners: Map[TopicPartition, String] = null): Unit = { val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs) - println("%s, %s, %s, %s, %s, %s, %s" - .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER")) topicPartitions .sortBy { case topicPartition => topicPartition.partition } .foreach { topicPartition => From 398a489204339bd7b59fe4e03fd5b69ace6854da Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 4 Nov 2015 23:27:28 -0800 Subject: [PATCH 5/6] Move reusable code to AdminClient --- .../main/scala/kafka/admin/AdminClient.scala | 29 ++++++++++++++++-- .../kafka/admin/ConsumerGroupCommand.scala | 30 ++----------------- .../kafka/api/AdminClientTest.scala | 2 +- 3 files changed, 31 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index e8bb96733103b..93f6d9cb84f3e 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -147,10 +147,21 @@ class AdminClient(val time: Time, GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members) } - def describeConsumerGroup(groupId: String): Map[String, List[TopicPartition]] = { + def describeConsumerGroup(groupId: String): (Map[TopicPartition, String], Map[String, List[TopicPartition]]) = { val group = describeGroup(groupId) + try { + val membersAndTopicPartitions: Map[String, List[TopicPartition]] = getMembersAndTopicPartitions(group) + val owners = getOwners(group) + (owners, membersAndTopicPartitions) + } catch { + case (ex: scala.IllegalArgumentException) => + throw new scala.IllegalArgumentException(s"Group ${groupId} is not a consumer group.") + } + } + + def getMembersAndTopicPartitions(group: GroupSummary): Map[String, List[TopicPartition]] = { if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE) - throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group") + throw new scala.IllegalArgumentException(s"${group} is not a valid GroupSummary") group.members.map { case member => @@ -159,6 +170,20 @@ class AdminClient(val time: Time, }.toMap } + def getOwners(groupSummary: GroupSummary): Map[TopicPartition, String] = { + if (groupSummary.protocolType != ConsumerProtocol.PROTOCOL_TYPE) + throw new IllegalArgumentException(s"${groupSummary} is not a valid GroupSummary") + + groupSummary.members.flatMap { + case member => + val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) + val partitions = assignment.partitions().asScala.toList + partitions.map { + case partition: TopicPartition => + partition -> "%s_%s".format(member.memberId, member.clientHost) + }.toMap + }.toMap + } } object AdminClient { diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 454c78b963ff5..91dc4e3e0414c 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -18,7 +18,6 @@ package kafka.admin -import java.nio.ByteBuffer import java.util.Properties import joptsimple.{OptionParser, OptionSpec} @@ -26,10 +25,8 @@ import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, Partit import kafka.client.ClientUtils import kafka.common.{TopicAndPartition, _} import kafka.consumer.SimpleConsumer -import kafka.coordinator.{GroupSummary, MemberSummary} import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNoNodeException -import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.security.JaasUtils @@ -37,7 +34,6 @@ import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.utils.Utils import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ import scala.collection.{Set, mutable} object ConsumerGroupCommand { @@ -82,7 +78,7 @@ object ConsumerGroupCommand { zkUtils.getConsumerGroups().foreach(println) else { val adminClient = createAndGetAdminClient(opts) - adminClient.listAllGroupsFlattened().foreach(x => println(x.groupId)) + adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId)) } } @@ -110,31 +106,11 @@ object ConsumerGroupCommand { } topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts)) } else { - val groupSummary: GroupSummary = createAndGetAdminClient(opts).describeGroup(group) - if (groupSummary.protocolType != ConsumerProtocol.PROTOCOL_TYPE) - println(s"Group ${group} is not a consumer group") - - val members: List[MemberSummary] = groupSummary.members - val owners: Map[TopicPartition, String] = members.flatMap { - case member => - val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) - val partitions = assignment.partitions().asScala.toList - partitions.map { - case partition: TopicPartition => - partition -> "%s_%s".format(member.memberId, member.clientHost) - }.toMap - }.toMap - - val groupAndTopicPartitions: Map[String, List[TopicPartition]] = members.map { - case member => - val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) - member.memberId -> assignment.partitions().toList - }.toMap + val (owners, groupAndTopicPartitions) = createAndGetAdminClient(opts).describeConsumerGroup(group) if (groupAndTopicPartitions.isEmpty) warnNoTopicsForGroupFound - groupAndTopicPartitions.foreach(x => - describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x._2.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), owners)) + groupAndTopicPartitions.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x._2.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), owners)) } } diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala index 97b49dd9029e4..063020e452c45 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala @@ -105,7 +105,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging { !consumers(0).assignment().isEmpty }, "Expected non-empty assignment") - val assignment = client.describeConsumerGroup(groupId) + val (_, assignment) = client.describeConsumerGroup(groupId) assertEquals(1, assignment.size) for (partitions <- assignment.values) assertEquals(Set(tp, tp2), partitions.toSet) From 6941c158fae5629a7bd76b51eb38d459ef036bb2 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 4 Nov 2015 23:32:03 -0800 Subject: [PATCH 6/6] Some cleanup --- core/src/main/scala/kafka/admin/AdminClient.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 93f6d9cb84f3e..1c4aa52278e82 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -154,14 +154,14 @@ class AdminClient(val time: Time, val owners = getOwners(group) (owners, membersAndTopicPartitions) } catch { - case (ex: scala.IllegalArgumentException) => - throw new scala.IllegalArgumentException(s"Group ${groupId} is not a consumer group.") + case (ex: IllegalArgumentException) => + throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group.") } } def getMembersAndTopicPartitions(group: GroupSummary): Map[String, List[TopicPartition]] = { if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE) - throw new scala.IllegalArgumentException(s"${group} is not a valid GroupSummary") + throw new IllegalArgumentException(s"${group} is not a valid GroupSummary") group.members.map { case member =>