Skip to content

KAFKA-10340: Proactively close producer when cancelling source tasks#10016

Merged
rhauch merged 6 commits intoapache:trunkfrom
C0urante:kafka-10340
Mar 1, 2021
Merged

KAFKA-10340: Proactively close producer when cancelling source tasks#10016
rhauch merged 6 commits intoapache:trunkfrom
C0urante:kafka-10340

Conversation

@C0urante
Copy link
Copy Markdown
Contributor

@C0urante C0urante commented Feb 1, 2021

Jira

When a source task produces records for a topic that doesn't exist on the Kafka cluster and automatic topic creation is disabled on the broker and not configured à la KIP-158 on the connector, the task hangs indefinitely until and unless the topic is created. Even if the task is scheduled for shutdown by the worker, it continues to hang; the SourceTask instance isn't stopped and the producer isn't closed.

One possible approach to handle this situation is to proactively close the producer for the task when it is abandoned after exceeding the graceful shutdown timeout period. This can increase the likelihood of duplicate records for tasks that are blocked on shutdown for other reasons (high throughput, for example), as offsets will not be committed for any outstanding batches. However, given that the Connect framework's overall delivery guarantees of Connect source connectors still remain intact with this approach (either at-least-once or at-most-once, but not exactly-once), the tradeoff seems acceptable in order to prevent resource leaks that, if stacked over a long enough period, will require worker restarts to deal with.

An existing unit test for the WorkerSourceTask class is expanded to ensure that the producer is closed when the task is abandoned, and a new integration test is added that guarantees that tasks are still shut down even when their producers are trying to write to topics that do not exist.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@C0urante
Copy link
Copy Markdown
Contributor Author

C0urante commented Feb 1, 2021

@gharris1727 @ncliang @SajanaW would any of you have time to take a look?

@C0urante
Copy link
Copy Markdown
Contributor Author

C0urante commented Feb 2, 2021

Thanks for taking a look @gharris1727. I've tried to address your comments; let me know what you think.

Copy link
Copy Markdown
Contributor

@gharris1727 gharris1727 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, casting Runnable::run to Executor is pretty clever.

LGTM, thanks @C0urante!

Copy link
Copy Markdown
Contributor

@rhauch rhauch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @C0urante. Overall, this looks great. This PR has a merge conflict that needs to be addressed, and I do have a few other pretty minor suggestions below.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The producer's close(...) method can throw an InterruptException if the method fails to join the IO thread. This can theoretically happen even if the timeout is 0 if the thread is interrupted (e.g., the executor is shutdown) before the join can wait. Although the likelihood of this is small, what do you think about catching InterruptException and ignoring the error?

Suggested change
} catch (Throwable t) {
} catch (InterruptException t) {
// ignore, since this is likely due to the worker's executor being shut down
} catch (Throwable t) {

Two things. First, the producer throws InterruptExeption, not InterruptedException. Second, even though the WorkerSourceTask::close() that calls this closeProducer(Duration) method doesn't directly use the executor, the Worker does use that same executor to stop this WorkerSourceTask, which ultimately does call WorkerSourceTask::close(). IOW, this closeProducer(Duration) method is always called from the executor, and the executor could be shutdown at any moment, thus the potential InterruptException.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now there aren't any code paths that lead to the worker's executor being shut down. One might be added in the future, but it seems a little premature to try to catch it right now and may mislead readers of the code base. Plus, if an InterruptException does get generated somehow, it might be worth knowing about as it may indicate unhealthy (or at least unexpected) behavior from the worker, the task, an interceptor, etc.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now there aren't any code paths that lead to the worker's executor being shut down.

Hmm, that seems to have been done a long time ago. I wonder if that was an oversight, or whether that was intentional since in Connect the Worker::stop() is called when the herder is stopped, which only happens (in Connect) when the shutdown hook is called -- at which point the JVM is terminating anyway. Luckily MM2 works the same way.

But in our test cases that use EmbeddedConnectCluster, those tests are not cleaning up all resources of the Worker (and thus Herder) -- we might have threads that still keep running. Seems like we should address that in a different issue. I'll log something.

Copy link
Copy Markdown
Contributor

@rhauch rhauch Feb 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've logged https://issues.apache.org/jira/browse/KAFKA-12380 for shutting down the worker's executor. Again, it's not an issue in runtime, but a potential issue in our tests.

Comment thread connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java Outdated
C0urante and others added 4 commits February 26, 2021 11:54
…tion/ConnectWorkerIntegrationTest.java

Co-authored-by: Randall Hauch <rhauch@gmail.com>
…tion/ConnectWorkerIntegrationTest.java

Co-authored-by: Randall Hauch <rhauch@gmail.com>
…tion/ConnectWorkerIntegrationTest.java

Co-authored-by: Randall Hauch <rhauch@gmail.com>
@C0urante
Copy link
Copy Markdown
Contributor Author

Thanks @rhauch. I've added all of your suggested changes except for the special handling for InterruptException (which I've responded to via comment) and fixed the merge conflict. Ready for another round when you have a chance.

