From 51f7cc6471cbf0e0cdbc350082980c59d8669e57 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 3 Nov 2015 14:42:21 -0800 Subject: [PATCH 1/2] KAFKA-2734: kafka-console-consumer throws NoSuchElementException on not specifying topic --- .../scala/kafka/tools/ConsoleConsumer.scala | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 639360c030433..039f128d9f88c 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -239,10 +239,22 @@ object ConsoleConsumer extends Logging { var groupIdPassed = true val options: OptionSet = tryParse(parser, args) val useNewConsumer = options.has(useNewConsumerOpt) - val filterOpt = List(whitelistOpt, blacklistOpt).filter(options.has) - val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) - val topicArg = options.valueOf(topicOrFilterOpt.head) - val filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg) + + // If using old consumer, exactly one of whitelist/blacklist/topic is required. + // If using new consumer, topic must be specified. + var topicArg: String = null + var filterSpec: TopicFilter = null + if (useNewConsumer) { + if (!options.has(topicIdOpt)) + CommandLineUtils.printUsageAndDie(parser, "Topic must be specified.") + topicArg = options.valueOf(topicIdOpt) + } else { + val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) + if (topicOrFilterOpt.size != 1) + CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.") + val topicArg = options.valueOf(topicOrFilterOpt.head) + filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg) + } val consumerProps = if (options.has(consumerConfigOpt)) Utils.loadProps(options.valueOf(consumerConfigOpt)) else @@ -262,9 +274,6 @@ object ConsoleConsumer extends Logging { CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt) - if (!useNewConsumer && topicOrFilterOpt.size != 1) - CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.") - if (options.has(csvMetricsReporterEnabledOpt)) { val csvReporterProps = new Properties() csvReporterProps.put("kafka.metrics.polling.interval.secs", "5") From 49c5ba9f80702466cb324d62064dc06eee31192a Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 4 Nov 2015 16:48:18 -0800 Subject: [PATCH 2/2] Address test failure --- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 039f128d9f88c..9f296bdad6f86 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -252,7 +252,7 @@ object ConsoleConsumer extends Logging { val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) if (topicOrFilterOpt.size != 1) CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.") - val topicArg = options.valueOf(topicOrFilterOpt.head) + topicArg = options.valueOf(topicOrFilterOpt.head) filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg) } val consumerProps = if (options.has(consumerConfigOpt))