Skip to content

KAFKA-8729, pt 2: Add error_records and error_message to PartitionResponse#7150

Merged
guozhangwang merged 17 commits intoapache:trunkfrom
tuvtran:kip-467-protocol
Oct 1, 2019
Merged

KAFKA-8729, pt 2: Add error_records and error_message to PartitionResponse#7150
guozhangwang merged 17 commits intoapache:trunkfrom
tuvtran:kip-467-protocol

Conversation

@tuvtran
Copy link
Copy Markdown
Contributor

@tuvtran tuvtran commented Aug 1, 2019

As noted in the KIP-467, the updated ProduceResponse is

Produce Response (Version: 8) => [responses] throttle_time_ms
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition error_code base_offset log_append_time log_start_offset
      partition => INT32
      error_code => INT16
      base_offset => INT64
      log_append_time => INT64
      log_start_offset => INT64
      error_records => [INT32]         // new field, encodes the relative offset of the records that caused error
      error_message => STRING          // new field, encodes the error message that client can use to log itself
    throttle_time_ms => INT32

with a new error code:

INVALID_RECORD(86, "Some record has failed the validation on broker and hence be rejected.", InvalidRecordException::new);

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@tuvtran
Copy link
Copy Markdown
Contributor Author

tuvtran commented Aug 6, 2019

@guozhangwang could you take a look at this?

@tuvtran tuvtran changed the title KAFKA 8729, pt 2: Add error_records and error_message to PartitionResponse KAFKA-8729, pt 2: Add error_records and error_message to PartitionResponse Aug 28, 2019
@tuvtran
Copy link
Copy Markdown
Contributor Author

tuvtran commented Sep 3, 2019

retest this please

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a pass over this PR. Also ping @rayokota @hachikuji for another look.

Comment thread clients/src/main/java/org/apache/kafka/common/record/Record.java Outdated

override def toString = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " +
s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset]"
s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset, " +
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do not need to print them here since if the error records / message are ever not null / none there will be no delayer produce generated.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

"If LogAppendTime is used for the topic, the timestamp will be the broker local " +
"time when the messages are appended."),
LOG_START_OFFSET_FIELD,
new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(INT32)),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we do not have a description string for error records as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Comment thread clients/src/main/resources/common/message/ProduceResponse.json Outdated
@guozhangwang
Copy link
Copy Markdown
Contributor

retest this please

import org.apache.kafka.common.errors.ApiException;

