Skip to content

KAFKA-8483/KAFKA-8484; Ensure safe handling of producerId resets#6883

Merged
hachikuji merged 6 commits intoapache:trunkfrom
hachikuji:KAFKA-8483
Jun 12, 2019
Merged

KAFKA-8483/KAFKA-8484; Ensure safe handling of producerId resets#6883
hachikuji merged 6 commits intoapache:trunkfrom
hachikuji:KAFKA-8483

Conversation

@hachikuji
Copy link
Copy Markdown
Contributor

@hachikuji hachikuji commented Jun 5, 2019

The idempotent producer attempts to detect spurious UNKNOWN_PRODUCER_ID errors and handle them by reassigning sequence numbers to the inflight batches. The inflight batches are tracked in a PriorityQueue. The problem is that the reassignment of sequence numbers depends on the iteration order of PriorityQueue, which does not guarantee any ordering. So this can result in sequence numbers being assigned in the wrong order. This patch fixes the problem by using a sorted set instead of a priority queue so that the iteration order preserves the sequence order. Note that resetting sequence numbers is an exceptional case.

This patch also fixes KAFKA-8484, which can cause an IllegalStateException when the producerId is reset while there are pending produce requests inflight. The solution is to ensure that sequence numbers are only reset if the producerId of a failed batch corresponds to the current producerId.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@hachikuji
Copy link
Copy Markdown
Contributor Author

Note this patch also contains a fix to KAFKA-8484. I can separate them if preferable, but I needed some of the common testing logic.

@hachikuji hachikuji force-pushed the KAFKA-8483 branch 2 times, most recently from 4bd23f9 to d7d712c Compare June 5, 2019 17:58
@ijuma
Copy link
Copy Markdown
Member

ijuma commented Jun 8, 2019

It's fine to include both fixes, can we update the PR title and description to mention that?

@ijuma ijuma requested a review from guozhangwang June 8, 2019 17:03
@hachikuji hachikuji changed the title KAFKA-8483; Ensure message ordering is preserved after sequence resets KAFKA-8483/KAFKA-8484; Ensure safe handling of producerId resets Jun 11, 2019
@ijuma
Copy link
Copy Markdown
Member

ijuma commented Jun 12, 2019

I think it would be great to include this in 2.3, if possible.

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall. Some minor comments.

}
}

public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify this function is just moved here, no logical changes?

Also if it is indeed the case, could you make some comments when creating the PR for ease of review :) ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is moved to simplify the TransactionManager API so that I could write better test cases. Otherwise it was very difficult to hit the cases without effectively rewriting this logic in the test case.

synchronized void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
if (!topicPartitionBookkeeper.contains(batch.topicPartition))
private void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
if (!topicPartitionBookkeeper.contains(batch.topicPartition) || !hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch()))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why adding the second condition?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the fix for KAFKA-8484. However, it seems redundant after I moved things around. Now handleFailedBatch verifies the producerId and epoch, so I think we can remove it.

}

public synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) {
maybeTransitionToErrorState(exception);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, the caller function moved here seems not changing any logic (the key change is in adjustSequencesDueToFailedBatch) right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main fix is the producerId and epoch check below.

int sequence = 0;
for (ProducerBatch inFlightBatch : topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence) {
private void startSequencesAtBeginning(TopicPartition topicPartition) {
final AtomicInteger sequence = new AtomicInteger(0);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller function canRetry is synchronized, do we need an atomic integer?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was needed only because of the lambda. I guess this is the ugly side of Java 8.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right.

TransactionManager transactionManager = new TransactionManager();
transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);

ProducerBatch b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading the iterator() code of PriorityQueue, I think three batches are sufficient to expose the randomness of its iterator(). Are there any reasons that you want to have 5, or it's just your favorite magic number?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really; 5 seemed like a sufficient interesting number to catch this bug and any future regressions.

@guozhangwang
Copy link
Copy Markdown
Contributor

LGTM, feel free to merge after jenkins green.

@hachikuji hachikuji merged commit af28010 into apache:trunk Jun 12, 2019
hachikuji added a commit that referenced this pull request Jun 12, 2019
The idempotent producer attempts to detect spurious UNKNOWN_PRODUCER_ID errors and handle them by reassigning sequence numbers to the inflight batches. The inflight batches are tracked in a PriorityQueue. The problem is that the reassignment of sequence numbers depends on the iteration order of PriorityQueue, which does not guarantee any ordering. So this can result in sequence numbers being assigned in the wrong order.  This patch fixes the problem by using a sorted set instead of a priority queue so that the iteration order preserves the sequence order. Note that resetting sequence numbers is an exceptional case.

This patch also fixes KAFKA-8484, which can cause an IllegalStateException when the producerId is reset while there are pending produce requests inflight. The solution is to ensure that sequence numbers are only reset if the producerId of a failed batch corresponds to the current producerId.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…che#6883)

The idempotent producer attempts to detect spurious UNKNOWN_PRODUCER_ID errors and handle them by reassigning sequence numbers to the inflight batches. The inflight batches are tracked in a PriorityQueue. The problem is that the reassignment of sequence numbers depends on the iteration order of PriorityQueue, which does not guarantee any ordering. So this can result in sequence numbers being assigned in the wrong order.  This patch fixes the problem by using a sorted set instead of a priority queue so that the iteration order preserves the sequence order. Note that resetting sequence numbers is an exceptional case.

This patch also fixes KAFKA-8484, which can cause an IllegalStateException when the producerId is reset while there are pending produce requests inflight. The solution is to ensure that sequence numbers are only reset if the producerId of a failed batch corresponds to the current producerId.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants