KAFKA-15826: Close consumer when sink task is cancelled #14762

Closed
gharris1727 wants to merge 7 commits into apache:trunk from gharris1727:kafka-15826-sink-task-leaks-consumer
Conversation

@gharris1727
Contributor

Currently the only place that the consumer is closed is on the task thread, which may be blocked indefinitely by the task plugin. Similar to AbstractWorkerSourceTask, we should close the consumer here to allow for cleanup to take place.
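The idea can be illustrated with a minimal, JDK-only sketch (all names here are hypothetical stand-ins, not the actual WorkerSinkTask/KafkaConsumer code): the task thread is blocked indefinitely inside plugin code, so the only existing close() call never runs; a cancelling thread closes the resource instead, allowing cleanup to proceed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelClosesResource {
    // Stand-in for the consumer owned by the sink task (hypothetical).
    static class FakeConsumer implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);
        @Override public void close() { closed.set(true); }
    }

    public static void main(String[] args) throws Exception {
        FakeConsumer consumer = new FakeConsumer();
        CountDownLatch blockedForever = new CountDownLatch(1);

        // Task thread: blocked indefinitely inside plugin code (e.g. Task::stop).
        Thread taskThread = new Thread(() -> {
            try { blockedForever.await(); } catch (InterruptedException ignored) { }
        }, "task-thread");
        taskThread.start();

        // Cancellation path: without this, nothing ever closes the consumer,
        // because the only close() call lives on the blocked task thread.
        consumer.close();
        System.out.println("consumer closed: " + consumer.closed.get());

        taskThread.interrupt();
        taskThread.join();
    }
}
```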

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Contributor

@yashmayya left a comment

Thanks for the fix Greg!

One interesting side effect of this change is that two tests in OffsetsApiIntegrationTest have begun failing: testAlterSinkConnectorOffsetsZombieSinkTasks and testResetSinkConnectorOffsetsZombieSinkTasks. These tests verify the behavior of the offsets modification endpoints (from KIP-875) when sink tasks haven't finished stopping completely (by simulating an indefinitely blocking Task::stop method). The verification expects the endpoints to return an error response on account of the zombie sink tasks, which are detected from the failure in altering / resetting consumer group offsets (via the admin client) for an active consumer group.

These tests now fail because the consumer group will no longer be active once the tasks are cancelled.

@gharris1727
Contributor Author

@yashmayya I noticed those failures and wasn't sure they're affected, thanks for explaining exactly what is going on!

Since other workers in the cluster may not have this fix applied, I'll leave the error message unchanged and just update the assertions. There could also be unpredictable zombie conditions where cancel() is never called, in which case this fix would be ineffective.

The change to the offsets API behavior was unintended, but I think it is beneficial, and indicates that this change is moving in the right direction. Now there are fewer situations in which a user needs to restart the workers to use the REST API 🙃.

@gharris1727
Contributor Author

The only problem I have with this approach is that the official documentation states that KafkaConsumer is not thread-safe, with the sole exception of wakeup(). The recommended pattern is to call wakeup() from a different thread, and then close() on the primary thread. In the blocked-connector case we're unlikely to be within a consumer call, but we technically don't have exclusive access to the consumer and could receive a ConcurrentModificationException from any of the consumer methods.
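The documented shutdown pattern can be sketched with a self-contained stand-in (FakeConsumer here mimics KafkaConsumer's semantics; it is not the real client): the only cross-thread call is wakeup(), and close() still happens on the owning thread once the wakeup exception breaks the poll loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class WakeupPattern {
    // Minimal stand-in for KafkaConsumer: wakeup() is the only method safe to
    // call from another thread; poll() throws once a wakeup has been requested.
    static class FakeConsumer {
        final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
        void wakeup() { wakeupRequested.set(true); }
        void poll() {
            if (wakeupRequested.get()) throw new RuntimeException("WakeupException");
        }
        void close() { System.out.println("closed on owning thread"); }
    }

    public static void main(String[] args) throws Exception {
        FakeConsumer consumer = new FakeConsumer();
        CountDownLatch done = new CountDownLatch(1);

        Thread owner = new Thread(() -> {
            try {
                while (true) consumer.poll();  // normal processing loop
            } catch (RuntimeException e) {     // WakeupException in the real API
                consumer.close();              // close() stays on the owning thread
                done.countDown();
            }
        }, "owner");
        owner.start();

        consumer.wakeup();                     // the ONLY cross-thread call
        done.await();
    }
}
```

The point of contention in this PR is exactly that cancel() breaks this pattern: it calls close(), not just wakeup(), from a thread that does not own the consumer.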

I think if we want cancellation to close the consumer, we'd also need to "take ownership" of the consumer and prevent the main thread from using the consumer. We already have isCancelled() guards on offset commit, but maybe that isn't sufficient to prevent ConcurrentModificationExceptions. I'll have to look into this more.

@hudeqi hudeqi self-requested a review November 16, 2023 04:10
@yashmayya
Contributor

Since other workers in the cluster may not have this fix applied, I'll leave the error message unchanged and just update the assertions.

Hm, the original purpose of those tests was to verify that the admin client error handling here and here is correct, and to safeguard against any regressions in that path (from unexpected changes in the admin client or consumer group coordinator, for instance). Considering that, these assertion changes defeat the purpose of those tests. Perhaps we could bump up the value for the task.shutdown.graceful.timeout.ms worker config (which would delay the task cancellation) in those particular tests in order to retain the original intent?
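Concretely, the suggestion would amount to a worker config override along these lines in those tests (the value is illustrative, not a recommendation):

```properties
# Worker config override (illustrative value): delay task cancellation so the
# zombie sink task's consumer group stays active for the duration of the test.
task.shutdown.graceful.timeout.ms=60000
```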

@gharris1727
Contributor Author

Hey @yashmayya, thanks for letting me know the original intent of the test. I agree, changing the test to expect the call to succeed was not in the spirit of the original test, so I've reverted back to the old assertions.

I tried to increase the graceful shutdown timeout, but I wasn't able to get the calls to fail. I think that the graceful shutdown blocks the herder thread, and prevents the herder from servicing the alter/reset requests, so they always wait to execute until after the cancellation has been applied.

Rather than trying to get the requests to target a different worker and dealing with that nondeterminism, I elected to just simulate the zombie tasks with an out-of-band consumer that I create in the test. That should also test the case when a user has an external consumer in the group.

PTAL, thanks so much for the review!

Contributor

@yashmayya left a comment

Rather than trying to get the requests to target a different worker and dealing with that nondeterminism, I elected to just simulate the zombie tasks with an out-of-band consumer that I create in the test

Thanks Greg, I like the out-of-band consumer approach and it's definitely good enough to simulate the zombie task case here 👍

I tried to increase the graceful shutdown timeout, but I wasn't able to get the calls to fail. I think that the graceful shutdown blocks the herder thread, and prevents the herder from servicing the alter/reset requests, so they always wait to execute until after the cancellation has been applied.

Interesting, I'd assumed that they'd be stopped on the herder's separate startAndStopExecutor, but it looks like that isn't the case in this path.

I think if we want cancellation to close the consumer, we'd also need to "take ownership" of the consumer and prevent the main thread from using the consumer. We already have isCancelled() guards on offset commit, but maybe that isn't sufficient to prevent ConcurrentModificationExceptions. I'll have to look into this more.

What was the conclusion here? Is triggering wakeup on a separate thread (through WorkerSinkTask::stop) before also eventually closing the consumer on a separate thread (through WorkerSinkTask::cancel) safe? It looks like it might be, but I was only able to take a cursory look.

I only had a couple of trivial comments otherwise.

@gharris1727
Contributor Author

What was the conclusion here? Is triggering wakeup on a separate thread (through WorkerSinkTask::stop) before also eventually closing the consumer on a separate thread (through WorkerSinkTask::cancel) safe? Looks like it probably might be, but I was only able to take a cursory look.

I don't think it is. For example, it looks like doCommitSync catches the WakeupException and then makes another call to the consumer, and that call could be ongoing when cancel()/close() is called. I think also that after wakeup() exits, there's no guarantee that the other thread has actually thrown the WakeupException and released the consumer lock. Since the plugin has access to the WorkerSinkTaskContext, it can also perform an infinite loop and catch all of the WakeupExceptions.

I think that preventing ConcurrentModificationExceptions completely will require heavy synchronization, which gives more opportunities for the task thread to block the herder thread. If we assume that the framework has no infinite loops (which may not be the case given the behavior of doCommitSync), we could make cancellation only happen while the task thread is in plugin code, when the consumer is unlikely to be in use. This could be done with an AtomicReference, which is the same synchronization primitive that the Consumer uses to fire the ConcurrentModificationExceptions.
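The ownership check being described can be sketched as a CAS-based single-owner guard (a simplified analogue of the client's internal acquire()/release() check, not the actual KafkaConsumer code, which tracks a thread id and a refcount): a second thread attempting to acquire while the task thread holds the consumer gets a ConcurrentModificationException instead of racing.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicReference;

public class OwnershipGuard {
    // Single-owner guard in the spirit of KafkaConsumer's acquire()/release().
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    void acquire() {
        Thread me = Thread.currentThread();
        // CAS from "unowned" to "owned by me"; reentrant acquires are allowed.
        if (!owner.compareAndSet(null, me) && owner.get() != me)
            throw new ConcurrentModificationException("consumer is not thread-safe");
    }

    void release() { owner.compareAndSet(Thread.currentThread(), null); }

    public static void main(String[] args) throws Exception {
        OwnershipGuard guard = new OwnershipGuard();
        guard.acquire();                     // task thread takes ownership

        Thread canceller = new Thread(() -> {
            try {
                guard.acquire();             // a cancelling thread would fail here
                System.out.println("acquired");
            } catch (ConcurrentModificationException e) {
                System.out.println("CME: " + e.getMessage());
            }
        });
        canceller.start();
        canceller.join();

        guard.release();                     // only now could another thread acquire
    }
}
```

Cancellation would then be allowed only when it can successfully take ownership, i.e. while the task thread has released the consumer to run plugin code.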

I'll look into it more; I don't think we should merge this change until we have a handle on the CMEs.

@gharris1727
Contributor Author

gharris1727 commented Dec 11, 2023

I found this same leak again when debugging some failures of the kafka.utils.TestUtils.verifyNoUnexpectedThreads assertion in core tests. Whenever we leak one of these clients in connect:runtime, all of the core tests using this assertion will fail. In particular, the BlockingConnectorTest causes this failure:

org.opentest4j.AssertionFailedError: Found 1 unexpected threads during @BeforeAll: `kafka-coordinator-heartbeat-thread | connect-blocking-connector` ==> expected: <true> but was: <false>
	at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
	at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
	at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
	at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
	at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
	at app//kafka.utils.TestUtils$.verifyNoUnexpectedThreads(TestUtils.scala:2412)
	at app//kafka.utils.TestUtils.verifyNoUnexpectedThreads(TestUtils.scala)

This made me realize that the BlockingConnectorTest may be changed so that the operations block while the test is ongoing, and then unblock during the cleanup phase. This would stabilize the tests and resolve the leaks without changing the user-facing cancellation behavior or experiencing CMEs.

Here's the PR for that: #14987

@gharris1727
Contributor Author

Given the locking concerns I raised earlier, and the fact that we can resolve this resource leak in our tests, I don't think this PR is viable to merge. We can revisit this in the future if the locking model of the consumer can be changed, or if this becomes a demonstrable problem in real situations and not just tests.
