KAFKA-7956 In ShutdownableThread, immediately complete the shutdown if the thread has not been started#6218
Conversation
calling `shutdown()`.
dac346a to
d20095e
Compare
shutdownComplete latch if the thread is not running.
| def shutdown(): Unit = { | ||
| initiateShutdown() | ||
| awaitShutdown() | ||
| if (this.isAlive) { |
There was a problem hiding this comment.
I think it'd be clearer if this logic was moved to awaitShutdown.
There was a problem hiding this comment.
Makes sense, thanks!
There was a problem hiding this comment.
@dhruvilshah3 actually I think awaitShutdown() needs to block regardless of the thread being alive or not. There could be cases where the user calls awaitShutdown() from a thread before starting the ShutdownableThread. The goal I had here was to just short-circuit shutdown() if the thread was never started.
dhruvilshah3
left a comment
There was a problem hiding this comment.
Thanks for the PR, LGTM. Just a minor comment.
awaitShutdown.
of the thread being started or not.
junrao
left a comment
There was a problem hiding this comment.
@gardnervickers : Thanks for the patch. A couple of comments below.
| if (this.isAlive) { | ||
| awaitShutdown() | ||
| } else { | ||
| shutdownComplete.countDown() |
There was a problem hiding this comment.
In terms of usage, a user could call (1) shutdown(), or (2) initiateShutdown(); some other code; awaitShutdown(). In the case when a thread is never started, the patch makes sure that (1) doesn't block. For consistency, it seems that we should make (2) not blocking too?
There was a problem hiding this comment.
Hmm I'm not sure, my interpretation of awaitShutdown() was that it should always block the caller until shutdown is complete, regardless of the thread being started or not.
Looking through the codebase it seems like it's mostly used when we know the thread is already started, so it's probably best to do the this.isAlive check in awaitShutdown() in order to make both (1) and (2) non-blocking.
junrao
left a comment
There was a problem hiding this comment.
@gardnervickers : Thanks for the updated patch. One more suggestion below.
| def awaitShutdown(): Unit = { | ||
| shutdownComplete.await() | ||
| if (this.isAlive) | ||
| shutdownComplete.await() |
There was a problem hiding this comment.
There are 2 ways to shut down a ShutdownableThread: (1) Just call shutdown(), which blocks until the thread completes. (2) First call initiateShutdown() and then call awaitShutdown(). The reason for this style is to allow the caller to add additional logic between the 2 calls. For example, if the thread can block in a selector, one can wake up the selector between the 2 calls. If the thread wasn't started or died unexpectedly, it's reasonable not to block the shutdown in either case.
Also, just calling awaitShutdown() w/o calling initiateShutdown() first is unexpected and not supported. To protect against this, perhaps we can add a check to make sure that shutdownInitiated is already 0 when awaitShutdown() is called. Otherwise, we can throw an IllegalStateException.
There was a problem hiding this comment.
Thanks! I believe we can cover both cases here if we check in awaitShutdown() that the thread has been started before blocking? Perhaps I'm misunderstanding though.
I'll add a check that shutdownInitiated == 0.
|
During testing I found |
it's set by the parent thread.
0b596ed to
7c6ac0d
Compare
|
I updated the PR to set an |
|
I also have an alternate patch available which removes the usage of Beyond attempting to make explicit the various states a ShutdownableThread can be in, it removes the need to ensure call ordering between |
|
retest this please |
junrao
left a comment
There was a problem hiding this comment.
@gardnervickers : Since the usage of ShutdownableThread is mostly simple, perhaps we can just take the current patch for now. Could you file a jira and include it in the PR title so that we can track it? Just one other comment below.
| super.start() | ||
| } | ||
| override def run(): Unit = { | ||
| isStarted = true |
There was a problem hiding this comment.
Hmm, do we need to set isStarted here again since it's already set in start()?
There was a problem hiding this comment.
I was trying to cover the case where run() is called directly, without start(), either directly or by an executor implementation.
There was a problem hiding this comment.
In that case, could we just set it in run() and not in start()?
There was a problem hiding this comment.
I added in the start() case because there could be a delay between start() getting called and run() getting called in the new thread, but I think this sends up producing inconsistent behavior depending on how the ShutdownableThread is started.
In the current state, if start() is used to start the ShutdownableThread, we're guaranteed to always correctly block in subsequent calls to awaitShutdown() since isStarted is set from the caller thread. If the ShutdownableThread is submitted to an executor, run() can be called at any time in the future, which makes it impossible for the caller thread to know if awaitShutdown() will block as we expect.
We end up with different guarantees based on how the class is run, which is not optimal. I think the only one we can guarantee across both start() and an Executor starting the thread is that the caller thread won't know when isStarted is true, and as a result, when it can rely on the blocking behavior of awaitShutdown.
|
retest this please |
junrao
left a comment
There was a problem hiding this comment.
@gardnervickers : Thanks for the patch. LGTM. Just waiting for the tests to pass.
|
retest this please |
|
@gardnervickers : The test actually revealed a real issue. The issue is that in ControllerEventManager.close(). We have the following logic. We put a ShutdownEventThread event into the queue, which will be processed by the ControllerEventThread. When this thread handles the ShutdownEventThread event, it calls initiateShutdown(). Since this happens async, there is no guarantee that initiateShutdown() will be called before awaitShutdown(). We can probably change this logic a bit. In ControllerEventManager.close(), we can do Then, we can just get rid of initiateShutdown() when handling the ShutdownEventThread event in the ControllerEventThread. |
|
Thanks! I pushed a fix for that. I noticed there were a few more tests failing but I missed the test result caching window (again) so I'll track those down too. |
junrao
left a comment
There was a problem hiding this comment.
@gardnervickers : Thanks for the updated PR. LGTM. Just a minor comment below.
| override def doWork(): Unit = { | ||
| queue.take() match { | ||
| case KafkaController.ShutdownEventThread => initiateShutdown() | ||
| case KafkaController.ShutdownEventThread => |
There was a problem hiding this comment.
Could we add a comment like "// The shutting down of the thread has been initiated at this point. Just ignore this event"?
There was a problem hiding this comment.
Yes, sounds good!
nothing in ControllerEventManager.
* AK/trunk: (36 commits) KAFKA-7962: Avoid NPE for StickyAssignor (apache#6308) Address flakiness of CustomQuotaCallbackTest#testCustomQuotaCallback (apache#6330) KAFKA-7918: Inline generic parameters Pt. II: RocksDB Bytes Store and Memory LRU Caches (apache#6327) MINOR: fix parameter naming (apache#6316) KAFKA-7956 In ShutdownableThread, immediately complete the shutdown if the thread has not been started (apache#6218) MINOR: Refactor replica log dir fetching for improved logging (apache#6313) [TRIVIAL] Remove unused StreamsGraphNode#repartitionRequired (apache#6227) MINOR: Increase produce timeout to 120 seconds (apache#6326) KAFKA-7918: Inline generic parameters Pt. I: in-memory key-value store (apache#6293) MINOR: Fix line break issue in upgrade notes (apache#6320) KAFKA-7972: Use automatic RPC generation in SaslHandshake MINOR: Enable capture of full stack trace in StreamTask#process (apache#6310) KAFKA-7938: Fix test flakiness in DeleteConsumerGroupsTest (apache#6312) KAFKA-7937: Fix Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup (apache#6311) MINOR: Update docs to say 2.2 (apache#6315) KAFKA-7672 : force write checkpoint during StreamTask #suspend (apache#6115) KAFKA-7961; Ignore assignment for un-subscribed partitions (apache#6304) KAFKA-7672: Restoring tasks need to be closed upon task suspension (apache#6113) KAFKA-7864; validate partitions are 0-based (apache#6246) KAFKA-7492 : Updated javadocs for aggregate and reduce methods returning null behavior. (apache#6285) ...
…f the thread has not been started (apache#6218) In some test cases it's desirable to instantiate a subclass of `ShutdownableThread` without starting it. Since most subclasses of `ShutdownableThread` put cleanup logic in `ShutdownableThread.shutdown()`, being able to call `shutdown()` on the non-running thread would be useful. This change allows us to avoid blocking in `ShutdownableThread.shutdown()` if the thread's `run()` method has not been called. We also add a check that `initiateShutdown()` was called before `awaitShutdown()`, to protect against the case where a user calls `awaitShutdown()` before the thread has been started, and unexpectedly is not blocked on the thread shutting down. Reviewers : Dhruvil Shah <dhruvil@confluent.io>, Jun Rao <junrao@gmail.com>
In some test cases it's desirable to instantiate a subclass of
ShutdownableThreadwithout starting it. Since most subclasses ofShutdownableThreadput cleanup logic inShutdownableThread.shutdown(), being able to callshutdown()on the non-running thread would be useful.This change allows us to avoid blocking in
ShutdownableThread.shutdown()if the thread'srun()method has not been called. We also add a check thatinitiateShutdown()was called beforeawaitShutdown(), to protect against the case where a user callsawaitShutdown()before the thread has been started, and unexpectedly is not blocked on the thread shutting down.Committer Checklist (excluded from commit message)