KAFKA-8729, pt 3: Add broker-side logic to handle the case when there are record_errors and error_message#7167
Conversation
|
Depending on #7150 to be merged |
37f1d53 to
a42ef2f
Compare
| brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark() | ||
| error(s"Error processing append operation on partition $topicPartition", t) | ||
| (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t))) | ||
| (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, List(), ""), Some(t))) |
There was a problem hiding this comment.
I think we can improve this by doing the following:
-
inside LogValidator, when throwing
InvalidTimestampExceptionandInvalidRecordException, encode the relative offset inside the exception object. -
then here we can add that relative offset as a singleton to the response.
By doing this on the client side, we can improve our error messaging such that we only tries to drop that single record but not the whole batch.
|
Ready for another round of reviews @guozhangwang . The failed tests passed on my machine so likely they're flaky |
|
retest this please. |
guozhangwang
left a comment
There was a problem hiding this comment.
ping @hachikuji for another look.
hachikuji
left a comment
There was a problem hiding this comment.
Thanks, left a few comments.
1d3c187 to
4fcfd7a
Compare
|
I have added a new |
hachikuji
left a comment
There was a problem hiding this comment.
Thanks, left a few more comments.
|
retest this please |
28d75a0 to
877cde8
Compare
…ge is not null anymore
|
All tests have run and succeeded. Please let me know what you think about the latest changes @guozhangwang @hachikuji |
|
|
||
| val expectedOffset = expectedInnerOffset.getAndIncrement() | ||
|
|
||
| // inner records offset should always be continuous |
|
Just minor comment, otherwise LGTM! Thanks @tuvtran |
… are record_errors and error_message (#7167) All the changes are in ReplicaManager.appendToLocalLog and ReplicaManager.appendRecords. Also, replaced LogAppendInfo.unknownLogAppendInfoWithLogStartOffset with LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo to include those 2 new fields. Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
|
Merged to trunk, also pushed to 2.4 |
|
Thanks @hachikuji @guozhangwang for the reviews! |
…batch does have continuous incremental offsets #7167 added a check for non-incremental offsets in `assignOffsetsNonCompressed`, which is not applicable for message format V0 and V1. Therefore, I added a condition to disable the check if the record version precedes V2. Author: Tu Tran <tu@confluent.io> Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com> Closes #7628 from tuvtran/KAFKA-9080
…batch does have continuous incremental offsets #7167 added a check for non-incremental offsets in `assignOffsetsNonCompressed`, which is not applicable for message format V0 and V1. Therefore, I added a condition to disable the check if the record version precedes V2. Author: Tu Tran <tu@confluent.io> Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com> Closes #7628 from tuvtran/KAFKA-9080
…batch does have continuous incremental offsets apache#7167 added a check for non-incremental offsets in `assignOffsetsNonCompressed`, which is not applicable for message format V0 and V1. Therefore, I added a condition to disable the check if the record version precedes V2. Author: Tu Tran <tu@confluent.io> Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com> Closes apache#7628 from tuvtran/KAFKA-9080
…ds (#7612) Background: Currently, whenever a batch is dropped because ofInvalidRecordException or InvalidTimestampException, only the culprit record appears in ProduceResponse.PartitionResponse.recordErrors. However, after users try to resend that batch excluding the rejected message, the latter records are not guaranteed to be free of problems. Changes: To address this issue, I changed the function signature of validateKey, validateRecord and validateTimestamp to return a Scala's Option object. Specifically, this object will hold the reason/message the current record in iteration fails and leaves to the callers (convertAndAssignOffsetsNonCompressed, assignOffsetsNonCompressed, validateMessagesAndAssignOffsetsCompressed) to gathered all troubling records into one place. Then, all these records will be returned along with the PartitionResponse object. As a result, if a batch contains more than one record errors, users see exactly which records cause the failure. PartitionResponse.recordErrors is a list of RecordError objects introduced by #7167 which include batchIndex denoting the relative position of a record in a batch and message indicating the reason of failure. Gotchas: Things are particularly tricky when a batch has records rejected because of both InvalidRecordException and InvalidTimestampException. In this case, the InvalidTimestampException takes higher precedence. Therefore, the Error field in PartitionResponse will be encoded with INVALID_TIMESTAMP. Reviewers: Guozhang Wang <wangguoz@gmail.com>
All the changes are in
ReplicaManager.appendToLocalLogandReplicaManager.appendRecords. Also, replacedLogAppendInfo. unknownLogAppendInfoWithLogStartOffsetwithLogAppendInfo. unknownLogAppendInfoWithAdditionalInfoto include those 2 new fieldsCommitter Checklist (excluded from commit message)