Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public class ProduceRequest extends AbstractRequest {
private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6;

/**
* V8 bumped up to add two new fields error_records offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse}
* V8 bumped up to add two new fields record_errors offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse}
* (See KIP-467)
*/
private static final Schema PRODUCE_REQUEST_V8 = PRODUCE_REQUEST_V7;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ public class ProduceResponse extends AbstractResponse {
private static final String BASE_OFFSET_KEY_NAME = "base_offset";
private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time";
private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
private static final String ERROR_RECORDS_KEY_NAME = "error_records";
private static final String RELATIVE_OFFSET_KEY_NAME = "relative_offset";
private static final String RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME = "relative_offset_error_message";
private static final String RECORD_ERRORS_KEY_NAME = "record_errors";
private static final String BATCH_INDEX_KEY_NAME = "batch_index";
private static final String BATCH_INDEX_ERROR_MESSAGE_KEY_NAME = "batch_index_error_message";
private static final String ERROR_MESSAGE_KEY_NAME = "error_message";

private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME,
"The start offset of the log at the time this produce response was created", INVALID_OFFSET);
private static final Field.NullableStr RELATIVE_OFFSET_ERROR_MESSAGE_FIELD = new Field.NullableStr(RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME,
private static final Field.NullableStr BATCH_INDEX_ERROR_MESSAGE_FIELD = new Field.NullableStr(BATCH_INDEX_ERROR_MESSAGE_KEY_NAME,
"The error message of the record that caused the batch to be dropped");
private static final Field.NullableStr ERROR_MESSAGE_FIELD = new Field.NullableStr(ERROR_MESSAGE_KEY_NAME,
"The global error message summarizing the common root cause of the records that caused the batch to be dropped");
Expand Down Expand Up @@ -160,7 +160,7 @@ public class ProduceResponse extends AbstractResponse {
private static final Schema PRODUCE_RESPONSE_V7 = PRODUCE_RESPONSE_V6;

/**
* V8 adds error_records and error_message. (see KIP-467)
* V8 adds record_errors and error_message. (see KIP-467)
*/
public static final Schema PRODUCE_RESPONSE_V8 = new Schema(
new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
Expand All @@ -174,11 +174,11 @@ public class ProduceResponse extends AbstractResponse {
"If LogAppendTime is used for the topic, the timestamp will be the broker local " +
"time when the messages are appended."),
LOG_START_OFFSET_FIELD,
new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(new Schema(
new Field.Int32(RELATIVE_OFFSET_KEY_NAME, "The relative offset of the record " +
new Field(RECORD_ERRORS_KEY_NAME, new ArrayOf(new Schema(
new Field.Int32(BATCH_INDEX_KEY_NAME, "The batch index of the record " +
"that caused the batch to be dropped"),
RELATIVE_OFFSET_ERROR_MESSAGE_FIELD
)), "The relative offsets of records that caused the batch to be dropped"),
BATCH_INDEX_ERROR_MESSAGE_FIELD
)), "The batch indices of records that caused the batch to be dropped"),
ERROR_MESSAGE_FIELD)))))),
THROTTLE_TIME_MS);

Expand Down Expand Up @@ -225,19 +225,24 @@ public ProduceResponse(Struct struct) {
long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME);
long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);

Map<Integer, String> errorRecords = new HashMap<>();
if (partRespStruct.hasField(ERROR_RECORDS_KEY_NAME)) {
for (Object recordOffsetAndMessage : partRespStruct.getArray(ERROR_RECORDS_KEY_NAME)) {
Struct recordOffsetAndMessageStruct = (Struct) recordOffsetAndMessage;
Integer relativeOffset = recordOffsetAndMessageStruct.getInt(RELATIVE_OFFSET_KEY_NAME);
String relativeOffsetErrorMessage = recordOffsetAndMessageStruct.getOrElse(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, "");
errorRecords.put(relativeOffset, relativeOffsetErrorMessage);
List<RecordError> recordErrors = Collections.emptyList();
if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
Object[] recordErrorsArray = partRespStruct.getArray(RECORD_ERRORS_KEY_NAME);
if (recordErrorsArray.length > 0) {
recordErrors = new ArrayList<>(recordErrorsArray.length);
for (Object indexAndMessage : recordErrorsArray) {
Struct indexAndMessageStruct = (Struct) indexAndMessage;
recordErrors.add(new RecordError(
indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME),
indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD)
));
}
}
}