@rhauch rhauch merged commit a63e5be into apache:trunk Mar 1, 2021
@C0urante C0urante deleted the kafka-10340 branch March 1, 2021 16:12
rhauch pushed a commit that referenced this pull request Mar 1, 2021
…10016)

Close the producer in `WorkerSourceTask` when the latter is cancelled. If the broker do not autocreate the topic, and the connector is not configured to create topics written by the source connector, then the `WorkerSourceTask` main thread will block forever until the topic is created, and will not stop if cancelled or scheduled for shutdown by the worker.

Expanded an existing unit test for the WorkerSourceTask class to ensure that the producer is closed when the task is abandoned, and added a new integration test that guarantees that tasks are still shut down even when their producers are trying to write to topics that do not exist.

Author: Chris Egerton <chrise@confluent.io>
Reviewed: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
rhauch pushed a commit to rhauch/kafka that referenced this pull request Mar 1, 2021
…pache#10016)

Close the producer in `WorkerSourceTask` when the latter is cancelled. If the broker do not autocreate the topic, and the connector is not configured to create topics written by the source connector, then the `WorkerSourceTask` main thread will block forever until the topic is created, and will not stop if cancelled or scheduled for shutdown by the worker.

Expanded an existing unit test for the WorkerSourceTask class to ensure that the producer is closed when the task is abandoned, and added a new integration test that guarantees that tasks are still shut down even when their producers are trying to write to topics that do not exist.

Author: Chris Egerton <chrise@confluent.io>
Reviewed: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
rhauch pushed a commit that referenced this pull request Mar 1, 2021
…10016)

Close the producer in `WorkerSourceTask` when the latter is cancelled. If the broker do not autocreate the topic, and the connector is not configured to create topics written by the source connector, then the `WorkerSourceTask` main thread will block forever until the topic is created, and will not stop if cancelled or scheduled for shutdown by the worker.

Expanded an existing unit test for the WorkerSourceTask class to ensure that the producer is closed when the task is abandoned, and added a new integration test that guarantees that tasks are still shut down even when their producers are trying to write to topics that do not exist.

Author: Chris Egerton <chrise@confluent.io>
Reviewed: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
ijuma added a commit to ijuma/kafka that referenced this pull request Mar 2, 2021
* apache-github/trunk: (37 commits)
  KAFKA-10357: Extract setup of changelog from Streams partition assignor (apache#10163)
  KAFKA-10251: increase timeout for consuming records (apache#10228)
  KAFKA-12394; Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission (apache#10223)
  MINOR: Disable transactional/idempotent system tests for Raft quorums (apache#10224)
  KAFKA-10766: Unit test cases for RocksDBRangeIterator (apache#9717)
  KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore (apache#10052)
  KAFKA-12268: Implement task idling semantics via currentLag API (apache#10137)
  MINOR: Time and log producer state recovery phases (apache#10241)
  MINOR: correct the error message of validating uint32 (apache#10193)
  MINOR: Format the revoking active log output in `StreamsPartitionAssignor` (apache#10242)
  KAFKA-12323 Follow-up: Refactor the unit test a bit (apache#10205)
  MINOR: Remove stack trace of the lock exception in a debug log4j (apache#10231)
  MINOR: Word count should account for extra whitespaces between words (apache#10229)
  MINOR; Small refactor in `GroupMetadata` (apache#10236)
  KAFKA-10340: Proactively close producer when cancelling source tasks (apache#10016)
  KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist (apache#10141)
  KAFKA-12254: Ensure MM2 creates topics with source topic configs (apache#10217)
  MINOR: fix kafka-metadata-shell.sh (apache#10226)
  KAFKA-12374: Add missing config sasl.mechanism.controller.protocol (apache#10199)
  KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs (apache#8812)
  ...
liukrimhrim added a commit to liukrimhrim/kafka that referenced this pull request Mar 15, 2021
liukrimhrim added a commit to confluentinc/kafka that referenced this pull request Mar 16, 2021
rhauch pushed a commit that referenced this pull request May 5, 2021
…10016)

Close the producer in `WorkerSourceTask` when the latter is cancelled. If the broker do not autocreate the topic, and the connector is not configured to create topics written by the source connector, then the `WorkerSourceTask` main thread will block forever until the topic is created, and will not stop if cancelled or scheduled for shutdown by the worker.

Expanded an existing unit test for the WorkerSourceTask class to ensure that the producer is closed when the task is abandoned, and added a new integration test that guarantees that tasks are still shut down even when their producers are trying to write to topics that do not exist.

Author: Chris Egerton <chrise@confluent.io>
Reviewed: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants