KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionManager #11991
Hi, @hachikuji, could you please help review this?
Also cc @kirktrue
hachikuji
left a comment
Nice find and thanks for the patch! Left a few small comments.
@tombentley This seems like a straightforward fix to consider for 3.1.1. What do you think?
// responses which are due to the retention period elapsing, and those which are due to actual lost data.
private long lastAckedOffset;

private final Comparator<ProducerBatch> producerBatchComparator = (b1, b2) -> {
nit: could this be static? It doesn't have any state.
sender.runOnce();
assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());

time.sleep(10000); // request time out
nit: could introduce a variable requestTimeoutMs and drop the comment
time.sleep(5000); // request time out again
sender.runOnce();
assertTrue(transactionManager.hasInflightBatches(tp0)); // the latter batch failed and retried
assertTrue(responseFuture1.isDone());
nit: could we move this after line 710? It makes the test a little easier to understand if we see when the first send fails. Also, maybe we could use TestUtils.assertFutureThrows(responseFuture1, TimeoutException.class) to make the timeout expectation explicit?
time.sleep(5000); // delivery time out
sender.runOnce(); // expired in accumulator
assertFalse(transactionManager.hasInFlightRequest());
The behavior here puzzled me a little when I was trying to understand the test. Would a comment like this help?
// The retried request will remain inflight until the request timeout
// is reached even though the delivery timeout has expired and the
// future has completed exceptionally.
@hachikuji Thanks for your review! I've addressed your comments; please take a look.
I agree we should include this in 3.1.1. Do we know if this has always been like this or if it regressed at some point?
I think it was introduced in #6883, but I'm not sure if it affects all the versions since then. What do you think? @hachikuji
Thanks, looks like it's been there since 2.4.0 then.
…ctionManager` (#11991) Fixes a bug in the comparator used to sort producer inflight batches for a topic partition. This can cause batches in the map `inflightBatchesBySequence` to be removed incorrectly: i.e. one batch may be removed by another batch with the same sequence number. This leads to an `IllegalStateException` when the inflight request finally returns. This patch fixes the comparator to check equality of the `ProducerBatch` instances if the base sequences match. Reviewers: Jason Gustafson <jason@confluent.io>
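The failure mode described in this commit message can be reproduced outside Kafka. The following is a minimal sketch, assuming the inflight batches are held in a `TreeSet` ordered only by base sequence. `Batch` and its `label` field are hypothetical stand-ins for illustration, not the real `ProducerBatch` class.

```java
import java.util.Comparator;
import java.util.TreeSet;

class BaseSequenceOnlyDemo {
    // Hypothetical stand-in for ProducerBatch: only what the demo needs.
    static final class Batch {
        final int baseSequence;
        final String label;

        Batch(int baseSequence, String label) {
            this.baseSequence = baseSequence;
            this.label = label;
        }
    }

    // Orders by base sequence alone, so two distinct batches with the same
    // base sequence compare as 0 and the set treats them as one element.
    static final Comparator<Batch> BY_SEQUENCE_ONLY =
        Comparator.comparingInt((Batch b) -> b.baseSequence);

    // Returns true if removing `retried` evicts `original` from the set.
    static boolean removesWrongBatch() {
        TreeSet<Batch> inflight = new TreeSet<>(BY_SEQUENCE_ONLY);
        Batch original = new Batch(0, "original");
        Batch retried = new Batch(0, "retried");

        inflight.add(original);
        // `retried` was never added, but the comparator says it matches.
        boolean removed = inflight.remove(retried);
        return removed && inflight.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println("wrong batch removed: " + removesWrongBatch());
    }
}
```

Under that assumption, the set no longer tracks `original` even though its request is still in flight, which matches the incorrect removal described above.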
private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR = (b1, b2) -> {
    if (b1.baseSequence() < b2.baseSequence()) return -1;
    else if (b1.baseSequence() > b2.baseSequence()) return 1;
    else return b1.equals(b2) ? 0 : 1;
Wouldn't this violate the requirements for the compare method?
The implementor must ensure that sgn(compare(x, y)) == -sgn(compare(y, x)) for all x and y. (This implies that compare(x, y) must throw an exception if and only if compare(y, x) throws an exception.)
The implementor must also ensure that the relation is transitive: ((compare(x, y)>0) && (compare(y, z)>0)) implies compare(x, z)>0.
Objects that are not equal need to have a stable order; otherwise, binary search may not find the objects.
There was a problem hiding this comment.
Hi @artemlivshits, thanks for your comment. I don't think it violates the requirements for the compare method, since we are comparing two batches using an integer.
As for the stable order, I think it doesn't affect the current code, but I can fix this in another PR if you think it's necessary. What do you think?
There was a problem hiding this comment.
Say we have 2 batches b1 and b2 that have the same base sequence, but are not equal. Then compare(b1, b2) == 1 and compare(b2, b1) == 1, which violates the requirement of changing the sign when the argument order is changed.
This property is used in a binary search tree to order and search elements; if it is violated, we may not find an element because we follow the wrong branch. Say we have some elements in logical order a, b1, b2, x, y. When we ordered them, we called compare(b2, b1), which returned 1, meaning that b2 is greater than b1. When we search for b1, we may start at b2 and call compare(b1, b2), which also returns 1, meaning that b1 is greater than b2, so we continue searching in the x, y part and conclude that b1 is not there.
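The sign violation described above can be checked directly. This sketch copies the comparator logic from the patch onto a hypothetical `Batch` class (a stand-in for `ProducerBatch` that relies on identity-based `equals`), and compares the two argument orders.

```java
import java.util.Comparator;

class AntisymmetryCheck {
    // Hypothetical stand-in for ProducerBatch; equals() is identity-based.
    static final class Batch {
        final int baseSequence;

        Batch(int baseSequence) {
            this.baseSequence = baseSequence;
        }
    }

    // Same shape as the comparator in the patch under review.
    static final Comparator<Batch> PATCHED = (b1, b2) -> {
        if (b1.baseSequence < b2.baseSequence) return -1;
        else if (b1.baseSequence > b2.baseSequence) return 1;
        else return b1.equals(b2) ? 0 : 1;
    };

    // Returns true if sgn(compare(x, y)) != -sgn(compare(y, x)) for two
    // distinct batches that share a base sequence.
    static boolean violatesAntisymmetry() {
        Batch b1 = new Batch(5);
        Batch b2 = new Batch(5);
        int forward = Integer.signum(PATCHED.compare(b1, b2));
        int backward = Integer.signum(PATCHED.compare(b2, b1));
        return forward != -backward;
    }

    public static void main(String[] args) {
        System.out.println("violates contract: " + violatesAntisymmetry());
    }
}
```

Both calls return 1 here, so each batch claims to be greater than the other, which is the case the `Comparator` contract forbids.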
There was a problem hiding this comment.
@artemlivshits Thanks for the explanation! I think it makes sense. I will open another PR to fix it.
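One way to get a stable order for unequal batches is to break ties on a second key. The sketch below is only an illustration of that idea and is not taken from the follow-up PR. It assumes each batch carries a unique `id`, which is a hypothetical field and not part of `ProducerBatch`.

```java
import java.util.Comparator;
import java.util.TreeSet;

class TieBreakSketch {
    // Hypothetical stand-in for ProducerBatch.
    static final class Batch {
        final int baseSequence;
        final long id; // assumed unique per batch; not a real ProducerBatch field

        Batch(int baseSequence, long id) {
            this.baseSequence = baseSequence;
            this.id = id;
        }
    }

    // Base sequence first, then the unique id, so distinct batches never
    // compare as 0 and swapping the arguments always flips the sign.
    static final Comparator<Batch> CONSISTENT =
        Comparator.comparingInt((Batch b) -> b.baseSequence)
                  .thenComparingLong(b -> b.id);

    static boolean isAntisymmetric(Batch x, Batch y) {
        return Integer.signum(CONSISTENT.compare(x, y))
            == -Integer.signum(CONSISTENT.compare(y, x));
    }

    // Removing a batch that was never added leaves the original in place.
    static int sizeAfterRemovingRetried() {
        TreeSet<Batch> inflight = new TreeSet<>(CONSISTENT);
        Batch original = new Batch(0, 1L);
        Batch retried = new Batch(0, 2L);
        inflight.add(original);
        inflight.remove(retried); // no match: the ids differ
        return inflight.size();
    }

    public static void main(String[] args) {
        System.out.println(isAntisymmetric(new Batch(0, 1L), new Batch(0, 2L)));
        System.out.println(sizeAfterRemovingRetried());
    }
}
```

Any tie-breaker works as long as it is total and stable for the lifetime of the batch; whether such a key exists on the real class is an open assumption here.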
…cs-11-may-2022 * apache-kafka/3.1: (51 commits) MINOR: reload4j build dependency fixes (apache#12144) KAFKA-13255: Use config.properties.exclude when mirroring topics (apache#11401) KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3) (apache#12096) KAFKA-13794: Follow up to fix producer batch comparator (apache#12006) fix: make sliding window works without grace period (#kafka-13739) (apache#11980) 3.1.1 release notes (apache#12001) KAFKA-13794; Fix comparator of `inflightBatchesBySequence` in `TransactionManager` (apache#11991) KAFKA-13782; Ensure correct partition added to txn after abort on full batch (apache#11995) KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default (apache#11908) KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1 (apache#11962) ...
…cs-12-may-2022 * apache-kafka/3.0: (14 commits) fix: make sliding window works without grace period (#kafka-13739) (apache#11980) KAFKA-13794: Follow up to fix producer batch comparator (apache#12006) KAFKA-13794; Fix comparator of `inflightBatchesBySequence` in `TransactionManager` (apache#11991) KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default (apache#11908) KAFKA-13418: Support key updates with TLS 1.3 (apache#11966) KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (apache#11946) KAFKA-13761: KafkaLog4jAppender deadlocks when idempotence is enabled (apache#11939) KAFKA-13759: Disable idempotence by default in producers instantiated by Connect (apache#11933) MINOR: Fix `ConsumerConfig.ISOLATION_LEVEL_DOC` (apache#11915) KAFKA-13750; Client Compatability KafkaTest uses invalid idempotency configs (apache#11909) ...
As described in https://issues.apache.org/jira/browse/KAFKA-13794, producer batches in inflightBatchesBySequence are not being removed correctly. One batch may be removed by another batch with the same sequence number.
This patch defines the comparator explicitly, so that a batch can only be removed if it is equal to the original one.