KAFKA-12380: Executor in Connect's Worker is not shut down when the worker is#10337
KAFKA-12380: Executor in Connect's Worker is not shut down when the worker is#10337sridhav wants to merge 4 commits intoapache:trunkfrom
Conversation
|
@rhauch can you please review? |
…pping the execution of all in-progress and queued tasks
There was a problem hiding this comment.
Thanks for this simple PR, @sridhav. A pretty minor question/suggestion below.
Also, it's probably worth pointing out a potential edge case that comes with explicitly closing this executor. Recall that this worker is only called by the herder via the herder's enqueued requests. When the herder is stopped/halted, the herder first cleans out this request queue (failing any outstanding requests) and then stops its services, which includes the worker. That means that when the worker is finally stopped, there should be no more requests to start connectors or tasks. And since those are the only methods that potentially submit work to this executor, we should never submit work to the executor once it is shutdown, and thus those executor.submit(...) calls don't need to handle the RejectedExecutionHandler exceptions.
When the herder stops, it also stops its assigned connectors and tasks, and those requests wait until the connectors and tasks are actually stopped. This means that the Worker's executor should have no running tasks when the worker is stopped and it calls executor.shutdownNow(). However, if the executor were still running tasks when the shutdownNow() method is called, that method may attempt to interrupt those running threads, which would cause exceptions in those runnables of the connector and task, though WorkerTask.run() catches all exceptions and only propagates Errors. But none of that should happen, though, since the herder stops all running connectors and tasks before stopping the worker.
While these should always be the case, is there any way of asserting that with a unit test?
|
|
||
| offsetBackingStore.stop(); | ||
| metrics.stop(); | ||
| stopExecutor(); |
There was a problem hiding this comment.
Are there advantages of putting this simple if-check in a separate methods? Would it be simpler and more straightforward to just do the check here:
| stopExecutor(); | |
| if (executor != null) { | |
| executor.shutdownNow(); | |
| } |
and then remove the stopExecutor() method?
There is already precedence for an if-check a few lines above.
|
Also, there are 24 failed unit tests on the different jobs, of the form: Please make sure you run the tests locally. The |
|
Close this PR because it's pending for a long time, and another PR already addressed this issue. |
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
When the worker is stopped, it does not shutdown this executor.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
The following tests are run:
./gradlew connect:test./gradlew connect:unitTest./gradlew connect:integrationTestCommitter Checklist (excluded from commit message)