KAFKA-5075; Defer exception to the next pollOnce() if consumer's fetch position has already increased (#2859)

lindong28 wants to merge 5 commits into apache:trunk
Conversation
In Fetcher.fetchRecords() we iterate over the partition data to collect the ConsumerRecords. After we collect some consumer records from a partition, we advance the position of that partition and then move on to the next partition. If the next partition throws an exception (e.g. OffsetOutOfRangeException), the messages that have already been read out of the buffer will not be delivered to the user. Since the positions of the previous partitions have already been updated, those messages will not be consumed again either. This patch fixes the problem by deferring the exception to the next pollOnce() if the consumer's fetch position has already increased. Ping @becketqin for review.
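The core idea of the fix can be sketched with a simplified model (this is not the real Fetcher; the map-of-lists buffer, the `deferred` field, and a `null` value standing in for an OffsetOutOfRangeException are all illustrative assumptions):

```java
import java.util.*;

// Simplified sketch of the deferred-exception pattern: if some records were
// already drained (and positions advanced) before a partition fails, return
// those records now and re-throw the exception on the next poll instead.
public class DeferredExceptionSketch {
    static RuntimeException deferred; // cached until the next poll

    // Each entry: partition -> records; a null value simulates a partition
    // whose fetch raises OffsetOutOfRangeException.
    static Map<String, List<String>> fetchedRecords(Map<String, List<String>> buffer) {
        if (deferred != null) {            // re-throw on the next poll
            RuntimeException e = deferred;
            deferred = null;
            throw e;
        }
        Map<String, List<String>> drained = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : buffer.entrySet()) {
            if (entry.getValue() == null) { // partition in error state
                RuntimeException e =
                    new RuntimeException("offset out of range: " + entry.getKey());
                if (drained.isEmpty())
                    throw e;                // nothing collected yet: throw now
                deferred = e;               // records collected: defer, return them
                return drained;
            }
            drained.put(entry.getKey(), entry.getValue()); // position would advance here
        }
        return drained;
    }

    public static void main(String[] args) {
        Map<String, List<String>> buffer = new LinkedHashMap<>();
        buffer.put("t-0", Arrays.asList("a", "b"));
        buffer.put("t-1", null);            // this partition fails
        System.out.println(fetchedRecords(buffer).size()); // t-0's records are not lost
        try {
            fetchedRecords(new LinkedHashMap<>());
        } catch (RuntimeException e) {
            System.out.println("deferred: " + e.getMessage());
        }
    }
}
```

Without the deferral, the exception would propagate immediately and the records already drained from `t-0` (whose position had advanced) would be silently lost.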
 * the defaultResetPolicy is NONE
 */
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    if (nextInLineException != null) {
What if the subscription changes and the partition which had the exception is not assigned anymore? Or if the user seeks to a new position for the partition in question, which should no longer throw an exception? It seems we need to verify whether the partition state has changed before throwing the cached exception.
Good point. I have updated the patch to throw exception only if the partition is fetchable and the subscription position equals the fetched offset of the record that causes the exception.
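The relevance check described above can be sketched as follows (the `ExceptionMetadata` shape and the helper names are illustrative assumptions, not the real Fetcher internals):

```java
import java.util.*;

// Hedged sketch: a cached exception is only re-thrown if the partition is
// still fetchable and the consumer's position still equals the fetched
// offset that produced the exception; otherwise it is stale and dropped.
public class CachedExceptionCheck {
    static class ExceptionMetadata {
        final String partition;
        final long fetchedOffset;
        final RuntimeException exception;
        ExceptionMetadata(String partition, long fetchedOffset, RuntimeException exception) {
            this.partition = partition;
            this.fetchedOffset = fetchedOffset;
            this.exception = exception;
        }
    }

    static boolean shouldRethrow(ExceptionMetadata meta,
                                 Set<String> fetchablePartitions,
                                 Map<String, Long> positions) {
        // Subscription changed and the partition is gone: drop the exception.
        if (!fetchablePartitions.contains(meta.partition))
            return false;
        // User called seek() to a new position: the exception is stale.
        Long position = positions.get(meta.partition);
        return position != null && position == meta.fetchedOffset;
    }

    public static void main(String[] args) {
        Map<String, Long> positions = new HashMap<>();
        positions.put("t-0", 5L);
        ExceptionMetadata meta =
            new ExceptionMetadata("t-0", 5L, new RuntimeException("offset out of range"));
        System.out.println(shouldRethrow(meta,
            new HashSet<>(Arrays.asList("t-0")), positions)); // still relevant
    }
}
```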
            }
        }
    } catch (KafkaException e) {
        if (fetched.isEmpty()) {
Kafka does not use {} when the if statement has only one line.
    assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
}

@Test
Can we add some simple comments so readers can understand the test more easily?
Sure. I added the following comment:
"Verify that the advancement in the next fetch offset equals the number of fetched records when some of the fetched partitions cause an exception. This ensures that the consumer won't lose records upon exception."
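The invariant that comment describes can be illustrated with a small self-contained check (purely illustrative, not the real FetcherTest; the two "partitions" and the defer flag are assumptions):

```java
import java.util.*;

// Simulates one fetchRecords() pass over two partitions where the second
// throws. With the old behavior the exception propagates immediately and the
// records already drained from partition 0 (whose position has advanced) are
// dropped; with the fix they are delivered and the exception is deferred.
public class NoRecordLossCheck {
    static int deliveredRecords(boolean deferException) {
        List<String> p0 = Arrays.asList("r0", "r1"); // parses fine, position advances by 2
        List<String> delivered = new ArrayList<>(p0);
        // partition 1 now "throws" OffsetOutOfRangeException
        if (!deferException)
            delivered.clear(); // old behavior: exception propagates, drained records lost
        return delivered.size();
    }

    public static void main(String[] args) {
        System.out.println(deliveredRecords(true));  // fixed: 2 records delivered
        System.out.println(deliveredRecords(false)); // buggy: 0 records delivered
    }
}
```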
@lindong28 The newly added test has also failed. Can you check?

@becketqin I have fixed the newly added tests. I am still investigating why testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission fails randomly. Can you check whether the patch addresses the problem you mentioned?
private final Deserializer<V> valueDeserializer;

private PartitionRecords nextInLineRecords = null;
private ExceptionMetadata nextInLineExceptionMetadata = null;
I am wondering whether it would be simpler if we just peek at the first CompletedFetch in completedFetches, and only remove it after it is parsed. The logic would be:

- If parseCompleted(completedFetches.peek()) did not throw an exception, remove the CompletedFetch.
- If parseCompleted(completedFetches.peek()) threw an exception and fetched.isEmpty() == false, catch the exception and return the ConsumerRecords that have already been fetched.
- If parseCompleted(completedFetches.peek()) threw an exception and fetched.isEmpty() == true, catch the exception, remove the first entry in completedFetches, and re-throw the exception.
This way we don't need to cache any result.
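The peek-based alternative proposed above can be sketched with a plain Deque standing in for the real completedFetches queue (the Supplier elements and names here are illustrative assumptions):

```java
import java.util.*;
import java.util.function.Supplier;

// Sketch of the peek-first drain: a failing fetch stays at the head of the
// queue when some records were already drained, so the exception naturally
// resurfaces on the next call without caching it anywhere.
public class PeekBasedDrain {
    static List<String> drain(Deque<Supplier<List<String>>> completedFetches) {
        List<String> fetched = new ArrayList<>();
        while (!completedFetches.isEmpty()) {
            try {
                // Peek first: only remove after the fetch parses cleanly.
                fetched.addAll(completedFetches.peek().get());
                completedFetches.poll();
            } catch (RuntimeException e) {
                if (fetched.isEmpty()) {
                    completedFetches.poll(); // consume the bad fetch
                    throw e;                 // nothing to return: rethrow now
                }
                return fetched; // return what we have; the bad fetch stays
                                // queued and throws on the next call
            }
        }
        return fetched;
    }

    public static void main(String[] args) {
        Deque<Supplier<List<String>>> q = new ArrayDeque<>();
        q.add(() -> Arrays.asList("a", "b"));
        q.add(() -> { throw new RuntimeException("bad fetch"); });
        System.out.println(drain(q)); // first call returns the good records
    }
}
```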
But I think we still need to cache these to handle the exception thrown from Fetcher.fetchRecords(), which in turn comes from Fetcher.maybeEnsureValid(). What do you think?
BTW, it is probably OK to cache this exception since we already cache results such as nextInLineRecords.
If an InvalidRecordException is thrown in Fetcher.fetchRecords(), the position of that partition won't advance and no message for that partition would be added to fetched either. So it seems we can apply the same rule there as well?
Discussed offline. We will keep the fix as is.
try {
  consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
  consumeRecords(consumer)
  consumer.subscribe(singletonList(kafka.common.Topic.GroupMetadataTopicName), new NoOpConsumerRebalanceListener)
It seems the test was trying to test the pattern subscription, so we probably want to keep that unchanged.
Sure. It is fixed now.
    assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
}

@Test
Can we add a unit test about subscription change / offset seek after an exception is cached?
Thanks for the patch. LGTM.
Good catch @lindong28. Thanks for fixing!
@hachikuji My pleasure :) |
…h position has already increased

Author: Dong Lin <lindong28@gmail.com>
Author: Dong Lin <lindong28@users.noreply.github.com>

Reviewers: Jiangjie Qin <becket.qin@gmail.com>

Closes #2859 from lindong28/KAFKA-5075

This is a backport patch for 0.10.2 after resolving the following conflict.

Conflicts:
    clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java