From 9bcf5edb5f32015e4f11335f3340e02ba821fea0 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Thu, 1 Aug 2019 11:54:21 -0700 Subject: [PATCH 01/16] Add error_records and error_message to PartitionResponse --- .../clients/consumer/internals/Fetcher.java | 2 +- .../{record => }/InvalidRecordException.java | 6 +-- .../apache/kafka/common/protocol/Errors.java | 4 +- .../record/AbstractLegacyRecordBatch.java | 1 + .../common/record/ControlRecordType.java | 1 + .../kafka/common/record/DefaultRecord.java | 1 + .../common/record/DefaultRecordBatch.java | 1 + .../common/record/EndTransactionMarker.java | 1 + .../kafka/common/record/LegacyRecord.java | 1 + .../apache/kafka/common/record/Record.java | 2 +- .../kafka/common/requests/ProduceRequest.java | 14 +++-- .../common/requests/ProduceResponse.java | 53 ++++++++++++++++++- .../common/message/ProduceRequest.json | 4 +- .../common/message/ProduceResponse.json | 11 +++- .../record/AbstractLegacyRecordBatchTest.java | 1 + .../common/record/DefaultRecordBatchTest.java | 1 + .../common/record/DefaultRecordTest.java | 1 + .../record/EndTransactionMarkerTest.java | 1 + .../kafka/common/record/LegacyRecordTest.java | 1 + .../common/record/SimpleLegacyRecordTest.java | 1 + .../common/requests/ProduceRequestTest.java | 2 +- .../kafka/connect/util/ConnectUtils.java | 2 +- .../connect/runtime/WorkerSourceTaskTest.java | 2 +- core/src/main/scala/kafka/log/Log.scala | 9 +++- .../src/main/scala/kafka/log/LogSegment.scala | 7 +-- .../main/scala/kafka/log/LogValidator.scala | 15 ++++-- .../kafka/server/AbstractFetcherThread.scala | 4 +- .../scala/kafka/server/DelayedProduce.scala | 3 +- .../unit/kafka/log/LogValidatorTest.scala | 1 + .../kafka/server/ProduceRequestTest.scala | 4 +- 30 files changed, 127 insertions(+), 30 deletions(-) rename clients/src/main/java/org/apache/kafka/common/{record => }/InvalidRecordException.java (85%) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index d4d028d38dc73..87e7da47e1969 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -56,7 +56,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.BufferSupplier; import org.apache.kafka.common.record.ControlRecordType; -import org.apache.kafka.common.record.InvalidRecordException; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.Records; diff --git a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java similarity index 85% rename from clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java rename to clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java index 49f616611589e..4c2815bb3bda5 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java +++ b/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java @@ -14,11 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.kafka.common.record; +package org.apache.kafka.common; -import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.ApiException; -public class InvalidRecordException extends CorruptRecordException { +public class InvalidRecordException extends ApiException { private static final long serialVersionUID = 1; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 9f11fc89adf05..2a3b9c66fb939 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.protocol; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.ClusterAuthorizationException; @@ -312,7 +313,8 @@ public enum Errors { EligibleLeadersNotAvailableException::new), ELECTION_NOT_NEEDED(84, "Leader election not needed for topic partition", ElectionNotNeededException::new), NO_REASSIGNMENT_IN_PROGRESS(85, "No partition reassignment is in progress.", - NoReassignmentInProgressException::new); + NoReassignmentInProgressException::new), + INVALID_RECORD(86, "This record has failed the validation on broker and hence will be rejected.", InvalidRecordException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index cf38d3123067e..83637640af49d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.header.Header; diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java index d5ead14df9126..ad41f1d9cd8dd 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java index bf1320ea30c4a..976b5567a3709 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.ByteUtils; diff --git
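As a quick orientation for the new error code, a minimal sketch of how it surfaces through the Errors table, assuming the code assignment stays at 86 (it may shift if another error lands first):

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.protocol.Errors;

public class InvalidRecordErrorCodeSketch {
    public static void main(String[] args) {
        // Wire code -> enum constant, as registered by the new entry above.
        Errors error = Errors.forCode((short) 86);
        System.out.println(error);            // INVALID_RECORD
        System.out.println(error.message());  // the broker-side description

        // The constructor reference registered above is the factory used to
        // build the client-facing exception for this code.
        System.out.println(error.exception() instanceof InvalidRecordException); // true

        // Because InvalidRecordException now extends ApiException rather than
        // CorruptRecordException (which is retriable), a producer receiving
        // this code fails the batch instead of retrying it.
        System.out.println(Errors.forException(new InvalidRecordException("bad")).code()); // 86
    }
}

The parent-class switch is the behavioral core of KIP-467: clients stop retrying batches the broker has deliberately rejected.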
a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 5b53f19dd07b8..e52a818c9d7da 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.utils.ByteBufferOutputStream; diff --git a/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java index 726b52a197378..4bf1ebf94a098 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java +++ b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; diff --git a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java index 482c4a65efc1e..69051d0b3a663 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.ByteUtils; diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index ab52befc70b08..37bebd2949d5e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -66,7 +66,7 @@ public interface Record { boolean isValid(); /** - * Raise a {@link org.apache.kafka.common.errors.CorruptRecordException} if the record does not have a valid checksum. + * Raise a {@link org.apache.kafka.common.InvalidRecordException} if the record does not have a valid checksum. 
*/ void ensureValid(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index ad23f3f61d83f..4318addce0af5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; import org.apache.kafka.common.protocol.ApiKeys; @@ -26,7 +27,6 @@ import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.RecordBatch; @@ -120,9 +120,15 @@ public class ProduceRequest extends AbstractRequest { */ private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6; + /** + * V8 adds the error_records offset list and error_message fields to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse} + * (see KIP-467) + */ + private static final Schema PRODUCE_REQUEST_V8 = PRODUCE_REQUEST_V7; + public static Schema[] schemaVersions() { return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3, - PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6, PRODUCE_REQUEST_V7}; + PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6, PRODUCE_REQUEST_V7, PRODUCE_REQUEST_V8}; } public static class Builder extends AbstractRequest.Builder<ProduceRequest> { @@ -337,6 +343,7 @@ public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) { case 5: case 6: case 7: + case 8: return new ProduceResponse(responseMap, throttleTimeMs); default: throw new IllegalArgumentException(String.format("Version %d is not valid.
Valid versions for %s are 0 to %d", @@ -401,7 +408,7 @@ public static void validateRecords(short version, MemoryRecords records) { if (entry.magic() != RecordBatch.MAGIC_VALUE_V2) throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + "contain record batches with magic version 2"); - if (version < 7 && entry.compressionType() == CompressionType.ZSTD) { + if (version < 8 && entry.compressionType() == CompressionType.ZSTD) { throw new UnsupportedCompressionTypeException("Produce requests with version " + version + " are not allowed to " + "use ZStandard compression"); } @@ -434,6 +441,7 @@ public static byte requiredMagicForVersion(short produceRequestVersion) { case 5: case 6: case 7: + case 8: return RecordBatch.MAGIC_VALUE_V2; default: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 7d3e4fed36280..dc5b4e70a73e0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -36,6 +36,7 @@ import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.types.Type.INT32; import static org.apache.kafka.common.protocol.types.Type.INT64; /** @@ -67,14 +68,19 @@ public class ProduceResponse extends AbstractResponse { * INVALID_PRODUCER_EPOCH (47) * CLUSTER_AUTHORIZATION_FAILED (31) * TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53) + * INVALID_RECORD (86) */ private static final String BASE_OFFSET_KEY_NAME = "base_offset"; private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time"; private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset"; + private static final String ERROR_RECORDS_KEY_NAME = "error_records"; + private static final String ERROR_MESSAGE_KEY_NAME = "error_message"; private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME, "The start offset of the log at the time this produce response was created", INVALID_OFFSET); + private static final Field.Str ERROR_MESSAGE_FIELD = new Field.Str(ERROR_MESSAGE_KEY_NAME, + "The error message of the records that cause the batch to be dropped"); private static final Schema PRODUCE_RESPONSE_V0 = new Schema( new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( @@ -149,9 +155,28 @@ public class ProduceResponse extends AbstractResponse { */ private static final Schema PRODUCE_RESPONSE_V7 = PRODUCE_RESPONSE_V6; + /** + * V8 adds error_records and error_message. (see KIP-467) + */ + public static final Schema PRODUCE_RESPONSE_V8 = new Schema( + new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( + TOPIC_NAME, + new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( + PARTITION_ID, + ERROR_CODE, + new Field(BASE_OFFSET_KEY_NAME, INT64), + new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " + + "the messages. If CreateTime is used for the topic, the timestamp will be -1. 
" + + "If LogAppendTime is used for the topic, the timestamp will be the broker local " + + "time when the messages are appended."), + LOG_START_OFFSET_FIELD, + new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(INT32)), + ERROR_MESSAGE_FIELD)))))), + THROTTLE_TIME_MS); + public static Schema[] schemaVersions() { return new Schema[]{PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3, - PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6, PRODUCE_RESPONSE_V7}; + PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6, PRODUCE_RESPONSE_V7, PRODUCE_RESPONSE_V8}; } private final Map responses; @@ -190,8 +215,14 @@ public ProduceResponse(Struct struct) { long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME); long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME); long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET); + List errorRecords = new ArrayList<>(); + if (partRespStruct.hasField(ERROR_RECORDS_KEY_NAME)) { + for (Object recordOffset : partRespStruct.getArray(ERROR_RECORDS_KEY_NAME)) + errorRecords.add((Integer) recordOffset); + } + String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, ""); TopicPartition tp = new TopicPartition(topic, partition); - responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset)); + responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, errorRecords, errorMessage)); } } this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); @@ -223,6 +254,8 @@ protected Struct toStruct(short version) { if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME)) partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset); + partStruct.setIfExists(ERROR_RECORDS_KEY_NAME, part.errorRecords.toArray()); + partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage); partitionArray.add(partStruct); } topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray()); @@ -256,16 +289,28 @@ public static final class PartitionResponse { public long baseOffset; public long logAppendTime; public long logStartOffset; + public List errorRecords; + public String errorMessage; public PartitionResponse(Errors error) { this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, INVALID_OFFSET); } public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) { + this(error, baseOffset, logAppendTime, logStartOffset, new ArrayList<>(), ""); + } + + public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List errorRecords) { + this(error, baseOffset, logAppendTime, logStartOffset, errorRecords, ""); + } + + public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List errorRecords, String errorMessage) { this.error = error; this.baseOffset = baseOffset; this.logAppendTime = logAppendTime; this.logStartOffset = logStartOffset; + this.errorRecords = errorRecords; + this.errorMessage = errorMessage; } @Override @@ -280,6 +325,10 @@ public String toString() { b.append(logAppendTime); b.append(", logStartOffset: "); b.append(logStartOffset); + b.append(", errorRecords: "); + b.append(errorRecords); + b.append(", errorMessage: "); + b.append(errorMessage); b.append('}'); return b.toString(); } diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json index 
2da4ed793b295..5a1556d2652d6 100644 --- a/clients/src/main/resources/common/message/ProduceRequest.json +++ b/clients/src/main/resources/common/message/ProduceRequest.json @@ -28,7 +28,9 @@ // Version 5 and 6 are the same as version 3. // // Starting in version 7, records can be produced using ZStandard compression. See KIP-110. - "validVersions": "0-7", + // + // Starting in version 8, the response has ErrorRecords and ErrorMessage. See KIP-467. + "validVersions": "0-8", "fields": [ { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId", "about": "The transactional ID, or null if the producer is not transactional." }, diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index 0659b4c63db53..4b52391428024 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -27,7 +27,10 @@ // // Version 5 added LogStartOffset to filter out spurious // OutOfOrderSequenceExceptions on the client. - "validVersions": "0-7", + // + // Version 8 added ErrorRecords and ErrorMessage to include information about + // records that cause the whole batch to be dropped. + "validVersions": "0-8", "fields": [ { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+", "about": "Each produce response", "fields": [ { "name": "LogAppendTimeMs", "type": "int64", "versions": "2+", "default": "-1", "ignorable": true, "about": "The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1. If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended." }, { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, "about": "The log start offset."
}, + { "name": "ErrorRecords", "type": "[]int32", "versions": "8+", "default": "", "ignorable": true, + "about": "The relative offset of records that cause the batch to be dropped"}, + { "name": "ErrorMessage", "type": "string", "versions": "8+", "default": "", "ignorable": true, + "about": "The error message of the records that cause the batch to be dropped"} ]} ]}, { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, diff --git a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java index fe6ffabaf61eb..87811b8bb0d80 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.record.AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch; import org.apache.kafka.common.utils.Utils; import org.junit.Test; diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java index 34e46adee125f..973def75d8dad 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.CloseableIterator; diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java index 198f9945d3f16..822b3b9c6aff0 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.ByteBufferInputStream; diff --git a/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java index 903f674ed1965..8698c7cfca86b 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.junit.Test; import java.nio.ByteBuffer; diff --git a/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java index 9480c60ca940b..7e4a84414b344 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import 
org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java index 5f578a873d0dc..2e50097972697 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.Utils; import org.junit.Test; diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java index 809d64f1e327b..95d719adb3bb7 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java @@ -17,10 +17,10 @@ package org.apache.kafka.common.requests; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.RecordBatch; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java index 86ed42e1a604c..e1e4a874035ab 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java @@ -18,7 +18,7 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.record.InvalidRecordException; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerConfig; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 24a13c2be267d..080280dea2cd5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.record.InvalidRecordException; +import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index f8df2115e9aa0..9b4f70634f09e 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -42,7 +42,7 @@ import org.apache.kafka.common.record._ import 
org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest} import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition} import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, ListBuffer} @@ -1350,7 +1350,12 @@ class Log(@volatile var dir: File, } // check the validity of the message by checking CRC - batch.ensureValid() + try { + batch.ensureValid() + } catch { + case e: InvalidRecordException => + throw new CorruptRecordException(e.getMessage) + } if (batch.maxTimestamp > maxTimestamp) { maxTimestamp = batch.maxTimestamp diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 4c16291c170ae..31f400db023eb 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -26,6 +26,7 @@ import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server.epoch.LeaderEpochFileCache import kafka.server.{FetchDataInfo, LogOffsetMetadata} import kafka.utils._ +import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset} import org.apache.kafka.common.record._ @@ -367,9 +368,9 @@ class LogSegment private[log] (val log: FileRecords, } } } catch { - case e: CorruptRecordException => - warn("Found invalid messages in log segment %s at byte offset %d: %s." - .format(log.file.getAbsolutePath, validBytes, e.getMessage)) + case e@ (_: CorruptRecordException | _: InvalidRecordException) => + warn("Found invalid messages in log segment %s at byte offset %d: %s. %s" + .format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause)) } val truncated = log.sizeInBytes - validBytes if (truncated > 0) diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 775ea6e8b6a39..336e693a9f4b4 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -22,8 +22,9 @@ import kafka.api.{ApiVersion, KAFKA_2_1_IV0} import kafka.common.LongRef import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec} import kafka.utils.Logging -import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} -import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType, BufferSupplier} +import org.apache.kafka.common.InvalidRecordException +import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} import org.apache.kafka.common.utils.Time import scala.collection.{Seq, mutable} @@ -138,8 +139,14 @@ private[kafka] object LogValidator extends Logging { // set for magic v0 and v1. For non-compressed messages, there is no inner record for magic v0 and v1, // so we depend on the batch-level CRC check in Log.analyzeAndValidateRecords(). 
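In Java terms, the wrapping that Log.analyzeAndValidateRecords() now does looks roughly like the sketch below; it preserves the old contract that a CRC failure is reported as the retriable CorruptRecordException even though ensureValid() now raises the non-retriable InvalidRecordException:

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.record.RecordBatch;

public class CrcCheckSketch {
    // Re-raise a checksum failure as the retriable corruption error so that
    // replica fetchers and existing callers keep their previous behavior.
    static void checkBatchCrc(RecordBatch batch) {
        try {
            batch.ensureValid();
        } catch (InvalidRecordException e) {
            throw new CorruptRecordException(e.getMessage());
        }
    }
}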
For magic v2 and above, // there is no record-level CRC to check. - if (batch.magic <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed) - record.ensureValid() + if (batch.magic <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed) { + try { + record.ensureValid() + } catch { + case e: InvalidRecordException => + throw new CorruptRecordException(e.getMessage) + } + } validateKey(record, compactedTopic) validateTimestamp(batch, record, now, timestampType, timestampDiffMaxMs) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 2dd59445b4cf0..2a5187a6d4cb9 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -39,7 +39,7 @@ import java.util.function.Consumer import com.yammer.metrics.core.Gauge import kafka.log.LogAppendInfo -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{InvalidRecordException, TopicPartition} import org.apache.kafka.common.internals.PartitionStates import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records} import org.apache.kafka.common.requests._ @@ -331,7 +331,7 @@ abstract class AbstractFetcherThread(name: String, } } } catch { - case ime: CorruptRecordException => + case ime@( _: CorruptRecordException | _: InvalidRecordException) => // we log the error and continue. This ensures two things // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread // down and cause other topic partition to also lag diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 14b6aca23128e..d023ec5b270b4 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -34,7 +34,8 @@ case class ProducePartitionStatus(requiredOffset: Long, responseStatus: Partitio @volatile var acksPending = false override def toString = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " + - s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset]" + s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset, " + + s"error_records: ${responseStatus.errorRecords}, error_message: ${responseStatus.errorMessage}]" } /** diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index e2a8e17cf07ec..1b80298e864b7 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1} import kafka.common.LongRef import kafka.message._ +import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Time diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index 21644fab4d6ae..93b2f07ea06fe 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -53,6 +53,8 @@ class ProduceRequestTest extends BaseRequestTest { assertEquals(Errors.NONE, 
partitionResponse.error) assertEquals(expectedOffset, partitionResponse.baseOffset) assertEquals(-1, partitionResponse.logAppendTime) + assertTrue(partitionResponse.errorRecords.isEmpty) + assertTrue(partitionResponse.errorMessage.isEmpty) partitionResponse } @@ -133,7 +135,7 @@ class ProduceRequestTest extends BaseRequestTest { // produce request with v7: works fine! val res1 = sendProduceRequest(leader, - new ProduceRequest.Builder(7, 7, -1, 3000, partitionRecords.asJava, null).build()) + new ProduceRequest.Builder(8, 8, -1, 3000, partitionRecords.asJava, null).build()) val (tp1, partitionResponse1) = res1.responses.asScala.head assertEquals(topicPartition, tp1) assertEquals(Errors.NONE, partitionResponse1.error) From abf342c730c0a063633cd096481fdf7f33a4d9c1 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Thu, 1 Aug 2019 15:55:57 -0700 Subject: [PATCH 02/16] Change CorruptRecordException in LogTest to InvalidRecordException --- core/src/test/scala/unit/kafka/log/LogTest.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index fa7be7eb88c43..6ec92fe1fea5d 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -30,7 +30,7 @@ import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata} import kafka.utils._ -import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition} import org.apache.kafka.common.errors._ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record.MemoryRecords.RecordFilter @@ -1806,19 +1806,19 @@ class LogTest { log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0) fail("Compacted topics cannot accept a message without a key.") } catch { - case _: CorruptRecordException => // this is good + case _: InvalidRecordException => // this is good } try { log.appendAsLeader(messageSetWithOneUnkeyedMessage, leaderEpoch = 0) fail("Compacted topics cannot accept a message without a key.") } catch { - case _: CorruptRecordException => // this is good + case _: InvalidRecordException => // this is good } try { log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, leaderEpoch = 0) fail("Compacted topics cannot accept a message without a key.") } catch { - case _: CorruptRecordException => // this is good + case _: InvalidRecordException => // this is good } // the following should succeed without any InvalidMessageException From 0d1b51e350bfff251ff5fb51d20f896f5a0d9246 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Fri, 6 Sep 2019 11:43:08 -0700 Subject: [PATCH 03/16] add changes on server side --- core/src/main/scala/kafka/log/Log.scala | 10 +++++++--- core/src/main/scala/kafka/server/ReplicaManager.scala | 5 +++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 2d108e57ae169..b6a1718554100 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -52,9 +52,10 @@ object LogAppendInfo { val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L,
RecordBatch.NO_TIMESTAMP, -1L, RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L) - def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo = + def unknownLogAppendInfoWithAdditionalInfo(logStartOffset: Long, errorRecords: List[Int], errorMessage: String): LogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, - RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L) + RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, + offsetsMonotonic = false, -1L, errorRecords, errorMessage) } /** @@ -74,6 +75,7 @@ object LogAppendInfo { * @param validBytes The number of valid bytes * @param offsetsMonotonic Are the offsets in this message set monotonically increasing * @param lastOffsetOfFirstBatch The last offset of the first batch */ case class LogAppendInfo(var firstOffset: Option[Long], var lastOffset: Long, @@ -87,7 +89,9 @@ case class LogAppendInfo(var firstOffset: Option[Long], shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean, - lastOffsetOfFirstBatch: Long) { + lastOffsetOfFirstBatch: Long, + errorRecords: List[Int] = List(), + errorMessage: String = "") { /** * Get the first offset if it exists, else get the last offset of the first batch * For magic versions 2 and newer, this method will return first offset. For magic versions diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 12cb507d35005..08ce038a0cb2c 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -496,7 +496,8 @@ class ReplicaManager(val config: KafkaConfig, topicPartition -> ProducePartitionStatus( result.info.lastOffset + 1, // required offset - new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, result.info.logStartOffset)) // response status + new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, + result.info.logStartOffset, result.info.errorRecords.map(Int.box).asJava, result.info.errorMessage)) // response status } recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats }) @@ -794,7 +795,7 @@ class ReplicaManager(val config: KafkaConfig, brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark() brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark() error(s"Error processing append operation on partition $topicPartition", t) - (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t))) + (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, List(), ""), Some(t))) } } } From c13d8d5bc934c9584249f5537097879476810a41 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Tue, 1 Oct 2019 14:00:04 -0700 Subject: [PATCH 04/16] revert some changes --- .../main/java/org/apache/kafka/common/record/LegacyRecord.java | 1 - .../src/main/java/org/apache/kafka/common/record/Record.java | 2 +- .../java/org/apache/kafka/common/requests/ProduceRequest.java | 2 +- core/src/main/scala/kafka/server/DelayedProduce.scala | 3 +- core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala | 2 +- 5 files changed, 4 insertions(+), 6 deletions(-) diff --git
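From the client's point of view, the validation changes in patches 01-03 mean an unkeyed record sent to a compacted topic now fails fast rather than retrying. A sketch of that flow, assuming a broker running this patch and a placeholder bootstrap address and topic name:

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class UnkeyedCompactedProduceSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // No key on a compacted topic: broker-side validation rejects this.
            producer.send(new ProducerRecord<>("compacted-topic", null, "value")).get();
        } catch (ExecutionException e) {
            // Previously a retriable CorruptRecordException; with this series
            // the future fails with the fatal InvalidRecordException instead.
            System.out.println("append rejected: " + e.getCause());
        }
    }
}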
a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java index 24d21912ed936..32c5aa81d7530 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.record; -import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.utils.ByteBufferOutputStream; diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index 37bebd2949d5e..ab52befc70b08 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -66,7 +66,7 @@ public interface Record { boolean isValid(); /** - * Raise a {@link org.apache.kafka.common.InvalidRecordException} if the record does not have a valid checksum. + * Raise a {@link org.apache.kafka.common.errors.CorruptRecordException} if the record does not have a valid checksum. */ void ensureValid(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 4318addce0af5..932473cb869ea 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -408,7 +408,7 @@ public static void validateRecords(short version, MemoryRecords records) { if (entry.magic() != RecordBatch.MAGIC_VALUE_V2) throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + "contain record batches with magic version 2"); - if (version < 8 && entry.compressionType() == CompressionType.ZSTD) { + if (version < 7 && entry.compressionType() == CompressionType.ZSTD) { throw new UnsupportedCompressionTypeException("Produce requests with version " + version + " are not allowed to " + "use ZStandard compression"); } diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 12bef9ea11424..6ec20c1d518ff 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -34,8 +34,7 @@ case class ProducePartitionStatus(requiredOffset: Long, responseStatus: Partitio @volatile var acksPending = false override def toString = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " + - s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset, " + - s"error_records: ${responseStatus.errorRecords}, error_message: ${responseStatus.errorMessage}]" + s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset]" } /** diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index bdba93dd0d935..bedf6ff0f00e8 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -139,7 +139,7 @@ class ProduceRequestTest extends BaseRequestTest { // produce request with v7: works fine! 
val res1 = sendProduceRequest(leader, - new ProduceRequest.Builder(8, 8, -1, 3000, partitionRecords.asJava, null).build()) + new ProduceRequest.Builder(7, 7, -1, 3000, partitionRecords.asJava, null).build()) val (tp1, partitionResponse1) = res1.responses.asScala.head assertEquals(topicPartition, tp1) assertEquals(Errors.NONE, partitionResponse1.error) From 37f88cd3e0a7d1042700c5cc98840637eede9af7 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Tue, 1 Oct 2019 16:10:23 -0700 Subject: [PATCH 05/16] add errorRecords to InvalidRecordException and InvalidTimestampException --- .../kafka/common/InvalidRecordException.java | 24 ++++++++++++++++++ .../errors/InvalidTimestampException.java | 25 +++++++++++++++++++ core/src/main/scala/kafka/log/Log.scala | 4 +-- .../scala/kafka/server/ReplicaManager.scala | 15 +++++++---- 4 files changed, 61 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java index 4c2815bb3bda5..8361baac4a421 100644 --- a/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java +++ b/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java @@ -18,16 +18,40 @@ import org.apache.kafka.common.errors.ApiException; +import java.util.Collections; +import java.util.Map; + public class InvalidRecordException extends ApiException { private static final long serialVersionUID = 1; + // Relative offsets of the records that caused this exception, mapped to a per-record reason + private Map<Integer, String> errorRecords = Collections.emptyMap(); + public InvalidRecordException(String s) { super(s); } + public InvalidRecordException(String s, Map<Integer, String> errorRecords) { + super(s); + this.errorRecords = errorRecords; + } + public InvalidRecordException(String message, Throwable cause) { super(message, cause); } + public InvalidRecordException(String message, Throwable cause, Map<Integer, String> errorRecords) { + super(message, cause); + this.errorRecords = errorRecords; + } + + public void setErrorRecords(Map<Integer, String> errorRecords) { + this.errorRecords = errorRecords; + } + + public Map<Integer, String> getErrorRecords() { + return errorRecords; + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java index 0e3cd929ef77b..dab430850c483 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.common.errors; +import java.util.Collections; +import java.util.Map; + /** * Indicate the timestamp of a record is invalid.
*/ @@ -23,11 +26,33 @@ public class InvalidTimestampException extends ApiException { private static final long serialVersionUID = 1L; + // Relative offsets of the records that caused this exception, mapped to a per-record reason + private Map<Integer, String> errorRecords = Collections.emptyMap(); + public InvalidTimestampException(String message) { super(message); } + public InvalidTimestampException(String message, Map<Integer, String> errorRecords) { + super(message); + this.errorRecords = errorRecords; + } + public InvalidTimestampException(String message, Throwable cause) { super(message, cause); } + + public InvalidTimestampException(String message, Throwable cause, Map<Integer, String> errorRecords) { + super(message, cause); + this.errorRecords = errorRecords; + } + + public Map<Integer, String> getErrorRecords() { + return errorRecords; + } + + public void setErrorRecords(Map<Integer, String> errorRecords) { + this.errorRecords = errorRecords; + } + } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 8d30946c806f1..a5085c66af3a1 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -52,7 +52,7 @@ object LogAppendInfo { val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L, RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L) - def unknownLogAppendInfoWithAdditionalInfo(logStartOffset: Long, errorRecords: List[Int], errorMessage: String): LogAppendInfo = + def unknownLogAppendInfoWithAdditionalInfo(logStartOffset: Long, errorRecords: scala.collection.Map[java.lang.Integer, String], errorMessage: String): LogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L, errorRecords, errorMessage) @@ -90,7 +90,7 @@ case class LogAppendInfo(var firstOffset: Option[Long], validBytes: Int, offsetsMonotonic: Boolean, lastOffsetOfFirstBatch: Long, - errorRecords: List[Int] = List(), + errorRecords: scala.collection.Map[java.lang.Integer, String] = scala.collection.Map.empty[java.lang.Integer, String], errorMessage: String = "") { /** * Get the first offset if it exists, else get the last offset of the first batch diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index a73c1e47d15c7..091eb6f509abd 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -32,9 +32,7 @@ import kafka.server.QuotaFactory.QuotaManagers import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints} import kafka.utils._ import kafka.zk.KafkaZkClient -import org.apache.kafka.common.ElectionType -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.Node +import org.apache.kafka.common.{ElectionType, InvalidRecordException, Node, TopicPartition} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState @@ -500,7 +498,7 @@ class ReplicaManager(val config: KafkaConfig, ProducePartitionStatus( result.info.lastOffset + 1, // required offset new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, - result.info.logStartOffset, result.info.errorRecords.map(Int.box).asJava, result.info.errorMessage)) // response
status + result.info.logStartOffset, result.info.errorRecords.asJava, result.info.errorMessage)) // response status } recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats }) @@ -798,7 +796,14 @@ class ReplicaManager(val config: KafkaConfig, brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark() brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark() error(s"Error processing append operation on partition $topicPartition", t) - (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, List(), ""), Some(t))) + + val errorRecords: Map[java.lang.Integer, String] = t match { + case ire: InvalidRecordException => ire.getErrorRecords.asScala + case ite: InvalidTimestampException => ite.getErrorRecords.asScala + case _ => Map.empty[java.lang.Integer, String] + } + + (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, errorRecords, ""), Some(t))) } } } From 8c01ca3590278b8c7d9e522b3b862f0ce528167c Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Tue, 1 Oct 2019 16:27:11 -0700 Subject: [PATCH 06/16] add singletonMap to validateRecord and validateTimestamp --- .../main/scala/kafka/log/LogValidator.scala | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 6bba8eb2d769e..0f7a92b0b48a7 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -146,11 +146,13 @@ private[kafka] object LogValidator extends Logging { throw new UnsupportedForMessageFormatException(s"Idempotent records cannot be used with magic version $toMagic") } - private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, now: Long, timestampType: TimestampType, - timestampDiffMaxMs: Long, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats): Unit = { + private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, relativeOffset: Long, now: Long, + timestampType: TimestampType, timestampDiffMaxMs: Long, compactedTopic: Boolean, + brokerTopicStats: BrokerTopicStats): Unit = { if (!record.hasMagic(batch.magic)) { brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark() - throw new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition.") + throw new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition.", + java.util.Collections.singletonMap(relativeOffset.asInstanceOf[java.lang.Integer], "")) } // verify the record-level CRC only if this is one of the deep entries of a compressed message @@ -168,7 +170,7 @@ private[kafka] object LogValidator extends Logging { } validateKey(record, topicPartition, compactedTopic, brokerTopicStats) - validateTimestamp(batch, record, now, timestampType, timestampDiffMaxMs) + validateTimestamp(batch, record, relativeOffset, now, timestampType, timestampDiffMaxMs) } private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords, @@ -202,8 +204,9 @@ private[kafka] object LogValidator extends Logging { validateBatch(topicPartition, firstBatch, batch, isFromClient, toMagicValue, brokerTopicStats) for (record <- batch.asScala) { - validateRecord(batch, topicPartition, record, now, timestampType, 
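The contract patch 05 establishes between the validator, the exceptions, and ReplicaManager is easiest to see in isolation. A sketch with an invented offset and reason:

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.InvalidRecordException;

public class ErrorRecordsPropagationSketch {
    // A stand-in for the validation step: reject relative offset 3 with a
    // per-record reason, the way validateRecord does with singletonMap.
    static void validate() {
        throw new InvalidRecordException(
                "Log record magic does not match outer magic",
                Collections.singletonMap(3, "magic mismatch"));
    }

    public static void main(String[] args) {
        try {
            validate();
        } catch (InvalidRecordException e) {
            // ReplicaManager-style handling: pull the offending relative
            // offsets out of the exception and attach them to the
            // per-partition response.
            Map<Integer, String> errorRecords = e.getErrorRecords();
            errorRecords.forEach((offset, reason) ->
                    System.out.println("relative offset " + offset + ": " + reason));
        }
    }
}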
timestampDiffMaxMs, compactedTopic, brokerTopicStats) - builder.appendWithOffset(offsetCounter.getAndIncrement(), record) + val offset = offsetCounter.getAndIncrement() + validateRecord(batch, topicPartition, record, offset, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) + builder.appendWithOffset(offset, record) } } @@ -244,8 +247,8 @@ private[kafka] object LogValidator extends Logging { var offsetOfMaxBatchTimestamp = -1L for (record <- batch.asScala) { - validateRecord(batch, topicPartition, record, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) val offset = offsetCounter.getAndIncrement() + validateRecord(batch, topicPartition, record, offset, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) { maxBatchTimestamp = record.timestamp offsetOfMaxBatchTimestamp = offset @@ -353,12 +356,12 @@ private[kafka] object LogValidator extends Logging { if (sourceCodec != NoCompressionCodec && record.isCompressed) throw new InvalidRecordException("Compressed outer record should not have an inner record with a " + s"compression attribute set: $record") - validateRecord(batch, topicPartition, record, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) + val expectedOffset = expectedInnerOffset.getAndIncrement() + validateRecord(batch, topicPartition, record, expectedOffset, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) uncompressedSizeInBytes += record.sizeInBytes() if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) { // inner records offset should always be continuous - val expectedOffset = expectedInnerOffset.getAndIncrement() if (record.offset != expectedOffset) { brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark() throw new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition.") @@ -469,6 +472,7 @@ private[kafka] object LogValidator extends Logging { */ private def validateTimestamp(batch: RecordBatch, record: Record, + relativeOffset: Long, now: Long, timestampType: TimestampType, timestampDiffMaxMs: Long): Unit = { @@ -476,10 +480,12 @@ private[kafka] object LogValidator extends Logging { && record.timestamp != RecordBatch.NO_TIMESTAMP && math.abs(record.timestamp - now) > timestampDiffMaxMs) throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " + - s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]") + s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]", + java.util.Collections.singletonMap(relativeOffset.asInstanceOf[java.lang.Integer], "")) if (batch.timestampType == TimestampType.LOG_APPEND_TIME) throw new InvalidTimestampException(s"Invalid timestamp type in message $record. 
Producer should not set " + - s"timestamp type to LogAppendTime.") + s"timestamp type to LogAppendTime.", + java.util.Collections.singletonMap(relativeOffset.asInstanceOf[java.lang.Integer], "")) } case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords, From ebb524a96fc35c13ab63ff41b0df3dab2f683de0 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Wed, 2 Oct 2019 13:43:25 -0700 Subject: [PATCH 07/16] add two unit tests --- .../main/scala/kafka/log/LogValidator.scala | 6 +-- .../unit/kafka/log/LogValidatorTest.scala | 46 +++++++++++++++++++ 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 0f7a92b0b48a7..da038259598b9 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -152,7 +152,7 @@ private[kafka] object LogValidator extends Logging { if (!record.hasMagic(batch.magic)) { brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark() throw new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition.", - java.util.Collections.singletonMap(relativeOffset.asInstanceOf[java.lang.Integer], "")) + java.util.Collections.singletonMap(relativeOffset.toInt.asInstanceOf[java.lang.Integer], "")) } // verify the record-level CRC only if this is one of the deep entries of a compressed message @@ -481,11 +481,11 @@ private[kafka] object LogValidator extends Logging { && math.abs(record.timestamp - now) > timestampDiffMaxMs) throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " + s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]", - java.util.Collections.singletonMap(relativeOffset.asInstanceOf[java.lang.Integer], "")) + java.util.Collections.singletonMap(relativeOffset.toInt.asInstanceOf[java.lang.Integer], "")) if (batch.timestampType == TimestampType.LOG_APPEND_TIME) throw new InvalidTimestampException(s"Invalid timestamp type in message $record. 
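For orientation at this point in the series: the offending record's relative offset travels on the validation exception itself, as a single-entry java.util.Map from relative offset to a per-record message (empty for now); later commits replace this with a dedicated wrapper. A minimal sketch of the contract as it stands, using only the constructors added above:

import org.apache.kafka.common.InvalidRecordException

// Sketch only: what a broker-side caller can recover from the exception.
val e = new InvalidRecordException("record failed validation",
  java.util.Collections.singletonMap(Integer.valueOf(0), ""))

assert(e.getErrorRecords.containsKey(0)) // relative offset of the bad record
assert(e.getErrorRecords.get(0) == "")   // per-record message, unset for now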
From ebb524a96fc35c13ab63ff41b0df3dab2f683de0 Mon Sep 17 00:00:00 2001
From: Tu Tran
Date: Wed, 2 Oct 2019 13:43:25 -0700
Subject: [PATCH 07/16] add two unit tests

---
 .../main/scala/kafka/log/LogValidator.scala   |  6 +--
 .../unit/kafka/log/LogValidatorTest.scala     | 46 +++++++++++++++++++
 2 files changed, 49 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 0f7a92b0b48a7..da038259598b9 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -152,7 +152,7 @@ private[kafka] object LogValidator extends Logging {
     if (!record.hasMagic(batch.magic)) {
       brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
       throw new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition.",
-        java.util.Collections.singletonMap(relativeOffset.asInstanceOf[java.lang.Integer], ""))
+        java.util.Collections.singletonMap(relativeOffset.toInt.asInstanceOf[java.lang.Integer], ""))
     }

     // verify the record-level CRC only if this is one of the deep entries of a compressed message
@@ -481,11 +481,11 @@ private[kafka] object LogValidator extends Logging {
       && math.abs(record.timestamp - now) > timestampDiffMaxMs)
       throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
         s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]",
-        java.util.Collections.singletonMap(relativeOffset.asInstanceOf[java.lang.Integer], ""))
+        java.util.Collections.singletonMap(relativeOffset.toInt.asInstanceOf[java.lang.Integer], ""))
     if (batch.timestampType == TimestampType.LOG_APPEND_TIME)
       throw new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
         s"timestamp type to LogAppendTime.",
-        java.util.Collections.singletonMap(relativeOffset.asInstanceOf[java.lang.Integer], ""))
+        java.util.Collections.singletonMap(relativeOffset.toInt.asInstanceOf[java.lang.Integer], ""))
   }

   case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords,
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 7fd13210a8ccc..ce6e5a912eee5 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -1250,6 +1250,52 @@ class LogValidatorTest {
     testBatchWithoutRecordsNotAllowed(NoCompressionCodec, DefaultCompressionCodec)
   }

+  @Test
+  def testInvalidTimestampExceptionHasRelativeOffset(): Unit = {
+    val now = System.currentTimeMillis()
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L,
+      codec = CompressionType.GZIP)
+    try {
+      LogValidator.validateMessagesAndAssignOffsets(
+        records,
+        topicPartition,
+        offsetCounter = new LongRef(0),
+        time = time,
+        now = System.currentTimeMillis(),
+        sourceCodec = DefaultCompressionCodec,
+        targetCodec = DefaultCompressionCodec,
+        magic = RecordBatch.MAGIC_VALUE_V1,
+        compactedTopic = false,
+        timestampType = TimestampType.CREATE_TIME,
+        timestampDiffMaxMs = 1000L,
+        partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
+        isFromClient = true,
+        interBrokerProtocolVersion = ApiVersion.latestVersion,
+        brokerTopicStats = brokerTopicStats)
+    } catch {
+      case e: InvalidTimestampException =>
+        assertTrue(!e.getErrorRecords.isEmpty)
+        assertEquals(e.getErrorRecords.keySet.size, 1)
+        assertTrue(e.getErrorRecords.containsKey(0));
+        assertEquals(e.getErrorRecords.get(0), "")
+    }
+  }
+
+  @Test
+  def testInvalidRecordExceptionHasRelativeOffset(): Unit = {
+    try {
+      validateMessages(recordsWithInvalidInnerMagic(
+        RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP),
+        RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
+    } catch {
+      case e: InvalidRecordException =>
+        assertTrue(!e.getErrorRecords.isEmpty)
+        assertEquals(e.getErrorRecords.keySet.size, 1)
+        assertTrue(e.getErrorRecords.containsKey(0))
+        assertEquals(e.getErrorRecords.get(0), "")
+    }
+  }
+
   private def testBatchWithoutRecordsNotAllowed(sourceCodec: CompressionCodec, targetCodec: CompressionCodec): Unit = {
     val offset = 1234567
     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
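A side note on these two tests: because the assertions live inside a catch block, a run in which nothing is thrown would pass silently. A stricter sketch using intercept (from the ScalaTest assertions this test class already uses via assertThrows elsewhere), reusing the helpers named in the test above:

// Sketch: intercept fails the test when no exception is thrown and
// returns the thrown exception for further assertions.
val e = intercept[InvalidRecordException] {
  validateMessages(recordsWithInvalidInnerMagic(
    RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP),
    RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
}
assertEquals(1, e.getErrorRecords.size)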
From a20df44f5942dcfe48e3d5ee19d48b3f85eec313 Mon Sep 17 00:00:00 2001
From: Tu Tran
Date: Wed, 2 Oct 2019 16:10:34 -0700
Subject: [PATCH 08/16] add test for producing with invalid timestamp

---
 .../main/scala/kafka/log/LogValidator.scala   | 12 ++++---
 .../scala/kafka/server/ReplicaManager.scala   |  3 +-
 .../unit/kafka/log/LogValidatorTest.scala     |  4 +--
 .../kafka/server/ProduceRequestTest.scala     | 33 +++++++++++++++++++
 4 files changed, 43 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index da038259598b9..4eb207ec4a672 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -169,7 +169,7 @@ private[kafka] object LogValidator extends Logging {
       }
     }

-    validateKey(record, topicPartition, compactedTopic, brokerTopicStats)
+    validateKey(record, relativeOffset, topicPartition, compactedTopic, brokerTopicStats)
     validateTimestamp(batch, record, relativeOffset, now, timestampType, timestampDiffMaxMs)
   }

@@ -353,10 +353,11 @@ private[kafka] object LogValidator extends Logging {

       try {
         for (record <- batch.asScala) {
+          val expectedOffset = expectedInnerOffset.getAndIncrement()
           if (sourceCodec != NoCompressionCodec && record.isCompressed)
             throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
-              s"compression attribute set: $record")
-          val expectedOffset = expectedInnerOffset.getAndIncrement()
+              s"compression attribute set: $record",
+              java.util.Collections.singletonMap(expectedOffset.toInt.asInstanceOf[java.lang.Integer], ""))
           validateRecord(batch, topicPartition, record, expectedOffset, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)

           uncompressedSizeInBytes += record.sizeInBytes()
@@ -459,10 +460,11 @@ private[kafka] object LogValidator extends Logging {
       recordConversionStats = recordConversionStats)
   }

-  private def validateKey(record: Record, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) {
+  private def validateKey(record: Record, relativeOffset: Long, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) {
     if (compactedTopic && !record.hasKey) {
       brokerTopicStats.allTopicsStats.noKeyCompactedTopicRecordsPerSec.mark()
-      throw new InvalidRecordException(s"Compacted topic cannot accept message without key in topic partition $topicPartition.")
+      throw new InvalidRecordException(s"Compacted topic cannot accept message without key in topic partition $topicPartition.",
+        java.util.Collections.singletonMap(relativeOffset.toInt.asInstanceOf[java.lang.Integer], ""))
     }
   }

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 091eb6f509abd..45961ed8cd05a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -785,8 +785,7 @@ class ReplicaManager(val config: KafkaConfig,
                  _: RecordTooLargeException |
                  _: RecordBatchTooLargeException |
                  _: CorruptRecordException |
-                 _: KafkaStorageException |
-                 _: InvalidTimestampException) =>
+                 _: KafkaStorageException) =>
           (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
         case t: Throwable =>
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index ce6e5a912eee5..82d1ff60babba 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -1275,7 +1275,7 @@ class LogValidatorTest {
     } catch {
       case e: InvalidTimestampException =>
         assertTrue(!e.getErrorRecords.isEmpty)
-        assertEquals(e.getErrorRecords.keySet.size, 1)
+        assertEquals(e.getErrorRecords.size, 1)
         assertTrue(e.getErrorRecords.containsKey(0));
         assertEquals(e.getErrorRecords.get(0), "")
     }
@@ -1290,7 +1290,7 @@ class LogValidatorTest {
     } catch {
       case e: InvalidRecordException =>
         assertTrue(!e.getErrorRecords.isEmpty)
-        assertEquals(e.getErrorRecords.keySet.size, 1)
+        assertEquals(e.getErrorRecords.size, 1)
         assertTrue(e.getErrorRecords.containsKey(0))
         assertEquals(e.getErrorRecords.get(0), "")
     }
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index bedf6ff0f00e8..ff81db062dfcc 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -17,6 +17,7 @@

 package kafka.server

+import java.nio.ByteBuffer
 import java.util.Properties

 import com.yammer.metrics.Metrics
@@ -68,6 +69,38 @@ class ProduceRequestTest extends BaseRequestTest {
       new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
   }

+  @Test
+  def testProduceWithInvalidTimestamp(): Unit = {
+    val topic = "topic"
+    val partition = 0
+    val topicConfig = new Properties
+    topicConfig.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
+    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1, servers, topicConfig)
+    val leader = partitionToLeader(partition)
+
+    def createRecords(magicValue: Byte,
+                      timestamp: Long = RecordBatch.NO_TIMESTAMP,
+                      codec: CompressionType): MemoryRecords = {
+      val buf = ByteBuffer.allocate(512)
+      val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)
+      builder.appendWithOffset(0, timestamp, null, "hello".getBytes)
+      builder.appendWithOffset(1, timestamp, null, "there".getBytes)
+      builder.appendWithOffset(2, timestamp, null, "beautiful".getBytes)
+      builder.build()
+    }
+
+    val records = createRecords(RecordBatch.MAGIC_VALUE_V2, System.currentTimeMillis() - 1001L, CompressionType.GZIP)
+    val topicPartition = new TopicPartition("topic", partition)
+    val partitionRecords = Map(topicPartition -> records)
+    val produceResponse = sendProduceRequest(leader, ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build())
+    val (tp, partitionResponse) = produceResponse.responses.asScala.head
+    assertEquals(topicPartition, tp)
+    assertEquals(Errors.INVALID_TIMESTAMP, partitionResponse.error)
+    assertEquals(1, partitionResponse.errorRecords.size())
+    assertTrue(partitionResponse.errorRecords.containsKey(0))
+    assertEquals("", partitionResponse.errorRecords.get(0))
+  }
+
   @Test
   def testProduceToNonReplica(): Unit = {
     val topic = "topic"

From cf6d29dd9360f1223a5ff677e11905e418f676f2 Mon Sep 17 00:00:00 2001
From: Tu Tran
Date: Wed, 2 Oct 2019 16:38:19 -0700
Subject: [PATCH 09/16] set -> setIfExists

---
 .../java/org/apache/kafka/common/requests/ProduceResponse.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index a6df88002d582..fad74ab7bde76 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -267,7 +267,7 @@ protected Struct toStruct(short version) {
                     .set(ERROR_CODE, errorCode)
                     .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
             if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME))
-                partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
+                partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
             partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset);

             List<Struct> errorRecords = new ArrayList<>();
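The one-line PATCH 09 change is subtler than it looks: Struct.set throws a SchemaException when the named field is absent from the schema version being serialized, while setIfExists silently skips it, so the hasField guard above it becomes redundant (and is removed in the next commit). A toy illustration, with a made-up one-field schema standing in for an old response version:

import org.apache.kafka.common.protocol.types.{Field, Schema, Struct}

// Hypothetical v0-style schema that has no log_append_time field.
val oldSchema = new Schema(new Field.Int64("base_offset", "toy field"))
val struct = new Struct(oldSchema)

struct.setIfExists("log_append_time", 123L) // quietly ignored on this version
// struct.set("log_append_time", 123L)      // would throw SchemaException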
From dc31c7962c12bd4ab8010af6cde0d0f1cc95049d Mon Sep 17 00:00:00 2001
From: Tu Tran
Date: Mon, 7 Oct 2019 14:46:54 -0700
Subject: [PATCH 10/16] address GH comments

---
 .../kafka/common/InvalidRecordException.java  | 18 ++++----
 .../errors/InvalidTimestampException.java     | 19 ++++----
 .../common/requests/ProduceResponse.java      | 46 +++++++++++++------
 .../common/requests/RequestResponseTest.java  |  3 +-
 .../common/RecordValidationException.scala    |  8 ++++
 core/src/main/scala/kafka/log/Log.scala       | 12 +++--
 .../main/scala/kafka/log/LogValidator.scala   | 33 +++++++------
 .../scala/kafka/server/ReplicaManager.scala   | 37 ++++++++-------
 .../kafka/server/ProduceRequestTest.scala     |  3 +-
 9 files changed, 112 insertions(+), 67 deletions(-)
 create mode 100644 core/src/main/scala/kafka/common/RecordValidationException.scala

diff --git a/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java
index 8361baac4a421..cb04c36ae5fec 100644
--- a/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java
+++ b/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java
@@ -17,40 +17,40 @@
 package org.apache.kafka.common;

 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.requests.ProduceResponse;

 import java.util.Collections;
-import java.util.Map;
+import java.util.List;

+// TODO: remove errorRecords
 public class InvalidRecordException extends ApiException {

     private static final long serialVersionUID = 1;

     // Relative offset of the record that throws this exception
-    private Map<Integer, String> errorRecords = Collections.emptyMap();
+    private final List<ProduceResponse.ErrorRecord> errorRecords;

     public InvalidRecordException(String s) {
         super(s);
+        this.errorRecords = Collections.emptyList();
     }

-    public InvalidRecordException(String s, Map<Integer, String> errorRecords) {
+    public InvalidRecordException(String s, List<ProduceResponse.ErrorRecord> errorRecords) {
         super(s);
         this.errorRecords = errorRecords;
     }

     public InvalidRecordException(String message, Throwable cause) {
         super(message, cause);
+        this.errorRecords = Collections.emptyList();
     }

-    public InvalidRecordException(String message, Throwable cause, Map<Integer, String> errorRecords) {
+    public InvalidRecordException(String message, Throwable cause, List<ProduceResponse.ErrorRecord> errorRecords) {
         super(message, cause);
         this.errorRecords = errorRecords;
     }

-    public void setErrorRecords(Map<Integer, String> errorRecords) {
-        this.errorRecords = errorRecords;
-    }
-
-    public Map<Integer, String> getErrorRecords() {
+    public List<ProduceResponse.ErrorRecord> getErrorRecords() {
         return errorRecords;
     }

diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java
index dab430850c483..8e8d7ec93be56 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java
@@ -16,43 +16,44 @@
  */
 package org.apache.kafka.common.errors;

+import org.apache.kafka.common.requests.ProduceResponse;
+
 import java.util.Collections;
-import java.util.Map;
+import java.util.List;

 /**
  * Indicate the timestamp of a record is invalid.
  */
+// TODO: remove errorRecords
 public class InvalidTimestampException extends ApiException {

     private static final long serialVersionUID = 1L;

     // Relative offset of the record that throws this exception
-    private Map<Integer, String> errorRecords = Collections.emptyMap();
+    private final List<ProduceResponse.ErrorRecord> errorRecords;

     public InvalidTimestampException(String message) {
         super(message);
+        this.errorRecords = Collections.emptyList();
     }

-    public InvalidTimestampException(String message, Map<Integer, String> errorRecords) {
+    public InvalidTimestampException(String message, List<ProduceResponse.ErrorRecord> errorRecords) {
         super(message);
         this.errorRecords = errorRecords;
     }

     public InvalidTimestampException(String message, Throwable cause) {
         super(message, cause);
+        this.errorRecords = Collections.emptyList();
     }

-    public InvalidTimestampException(String message, Throwable cause, Map<Integer, String> errorRecords) {
+    public InvalidTimestampException(String message, Throwable cause, List<ProduceResponse.ErrorRecord> errorRecords) {
         super(message, cause);
         this.errorRecords = errorRecords;
     }

-    public Map<Integer, String> getErrorRecords() {
+    public List<ProduceResponse.ErrorRecord> getErrorRecords() {
         return errorRecords;
     }

-    public void setErrorRecords(Map<Integer, String> errorRecords) {
-        this.errorRecords = errorRecords;
-    }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index fad74ab7bde76..818e28f68fa81 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -225,13 +225,13 @@ public ProduceResponse(Struct struct) {
                 long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME);
                 long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);

-                Map<Integer, String> errorRecords = new HashMap<>();
+                List<ErrorRecord> errorRecords = new ArrayList<>();
                 if (partRespStruct.hasField(ERROR_RECORDS_KEY_NAME)) {
                     for (Object recordOffsetAndMessage : partRespStruct.getArray(ERROR_RECORDS_KEY_NAME)) {
                         Struct recordOffsetAndMessageStruct = (Struct) recordOffsetAndMessage;
-                        Integer relativeOffset = recordOffsetAndMessageStruct.getInt(RELATIVE_OFFSET_KEY_NAME);
-                        String relativeOffsetErrorMessage = recordOffsetAndMessageStruct.getOrElse(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, "");
-                        errorRecords.put(relativeOffset, relativeOffsetErrorMessage);
+                        errorRecords.add(new ErrorRecord(
+                            recordOffsetAndMessageStruct.getInt(RELATIVE_OFFSET_KEY_NAME),
+                            recordOffsetAndMessageStruct.getOrElse(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, "")));
                     }
                 }

@@ -266,15 +266,14 @@ protected Struct toStruct(short version) {
                     .set(PARTITION_ID, partitionEntry.getKey())
                     .set(ERROR_CODE, errorCode)
                     .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
-            if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME))
-                partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
+            partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
             partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset);

             List<Struct> errorRecords = new ArrayList<>();
-            for (Map.Entry<Integer, String> recordOffsetAndMessage : part.errorRecords.entrySet()) {
+            for (ErrorRecord recordOffsetAndMessage : part.errorRecords) {
                 Struct recordOffsetAndMessageStruct = partStruct.instance(ERROR_RECORDS_KEY_NAME)
-                        .set(RELATIVE_OFFSET_KEY_NAME, recordOffsetAndMessage.getKey())
-                        .setIfExists(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, recordOffsetAndMessage.getValue());
+                        .set(RELATIVE_OFFSET_KEY_NAME, recordOffsetAndMessage.getRelativeOffset())
+                        .setIfExists(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, recordOffsetAndMessage.getMessage());
                 errorRecords.add(recordOffsetAndMessageStruct);
             }

@@ -314,7 +313,7 @@ public static final class PartitionResponse {
         public long baseOffset;
         public long logAppendTime;
         public long logStartOffset;
-        public Map<Integer, String> errorRecords;
+        public List<ErrorRecord> errorRecords;
         public String errorMessage;

         public PartitionResponse(Errors error) {
@@ -322,14 +321,14 @@ public PartitionResponse(Errors error) {
         }

         public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) {
-            this(error, baseOffset, logAppendTime, logStartOffset, Collections.emptyMap(), null);
+            this(error, baseOffset, logAppendTime, logStartOffset, Collections.emptyList(), null);
         }

-        public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map<Integer, String> errorRecords) {
-            this(error, baseOffset, logAppendTime, logStartOffset, errorRecords, "");
+        public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List<ErrorRecord> errorRecords) {
+            this(error, baseOffset, logAppendTime, logStartOffset, errorRecords, null);
         }

-        public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map<Integer, String> errorRecords, String errorMessage) {
+        public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List<ErrorRecord> errorRecords, String errorMessage) {
             this.error = error;
             this.baseOffset = baseOffset;
             this.logAppendTime = logAppendTime;
@@ -359,6 +358,25 @@ public String toString() {
         }
     }

+    public static final class ErrorRecord {
+        private final int relativeOffset;
+        private final String message;
+
+        public ErrorRecord(int relativeOffset, String message) {
+            this.relativeOffset = relativeOffset;
+            this.message = message;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+
+        public int getRelativeOffset() {
+            return relativeOffset;
+        }
+
+    }
+
     public static ProduceResponse parse(ByteBuffer buffer, short version) {
         return new ProduceResponse(ApiKeys.PRODUCE.responseSchema(version).read(buffer));
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 99172408d65ed..a527e02cf8333 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1122,7 +1122,8 @@ private ProduceResponse createProduceResponse() {
     private ProduceResponse createProduceResponseWithErrorMessage() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100, Collections.singletonMap(0, "error message"), "global error message"));
+                10000, RecordBatch.NO_TIMESTAMP, 100, Collections.singletonList(new ProduceResponse.ErrorRecord(0, "error message")),
+                "global error message"));
         return new ProduceResponse(responseData, 0);
     }

diff --git a/core/src/main/scala/kafka/common/RecordValidationException.scala b/core/src/main/scala/kafka/common/RecordValidationException.scala
new file mode 100644
index 0000000000000..978701bf7d19d
--- /dev/null
+++ b/core/src/main/scala/kafka/common/RecordValidationException.scala
@@ -0,0 +1,8 @@
+package kafka.common
+
+import org.apache.kafka.common.errors.ApiException
+import org.apache.kafka.common.requests.ProduceResponse.ErrorRecord
+
+class RecordValidationException(val invalidException: ApiException,
+                                val errorRecords: List[ErrorRecord]) extends RuntimeException {
+}
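The shape of this new wrapper is the heart of the commit: the wrapped ApiException is what still maps to a wire-level error code, while errorRecords carries the per-record positions destined for the v8 response. A hedged sketch of a throw site, using only names introduced in this patch:

import kafka.common.RecordValidationException
import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.requests.ProduceResponse.ErrorRecord

// Sketch only: reject the record at the given relative offset.
def rejectRecord(relativeOffset: Int, reason: String): Nothing =
  throw new RecordValidationException(
    new InvalidRecordException(reason),
    List(new ErrorRecord(relativeOffset, null)))

Extending RuntimeException rather than ApiException keeps the wrapper broker-internal, so it can never be serialized to clients by accident.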
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a5085c66af3a1..40f6c89bf563a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -40,6 +40,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
+import org.apache.kafka.common.requests.ProduceResponse.ErrorRecord
 import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}
@@ -52,7 +53,12 @@ object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
     RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L)

-  def unknownLogAppendInfoWithAdditionalInfo(logStartOffset: Long, errorRecords: scala.collection.Map[java.lang.Integer, String], errorMessage: String): LogAppendInfo =
+  def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
+    LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+      RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1,
+      offsetsMonotonic = false, -1L)
+
+  def unknownLogAppendInfoWithAdditionalInfo(logStartOffset: Long, errorRecords: List[ErrorRecord], errorMessage: String): LogAppendInfo =
     LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
       RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1,
       offsetsMonotonic = false, -1L, errorRecords, errorMessage)
@@ -90,8 +96,8 @@ case class LogAppendInfo(var firstOffset: Option[Long],
                          validBytes: Int,
                          offsetsMonotonic: Boolean,
                          lastOffsetOfFirstBatch: Long,
-                         errorRecords: scala.collection.Map[java.lang.Integer, String] = scala.collection.Map.empty[java.lang.Integer, String],
-                         errorMessage: String = "") {
+                         errorRecords: List[ErrorRecord] = List(),
+                         errorMessage: String = null) {
   /**
    * Get the first offset if it exists, else get the last offset of the first batch
    * For magic versions 2 and newer, this method will return first offset. For magic versions
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 4eb207ec4a672..89009360e1d0c 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -19,7 +19,7 @@ package kafka.log
 import java.nio.ByteBuffer

 import kafka.api.{ApiVersion, KAFKA_2_1_IV0}
-import kafka.common.LongRef
+import kafka.common.{LongRef, RecordValidationException}
 import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
 import kafka.server.BrokerTopicStats
 import kafka.utils.Logging
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampE
 import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
 import org.apache.kafka.common.InvalidRecordException
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.ProduceResponse.ErrorRecord
 import org.apache.kafka.common.utils.Time

 import scala.collection.{Seq, mutable}
@@ -151,8 +152,9 @@ private[kafka] object LogValidator extends Logging {
                              brokerTopicStats: BrokerTopicStats): Unit = {
     if (!record.hasMagic(batch.magic)) {
       brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
-      throw new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition.",
-        java.util.Collections.singletonMap(relativeOffset.toInt.asInstanceOf[java.lang.Integer], ""))
+      throw new RecordValidationException(
+        new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition."),
+        List(new ErrorRecord(relativeOffset.toInt, null)))
     }

     // verify the record-level CRC only if this is one of the deep entries of a compressed message
@@ -355,9 +357,9 @@ private[kafka] object LogValidator extends Logging {
       for (record <- batch.asScala) {
         val expectedOffset = expectedInnerOffset.getAndIncrement()
         if (sourceCodec != NoCompressionCodec && record.isCompressed)
-          throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
-            s"compression attribute set: $record",
-            java.util.Collections.singletonMap(expectedOffset.toInt.asInstanceOf[java.lang.Integer], ""))
+          throw new RecordValidationException(new InvalidRecordException("Compressed outer record should not have an inner record with a " +
+            s"compression attribute set: $record"),
+            List(new ErrorRecord(expectedOffset.toInt, null)))
         validateRecord(batch, topicPartition, record, expectedOffset, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)

         uncompressedSizeInBytes += record.sizeInBytes()
@@ -463,8 +465,9 @@ private[kafka] object LogValidator extends Logging {
   private def validateKey(record: Record, relativeOffset: Long, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) {
     if (compactedTopic && !record.hasKey) {
       brokerTopicStats.allTopicsStats.noKeyCompactedTopicRecordsPerSec.mark()
-      throw new InvalidRecordException(s"Compacted topic cannot accept message without key in topic partition $topicPartition.",
-        java.util.Collections.singletonMap(relativeOffset.toInt.asInstanceOf[java.lang.Integer], ""))
+      throw new RecordValidationException(
+        new InvalidRecordException(s"Compacted topic cannot accept message without key in topic partition $topicPartition."),
+        List(new ErrorRecord(relativeOffset.toInt, null)))
     }
   }

@@ -481,13 +484,15 @@ private[kafka] object LogValidator extends Logging {
     if (timestampType == TimestampType.CREATE_TIME
       && record.timestamp != RecordBatch.NO_TIMESTAMP
       && math.abs(record.timestamp - now) > timestampDiffMaxMs)
-      throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
-        s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]",
-        java.util.Collections.singletonMap(relativeOffset.toInt.asInstanceOf[java.lang.Integer], ""))
+      throw new RecordValidationException(
+        new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
+          s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]"),
+        List(new ErrorRecord(relativeOffset.toInt, null)))
     if (batch.timestampType == TimestampType.LOG_APPEND_TIME)
-      throw new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
-        s"timestamp type to LogAppendTime.",
-        java.util.Collections.singletonMap(relativeOffset.toInt.asInstanceOf[java.lang.Integer], ""))
+      throw new RecordValidationException(
+        new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
+          s"timestamp type to LogAppendTime."),
+        List(new ErrorRecord(relativeOffset.toInt, null)))
   }

   case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 45961ed8cd05a..521940b0b9e32 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.locks.Lock
 import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.api._
 import kafka.cluster.{BrokerEndPoint, Partition}
+import kafka.common.RecordValidationException
 import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
 import kafka.metrics.KafkaMetricsGroup
@@ -48,7 +49,7 @@ import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, Rep
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.ProduceResponse.{ErrorRecord, PartitionResponse}
 import org.apache.kafka.common.requests.{ApiError, DeleteRecordsResponse, DescribeLogDirsResponse, EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, LeaderAndIsrResponse, OffsetsForLeaderEpochRequest, StopReplicaRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
@@ -752,6 +753,19 @@ class ReplicaManager(val config: KafkaConfig,
                                isFromClient: Boolean,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
+
+    def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
+      val logStartOffset = getPartition(topicPartition) match {
+        case HostedPartition.Online(partition) => partition.logStartOffset
+        case HostedPartition.None | HostedPartition.Offline => -1L
+      }
+      brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
+      brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
+      error(s"Error processing append operation on partition $topicPartition", t)
+
+      logStartOffset
+    }
+
     trace(s"Append [$entriesPerPartition] to local log")
     entriesPerPartition.map { case (topicPartition, records) =>
       brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
@@ -787,22 +801,13 @@ class ReplicaManager(val config: KafkaConfig,
                  _: CorruptRecordException |
                  _: KafkaStorageException) =>
           (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
+        case rve: RecordValidationException =>
+          val logStartOffset = processFailedRecord(topicPartition, rve.invalidException)
+          val errorRecords = rve.errorRecords
+          (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, errorRecords, null), Some(rve.invalidException)))
         case t: Throwable =>
-          val logStartOffset = getPartition(topicPartition) match {
-            case HostedPartition.Online(partition) => partition.logStartOffset
-            case HostedPartition.None | HostedPartition.Offline => -1L
-          }
-          brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
-          brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
-          error(s"Error processing append operation on partition $topicPartition", t)
-
-          val errorRecords: Map[java.lang.Integer, String] = t match {
-            case ire: InvalidRecordException => ire.getErrorRecords.asScala
-            case ite: InvalidTimestampException => ite.getErrorRecords.asScala
-            case _ => Map.empty[java.lang.Integer, String]
-          }
-
-          (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, errorRecords, ""), Some(t)))
+          val logStartOffset = processFailedRecord(topicPartition, t)
+          (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index ff81db062dfcc..b103bc6b178a4 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -98,7 +98,8 @@ class ProduceRequestTest extends BaseRequestTest {
     assertEquals(Errors.INVALID_TIMESTAMP, partitionResponse.error)
     assertEquals(1, partitionResponse.errorRecords.size())
     assertTrue(partitionResponse.errorRecords.containsKey(0))
-    assertEquals("", partitionResponse.errorRecords.get(0))
+    assertNull(partitionResponse.errorRecords.get(0))
+    assertNull(partitionResponse.errorMessage)
   }

   @Test
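This also explains why ReplicaManager now matches on RecordValidationException separately: the wrapper itself is a plain RuntimeException, so the partition-level error code has to be derived from the exception it wraps. A small sketch of that distinction, using Errors.forException, the existing exception-to-code mapping in the response path:

import kafka.common.RecordValidationException
import org.apache.kafka.common.protocol.Errors

// Sketch: the wrapped ApiException decides the code sent to the client,
// e.g. INVALID_TIMESTAMP or the new INVALID_RECORD.
def errorCodeFor(rve: RecordValidationException): Errors =
  Errors.forException(rve.invalidException)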
From 4fcfd7ae2ea820b140fc41884adacdebaae8dda9 Mon Sep 17 00:00:00 2001
From: Tu Tran
Date: Mon, 7 Oct 2019 14:49:55 -0700
Subject: [PATCH 11/16] clean up exceptions

---
 .../kafka/common/InvalidRecordException.java  | 24 -----------------
 .../errors/InvalidTimestampException.java     | 26 -------------------
 2 files changed, 50 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java
index cb04c36ae5fec..4c2815bb3bda5 100644
--- a/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java
+++ b/clients/src/main/java/org/apache/kafka/common/InvalidRecordException.java
@@ -17,41 +17,17 @@
 package org.apache.kafka.common;

 import org.apache.kafka.common.errors.ApiException;
-import org.apache.kafka.common.requests.ProduceResponse;

-import java.util.Collections;
-import java.util.List;
-
-// TODO: remove errorRecords
 public class InvalidRecordException extends ApiException {

     private static final long serialVersionUID = 1;

-    // Relative offset of the record that throws this exception
-    private final List<ProduceResponse.ErrorRecord> errorRecords;
-
     public InvalidRecordException(String s) {
         super(s);
-        this.errorRecords = Collections.emptyList();
-    }
-
-    public InvalidRecordException(String s, List<ProduceResponse.ErrorRecord> errorRecords) {
-        super(s);
-        this.errorRecords = errorRecords;
     }

     public InvalidRecordException(String message, Throwable cause) {
         super(message, cause);
-        this.errorRecords = Collections.emptyList();
-    }
-
-    public InvalidRecordException(String message, Throwable cause, List<ProduceResponse.ErrorRecord> errorRecords) {
-        super(message, cause);
-        this.errorRecords = errorRecords;
-    }
-
-    public List<ProduceResponse.ErrorRecord> getErrorRecords() {
-        return errorRecords;
     }

 }
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java
index 8e8d7ec93be56..0e3cd929ef77b 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java
@@ -16,44 +16,18 @@
  */
 package org.apache.kafka.common.errors;

-import org.apache.kafka.common.requests.ProduceResponse;
-
-import java.util.Collections;
-import java.util.List;
-
 /**
  * Indicate the timestamp of a record is invalid.
  */
-// TODO: remove errorRecords
 public class InvalidTimestampException extends ApiException {

     private static final long serialVersionUID = 1L;

-    // Relative offset of the record that throws this exception
-    private final List<ProduceResponse.ErrorRecord> errorRecords;
-
     public InvalidTimestampException(String message) {
         super(message);
-        this.errorRecords = Collections.emptyList();
-    }
-
-    public InvalidTimestampException(String message, List<ProduceResponse.ErrorRecord> errorRecords) {
-        super(message);
-        this.errorRecords = errorRecords;
     }

     public InvalidTimestampException(String message, Throwable cause) {
         super(message, cause);
-        this.errorRecords = Collections.emptyList();
-    }
-
-    public InvalidTimestampException(String message, Throwable cause, List<ProduceResponse.ErrorRecord> errorRecords) {
-        super(message, cause);
-        this.errorRecords = errorRecords;
-    }
-
-    public List<ProduceResponse.ErrorRecord> getErrorRecords() {
-        return errorRecords;
-    }
-
 }

From 2550341e85b993b629e882ec4687eeefdc515ad1 Mon Sep 17 00:00:00 2001
From: Tu Tran
Date: Mon, 7 Oct 2019 15:43:02 -0700
Subject: [PATCH 12/16] fix relative vs absolute offset problem

---
 .../main/scala/kafka/log/LogValidator.scala   | 56 +++++++++++++------
 .../unit/kafka/log/LogValidatorTest.scala     | 34 +++++------
 .../kafka/server/ProduceRequestTest.scala     |  4 +-
 3 files changed, 59 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 89009360e1d0c..4cd122b81aeec 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -147,14 +147,14 @@ private[kafka] object LogValidator extends Logging {
       throw new UnsupportedForMessageFormatException(s"Idempotent records cannot be used with magic version $toMagic")
   }

-  private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, relativeOffset: Long, now: Long,
+  private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, batchIndex: Long, now: Long,
                              timestampType: TimestampType, timestampDiffMaxMs: Long, compactedTopic: Boolean,
                              brokerTopicStats: BrokerTopicStats): Unit = {
     if (!record.hasMagic(batch.magic)) {
       brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
       throw new RecordValidationException(
         new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition."),
-        List(new ErrorRecord(relativeOffset.toInt, null)))
+        List(new ErrorRecord(batchIndex.toInt, null)))
     }

     // verify the record-level CRC only if this is one of the deep entries of a compressed message
@@ -171,8 +171,8 @@ private[kafka] object LogValidator extends Logging {
       }
     }

-    validateKey(record, relativeOffset, topicPartition, compactedTopic, brokerTopicStats)
-    validateTimestamp(batch, record, relativeOffset, now, timestampType, timestampDiffMaxMs)
+    validateKey(record, batchIndex, topicPartition, compactedTopic, brokerTopicStats)
+    validateTimestamp(batch, record, batchIndex, now, timestampType, timestampDiffMaxMs)
   }

   private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords,
@@ -205,10 +205,11 @@ private[kafka] object LogValidator extends Logging {
     for (batch <- records.batches.asScala) {
       validateBatch(topicPartition, firstBatch, batch, isFromClient, toMagicValue, brokerTopicStats)

+      val batchOffsetCounter = new LongRef(0)
+
       for (record <- batch.asScala) {
-        val offset = offsetCounter.getAndIncrement()
-        validateRecord(batch, topicPartition, record, offset, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
-        builder.appendWithOffset(offset, record)
+        validateRecord(batch, topicPartition, record, batchOffsetCounter.getAndIncrement(), now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
+        builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
       }
     }

@@ -237,6 +238,7 @@ private[kafka] object LogValidator extends Logging {
                                         magic: Byte,
                                         brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
+    val expectedInnerOffset = new LongRef(0)
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value

@@ -245,12 +247,26 @@ private[kafka] object LogValidator extends Logging {
     for (batch <- records.batches.asScala) {
       validateBatch(topicPartition, firstBatch, batch, isFromClient, magic, brokerTopicStats)

+      val batchOffsetCounter = new LongRef(0)
+
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
       var offsetOfMaxBatchTimestamp = -1L

       for (record <- batch.asScala) {
+        val batchIndex = batchOffsetCounter.getAndIncrement()
+        validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
+
+        val expectedOffset = expectedInnerOffset.getAndIncrement()
+
+        // inner records offset should always be continuous
+        if (record.offset() != expectedOffset) {
+          brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
+          throw new RecordValidationException(
+            new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."),
+            List(new ErrorRecord(batchIndex.toInt, null)))
+        }
+
         val offset = offsetCounter.getAndIncrement()
-        validateRecord(batch, topicPartition, record, offset, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
         if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) {
           maxBatchTimestamp = record.timestamp
           offsetOfMaxBatchTimestamp = offset
@@ -346,6 +362,8 @@ private[kafka] object LogValidator extends Logging {
       validateBatch(topicPartition, firstBatch, batch, isFromClient, toMagic, brokerTopicStats)
       uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())

+      val batchOffsetCounter = new LongRef(0)
+
       // if we are on version 2 and beyond, and we know we are going for in place assignment,
       // then we can optimize the iterator to skip key / value / headers since they would not be used at all
       val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2)
@@ -355,19 +373,23 @@ private[kafka] object LogValidator extends Logging {

       try {
         for (record <- batch.asScala) {
-          val expectedOffset = expectedInnerOffset.getAndIncrement()
+          val batchIndex = batchOffsetCounter.getAndIncrement()
           if (sourceCodec != NoCompressionCodec && record.isCompressed)
             throw new RecordValidationException(new InvalidRecordException("Compressed outer record should not have an inner record with a " +
               s"compression attribute set: $record"),
-              List(new ErrorRecord(expectedOffset.toInt, null)))
-          validateRecord(batch, topicPartition, record, expectedOffset, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
+              List(new ErrorRecord(batchIndex.toInt, null)))
+
+          validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)

           uncompressedSizeInBytes += record.sizeInBytes()
           if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
             // inner records offset should always be continuous
+            val expectedOffset = expectedInnerOffset.getAndIncrement()
             if (record.offset != expectedOffset) {
               brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
-              throw new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition.")
+              throw new RecordValidationException(
+                new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."),
+                List(new ErrorRecord(batchIndex.toInt, null)))
             }
             if (record.timestamp > maxTimestamp)
               maxTimestamp = record.timestamp
@@ -484,12 +484,12 @@ private[kafka] object LogValidator extends Logging {
       recordConversionStats = recordConversionStats)
   }

-  private def validateKey(record: Record, relativeOffset: Long, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) {
+  private def validateKey(record: Record, batchIndex: Long, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) {
     if (compactedTopic && !record.hasKey) {
       brokerTopicStats.allTopicsStats.noKeyCompactedTopicRecordsPerSec.mark()
       throw new RecordValidationException(
         new InvalidRecordException(s"Compacted topic cannot accept message without key in topic partition $topicPartition."),
-        List(new ErrorRecord(relativeOffset.toInt, null)))
+        List(new ErrorRecord(batchIndex.toInt, null)))
     }
   }

@@ -497,7 +499,7 @@ private[kafka] object LogValidator extends Logging {
    */
   private def validateTimestamp(batch: RecordBatch,
                                 record: Record,
-                                relativeOffset: Long,
+                                batchIndex: Long,
                                 now: Long,
                                 timestampType: TimestampType,
                                 timestampDiffMaxMs: Long): Unit = {
@@ -507,12 +509,12 @@ private[kafka] object LogValidator extends Logging {
       throw new RecordValidationException(
         new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
           s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]"),
-        List(new ErrorRecord(relativeOffset.toInt, null)))
+        List(new ErrorRecord(batchIndex.toInt, null)))
     if (batch.timestampType == TimestampType.LOG_APPEND_TIME)
       throw new RecordValidationException(
         new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
           s"timestamp type to LogAppendTime."),
-        List(new ErrorRecord(relativeOffset.toInt, null)))
+        List(new ErrorRecord(batchIndex.toInt, null)))
   }

   case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords,
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 82d1ff60babba..1aea0bc5ef84b 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit

 import com.yammer.metrics.Metrics
 import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1}
-import kafka.common.LongRef
+import kafka.common.{LongRef, RecordValidationException}
 import kafka.message._
 import kafka.server.BrokerTopicStats
 import kafka.utils.TestUtils.meterCount
@@ -81,7 +81,7 @@ class LogValidatorTest {
   }

   private def checkMismatchMagic(batchMagic: Byte, recordMagic: Byte, compressionType: CompressionType): Unit = {
-    assertThrows[InvalidRecordException] {
+    assertThrows[RecordValidationException] {
       validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compressionType), batchMagic, compressionType, compressionType)
     }
     assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidMagicNumberRecordsPerSec}")), 1)
@@ -574,7 +574,7 @@ class LogValidatorTest {
     checkCompressed(RecordBatch.MAGIC_VALUE_V2)
   }

-  @Test(expected = classOf[InvalidTimestampException])
+  @Test(expected = classOf[RecordValidationException])
   def testInvalidCreateTimeNonCompressedV1(): Unit = {
     val now = System.currentTimeMillis()
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now - 1001L,
@@ -597,7 +597,7 @@ class LogValidatorTest {
     brokerTopicStats = brokerTopicStats)
   }

-  @Test(expected = classOf[InvalidTimestampException])
+  @Test(expected = classOf[RecordValidationException])
   def testInvalidCreateTimeNonCompressedV2(): Unit = {
     val now = System.currentTimeMillis()
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L,
@@ -620,7 +620,7 @@ class LogValidatorTest {
     brokerTopicStats = brokerTopicStats)
   }

-  @Test(expected = classOf[InvalidTimestampException])
+  @Test(expected = classOf[RecordValidationException])
   def testInvalidCreateTimeCompressedV1(): Unit = {
     val now = System.currentTimeMillis()
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now - 1001L,
@@ -643,7 +643,7 @@ class LogValidatorTest {
     brokerTopicStats = brokerTopicStats)
   }

-  @Test(expected = classOf[InvalidTimestampException])
+  @Test(expected = classOf[RecordValidationException])
   def testInvalidCreateTimeCompressedV2(): Unit = {
     val now = System.currentTimeMillis()
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L,
@@ -1273,11 +1273,12 @@ class LogValidatorTest {
         interBrokerProtocolVersion = ApiVersion.latestVersion,
         brokerTopicStats = brokerTopicStats)
     } catch {
-      case e: InvalidTimestampException =>
-        assertTrue(!e.getErrorRecords.isEmpty)
-        assertEquals(e.getErrorRecords.size, 1)
-        assertTrue(e.getErrorRecords.containsKey(0));
-        assertEquals(e.getErrorRecords.get(0), "")
+      case e: RecordValidationException =>
+        assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
+        assertTrue(e.errorRecords.nonEmpty)
+        assertEquals(e.errorRecords.size, 1)
+        assertEquals(e.errorRecords.head.getRelativeOffset, 0)
+        assertNull(e.errorRecords.head.getMessage)
     }
   }

@@ -1288,11 +1289,12 @@ class LogValidatorTest {
         RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP),
         RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
     } catch {
-      case e: InvalidRecordException =>
-        assertTrue(!e.getErrorRecords.isEmpty)
-        assertEquals(e.getErrorRecords.size, 1)
-        assertTrue(e.getErrorRecords.containsKey(0))
-        assertEquals(e.getErrorRecords.get(0), "")
+      case e: RecordValidationException =>
+        assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
+        assertTrue(e.errorRecords.nonEmpty)
+        assertEquals(e.errorRecords.size, 1)
+        assertEquals(e.errorRecords.head.getRelativeOffset, 0)
+        assertNull(e.errorRecords.head.getMessage)
     }
   }

diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index b103bc6b178a4..32f45c3bce5e1 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -97,8 +97,8 @@ class ProduceRequestTest extends BaseRequestTest {
     assertEquals(topicPartition, tp)
     assertEquals(Errors.INVALID_TIMESTAMP, partitionResponse.error)
     assertEquals(1, partitionResponse.errorRecords.size())
-    assertTrue(partitionResponse.errorRecords.containsKey(0))
-    assertNull(partitionResponse.errorRecords.get(0))
+    assertEquals(0, partitionResponse.errorRecords.get(0).getRelativeOffset)
+    assertNull(partitionResponse.errorRecords.get(0).getMessage)
     assertNull(partitionResponse.errorMessage)
   }

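The rename in the next commit (relativeOffset to batchIndex) tracks the fix above: what the response reports is the record's position within the producer batch, not a log offset. A short worked example of the distinction:

// A producer batch headed for base offset 100 with three records:
// batch indices are 0, 1, 2 regardless of which offsets get assigned.
val baseOffset = 100L
val failedBatchIndex = 1 // the second record fails validation

// Absolute offsets only exist if the append succeeds; on failure the
// client maps failedBatchIndex back to the records it originally sent.
val wouldBeOffset = baseOffset + failedBatchIndex // 101, never assigned here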
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse} + * V8 bumped up to add two new fields record_errors offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse} * (See KIP-467) */ private static final Schema PRODUCE_REQUEST_V8 = PRODUCE_REQUEST_V7; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 818e28f68fa81..eb5af7b35cf89 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -74,14 +74,14 @@ public class ProduceResponse extends AbstractResponse { private static final String BASE_OFFSET_KEY_NAME = "base_offset"; private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time"; private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset"; - private static final String ERROR_RECORDS_KEY_NAME = "error_records"; - private static final String RELATIVE_OFFSET_KEY_NAME = "relative_offset"; - private static final String RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME = "relative_offset_error_message"; + private static final String RECORD_ERRORS_KEY_NAME = "record_errors"; + private static final String BATCH_INDEX_KEY_NAME = "batch_index"; + private static final String BATCH_INDEX_ERROR_MESSAGE_KEY_NAME = "batch_index_error_message"; private static final String ERROR_MESSAGE_KEY_NAME = "error_message"; private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME, "The start offset of the log at the time this produce response was created", INVALID_OFFSET); - private static final Field.NullableStr RELATIVE_OFFSET_ERROR_MESSAGE_FIELD = new Field.NullableStr(RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME, + private static final Field.NullableStr BATCH_INDEX_ERROR_MESSAGE_FIELD = new Field.NullableStr(BATCH_INDEX_ERROR_MESSAGE_KEY_NAME, "The error message of the record that caused the batch to be dropped"); private static final Field.NullableStr ERROR_MESSAGE_FIELD = new Field.NullableStr(ERROR_MESSAGE_KEY_NAME, "The global error message summarizing the common root cause of the records that caused the batch to be dropped"); @@ -160,7 +160,7 @@ public class ProduceResponse extends AbstractResponse { private static final Schema PRODUCE_RESPONSE_V7 = PRODUCE_RESPONSE_V6; /** - * V8 adds error_records and error_message. (see KIP-467) + * V8 adds record_errors and error_message. 
(see KIP-467) */ public static final Schema PRODUCE_RESPONSE_V8 = new Schema( new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( @@ -174,11 +174,11 @@ public class ProduceResponse extends AbstractResponse { "If LogAppendTime is used for the topic, the timestamp will be the broker local " + "time when the messages are appended."), LOG_START_OFFSET_FIELD, - new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(new Schema( - new Field.Int32(RELATIVE_OFFSET_KEY_NAME, "The relative offset of the record " + + new Field(RECORD_ERRORS_KEY_NAME, new ArrayOf(new Schema( + new Field.Int32(BATCH_INDEX_KEY_NAME, "The batch index of the record " + "that caused the batch to be dropped"), - RELATIVE_OFFSET_ERROR_MESSAGE_FIELD - )), "The relative offsets of records that caused the batch to be dropped"), + BATCH_INDEX_ERROR_MESSAGE_FIELD + )), "The batch indices of records that caused the batch to be dropped"), ERROR_MESSAGE_FIELD)))))), THROTTLE_TIME_MS); @@ -225,19 +225,24 @@ public ProduceResponse(Struct struct) { long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME); long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET); - List errorRecords = new ArrayList<>(); - if (partRespStruct.hasField(ERROR_RECORDS_KEY_NAME)) { - for (Object recordOffsetAndMessage : partRespStruct.getArray(ERROR_RECORDS_KEY_NAME)) { - Struct recordOffsetAndMessageStruct = (Struct) recordOffsetAndMessage; - errorRecords.add(new ErrorRecord( - recordOffsetAndMessageStruct.getInt(RELATIVE_OFFSET_KEY_NAME), - recordOffsetAndMessageStruct.getOrElse(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, ""))); + List recordErrors = Collections.emptyList(); + if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) { + Object[] recordErrorsArray = partRespStruct.getArray(RECORD_ERRORS_KEY_NAME); + if (recordErrorsArray.length > 0) { + recordErrors = new ArrayList<>(recordErrorsArray.length); + for (Object indexAndMessage : recordErrorsArray) { + Struct indexAndMessageStruct = (Struct) indexAndMessage; + recordErrors.add(new RecordError( + indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME), + indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD) + )); + } } } - String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, ""); + String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null); TopicPartition tp = new TopicPartition(topic, partition); - responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, errorRecords, errorMessage)); + responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, recordErrors, errorMessage)); } } this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); @@ -269,15 +274,18 @@ protected Struct toStruct(short version) { partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset); - List errorRecords = new ArrayList<>(); - for (ErrorRecord recordOffsetAndMessage : part.errorRecords) { - Struct recordOffsetAndMessageStruct = partStruct.instance(ERROR_RECORDS_KEY_NAME) - .set(RELATIVE_OFFSET_KEY_NAME, recordOffsetAndMessage.getRelativeOffset()) - .setIfExists(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, recordOffsetAndMessage.getMessage()); - errorRecords.add(recordOffsetAndMessageStruct); + List recordErrors = Collections.emptyList(); + if (!part.recordErrors.isEmpty()) { + recordErrors = new ArrayList<>(); + for (RecordError indexAndMessage : part.recordErrors) { + Struct indexAndMessageStruct = 
diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json index 5a1556d2652d6..8459c05ea5dbe 100644 --- a/clients/src/main/resources/common/message/ProduceRequest.json +++ b/clients/src/main/resources/common/message/ProduceRequest.json @@ -29,7 +29,7 @@ // // Starting in version 7, records can be produced using ZStandard compression. See KIP-110. // - // Starting in Version 8, response has ErrorRecords and ErrorMEssage. See KIP-467. + // Starting in Version 8, response has RecordErrors and ErrorMessage. See KIP-467. "validVersions": "0-8", "fields": [ { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId",
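Editor's note: the request schema itself is unchanged; the version bump exists so a client that sends v8 knows the response may carry per-record errors. Because batch_index is a 0-based position within the produced batch rather than a log offset, a producer that still holds the batch can map an error straight back to the offending record. A sketch, where bufferedRecords is a hypothetical list standing in for the sender's in-flight batch:

import java.util.List;
import org.apache.kafka.common.requests.ProduceResponse.RecordError;

public class BatchIndexLookup {
    // 'bufferedRecords' is hypothetical: whatever structure the sender used to
    // keep the records of the rejected batch, in send order.
    static String describeFailure(List<String> bufferedRecords, RecordError err) {
        String failed = bufferedRecords.get(err.batchIndex);
        return "rejected record #" + err.batchIndex + " (" + failed + ")"
            + (err.message != null ? ": " + err.message : "");
    }
}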
"validVersions": "0-8", "fields": [ { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId", diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index 690880d29d17b..08c76d973b139 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -28,7 +28,7 @@ // Version 5 added LogStartOffset to filter out spurious // OutOfOrderSequenceExceptions on the client. // - // Version 8 added ErrorRecords and ErrorMessage to include information about + // Version 8 added RecordErrors and ErrorMessage to include information about // records that cause the whole batch to be dropped "validVersions": "0-8", "fields": [ @@ -48,11 +48,11 @@ "about": "The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1. If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended." }, { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, "about": "The log start offset." }, - { "name": "ErrorRecords", "type": "[]RelativeOffsetAndErrorMessage", "versions": "8+", "ignorable": true, - "about": "The relative offsets of records that caused the batch to be dropped", "fields": [ - { "name": "RelativeOffset", "type": "int32", "versions": "8+", - "about": "The relative offset of the record that cause the batch to be dropped" }, - { "name": "RelativeOffsetErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", + { "name": "RecordErrors", "type": "[]BatchIndexAndErrorMessage", "versions": "8+", "ignorable": true, + "about": "The batch indices of records that caused the batch to be dropped", "fields": [ + { "name": "BatchIndex", "type": "int32", "versions": "8+", + "about": "The batch index of the record that cause the batch to be dropped" }, + { "name": "BatchIndexErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "about": "The error message of the record that caused the batch to be dropped"} ]}, { "name": "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable": true, diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index c2d3acc7e9f24..f5ee198a4b66c 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -514,8 +514,8 @@ public void testProduceResponseVersions() throws Exception { int throttleTimeMs = 1234; long logAppendTimeMs = 1234L; long logStartOffset = 1234L; - int relativeOffset = 0; - String relativeOffsetErrorMessage = "error message"; + int batchIndex = 0; + String batchIndexErrorMessage = "error message"; String errorMessage = "global error message"; testAllMessageRoundTrips(new ProduceResponseData() @@ -540,10 +540,10 @@ public void testProduceResponseVersions() throws Exception { .setBaseOffset(baseOffset) .setLogAppendTimeMs(logAppendTimeMs) .setLogStartOffset(logStartOffset) - .setErrorRecords(singletonList( - new ProduceResponseData.RelativeOffsetAndErrorMessage() - .setRelativeOffset(relativeOffset) - .setRelativeOffsetErrorMessage(relativeOffsetErrorMessage))) + 
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index c2d3acc7e9f24..f5ee198a4b66c 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -514,8 +514,8 @@ public void testProduceResponseVersions() throws Exception { int throttleTimeMs = 1234; long logAppendTimeMs = 1234L; long logStartOffset = 1234L; - int relativeOffset = 0; - String relativeOffsetErrorMessage = "error message"; + int batchIndex = 0; + String batchIndexErrorMessage = "error message"; String errorMessage = "global error message"; testAllMessageRoundTrips(new ProduceResponseData() @@ -540,10 +540,10 @@ public void testProduceResponseVersions() throws Exception { .setBaseOffset(baseOffset) .setLogAppendTimeMs(logAppendTimeMs) .setLogStartOffset(logStartOffset) - .setErrorRecords(singletonList( - new ProduceResponseData.RelativeOffsetAndErrorMessage() - .setRelativeOffset(relativeOffset) - .setRelativeOffsetErrorMessage(relativeOffsetErrorMessage))) + .setRecordErrors(singletonList( + new ProduceResponseData.BatchIndexAndErrorMessage() + .setBatchIndex(batchIndex) + .setBatchIndexErrorMessage(batchIndexErrorMessage))) .setErrorMessage(errorMessage))))) .setThrottleTimeMs(throttleTimeMs); @@ -551,7 +551,7 @@ ProduceResponseData responseData = response.get(); if (version < 8) { - responseData.responses().get(0).partitions().get(0).setErrorRecords(Collections.emptyList()); + responseData.responses().get(0).partitions().get(0).setRecordErrors(Collections.emptyList()); responseData.responses().get(0).partitions().get(0).setErrorMessage(null); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index a527e02cf8333..e1936bf8262ee 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1122,7 +1122,7 @@ private ProduceResponse createProduceResponse() { private ProduceResponse createProduceResponseWithErrorMessage() { Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE, - 10000, RecordBatch.NO_TIMESTAMP, 100, Collections.singletonList(new ProduceResponse.ErrorRecord(0, "error message")), + 10000, RecordBatch.NO_TIMESTAMP, 100, Collections.singletonList(new ProduceResponse.RecordError(0, "error message")), "global error message")); return new ProduceResponse(responseData, 0); } diff --git a/core/src/main/scala/kafka/common/RecordValidationException.scala b/core/src/main/scala/kafka/common/RecordValidationException.scala index 978701bf7d19d..2acdf84c00c06 100644 --- a/core/src/main/scala/kafka/common/RecordValidationException.scala +++ b/core/src/main/scala/kafka/common/RecordValidationException.scala @@ -1,8 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + package kafka.common import org.apache.kafka.common.errors.ApiException -import org.apache.kafka.common.requests.ProduceResponse.ErrorRecord +import org.apache.kafka.common.requests.ProduceResponse.RecordError class RecordValidationException(val invalidException: ApiException, - val errorRecords: List[ErrorRecord]) extends RuntimeException { + val recordErrors: List[RecordError]) extends RuntimeException { } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 40f6c89bf563a..e56db1ddc51ec 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -40,7 +40,7 @@ import org.apache.kafka.common.errors._ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction -import org.apache.kafka.common.requests.ProduceResponse.ErrorRecord +import org.apache.kafka.common.requests.ProduceResponse.RecordError import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition} @@ -58,10 +58,10 @@ object LogAppendInfo { RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L) - def unknownLogAppendInfoWithAdditionalInfo(logStartOffset: Long, errorRecords: List[ErrorRecord], errorMessage: String): LogAppendInfo = + def unknownLogAppendInfoWithAdditionalInfo(logStartOffset: Long, recordErrors: List[RecordError], errorMessage: String): LogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, - offsetsMonotonic = false, -1L, errorRecords, errorMessage) + offsetsMonotonic = false, -1L, recordErrors, errorMessage) } /** @@ -81,7 +81,6 @@ object LogAppendInfo { * @param validBytes The number of valid bytes * @param offsetsMonotonic Are the offsets in this message set monotonically increasing * @param lastOffsetOfFirstBatch The last offset of the first batch - ( */ case class LogAppendInfo(var firstOffset: Option[Long], var lastOffset: Long, @@ -96,7 +95,7 @@ case class LogAppendInfo(var firstOffset: Option[Long], validBytes: Int, offsetsMonotonic: Boolean, lastOffsetOfFirstBatch: Long, - errorRecords: List[ErrorRecord] = List(), + recordErrors: List[RecordError] = List(), errorMessage: String = null) { /** * Get the first offset if it exists, else get the last offset of the first batch diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 4cd122b81aeec..99ac408499909 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampE import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.requests.ProduceResponse.ErrorRecord +import org.apache.kafka.common.requests.ProduceResponse.RecordError import org.apache.kafka.common.utils.Time import scala.collection.{Seq, mutable} @@ -147,14 +147,14 @@ private[kafka] object LogValidator 
extends Logging { throw new UnsupportedForMessageFormatException(s"Idempotent records cannot be used with magic version $toMagic") } - private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, batchIndex: Long, now: Long, + private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, batchIndex: Int, now: Long, timestampType: TimestampType, timestampDiffMaxMs: Long, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats): Unit = { if (!record.hasMagic(batch.magic)) { brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark() throw new RecordValidationException( new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition."), - List(new ErrorRecord(batchIndex.toInt, null))) + List(new RecordError(batchIndex))) } // verify the record-level CRC only if this is one of the deep entries of a compressed message @@ -205,10 +205,8 @@ private[kafka] object LogValidator extends Logging { for (batch <- records.batches.asScala) { validateBatch(topicPartition, firstBatch, batch, isFromClient, toMagicValue, brokerTopicStats) - val batchOffsetCounter = new LongRef(0) - - for (record <- batch.asScala) { - validateRecord(batch, topicPartition, record, batchOffsetCounter.getAndIncrement(), now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) + for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) builder.appendWithOffset(offsetCounter.getAndIncrement(), record) } } @@ -247,23 +245,20 @@ private[kafka] object LogValidator extends Logging { for (batch <- records.batches.asScala) { validateBatch(topicPartition, firstBatch, batch, isFromClient, magic, brokerTopicStats) - val batchOffsetCounter = new LongRef(0) - var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxBatchTimestamp = -1L - for (record <- batch.asScala) { - val batchIndex = batchOffsetCounter.getAndIncrement() + for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) val expectedOffset = expectedInnerOffset.getAndIncrement() // inner records offset should always be continuous - if (record.offset() != expectedOffset) { + if (record.offset != expectedOffset) { brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark() throw new RecordValidationException( new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."), - List(new ErrorRecord(batchIndex.toInt, null))) + List(new RecordError(batchIndex))) } val offset = offsetCounter.getAndIncrement() @@ -362,8 +357,6 @@ private[kafka] object LogValidator extends Logging { validateBatch(topicPartition, firstBatch, batch, isFromClient, toMagic, brokerTopicStats) uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType()) - val batchOffsetCounter = new LongRef(0) - // if we are on version 2 and beyond, and we know we are going for in place assignment, // then we can optimize the iterator to skip key / value / headers since they would not be used at all val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2) @@ 
-372,12 +365,12 @@ private[kafka] object LogValidator extends Logging { batch.streamingIterator(BufferSupplier.NO_CACHING) try { - for (record <- batch.asScala) { - val batchIndex = batchOffsetCounter.getAndIncrement() + for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { if (sourceCodec != NoCompressionCodec && record.isCompressed) - throw new RecordValidationException(new InvalidRecordException("Compressed outer record should not have an inner record with a " + - s"compression attribute set: $record"), - List(new ErrorRecord(batchIndex.toInt, null))) + throw new RecordValidationException( + new InvalidRecordException("Compressed outer record should not have an inner record with a " + + s"compression attribute set: $record"), + List(new RecordError(batchIndex))) validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) @@ -389,7 +382,7 @@ private[kafka] object LogValidator extends Logging { brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark() throw new RecordValidationException( new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."), - List(new ErrorRecord(batchIndex.toInt, null))) + List(new RecordError(batchIndex))) } if (record.timestamp > maxTimestamp) maxTimestamp = record.timestamp @@ -484,12 +477,12 @@ private[kafka] object LogValidator extends Logging { recordConversionStats = recordConversionStats) } - private def validateKey(record: Record, batchIndex: Long, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) { + private def validateKey(record: Record, batchIndex: Int, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) { if (compactedTopic && !record.hasKey) { brokerTopicStats.allTopicsStats.noKeyCompactedTopicRecordsPerSec.mark() throw new RecordValidationException( new InvalidRecordException(s"Compacted topic cannot accept message without key in topic partition $topicPartition."), - List(new ErrorRecord(batchIndex.toInt, null))) + List(new RecordError(batchIndex))) } } @@ -499,7 +492,7 @@ private[kafka] object LogValidator extends Logging { */ private def validateTimestamp(batch: RecordBatch, record: Record, - batchIndex: Long, + batchIndex: Int, now: Long, timestampType: TimestampType, timestampDiffMaxMs: Long): Unit = { @@ -509,12 +502,12 @@ private[kafka] object LogValidator extends Logging { throw new RecordValidationException( new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " + s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]"), - List(new ErrorRecord(batchIndex.toInt, null))) + List(new RecordError(batchIndex))) if (batch.timestampType == TimestampType.LOG_APPEND_TIME) throw new RecordValidationException( new InvalidTimestampException(s"Invalid timestamp type in message $record. 
Producer should not set " + s"timestamp type to LogAppendTime."), - List(new ErrorRecord(batchIndex.toInt, null))) + List(new RecordError(batchIndex))) } case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords, diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 521940b0b9e32..d10c93645c36a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -33,7 +33,7 @@ import kafka.server.QuotaFactory.QuotaManagers import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints} import kafka.utils._ import kafka.zk.KafkaZkClient -import org.apache.kafka.common.{ElectionType, InvalidRecordException, Node, TopicPartition} +import org.apache.kafka.common.{ElectionType, Node, TopicPartition} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState @@ -49,7 +49,7 @@ import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, Rep import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction -import org.apache.kafka.common.requests.ProduceResponse.{ErrorRecord, PartitionResponse} +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.{ApiError, DeleteRecordsResponse, DescribeLogDirsResponse, EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, LeaderAndIsrResponse, OffsetsForLeaderEpochRequest, StopReplicaRequest, UpdateMetadataRequest} import org.apache.kafka.common.utils.Time import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView @@ -499,7 +499,7 @@ class ReplicaManager(val config: KafkaConfig, ProducePartitionStatus( result.info.lastOffset + 1, // required offset new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, - result.info.logStartOffset, result.info.errorRecords.asJava, result.info.errorMessage)) // response status + result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status } recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats }) @@ -803,8 +803,9 @@ class ReplicaManager(val config: KafkaConfig, (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) case rve: RecordValidationException => val logStartOffset = processFailedRecord(topicPartition, rve.invalidException) - val errorRecords = rve.errorRecords - (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, errorRecords, null), Some(rve.invalidException))) + val recordErrors = rve.recordErrors + (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo( + logStartOffset, recordErrors, rve.invalidException.getMessage), Some(rve.invalidException))) case t: Throwable => val logStartOffset = processFailedRecord(topicPartition, t) (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t))) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index c446563570480..69304c84c8cc7 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ 
b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -25,13 +25,13 @@ import java.util.{Collections, Optional, Properties} import com.yammer.metrics.Metrics import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0} -import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} +import kafka.common.{OffsetsOutOfOrderException, RecordValidationException, UnexpectedAppendOffsetException} import kafka.log.Log.DeleteDirSuffix import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata} import kafka.utils._ -import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition} +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors._ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record.MemoryRecords.RecordFilter @@ -1845,19 +1845,19 @@ class LogTest { log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0) fail("Compacted topics cannot accept a message without a key.") } catch { - case _: InvalidRecordException => // this is good + case _: RecordValidationException => // this is good } try { log.appendAsLeader(messageSetWithOneUnkeyedMessage, leaderEpoch = 0) fail("Compacted topics cannot accept a message without a key.") } catch { - case _: InvalidRecordException => // this is good + case _: RecordValidationException => // this is good } try { log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, leaderEpoch = 0) fail("Compacted topics cannot accept a message without a key.") } catch { - case _: InvalidRecordException => // this is good + case _: RecordValidationException => // this is good } // check if metric for NoKeyCompactedTopicRecordsPerSec is logged diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 1aea0bc5ef84b..9bd37f61a5013 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -1251,11 +1251,11 @@ class LogValidatorTest { } @Test - def testInvalidTimestampExceptionHasRelativeOffset(): Unit = { + def testInvalidTimestampExceptionHasBatchIndex(): Unit = { val now = System.currentTimeMillis() val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L, codec = CompressionType.GZIP) - try { + val e = intercept[RecordValidationException] { LogValidator.validateMessagesAndAssignOffsets( records, topicPartition, @@ -1272,30 +1272,29 @@ class LogValidatorTest { isFromClient = true, interBrokerProtocolVersion = ApiVersion.latestVersion, brokerTopicStats = brokerTopicStats) - } catch { - case e: RecordValidationException => - assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) - assertTrue(e.errorRecords.nonEmpty) - assertEquals(e.errorRecords.size, 1) - assertEquals(e.errorRecords.head.getRelativeOffset, 0) - assertNull(e.errorRecords.head.getMessage) } + + assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) + assertTrue(e.recordErrors.nonEmpty) + assertEquals(e.recordErrors.size, 1) + assertEquals(e.recordErrors.head.batchIndex, 0) + assertNull(e.recordErrors.head.message) } @Test - def testInvalidRecordExceptionHasRelativeOffset(): Unit = { - try { + def 
testInvalidRecordExceptionHasBatchIndex(): Unit = { + val e = intercept[RecordValidationException] { validateMessages(recordsWithInvalidInnerMagic( RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP), RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP) - } catch { - case e: RecordValidationException => - assertTrue(e.invalidException.isInstanceOf[InvalidRecordException]) - assertTrue(e.errorRecords.nonEmpty) - assertEquals(e.errorRecords.size, 1) - assertEquals(e.errorRecords.head.getRelativeOffset, 0) - assertNull(e.errorRecords.head.getMessage) + fail("Should have thrown RecordValidationException") } + + assertTrue(e.invalidException.isInstanceOf[InvalidRecordException]) + assertTrue(e.recordErrors.nonEmpty) + assertEquals(e.recordErrors.size, 1) + assertEquals(e.recordErrors.head.batchIndex, 0) + assertNull(e.recordErrors.head.message) } private def testBatchWithoutRecordsNotAllowed(sourceCodec: CompressionCodec, targetCodec: CompressionCodec): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index 32f45c3bce5e1..c51456037ee5f 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -57,7 +57,7 @@ class ProduceRequestTest extends BaseRequestTest { assertEquals(Errors.NONE, partitionResponse.error) assertEquals(expectedOffset, partitionResponse.baseOffset) assertEquals(-1, partitionResponse.logAppendTime) - assertTrue(partitionResponse.errorRecords.isEmpty) + assertTrue(partitionResponse.recordErrors.isEmpty) partitionResponse } @@ -96,9 +96,9 @@ class ProduceRequestTest extends BaseRequestTest { val (tp, partitionResponse) = produceResponse.responses.asScala.head assertEquals(topicPartition, tp) assertEquals(Errors.INVALID_TIMESTAMP, partitionResponse.error) - assertEquals(1, partitionResponse.errorRecords.size()) - assertEquals(0, partitionResponse.errorRecords.get(0).getRelativeOffset) - assertNull(partitionResponse.errorRecords.get(0).getMessage) + assertEquals(1, partitionResponse.recordErrors.size()) + assertEquals(0, partitionResponse.recordErrors.get(0).batchIndex) + assertNull(partitionResponse.recordErrors.get(0).message) assertNull(partitionResponse.errorMessage) } From ab9367a6e6c174d056633acd9430a4776432ee76 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Tue, 8 Oct 2019 15:13:25 -0700 Subject: [PATCH 14/16] add comments and fix null pointer in PartitionResponse --- .../org/apache/kafka/common/requests/ProduceResponse.java | 6 +++++- core/src/main/scala/kafka/log/Log.scala | 5 +++++ core/src/main/scala/kafka/log/LogValidator.scala | 3 +-- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index eb5af7b35cf89..8bfc6298604be 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -360,7 +360,11 @@ public String toString() { b.append(", recordErrors: "); b.append(recordErrors); b.append(", errorMessage: "); - b.append(errorMessage); + if (errorMessage != null) { + b.append(errorMessage); + } else { + b.append("null"); + } b.append('}'); return b.toString(); } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 
e56db1ddc51ec..41f551e05f358 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -58,6 +58,11 @@ object LogAppendInfo { RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L) + /** + * In ProduceRequest V8+, we add two new fields record_errors and error_message (see KIP-467). + * For any record failures with InvalidTimestamp or InvalidRecordException, we construct a LogAppendInfo object like the one + * in unknownLogAppendInfoWithLogStartOffset, but with additional fields recordErrors and errorMessage + */ def unknownLogAppendInfoWithAdditionalInfo(logStartOffset: Long, recordErrors: List[RecordError], errorMessage: String): LogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 99ac408499909..70bf3bf00da15 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -368,8 +368,7 @@ private[kafka] object LogValidator extends Logging { for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { if (sourceCodec != NoCompressionCodec && record.isCompressed) throw new RecordValidationException( - new InvalidRecordException("Compressed outer record should not have an inner record with a " + - s"compression attribute set: $record"), + new InvalidRecordException(s"Compressed outer record should not have an inner record with a compression attribute set: $record"), List(new RecordError(batchIndex))) validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) From 877cde837db27f8cd07315d3607c2537ffb64a0c Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Tue, 8 Oct 2019 15:28:44 -0700 Subject: [PATCH 15/16] remove redundant fail statement and fix comment --- core/src/main/scala/kafka/log/Log.scala | 2 +- core/src/test/scala/unit/kafka/log/LogValidatorTest.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 29968c9d30576..94997a19a4a51 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -59,7 +59,7 @@ object LogAppendInfo { offsetsMonotonic = false, -1L) /** - * In ProduceRequest V8+, we add two new fields record_errors and error_message (see KIP-467). + * In ProduceResponse V8+, we add two new fields record_errors and error_message (see KIP-467). * For any record failures with InvalidTimestamp or InvalidRecordException, we construct a LogAppendInfo object like the one * in unknownLogAppendInfoWithLogStartOffset, but with additional fields recordErrors and errorMessage */
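Editor's note: to make the broker-side control flow concrete: validation failures now travel as a RecordValidationException pairing the underlying ApiException with the failing batch indices, which ReplicaManager unpacks into this LogAppendInfo. The Java sketch below mirrors that Scala pattern for illustration only; it is not the broker code, and 'hasKey' stands in for the real per-record checks.

import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.requests.ProduceResponse.RecordError;

public class ValidationFlowSketch {
    // Simplified stand-in for kafka.common.RecordValidationException.
    static class RecordValidationException extends RuntimeException {
        final ApiException invalidException;
        final List<RecordError> recordErrors;
        RecordValidationException(ApiException invalidException, List<RecordError> recordErrors) {
            this.invalidException = invalidException;
            this.recordErrors = recordErrors;
        }
    }

    // Walks the batch by index, as the zipWithIndex loops in LogValidator now do,
    // so a failure can name the exact record that caused the batch to be dropped.
    static void validate(List<byte[]> keys, boolean compactedTopic) {
        for (int batchIndex = 0; batchIndex < keys.size(); batchIndex++) {
            boolean hasKey = keys.get(batchIndex) != null;
            if (compactedTopic && !hasKey)
                throw new RecordValidationException(
                    new InvalidRecordException("Compacted topic cannot accept message without key"),
                    Collections.singletonList(new RecordError(batchIndex)));
        }
    }
}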
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 9bd37f61a5013..923ae9185211f 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -1287,7 +1287,6 @@ class LogValidatorTest { validateMessages(recordsWithInvalidInnerMagic( RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP), RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP) - fail("Should have thrown RecordValidationException") } assertTrue(e.invalidException.isInstanceOf[InvalidRecordException]) From ca821c98f7e78ea2dadf8271000d4d084d1be6f5 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Tue, 8 Oct 2019 20:47:39 -0700 Subject: [PATCH 16/16] Change testProduceWithInvalidTimestamp because the global error message is not null anymore --- core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index c51456037ee5f..3bc8d0aacb8f3 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -99,7 +99,7 @@ class ProduceRequestTest extends BaseRequestTest { assertEquals(1, partitionResponse.recordErrors.size()) assertEquals(0, partitionResponse.recordErrors.get(0).batchIndex) assertNull(partitionResponse.recordErrors.get(0).message) - assertNull(partitionResponse.errorMessage) + assertNotNull(partitionResponse.errorMessage) } @Test
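Editor's note: taken together, the series leaves a v8 producer with this contract for the invalid-timestamp case exercised above: the per-record entry pinpoints the record but carries no message of its own, while the partition-level errorMessage, now populated from the validation exception, is non-null. A hedged summary as a small check method (assertions mirror the final ProduceRequestTest):

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceResponse;

public class InvalidTimestampContract {
    static void check(ProduceResponse.PartitionResponse part) {
        assert part.error == Errors.INVALID_TIMESTAMP;
        assert part.recordErrors.size() == 1;
        assert part.recordErrors.get(0).batchIndex == 0;
        assert part.recordErrors.get(0).message == null;  // no per-record detail sent
        assert part.errorMessage != null;                 // global summary from the broker
    }
}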