Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions core/src/main/scala/kafka/consumer/BaseConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package kafka.consumer

import java.util.Properties
import java.util.regex.Pattern

import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener

/**
* A base consumer used to abstract both old and new consumer
Expand All @@ -33,13 +36,19 @@ trait BaseConsumer {

case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])

class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
class NewShinyConsumer(topic: Option[String], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
consumer.subscribe(List(topic))
if (topic.isDefined)
consumer.subscribe(List(topic.get))
else if (whitelist.isDefined)
consumer.subscribe(Pattern.compile(whitelist.get), new NoOpConsumerRebalanceListener())
else
throw new IllegalArgumentException("Exactly one of topic or whitelist has to be provided.")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this happen with the check in ConsoleConsumer?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is actually redundant, as it is also happening in ConsoleConsumer. However, the check just seemed to be natural here. Let me know if you are in favor of removing it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was originally thinking if it should not ever happen let's remove it, after another thought I feel it may be OK to leave it as is.


var recordIter = consumer.poll(0).iterator

override def receive(): BaseConsumerRecord = {
Expand Down
9 changes: 6 additions & 3 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object ConsoleConsumer extends Logging {
val consumer =
if (conf.useNewConsumer) {
val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
new NewShinyConsumer(conf.topicArg, getNewConsumerProps(conf), timeoutMs)
new NewShinyConsumer(Option(conf.topicArg), Option(conf.whitelistArg), getNewConsumerProps(conf), timeoutMs)
} else {
checkZk(conf)
new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
Expand Down Expand Up @@ -243,11 +243,14 @@ object ConsoleConsumer extends Logging {
// If using old consumer, exactly one of whitelist/blacklist/topic is required.
// If using new consumer, topic must be specified.
var topicArg: String = null
var whitelistArg: String = null
var filterSpec: TopicFilter = null
if (useNewConsumer) {
if (!options.has(topicIdOpt))
CommandLineUtils.printUsageAndDie(parser, "Topic must be specified.")
val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has)
if (topicOrFilterOpt.size != 1)
CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/topic is required.")
topicArg = options.valueOf(topicIdOpt)
whitelistArg = options.valueOf(whitelistOpt)
} else {
val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
if (topicOrFilterOpt.size != 1)
Expand Down