String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, "");
String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null);
TopicPartition tp = new TopicPartition(topic, partition);
responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, errorRecords, errorMessage));
responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, recordErrors, errorMessage));
}
}
this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
Expand Down Expand Up @@ -266,19 +271,21 @@ protected Struct toStruct(short version) {
.set(PARTITION_ID, partitionEntry.getKey())
.set(ERROR_CODE, errorCode)
.set(BASE_OFFSET_KEY_NAME, part.baseOffset);
if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME))
partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset);

List<Struct> errorRecords = new ArrayList<>();
for (Map.Entry<Integer, String> recordOffsetAndMessage : part.errorRecords.entrySet()) {
Struct recordOffsetAndMessageStruct = partStruct.instance(ERROR_RECORDS_KEY_NAME)
.set(RELATIVE_OFFSET_KEY_NAME, recordOffsetAndMessage.getKey())
.setIfExists(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, recordOffsetAndMessage.getValue());
errorRecords.add(recordOffsetAndMessageStruct);
List<Struct> recordErrors = Collections.emptyList();
if (!part.recordErrors.isEmpty()) {
recordErrors = new ArrayList<>();
for (RecordError indexAndMessage : part.recordErrors) {
Struct indexAndMessageStruct = partStruct.instance(RECORD_ERRORS_KEY_NAME)
.set(BATCH_INDEX_KEY_NAME, indexAndMessage.batchIndex)
.set(BATCH_INDEX_ERROR_MESSAGE_FIELD, indexAndMessage.message);
recordErrors.add(indexAndMessageStruct);
}
}

partStruct.setIfExists(ERROR_RECORDS_KEY_NAME, errorRecords.toArray());
partStruct.setIfExists(RECORD_ERRORS_KEY_NAME, recordErrors.toArray());

partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage);
partitionArray.add(partStruct);
Expand Down Expand Up @@ -314,27 +321,27 @@ public static final class PartitionResponse {
public long baseOffset;
public long logAppendTime;
public long logStartOffset;
public Map<Integer, String> errorRecords;
public List<RecordError> recordErrors;
public String errorMessage;

public PartitionResponse(Errors error) {
this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, INVALID_OFFSET);
}

public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) {
this(error, baseOffset, logAppendTime, logStartOffset, Collections.emptyMap(), null);
this(error, baseOffset, logAppendTime, logStartOffset, Collections.emptyList(), null);
}

public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map<Integer, String> errorRecords) {
this(error, baseOffset, logAppendTime, logStartOffset, errorRecords, "");
public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors) {
this(error, baseOffset, logAppendTime, logStartOffset, recordErrors, null);
}

public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map<Integer, String> errorRecords, String errorMessage) {
public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors, String errorMessage) {
this.error = error;
this.baseOffset = baseOffset;
this.logAppendTime = logAppendTime;
this.logStartOffset = logStartOffset;
this.errorRecords = errorRecords;
this.recordErrors = recordErrors;
this.errorMessage = errorMessage;
}

Expand All @@ -350,15 +357,35 @@ public String toString() {
b.append(logAppendTime);
b.append(", logStartOffset: ");
b.append(logStartOffset);
b.append(", errorRecords: ");
b.append(errorRecords);
b.append(", recordErrors: ");
b.append(recordErrors);
b.append(", errorMessage: ");
b.append(errorMessage);
if (errorMessage != null) {
b.append(errorMessage);
} else {
b.append("null");
}
b.append('}');
return b.toString();
}
}

/**
 * Per-record error returned in a V8+ produce response (see KIP-467): the index of
 * the offending record within its produced batch, plus an optional error message.
 */
public static final class RecordError {
// Index within the batch of the record that caused the batch to be dropped.
public final int batchIndex;
// Per-record error message; null when the broker supplied none.
public final String message;

public RecordError(int batchIndex, String message) {
this.batchIndex = batchIndex;
this.message = message;
}

// Convenience constructor for a record error with no per-record message.
public RecordError(int batchIndex) {
this.batchIndex = batchIndex;
this.message = null;
}

}

