Skip to content

KAFKA-7077: Use default producer settings in Connect Worker#11475

Merged
showuon merged 1 commit intoapache:trunkfrom
LiamClarkeNZ:KAFKA-7077_KIP-318
Mar 15, 2022
Merged

KAFKA-7077: Use default producer settings in Connect Worker#11475
showuon merged 1 commit intoapache:trunkfrom
LiamClarkeNZ:KAFKA-7077_KIP-318

Conversation

@LiamClarkeNZ
Copy link
Copy Markdown
Contributor

In KAFKA-7077 and the associated KIP-318, there's a desire to a) enable producer idempotence and b) set max in flight requests to 5 to improve throughput, as opposed to the previous hardwired 1.

With the change of producer defaults in Kafka 3.0.0 (acks = all, enable.idempotence = true, max.in.flight.requests.per.connection=5) all this behaviour comes for free if the explicit producer configuration in the Worker is removed.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Member

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LiamClarkeNZ , thanks for the PR. LGTM. But there's a bug cause the default idempotent producer is not enabled and get fixed recently. (ref: #11691 ) Could you rebase to the latest trunk and trigger jenkins build again?

Also, I was wrong in this comment: https://github.com/apache/kafka/pull/11691/files#r791886361 , I checked again and found there's only KafkaStatusBackingStore class cannot enable idempotent producer because it implement its own retry logic, to include deleted record here.

Anyway, looks like even if we don't have this PR, the kafka connect will use idempotent producer by default (with just 1 in-flight request only). cc @mimaison

…by default, remove explicit setting of acks and max in flight in Connect Worker producer

Signed-off-by: Liam Clarke-Hutchinson <liam@steelsky.co.nz>
@LiamClarkeNZ
Copy link
Copy Markdown
Contributor Author

@LiamClarkeNZ , thanks for the PR. LGTM. But there's a bug cause the default idempotent producer is not enabled and get fixed recently. (ref: #11691 ) Could you rebase to the latest trunk and trigger jenkins build again?

Also, I was wrong in this comment: https://github.com/apache/kafka/pull/11691/files#r791886361 , I checked again and found there's only KafkaStatusBackingStore class cannot enable idempotent producer because it implement its own retry logic, to include deleted record here.

Anyway, looks like even if we don't have this PR, the kafka connect will use idempotent producer by default (with just 1 in-flight request only). cc @mimaison

Thanks @showuon, I have rebased onto trunk and force pushed. I appreciate the review :)

@LiamClarkeNZ LiamClarkeNZ reopened this Mar 4, 2022
@showuon showuon merged commit 76cf7a5 into apache:trunk Mar 15, 2022
@showuon
Copy link
Copy Markdown
Member

showuon commented Mar 15, 2022

Failed tests are unrelated:

Build / ARM / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector
Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing
Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing
Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testElectionResultOutput, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()

@LiamClarkeNZ LiamClarkeNZ deleted the KAFKA-7077_KIP-318 branch March 18, 2022 01:10
@ijuma
Copy link
Copy Markdown
Member

ijuma commented Mar 21, 2022

@kkonstantine @rhauch Is it OK to enable idempotence by default in this case? Are there any compatibility or behavior concerns?

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Mar 21, 2022

@showuon We should add a note to upgrade.html for this (and similar types of changes in the future).

@showuon
Copy link
Copy Markdown
Member

showuon commented Mar 21, 2022

@ijuma , I see. Thanks for the reminder. Will do.
@LiamClarkeNZ , please help add a note in 3.2 notable change in upgrade.html. Thanks

@kkonstantine
Copy link
Copy Markdown
Contributor

kkonstantine commented Mar 22, 2022

Thanks for bring this to our attention @ijuma. This change should be fine from Connect's perspective. acks=all is now implied instead of explicitly defined and idempotence has been enabled since 3.0.0 and we had decided not to override this setting in Connect.

The property that would give someone pause is the value for max.in.flight.requests.per.connection that now is greater than 1. The reassurance that this will work comes from KAFKA-5494 which unfortunately is not mentioned in the original KIP-98 (I see it mentioned in the KIP-318 that is relevant to Connect) and should have been mentioned in this PR, if not in the code.

Having said that, I think it would have been a good idea to leave a comment in the code that would say that Connect requires these configs which are now enabled by default with the idempotent producer.
I also agree that this should go as a notable change in the docs.

@showuon
Copy link
Copy Markdown
Member

showuon commented Mar 22, 2022

@kkonstantine , thanks for the comment. I agree we add comments in the code to say the idempotent producer is enabled by default.
cc @LiamClarkeNZ

