diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala index 52cd5fa1e5f28..ced43917d724c 100644 --- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala +++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala @@ -18,6 +18,9 @@ package kafka.consumer import java.util.Properties +import java.util.regex.Pattern + +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener /** * A base consumer used to abstract both old and new consumer @@ -33,13 +36,19 @@ trait BaseConsumer { case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte]) -class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer { +class NewShinyConsumer(topic: Option[String], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer { import org.apache.kafka.clients.consumer.KafkaConsumer -import scala.collection.JavaConversions._ + import scala.collection.JavaConversions._ val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps) - consumer.subscribe(List(topic)) + if (topic.isDefined) + consumer.subscribe(List(topic.get)) + else if (whitelist.isDefined) + consumer.subscribe(Pattern.compile(whitelist.get), new NoOpConsumerRebalanceListener()) + else + throw new IllegalArgumentException("Exactly one of topic or whitelist has to be provided.") + var recordIter = consumer.poll(0).iterator override def receive(): BaseConsumerRecord = { diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 9f296bdad6f86..2b1a69a14aac0 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -52,7 +52,7 @@ object ConsoleConsumer extends Logging { val consumer = if (conf.useNewConsumer) { val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue - new NewShinyConsumer(conf.topicArg, getNewConsumerProps(conf), timeoutMs) + new NewShinyConsumer(Option(conf.topicArg), Option(conf.whitelistArg), getNewConsumerProps(conf), timeoutMs) } else { checkZk(conf) new OldConsumer(conf.filterSpec, getOldConsumerProps(conf)) @@ -243,11 +243,14 @@ object ConsoleConsumer extends Logging { // If using old consumer, exactly one of whitelist/blacklist/topic is required. // If using new consumer, topic must be specified. var topicArg: String = null + var whitelistArg: String = null var filterSpec: TopicFilter = null if (useNewConsumer) { - if (!options.has(topicIdOpt)) - CommandLineUtils.printUsageAndDie(parser, "Topic must be specified.") + val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has) + if (topicOrFilterOpt.size != 1) + CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/topic is required.") topicArg = options.valueOf(topicIdOpt) + whitelistArg = options.valueOf(whitelistOpt) } else { val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) if (topicOrFilterOpt.size != 1)