diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 639360c030433..9f296bdad6f86 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -239,10 +239,22 @@ object ConsoleConsumer extends Logging { var groupIdPassed = true val options: OptionSet = tryParse(parser, args) val useNewConsumer = options.has(useNewConsumerOpt) - val filterOpt = List(whitelistOpt, blacklistOpt).filter(options.has) - val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) - val topicArg = options.valueOf(topicOrFilterOpt.head) - val filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg) + + // If using old consumer, exactly one of whitelist/blacklist/topic is required. + // If using new consumer, topic must be specified. + var topicArg: String = null + var filterSpec: TopicFilter = null + if (useNewConsumer) { + if (!options.has(topicIdOpt)) + CommandLineUtils.printUsageAndDie(parser, "Topic must be specified.") + topicArg = options.valueOf(topicIdOpt) + } else { + val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) + if (topicOrFilterOpt.size != 1) + CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.") + topicArg = options.valueOf(topicOrFilterOpt.head) + filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg) + } val consumerProps = if (options.has(consumerConfigOpt)) Utils.loadProps(options.valueOf(consumerConfigOpt)) else @@ -262,9 +274,6 @@ object ConsoleConsumer extends Logging { CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt) - if (!useNewConsumer && topicOrFilterOpt.size != 1) - CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.") - if (options.has(csvMetricsReporterEnabledOpt)) { val csvReporterProps = new Properties() csvReporterProps.put("kafka.metrics.polling.interval.secs", "5")