public static ProduceResponse parse(ByteBuffer buffer, short version) {
return new ProduceResponse(ApiKeys.PRODUCE.responseSchema(version).read(buffer));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
//
// Starting in version 7, records can be produced using ZStandard compression. See KIP-110.
//
// Version 8 is the same as version 7 (but see KIP-467 for the response changes).
// Starting in Version 8, response has RecordErrors and ErrorMessage. See KIP-467.
"validVersions": "0-8",
"flexibleVersions": "none",
"fields": [
Expand Down
12 changes: 6 additions & 6 deletions clients/src/main/resources/common/message/ProduceResponse.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
// Version 5 added LogStartOffset to filter out spurious
// OutOfOrderSequenceExceptions on the client.
//
// Version 8 added ErrorRecords and ErrorMessage to include information about
// Version 8 added RecordErrors and ErrorMessage to include information about
// records that cause the whole batch to be dropped. See KIP-467 for details.
"validVersions": "0-8",
"flexibleVersions": "none",
Expand All @@ -49,11 +49,11 @@
"about": "The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1. If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended." },
{ "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
"about": "The log start offset." },
{ "name": "ErrorRecords", "type": "[]RelativeOffsetAndErrorMessage", "versions": "8+", "ignorable": true,
"about": "The relative offsets of records that caused the batch to be dropped", "fields": [
{ "name": "RelativeOffset", "type": "int32", "versions": "8+",
"about": "The relative offset of the record that cause the batch to be dropped" },
{ "name": "RelativeOffsetErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+",
{ "name": "RecordErrors", "type": "[]BatchIndexAndErrorMessage", "versions": "8+", "ignorable": true,
"about": "The batch indices of records that caused the batch to be dropped", "fields": [
{ "name": "BatchIndex", "type": "int32", "versions": "8+",
"about": "The batch index of the record that caused the batch to be dropped" },
{ "name": "BatchIndexErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+",
"about": "The error message of the record that caused the batch to be dropped"}
]},
{ "name": "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,8 @@ public void testProduceResponseVersions() throws Exception {
int throttleTimeMs = 1234;
long logAppendTimeMs = 1234L;
long logStartOffset = 1234L;
int relativeOffset = 0;
String relativeOffsetErrorMessage = "error message";
int batchIndex = 0;
String batchIndexErrorMessage = "error message";
String errorMessage = "global error message";

testAllMessageRoundTrips(new ProduceResponseData()
Expand All @@ -542,18 +542,18 @@ public void testProduceResponseVersions() throws Exception {
.setBaseOffset(baseOffset)
.setLogAppendTimeMs(logAppendTimeMs)
.setLogStartOffset(logStartOffset)
.setErrorRecords(singletonList(
new ProduceResponseData.RelativeOffsetAndErrorMessage()
.setRelativeOffset(relativeOffset)
.setRelativeOffsetErrorMessage(relativeOffsetErrorMessage)))
.setRecordErrors(singletonList(
new ProduceResponseData.BatchIndexAndErrorMessage()
.setBatchIndex(batchIndex)
.setBatchIndexErrorMessage(batchIndexErrorMessage)))
.setErrorMessage(errorMessage)))))
.setThrottleTimeMs(throttleTimeMs);

for (short version = 0; version <= ApiKeys.PRODUCE.latestVersion(); version++) {
ProduceResponseData responseData = response.get();

if (version < 8) {
responseData.responses().get(0).partitions().get(0).setErrorRecords(Collections.emptyList());
responseData.responses().get(0).partitions().get(0).setRecordErrors(Collections.emptyList());
responseData.responses().get(0).partitions().get(0).setErrorMessage(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,8 @@ private ProduceResponse createProduceResponse() {
private ProduceResponse createProduceResponseWithErrorMessage() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
10000, RecordBatch.NO_TIMESTAMP, 100, Collections.singletonMap(0, "error message"), "global error message"));
10000, RecordBatch.NO_TIMESTAMP, 100, Collections.singletonList(new ProduceResponse.RecordError(0, "error message")),
"global error message"));
return new ProduceResponse(responseData, 0);
}

Expand Down
25 changes: 25 additions & 0 deletions core/src/main/scala/kafka/common/RecordValidationException.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.common
Comment thread
tuvtran marked this conversation as resolved.

import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.requests.ProduceResponse.RecordError

/**
 * Carries the ApiException raised during record validation together with the
 * per-record errors (batch index and message) to be reported back to the
 * producer in the ProduceResponse (see KIP-467).
 */
class RecordValidationException(val invalidException: ApiException,
val recordErrors: List[RecordError]) extends RuntimeException {
}
Loading