Skip to content

KAFKA-17671: Create better documentation for transactions#17454

Merged
AndrewJSchofield merged 6 commits intoapache:trunkfrom
AndrewJSchofield:KAFKA-17671
Jan 10, 2025
Merged

KAFKA-17671: Create better documentation for transactions#17454
AndrewJSchofield merged 6 commits intoapache:trunkfrom
AndrewJSchofield:KAFKA-17671

Conversation

@AndrewJSchofield
Copy link
Copy Markdown
Member

https://issues.apache.org/jira/browse/KAFKA-17671

This PR adds better documentation for using transactions using the Apache Kafka producer and consumer clients.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions Bot added docs small Small PRs labels Oct 10, 2024
@jolshan jolshan added the transactions Transactions and EOS label Oct 10, 2024
Comment thread docs/design.html Outdated
servers. This buffering is configurable and gives a mechanism to trade off a small amount of additional latency for better throughput.
<p>
Details on <a href="#producerconfigs">configuration</a> and the <a href="https://kafka.apache.org/{{version}}/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html">api</a> for the producer can be found
Details on <a href="#producerconfigs">configuration</a> and the <a href="https://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html">api</a> for the producer can be found
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did we make this change? I don't think we want to link to the 0.8.2 docs (they also seem to be broken)
We want the latest ones right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are entirely correct. It's a merge mistake, that I'll fix immediately.

Comment thread docs/design.html Outdated
but in "read_committed," the consumer will only return messages from transactions which were committed (and any messages which were not part of a transaction).
So what about exactly-once semantics? When consuming from a Kafka topic and producing to another topic (as in a <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a> application), we can
leverage the new transactional producer capabilities in 0.11.0.0 that were mentioned above. The consumer's position is stored as a message in an internal topic, so we can write the offset to Kafka in the
same transaction as the output topics receiving the processed data. If the transaction is aborted, the consumer's stored position will revert to its old value and the produced data on the output topics will not
Copy link
Copy Markdown
Member

@jolshan jolshan Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we want to change this line because it is misleading -- the position on the consumer doesn't revert unless the offsets are fetched again.

It is also a little misleading since the offset doesn't really revert in the coordinator -- it is just pending until the transaction is completed (at least as of KIP-447, which may not have been implemented when this was written)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really made very little change to this in the first draft. I'll try to tighten it up and see what you think.

Comment thread docs/design.html
<li>The producer uses transactions so that all of the records it produces, and any offsets it updates on behalf of the consumer, are performed atomically.</li>
</ol>
<p>
The consumer configuration must include <code>isolation.level=read_committed</code> and <code>enable.auto.commit=false</code>. The producer configuration must set <code>transactional.id</code>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the consumer necessarily needs to be read_committed unless it is also reading from a topic written via transactions.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's true but it's a weird transactional application that wants to process aborted data.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could also be a topic not written with transactions and in that case, read committed and read uncommitted have the same behavior.

Mostly just calling out it isn't a requirement, though probably a good practice.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems, I did not split hairs above :)

Comment thread docs/design.html
transactions. However, in the event of a transaction abort, the application's in-memory state and in particular the current position of the consumer must be reset explicitly so that it can
reprocess the records processed by the aborted transaction.
<p>
A simple policy for handling exceptions and aborted transactions is to discard and recreate the Kafka producer and consumer objects and start afresh. As part of recreating the consumer, the consumer
Copy link
Copy Markdown
Member

@jolshan jolshan Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are cases where we can safely abort without discarding the producer. KIP-1050 will help make the cases clearer.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. That KIP is still not adopted, and I think we would update this documentation once it's there. It certainly helps.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KIP is now approved, but not yet implemented. :)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I have a task to improve this further once we have KIP-1050 implemented.

@jolshan
Copy link
Copy Markdown
Member

jolshan commented Oct 15, 2024

I think most of the documentation makes sense to me. I think my only remaining open question is whether we want to enumerate read committed consumers as a requirement for EOS as per the comment here: #17454 (comment)

I think we may also want to include some followups that go into more detail and perhaps provide another example.

Comment thread docs/design.html Outdated
</li>
<li>
<i>Exactly once</i>&mdash;this is what people actually want, each message is delivered once and only once.
<i>Exactly once</i>&mdash;Each message is delivered once and only once.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not say "delivered" (also further above) -- this term is too close to "delivery semantics at the network layer", -- we still retry internally, and deliver a message more than once (holds true for idempotent producer, as well as aborted TX which are retried, and aborted stuff is just filtered later).

