KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3)#12096
…onsManager (round 3) Conceptually, the ordering is defined by the producer id, producer epoch and the sequence number. A given `TopicPartitionEntry` has a single producer id and hence we only need to compare the other two.
showuon
left a comment
LGTM! Nice improvement! This fixes the original issue and also avoids the hashCode collision issue.
| else return Integer.compare(b1.hashCode(), b2.hashCode()); | ||
| }; | ||
| private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR = | ||
| Comparator.comparingInt(ProducerBatch::producerEpoch).thenComparingInt(ProducerBatch::baseSequence); |
Just wondering if we should compare the baseSequence first because most of the time, it's comparing the baseSequence under the same producerEpoch.
Even though that's true from a performance perspective, it doesn't make sense conceptually. Conceptually, the ordering is defined by producer epoch and then sequence number. So, I think it's best to leave it as is.
We might as well add .thenComparingInt(ProducerBatch::hashCode) at the end to mitigate cases when epoch is also the same.
I don't think that adds value (there are no cases where the hashCode would disambiguate given the way it works right now), so leaving as is.
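The ordering discussed in this thread can be illustrated with a small sketch. `Batch` below is a hypothetical stand-in for `ProducerBatch` that models only the two compared fields; it is not the Kafka class. The comparator orders by producer epoch first and falls back to the base sequence only when the epochs are equal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class EpochThenSequenceSketch {
    // Hypothetical stand-in for ProducerBatch (assumption: only these two fields matter here).
    static final class Batch {
        final short producerEpoch;
        final int baseSequence;

        Batch(short producerEpoch, int baseSequence) {
            this.producerEpoch = producerEpoch;
            this.baseSequence = baseSequence;
        }

        short producerEpoch() { return producerEpoch; }
        int baseSequence() { return baseSequence; }
    }

    // Same shape as the comparator in the diff: epoch first, then base sequence.
    static final Comparator<Batch> EPOCH_THEN_SEQUENCE =
        Comparator.comparingInt(Batch::producerEpoch).thenComparingInt(Batch::baseSequence);

    // Sorts the given batches and renders each one as "epoch:sequence".
    static List<String> sorted(Batch... batches) {
        List<Batch> copy = new ArrayList<>(Arrays.asList(batches));
        copy.sort(EPOCH_THEN_SEQUENCE);
        List<String> out = new ArrayList<>();
        for (Batch b : copy) {
            out.add(b.producerEpoch() + ":" + b.baseSequence());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sorted(
            new Batch((short) 1, 0),
            new Batch((short) 0, 7),
            new Batch((short) 0, 3)));
    }
}
```

A batch from a newer epoch sorts after every batch from an older epoch, regardless of its sequence number. Swapping the two keys would return the same results for `remove` and `contains`, but the sorted order would no longer match the conceptual one.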
dajac
left a comment
LGTM. Thanks for fixing this @ijuma! I assume that the original test of KAFKA-13794 is still enough to validate this change. Am I right?
@dajac Yes, the original test still passes with this change, so it shows that this change also fixes the original issue. Creating a test that fails due to a hashCode collision is difficult since the hashCode is basically a random number in this case.
@ijuma Sounds good to me, thanks.
Thanks for fixing it! LGTM
hachikuji
left a comment
@ijuma Thanks for the patch. I should have been a little more careful with the initial fix.
Going back to the original bug, the main issue occurs when an inflight batch reaches the delivery timeout. The batch is failed in the loop here: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L381. However, it may still be considered inflight by the network client. You can see the discrepancy in the test case that was written: https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java#L719-L720.
When the inflight request ultimately returns, we go through the same failBatch flow which will attempt a (duplicate) removal of the inflight batch from TransactionManager. This is where the bug gets hit. If we happened to have a sequence number which conflicted, then we would end up removing the wrong batch.
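The failure mode described above can be reproduced in isolation with a sorted set. This is a simplified sketch, not the Kafka code: `Batch` is a hypothetical stand-in for `ProducerBatch`, the sequence-only comparator is an assumption about the shape of the problematic ordering, and the scenario assumes the newer batch reuses base sequence 0 after an epoch bump. A `TreeSet` decides equality through its comparator, so a duplicate `remove` of the old batch can evict a different batch that compares as equal.

```java
import java.util.Comparator;
import java.util.TreeSet;

final class DuplicateRemovalSketch {
    // Hypothetical stand-in for ProducerBatch.
    static final class Batch {
        final long producerId;
        final short producerEpoch;
        final int baseSequence;

        Batch(long producerId, short producerEpoch, int baseSequence) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.baseSequence = baseSequence;
        }

        long producerId() { return producerId; }
        short producerEpoch() { return producerEpoch; }
        int baseSequence() { return baseSequence; }
    }

    // Assumed problematic ordering: batches with equal sequences are "the same" element.
    static final Comparator<Batch> SEQUENCE_ONLY =
        Comparator.comparingInt(Batch::baseSequence);

    // Ordering by producer id, then epoch, then sequence.
    static final Comparator<Batch> ID_EPOCH_SEQUENCE =
        Comparator.comparingLong(Batch::producerId)
            .thenComparingInt(Batch::producerEpoch)
            .thenComparingInt(Batch::baseSequence);

    // Returns true if the newer batch is still tracked after the duplicate removal.
    static boolean newBatchSurvives(Comparator<Batch> comparator) {
        TreeSet<Batch> inflight = new TreeSet<>(comparator);

        Batch expired = new Batch(1L, (short) 0, 0);
        inflight.add(expired);
        inflight.remove(expired);   // first removal: batch failed on delivery timeout

        Batch fresh = new Batch(1L, (short) 1, 0);
        inflight.add(fresh);        // newer epoch, conflicting base sequence

        inflight.remove(expired);   // duplicate removal when the old request returns
        return inflight.contains(fresh);
    }

    public static void main(String[] args) {
        System.out.println("sequence only:      " + newBatchSurvives(SEQUENCE_ONLY));
        System.out.println("id, epoch, sequence: " + newBatchSurvives(ID_EPOCH_SEQUENCE));
    }
}
```

With the sequence-only ordering, the second `remove` evicts the newer batch. With the three-key ordering, it finds no matching element and leaves the set unchanged.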
Note that at no point here did we actually have batches with different epochs inside inflightBatchesBySequence. I think that is intentional. The logic here basically forces the accumulator to hold onto new batches as long as there are any inflight with an older producerId/epoch: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L532.
Given all of this, I think there are two high-level ways to fix this:
- As in the current patch, we can modify the comparator so that the `remove` operation is safe even when we are trying to remove a batch from a different epoch. I think this works and seems safe. Perhaps a mild criticism is that it obscures the epoch invariant about `inflightBatchesBySequence`.
- We can fix `Sender.completeBatch` so that we don't go through the same logic twice after we've already failed the batch. My feeling is that this logic is risky and will be prone to future bugs. There's some dangerous stuff going on in this method and it would be nice if we could at least rule out the case when a batch gets completed multiple times.
I'm ok with either option. I tried the second one by adding a batch.isDone check at the top of Sender.completeBatch and confirmed that the new test case succeeds even with the original comparator. I guess I'm somewhat inclined to keep the current patch and address this in a follow-up (for the next release). It would also be nice to tighten up the invariant in inflightBatchesBySequence so that it is documented and clearly enforced. What do you think?
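The second option, a completion guard, might look roughly like the sketch below. It does not reproduce `Sender.completeBatch`; `Batch`, `markDone`, and the tracker class are hypothetical names, and the sequence-only comparator is assumed to show that the guard alone is enough in this simplified model. The idea is that a batch which has already been completed or failed never reaches the removal logic a second time.

```java
import java.util.Comparator;
import java.util.TreeSet;

final class CompleteOnceSketch {
    // Hypothetical stand-in for ProducerBatch with a completion flag.
    static final class Batch {
        final int baseSequence;
        private boolean done;

        Batch(int baseSequence) { this.baseSequence = baseSequence; }

        int baseSequence() { return baseSequence; }
        boolean isDone() { return done; }
        void markDone() { done = true; }
    }

    // Assumed ordering that cannot tell apart batches with equal sequences.
    static final Comparator<Batch> SEQUENCE_ONLY =
        Comparator.comparingInt(Batch::baseSequence);

    private final TreeSet<Batch> inflight = new TreeSet<>(SEQUENCE_ONLY);

    void send(Batch batch) {
        inflight.add(batch);
    }

    // Guarded completion: a second call for the same batch is a no-op.
    void completeBatch(Batch batch) {
        if (batch.isDone()) {
            return;
        }
        batch.markDone();
        inflight.remove(batch);
    }

    // Runs the timeout-then-late-response scenario; returns true if the newer batch is kept.
    static boolean newBatchSurvives() {
        CompleteOnceSketch tracker = new CompleteOnceSketch();

        Batch expired = new Batch(0);
        tracker.send(expired);
        tracker.completeBatch(expired);   // failed on delivery timeout

        Batch fresh = new Batch(0);       // conflicting base sequence
        tracker.send(fresh);

        tracker.completeBatch(expired);   // late response for the old batch: skipped
        return tracker.inflight.size() == 1 && tracker.inflight.first() == fresh;
    }

    public static void main(String[] args) {
        System.out.println(newBatchSurvives());
    }
}
```

In this model, the guard keeps the duplicate `remove` from ever running, so the ordering used by the set no longer matters for this scenario.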
Thanks for the careful analysis @hachikuji. We discussed the options offline and agreed to go with the most conservative option for the backports (add the producer id and epoch to the comparator) and do a follow-up in master that changes the
…onsManager (round 3) (#12096) Conceptually, the ordering is defined by the producer id, producer epoch and the sequence number. This set should generally only have entries for the same producer id and epoch, but there is one case where we can have conflicting `remove` calls and hence we add this as a temporary safe fix. We'll follow-up with a fix that ensures the original intended invariant. Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>, Luke Chen <showuon@gmail.com>
| // See https://github.com/apache/kafka/pull/12096#pullrequestreview-955554191 for details. | ||
| private static final Comparator<ProducerBatch> PRODUCER_BATCH_COMPARATOR = | ||
| Comparator.comparingLong(ProducerBatch::producerId) | ||
| .thenComparing(ProducerBatch::producerEpoch) |
I noticed after merging that this should have been thenComparingInt to avoid unnecessary boxing. I fixed that in the cherry-picks, but it was too late for the master change. I'll fix it in master via #12097.
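The boxing difference is easy to see side by side. In this sketch, `Batch` is a hypothetical stand-in for `ProducerBatch`. `thenComparing` takes a `Function` whose result must be `Comparable`, so a `short` key is boxed to `Short` on every comparison. `thenComparingInt` takes a `ToIntFunction` and compares primitives directly. Both produce the same ordering.

```java
import java.util.Comparator;

final class BoxingSketch {
    // Hypothetical stand-in for ProducerBatch.
    static final class Batch {
        final long producerId;
        final short producerEpoch;
        final int baseSequence;

        Batch(long producerId, short producerEpoch, int baseSequence) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.baseSequence = baseSequence;
        }

        long producerId() { return producerId; }
        short producerEpoch() { return producerEpoch; }
        int baseSequence() { return baseSequence; }
    }

    // Epoch key goes through Function<Batch, Short>: each comparison boxes two values.
    static final Comparator<Batch> BOXED =
        Comparator.comparingLong(Batch::producerId)
            .thenComparing(Batch::producerEpoch)
            .thenComparingInt(Batch::baseSequence);

    // Epoch key goes through ToIntFunction<Batch>: no boxing.
    static final Comparator<Batch> PRIMITIVE =
        Comparator.comparingLong(Batch::producerId)
            .thenComparingInt(Batch::producerEpoch)
            .thenComparingInt(Batch::baseSequence);

    // True if both comparators agree on the relative order of a and b.
    static boolean sameOrdering(Batch a, Batch b) {
        return Integer.signum(BOXED.compare(a, b)) == Integer.signum(PRIMITIVE.compare(a, b));
    }

    public static void main(String[] args) {
        Batch older = new Batch(1L, (short) 0, 5);
        Batch newer = new Batch(1L, (short) 2, 0);
        System.out.println(sameOrdering(older, newer) && sameOrdering(newer, older));
    }
}
```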
Merged to master and cherry-picked to 3.1 and 3.2. cc @cadonna @tombentley
…cs-11-may-2022 * apache-kafka/3.1: (51 commits) MINOR: reload4j build dependency fixes (apache#12144) KAFKA-13255: Use config.properties.exclude when mirroring topics (apache#11401) KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3) (apache#12096) KAFKA-13794: Follow up to fix producer batch comparator (apache#12006) fix: make sliding window works without grace period (#kafka-13739) (apache#11980) 3.1.1 release notes (apache#12001) KAFKA-13794; Fix comparator of `inflightBatchesBySequence` in `TransactionManager` (apache#11991) KAFKA-13782; Ensure correct partition added to txn after abort on full batch (apache#11995) KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default (apache#11908) KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1 (apache#11962) ...
Conceptually, the ordering is defined by the producer id, producer epoch
and the sequence number. This set should generally only have entries
for the same producer id and epoch, but there is one case where
we can have conflicting `remove` calls and hence we add this as a temporary safe fix.
We'll follow-up with a fix that ensures the original intended invariant.