Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
Expand Down
127 changes: 85 additions & 42 deletions core/src/main/scala/kafka/tools/GetOffsetShell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
*/
package kafka.tools

import kafka.consumer._
import java.util.Properties

import joptsimple._
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.client.ClientUtils
import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import org.apache.kafka.common.requests.ListOffsetRequest
import org.apache.kafka.common.serialization.ByteArrayDeserializer

import scala.collection.JavaConverters._

object GetOffsetShell {

Expand All @@ -47,20 +50,20 @@ object GetOffsetShell {
.withRequiredArg
.describedAs("timestamp/-1(latest)/-2(earliest)")
.ofType(classOf[java.lang.Long])
.defaultsTo(-1)
val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned")
.defaultsTo(-1L)
parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned")
.withRequiredArg
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000)
if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")

if (args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic offsets.")

val options = parser.parse(args : _*)

Expand All @@ -69,41 +72,81 @@ object GetOffsetShell {
val clientId = "GetOffsetShell"
val brokerList = options.valueOf(brokerListOpt)
ToolsUtils.validatePortOrDie(parser, brokerList)
val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
val topic = options.valueOf(topicOpt)
val partitionList = options.valueOf(partitionOpt)
val time = options.valueOf(timeOpt).longValue
val nOffsets = options.valueOf(nOffsetsOpt).intValue
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()

val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
"kafka-list-topic.sh to verify")
Exit.exit(1)
}
val partitions =
if(partitionList == "") {
topicsMetadata.head.partitionsMetadata.map(_.partitionId)
} else {
partitionList.split(",").map(_.toInt).toSeq
}
partitions.foreach { partitionId =>
val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId)
partitionMetadataOpt match {
case Some(metadata) =>
metadata.leader match {
case Some(leader) =>
val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
val topicAndPartition = TopicAndPartition(topic, partitionId)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets

println("%s:%d:%s".format(topic, partitionId, offsets.mkString(",")))
case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId))
val partitionIdsRequested: Set[Int] = {
val partitionsString = options.valueOf(partitionOpt)
if (partitionsString.isEmpty)
Set.empty
else
partitionsString.split(",").map { partitionString =>
try partitionString.toInt
catch {
case _: NumberFormatException =>
System.err.println(s"--partitions expects a comma separated list of numeric partition ids, but received: $partitionsString")
Exit.exit(1)
}
case None => System.err.println("Error: partition %d does not exist".format(partitionId))
}.toSet
}
val listOffsetsTimestamp = options.valueOf(timeOpt).longValue

val config = new Properties
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing consumer deserializer props

val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)

val partitionInfos = listPartitionInfos(consumer, topic, partitionIdsRequested) match {
case None =>
System.err.println(s"Topic $topic does not exist")
Exit.exit(1)
case Some(p) if p.isEmpty =>
if (partitionIdsRequested.isEmpty)
System.err.println(s"Topic $topic has 0 partitions")
else
System.err.println(s"Topic $topic does not have any of the requested partitions ${partitionIdsRequested.mkString(",")}")
Exit.exit(1)
case Some(p) => p
}

if (partitionIdsRequested.nonEmpty) {
(partitionIdsRequested -- partitionInfos.map(_.partition)).foreach { partitionId =>
System.err.println(s"Error: partition $partitionId does not exist")
}
}

val topicPartitions = partitionInfos.sortBy(_.partition).flatMap { p =>
if (p.leader == null) {
System.err.println(s"Error: partition ${p.partition} does not have a leader. Skip getting offsets")
None
} else
Some(new TopicPartition(p.topic, p.partition))
}

/* Note that the value of the map can be null */
val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = listOffsetsTimestamp match {
case ListOffsetRequest.EARLIEST_TIMESTAMP => consumer.beginningOffsets(topicPartitions.asJava).asScala
case ListOffsetRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala
case _ =>
val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava
consumer.offsetsForTimes(timestampsToSearch).asScala.mapValues(x => if (x == null) null else x.offset)
}

partitionOffsets.toSeq.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
}

}

/**
* Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
*/
private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
val partitionInfos = consumer.listTopics.asScala.filterKeys(_ == topic).values.flatMap(_.asScala).toBuffer
if (partitionInfos.isEmpty)
None
else if (partitionIds.isEmpty)
Some(partitionInfos)
else
Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
}

}