public class InvalidRecordException extends CorruptRecordException {
public class InvalidRecordException extends ApiException {
Copy link
Copy Markdown
Contributor

@hachikuji hachikuji Sep 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. Isn't this an incompatible change? Does this exception not get exposed anywhere currently? Maybe it would be better to deprecate common/record/InvalidRecordException, but still leave it around.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think InvalidRecordException was not exposed as a public API before, and now we are moving its package as well to make it a public class.

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few comments. It's worth considering this in the context of KIP-482. I think we would make these fields optional?

Comment thread clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java Outdated
@guozhangwang
Copy link
Copy Markdown
Contributor

I'd recommend we do not block this PR on KIP-482, since that is not merged in yet. If KIP-482 did gets in before the release, we can have another PR incorporating the feature and declare those additional fields as optional.

@guozhangwang
Copy link
Copy Markdown
Contributor

Other than @hachikuji 's comments and the failed jenkins test, this PR LGTM.

@tuvtran
Copy link
Copy Markdown
Contributor Author

tuvtran commented Sep 10, 2019

kafka.api.SaslOAuthBearerSslEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl is flaky (it passed on my machine).

@tuvtran
Copy link
Copy Markdown
Contributor Author

tuvtran commented Sep 11, 2019

@guozhangwang I noticed that in DefaultRecordBatch and Fetcher we still use InvalidRecordException to check if a record (batch) is valid or not, so I changed those to use CorruptRecordException

@guozhangwang
Copy link
Copy Markdown
Contributor

@guozhangwang I noticed that in DefaultRecordBatch and Fetcher we still use InvalidRecordException to check if a record (batch) is valid or not, so I changed those to use CorruptRecordException

SGTM, throwing and catching CorruptRecordException is a better.

Maybe do a quick call-trace checking to make sure such exception thrown changes does not affect any public facing functions which would then throw different exceptions as well -- I did a quick look and it seems callers / callees are all internal classes, but always better to have another pair of eyes.

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit comments, otherwise LGTM. cc @hachikuji

* {@link Errors#NOT_LEADER_FOR_PARTITION} (6)
* {@link Errors#MESSAGE_TOO_LARGE} (10)
* {@link Errors#INVALID_TOPIC} (17)
* {@link Errors#INVALID_TOPIC_EXCEPTION} (17)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we want to add the _EXCEPTION suffix? Others do not have it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, I think it was in this case long time ago, we can fix it in another PR if we want.

]},
{ "name": "ErrorMessage", "type": "string", "versions": "8+", "default": "", "ignorable": true,
"about": "The error message of the records that cause the batch to be dropped"}
"about": "The error message of records that caused the batch to be dropped"}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: How about The global error message summarizing the common root cause of the records that makes the batch to be dropped.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, left a few comments.

Comment thread clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java Outdated
Comment thread clients/src/main/resources/common/message/ProduceResponse.json Outdated
"about": "The relative offsets of records that caused the batch to be dropped", "fields": [
{ "name": "RelativeOffset", "type": "int32", "versions": "8+",
"about": "The relative offset of the record that cause the batch to be dropped" },
{ "name": "RelativeOffsetErrorMessage", "type": "string", "versions": "8+",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want this to be nullable? Same for ErrorMessage below. We'll need to make a similar change to the old style spec.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message is great, but it can't be used programmatically on the client/app, so we'll also need the 16-bit error code here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edenhill I've thought about this when proposing KIP-467 and the reason I did not try to encode the 16 bit error code is that:

  1. At the moment, from all the possible errors that can be returned from the broker, except the InvalidRecordException all others would cause the response to be returned immediately, which means that we would not have more than one error code that would trigger the response to be rejected.

  2. For the only possible InvalidRecordException that may be co-exist with other exceptions in the rejected response, that would cause the error records to be trimmed, whereas all other error codes would still cause the whole batch to be dropped. Therefore we've decided to make the broker-side logic to execute the following: when any non-InvalidRecordException is thrown, since that would cause the whole batch to be dropped we would just return that error code on the per-partition and ignore any other InvalidRecordException; if the only exception thrown is InvalidRecordException on specific record(s), then we encode it on per-partition level.

That being said, if people feel that in the future or even now we would like to change the broker-side logic and it's worth paying the 16bits in addition to 8bit for encoding an empty string, then I'm fine doing that in this PR as well.

@hachikuji @tuvtran @junrao please let me know.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explaining your reasoning @guozhangwang.

I think the key here is "At the moment": having an error code field in this response would allow any future error code to be added without having to bump the protocol version, more specifically the range of returned error codes for the per-message errors should be left undefined as this is a means for communication between some future message verification mechanism on the broker and the application producing the data, rather than between the broker and the producer.

For space efficiency I'd suggest using a varint, but I doubt it matters in practice since these errors should be rare, and the two bytes of error code are just a fraction of the error message. And the erroneous Produce response is most likely far smaller than the request, or a fetch response for that matter.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edenhill Honestly I'm still not convinced of the future usage of per-record error codes -- atm we have not observed concrete error codes that would split within a single batch that would like to be handled differently by clients. If we do find out that become a common scenario to support we can still always bump up the version, which is not a very costly evolution anyways. On the other hand, since we do not have varint in place yet we would still need to encode 2bytes with the empty string: note that the per-record error message would be commonly empty in practice also.

Another thing worth mentioning is that even if we add the fields now, any new error code beyond InvalidRecord that we'd like to add in the future would still require the clients to upgrade because otherwise they cannot recognize them anyways.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang Okay, sounds good, I'll rest my case.

Comment thread clients/src/main/resources/common/message/ProduceResponse.json Outdated
Comment thread clients/src/main/resources/common/message/ProduceResponse.json
Comment thread clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java Outdated
}

public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) {
this(error, baseOffset, logAppendTime, logStartOffset, new HashMap<>(), "");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use Collections.emptyMap()?

@tuvtran
Copy link
Copy Markdown
Contributor Author

tuvtran commented Sep 24, 2019

retest this please

@tuvtran
Copy link
Copy Markdown
Contributor Author

tuvtran commented Sep 25, 2019

retest this please

@tuvtran
Copy link
Copy Markdown
Contributor Author

tuvtran commented Sep 30, 2019

retest this please

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made another pass, LGTM!

if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME))
partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can probably do some cleanup in pr.3, e.g. line 269 above can be replaced with setIfExist as well.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack :)

@guozhangwang guozhangwang merged commit f6f24c4 into apache:trunk Oct 1, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants