From 501ae1ad97b6b77a2078ed234bbbbafe9f828ecd Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Wed, 27 Apr 2016 15:56:43 -0700 Subject: [PATCH 1/4] KAFKA-3597: Enable query ConsoleConsumer and VerifiableProducer if they shutdown cleanly --- .../main/scala/kafka/tools/ConsoleConsumer.scala | 2 ++ tests/kafkatest/services/console_consumer.py | 15 +++++++++++---- tests/kafkatest/services/verifiable_producer.py | 7 +++++++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index e9a43f2bf422b..5c860b0d8f2cb 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -103,6 +103,8 @@ object ConsoleConsumer extends Logging { consumer.stop() shutdownLatch.await() + + System.out.println("shutdown_complete") } }) } diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index e5f2196e18418..740447b02d64f 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -123,6 +123,7 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro self.from_beginning = from_beginning self.message_validator = message_validator self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} + self.nodes_clean_shutdown = [] self.client_id = client_id self.print_key = print_key self.log_level = "TRACE" @@ -199,6 +200,9 @@ def pids(self, node): def alive(self, node): return len(self.pids(node)) > 0 + def clean_shutdown(self, node): + return self.idx(node) in self.nodes_clean_shutdown + def _worker(self, idx, node): node.account.ssh("mkdir -p %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) @@ -226,10 +230,13 @@ def _worker(self, idx, node): for line in itertools.chain([first_line], consumer_output): msg = line.strip() - if self.message_validator is not None: - msg = self.message_validator(msg) - if msg is not None: - self.messages_consumed[idx].append(msg) + if msg == "shutdown_complete": + self.nodes_clean_shutdown.append(idx) + else: + if self.message_validator is not None: + msg = self.message_validator(msg) + if msg is not None: + self.messages_consumed[idx].append(msg) self.read_jmx_output(idx, node) diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 414da84ad9e64..c53e21918cf17 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -71,6 +71,7 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput self.acked_values = [] self.not_acked_values = [] self.produced_count = {} + self.nodes_clean_shutdown = [] @property @@ -135,6 +136,9 @@ def _worker(self, idx, node): last_produced_time = t prev_msg = data + elif data["name"] == "tool_data": + self.nodes_clean_shutdown.append(idx) + def start_cmd(self, node, idx): cmd = "" @@ -180,6 +184,9 @@ def pids(self, node): def alive(self, node): return len(self.pids(node)) > 0 + def clean_shutdown(self, node): + return self.idx(node) in self.nodes_clean_shutdown + @property def acked(self): with self.lock: From 48e9a1cf1680ea236c1ed9958ccbbc6dbcb92805 Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Thu, 28 Apr 2016 15:00:56 -0700 Subject: [PATCH 2/4] addressed comments --- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 6 +++++- tests/kafkatest/services/console_consumer.py | 10 +++++----- tests/kafkatest/services/verifiable_producer.py | 8 +++----- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 5c860b0d8f2cb..f3b61fbce6479 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -104,7 +104,9 @@ object ConsoleConsumer extends Logging { shutdownLatch.await() - System.out.println("shutdown_complete") + if (conf.enableLifecycleLogging) { + System.out.println("shutdown_complete") + } } }) } @@ -255,6 +257,7 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("deserializer for values") .ofType(classOf[String]) + val enableLifecycleLoggingOpt = parser.accepts("enable-lifecycle-logging", "Log lifecycle events of the consumer.") if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.") @@ -262,6 +265,7 @@ object ConsoleConsumer extends Logging { var groupIdPassed = true val options: OptionSet = tryParse(parser, args) val useNewConsumer = options.has(useNewConsumerOpt) + val enableLifecycleLogging = options.has(enableLifecycleLoggingOpt) // If using old consumer, exactly one of whitelist/blacklist/topic is required. // If using new consumer, topic must be specified. diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 740447b02d64f..03cd25e8b106d 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -123,7 +123,7 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro self.from_beginning = from_beginning self.message_validator = message_validator self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} - self.nodes_clean_shutdown = [] + self.node_indexes_clean_shutdown = set() self.client_id = client_id self.print_key = print_key self.log_level = "TRACE" @@ -186,6 +186,7 @@ def start_cmd(self, node): if node.version > LATEST_0_9: cmd+=" --formatter kafka.tools.LoggingMessageFormatter" + cmd += " --enable-lifecycle-logging" cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args return cmd @@ -200,9 +201,6 @@ def pids(self, node): def alive(self, node): return len(self.pids(node)) > 0 - def clean_shutdown(self, node): - return self.idx(node) in self.nodes_clean_shutdown - def _worker(self, idx, node): node.account.ssh("mkdir -p %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) @@ -231,7 +229,9 @@ def _worker(self, idx, node): for line in itertools.chain([first_line], consumer_output): msg = line.strip() if msg == "shutdown_complete": - self.nodes_clean_shutdown.append(idx) + if idx in self.node_indexes_clean_shutdown: + raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx) + self.node_indexes_clean_shutdown.add(idx) else: if self.message_validator is not None: msg = self.message_validator(msg) diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index c53e21918cf17..6d5080cf5a651 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -71,7 +71,7 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput self.acked_values = [] self.not_acked_values = [] self.produced_count = {} - self.nodes_clean_shutdown = [] + self.node_indexes_clean_shutdown = set() @property @@ -137,7 +137,8 @@ def _worker(self, idx, node): prev_msg = data elif data["name"] == "tool_data": - self.nodes_clean_shutdown.append(idx) + # the stats summary is printed after producer's close(), which means shutdown was clean + self.node_indexes_clean_shutdown.add(idx) def start_cmd(self, node, idx): @@ -184,9 +185,6 @@ def pids(self, node): def alive(self, node): return len(self.pids(node)) > 0 - def clean_shutdown(self, node): - return self.idx(node) in self.nodes_clean_shutdown - @property def acked(self): with self.lock: From ad71aae5675662f66b9fe5af79df7624a8129bed Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Thu, 28 Apr 2016 15:52:57 -0700 Subject: [PATCH 3/4] Extended description of --enable-lifecycle-logging --- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index f3b61fbce6479..579fef74062b8 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -257,7 +257,8 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("deserializer for values") .ofType(classOf[String]) - val enableLifecycleLoggingOpt = parser.accepts("enable-lifecycle-logging", "Log lifecycle events of the consumer.") + val enableLifecycleLoggingOpt = parser.accepts("enable-lifecycle-logging", "Log lifecycle events of the consumer in addition to logging consumed " + + "messages. (This is specific for system tests.)") if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.") From 38db16804bf168acb0bbe30276830af715176b03 Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Fri, 29 Apr 2016 00:16:28 -0700 Subject: [PATCH 4/4] adding shutdown_complete event to the producer --- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 9 +++++---- tests/kafkatest/services/console_consumer.py | 8 ++++---- tests/kafkatest/services/verifiable_producer.py | 9 +++++---- .../java/org/apache/kafka/tools/VerifiableProducer.java | 8 ++++++++ 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 579fef74062b8..89536400e1cd9 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -104,7 +104,7 @@ object ConsoleConsumer extends Logging { shutdownLatch.await() - if (conf.enableLifecycleLogging) { + if (conf.enableSystestEventsLogging) { System.out.println("shutdown_complete") } } @@ -257,8 +257,9 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("deserializer for values") .ofType(classOf[String]) - val enableLifecycleLoggingOpt = parser.accepts("enable-lifecycle-logging", "Log lifecycle events of the consumer in addition to logging consumed " + - "messages. (This is specific for system tests.)") + val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events", + "Log lifecycle events of the consumer in addition to logging consumed " + + "messages. (This is specific for system tests.)") if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.") @@ -266,7 +267,7 @@ object ConsoleConsumer extends Logging { var groupIdPassed = true val options: OptionSet = tryParse(parser, args) val useNewConsumer = options.has(useNewConsumerOpt) - val enableLifecycleLogging = options.has(enableLifecycleLoggingOpt) + val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt) // If using old consumer, exactly one of whitelist/blacklist/topic is required. // If using new consumer, topic must be specified. diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 03cd25e8b106d..37638e2a9812b 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -123,7 +123,7 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro self.from_beginning = from_beginning self.message_validator = message_validator self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} - self.node_indexes_clean_shutdown = set() + self.clean_shutdown_nodes = set() self.client_id = client_id self.print_key = print_key self.log_level = "TRACE" @@ -186,7 +186,7 @@ def start_cmd(self, node): if node.version > LATEST_0_9: cmd+=" --formatter kafka.tools.LoggingMessageFormatter" - cmd += " --enable-lifecycle-logging" + cmd += " --enable-systest-events" cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args return cmd @@ -229,9 +229,9 @@ def _worker(self, idx, node): for line in itertools.chain([first_line], consumer_output): msg = line.strip() if msg == "shutdown_complete": - if idx in self.node_indexes_clean_shutdown: + if node in self.clean_shutdown_nodes: raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx) - self.node_indexes_clean_shutdown.add(idx) + self.clean_shutdown_nodes.add(node) else: if self.message_validator is not None: msg = self.message_validator(msg) diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 9bf4c24992acf..4fec776719481 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -71,7 +71,7 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput self.acked_values = [] self.not_acked_values = [] self.produced_count = {} - self.node_indexes_clean_shutdown = set() + self.clean_shutdown_nodes = set() self.acks = acks @@ -140,9 +140,10 @@ def _worker(self, idx, node): last_produced_time = t prev_msg = data - elif data["name"] == "tool_data": - # the stats summary is printed after producer's close(), which means shutdown was clean - self.node_indexes_clean_shutdown.add(idx) + elif data["name"] == "shutdown_complete": + if node in self.clean_shutdown_nodes: + raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx) + self.clean_shutdown_nodes.add(node) def start_cmd(self, node, idx): diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java index 9b10a9f0dd613..b511fb94c8c6a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java @@ -247,6 +247,14 @@ public String getValue(long val) { /** Close the producer to flush any remaining messages. */ public void close() { producer.close(); + System.out.println(shutdownString()); + } + + String shutdownString() { + Map data = new HashMap<>(); + data.put("class", this.getClass().toString()); + data.put("name", "shutdown_complete"); + return toJsonString(data); } /**