Btw: Further above we says:

let's discuss the semantic guarantees Kafka provides between producer and consumer

This is also misleading, because EOS works for read-process-write, but not for upstream producer to downstream consumer. While the consumer is guarded to not return aborted messages to the app, it's still at-least-once as there is not guarantee that a committed messages is only read exactly-once.

Copy link
Copy Markdown
Member Author

@AndrewJSchofield AndrewJSchofield Oct 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try for "processed" then, but really I think of it as delivered to the application exactly once.

Comment thread docs/design.html
<p>
Kafka's semantics are straight-forward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed it will not be lost as long as one broker that
replicates the partition to which this message was written remains "alive". The definition of committed message, alive partition as well as a description of which types of failures we attempt to handle will be
Kafka's semantics are straightforward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed, it will not be lost as long as one broker that
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

straightforward

Sounds like a huge simplification...

Also, for non-transaction, there no such thing as "committed" -- seems we should clarify this better? Why not use "acknowledge" which would be the proper Kafka term?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This term "committed" is exactly the same as used in the reliability guarantees section of Kafka: The Definitive Guide. I'm not inclined to change it as a result.

Comment thread docs/design.html
experiences a network error, it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key.
<p>
Prior to 0.11.0.0, if a producer failed to receive a response indicating that a message was committed, it had little choice but to resend the message. This provides at-least-once delivery semantics since the
message may be written to the log again during resending if the original request had in fact succeeded. Since 0.11.0.0, the Kafka producer also supports an idempotent delivery option which guarantees that resending
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resending

Might need to clarify the distinction between producer retries, vs app retires?

Comment thread docs/design.html Outdated
message may be written to the log again during resending if the original request had in fact succeeded. Since 0.11.0.0, the Kafka producer also supports an idempotent delivery option which guarantees that resending
will not result in duplicate entries in the log. To achieve this, the broker assigns each producer an ID and deduplicates messages using a sequence number that is sent by the producer along with every message.
Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transaction-like semantics: i.e. either all messages are successfully written or none of them are.
Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transactional semantics, so that either all messages are successfully written or none of them are.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like to describe TX as "multi-message multi-topic/partition atomic write". Should we add something like this?

Comment thread docs/design.html Outdated
will not result in duplicate entries in the log. To achieve this, the broker assigns each producer an ID and deduplicates messages using a sequence number that is sent by the producer along with every message.
Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transaction-like semantics: i.e. either all messages are successfully written or none of them are.
Also beginning with 0.11.0.0, the producer supports the ability to send messages to multiple topic partitions using transactional semantics, so that either all messages are successfully written or none of them are.
The main use case for this is exactly-once processing between Kafka topics (described below).
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would call it "exactly-once processing" -- it's just an "atomic write".

Comment thread docs/design.html Outdated
So effectively Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide
As a result, Kafka supports exactly-once delivery in <a href="https://kafka.apache.org/documentation/streams">Kafka Streams</a>, and the transactional producer/consumer can be used generally to provide
exactly-once delivery when transferring and processing data between Kafka topics. Exactly-once delivery for other destination systems generally requires cooperation with such systems, but Kafka provides the
offset which makes implementing this feasible (see also <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a>). Otherwise, Kafka guarantees at-least-once delivery by default, and allows
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
offset which makes implementing this feasible (see also <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a>). Otherwise, Kafka guarantees at-least-once delivery by default, and allows
primitives which makes implementing this feasible (see also <a href="https://kafka.apache.org/documentation/#connect">Kafka Connect</a>). Otherwise, Kafka guarantees at-least-once delivery by default, and allows

Comment thread docs/design.html
<li>The producer uses transactions so that all of the records it produces, and any offsets it updates on behalf of the consumer, are performed atomically.</li>
</ol>
<p>
The consumer configuration must include <code>isolation.level=read_committed</code> and <code>enable.auto.commit=false</code>. The producer configuration must set <code>transactional.id</code>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems, I did not split hairs above :)

