From 3092848bf677a594c171c482565d1c8f8007b70c Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 13 Jun 2018 01:13:46 -0700 Subject: [PATCH 1/5] MINOR: Use KafkaConsumer in GetOffsetShell This does the minimal amount of work so that the tool relies on public non-deprecated APIs (i.e. so that it doesn't rely on old clients code). Additional improvements have been proposed via KIP-308. There are a few other PRs that touch this class with overlapping goals: - https://github.com/apache/kafka/pull/2891 - https://github.com/apache/kafka/pull/3051 - https://github.com/apache/kafka/pull/3320 One of them remains relevant in the context of KIP-308, but the others are not. I included the authors of the 3 PRs as co-authors. Co-authored-by: Arseniy Tashoyan Co-authored-by: Vahid Hashemian Co-authored-by: Mohammed Amine GARMES Co-authored-by: Ismael Juma --- .../scala/kafka/tools/GetOffsetShell.scala | 110 +++++++++++------- 1 file changed, 70 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 4104dedb9e09a..8304c801a4e70 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -18,13 +18,15 @@ */ package kafka.tools -import kafka.consumer._ +import java.util.Properties + import joptsimple._ -import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo} -import kafka.common.TopicAndPartition -import kafka.client.ClientUtils import kafka.utils.{CommandLineUtils, Exit, ToolsUtils} +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.common.{PartitionInfo, TopicPartition} +import org.apache.kafka.common.requests.ListOffsetRequest +import scala.collection.JavaConverters._ object GetOffsetShell { @@ -47,19 +49,19 @@ object GetOffsetShell { .withRequiredArg .describedAs("timestamp/-1(latest)/-2(earliest)") .ofType(classOf[java.lang.Long]) - .defaultsTo(-1) - val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned") + .defaultsTo(-1L) + val nOffsetsOpt = parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned") .withRequiredArg .describedAs("count") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") + val maxWaitMsOpt = parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.") .withRequiredArg .describedAs("ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(1000) - - if(args.length == 0) + + if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.") val options = parser.parse(args : _*) @@ -69,41 +71,69 @@ object GetOffsetShell { val clientId = "GetOffsetShell" val brokerList = options.valueOf(brokerListOpt) ToolsUtils.validatePortOrDie(parser, brokerList) - val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList) val topic = options.valueOf(topicOpt) - val partitionList = options.valueOf(partitionOpt) + val partitionIdsRequested = options.valueOf(partitionOpt).split(",").map(_.toInt).toSet val time = options.valueOf(timeOpt).longValue - val nOffsets = options.valueOf(nOffsetsOpt).intValue - val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue() - - val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata - if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) { - System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) + - "kafka-list-topic.sh to verify") - Exit.exit(1) + options.valueOf(nOffsetsOpt).intValue + options.valueOf(maxWaitMsOpt).intValue() + + val config = new Properties + config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId) + val consumer = new KafkaConsumer(config) + + val partitionInfos = listPartitionInfos(consumer, topic, partitionIdsRequested) match { + case None => + System.err.println(s"Topic $topic does not exist") + Exit.exit(1) + case Some(p) if p.isEmpty => + if (partitionIdsRequested.isEmpty) + System.err.println(s"Topic $topic has 0 partitions") + else + System.err.println(s"Topic $topic does not have any of the requested partitions ${partitionIdsRequested.mkString(",")}") + Exit.exit(1) + case Some(p) => p } - val partitions = - if(partitionList == "") { - topicsMetadata.head.partitionsMetadata.map(_.partitionId) - } else { - partitionList.split(",").map(_.toInt).toSeq - } - partitions.foreach { partitionId => - val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId) - partitionMetadataOpt match { - case Some(metadata) => - metadata.leader match { - case Some(leader) => - val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId) - val topicAndPartition = TopicAndPartition(topic, partitionId) - val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets))) - val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets - - println("%s:%d:%s".format(topic, partitionId, offsets.mkString(","))) - case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId)) - } - case None => System.err.println("Error: partition %d does not exist".format(partitionId)) + + if (partitionIdsRequested.nonEmpty) { + partitionIdsRequested.filterNot(partitionId => partitionInfos.exists(_.partition == partitionId)).foreach { partitionId => + s"Error: partition $partitionId does not exist" } } + + val topicPartitions = partitionInfos.flatMap { p => + if (p.leader == null) { + System.err.println(s"Error: partition ${p.partition} does not have a leader. Skip getting offsets") + None + } else + Some(new TopicPartition(p.topic, p.partition)) + } + + val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = time match { + case ListOffsetRequest.EARLIEST_TIMESTAMP => consumer.beginningOffsets(topicPartitions.asJava).asScala + case ListOffsetRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala + case timestamp => + val timestampsToSearch = topicPartitions.map(tp => tp -> (timestamp: java.lang.Long)).toMap.asJava + consumer.offsetsForTimes(timestampsToSearch).asScala.mapValues(_.offset) + } + + partitionOffsets.foreach { case (tp, offset) => + println(s"$topic:${tp.partition}:$offset") + } + + } + + /** + * Return the partition infos for `topic`. If the topic does not exist, `None` is returned. + */ + private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = { + val partitionInfos = consumer.listTopics.asScala.filterKeys(_ == topic).values.flatMap(_.asScala).toBuffer + if (partitionInfos.isEmpty) + None + else if (partitionIds.isEmpty) + Some(partitionInfos) + else + Some(partitionInfos.filter(p => partitionIds.contains(p.partition))) } + } From aaa6c5c5dac9fe7061d2f12b5530745224866b1e Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 13 Jun 2018 21:27:23 -0700 Subject: [PATCH 2/5] Minor clean-ups --- .../main/scala/kafka/tools/GetOffsetShell.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 8304c801a4e70..26131206ef40c 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -50,12 +50,12 @@ object GetOffsetShell { .describedAs("timestamp/-1(latest)/-2(earliest)") .ofType(classOf[java.lang.Long]) .defaultsTo(-1L) - val nOffsetsOpt = parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned") + parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned") .withRequiredArg .describedAs("count") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val maxWaitMsOpt = parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.") + parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.") .withRequiredArg .describedAs("ms") .ofType(classOf[java.lang.Integer]) @@ -73,9 +73,7 @@ object GetOffsetShell { ToolsUtils.validatePortOrDie(parser, brokerList) val topic = options.valueOf(topicOpt) val partitionIdsRequested = options.valueOf(partitionOpt).split(",").map(_.toInt).toSet - val time = options.valueOf(timeOpt).longValue - options.valueOf(nOffsetsOpt).intValue - options.valueOf(maxWaitMsOpt).intValue() + val listOffsetsTimestamp = options.valueOf(timeOpt).longValue val config = new Properties config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) @@ -109,11 +107,11 @@ object GetOffsetShell { Some(new TopicPartition(p.topic, p.partition)) } - val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = time match { + val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = listOffsetsTimestamp match { case ListOffsetRequest.EARLIEST_TIMESTAMP => consumer.beginningOffsets(topicPartitions.asJava).asScala case ListOffsetRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala - case timestamp => - val timestampsToSearch = topicPartitions.map(tp => tp -> (timestamp: java.lang.Long)).toMap.asJava + case _ => + val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava consumer.offsetsForTimes(timestampsToSearch).asScala.mapValues(_.offset) } From 7f424c3d422f062ca51069428a259e2189dec24a Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 14 Jun 2018 07:58:55 -0700 Subject: [PATCH 3/5] Various fixes and improvements --- .../main/scala/kafka/server/KafkaApis.scala | 1 - .../scala/kafka/tools/GetOffsetShell.scala | 33 ++++++++++++++----- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index cdd0d72e0c480..1202b1a2a5d1b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -43,7 +43,6 @@ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} -import org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Errors} diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 26131206ef40c..6135378ebe7a3 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -25,6 +25,7 @@ import kafka.utils.{CommandLineUtils, Exit, ToolsUtils} import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.{PartitionInfo, TopicPartition} import org.apache.kafka.common.requests.ListOffsetRequest +import org.apache.kafka.common.serialization.ByteArrayDeserializer import scala.collection.JavaConverters._ @@ -62,7 +63,7 @@ object GetOffsetShell { .defaultsTo(1000) if (args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.") + CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic offsets.") val options = parser.parse(args : _*) @@ -72,13 +73,26 @@ object GetOffsetShell { val brokerList = options.valueOf(brokerListOpt) ToolsUtils.validatePortOrDie(parser, brokerList) val topic = options.valueOf(topicOpt) - val partitionIdsRequested = options.valueOf(partitionOpt).split(",").map(_.toInt).toSet + val partitionIdsRequested: Set[Int] = { + val partitionsString = options.valueOf(partitionOpt) + if (partitionsString.isEmpty) + Set.empty + else + partitionsString.split(",").map { partitionString => + try partitionString.toInt + catch { + case e: NumberFormatException => + s"--partitions expected a comma separated list of numeric partition ids, but received: $partitionsString" + Exit.exit(1) + } + }.toSet + } val listOffsetsTimestamp = options.valueOf(timeOpt).longValue val config = new Properties config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId) - val consumer = new KafkaConsumer(config) + val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer) val partitionInfos = listPartitionInfos(consumer, topic, partitionIdsRequested) match { case None => @@ -94,12 +108,12 @@ object GetOffsetShell { } if (partitionIdsRequested.nonEmpty) { - partitionIdsRequested.filterNot(partitionId => partitionInfos.exists(_.partition == partitionId)).foreach { partitionId => - s"Error: partition $partitionId does not exist" + (partitionIdsRequested -- partitionInfos.map(_.partition)).foreach { partitionId => + System.err.println(s"Error: partition $partitionId does not exist") } } - val topicPartitions = partitionInfos.flatMap { p => + val topicPartitions = partitionInfos.sortBy(_.partition).flatMap { p => if (p.leader == null) { System.err.println(s"Error: partition ${p.partition} does not have a leader. Skip getting offsets") None @@ -107,16 +121,17 @@ object GetOffsetShell { Some(new TopicPartition(p.topic, p.partition)) } + /* Note that the value of the map can be null */ val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = listOffsetsTimestamp match { case ListOffsetRequest.EARLIEST_TIMESTAMP => consumer.beginningOffsets(topicPartitions.asJava).asScala case ListOffsetRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala case _ => val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava - consumer.offsetsForTimes(timestampsToSearch).asScala.mapValues(_.offset) + consumer.offsetsForTimes(timestampsToSearch).asScala.mapValues(x => if (x == null) null else x.offset) } - partitionOffsets.foreach { case (tp, offset) => - println(s"$topic:${tp.partition}:$offset") + partitionOffsets.toSeq.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => + println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}") } } From 3dc223a9f3c40d1a185064bac2657b02ae5a5206 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 14 Jun 2018 08:16:48 -0700 Subject: [PATCH 4/5] Add missing println --- core/src/main/scala/kafka/tools/GetOffsetShell.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 6135378ebe7a3..14b15a6daeed9 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -82,7 +82,7 @@ object GetOffsetShell { try partitionString.toInt catch { case e: NumberFormatException => - s"--partitions expected a comma separated list of numeric partition ids, but received: $partitionsString" + System.err.println(s"--partitions expected a comma separated list of numeric partition ids, but received: $partitionsString") Exit.exit(1) } }.toSet From 7e2b94375f5b3a366b6ca3d321fd159342fb1b90 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 14 Jun 2018 08:25:14 -0700 Subject: [PATCH 5/5] Minor wording fix --- core/src/main/scala/kafka/tools/GetOffsetShell.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 14b15a6daeed9..eafddc66de428 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -81,8 +81,8 @@ object GetOffsetShell { partitionsString.split(",").map { partitionString => try partitionString.toInt catch { - case e: NumberFormatException => - System.err.println(s"--partitions expected a comma separated list of numeric partition ids, but received: $partitionsString") + case _: NumberFormatException => + System.err.println(s"--partitions expects a comma separated list of numeric partition ids, but received: $partitionsString") Exit.exit(1) } }.toSet