KAFKA-10790: Add deadlock detection to producer#flush #17946

Merged
AndrewJSchofield merged 1 commit into apache:trunk from frankvicky:KAFKA-10790
Jan 7, 2025

Conversation

@frankvicky
Contributor

JIRA: KAFKA-10790

As we already know, the callback of Producer#send is executed by the ioThread. Because of this design, many application bugs have been caused by calling Producer#flush in a callback, which leads to deadlocks. This PR adds protection against such scenarios.
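
For readers unfamiliar with the failure mode, here is a minimal sketch that reproduces it without Kafka. Everything in it is a hypothetical stand-in: `FlushInCallbackDemo`, the single-thread executor playing the role of the ioThread, and the latch playing the role of an in-flight batch are assumptions for illustration, not producer code. The wait uses a timeout only so the demo terminates; a real flush has no such timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FlushInCallbackDemo {

    /**
     * Returns true if a flush-like wait issued from the "callback" could not
     * complete within timeoutMs.
     */
    static boolean callbackBlocks(long timeoutMs) {
        // Stand-in for the producer's single I/O thread (hypothetical, not Kafka code).
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for an in-flight batch that only the I/O thread can complete.
            CountDownLatch pendingBatch = new CountDownLatch(1);

            Future<Boolean> callback = ioThread.submit(() -> {
                // The work that would complete the batch is queued behind this callback.
                ioThread.execute(pendingBatch::countDown);
                // Flush-like wait. Without the timeout this would never return,
                // because the only thread able to count down is the one waiting.
                return !pendingBatch.await(timeoutMs, TimeUnit.MILLISECONDS);
            });
            return callback.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            ioThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback blocked: " + callbackBlocks(200));
    }
}
```

The callback waits for work that can only run on the thread it is occupying, which is the same shape as calling flush from a send callback.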

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. This seems like a useful enhancement to me.

Please update the upgrade documentation too. This is a small change in behaviour which will affect anyone who is using flush() inside a callback.

public void shouldNotInvokeFlushInCallback() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
    configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
Member

Is this setting relevant? Having it in this test makes me think that the test would only work with it, but I do not believe that to be the case.

Contributor Author

This setting aims to simplify our test case. Here's my understanding—please correct me if I'm wrong:
Since idempotence is enabled by default, if we don't explicitly set it to false, we would need to prepare additional data to go through the idempotence flow. I believe this is unnecessary for this test case, so I will add a comment to clarify. WDYT?

Member

OK, I see. It prevents needing to do the init producer id stuff. Makes sense to me.

@Override
public void flush() {
    if (Thread.currentThread() == this.ioThread) {
        log.error("flush invocation detected in the callback. This can cause a deadlock due to thread blocking.");
Member

I would change the wording here because actually the code is preventing the call and that means there is no risk of a deadlock. Something like "KafkaProducer.flush() invocation detected inside callback. This is not permitted because of the risk of deadlock."

The message in the exception could be the same. The choice of KafkaException seems most appropriate to me.

Collaborator

@TaiJuWu TaiJuWu Nov 27, 2024

Is this log necessary? In my opinion, this information is already recorded by the KafkaException.

Contributor Author

I think it's ok to use both logging and exceptions simultaneously, as the user might want immediate hints displayed on the console.
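
As an illustration of the approach discussed in this thread (compare the current thread with the I/O thread, then both log and throw), here is a self-contained sketch. It is not the code in this PR: `GuardedFlusher`, `runCallback`, and the thread name are hypothetical, `System.err` stands in for `log.error`, and `IllegalStateException` stands in for `KafkaException` so that the example compiles without the Kafka client on the classpath. The message follows the wording suggested in the review above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class GuardedFlusher implements AutoCloseable {
    // Set by the thread factory when the executor creates its worker thread.
    private volatile Thread ioThread;

    // Single worker that plays the role of the producer's I/O thread (assumption).
    private final ExecutorService io = Executors.newSingleThreadExecutor(runnable -> {
        Thread t = new Thread(runnable, "sketch-io-thread");
        t.setDaemon(true);
        ioThread = t;
        return t;
    });

    /** Runs a callback on the I/O thread and returns what it threw, or null. */
    Throwable runCallback(Runnable callback) {
        try {
            io.submit(callback).get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    void flush() {
        if (Thread.currentThread() == ioThread) {
            String message = "flush() invocation detected inside callback. "
                    + "This is not permitted because of the risk of deadlock.";
            System.err.println(message);               // stands in for log.error(...)
            throw new IllegalStateException(message);  // stands in for KafkaException
        }
        // A real flush would block here until outstanding sends complete.
    }

    @Override
    public void close() {
        io.shutdownNow();
    }

    public static void main(String[] args) {
        try (GuardedFlusher producer = new GuardedFlusher()) {
            producer.flush(); // allowed: called from the main thread
            Throwable thrown = producer.runCallback(producer::flush);
            System.out.println("rejected inside callback: "
                    + (thrown instanceof IllegalStateException));
        }
    }
}
```

Because the check is a plain thread identity comparison, calls from application threads are unaffected; only a call made on the I/O thread itself is rejected.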

@frankvicky frankvicky force-pushed the KAFKA-10790 branch 2 times, most recently from 2ac6d6b to 0815cef Compare November 27, 2024 09:06
Collaborator

@TaiJuWu TaiJuWu left a comment

LGTM

Member

@AndrewJSchofield AndrewJSchofield left a comment

I suggest that the KafkaProducer javadoc should be altered as well. That's the most likely place that a programmer will notice.

Comment thread docs/upgrade.html Outdated
</li>
<li>The deprecated <code>sendOffsetsToTransaction(Map&lt;TopicPartition, OffsetAndMetadata&gt;, String)</code> method has been removed from the Producer API.
</li>
<li>The <code>flush</code> method now includes deadlock detection when invoked inside a callback to prevent unintended blocking scenarios.
Member

This could be more explicit, like the message in the exception added to KafkaProducer. You are not allowed to use flush() in the callback. Maybe "The flush method now includes deadlock detection preventing its use inside a callback. This avoids unintended blocking which has been known to occur prior to this change." Something like that. We want someone reading this to understand that they must not use flush in the callback.

@AndrewJSchofield
Member

The code looks good to me, but I don't think we are ready to merge yet.

I've been thinking about the mechanics of delivering this into AK, and this is definitely a change in behaviour of a public interface, which means a KIP is required. It's true that it's preventing an unsafe behaviour which is categorically a good thing, but if someone has an application which is doing this, their code will break following this change. @frankvicky, do you agree? Can you create a KIP to make sure we have community alignment?

Without a KIP, we cannot throw the exception, but we can log the message and update the javadoc.

@chia7712
Member

> but if someone has an application which is doing this, their code will break following this change. @frankvicky, do you agree? Can you create a KIP to make sure we have community alignment?
> Without a KIP, we cannot throw the exception, but we can log the message and update the javadoc.

Yes, it’s definitely worth having a KIP. The KIP should not only cover the behavior changes but also facilitate discussions on other potential approaches—for instance, adding a log message instead of throwing an exception.

@frankvicky
Contributor Author

frankvicky commented Nov 29, 2024

Hello @AndrewJSchofield
I think you are right. Since Kafka usually considers exceptions as part of the public API, this change will require a KIP.
I will prepare one soon.

@AndrewJSchofield added the kip (Requires or implements a KIP) label Nov 29, 2024
@frankvicky
Contributor Author

Hello everyone,
I have filed KIP-1118 for this issue, PTAL.

@frankvicky frankvicky force-pushed the KAFKA-10790 branch 2 times, most recently from 9268221 to abd29bb Compare December 4, 2024 09:10
@AndrewJSchofield
Member

Hi @frankvicky. I think we can get some of the benefit here in spite of the fact that the KIP is too late for AK 4.0. I suggest you change this PR to alter that javadoc to say "This method should not be used" rather than "must not". You can log the error if flush is used in the callback. You cannot throw the KafkaException. What do you think?

@chia7712
Member

chia7712 commented Dec 9, 2024

> I think we can get some of the benefit here in spite of the fact that the KIP is too late for AK 4.0. I suggest you change this PR to alter that javadoc to say "This method should not be used" rather than "must not". You can log the error if flush is used in the callback. You cannot throw the KafkaException. What do you think?

If we want to avoid introducing behavior changes in version 4.0, adding documentation and logging is an appropriate solution. @frankvicky, could you please file a MINOR issue to address @AndrewJSchofield's comment? In the minor PR, we don't introduce any behavior changes but enrich the documentation and logging.

@frankvicky
Contributor Author

Hi @AndrewJSchofield @chia7712
No problem. I will file a PR soon.

@frankvicky
Contributor Author

This is the minor PR: #18112

@frankvicky frankvicky force-pushed the KAFKA-10790 branch 3 times, most recently from 0c3d575 to ae3c967 Compare January 5, 2025 05:10
@frankvicky
Contributor Author

Hi @AndrewJSchofield @chia7712
Since the KIP has been accepted, I think we could merge this one.
WDYT?

@AndrewJSchofield
Member

@frankvicky It is unfortunate this behaviour change didn't make the 4.0 deadline. Personally, because the old behaviour was liable to cause deadlocks, I'm good with making such a change in 4.1. @chia7712 What do you think?

Member

@AndrewJSchofield AndrewJSchofield left a comment

The code looks good to me. Just one documentation comment.

Comment thread docs/upgrade.html Outdated
</li>
<li>The deprecated <code>sendOffsetsToTransaction(Map&lt;TopicPartition, OffsetAndMetadata&gt;, String)</code> method has been removed from the Producer API.
</li>
<li>The <code>flush</code> method now detects potential deadlocks and prohibits its use inside a callback. This change prevents unintended blocking behavior, which was a known risk in earlier versions.
Member

This text now needs to be in an "upgrading to 4.1" section.

Member

I agree with @AndrewJSchofield that this PR will not be backported to the 4.0 release. Therefore, this change should not be included in the "upgrade_4_0_0" chapter. @frankvicky, could you please create a new chapter titled "upgrade_4_1_0" and move this documentation to the "upgrade_4_1_0" chapter?

Contributor Author

I have applied the suggestion. Here is the preview:
[Screenshot from 2025-01-07 21-43-32]

@AndrewJSchofield AndrewJSchofield merged commit abeed20 into apache:trunk Jan 7, 2025
ijuma added a commit to ijuma/kafka that referenced this pull request Jan 8, 2025
pranavt84 pushed a commit to pranavt84/kafka that referenced this pull request Jan 27, 2025
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Andrew Schofield <aschofield@confluent.io>, TaiJuWu <tjwu1217@gmail.com>
manoj-mathivanan pushed a commit to manoj-mathivanan/kafka that referenced this pull request Feb 19, 2025
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Andrew Schofield <aschofield@confluent.io>, TaiJuWu <tjwu1217@gmail.com>

Labels

ci-approved, clients, kip (Requires or implements a KIP), producer, small (Small PRs)

5 participants