Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions core/src/main/scala/kafka/tools/GetOffsetShell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import org.apache.kafka.common.requests.ListOffsetRequest
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils

import scala.collection.JavaConverters._
import scala.collection.Seq

object GetOffsetShell {

Expand All @@ -52,6 +52,12 @@ object GetOffsetShell {
.describedAs("timestamp/-1(latest)/-2(earliest)")
.ofType(classOf[java.lang.Long])
.defaultsTo(-1L)

val commandConfigOpt = parser.accepts("command.config", "Command config properties file(SSL etc).")
.withOptionalArg()
.describedAs("config file")
.ofType(classOf[String])

parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned")
.withRequiredArg
.describedAs("count")
Expand Down Expand Up @@ -89,8 +95,12 @@ object GetOffsetShell {
}.toSet
}
val listOffsetsTimestamp = options.valueOf(timeOpt).longValue

val config = new Properties

val config = if (options.has(commandConfigOpt))
Utils.loadProps(options.valueOf(commandConfigOpt))
else
new Properties

config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
Expand Down Expand Up @@ -128,9 +138,7 @@ object GetOffsetShell {
case ListOffsetRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala
case _ =>
val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava
consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, x) =>
if (x == null) (k, null) else (k, x.offset: java.lang.Long)
}
consumer.offsetsForTimes(timestampsToSearch).asScala.mapValues(x => if (x == null) null else x.offset)
}

partitionOffsets.toSeq.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
Expand Down