KAFKA-8729: Change PartitionResponse to include all troubling records#7612
Merged
guozhangwang merged 4 commits intoapache:trunkfrom Nov 6, 2019
Merged
KAFKA-8729: Change PartitionResponse to include all troubling records#7612guozhangwang merged 4 commits intoapache:trunkfrom
PartitionResponse to include all troubling records#7612guozhangwang merged 4 commits intoapache:trunkfrom
Conversation
PartitionResponse to include all troubling records
Contributor
|
retest this please |
eea3dde to
b4f12d2
Compare
Contributor
Author
|
This PR will likely have some conflicts with #7628 , which is a blocker for 2.4 so I think we should wait for it to be merged first. |
Contributor
Author
|
Hi @guozhangwang , I have merged the changes from |
Contributor
|
LGTM! |
Member
|
Have we verified that this doesn't cause any performance regressions? |
Contributor
Author
|
I’ve been running |
3 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Background:
Currently, whenever a batch is dropped because of
InvalidRecordExceptionorInvalidTimestampException, only the culprit record appears inProduceResponse.PartitionResponse.recordErrors. However, after users try to resend that batch excluding the rejected message, the latter records are not guaranteed to be free of problems.Changes:
To address this issue, I changed the function signature of
validateKey,validateRecordandvalidateTimestampto return a Scala'sOptionobject. Specifically, this object will hold the reason/message the current record in iteration fails and leaves to the callers (convertAndAssignOffsetsNonCompressed,assignOffsetsNonCompressed,validateMessagesAndAssignOffsetsCompressed) to gathered all troubling records into one place. Then, all these records will be returned along with thePartitionResponseobject. As a result, if a batch contains more than one record errors, users see exactly which records cause the failure.PartitionResponse.recordErrorsis a list ofRecordErrorobjects introduced by #7167 which includebatchIndexdenoting the relative position of a record in a batch andmessageindicating the reason of failure.Gotchas:
Things are particularly tricky when a batch has records rejected because of both
InvalidRecordExceptionandInvalidTimestampException. In this case, theInvalidTimestampExceptiontakes higher precedence. Therefore, theErrorfield inPartitionResponsewill be encoded withINVALID_TIMESTAMP.Committer Checklist (excluded from commit message)