diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index b8b2ad60e3776..0a59a18d05122 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -26,9 +26,9 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.{PartitionInfo, TopicPartition} import org.apache.kafka.common.requests.ListOffsetRequest import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.utils.Utils import scala.collection.JavaConverters._ -import scala.collection.Seq object GetOffsetShell { @@ -52,6 +52,12 @@ object GetOffsetShell { .describedAs("timestamp/-1(latest)/-2(earliest)") .ofType(classOf[java.lang.Long]) .defaultsTo(-1L) + + val commandConfigOpt = parser.accepts("command.config", "Command config properties file(SSL etc).") + .withOptionalArg() + .describedAs("config file") + .ofType(classOf[String]) + parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned") .withRequiredArg .describedAs("count") @@ -89,8 +95,12 @@ object GetOffsetShell { }.toSet } val listOffsetsTimestamp = options.valueOf(timeOpt).longValue - - val config = new Properties + + val config = if (options.has(commandConfigOpt)) + Utils.loadProps(options.valueOf(commandConfigOpt)) + else + new Properties + config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId) val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer) @@ -128,9 +138,7 @@ object GetOffsetShell { case ListOffsetRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala case _ => val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava - consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, x) => - if (x == null) (k, null) else (k, x.offset: java.lang.Long) - } + consumer.offsetsForTimes(timestampsToSearch).asScala.mapValues(x => if (x == null) null else x.offset) } partitionOffsets.toSeq.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>