Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ object ConsoleConsumer extends Logging {
consumer.stop()

shutdownLatch.await()

if (conf.enableSystestEventsLogging) {
System.out.println("shutdown_complete")
}
}
})
}
Expand Down Expand Up @@ -253,13 +257,17 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("deserializer for values")
.ofType(classOf[String])
val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
"Log lifecycle events of the consumer in addition to logging consumed " +
"messages. (This is specific for system tests.)")

if (args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")

var groupIdPassed = true
val options: OptionSet = tryParse(parser, args)
val useNewConsumer = options.has(useNewConsumerOpt)
val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)

// If using old consumer, exactly one of whitelist/blacklist/topic is required.
// If using new consumer, topic must be specified.
Expand Down
15 changes: 11 additions & 4 deletions tests/kafkatest/services/console_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro
self.from_beginning = from_beginning
self.message_validator = message_validator
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
self.clean_shutdown_nodes = set()
self.client_id = client_id
self.print_key = print_key
self.log_level = "TRACE"
Expand Down Expand Up @@ -185,6 +186,7 @@ def start_cmd(self, node):
if node.version > LATEST_0_9:
cmd+=" --formatter kafka.tools.LoggingMessageFormatter"

cmd += " --enable-systest-events"
cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
return cmd

Expand Down Expand Up @@ -226,10 +228,15 @@ def _worker(self, idx, node):

for line in itertools.chain([first_line], consumer_output):
msg = line.strip()
if self.message_validator is not None:
msg = self.message_validator(msg)
if msg is not None:
self.messages_consumed[idx].append(msg)
if msg == "shutdown_complete":
if node in self.clean_shutdown_nodes:
raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx)
self.clean_shutdown_nodes.add(node)
else:
if self.message_validator is not None:
msg = self.message_validator(msg)
if msg is not None:
self.messages_consumed[idx].append(msg)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to validate here that idx not in self.nodes_clean_shutdown? (although we might want a set instead of a list). Seems like it should be an error to receive a valid message after receiving a "shutdown_complete" message

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree about a set. Based on java consumer code, should be called just once. I guess it's always better to catch weird behavior earlier -- I will raise an exception on a duplicate shutdown event.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I was actually thinking that any message (in particular a consumed message?) after a shutdown complete would be a thing to raise an error over.

Really just a sanity check, might not actually be of use.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, since we are printing out messages, I am a bit worried that the printed output itself can come out of order — we are not flushing after every print. Not sure how rare that behavior is...


self.read_jmx_output(idx, node)

Expand Down
6 changes: 6 additions & 0 deletions tests/kafkatest/services/verifiable_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput
self.acked_values = []
self.not_acked_values = []
self.produced_count = {}
self.clean_shutdown_nodes = set()
self.acks = acks


Expand Down Expand Up @@ -139,6 +140,11 @@ def _worker(self, idx, node):
last_produced_time = t
prev_msg = data

elif data["name"] == "shutdown_complete":
if node in self.clean_shutdown_nodes:
raise Exception("Unexpected shutdown event from producer, already shutdown. Producer index: %d" % idx)
self.clean_shutdown_nodes.add(node)

def start_cmd(self, node, idx):

cmd = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,14 @@ public String getValue(long val) {
/**
 * Close the producer to flush any remaining messages.
 * After the underlying producer is closed, emits a "shutdown_complete"
 * JSON status line on stdout so external harnesses (system tests) can
 * detect a clean shutdown.
 */
public void close() {
producer.close();
// Printed after close() returns, i.e. after all buffered records are flushed.
System.out.println(shutdownString());
}

/**
 * Build the JSON status message that is printed when this producer
 * shuts down cleanly. The message carries the event name
 * "shutdown_complete" plus the concrete class for identification.
 */
String shutdownString() {
    Map<String, Object> fields = new HashMap<>();
    fields.put("name", "shutdown_complete");
    fields.put("class", this.getClass().toString());
    return toJsonString(fields);
}

/**
Expand Down