From 611481c0435d6123a639e56f1a1edd58fc947a45 Mon Sep 17 00:00:00 2001 From: "Mohammed Amine GARMES (contractor)" Date: Fri, 21 Apr 2017 21:46:06 +0200 Subject: [PATCH] kafka 3355 --- .../main/scala/kafka/admin/AdminClient.scala | 11 ++ .../scala/kafka/tools/GetOffsetShell.scala | 163 +++++++++++------- tests/kafkatest/services/kafka/kafka.py | 4 +- .../tests/core/get_offset_shell_test.py | 4 +- 4 files changed, 114 insertions(+), 68 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 4d218c12e7640..95e70e769a3c7 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -186,6 +186,17 @@ class AdminClient(val time: Time, response.responseData.asScala.map { case (tp, partitionData) => (tp, partitionData.offset) }.toMap } + def getTopicListOffset(req: ListOffsetRequest.Builder, node: Node): Map[TopicPartition, ListOffsetResponse.PartitionData] = { + val responseBody = send(node, ApiKeys.LIST_OFFSETS, req) + responseBody.asInstanceOf[ListOffsetResponse].responseData().asScala.toMap + } + + + def getMetadata(req: MetadataRequest.Builder, node: Node): MetadataResponse = { + val responseBody = send(node, ApiKeys.METADATA, req) + responseBody.asInstanceOf[MetadataResponse] + } + def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] = findAllBrokers.map { broker => broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava)) diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 9a19b1fa9195d..8d3661bc59125 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -18,92 +18,127 @@ */ package kafka.tools -import kafka.consumer._ +import java.util.Properties import joptsimple._ -import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo} -import kafka.common.TopicAndPartition +import kafka.admin.AdminClient import kafka.client.ClientUtils -import kafka.utils.{CommandLineUtils, Exit, ToolsUtils} +import kafka.cluster.BrokerEndPoint +import kafka.utils.{CommandLineUtils, ToolsUtils} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.requests.{ListOffsetRequest, MetadataRequest} +import org.apache.kafka.common.utils.Utils +import scala.collection.JavaConverters._ +import scala.util.Random object GetOffsetShell { + val clientId = "GetOffsetShell" + + private def createAdminClient(props: Properties): AdminClient = { + AdminClient.create(props) + } + private def getNode(brokerEndPoint: BrokerEndPoint): Node = { + new Node(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port) + } + + def main(args: Array[String]): Unit = { val parser = new OptionParser val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") - .withRequiredArg - .describedAs("hostname:port,...,hostname:port") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("hostname:port,...,hostname:port") + .ofType(classOf[String]) val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions") - .withRequiredArg - .describedAs("partition ids") - .ofType(classOf[String]) - .defaultsTo("") - val timeOpt = parser.accepts("time", "timestamp of the offsets before that") - .withRequiredArg - .describedAs("timestamp/-1(latest)/-2(earliest)") - .ofType(classOf[java.lang.Long]) - .defaultsTo(-1) - val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1000) - - if(args.length == 0) + .withRequiredArg + .describedAs("partition ids") + .ofType(classOf[String]) + .defaultsTo("") + val timeOpt = parser.accepts("time", " REQUIRED: timestamp of the offsets before that") + .withRequiredArg + .describedAs("timestamp/-1(latest)/-2(earliest)") + .ofType(classOf[java.lang.Long]) + + val commandConfigOpt = parser.accepts("properties", "Property file containing configs to be passed to Admin Client.") + .withRequiredArg + .describedAs("command config property file") + .ofType(classOf[String]) + + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.") val options = parser.parse(args : _*) - CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt) + CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt) + - val clientId = "GetOffsetShell" val brokerList = options.valueOf(brokerListOpt) ToolsUtils.validatePortOrDie(parser, brokerList) val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList) val topic = options.valueOf(topicOpt) - var partitionList = options.valueOf(partitionOpt) - var time = options.valueOf(timeOpt).longValue - val nOffsets = options.valueOf(nOffsetsOpt).intValue - val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue() - - val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata - if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) { - System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) + - "kafka-list-topic.sh to verify") - Exit.exit(1) - } - val partitions = - if(partitionList == "") { - topicsMetadata.head.partitionsMetadata.map(_.partitionId) - } else { - partitionList.split(",").map(_.toInt).toSeq - } - partitions.foreach { partitionId => - val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId) - partitionMetadataOpt match { - case Some(metadata) => - metadata.leader match { - case Some(leader) => - val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId) - val topicAndPartition = TopicAndPartition(topic, partitionId) - val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets))) - val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets - - println("%s:%d:%s".format(topic, partitionId, offsets.mkString(","))) - case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId)) + + val partitionList = options.valueOf(partitionOpt) + val time = options.valueOf(timeOpt).longValue + val commandConfig = if (options.has(commandConfigOpt)) { + Utils.loadProps(options.valueOf(commandConfigOpt)) + } else new Properties() + + + commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList) + val adminClient = createAdminClient(commandConfig) + + val shuffledBrokers = Random.shuffle(metadataTargetBrokers) + + val metadataRes = adminClient.getMetadata(new MetadataRequest.Builder(List(topic).asJava), getNode(shuffledBrokers(0))) + + if(metadataRes.errors.containsKey(topic)){ + metadataRes.errors().get(topic).exception() + }else{ + + val topicsPartitions = metadataRes.cluster().availablePartitionsForTopic(topic).asScala + + val partitions = + if(partitionList == "") { + topicsPartitions.map(_.partition()) + } else { + partitionList.split(",").map(_.toInt).toSeq + } + + partitions.foreach { partitionId: Int => + val partitionMetadata = topicsPartitions.toList.find(_.partition == partitionId) + partitionMetadata match { + case Some(metadata) => { + + val partitions:java.util.Map[TopicPartition, java.lang.Long] = Map(new TopicPartition(metadata.topic(), metadata.partition()) -> + java.lang.Long.valueOf(time)).asJava + + val request: ListOffsetRequest.Builder = ListOffsetRequest.Builder.forConsumer(true) + .setTargetTimes(partitions) + + val listOffset= adminClient.getTopicListOffset(request,metadata.leader() ) + + listOffset.keys.foreach(topicPartition =>{ + val data = listOffset.get(topicPartition).get + + if (data.error.code() == Errors.NONE.code) { + println("%s:%d:%s".format(topic, partitionId, data.offset )) + } else { + val errormessage =Errors.forCode(data.error.code()).exception.getMessage + println(s"Attempt to fetch offsets for partition $topicPartition failed due to: $errormessage") + } + }) + } - case None => System.err.println("Error: partition %d does not exist".format(partitionId)) + } } } + } } + diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 7e4a5899e6c48..320d1534adf02 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -578,12 +578,12 @@ def is_registered(self, node): self.logger.debug("Broker info: %s", broker_info) return broker_info is not None - def get_offset_shell(self, topic, partitions, max_wait_ms, offsets, time): + def get_offset_shell(self, topic, partitions, time): node = self.nodes[0] cmd = self.path.script("kafka-run-class.sh", node) cmd += " kafka.tools.GetOffsetShell" - cmd += " --topic %s --broker-list %s --max-wait-ms %s --offsets %s --time %s" % (topic, self.bootstrap_servers(self.security_protocol), max_wait_ms, offsets, time) + cmd += " --topic %s --broker-list %s --time %s" % (topic, self.bootstrap_servers(self.security_protocol), time) if partitions: cmd += ' --partitions %s' % partitions diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py index e45365d32cf31..d139f766e2c13 100644 --- a/tests/kafkatest/tests/core/get_offset_shell_test.py +++ b/tests/kafkatest/tests/core/get_offset_shell_test.py @@ -80,7 +80,7 @@ def test_get_offset_shell(self, security_protocol='PLAINTEXT'): self.start_producer() # Assert that offset fetched without any consumers consuming is 0 - assert self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, 0) + assert self.kafka.get_offset_shell(TOPIC, None, -1), "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, 0) self.start_consumer(security_protocol) @@ -89,5 +89,5 @@ def test_get_offset_shell(self, security_protocol='PLAINTEXT'): wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") # Assert that offset is correctly indicated by GetOffsetShell tool - wait_until(lambda: "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, MAX_MESSAGES) in self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), timeout_sec=10, + wait_until(lambda: "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, MAX_MESSAGES) in self.kafka.get_offset_shell(TOPIC, None, -1), timeout_sec=10, err_msg="Timed out waiting to reach expected offset.") \ No newline at end of file