Comment thread docs/design.html Outdated
<p>
The consumer configuration must include <code>isolation.level=read_committed</code> and <code>enable.auto.commit=false</code>. The producer configuration must set <code>transactional.id</code>
to the name of the transactional ID to be used, which configures the producer for transactional delivery and also makes sure that a restarted application causes any in-flight transaction from
the previous instance to abort. Only the producer has the <code>transactional.id</code> configuration. Strictly speaking, the consumer doesn't have to use read_committed isolation level, but if
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is actually much more to it... For KS, to do proper fencing in combination with rebalancing, we needed to use a producer-per-task. This is very expensive.

For EOSv2, we move to producer-per-thread model (which is basically described here) for which fencing does not happen on the producer side any longer (it might, be we don't rely on it), but we actually general random transactional.ids and rely on a build it "fencing" mechanism inside the ConsumerGroupCoordinator (in combination with broker side transaction timeouts...)

Not sure how detailed we want to be here, but bottom line is: this description might be over-simplified.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I've added another couple of sentences for your delectation. Let me know what you think.

Comment thread docs/design.html
the previous instance to abort. Only the producer has the <code>transactional.id</code> configuration. Strictly speaking, the consumer doesn't have to use read_committed isolation level, but if
it does not, it will see records from aborted transactions and also open transactions which have not yet completed, which seems undesirable if trying to achieve exactly-once.
<p>
Here's an example of a <a href="https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java">transactional message copier</a>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example seems only to work correctly, if useGroupMetadata option is enables... w/o it, EOS cannot be guaranteed.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. There's a better example which is in a draft PR as part of KIP-1050. I'd prefer to refer to that, once it's ready to go.

Comment thread docs/design.html Outdated
@AndrewJSchofield
Copy link
Copy Markdown
Member Author

https://issues.apache.org/jira/browse/KAFKA-17855 is the continuation of this work which depends upon KIP-1050.

Co-authored-by: Matthias J. Sax <mjsax@apache.org>
Copy link
Copy Markdown
Member

@jolshan jolshan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable to me. We have a few followups on this that we may want to call out.

We may also want to get the +1 from @mjsax before merging :)

@AndrewJSchofield AndrewJSchofield merged commit c6f2276 into apache:trunk Jan 10, 2025
@AndrewJSchofield AndrewJSchofield deleted the KAFKA-17671 branch January 10, 2025 09:55
AndrewJSchofield added a commit that referenced this pull request Jan 10, 2025
Co-authored-by: Matthias J. Sax <mjsax@apache.org>

Reviewers: Matthias Sax <mjsax@apache.org>, Justine Olshan <jolshan@confluent.io>
m1a2st pushed a commit to m1a2st/kafka that referenced this pull request Jan 10, 2025
Co-authored-by: Matthias J. Sax <mjsax@apache.org>

Reviewers: Matthias Sax <mjsax@apache.org>, Justine Olshan <jolshan@confluent.io>
ijuma added a commit to ijuma/kafka that referenced this pull request Jan 10, 2025
…emove-metadata-version-methods-for-versions-older-than-3.0

* apache-github/trunk:
  KAFKA-18340: Change Dockerfile to use log4j2 yaml instead log4j properties (apache#18378)
  MINOR: fix flaky RemoteLogManagerTest#testStopPartitionsWithDeletion (apache#18474)
  KAFKA-18311: Enforcing copartitioned topics (4/N) (apache#18397)
  KAFKA-18308; Update CoordinatorSerde (apache#18455)
  KAFKA-18440: Convert AuthorizationException to fatal error in AdminClient (apache#18435)
  KAFKA-17671: Create better documentation for transactions (apache#17454)
  KAFKA-18304; Introduce json converter generator (apache#18458)
  MINOR: Clean up classic group tests (apache#18473)
  KAFKA-18399 Remove ZooKeeper from KafkaApis (2/N): CONTROLLED_SHUTDOWN and ENVELOPE (apache#18422)
  MINOR: improve StreamThread periodic processing log (apache#18430)
pranavt84 pushed a commit to pranavt84/kafka that referenced this pull request Jan 27, 2025
Co-authored-by: Matthias J. Sax <mjsax@apache.org>

Reviewers: Matthias Sax <mjsax@apache.org>, Justine Olshan <jolshan@confluent.io>
manoj-mathivanan pushed a commit to manoj-mathivanan/kafka that referenced this pull request Feb 19, 2025
Co-authored-by: Matthias J. Sax <mjsax@apache.org>

Reviewers: Matthias Sax <mjsax@apache.org>, Justine Olshan <jolshan@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved docs small Small PRs transactions Transactions and EOS

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants