From 9bcf5edb5f32015e4f11335f3340e02ba821fea0 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Thu, 1 Aug 2019 11:54:21 -0700 Subject: [PATCH 01/13] Add error_records and error_message to PartitionResponse --- .../clients/consumer/internals/Fetcher.java | 2 +- .../{record => }/InvalidRecordException.java | 6 +-- .../apache/kafka/common/protocol/Errors.java | 4 +- .../record/AbstractLegacyRecordBatch.java | 1 + .../common/record/ControlRecordType.java | 1 + .../kafka/common/record/DefaultRecord.java | 1 + .../common/record/DefaultRecordBatch.java | 1 + .../common/record/EndTransactionMarker.java | 1 + .../kafka/common/record/LegacyRecord.java | 1 + .../apache/kafka/common/record/Record.java | 2 +- .../kafka/common/requests/ProduceRequest.java | 14 +++-- .../common/requests/ProduceResponse.java | 53 ++++++++++++++++++- .../common/message/ProduceRequest.json | 4 +- .../common/message/ProduceResponse.json | 11 +++- .../record/AbstractLegacyRecordBatchTest.java | 1 + .../common/record/DefaultRecordBatchTest.java | 1 + .../common/record/DefaultRecordTest.java | 1 + .../record/EndTransactionMarkerTest.java | 1 + .../kafka/common/record/LegacyRecordTest.java | 1 + .../common/record/SimpleLegacyRecordTest.java | 1 + .../common/requests/ProduceRequestTest.java | 2 +- .../kafka/connect/util/ConnectUtils.java | 2 +- .../connect/runtime/WorkerSourceTaskTest.java | 2 +- core/src/main/scala/kafka/log/Log.scala | 9 +++- .../src/main/scala/kafka/log/LogSegment.scala | 7 +-- .../main/scala/kafka/log/LogValidator.scala | 15 ++++-- .../kafka/server/AbstractFetcherThread.scala | 4 +- .../scala/kafka/server/DelayedProduce.scala | 3 +- .../unit/kafka/log/LogValidatorTest.scala | 1 + .../kafka/server/ProduceRequestTest.scala | 4 +- 30 files changed, 127 insertions(+), 30 deletions(-) rename clients/src/main/java/org/apache/kafka/common/{record => }/InvalidRecordException.java (85%) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index d4d028d38dc73..87e7da47e1969 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -56,7 +56,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.BufferSupplier; import org.apache.kafka.common.record.ControlRecordType; -import org.apache.kafka.common.record.InvalidRecordException; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.Records; diff --git a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java similarity index 85% rename from clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java rename to clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java index 49f616611589e..4c2815bb3bda5 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java +++ b/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java @@ -14,11 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.kafka.common.record; +package org.apache.kafka.common; -import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.ApiException; -public class InvalidRecordException extends CorruptRecordException { +public class InvalidRecordException extends ApiException { private static final long serialVersionUID = 1; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 9f11fc89adf05..2a3b9c66fb939 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.protocol; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.ClusterAuthorizationException; @@ -312,7 +313,8 @@ public enum Errors { EligibleLeadersNotAvailableException::new), ELECTION_NOT_NEEDED(84, "Leader election not needed for topic partition", ElectionNotNeededException::new), NO_REASSIGNMENT_IN_PROGRESS(85, "No partition reassignment is in progress.", - NoReassignmentInProgressException::new); + NoReassignmentInProgressException::new), + INVALID_RECORD(86, "This record has failed the validation on broker and hence will be rejected.", InvalidRecordException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index cf38d3123067e..83637640af49d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -16,6 +16,7 @@ */
package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.header.Header; diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java index d5ead14df9126..ad41f1d9cd8dd 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java index bf1320ea30c4a..976b5567a3709 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.ByteUtils; diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 5b53f19dd07b8..e52a818c9d7da 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import 
org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.utils.ByteBufferOutputStream; diff --git a/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java index 726b52a197378..4bf1ebf94a098 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java +++ b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; diff --git a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java index 482c4a65efc1e..69051d0b3a663 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.ByteUtils; diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index ab52befc70b08..37bebd2949d5e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -66,7 +66,7 @@ public interface Record { boolean isValid(); /** - * Raise a {@link org.apache.kafka.common.errors.CorruptRecordException} if the record does not have a valid 
checksum. + * Raise a {@link org.apache.kafka.common.InvalidRecordException} if the record does not have a valid checksum. */ void ensureValid(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index ad23f3f61d83f..4318addce0af5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; import org.apache.kafka.common.protocol.ApiKeys; @@ -26,7 +27,6 @@ import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.RecordBatch; @@ -120,9 +120,15 @@ public class ProduceRequest extends AbstractRequest { */ private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6; + /** + * V8 bumped up to add two new fields error_records offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse} + * (See KIP-467) + */ + private static final Schema PRODUCE_REQUEST_V8 = PRODUCE_REQUEST_V7; + public static Schema[] schemaVersions() { return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3, - PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6, PRODUCE_REQUEST_V7}; + PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6, PRODUCE_REQUEST_V7, PRODUCE_REQUEST_V8}; } public static class Builder extends 
AbstractRequest.Builder { @@ -337,6 +343,7 @@ public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) { case 5: case 6: case 7: + case 8: return new ProduceResponse(responseMap, throttleTimeMs); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", @@ -401,7 +408,7 @@ public static void validateRecords(short version, MemoryRecords records) { if (entry.magic() != RecordBatch.MAGIC_VALUE_V2) throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + "contain record batches with magic version 2"); - if (version < 7 && entry.compressionType() == CompressionType.ZSTD) { + if (version < 8 && entry.compressionType() == CompressionType.ZSTD) { throw new UnsupportedCompressionTypeException("Produce requests with version " + version + " are not allowed to " + "use ZStandard compression"); } @@ -434,6 +441,7 @@ public static byte requiredMagicForVersion(short produceRequestVersion) { case 5: case 6: case 7: + case 8: return RecordBatch.MAGIC_VALUE_V2; default: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 7d3e4fed36280..dc5b4e70a73e0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -36,6 +36,7 @@ import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.types.Type.INT32; import static org.apache.kafka.common.protocol.types.Type.INT64; /** @@ -67,14 +68,19 @@ public class ProduceResponse extends AbstractResponse { * INVALID_PRODUCER_EPOCH (47) * CLUSTER_AUTHORIZATION_FAILED (31) * 
TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53) + * INVALID_RECORD (86) */ private static final String BASE_OFFSET_KEY_NAME = "base_offset"; private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time"; private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset"; + private static final String ERROR_RECORDS_KEY_NAME = "error_records"; + private static final String ERROR_MESSAGE_KEY_NAME = "error_message"; private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME, "The start offset of the log at the time this produce response was created", INVALID_OFFSET); + private static final Field.Str ERROR_MESSAGE_FIELD = new Field.Str(ERROR_MESSAGE_KEY_NAME, + "The error message of the records that cause the batch to be dropped"); private static final Schema PRODUCE_RESPONSE_V0 = new Schema( new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( @@ -149,9 +155,28 @@ public class ProduceResponse extends AbstractResponse { */ private static final Schema PRODUCE_RESPONSE_V7 = PRODUCE_RESPONSE_V6; + /** + * V8 adds error_records and error_message. (see KIP-467) + */ + public static final Schema PRODUCE_RESPONSE_V8 = new Schema( + new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( + TOPIC_NAME, + new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( + PARTITION_ID, + ERROR_CODE, + new Field(BASE_OFFSET_KEY_NAME, INT64), + new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " + + "the messages. If CreateTime is used for the topic, the timestamp will be -1. 
" + + "If LogAppendTime is used for the topic, the timestamp will be the broker local " + + "time when the messages are appended."), + LOG_START_OFFSET_FIELD, + new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(INT32)), + ERROR_MESSAGE_FIELD)))))), + THROTTLE_TIME_MS); + public static Schema[] schemaVersions() { return new Schema[]{PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3, - PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6, PRODUCE_RESPONSE_V7}; + PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6, PRODUCE_RESPONSE_V7, PRODUCE_RESPONSE_V8}; } private final Map responses; @@ -190,8 +215,14 @@ public ProduceResponse(Struct struct) { long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME); long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME); long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET); + List errorRecords = new ArrayList<>(); + if (partRespStruct.hasField(ERROR_RECORDS_KEY_NAME)) { + for (Object recordOffset : partRespStruct.getArray(ERROR_RECORDS_KEY_NAME)) + errorRecords.add((Integer) recordOffset); + } + String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, ""); TopicPartition tp = new TopicPartition(topic, partition); - responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset)); + responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, errorRecords, errorMessage)); } } this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); @@ -223,6 +254,8 @@ protected Struct toStruct(short version) { if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME)) partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset); + partStruct.setIfExists(ERROR_RECORDS_KEY_NAME, part.errorRecords.toArray()); + partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage); partitionArray.add(partStruct); } 
topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray()); @@ -256,16 +289,28 @@ public static final class PartitionResponse { public long baseOffset; public long logAppendTime; public long logStartOffset; + public List errorRecords; + public String errorMessage; public PartitionResponse(Errors error) { this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, INVALID_OFFSET); } public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) { + this(error, baseOffset, logAppendTime, logStartOffset, new ArrayList<>(), ""); + } + + public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List errorRecords) { + this(error, baseOffset, logAppendTime, logStartOffset, errorRecords, ""); + } + + public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List errorRecords, String errorMessage) { this.error = error; this.baseOffset = baseOffset; this.logAppendTime = logAppendTime; this.logStartOffset = logStartOffset; + this.errorRecords = errorRecords; + this.errorMessage = errorMessage; } @Override @@ -280,6 +325,10 @@ public String toString() { b.append(logAppendTime); b.append(", logStartOffset: "); b.append(logStartOffset); + b.append(", errorRecords: "); + b.append(errorRecords); + b.append(", errorMessage: "); + b.append(errorMessage); b.append('}'); return b.toString(); } diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json index 2da4ed793b295..5a1556d2652d6 100644 --- a/clients/src/main/resources/common/message/ProduceRequest.json +++ b/clients/src/main/resources/common/message/ProduceRequest.json @@ -28,7 +28,9 @@ // Version 5 and 6 are the same as version 3. // // Starting in version 7, records can be produced using ZStandard compression. See KIP-110. - "validVersions": "0-7", + // + // Starting in Version 8, response has ErrorRecords and ErrorMessage. 
See KIP-467. + "validVersions": "0-8", "fields": [ { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId", "about": "The transactional ID, or null if the producer is not transactional." }, diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index 0659b4c63db53..4b52391428024 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -27,7 +27,10 @@ // // Version 5 added LogStartOffset to filter out spurious // OutOfOrderSequenceExceptions on the client. - "validVersions": "0-7", + // + // Version 8 added ErrorRecords and ErrorMessage to include information about + // records that cause the whole batch to be dropped + "validVersions": "0-8", "fields": [ { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+", "about": "Each produce response", "fields": [ @@ -44,7 +47,11 @@ { "name": "LogAppendTimeMs", "type": "int64", "versions": "2+", "default": "-1", "ignorable": true, "about": "The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1. If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended." }, { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, - "about": "The log start offset." } + "about": "The log start offset." 
}, + { "name": "ErrorRecords", "type": "[]int32", "versions": "8+", "default": "", "ignorable": true, + "about": "The relative offset of records that cause the batch to be dropped"}, + { "name": "ErrorMessage", "type": "string", "versions": "8+", "default": "", "ignorable": true, + "about": "The error message of the records that cause the batch to be dropped"} ]} ]}, { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, diff --git a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java index fe6ffabaf61eb..87811b8bb0d80 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.record.AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch; import org.apache.kafka.common.utils.Utils; import org.junit.Test; diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java index 34e46adee125f..973def75d8dad 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.CloseableIterator; diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java index 
198f9945d3f16..822b3b9c6aff0 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.ByteBufferInputStream; diff --git a/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java index 903f674ed1965..8698c7cfca86b 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.junit.Test; import java.nio.ByteBuffer; diff --git a/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java index 9480c60ca940b..7e4a84414b344 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java index 5f578a873d0dc..2e50097972697 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java +++ 
b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.Utils; import org.junit.Test; diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java index 809d64f1e327b..95d719adb3bb7 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java @@ -17,10 +17,10 @@ package org.apache.kafka.common.requests; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.RecordBatch; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java index 86ed42e1a604c..e1e4a874035ab 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java @@ -18,7 +18,7 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.record.InvalidRecordException; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerConfig; diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 24a13c2be267d..080280dea2cd5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.record.InvalidRecordException; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index f8df2115e9aa0..9b4f70634f09e 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -42,7 +42,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest} import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition} import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, ListBuffer} @@ -1350,7 +1350,12 @@ class Log(@volatile var dir: File, } // check the validity of the message by checking CRC - batch.ensureValid() + try { + batch.ensureValid() + } catch { + case e: InvalidRecordException => + throw new CorruptRecordException(e.getMessage) + } if (batch.maxTimestamp > maxTimestamp) { maxTimestamp = batch.maxTimestamp diff --git a/core/src/main/scala/kafka/log/LogSegment.scala 
b/core/src/main/scala/kafka/log/LogSegment.scala index 4c16291c170ae..31f400db023eb 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -26,6 +26,7 @@ import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server.epoch.LeaderEpochFileCache import kafka.server.{FetchDataInfo, LogOffsetMetadata} import kafka.utils._ +import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset} import org.apache.kafka.common.record._ @@ -367,9 +368,9 @@ class LogSegment private[log] (val log: FileRecords, } } } catch { - case e: CorruptRecordException => - warn("Found invalid messages in log segment %s at byte offset %d: %s." - .format(log.file.getAbsolutePath, validBytes, e.getMessage)) + case e@ (_: CorruptRecordException | _: InvalidRecordException) => + warn("Found invalid messages in log segment %s at byte offset %d: %s. 
%s" + .format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause)) } val truncated = log.sizeInBytes - validBytes if (truncated > 0) diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 775ea6e8b6a39..336e693a9f4b4 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -22,8 +22,9 @@ import kafka.api.{ApiVersion, KAFKA_2_1_IV0} import kafka.common.LongRef import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec} import kafka.utils.Logging -import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} -import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType, BufferSupplier} +import org.apache.kafka.common.InvalidRecordException +import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} import org.apache.kafka.common.utils.Time import scala.collection.{Seq, mutable} @@ -138,8 +139,14 @@ private[kafka] object LogValidator extends Logging { // set for magic v0 and v1. For non-compressed messages, there is no inner record for magic v0 and v1, // so we depend on the batch-level CRC check in Log.analyzeAndValidateRecords(). For magic v2 and above, // there is no record-level CRC to check. 
- if (batch.magic <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed) - record.ensureValid() + if (batch.magic <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed) { + try { + record.ensureValid() + } catch { + case e: InvalidRecordException => + throw new CorruptRecordException(e.getMessage) + } + } validateKey(record, compactedTopic) validateTimestamp(batch, record, now, timestampType, timestampDiffMaxMs) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 2dd59445b4cf0..2a5187a6d4cb9 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -39,7 +39,7 @@ import java.util.function.Consumer import com.yammer.metrics.core.Gauge import kafka.log.LogAppendInfo -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{InvalidRecordException, TopicPartition} import org.apache.kafka.common.internals.PartitionStates import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records} import org.apache.kafka.common.requests._ @@ -331,7 +331,7 @@ abstract class AbstractFetcherThread(name: String, } } } catch { - case ime: CorruptRecordException => + case ime@( _: CorruptRecordException | _: InvalidRecordException) => // we log the error and continue. This ensures two things // 1. 
If there is a corrupt message in a topic partition, it does not bring the fetcher thread // down and cause other topic partition to also lag diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 14b6aca23128e..d023ec5b270b4 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -34,7 +34,8 @@ case class ProducePartitionStatus(requiredOffset: Long, responseStatus: Partitio @volatile var acksPending = false override def toString = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " + - s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset]" + s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset, " + + s"error_records: ${responseStatus.errorRecords}, error_message: ${responseStatus.errorMessage}]" } /** diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index e2a8e17cf07ec..1b80298e864b7 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1} import kafka.common.LongRef import kafka.message._ +import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Time diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index 21644fab4d6ae..93b2f07ea06fe 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -53,6 
+53,8 @@ class ProduceRequestTest extends BaseRequestTest { assertEquals(Errors.NONE, partitionResponse.error) assertEquals(expectedOffset, partitionResponse.baseOffset) assertEquals(-1, partitionResponse.logAppendTime) + assertTrue(partitionResponse.errorRecords.isEmpty) + assertTrue(partitionResponse.errorMessage.isEmpty) partitionResponse } @@ -133,7 +135,7 @@ class ProduceRequestTest extends BaseRequestTest { // produce request with v7: works fine! val res1 = sendProduceRequest(leader, - new ProduceRequest.Builder(7, 7, -1, 3000, partitionRecords.asJava, null).build()) + new ProduceRequest.Builder(8, 8, -1, 3000, partitionRecords.asJava, null).build()) val (tp1, partitionResponse1) = res1.responses.asScala.head assertEquals(topicPartition, tp1) assertEquals(Errors.NONE, partitionResponse1.error) From abf342c730c0a063633cd096481fdf7f33a4d9c1 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Thu, 1 Aug 2019 15:55:57 -0700 Subject: [PATCH 02/13] Change CorruptCorrupException in LogTest to InvalidRecordException --- core/src/test/scala/unit/kafka/log/LogTest.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index fa7be7eb88c43..6ec92fe1fea5d 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -30,7 +30,7 @@ import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata} import kafka.utils._ -import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition} import org.apache.kafka.common.errors._ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset 
import org.apache.kafka.common.record.MemoryRecords.RecordFilter @@ -1806,19 +1806,19 @@ class LogTest { log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0) fail("Compacted topics cannot accept a message without a key.") } catch { - case _: CorruptRecordException => // this is good + case _: InvalidRecordException => // this is good } try { log.appendAsLeader(messageSetWithOneUnkeyedMessage, leaderEpoch = 0) fail("Compacted topics cannot accept a message without a key.") } catch { - case _: CorruptRecordException => // this is good + case _: InvalidRecordException => // this is good } try { log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, leaderEpoch = 0) fail("Compacted topics cannot accept a message without a key.") } catch { - case _: CorruptRecordException => // this is good + case _: InvalidRecordException => // this is good } // the following should succeed without any InvalidMessageException From 460a3b4c485e20a0826ece0c3db1073c2502a136 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Thu, 5 Sep 2019 16:42:19 -0700 Subject: [PATCH 03/13] address GH comments --- .../java/org/apache/kafka/common/record/LegacyRecord.java | 6 +++--- .../main/java/org/apache/kafka/common/record/Record.java | 2 +- .../org/apache/kafka/common/requests/ProduceResponse.java | 3 ++- core/src/main/scala/kafka/server/DelayedProduce.scala | 3 +-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java index 69051d0b3a663..32c5aa81d7530 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.common.record; -import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.CorruptRecordException; 
import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.common.utils.Checksums; @@ -136,11 +136,11 @@ public TimestampType wrapperRecordTimestampType() { */ public void ensureValid() { if (sizeInBytes() < RECORD_OVERHEAD_V0) - throw new InvalidRecordException("Record is corrupt (crc could not be retrieved as the record is too " + throw new CorruptRecordException("Record is corrupt (crc could not be retrieved as the record is too " + "small, size = " + sizeInBytes() + ")"); if (!isValid()) - throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum() + throw new CorruptRecordException("Record is corrupt (stored crc = " + checksum() + ", computed crc = " + computeChecksum() + ")"); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index 37bebd2949d5e..ab52befc70b08 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -66,7 +66,7 @@ public interface Record { boolean isValid(); /** - * Raise a {@link org.apache.kafka.common.InvalidRecordException} if the record does not have a valid checksum. + * Raise a {@link org.apache.kafka.common.errors.CorruptRecordException} if the record does not have a valid checksum. 
*/ void ensureValid(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index dc5b4e70a73e0..9640bb161242a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -170,7 +170,8 @@ public class ProduceResponse extends AbstractResponse { "If LogAppendTime is used for the topic, the timestamp will be the broker local " + "time when the messages are appended."), LOG_START_OFFSET_FIELD, - new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(INT32)), + new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(INT32), "The relative offset of records" + + "that cause the batch to be dropped"), ERROR_MESSAGE_FIELD)))))), THROTTLE_TIME_MS); diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 12bef9ea11424..6ec20c1d518ff 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -34,8 +34,7 @@ case class ProducePartitionStatus(requiredOffset: Long, responseStatus: Partitio @volatile var acksPending = false override def toString = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " + - s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset, " + - s"error_records: ${responseStatus.errorRecords}, error_message: ${responseStatus.errorMessage}]" + s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset]" } /** From 61285e4fd3a30418597268e55f5b739469ad8150 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Sat, 7 Sep 2019 17:15:32 -0700 Subject: [PATCH 04/13] change InvalidRecordException to CorruptRecordException in LegacyRecordTest --- .../java/org/apache/kafka/common/record/LegacyRecordTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java index 7e4a84414b344..848f0a3c5f28e 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.record; -import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.errors.CorruptRecordException; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -94,7 +94,7 @@ public void testChecksum() { try { copy.ensureValid(); fail("Should fail the above test."); - } catch (InvalidRecordException e) { + } catch (CorruptRecordException e) { // this is good } } From 6288ad05e0cd6bc545888d5adb8867b5ec8ef7f7 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Tue, 10 Sep 2019 10:56:42 -0700 Subject: [PATCH 05/13] address Jason's comments --- .../kafka/common/requests/ProduceRequest.java | 2 +- .../common/requests/ProduceResponse.java | 36 +++++++++---------- .../common/record/SimpleLegacyRecordTest.java | 5 +-- .../kafka/server/ProduceRequestTest.scala | 2 +- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 4318addce0af5..932473cb869ea 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -408,7 +408,7 @@ public static void validateRecords(short version, MemoryRecords records) { if (entry.magic() != RecordBatch.MAGIC_VALUE_V2) throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + "contain record batches with magic version 2"); - if (version < 8 && entry.compressionType() 
== CompressionType.ZSTD) { + if (version < 7 && entry.compressionType() == CompressionType.ZSTD) { throw new UnsupportedCompressionTypeException("Produce requests with version " + version + " are not allowed to " + "use ZStandard compression"); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 9640bb161242a..6afe8cfd3fec5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -54,21 +54,21 @@ public class ProduceResponse extends AbstractResponse { /** * Possible error code: * - * CORRUPT_MESSAGE (2) - * UNKNOWN_TOPIC_OR_PARTITION (3) - * NOT_LEADER_FOR_PARTITION (6) - * MESSAGE_TOO_LARGE (10) - * INVALID_TOPIC (17) - * RECORD_LIST_TOO_LARGE (18) - * NOT_ENOUGH_REPLICAS (19) - * NOT_ENOUGH_REPLICAS_AFTER_APPEND (20) - * INVALID_REQUIRED_ACKS (21) - * TOPIC_AUTHORIZATION_FAILED (29) - * UNSUPPORTED_FOR_MESSAGE_FORMAT (43) - * INVALID_PRODUCER_EPOCH (47) - * CLUSTER_AUTHORIZATION_FAILED (31) - * TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53) - * INVALID_RECORD (86) + * {@link Errors#CORRUPT_MESSAGE} (2) + * {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} (3) + * {@link Errors#NOT_LEADER_FOR_PARTITION} (6) + * {@link Errors#MESSAGE_TOO_LARGE} (10) + * {@link Errors#INVALID_TOPIC} (17) + * {@link Errors#RECORD_LIST_TOO_LARGE} (18) + * {@link Errors#NOT_ENOUGH_REPLICAS} (19) + * {@link Errors#NOT_ENOUGH_REPLICAS_AFTER_APPEND} (20) + * {@link Errors#INVALID_REQUIRED_ACKS} (21) + * {@link Errors#TOPIC_AUTHORIZATION_FAILED} (29) + * {@link Errors#UNSUPPORTED_FOR_MESSAGE_FORMAT} (43) + * {@link Errors#INVALID_PRODUCER_EPOCH} (47) + * {@link Errors#CLUSTER_AUTHORIZATION_FAILED} (31) + * {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED} (53) + * {@link Errors#INVALID_RECORD} (86) */ private static final String BASE_OFFSET_KEY_NAME = 
"base_offset"; @@ -80,7 +80,7 @@ public class ProduceResponse extends AbstractResponse { private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME, "The start offset of the log at the time this produce response was created", INVALID_OFFSET); private static final Field.Str ERROR_MESSAGE_FIELD = new Field.Str(ERROR_MESSAGE_KEY_NAME, - "The error message of the records that cause the batch to be dropped"); + "The error message of the records that caused the batch to be dropped"); private static final Schema PRODUCE_RESPONSE_V0 = new Schema( new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( @@ -170,8 +170,8 @@ public class ProduceResponse extends AbstractResponse { "If LogAppendTime is used for the topic, the timestamp will be the broker local " + "time when the messages are appended."), LOG_START_OFFSET_FIELD, - new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(INT32), "The relative offset of records" + - "that cause the batch to be dropped"), + new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(INT32), "The relative offset of records " + + "that caused the batch to be dropped"), ERROR_MESSAGE_FIELD)))))), THROTTLE_TIME_MS); diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java index 2e50097972697..cd287bbbf1c5b 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.Utils; import org.junit.Test; @@ -67,7 +68,7 @@ public void testCompressedIterationWithEmptyRecords() throws Exception { } /* This scenario can happen if 
the record size field is corrupt and we end up allocating a buffer that is too small */ - @Test(expected = InvalidRecordException.class) + @Test(expected = CorruptRecordException.class) public void testIsValidWithTooSmallBuffer() { ByteBuffer buffer = ByteBuffer.allocate(2); LegacyRecord record = new LegacyRecord(buffer); @@ -75,7 +76,7 @@ public void testIsValidWithTooSmallBuffer() { record.ensureValid(); } - @Test(expected = InvalidRecordException.class) + @Test(expected = CorruptRecordException.class) public void testIsValidWithChecksumMismatch() { ByteBuffer buffer = ByteBuffer.allocate(4); // set checksum diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index f5facac1c04ed..f3e1ca29eca3e 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -140,7 +140,7 @@ class ProduceRequestTest extends BaseRequestTest { // produce request with v7: works fine! 
val res1 = sendProduceRequest(leader, - new ProduceRequest.Builder(8, 8, -1, 3000, partitionRecords.asJava, null).build()) + new ProduceRequest.Builder(8, 7, -1, 3000, partitionRecords.asJava, null).build()) val (tp1, partitionResponse1) = res1.responses.asScala.head assertEquals(topicPartition, tp1) assertEquals(Errors.NONE, partitionResponse1.error) From 501a82415f1b591210dbba7ed8fc76c7c272dd06 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Tue, 10 Sep 2019 14:49:06 -0700 Subject: [PATCH 06/13] fix minor issue in ProduceRequestTest --- core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index f3e1ca29eca3e..62edadc14fd5e 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -140,7 +140,7 @@ class ProduceRequestTest extends BaseRequestTest { // produce request with v7: works fine! 
val res1 = sendProduceRequest(leader, - new ProduceRequest.Builder(8, 7, -1, 3000, partitionRecords.asJava, null).build()) + new ProduceRequest.Builder(7, 7, -1, 3000, partitionRecords.asJava, null).build()) val (tp1, partitionResponse1) = res1.responses.asScala.head assertEquals(topicPartition, tp1) assertEquals(Errors.NONE, partitionResponse1.error) From 4f03549de3b61e49c4d211576a87eb0db7800af7 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Wed, 11 Sep 2019 10:39:09 -0700 Subject: [PATCH 07/13] Change InvalidRecordException to CorruptRecordException in Fetcher and DefaultRecordBatch --- .../apache/kafka/clients/consumer/internals/Fetcher.java | 6 +++--- .../org/apache/kafka/common/record/DefaultRecordBatch.java | 5 +++-- .../apache/kafka/common/record/DefaultRecordBatchTest.java | 5 +++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index a3834365c96b6..ed80dc4d9b777 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.RetriableException; @@ -56,7 +57,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.BufferSupplier; import org.apache.kafka.common.record.ControlRecordType; -import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; import 
org.apache.kafka.common.record.Records; @@ -1430,7 +1430,7 @@ private void maybeEnsureValid(RecordBatch batch) { if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) { try { batch.ensureValid(); - } catch (InvalidRecordException e) { + } catch (CorruptRecordException e) { throw new KafkaException("Record batch for partition " + partition + " at offset " + batch.baseOffset() + " is invalid, cause: " + e.getMessage()); } @@ -1441,7 +1441,7 @@ private void maybeEnsureValid(Record record) { if (checkCrcs) { try { record.ensureValid(); - } catch (InvalidRecordException e) { + } catch (CorruptRecordException e) { throw new KafkaException("Record for partition " + partition + " at offset " + record.offset() + " is invalid, cause: " + e.getMessage()); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index e52a818c9d7da..6d79b268575ab 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.ByteUtils; @@ -145,11 +146,11 @@ public byte magic() { @Override public void ensureValid() { if (sizeInBytes() < RECORD_BATCH_OVERHEAD) - throw new InvalidRecordException("Record batch is corrupt (the size " + sizeInBytes() + + throw new CorruptRecordException("Record batch is corrupt (the size " + sizeInBytes() + " is smaller than the minimum allowed overhead " + RECORD_BATCH_OVERHEAD + ")"); if (!isValid()) - throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum() + throw new 
CorruptRecordException("Record is corrupt (stored crc = " + checksum() + ", computed crc = " + computeChecksum() + ")"); } diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java index 973def75d8dad..beee10ab77426 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.CloseableIterator; @@ -174,7 +175,7 @@ public void testSizeInBytes() { assertEquals(actualSize, DefaultRecordBatch.sizeInBytes(Arrays.asList(records))); } - @Test(expected = InvalidRecordException.class) + @Test(expected = CorruptRecordException.class) public void testInvalidRecordSize() { MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, @@ -234,7 +235,7 @@ public void testInvalidRecordCountTooLittleCompressedV2() { } } - @Test(expected = InvalidRecordException.class) + @Test(expected = CorruptRecordException.class) public void testInvalidCrc() { MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, From ccb559f848f446d351a59ac5570b907c34e67619 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Mon, 16 Sep 2019 15:06:14 -0700 Subject: [PATCH 08/13] bump the error code to 87 --- .../src/main/java/org/apache/kafka/common/protocol/Errors.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 2a3b9c66fb939..22c6dd270ae15 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -314,7 +314,7 @@ public enum Errors { ELECTION_NOT_NEEDED(84, "Leader election not needed for topic partition", ElectionNotNeededException::new), NO_REASSIGNMENT_IN_PROGRESS(85, "No partition reassignment is in progress.", NoReassignmentInProgressException::new), - INVALID_RECORD(86, "This record has failed the validation on broker and hence be rejected.", InvalidRecordException::new); + INVALID_RECORD(87, "This record has failed the validation on broker and hence be rejected.", InvalidRecordException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); From 3535b69ba3f095f69f366fae996aca3fd6cfb37a Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Wed, 18 Sep 2019 17:06:40 -0700 Subject: [PATCH 09/13] modify error_records from [int32] to map[int32, string] --- .../common/requests/ProduceResponse.java | 41 ++++++++++++------- .../common/message/ProduceResponse.json | 11 +++-- 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 6afe8cfd3fec5..d7c07180de360 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -36,7 +36,6 @@ import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.INT32; import static org.apache.kafka.common.protocol.types.Type.INT64; /** @@ -58,7 +57,7 @@ public 
class ProduceResponse extends AbstractResponse { * {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} (3) * {@link Errors#NOT_LEADER_FOR_PARTITION} (6) * {@link Errors#MESSAGE_TOO_LARGE} (10) - * {@link Errors#INVALID_TOPIC} (17) + * {@link Errors#INVALID_TOPIC_EXCEPTION} (17) * {@link Errors#RECORD_LIST_TOO_LARGE} (18) * {@link Errors#NOT_ENOUGH_REPLICAS} (19) * {@link Errors#NOT_ENOUGH_REPLICAS_AFTER_APPEND} (20) @@ -68,19 +67,23 @@ public class ProduceResponse extends AbstractResponse { * {@link Errors#INVALID_PRODUCER_EPOCH} (47) * {@link Errors#CLUSTER_AUTHORIZATION_FAILED} (31) * {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED} (53) - * {@link Errors#INVALID_RECORD} (86) + * {@link Errors#INVALID_RECORD} (87) */ private static final String BASE_OFFSET_KEY_NAME = "base_offset"; private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time"; private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset"; private static final String ERROR_RECORDS_KEY_NAME = "error_records"; + private static final String RELATIVE_OFFSET_KEY_NAME = "relative_offset"; + private static final String RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME = "relative_offset_error_message"; private static final String ERROR_MESSAGE_KEY_NAME = "error_message"; private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME, "The start offset of the log at the time this produce response was created", INVALID_OFFSET); + private static final Field.Str RELATIVE_OFFSET_ERROR_MESSAGE_FIELD = new Field.Str(RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME, + "The error message of the record that caused the batch to be dropped"); private static final Field.Str ERROR_MESSAGE_FIELD = new Field.Str(ERROR_MESSAGE_KEY_NAME, - "The error message of the records that caused the batch to be dropped"); + "The error message of records that caused the batch to be dropped"); private static final Schema PRODUCE_RESPONSE_V0 = new Schema( new Field(RESPONSES_KEY_NAME, new ArrayOf(new 
Schema( @@ -170,8 +173,11 @@ public class ProduceResponse extends AbstractResponse { "If LogAppendTime is used for the topic, the timestamp will be the broker local " + "time when the messages are appended."), LOG_START_OFFSET_FIELD, - new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(INT32), "The relative offset of records " + - "that caused the batch to be dropped"), + new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(new Schema( + new Field.Int32(RELATIVE_OFFSET_KEY_NAME, "The relative offset of the record " + + "that caused the batch to be dropped"), + RELATIVE_OFFSET_ERROR_MESSAGE_FIELD + )), "The relative offsets of records that caused the batch to be dropped"), ERROR_MESSAGE_FIELD)))))), THROTTLE_TIME_MS); @@ -209,6 +215,7 @@ public ProduceResponse(Struct struct) { for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) { Struct topicRespStruct = (Struct) topicResponse; String topic = topicRespStruct.get(TOPIC_NAME); + for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) { Struct partRespStruct = (Struct) partResponse; int partition = partRespStruct.get(PARTITION_ID); @@ -216,11 +223,17 @@ public ProduceResponse(Struct struct) { long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME); long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME); long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET); - List errorRecords = new ArrayList<>(); + + Map errorRecords = new HashMap<>(); if (partRespStruct.hasField(ERROR_RECORDS_KEY_NAME)) { - for (Object recordOffset : partRespStruct.getArray(ERROR_RECORDS_KEY_NAME)) - errorRecords.add((Integer) recordOffset); + for (Object recordOffsetAndMessage : partRespStruct.getArray(ERROR_RECORDS_KEY_NAME)) { + Struct recordOffsetAndMessageStruct = (Struct) recordOffsetAndMessage; + Integer relativeOffset = recordOffsetAndMessageStruct.getInt(RELATIVE_OFFSET_KEY_NAME); + String relativeOffsetErrorMessage = 
recordOffsetAndMessageStruct.getOrElse(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, ""); + errorRecords.put(relativeOffset, relativeOffsetErrorMessage); + } } + String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, ""); TopicPartition tp = new TopicPartition(topic, partition); responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, errorRecords, errorMessage)); @@ -255,7 +268,7 @@ protected Struct toStruct(short version) { if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME)) partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset); - partStruct.setIfExists(ERROR_RECORDS_KEY_NAME, part.errorRecords.toArray()); + partStruct.setIfExists(ERROR_RECORDS_KEY_NAME, part.errorRecords.entrySet().toArray()); partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage); partitionArray.add(partStruct); } @@ -290,7 +303,7 @@ public static final class PartitionResponse { public long baseOffset; public long logAppendTime; public long logStartOffset; - public List errorRecords; + public Map errorRecords; public String errorMessage; public PartitionResponse(Errors error) { @@ -298,14 +311,14 @@ public PartitionResponse(Errors error) { } public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) { - this(error, baseOffset, logAppendTime, logStartOffset, new ArrayList<>(), ""); + this(error, baseOffset, logAppendTime, logStartOffset, new HashMap<>(), ""); } - public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List errorRecords) { + public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map errorRecords) { this(error, baseOffset, logAppendTime, logStartOffset, errorRecords, ""); } - public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List errorRecords, String errorMessage) { + public PartitionResponse(Errors 
error, long baseOffset, long logAppendTime, long logStartOffset, Map errorRecords, String errorMessage) { this.error = error; this.baseOffset = baseOffset; this.logAppendTime = logAppendTime; diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index 4b52391428024..6ca1e173f0732 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -48,10 +48,15 @@ "about": "The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1. If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended." }, { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, "about": "The log start offset." }, - { "name": "ErrorRecords", "type": "[]int32", "versions": "8+", "default": "", "ignorable": true, - "about": "The relative offset of records that cause the batch to be dropped"}, + { "name": "ErrorRecords", "type": "[]RelativeOffsetAndErrorMessage", "versions": "8+", "default": "", "ignorable": true, + "about": "The relative offsets of records that caused the batch to be dropped", "fields": [ + { "name": "RelativeOffset", "type": "int32", "versions": "8+", + "about": "The relative offset of the record that cause the batch to be dropped" }, + { "name": "RelativeOffsetErrorMessage", "type": "string", "versions": "8+", + "about": "The error message of the record that caused the batch to be dropped"} + ]}, { "name": "ErrorMessage", "type": "string", "versions": "8+", "default": "", "ignorable": true, - "about": "The error message of the records that cause the batch to be dropped"} + "about": "The error message of records that caused the batch to be dropped"} ]} ]}, { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, From 
4b830e56318d0239b6844ad93d60c89688167f7b Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Thu, 19 Sep 2019 09:54:42 -0700 Subject: [PATCH 10/13] change global error message --- .../java/org/apache/kafka/common/requests/ProduceResponse.java | 2 +- clients/src/main/resources/common/message/ProduceResponse.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index d7c07180de360..862ce4598a9ba 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -83,7 +83,7 @@ public class ProduceResponse extends AbstractResponse { private static final Field.Str RELATIVE_OFFSET_ERROR_MESSAGE_FIELD = new Field.Str(RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME, "The error message of the record that caused the batch to be dropped"); private static final Field.Str ERROR_MESSAGE_FIELD = new Field.Str(ERROR_MESSAGE_KEY_NAME, - "The error message of records that caused the batch to be dropped"); + "The global error message summarizing the common root cause of the records that caused the batch to be dropped"); private static final Schema PRODUCE_RESPONSE_V0 = new Schema( new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index 6ca1e173f0732..c4b225a6386b4 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -56,7 +56,7 @@ "about": "The error message of the record that caused the batch to be dropped"} ]}, { "name": "ErrorMessage", "type": "string", "versions": "8+", "default": "", "ignorable": true, - "about": "The error message of records that caused the batch to be dropped"} + 
"about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"} ]} ]}, { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, From d34f1b7353f858c85c241b9f2e12c8e573931095 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Mon, 23 Sep 2019 18:36:06 -0700 Subject: [PATCH 11/13] add tests in messageTest and RequestResponseTest and addressed GH's comments --- .../common/requests/ProduceResponse.java | 49 ++++++++----- .../common/message/ProduceResponse.json | 6 +- .../kafka/common/message/MessageTest.java | 72 +++++++++++++++++++ .../common/requests/RequestResponseTest.java | 8 +++ 4 files changed, 113 insertions(+), 22 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 862ce4598a9ba..a6df88002d582 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -53,21 +54,21 @@ public class ProduceResponse extends AbstractResponse { /** * Possible error code: * - * {@link Errors#CORRUPT_MESSAGE} (2) - * {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} (3) - * {@link Errors#NOT_LEADER_FOR_PARTITION} (6) - * {@link Errors#MESSAGE_TOO_LARGE} (10) - * {@link Errors#INVALID_TOPIC_EXCEPTION} (17) - * {@link Errors#RECORD_LIST_TOO_LARGE} (18) - * {@link Errors#NOT_ENOUGH_REPLICAS} (19) - * {@link Errors#NOT_ENOUGH_REPLICAS_AFTER_APPEND} (20) - * {@link Errors#INVALID_REQUIRED_ACKS} (21) - * {@link Errors#TOPIC_AUTHORIZATION_FAILED} (29) - * {@link Errors#UNSUPPORTED_FOR_MESSAGE_FORMAT} (43) - * {@link Errors#INVALID_PRODUCER_EPOCH} (47) - * {@link Errors#CLUSTER_AUTHORIZATION_FAILED} (31) - 
* {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED} (53) - * {@link Errors#INVALID_RECORD} (87) + * {@link Errors#CORRUPT_MESSAGE} + * {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} + * {@link Errors#NOT_LEADER_FOR_PARTITION} + * {@link Errors#MESSAGE_TOO_LARGE} + * {@link Errors#INVALID_TOPIC_EXCEPTION} + * {@link Errors#RECORD_LIST_TOO_LARGE} + * {@link Errors#NOT_ENOUGH_REPLICAS} + * {@link Errors#NOT_ENOUGH_REPLICAS_AFTER_APPEND} + * {@link Errors#INVALID_REQUIRED_ACKS} + * {@link Errors#TOPIC_AUTHORIZATION_FAILED} + * {@link Errors#UNSUPPORTED_FOR_MESSAGE_FORMAT} + * {@link Errors#INVALID_PRODUCER_EPOCH} + * {@link Errors#CLUSTER_AUTHORIZATION_FAILED} + * {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED} + * {@link Errors#INVALID_RECORD} */ private static final String BASE_OFFSET_KEY_NAME = "base_offset"; @@ -80,9 +81,9 @@ public class ProduceResponse extends AbstractResponse { private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME, "The start offset of the log at the time this produce response was created", INVALID_OFFSET); - private static final Field.Str RELATIVE_OFFSET_ERROR_MESSAGE_FIELD = new Field.Str(RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME, + private static final Field.NullableStr RELATIVE_OFFSET_ERROR_MESSAGE_FIELD = new Field.NullableStr(RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME, "The error message of the record that caused the batch to be dropped"); - private static final Field.Str ERROR_MESSAGE_FIELD = new Field.Str(ERROR_MESSAGE_KEY_NAME, + private static final Field.NullableStr ERROR_MESSAGE_FIELD = new Field.NullableStr(ERROR_MESSAGE_KEY_NAME, "The global error message summarizing the common root cause of the records that caused the batch to be dropped"); private static final Schema PRODUCE_RESPONSE_V0 = new Schema( @@ -268,7 +269,17 @@ protected Struct toStruct(short version) { if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME)) partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); 
partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset); - partStruct.setIfExists(ERROR_RECORDS_KEY_NAME, part.errorRecords.entrySet().toArray()); + + List errorRecords = new ArrayList<>(); + for (Map.Entry recordOffsetAndMessage : part.errorRecords.entrySet()) { + Struct recordOffsetAndMessageStruct = partStruct.instance(ERROR_RECORDS_KEY_NAME) + .set(RELATIVE_OFFSET_KEY_NAME, recordOffsetAndMessage.getKey()) + .setIfExists(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, recordOffsetAndMessage.getValue()); + errorRecords.add(recordOffsetAndMessageStruct); + } + + partStruct.setIfExists(ERROR_RECORDS_KEY_NAME, errorRecords.toArray()); + partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage); partitionArray.add(partStruct); } @@ -311,7 +322,7 @@ public PartitionResponse(Errors error) { } public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) { - this(error, baseOffset, logAppendTime, logStartOffset, new HashMap<>(), ""); + this(error, baseOffset, logAppendTime, logStartOffset, Collections.emptyMap(), null); } public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map errorRecords) { diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index c4b225a6386b4..690880d29d17b 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -48,14 +48,14 @@ "about": "The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1. If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended." }, { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, "about": "The log start offset." 
}, - { "name": "ErrorRecords", "type": "[]RelativeOffsetAndErrorMessage", "versions": "8+", "default": "", "ignorable": true, + { "name": "ErrorRecords", "type": "[]RelativeOffsetAndErrorMessage", "versions": "8+", "ignorable": true, "about": "The relative offsets of records that caused the batch to be dropped", "fields": [ { "name": "RelativeOffset", "type": "int32", "versions": "8+", "about": "The relative offset of the record that cause the batch to be dropped" }, - { "name": "RelativeOffsetErrorMessage", "type": "string", "versions": "8+", + { "name": "RelativeOffsetErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "about": "The error message of the record that caused the batch to be dropped"} ]}, - { "name": "ErrorMessage", "type": "string", "versions": "8+", "default": "", "ignorable": true, + { "name": "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable": true, "about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"} ]} ]}, diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index bf060cdd161ec..a6fd91246c1c7 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -510,6 +510,78 @@ public void testOffsetFetchVersions() throws Exception { } } + @Test + public void testProduceResponseVersions() throws Exception { + String topicName = "topic"; + int partitionIndex = 0; + short errorCode = Errors.INVALID_TOPIC_EXCEPTION.code(); + long baseOffset = 12L; + int throttleTimeMs = 1234; + long logAppendTimeMs = 1234L; + long logStartOffset= 1234L; + int relativeOffset = 0; + String relativeOffsetErrorMessage = "error message"; + String errorMessage = "global error message"; + + 
testAllMessageRoundTrips(new ProduceResponseData() + .setResponses(singletonList( + new ProduceResponseData.TopicProduceResponse() + .setName(topicName) + .setPartitions(singletonList( + new ProduceResponseData.PartitionProduceResponse() + .setPartitionIndex(partitionIndex) + .setErrorCode(errorCode) + .setBaseOffset(baseOffset)))))); + + Supplier response = + () -> new ProduceResponseData() + .setResponses(singletonList( + new ProduceResponseData.TopicProduceResponse() + .setName(topicName) + .setPartitions(singletonList( + new ProduceResponseData.PartitionProduceResponse() + .setPartitionIndex(partitionIndex) + .setErrorCode(errorCode) + .setBaseOffset(baseOffset) + .setLogAppendTimeMs(logAppendTimeMs) + .setLogStartOffset(logStartOffset) + .setErrorRecords(singletonList( + new ProduceResponseData.RelativeOffsetAndErrorMessage() + .setRelativeOffset(relativeOffset) + .setRelativeOffsetErrorMessage(relativeOffsetErrorMessage))) + .setErrorMessage(errorMessage))))) + .setThrottleTimeMs(throttleTimeMs); + + for (short version = 0; version <= ApiKeys.PRODUCE.latestVersion(); version++) { + ProduceResponseData responseData = response.get(); + + if (version < 8) { + responseData.responses().get(0).partitions().get(0).setErrorRecords(Collections.emptyList()); + responseData.responses().get(0).partitions().get(0).setErrorMessage(null); + } + + if (version < 5) { + responseData.responses().get(0).partitions().get(0).setLogStartOffset(-1); + } + + if (version < 2) { + responseData.responses().get(0).partitions().get(0).setLogAppendTimeMs(-1); + } + + if (version < 1) { + responseData.setThrottleTimeMs(0); + } + + if (version >= 3 && version <= 4) { + testAllMessageRoundTripsBetweenVersions(version, (short) 4, responseData, responseData); + } else if (version >= 6 && version <= 7) { + testAllMessageRoundTripsBetweenVersions(version, (short) 7, responseData, responseData); + } else { + testEquivalentMessageRoundTrip(version, responseData); + } + } + } + private void 
testAllMessageRoundTrips(Message message) throws Exception { testAllMessageRoundTripsFromVersion(message.lowestSupportedVersion(), message); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index d813cbdd9b1ce..a889621b71a34 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -225,6 +225,7 @@ public void testSerialization() throws Exception { checkRequest(createProduceRequest(3), true); checkErrorResponse(createProduceRequest(3), new UnknownServerException(), true); checkResponse(createProduceResponse(), 2, true); + checkResponse(createProduceResponseWithErrorMessage(), 8, true); checkRequest(createStopReplicaRequest(0, true), true); checkRequest(createStopReplicaRequest(0, false), true); checkErrorResponse(createStopReplicaRequest(0, true), new UnknownServerException(), true); @@ -1106,6 +1107,13 @@ private ProduceResponse createProduceResponse() { return new ProduceResponse(responseData, 0); } + private ProduceResponse createProduceResponseWithErrorMessage() { + Map responseData = new HashMap<>(); + responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE, + 10000, RecordBatch.NO_TIMESTAMP, 100, Collections.singletonMap(0, "error message"), "global error message")); + return new ProduceResponse(responseData, 0); + } + private StopReplicaRequest createStopReplicaRequest(int version, boolean deletePartitions) { Set partitions = Utils.mkSet(new TopicPartition("test", 0)); return new StopReplicaRequest.Builder((short) version, 0, 1, 0, deletePartitions, partitions).build(); From 74ae7636e8acd3d158ef6ea965dd5d37e1105628 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Tue, 24 Sep 2019 09:47:50 -0700 Subject: [PATCH 12/13] fix checkstyle --- 
.../test/java/org/apache/kafka/common/message/MessageTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index a6fd91246c1c7..3106298192cc9 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -518,7 +518,7 @@ public void testProduceResponseVersions() throws Exception { long baseOffset = 12L; int throttleTimeMs = 1234; long logAppendTimeMs = 1234L; - long logStartOffset= 1234L; + long logStartOffset = 1234L; int relativeOffset = 0; String relativeOffsetErrorMessage = "error message"; String errorMessage = "global error message"; From 18deeb45fbcbce5f8d8554ee43aac699aeeb57ea Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Tue, 24 Sep 2019 13:36:49 -0700 Subject: [PATCH 13/13] null pointer --- core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index 62edadc14fd5e..bedf6ff0f00e8 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -57,7 +57,6 @@ class ProduceRequestTest extends BaseRequestTest { assertEquals(expectedOffset, partitionResponse.baseOffset) assertEquals(-1, partitionResponse.logAppendTime) assertTrue(partitionResponse.errorRecords.isEmpty) - assertTrue(partitionResponse.errorMessage.isEmpty) partitionResponse }