Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,22 @@ object ConsoleConsumer extends Logging {
var groupIdPassed = true
val options: OptionSet = tryParse(parser, args)
val useNewConsumer = options.has(useNewConsumerOpt)
val filterOpt = List(whitelistOpt, blacklistOpt).filter(options.has)
val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
val topicArg = options.valueOf(topicOrFilterOpt.head)
val filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)

// If using old consumer, exactly one of whitelist/blacklist/topic is required.
// If using new consumer, topic must be specified.
var topicArg: String = null
var filterSpec: TopicFilter = null
if (useNewConsumer) {
if (!options.has(topicIdOpt))
CommandLineUtils.printUsageAndDie(parser, "Topic must be specified.")
topicArg = options.valueOf(topicIdOpt)
} else {
val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
if (topicOrFilterOpt.size != 1)
CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
topicArg = options.valueOf(topicOrFilterOpt.head)
filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
}
val consumerProps = if (options.has(consumerConfigOpt))
Utils.loadProps(options.valueOf(consumerConfigOpt))
else
Expand All @@ -262,9 +274,6 @@ object ConsoleConsumer extends Logging {

CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt)

if (!useNewConsumer && topicOrFilterOpt.size != 1)
CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")

if (options.has(csvMetricsReporterEnabledOpt)) {
val csvReporterProps = new Properties()
csvReporterProps.put("kafka.metrics.polling.interval.secs", "5")
Expand Down