diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index e9a43f2bf422b..89536400e1cd9 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -103,6 +103,10 @@ object ConsoleConsumer extends Logging { consumer.stop() shutdownLatch.await() + + if (conf.enableSystestEventsLogging) { + System.out.println("shutdown_complete") + } } }) } @@ -253,6 +257,9 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("deserializer for values") .ofType(classOf[String]) + val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events", + "Log lifecycle events of the consumer in addition to logging consumed " + + "messages. (This is specific for system tests.)") if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.") @@ -260,6 +267,7 @@ object ConsoleConsumer extends Logging { var groupIdPassed = true val options: OptionSet = tryParse(parser, args) val useNewConsumer = options.has(useNewConsumerOpt) + val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt) // If using old consumer, exactly one of whitelist/blacklist/topic is required. // If using new consumer, topic must be specified. diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index e5f2196e18418..37638e2a9812b 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -123,6 +123,7 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro self.from_beginning = from_beginning self.message_validator = message_validator self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} + self.clean_shutdown_nodes = set() self.client_id = client_id self.print_key = print_key self.log_level = "TRACE" @@ -185,6 +186,7 @@ def start_cmd(self, node): if node.version > LATEST_0_9: cmd+=" --formatter kafka.tools.LoggingMessageFormatter" + cmd += " --enable-systest-events" cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args return cmd @@ -226,10 +228,15 @@ def _worker(self, idx, node): for line in itertools.chain([first_line], consumer_output): msg = line.strip() - if self.message_validator is not None: - msg = self.message_validator(msg) - if msg is not None: - self.messages_consumed[idx].append(msg) + if msg == "shutdown_complete": + if node in self.clean_shutdown_nodes: + raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx) + self.clean_shutdown_nodes.add(node) + else: + if self.message_validator is not None: + msg = self.message_validator(msg) + if msg is not None: + self.messages_consumed[idx].append(msg) self.read_jmx_output(idx, node) diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 500410f134f10..4fec776719481 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -71,6 +71,7 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput self.acked_values = [] self.not_acked_values = [] self.produced_count = {} + self.clean_shutdown_nodes = set() self.acks = acks @@ -139,6 +140,11 @@ def _worker(self, idx, node): last_produced_time = t prev_msg = data + elif data["name"] == "shutdown_complete": + if node in self.clean_shutdown_nodes: + raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx) + self.clean_shutdown_nodes.add(node) + def start_cmd(self, node, idx): cmd = "" diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java index 9b10a9f0dd613..b511fb94c8c6a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java @@ -247,6 +247,14 @@ public String getValue(long val) { /** Close the producer to flush any remaining messages. */ public void close() { producer.close(); + System.out.println(shutdownString()); + } + + String shutdownString() { + Map data = new HashMap<>(); + data.put("class", this.getClass().toString()); + data.put("name", "shutdown_complete"); + return toJsonString(data); } /**