diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 932473cb869ea..7b3ae1cf2afe0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -121,7 +121,7 @@ public class ProduceRequest extends AbstractRequest { private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6; /** - * V8 bumped up to add two new fields error_records offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse} + * V8 bumped up to add two new fields, a record_errors list and an error_message, to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse} * (See KIP-467) */ private static final Schema PRODUCE_REQUEST_V8 = PRODUCE_REQUEST_V7; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index a6df88002d582..8bfc6298604be 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -74,14 +74,14 @@ public class ProduceResponse extends AbstractResponse { private static final String BASE_OFFSET_KEY_NAME = "base_offset"; private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time"; private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset"; - private static final String ERROR_RECORDS_KEY_NAME = "error_records"; - private static final String RELATIVE_OFFSET_KEY_NAME = "relative_offset"; - private static final String RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME = "relative_offset_error_message"; + private static final String RECORD_ERRORS_KEY_NAME = "record_errors"; + private static final String BATCH_INDEX_KEY_NAME = "batch_index"; + private static final String BATCH_INDEX_ERROR_MESSAGE_KEY_NAME = "batch_index_error_message"; private static final String ERROR_MESSAGE_KEY_NAME = "error_message"; private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME, "The start offset of the log at the time this produce response was created", INVALID_OFFSET); - private static final Field.NullableStr RELATIVE_OFFSET_ERROR_MESSAGE_FIELD = new Field.NullableStr(RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME, + private static final Field.NullableStr BATCH_INDEX_ERROR_MESSAGE_FIELD = new Field.NullableStr(BATCH_INDEX_ERROR_MESSAGE_KEY_NAME, "The error message of the record that caused the batch to be dropped"); private static final Field.NullableStr ERROR_MESSAGE_FIELD = new Field.NullableStr(ERROR_MESSAGE_KEY_NAME, "The global error message summarizing the common root cause of the records that caused the batch to be dropped"); @@ -160,7 +160,7 @@ public class ProduceResponse extends AbstractResponse { private static final Schema PRODUCE_RESPONSE_V7 = PRODUCE_RESPONSE_V6; /** - * V8 adds error_records and error_message. (see KIP-467) + * V8 adds record_errors and error_message.
(see KIP-467) */ public static final Schema PRODUCE_RESPONSE_V8 = new Schema( new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( @@ -174,11 +174,11 @@ public class ProduceResponse extends AbstractResponse { "If LogAppendTime is used for the topic, the timestamp will be the broker local " + "time when the messages are appended."), LOG_START_OFFSET_FIELD, - new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(new Schema( - new Field.Int32(RELATIVE_OFFSET_KEY_NAME, "The relative offset of the record " + + new Field(RECORD_ERRORS_KEY_NAME, new ArrayOf(new Schema( + new Field.Int32(BATCH_INDEX_KEY_NAME, "The batch index of the record " + "that caused the batch to be dropped"), - RELATIVE_OFFSET_ERROR_MESSAGE_FIELD - )), "The relative offsets of records that caused the batch to be dropped"), + BATCH_INDEX_ERROR_MESSAGE_FIELD + )), "The batch indices of records that caused the batch to be dropped"), ERROR_MESSAGE_FIELD)))))), THROTTLE_TIME_MS); @@ -225,19 +225,24 @@ public ProduceResponse(Struct struct) { long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME); long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET); - Map<Integer, String> errorRecords = new HashMap<>(); - if (partRespStruct.hasField(ERROR_RECORDS_KEY_NAME)) { - for (Object recordOffsetAndMessage : partRespStruct.getArray(ERROR_RECORDS_KEY_NAME)) { - Struct recordOffsetAndMessageStruct = (Struct) recordOffsetAndMessage; - Integer relativeOffset = recordOffsetAndMessageStruct.getInt(RELATIVE_OFFSET_KEY_NAME); - String relativeOffsetErrorMessage = recordOffsetAndMessageStruct.getOrElse(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, ""); - errorRecords.put(relativeOffset, relativeOffsetErrorMessage); + List<RecordError> recordErrors = Collections.emptyList(); + if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) { + Object[] recordErrorsArray = partRespStruct.getArray(RECORD_ERRORS_KEY_NAME); + if (recordErrorsArray.length > 0) { + recordErrors = new ArrayList<>(recordErrorsArray.length); + for (Object indexAndMessage : recordErrorsArray) { + Struct indexAndMessageStruct = (Struct) indexAndMessage; + recordErrors.add(new RecordError( + indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME), + indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD) + )); + } } } - String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, ""); + String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null); TopicPartition tp = new TopicPartition(topic, partition); - responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, errorRecords, errorMessage)); + responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, recordErrors, errorMessage)); } } this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); @@ -266,19 +271,21 @@ protected Struct toStruct(short version) { .set(PARTITION_ID, partitionEntry.getKey()) .set(ERROR_CODE, errorCode) .set(BASE_OFFSET_KEY_NAME, part.baseOffset); - if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME)) - partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); + partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset); - List<Struct> errorRecords = new ArrayList<>(); - for (Map.Entry<Integer, String> recordOffsetAndMessage : part.errorRecords.entrySet()) { - Struct recordOffsetAndMessageStruct = partStruct.instance(ERROR_RECORDS_KEY_NAME) - .set(RELATIVE_OFFSET_KEY_NAME, recordOffsetAndMessage.getKey()) - .setIfExists(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD,
recordOffsetAndMessage.getValue()); - errorRecords.add(recordOffsetAndMessageStruct); + List<Struct> recordErrors = Collections.emptyList(); + if (!part.recordErrors.isEmpty()) { + recordErrors = new ArrayList<>(); + for (RecordError indexAndMessage : part.recordErrors) { + Struct indexAndMessageStruct = partStruct.instance(RECORD_ERRORS_KEY_NAME) + .set(BATCH_INDEX_KEY_NAME, indexAndMessage.batchIndex) + .set(BATCH_INDEX_ERROR_MESSAGE_FIELD, indexAndMessage.message); + recordErrors.add(indexAndMessageStruct); + } } - partStruct.setIfExists(ERROR_RECORDS_KEY_NAME, errorRecords.toArray()); + partStruct.setIfExists(RECORD_ERRORS_KEY_NAME, recordErrors.toArray()); partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage); partitionArray.add(partStruct); @@ -314,7 +321,7 @@ public static final class PartitionResponse { public long baseOffset; public long logAppendTime; public long logStartOffset; - public Map<Integer, String> errorRecords; + public List<RecordError> recordErrors; public String errorMessage; public PartitionResponse(Errors error) { @@ -322,19 +329,19 @@ public PartitionResponse(Errors error) { } public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) { - this(error, baseOffset, logAppendTime, logStartOffset, Collections.emptyMap(), null); + this(error, baseOffset, logAppendTime, logStartOffset, Collections.emptyList(), null); } - public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map<Integer, String> errorRecords) { - this(error, baseOffset, logAppendTime, logStartOffset, errorRecords, ""); + public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors) { + this(error, baseOffset, logAppendTime, logStartOffset, recordErrors, null); } - public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map<Integer, String> errorRecords, String errorMessage) { + public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors, String errorMessage) { this.error = error; this.baseOffset = baseOffset; this.logAppendTime = logAppendTime; this.logStartOffset = logStartOffset; - this.errorRecords = errorRecords; + this.recordErrors = recordErrors; this.errorMessage = errorMessage; } @@ -350,15 +357,35 @@ public String toString() { b.append(logAppendTime); b.append(", logStartOffset: "); b.append(logStartOffset); - b.append(", errorRecords: "); - b.append(errorRecords); + b.append(", recordErrors: "); + b.append(recordErrors); b.append(", errorMessage: "); - b.append(errorMessage); + if (errorMessage != null) { + b.append(errorMessage); + } else { + b.append("null"); + } b.append('}'); return b.toString(); } } + public static final class RecordError { + public final int batchIndex; + public final String message; + + public RecordError(int batchIndex, String message) { + this.batchIndex = batchIndex; + this.message = message; + } + + public RecordError(int batchIndex) { + this.batchIndex = batchIndex; + this.message = null; + } + + } + public static ProduceResponse parse(ByteBuffer buffer, short version) { return new ProduceResponse(ApiKeys.PRODUCE.responseSchema(version).read(buffer)); }
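A minimal sketch of the client-facing surface this adds — building and inspecting a PartitionResponse that carries per-record errors via the RecordError class above. This is illustration only, not part of the patch; the error values are invented:

```scala
import java.util.Collections

import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.ProduceResponse.{PartitionResponse, RecordError}

import scala.collection.JavaConverters._

object RecordErrorExample extends App {
  // The record at batch index 2 was rejected; nothing was appended, so the
  // base offset is -1 and the log append time is NO_TIMESTAMP.
  val partitionResponse = new PartitionResponse(
    Errors.INVALID_TIMESTAMP,
    -1L,                      // baseOffset
    RecordBatch.NO_TIMESTAMP, // logAppendTime
    -1L,                      // logStartOffset
    Collections.singletonList(new RecordError(2, "timestamp is out of range")),
    "One or more records were rejected")

  // recordErrors carries (batchIndex, message) pairs; message may be null.
  partitionResponse.recordErrors.asScala.foreach { re =>
    println(s"batchIndex=${re.batchIndex} message=${re.message}")
  }
}
```
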
diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json index 4fcedf10da909..a872da75c7e9b 100644 --- a/clients/src/main/resources/common/message/ProduceRequest.json +++ b/clients/src/main/resources/common/message/ProduceRequest.json @@ -29,7 +29,7 @@ // // Starting in version 7, records can be produced using ZStandard compression. See KIP-110. // - // Version 8 is the same as version 7 (but see KIP-467 for the response changes). + // Starting in version 8, the response has RecordErrors and ErrorMessage. See KIP-467. "validVersions": "0-8", "flexibleVersions": "none", "fields": [ diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index c21b0aa93accd..77ab0655be57f 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -28,7 +28,7 @@ // Version 5 added LogStartOffset to filter out spurious // OutOfOrderSequenceExceptions on the client. // - // Version 8 added ErrorRecords and ErrorMessage to include information about + // Version 8 added RecordErrors and ErrorMessage to include information about // records that cause the whole batch to be dropped. See KIP-467 for details. "validVersions": "0-8", "flexibleVersions": "none", @@ -49,11 +49,11 @@ "about": "The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1. If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended." }, { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, "about": "The log start offset." }, - { "name": "ErrorRecords", "type": "[]RelativeOffsetAndErrorMessage", "versions": "8+", "ignorable": true, - "about": "The relative offsets of records that caused the batch to be dropped", "fields": [ - { "name": "RelativeOffset", "type": "int32", "versions": "8+", - "about": "The relative offset of the record that cause the batch to be dropped" }, - { "name": "RelativeOffsetErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", + { "name": "RecordErrors", "type": "[]BatchIndexAndErrorMessage", "versions": "8+", "ignorable": true, + "about": "The batch indices of records that caused the batch to be dropped", "fields": [ + { "name": "BatchIndex", "type": "int32", "versions": "8+", + "about": "The batch index of the record that caused the batch to be dropped" }, + { "name": "BatchIndexErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "about": "The error message of the record that caused the batch to be dropped"} ]}, { "name": "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable": true, diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index 73bb2afd97ca4..c7dedf94cdbf7 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -516,8 +516,8 @@ public void testProduceResponseVersions() throws Exception { int throttleTimeMs = 1234; long logAppendTimeMs = 1234L; long logStartOffset = 1234L; - int relativeOffset = 0; - String relativeOffsetErrorMessage = "error message"; + int batchIndex = 0; + String batchIndexErrorMessage = "error message"; String errorMessage = "global error message"; testAllMessageRoundTrips(new ProduceResponseData() @@ -542,10 +542,10 @@ public void testProduceResponseVersions() throws Exception { .setBaseOffset(baseOffset) .setLogAppendTimeMs(logAppendTimeMs) .setLogStartOffset(logStartOffset)
- .setErrorRecords(singletonList( - new ProduceResponseData.RelativeOffsetAndErrorMessage() - .setRelativeOffset(relativeOffset) - .setRelativeOffsetErrorMessage(relativeOffsetErrorMessage))) + .setRecordErrors(singletonList( + new ProduceResponseData.BatchIndexAndErrorMessage() + .setBatchIndex(batchIndex) + .setBatchIndexErrorMessage(batchIndexErrorMessage))) .setErrorMessage(errorMessage))))) .setThrottleTimeMs(throttleTimeMs); @@ -553,7 +553,7 @@ ProduceResponseData responseData = response.get(); if (version < 8) { - responseData.responses().get(0).partitions().get(0).setErrorRecords(Collections.emptyList()); + responseData.responses().get(0).partitions().get(0).setRecordErrors(Collections.emptyList()); responseData.responses().get(0).partitions().get(0).setErrorMessage(null); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 32b0a00679eef..c8bd7518c67c9 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1223,7 +1223,8 @@ private ProduceResponse createProduceResponse() { private ProduceResponse createProduceResponseWithErrorMessage() { Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE, - 10000, RecordBatch.NO_TIMESTAMP, 100, Collections.singletonMap(0, "error message"), "global error message")); + 10000, RecordBatch.NO_TIMESTAMP, 100, Collections.singletonList(new ProduceResponse.RecordError(0, "error message")), + "global error message")); return new ProduceResponse(responseData, 0); }
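The new file below introduces RecordValidationException, the broker-side carrier for these per-record failures. A minimal sketch of the throw-and-unwrap pattern it enables (the rejectRecord helper is hypothetical, not part of the patch):

```scala
import kafka.common.RecordValidationException
import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.requests.ProduceResponse.RecordError

object RecordValidationExample extends App {
  // Hypothetical helper: reject the record at `batchIndex`, preserving the
  // index so the produce response can point at the offending record.
  def rejectRecord(batchIndex: Int, reason: String): Nothing =
    throw new RecordValidationException(
      new InvalidRecordException(reason),
      List(new RecordError(batchIndex)))

  try rejectRecord(batchIndex = 3, reason = "record failed validation")
  catch {
    case rve: RecordValidationException =>
      // invalidException keeps the ApiException subtype (and thus the error code);
      // recordErrors keeps the indices of the records that caused the failure.
      println(s"cause=${rve.invalidException.getMessage} " +
        s"indices=${rve.recordErrors.map(_.batchIndex).mkString(",")}")
  }
}
```
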
+ */ + +package kafka.common + +import org.apache.kafka.common.errors.ApiException +import org.apache.kafka.common.requests.ProduceResponse.RecordError + +class RecordValidationException(val invalidException: ApiException, + val recordErrors: List[RecordError]) extends RuntimeException { +} diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index b0af105f5042b..94997a19a4a51 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -40,6 +40,7 @@ import org.apache.kafka.common.errors._ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction +import org.apache.kafka.common.requests.ProduceResponse.RecordError import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition} @@ -54,7 +55,18 @@ object LogAppendInfo { def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, - RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L) + RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, + offsetsMonotonic = false, -1L) + + /** + * In ProduceResponse V8+, we add two new fields record_errors and error_message (see KIP-467). + * For any record failures with InvalidTimestamp or InvalidRecordException, we construct a LogAppendInfo object like the one + * in unknownLogAppendInfoWithLogStartOffset, but with additiona fields recordErrors and errorMessage + */ + def unknownLogAppendInfoWithAdditionalInfo(logStartOffset: Long, recordErrors: List[RecordError], errorMessage: String): LogAppendInfo = + LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, + RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, + offsetsMonotonic = false, -1L, recordErrors, errorMessage) } /** @@ -87,7 +99,9 @@ case class LogAppendInfo(var firstOffset: Option[Long], shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean, - lastOffsetOfFirstBatch: Long) { + lastOffsetOfFirstBatch: Long, + recordErrors: List[RecordError] = List(), + errorMessage: String = null) { /** * Get the first offset if it exists, else get the last offset of the first batch * For magic versions 2 and newer, this method will return first offset. 
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 6bba8eb2d769e..70bf3bf00da15 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -19,7 +19,7 @@ package kafka.log import java.nio.ByteBuffer import kafka.api.{ApiVersion, KAFKA_2_1_IV0} -import kafka.common.LongRef +import kafka.common.{LongRef, RecordValidationException} import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec} import kafka.server.BrokerTopicStats import kafka.utils.Logging @@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampE import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.requests.ProduceResponse.RecordError import org.apache.kafka.common.utils.Time import scala.collection.{Seq, mutable} @@ -146,11 +147,14 @@ private[kafka] object LogValidator extends Logging { throw new UnsupportedForMessageFormatException(s"Idempotent records cannot be used with magic version $toMagic") } - private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, now: Long, timestampType: TimestampType, - timestampDiffMaxMs: Long, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats): Unit = { + private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, batchIndex: Int, now: Long, + timestampType: TimestampType, timestampDiffMaxMs: Long, compactedTopic: Boolean, + brokerTopicStats: BrokerTopicStats): Unit = { if (!record.hasMagic(batch.magic)) { brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark() - throw new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition.") + throw new RecordValidationException( + new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition."), + List(new RecordError(batchIndex))) } // verify the record-level CRC only if this is one of the deep entries of a compressed message @@ -167,8 +171,8 @@ private[kafka] object LogValidator extends Logging { } } - validateKey(record, topicPartition, compactedTopic, brokerTopicStats) - validateTimestamp(batch, record, now, timestampType, timestampDiffMaxMs) + validateKey(record, batchIndex, topicPartition, compactedTopic, brokerTopicStats) + validateTimestamp(batch, record, batchIndex, now, timestampType, timestampDiffMaxMs) } private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords, @@ -201,8 +205,8 @@ private[kafka] object LogValidator extends Logging { for (batch <- records.batches.asScala) { validateBatch(topicPartition, firstBatch, batch, isFromClient, toMagicValue, brokerTopicStats) - for (record <- batch.asScala) { - validateRecord(batch, topicPartition, record, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) + for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) builder.appendWithOffset(offsetCounter.getAndIncrement(), record) } } @@ -232,6 +236,7 @@ private[kafka] object LogValidator extends
Logging { magic: Byte, brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = { var maxTimestamp = RecordBatch.NO_TIMESTAMP + val expectedInnerOffset = new LongRef(0) var offsetOfMaxTimestamp = -1L val initialOffset = offsetCounter.value @@ -243,8 +248,19 @@ private[kafka] object LogValidator extends Logging { var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxBatchTimestamp = -1L - for (record <- batch.asScala) { - validateRecord(batch, topicPartition, record, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) + for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) + + val expectedOffset = expectedInnerOffset.getAndIncrement() + + // inner record offsets should always be contiguous + if (record.offset != expectedOffset) { + brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark() + throw new RecordValidationException( + new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."), + List(new RecordError(batchIndex))) + } + val offset = offsetCounter.getAndIncrement() if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) { maxBatchTimestamp = record.timestamp @@ -349,11 +365,13 @@ private[kafka] object LogValidator extends Logging { batch.streamingIterator(BufferSupplier.NO_CACHING) try { - for (record <- batch.asScala) { if (sourceCodec != NoCompressionCodec && record.isCompressed) - throw new InvalidRecordException("Compressed outer record should not have an inner record with a " + - s"compression attribute set: $record") - validateRecord(batch, topicPartition, record, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) + for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + if (sourceCodec != NoCompressionCodec && record.isCompressed) + throw new RecordValidationException( + new InvalidRecordException(s"Compressed outer record should not have an inner record with a compression attribute set: $record"), + List(new RecordError(batchIndex))) + + validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) uncompressedSizeInBytes += record.sizeInBytes() if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) { @@ -361,7 +379,9 @@ private[kafka] object LogValidator extends Logging { val expectedOffset = expectedInnerOffset.getAndIncrement() if (record.offset != expectedOffset) { brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark() - throw new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition.") + throw new RecordValidationException( + new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."), + List(new RecordError(batchIndex))) } if (record.timestamp > maxTimestamp) maxTimestamp = record.timestamp @@ -456,10 +476,12 @@ private[kafka] object LogValidator extends Logging { recordConversionStats = recordConversionStats) } - private def validateKey(record: Record, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) { +
private def validateKey(record: Record, batchIndex: Int, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) { if (compactedTopic && !record.hasKey) { brokerTopicStats.allTopicsStats.noKeyCompactedTopicRecordsPerSec.mark() - throw new InvalidRecordException(s"Compacted topic cannot accept message without key in topic partition $topicPartition.") + throw new RecordValidationException( + new InvalidRecordException(s"Compacted topic cannot accept message without key in topic partition $topicPartition."), + List(new RecordError(batchIndex))) } } @@ -469,17 +491,22 @@ private[kafka] object LogValidator extends Logging { */ private def validateTimestamp(batch: RecordBatch, record: Record, + batchIndex: Int, now: Long, timestampType: TimestampType, timestampDiffMaxMs: Long): Unit = { if (timestampType == TimestampType.CREATE_TIME && record.timestamp != RecordBatch.NO_TIMESTAMP && math.abs(record.timestamp - now) > timestampDiffMaxMs) - throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " + - s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]") + throw new RecordValidationException( + new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " + + s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]"), + List(new RecordError(batchIndex))) if (batch.timestampType == TimestampType.LOG_APPEND_TIME) - throw new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " + - s"timestamp type to LogAppendTime.") + throw new RecordValidationException( + new InvalidTimestampException(s"Invalid timestamp type in message $record. 
Producer should not set " + + s"timestamp type to LogAppendTime."), + List(new RecordError(batchIndex))) } case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords, diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8a74aa7f8908e..d10c93645c36a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -25,6 +25,7 @@ import java.util.concurrent.locks.Lock import com.yammer.metrics.core.{Gauge, Meter} import kafka.api._ import kafka.cluster.{BrokerEndPoint, Partition} +import kafka.common.RecordValidationException import kafka.controller.{KafkaController, StateChangeLogger} import kafka.log._ import kafka.metrics.KafkaMetricsGroup @@ -32,9 +33,7 @@ import kafka.server.QuotaFactory.QuotaManagers import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints} import kafka.utils._ import kafka.zk.KafkaZkClient -import org.apache.kafka.common.ElectionType -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.Node +import org.apache.kafka.common.{ElectionType, Node, TopicPartition} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState @@ -499,7 +498,8 @@ class ReplicaManager(val config: KafkaConfig, topicPartition -> ProducePartitionStatus( result.info.lastOffset + 1, // required offset - new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, result.info.logStartOffset)) // response status + new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, + result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status } recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats }) @@ -753,6 +753,19 @@ class ReplicaManager(val config: KafkaConfig, isFromClient: Boolean, entriesPerPartition: Map[TopicPartition, MemoryRecords], requiredAcks: Short): Map[TopicPartition, LogAppendResult] = { + + def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = { + val logStartOffset = getPartition(topicPartition) match { + case HostedPartition.Online(partition) => partition.logStartOffset + case HostedPartition.None | HostedPartition.Offline => -1L + } + brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark() + brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark() + error(s"Error processing append operation on partition $topicPartition", t) + + logStartOffset + } + trace(s"Append [$entriesPerPartition] to local log") entriesPerPartition.map { case (topicPartition, records) => brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark() @@ -786,17 +799,15 @@ class ReplicaManager(val config: KafkaConfig, _: RecordTooLargeException | _: RecordBatchTooLargeException | _: CorruptRecordException | - _: KafkaStorageException | - _: InvalidTimestampException) => + _: KafkaStorageException) => (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) + case rve: RecordValidationException => + val logStartOffset = processFailedRecord(topicPartition, rve.invalidException) + val recordErrors = rve.recordErrors + (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo( + logStartOffset, 
recordErrors, rve.invalidException.getMessage), Some(rve.invalidException))) case t: Throwable => - val logStartOffset = getPartition(topicPartition) match { - case HostedPartition.Online(partition) => partition.logStartOffset - case HostedPartition.None | HostedPartition.Offline => -1L - } - brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark() - brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark() - error(s"Error processing append operation on partition $topicPartition", t) + val logStartOffset = processFailedRecord(topicPartition, t) (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t))) } } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 29b564e066049..c5de28e8e13c1 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -25,13 +25,13 @@ import java.util.{Collections, Optional, Properties} import com.yammer.metrics.Metrics import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0} -import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} +import kafka.common.{OffsetsOutOfOrderException, RecordValidationException, UnexpectedAppendOffsetException} import kafka.log.Log.DeleteDirSuffix import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata} import kafka.utils._ -import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition} +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors._ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record.MemoryRecords.RecordFilter @@ -1850,19 +1850,19 @@ class LogTest { log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0) fail("Compacted topics cannot accept a message without a key.") } catch { - case _: InvalidRecordException => // this is good + case _: RecordValidationException => // this is good } try { log.appendAsLeader(messageSetWithOneUnkeyedMessage, leaderEpoch = 0) fail("Compacted topics cannot accept a message without a key.") } catch { - case _: InvalidRecordException => // this is good + case _: RecordValidationException => // this is good } try { log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, leaderEpoch = 0) fail("Compacted topics cannot accept a message without a key.") } catch { - case _: InvalidRecordException => // this is good + case _: RecordValidationException => // this is good } // check if metric for NoKeyCompactedTopicRecordsPerSec is logged diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 7fd13210a8ccc..923ae9185211f 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit import com.yammer.metrics.Metrics import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1} -import kafka.common.LongRef +import kafka.common.{LongRef, RecordValidationException} import kafka.message._ import kafka.server.BrokerTopicStats import kafka.utils.TestUtils.meterCount @@ -81,7 +81,7 @@ class LogValidatorTest { } 
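The ReplicaManager change above is where these pieces meet: the exception's payload is copied into the LogAppendResult and from there into the producer-facing response. A condensed, runnable sketch of that mapping (class names from this patch; the failure itself is simulated), before the new validator tests that follow:

```scala
import kafka.common.RecordValidationException
import org.apache.kafka.common.errors.InvalidTimestampException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.ProduceResponse.{PartitionResponse, RecordError}

import scala.collection.JavaConverters._

object AppendErrorMappingExample extends App {
  // Stand-in for a failure raised by LogValidator during an append.
  val rve = new RecordValidationException(
    new InvalidTimestampException("Timestamp 0 of message with offset 0 is out of range"),
    List(new RecordError(0)))

  // Condensed from the appendToLocalLog change above: the exception's payload
  // flows, via LogAppendInfo, into the per-partition response the producer sees.
  val status = new PartitionResponse(
    Errors.forException(rve.invalidException),
    -1L, RecordBatch.NO_TIMESTAMP, -1L,
    rve.recordErrors.asJava,
    rve.invalidException.getMessage)

  println(status) // recordErrors and errorMessage now surface to the client
}
```
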
private def checkMismatchMagic(batchMagic: Byte, recordMagic: Byte, compressionType: CompressionType): Unit = { - assertThrows[InvalidRecordException] { + assertThrows[RecordValidationException] { validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compressionType), batchMagic, compressionType, compressionType) } assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidMagicNumberRecordsPerSec}")), 1) @@ -574,7 +574,7 @@ class LogValidatorTest { checkCompressed(RecordBatch.MAGIC_VALUE_V2) } - @Test(expected = classOf[InvalidTimestampException]) + @Test(expected = classOf[RecordValidationException]) def testInvalidCreateTimeNonCompressedV1(): Unit = { val now = System.currentTimeMillis() val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now - 1001L, @@ -597,7 +597,7 @@ class LogValidatorTest { brokerTopicStats = brokerTopicStats) } - @Test(expected = classOf[InvalidTimestampException]) + @Test(expected = classOf[RecordValidationException]) def testInvalidCreateTimeNonCompressedV2(): Unit = { val now = System.currentTimeMillis() val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L, @@ -620,7 +620,7 @@ class LogValidatorTest { brokerTopicStats = brokerTopicStats) } - @Test(expected = classOf[InvalidTimestampException]) + @Test(expected = classOf[RecordValidationException]) def testInvalidCreateTimeCompressedV1(): Unit = { val now = System.currentTimeMillis() val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now - 1001L, @@ -643,7 +643,7 @@ class LogValidatorTest { brokerTopicStats = brokerTopicStats) } - @Test(expected = classOf[InvalidTimestampException]) + @Test(expected = classOf[RecordValidationException]) def testInvalidCreateTimeCompressedV2(): Unit = { val now = System.currentTimeMillis() val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L, @@ -1250,6 +1250,52 @@ class LogValidatorTest { testBatchWithoutRecordsNotAllowed(NoCompressionCodec, DefaultCompressionCodec) } + @Test + def testInvalidTimestampExceptionHasBatchIndex(): Unit = { + val now = System.currentTimeMillis() + val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L, + codec = CompressionType.GZIP) + val e = intercept[RecordValidationException] { + LogValidator.validateMessagesAndAssignOffsets( + records, + topicPartition, + offsetCounter = new LongRef(0), + time = time, + now = System.currentTimeMillis(), + sourceCodec = DefaultCompressionCodec, + targetCodec = DefaultCompressionCodec, + magic = RecordBatch.MAGIC_VALUE_V1, + compactedTopic = false, + timestampType = TimestampType.CREATE_TIME, + timestampDiffMaxMs = 1000L, + partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, + isFromClient = true, + interBrokerProtocolVersion = ApiVersion.latestVersion, + brokerTopicStats = brokerTopicStats) + } + + assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) + assertTrue(e.recordErrors.nonEmpty) + assertEquals(e.recordErrors.size, 1) + assertEquals(e.recordErrors.head.batchIndex, 0) + assertNull(e.recordErrors.head.message) + } + + @Test + def testInvalidRecordExceptionHasBatchIndex(): Unit = { + val e = intercept[RecordValidationException] { + validateMessages(recordsWithInvalidInnerMagic( + RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP), + RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP) + } + + 
assertTrue(e.invalidException.isInstanceOf[InvalidRecordException]) + assertTrue(e.recordErrors.nonEmpty) + assertEquals(e.recordErrors.size, 1) + assertEquals(e.recordErrors.head.batchIndex, 0) + assertNull(e.recordErrors.head.message) + } + private def testBatchWithoutRecordsNotAllowed(sourceCodec: CompressionCodec, targetCodec: CompressionCodec): Unit = { val offset = 1234567 val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) = diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index bedf6ff0f00e8..3bc8d0aacb8f3 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -17,6 +17,7 @@ package kafka.server +import java.nio.ByteBuffer import java.util.Properties import com.yammer.metrics.Metrics @@ -56,7 +57,7 @@ class ProduceRequestTest extends BaseRequestTest { assertEquals(Errors.NONE, partitionResponse.error) assertEquals(expectedOffset, partitionResponse.baseOffset) assertEquals(-1, partitionResponse.logAppendTime) - assertTrue(partitionResponse.errorRecords.isEmpty) + assertTrue(partitionResponse.recordErrors.isEmpty) partitionResponse } @@ -68,6 +69,39 @@ class ProduceRequestTest extends BaseRequestTest { new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1) } + @Test + def testProduceWithInvalidTimestamp(): Unit = { + val topic = "topic" + val partition = 0 + val topicConfig = new Properties + topicConfig.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000") + val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1, servers, topicConfig) + val leader = partitionToLeader(partition) + + def createRecords(magicValue: Byte, + timestamp: Long = RecordBatch.NO_TIMESTAMP, + codec: CompressionType): MemoryRecords = { + val buf = ByteBuffer.allocate(512) + val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L) + builder.appendWithOffset(0, timestamp, null, "hello".getBytes) + builder.appendWithOffset(1, timestamp, null, "there".getBytes) + builder.appendWithOffset(2, timestamp, null, "beautiful".getBytes) + builder.build() + } + + val records = createRecords(RecordBatch.MAGIC_VALUE_V2, System.currentTimeMillis() - 1001L, CompressionType.GZIP) + val topicPartition = new TopicPartition("topic", partition) + val partitionRecords = Map(topicPartition -> records) + val produceResponse = sendProduceRequest(leader, ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()) + val (tp, partitionResponse) = produceResponse.responses.asScala.head + assertEquals(topicPartition, tp) + assertEquals(Errors.INVALID_TIMESTAMP, partitionResponse.error) + assertEquals(1, partitionResponse.recordErrors.size()) + assertEquals(0, partitionResponse.recordErrors.get(0).batchIndex) + assertNull(partitionResponse.recordErrors.get(0).message) + assertNotNull(partitionResponse.errorMessage) + } + @Test def testProduceToNonReplica(): Unit = { val topic = "topic"