Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/src/main/scala/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,17 @@ class AdminClient(val time: Time,
response.responseData.asScala.map { case (tp, partitionData) => (tp, partitionData.offset) }.toMap
}

/**
 * Issues a LIST_OFFSETS request to the given broker and returns the
 * per-partition offset data keyed by topic-partition.
 *
 * @param req  builder for the ListOffsetRequest to send
 * @param node broker the request is sent to
 * @return immutable map of TopicPartition -> ListOffsetResponse.PartitionData
 */
def getTopicListOffset(req: ListOffsetRequest.Builder, node: Node): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
  val listOffsetResponse = send(node, ApiKeys.LIST_OFFSETS, req).asInstanceOf[ListOffsetResponse]
  listOffsetResponse.responseData().asScala.toMap
}


/**
 * Sends a METADATA request to the given broker and returns the typed response.
 *
 * @param req  builder for the MetadataRequest to send
 * @param node broker the request is sent to
 */
def getMetadata(req: MetadataRequest.Builder, node: Node): MetadataResponse =
  send(node, ApiKeys.METADATA, req).asInstanceOf[MetadataResponse]

def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
findAllBrokers.map { broker =>
broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
Expand Down
163 changes: 99 additions & 64 deletions core/src/main/scala/kafka/tools/GetOffsetShell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,92 +18,127 @@
*/
package kafka.tools

import kafka.consumer._
import java.util.Properties
import joptsimple._
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.admin.AdminClient
import kafka.client.ClientUtils
import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
import kafka.cluster.BrokerEndPoint
import kafka.utils.{CommandLineUtils, ToolsUtils}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.requests.{ListOffsetRequest, MetadataRequest}
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
import scala.util.Random


// NOTE(review): this span was captured from a rendered diff view. Lines from the
// pre-change and post-change versions of GetOffsetShell are interleaved with the
// +/- markers stripped, so the object as shown is NOT brace-balanced, valid Scala.
// The code text below is left byte-for-byte untouched; the comments only mark
// where the two diff sides alternate, for whoever reconstructs the real file.
object GetOffsetShell {

// Shared client id used when identifying this tool to brokers.
val clientId = "GetOffsetShell"

// Builds an AdminClient from the supplied (possibly file-loaded) properties.
private def createAdminClient(props: Properties): AdminClient = {
AdminClient.create(props)
}
// Adapts the legacy BrokerEndPoint into the common Node type expected by AdminClient.
private def getNode(brokerEndPoint: BrokerEndPoint): Node = {
new Node(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
}


def main(args: Array[String]): Unit = {
val parser = new OptionParser
val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
.withRequiredArg
.describedAs("hostname:port,...,hostname:port")
.ofType(classOf[String])
// NOTE(review): duplicated builder chain below — the old and new diff sides of the
// same option definition were flattened together; only one copy belongs in the file.
.withRequiredArg
.describedAs("hostname:port,...,hostname:port")
.ofType(classOf[String])
val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
// NOTE(review): duplicated builder chain (diff interleaving), as above.
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions")
.withRequiredArg
.describedAs("partition ids")
.ofType(classOf[String])
.defaultsTo("")
// NOTE(review): old-side (pre-change) option definitions follow — "time" with a -1
// default, plus the removed "offsets" and "max-wait-ms" options.
val timeOpt = parser.accepts("time", "timestamp of the offsets before that")
.withRequiredArg
.describedAs("timestamp/-1(latest)/-2(earliest)")
.ofType(classOf[java.lang.Long])
.defaultsTo(-1)
val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned")
.withRequiredArg
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000)

// NOTE(review): stray old-side usage-check line; the builder chain after it belongs
// to the new-side "partitions" option, then the new-side "time" (now REQUIRED, no
// default) and the new "properties" option.
if(args.length == 0)
.withRequiredArg
.describedAs("partition ids")
.ofType(classOf[String])
.defaultsTo("")
val timeOpt = parser.accepts("time", " REQUIRED: timestamp of the offsets before that")
.withRequiredArg
.describedAs("timestamp/-1(latest)/-2(earliest)")
.ofType(classOf[java.lang.Long])

val commandConfigOpt = parser.accepts("properties", "Property file containing configs to be passed to Admin Client.")
.withRequiredArg
.describedAs("command config property file")
.ofType(classOf[String])

if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")

val options = parser.parse(args : _*)

// NOTE(review): both diff sides of the required-args check are present; the second
// (with timeOpt) is the post-change version.
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt)
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt)


// NOTE(review): old-side (pre-change) main body begins here — fetches offsets via
// kafka.consumer.SimpleConsumer and the legacy OffsetRequest API.
val clientId = "GetOffsetShell"
val brokerList = options.valueOf(brokerListOpt)
ToolsUtils.validatePortOrDie(parser, brokerList)
val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
val topic = options.valueOf(topicOpt)
var partitionList = options.valueOf(partitionOpt)
var time = options.valueOf(timeOpt).longValue
val nOffsets = options.valueOf(nOffsetsOpt).intValue
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()

val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
"kafka-list-topic.sh to verify")
Exit.exit(1)
}
val partitions =
if(partitionList == "") {
topicsMetadata.head.partitionsMetadata.map(_.partitionId)
} else {
partitionList.split(",").map(_.toInt).toSeq
}
partitions.foreach { partitionId =>
val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId)
partitionMetadataOpt match {
case Some(metadata) =>
metadata.leader match {
case Some(leader) =>
val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
val topicAndPartition = TopicAndPartition(topic, partitionId)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets

println("%s:%d:%s".format(topic, partitionId, offsets.mkString(",")))
case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId))

