diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index b40598f243084..0949c1110dec1 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -22,7 +22,7 @@ import kafka.common.{LongRef, RecordValidationException} import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec} import kafka.server.{BrokerTopicStats, RequestLocal} import kafka.utils.Logging -import org.apache.kafka.common.errors.{CorruptRecordException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.TopicPartition @@ -571,9 +571,14 @@ private[log] object LogValidator extends Logging { private def processRecordErrors(recordErrors: Seq[ApiRecordError]): Unit = { if (recordErrors.nonEmpty) { val errors = recordErrors.map(_.recordError) - throw new RecordValidationException(new InvalidRecordException( - "One or more records have been rejected due to " + errors.size + " record errors " + - "in total, and only showing the first three errors at most: " + errors.asJava.subList(0, math.min(errors.size, 3))), errors) + if (recordErrors.exists(_.apiError == Errors.INVALID_TIMESTAMP)) { + throw new RecordValidationException(new InvalidTimestampException( + "One or more records have been rejected due to invalid timestamp"), errors) + } else { + throw new RecordValidationException(new InvalidRecordException( + "One or more records have been rejected due to " + errors.size + " record errors " + + "in total, and only showing the first three errors at most: " + errors.asJava.subList(0, math.min(errors.size, 3))), errors) + } } } diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 5676a8ee94121..4275684230736 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -25,7 +25,7 @@ import kafka.message._ import kafka.metrics.KafkaYammerMetrics import kafka.server.{BrokerTopicStats, RequestLocal} import kafka.utils.TestUtils.meterCount -import org.apache.kafka.common.errors.{UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{InvalidRecordException, TopicPartition} @@ -1352,7 +1352,7 @@ class LogValidatorTest { requestLocal = RequestLocal.withThreadConfinedCaching) ) - assertTrue(e.invalidException.isInstanceOf[InvalidRecordException]) + assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) assertTrue(e.recordErrors.nonEmpty) assertEquals(e.recordErrors.size, 3) } @@ -1397,9 +1397,8 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP) ) // if there is a mix of both regular InvalidRecordException and InvalidTimestampException, - // InvalidTimestampException is no longer takes precedence. The type of invalidException - // is unified as InvalidRecordException - assertTrue(e.invalidException.isInstanceOf[InvalidRecordException]) + // InvalidTimestampException takes precedence + assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) assertTrue(e.recordErrors.nonEmpty) assertEquals(6, e.recordErrors.size) }