Thank you.

@LiamClarkeNZ
Copy link
Copy Markdown
Contributor Author

LiamClarkeNZ commented Mar 22, 2022 via email

@showuon
Copy link
Copy Markdown
Member

showuon commented Mar 22, 2022

Thanks @LiamClarkeNZ !

The wording overall LGTM! Please submit a PR when available. We can comment on the wording there.

Actually, do we need to add similar documentation to the upgrade
documentation for 3.0.1, 3.1.1 also?

Yes, please add them to 3.0.1 and 3.1.1, too.
Thanks.

@LiamClarkeNZ
Copy link
Copy Markdown
Contributor Author

LiamClarkeNZ commented Mar 22, 2022 via email

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Mar 22, 2022

@kkonstantine What is the expected broker version compatibility for connect? If it's older than 0.11, then we cannot rely on idempotence.

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Mar 22, 2022

One other thing: the IDEMPOTENT_WRITE ACL is required for the Connect worker principal if the kafka cluster is older than 2.8.

@kkonstantine
Copy link
Copy Markdown
Contributor

kkonstantine commented Mar 22, 2022

@showuon I noticed that you recently disable idempotency for the status backing store. Now that we remove acks=all should we add it there? Is this something we missed with the changes here? And taking an even closer look at the status backing store implementation, I see that retries are handled by that class. So we disable idempotence but are we sure max.in.flight.requests.per.connection is 1 there?

Also, re:

@kkonstantine , thanks for the comment. I agree we add comments in the code to say the idempotent producer is enabled by default.

my intention was actually to add a comment in the code to say what are the configs that we definitely require for the producers in Connect to work as intended. And these are acks=all, max.in.flight.requests.per.connection=1 unless idempotency is enabled given that we have KAFKA-5494 and infinite retries. These requirements were reflected in actual code until this PR was merged. But now they are implied by the defaults of the idempotent producer and that's something I'd like to leave a trail of, so we know what works for Connect.

@kkonstantine
Copy link
Copy Markdown
Contributor

@Ismael you're bringing up a good point, which I missed in your latest comment before I send my recent reply.

In light of the requirement to explicitly add the IDEMPOTENT_WRITE ACL for Connect workers when talking to Kafka brokers older 2.8 (which I wasn't aware of), I'd like to suggest changing course here as follows:

  • Revert KAFKA-7077 from all the branches that has been merged.
  • Return to KIP-318, update it and actually vote for it. @LiamClarkeNZ you referred to this KIP in the description of this PR but I missed that this KIP hasn't been approved and is actually currently marked as inactive. I think we should raise it again after we update it to include all the compatibility requirements and have it target the next major version (4.0).
  • Issue a new PR that will explicitly disable idempotency by default in Connect and will allow users to override the setting via the worker and/or the connector configs like we allow it today.
  • In the same PR, update our docs to say that despite the Kafka producer enabling idempotency by default in 3.0, due to compatibility requirements Connect chooses to disable idempotency for all the producers that instantiates by default.

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Mar 22, 2022

That sounds good to me. Let's make sure we include these changes in 3.0.x and 3.1.x as well. For the latter, we still have time to make it for 3.1.1.

@rhauch
Copy link
Copy Markdown
Contributor

rhauch commented Mar 22, 2022

Sounds good to me, too. Thanks, @kkonstantine.

kkonstantine added a commit that referenced this pull request Mar 23, 2022
…11475)" (#11932)

This reverts commit 76cf7a5.

Connect already allows users to enable idempotent producers for connectors and the Connect workers. Although Kafka producers enabled idempotency by default in 3.0, due to compatibility requirements and the fact that [KIP-318](https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent) hasn't been explicitly approved, the changes here are reverted. A separate commit will explicitly disable idempotency in producers instantiated by Connect by default until KIP-318 is approved and scheduled for release.
kkonstantine added a commit that referenced this pull request Mar 23, 2022
…11475)" (#11932)

This reverts commit 76cf7a5.

Connect already allows users to enable idempotent producers for connectors and the Connect workers. Although Kafka producers enabled idempotency by default in 3.0, due to compatibility requirements and the fact that [KIP-318](https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent) hasn't been explicitly approved, the changes here are reverted. A separate commit will explicitly disable idempotency in producers instantiated by Connect by default until KIP-318 is approved and scheduled for release.
@showuon
Copy link
Copy Markdown
Member

showuon commented Mar 23, 2022

Thanks @kkonstantine !

@LiamClarkeNZ
Copy link
Copy Markdown
Contributor Author

LiamClarkeNZ commented Oct 11, 2022 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants