Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions core/src/main/scala/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ import kafka.common.KafkaException
import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary}
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.clients.consumer.internals.{SendFailedException, ConsumerProtocol, ConsumerNetworkClient, RequestFuture}
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture, SendFailedException}
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SaslConfigs, SslConfigs}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.DisconnectException
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.types.Struct
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
import org.apache.kafka.common.{TopicPartition, Cluster, Node}
import org.apache.kafka.common.{Cluster, Node, TopicPartition}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -147,10 +147,21 @@ class AdminClient(val time: Time,
GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
}

def describeConsumerGroup(groupId: String): Map[String, List[TopicPartition]] = {
def describeConsumerGroup(groupId: String): (Map[TopicPartition, String], Map[String, List[TopicPartition]]) = {
val group = describeGroup(groupId)
try {
val membersAndTopicPartitions: Map[String, List[TopicPartition]] = getMembersAndTopicPartitions(group)
val owners = getOwners(group)
(owners, membersAndTopicPartitions)
} catch {
case (ex: IllegalArgumentException) =>
throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group.")
}
}

def getMembersAndTopicPartitions(group: GroupSummary): Map[String, List[TopicPartition]] = {
if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group")
throw new IllegalArgumentException(s"${group} is not a valid GroupSummary")

group.members.map {
case member =>
Expand All @@ -159,6 +170,20 @@ class AdminClient(val time: Time,
}.toMap
}

def getOwners(groupSummary: GroupSummary): Map[TopicPartition, String] = {
if (groupSummary.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
throw new IllegalArgumentException(s"${groupSummary} is not a valid GroupSummary")

groupSummary.members.flatMap {
case member =>
val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
val partitions = assignment.partitions().asScala.toList
partitions.map {
case partition: TopicPartition =>
partition -> "%s_%s".format(member.memberId, member.clientHost)
}.toMap
}.toMap
}
}

object AdminClient {
Expand Down
173 changes: 133 additions & 40 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,23 @@
package kafka.admin


import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import kafka.common._
import java.util.Properties

import joptsimple.{OptionParser, OptionSpec}
import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
import kafka.client.ClientUtils
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, OffsetFetchResponse, OffsetFetchRequest}
import org.I0Itec.zkclient.exception.ZkNoNodeException
import kafka.common.TopicAndPartition
import joptsimple.{OptionSpec, OptionParser}
import scala.collection.{Set, mutable}
import kafka.common.{TopicAndPartition, _}
import kafka.consumer.SimpleConsumer
import collection.JavaConversions._
import org.apache.kafka.common.utils.Utils
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNoNodeException
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.utils.Utils

import scala.collection.JavaConversions._
import scala.collection.{Set, mutable}

object ConsumerGroupCommand {

Expand All @@ -56,7 +58,7 @@ object ConsumerGroupCommand {

try {
if (opts.options.has(opts.listOpt))
list(zkUtils)
list(zkUtils, opts)
else if (opts.options.has(opts.describeOpt))
describe(zkUtils, opts)
else if (opts.options.has(opts.deleteOpt))
Expand All @@ -70,20 +72,46 @@ object ConsumerGroupCommand {
}
}

def list(zkUtils: ZkUtils) {
zkUtils.getConsumerGroups().foreach(println)
def list(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
val useNewConsumer = opts.options.has(opts.newConsumerOpt)
if (!useNewConsumer)
zkUtils.getConsumerGroups().foreach(println)
else {
val adminClient = createAndGetAdminClient(opts)
adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId))
}
}

def createAndGetAdminClient(opts: ConsumerGroupCommandOptions): AdminClient = {
AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt))
}

def describe(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
val useNewConsumer = opts.options.has(opts.newConsumerOpt)
val group = opts.options.valueOf(opts.groupOpt)
val configs = parseConfigs(opts)
val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
val group = opts.options.valueOf(opts.groupOpt)
val topics = zkUtils.getTopicsByConsumerGroup(group)
if (topics.isEmpty) {
def warnNoTopicsForGroupFound: Unit = {
println("No topic available for consumer group provided")
}
topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs))

println("%s, %s, %s, %s, %s, %s, %s"
.format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER"))

if (!useNewConsumer) {
val topics = zkUtils.getTopicsByConsumerGroup(group)
if (topics.isEmpty) {
warnNoTopicsForGroupFound
}
topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts))
} else {
val (owners, groupAndTopicPartitions) = createAndGetAdminClient(opts).describeConsumerGroup(group)

if (groupAndTopicPartitions.isEmpty)
warnNoTopicsForGroupFound
groupAndTopicPartitions.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x._2.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), owners))
}
}

def delete(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
Expand Down Expand Up @@ -152,15 +180,18 @@ object ConsumerGroupCommand {
group: String,
topic: String,
channelSocketTimeoutMs: Int,
channelRetryBackoffMs: Int) {
channelRetryBackoffMs: Int,
opts: ConsumerGroupCommandOptions) {
val topicPartitions = getTopicPartitions(zkUtils, topic)
describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, topicPartitions)
}

def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owners: Map[TopicPartition, String] = null): Unit = {
val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
println("%s, %s, %s, %s, %s, %s, %s"
.format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER"))
topicPartitions
.sortBy { case topicPartition => topicPartition.partition }
.foreach { topicPartition =>
describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition))
describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts, owners)
}
}

