diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 9b945366013b8..7c9d70b8d6784 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -226,6 +226,14 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(batchIndex, message); } + + @Override + public String toString() { + return "RecordError(" + + "batchIndex=" + batchIndex + + ", message=" + ((message == null) ? "null" : "'" + message + "'") + + ")"; + } } public static ProduceResponse parse(ByteBuffer buffer, short version) { diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 925c60294a6af..b40598f243084 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -22,7 +22,7 @@ import kafka.common.{LongRef, RecordValidationException} import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec} import kafka.server.{BrokerTopicStats, RequestLocal} import kafka.utils.Logging -import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.errors.{CorruptRecordException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.TopicPartition @@ -571,13 +571,9 @@ private[log] object LogValidator extends Logging { private def processRecordErrors(recordErrors: Seq[ApiRecordError]): Unit = { if (recordErrors.nonEmpty) { val errors = recordErrors.map(_.recordError) - if (recordErrors.exists(_.apiError == Errors.INVALID_TIMESTAMP)) { - throw new RecordValidationException(new InvalidTimestampException( - "One or more records have been rejected due to invalid timestamp"), errors) - } else { - throw new RecordValidationException(new InvalidRecordException( - "One or more records have been rejected"), errors) - } + throw new RecordValidationException(new InvalidRecordException( + "One or more records have been rejected due to " + errors.size + " record errors " + + "in total, and only showing the first three errors at most: " + errors.asJava.subList(0, math.min(errors.size, 3))), errors) } } diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 4275684230736..5676a8ee94121 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -25,7 +25,7 @@ import kafka.message._ import kafka.metrics.KafkaYammerMetrics import kafka.server.{BrokerTopicStats, RequestLocal} import kafka.utils.TestUtils.meterCount -import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.errors.{UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{InvalidRecordException, TopicPartition} @@ -1352,7 +1352,7 @@ class LogValidatorTest { requestLocal = RequestLocal.withThreadConfinedCaching) ) - assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) + assertTrue(e.invalidException.isInstanceOf[InvalidRecordException]) assertTrue(e.recordErrors.nonEmpty) assertEquals(e.recordErrors.size, 3) } @@ -1397,8 +1397,9 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP) ) // if there is a mix of both regular InvalidRecordException and InvalidTimestampException, - // InvalidTimestampException takes precedence - assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) + // InvalidTimestampException is no longer takes precedence. The type of invalidException + // is unified as InvalidRecordException + assertTrue(e.invalidException.isInstanceOf[InvalidRecordException]) assertTrue(e.recordErrors.nonEmpty) assertEquals(6, e.recordErrors.size) }