
KAFKA-3597: Query ConsoleConsumer and VerifiableProducer if they shutdown cleanly#1278

Closed
apovzner wants to merge 5 commits into apache:trunk from apovzner:kafka-3597

Conversation

@apovzner
Contributor

Even if a test calls stop() on console_consumer or verifiable_producer, it is still possible that the producer/consumer will not shut down cleanly and will be killed forcefully after a timeout. It is useful for some tests to know whether a clean shutdown happened. This PR adds methods to console_consumer and verifiable_producer to query whether the shutdown was clean.
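The pattern the PR describes can be sketched roughly as follows. This is a minimal illustration with hypothetical names, not the actual ducktape service code: each service records which nodes shut down cleanly, so a test can query the outcome after stop().

```python
# Hypothetical sketch of a service that tracks clean shutdowns per node.
class ShutdownTrackingService:
    def __init__(self, num_nodes):
        self.num_nodes = num_nodes
        self.clean_shutdown_nodes = set()

    def stop_node(self, node, clean_shutdown):
        # The real service would signal the process and wait for it to
        # exit; here we only model the bookkeeping of the outcome.
        if clean_shutdown:
            self.clean_shutdown_nodes.add(node)

    def all_nodes_clean_shutdown(self):
        # True only if every node recorded a clean shutdown.
        return len(self.clean_shutdown_nodes) == self.num_nodes
```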

@hachikuji and/or @granders Please review.


shutdownLatch.await()

System.out.println("shutdown_complete")
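On the test-harness side, detecting the marker printed by the snippet above might look like the following sketch. The helper name is illustrative, not from the PR; it only assumes the "shutdown_complete" marker string shown above.

```python
# Scan captured stdout lines for the clean-shutdown marker.
def saw_clean_shutdown(stdout_lines):
    return any(line.strip() == "shutdown_complete" for line in stdout_lines)
```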
Contributor


Is there a risk of breaking tools which depend on console consumer to read messages? You'd need to filter this message if you're doing any processing on the results. Would it be better to add an option to console consumer to explicitly enable this? For example, "--enable-lifecycle-logging."

Contributor


Or perhaps just log at trace level? It looks like console_consumer.py is logging at TRACE, so we should pick this up.

Contributor


Yeah, that might work, but it seems a little weird for a test's correctness to depend on the log level. Do we do that elsewhere?

Contributor Author


Agree. I like the --enable-lifecycle-logging idea. I will do that.

@apovzner
Contributor Author

@hachikuji and @granders Thanks a lot for your comments. I addressed all of them. Could you take one more look?

.withRequiredArg
.describedAs("deserializer for values")
.ofType(classOf[String])
val enableLifecycleLoggingOpt = parser.accepts("enable-lifecycle-logging", "Log lifecycle events of the consumer.")
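For comparison, the same boolean flag idea expressed with Python's argparse. This is a hypothetical analogue to illustrate the option, not the actual Kafka CLI code:

```python
import argparse

# Illustrative parser with a boolean lifecycle-logging flag.
parser = argparse.ArgumentParser()
parser.add_argument("--enable-lifecycle-logging", action="store_true",
                    help="Log lifecycle events of the consumer.")

# argparse converts the dashed flag name to an underscored attribute.
args = parser.parse_args(["--enable-lifecycle-logging"])
```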
Contributor


My main question about this approach is that I'm not sure which lifecycle events are best to "log". If we're providing this flag, it seems surprising to only log a clean shutdown.

And does "log" mean print?

Contributor Author


I think it's better to extend this to more events later, when we actually use them; VerifiableConsumer has more of these events. But it also seems ok to add the flag now and add more events in the future when needed.

I think you can use either "print" or "log", with "log" as the more general term — we are logging to standard output.

Contributor


Ok sounds fine

Contributor


Maybe for now, we could just log startup and shutdown?

self.from_beginning = from_beginning
self.message_validator = message_validator
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
self.node_indexes_clean_shutdown = set()
Contributor


Nitpick: the name is a little awkward. Maybe it would be clear enough even if you leave out "indexes" (e.g. clean_shutdown_nodes)?

Contributor

@granders Apr 29, 2016


@hachikuji Actually I asked for a name which made it clearer that this object contains numerical indices, and not actual node objects... :)

Contributor


Haha, fair enough. Another option would be to actually use the node objects in the set. Anyway, just a nitpick.

Contributor Author


Ok, let's just put nodes in the set: clean_shutdown_nodes
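The decision settled here — storing node objects rather than numeric indices in the set — can be sketched as below. Names are illustrative; note that plain Python objects hash by identity by default, so membership checks distinguish node instances.

```python
# Minimal sketch: a set of node objects (not indices).
class Node:
    def __init__(self, name):
        self.name = name

clean_shutdown_nodes = set()
node = Node("worker1")
clean_shutdown_nodes.add(node)
```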

@apovzner
Contributor Author

@granders and @hachikuji Ready for another look.

@hachikuji
Contributor

LGTM

@gwenshap
Contributor

LGTM

@asfgit closed this in e29eac4 Apr 29, 2016
gfodor pushed a commit to AltspaceVR/kafka that referenced this pull request Jun 3, 2016
…down cleanly


Author: Anna Povzner <anna@confluent.io>

Reviewers: Jason Gustafson, Geoff Anderson, Gwen Shapira

Closes apache#1278 from apovzner/kafka-3597
mumrah pushed a commit to mumrah/kafka that referenced this pull request Aug 14, 2024
* Impl for writeState in DefaultSharePersister, ShareCoordinatorService and ShareCoordinatorShard
* Setup in KafkaApis to intercept the RPC and return the result.

4 participants