Expand Down Expand Up @@ -208,34 +239,84 @@ object ConsumerGroupCommand {
group: String,
topic: String,
partition: Int,
offsetOpt: Option[Long]) {
val topicAndPartition = TopicAndPartition(topic, partition)
offsetOpt: Option[Long],
opts: ConsumerGroupCommandOptions,
owners: Map[TopicPartition, String] = null) {
val topicPartition = new TopicPartition(topic, partition)
val groupDirs = new ZKGroupTopicDirs(group, topic)
val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1
val useNewConsumer: Boolean = opts.options.has(opts.newConsumerOpt)
val owner = if (useNewConsumer) owners.get(new TopicPartition(topic, partition)) else zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1
def print(logEndOffset: Long): Unit = {
val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _)
println("%s, %s, %s, %s, %s, %s, %s"
.format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none")))
}
zkUtils.getLeaderForPartition(topic, partition) match {
case Some(-1) =>
println("%s, %s, %s, %s, %s, %s, %s"
.format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none")))
case Some(brokerId) =>
val consumerOpt = getConsumer(zkUtils, brokerId)
consumerOpt match {
case Some(consumer) =>
val request =
OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
consumer.close()

val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _)
println("%s, %s, %s, %s, %s, %s, %s"
.format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none")))
case None => // ignore
if (useNewConsumer) {
val consumerOpt = getNewConsumer(zkUtils, brokerId)
consumerOpt match {
case Some(consumer) =>
consumer.assign(List(topicPartition))
consumer.seekToEnd(topicPartition)
val logEndOffset = consumer.position(topicPartition)
consumer.close()
print(logEndOffset)
case None => // ignore
}
} else {
val consumerOpt = getZkConsumer(zkUtils, brokerId)
consumerOpt match {
case Some(consumer) =>
val topicAndPartition: TopicAndPartition = new TopicAndPartition(topicPartition.topic(), topicPartition.partition())
val request =
OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
consumer.close()
print(logEndOffset)
case None => // ignore
}
}
case None =>
println("No broker for partition %s".format(topicAndPartition))
println("No broker for partition %s".format(topicPartition))
}
}

private def getNewConsumer(zkUtils: ZkUtils, brokerId: Int): Option[KafkaConsumer[String, String]] = {
try {
zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
case Some(brokerInfoString) =>
Json.parseFull(brokerInfoString) match {
case Some(m) =>
val brokerInfo = m.asInstanceOf[Map[String, Any]]
val host = brokerInfo.get("host").get.asInstanceOf[String]
val port = brokerInfo.get("port").get.asInstanceOf[Int]
val deserializer: String = (new StringDeserializer).getClass.getName
val properties: Properties = new Properties()
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host + ":" + port)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroupCommand")
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
Some(new KafkaConsumer[String, String](properties))
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
}
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
}
} catch {
case t: Throwable =>
println("Could not parse broker info due to " + t.getMessage)
None
}
}

private def getConsumer(zkUtils: ZkUtils, brokerId: Int): Option[SimpleConsumer] = {
private def getZkConsumer(zkUtils: ZkUtils, brokerId: Int): Option[SimpleConsumer] = {
try {
zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
case Some(brokerInfoString) =>
Expand All @@ -261,6 +342,7 @@ object ConsumerGroupCommand {
class ConsumerGroupCommandOptions(args: Array[String]) {
val ZkConnectDoc = "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over."
val BootstrapServerDoc = "REQUIRED (only when using new-consumer): The server to connect to."
val GroupDoc = "The consumer group we wish to act on."
val TopicDoc = "The topic whose consumer group information should be deleted."
val ConfigDoc = "Configuration for timeouts. For instance --config channelSocketTimeoutMs=600"
Expand All @@ -273,12 +355,17 @@ object ConsumerGroupCommand {
"information for the given consumer groups. For instance --group g1 --group g2 --topic t1" + nl +
"Pass in just a topic to delete the given topic's partition offsets and ownership information " +
"for every consumer group. For instance --topic t1" + nl +
"WARNING: Only does deletions on consumer groups that are not active."
"WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active."
val NewConsumerDoc = "Use new consumer."
val parser = new OptionParser
val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
.withRequiredArg
.describedAs("server to connect to")
.ofType(classOf[String])
val groupOpt = parser.accepts("group", GroupDoc)
.withRequiredArg
.describedAs("consumer group")
Expand All @@ -294,13 +381,19 @@ object ConsumerGroupCommand {
val listOpt = parser.accepts("list", ListDoc)
val describeOpt = parser.accepts("describe", DescribeDoc)
val deleteOpt = parser.accepts("delete", DeleteDoc)
val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc)
val options = parser.parse(args : _*)

val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)

def checkArgs() {
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
if (options.has(newConsumerOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
if (options.has(deleteOpt))
CommandLineUtils.printUsageAndDie(parser, "Option %s does not work with %s".format(deleteOpt, newConsumerOpt))
} else
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
if (options.has(describeOpt))
CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
!consumers(0).assignment().isEmpty
}, "Expected non-empty assignment")

val assignment = client.describeConsumerGroup(groupId)
val (_, assignment) = client.describeConsumerGroup(groupId)
assertEquals(1, assignment.size)
for (partitions <- assignment.values)
assertEquals(Set(tp, tp2), partitions.toSet)
Expand Down