// NOTE(review): the old-side body is truncated above (its match/foreach braces never
// close in this capture). New-side (post-change) main body begins here — fetches
// offsets via AdminClient using METADATA + LIST_OFFSETS requests.
val partitionList = options.valueOf(partitionOpt)
val time = options.valueOf(timeOpt).longValue
val commandConfig = if (options.has(commandConfigOpt)) {
Utils.loadProps(options.valueOf(commandConfigOpt))
} else new Properties()


commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val adminClient = createAdminClient(commandConfig)

// Pick an arbitrary bootstrap broker for the metadata request.
val shuffledBrokers = Random.shuffle(metadataTargetBrokers)

val metadataRes = adminClient.getMetadata(new MetadataRequest.Builder(List(topic).asJava), getNode(shuffledBrokers(0)))

// NOTE(review): topic-level metadata error is materialized but neither thrown nor
// printed here — likely an oversight in the new-side code; verify against the PR.
if(metadataRes.errors.containsKey(topic)){
metadataRes.errors().get(topic).exception()
}else{

val topicsPartitions = metadataRes.cluster().availablePartitionsForTopic(topic).asScala

// Default to all available partitions when --partitions was not given.
val partitions =
if(partitionList == "") {
topicsPartitions.map(_.partition())
} else {
partitionList.split(",").map(_.toInt).toSeq
}

partitions.foreach { partitionId: Int =>
val partitionMetadata = topicsPartitions.toList.find(_.partition == partitionId)
partitionMetadata match {
case Some(metadata) => {

val partitions:java.util.Map[TopicPartition, java.lang.Long] = Map(new TopicPartition(metadata.topic(), metadata.partition()) ->
java.lang.Long.valueOf(time)).asJava

val request: ListOffsetRequest.Builder = ListOffsetRequest.Builder.forConsumer(true)
.setTargetTimes(partitions)

// Query the partition leader directly for its offset at the requested timestamp.
val listOffset= adminClient.getTopicListOffset(request,metadata.leader() )

listOffset.keys.foreach(topicPartition =>{
val data = listOffset.get(topicPartition).get

if (data.error.code() == Errors.NONE.code) {
println("%s:%d:%s".format(topic, partitionId, data.offset ))
} else {
val errormessage =Errors.forCode(data.error.code()).exception.getMessage
println(s"Attempt to fetch offsets for partition $topicPartition failed due to: $errormessage")
}
})

}
case None => System.err.println("Error: partition %d does not exist".format(partitionId))
}
}
}

}
}

4 changes: 2 additions & 2 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,12 @@ def is_registered(self, node):
self.logger.debug("Broker info: %s", broker_info)
return broker_info is not None

def get_offset_shell(self, topic, partitions, max_wait_ms, offsets, time):
def get_offset_shell(self, topic, partitions, time):
node = self.nodes[0]

cmd = self.path.script("kafka-run-class.sh", node)
cmd += " kafka.tools.GetOffsetShell"
cmd += " --topic %s --broker-list %s --max-wait-ms %s --offsets %s --time %s" % (topic, self.bootstrap_servers(self.security_protocol), max_wait_ms, offsets, time)
cmd += " --topic %s --broker-list %s --time %s" % (topic, self.bootstrap_servers(self.security_protocol), time)

if partitions:
cmd += ' --partitions %s' % partitions
Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/tests/core/get_offset_shell_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def test_get_offset_shell(self, security_protocol='PLAINTEXT'):
self.start_producer()

# Assert that offset fetched without any consumers consuming is 0
assert self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, 0)
assert self.kafka.get_offset_shell(TOPIC, None, -1), "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, 0)

self.start_consumer(security_protocol)

Expand All @@ -89,5 +89,5 @@ def test_get_offset_shell(self, security_protocol='PLAINTEXT'):
wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")

# Assert that offset is correctly indicated by GetOffsetShell tool
wait_until(lambda: "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, MAX_MESSAGES) in self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), timeout_sec=10,
wait_until(lambda: "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, MAX_MESSAGES) in self.kafka.get_offset_shell(TOPIC, None, -1), timeout_sec=10,
err_msg="Timed out waiting to reach expected offset.")