Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions core/src/main/scala/kafka/log/LogValidator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kafka.common.{LongRef, RecordValidationException}
import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
import kafka.server.{BrokerTopicStats, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.errors.{CorruptRecordException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.TopicPartition
Expand Down Expand Up @@ -571,9 +571,14 @@ private[log] object LogValidator extends Logging {
private def processRecordErrors(recordErrors: Seq[ApiRecordError]): Unit = {
if (recordErrors.nonEmpty) {
val errors = recordErrors.map(_.recordError)
throw new RecordValidationException(new InvalidRecordException(
"One or more records have been rejected due to " + errors.size + " record errors " +
"in total, and only showing the first three errors at most: " + errors.asJava.subList(0, math.min(errors.size, 3))), errors)
if (recordErrors.exists(_.apiError == Errors.INVALID_TIMESTAMP)) {
throw new RecordValidationException(new InvalidTimestampException(
"One or more records have been rejected due to invalid timestamp"), errors)
} else {
throw new RecordValidationException(new InvalidRecordException(
"One or more records have been rejected due to " + errors.size + " record errors " +
"in total, and only showing the first three errors at most: " + errors.asJava.subList(0, math.min(errors.size, 3))), errors)
}
}
}

Expand Down
9 changes: 4 additions & 5 deletions core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kafka.message._
import kafka.metrics.KafkaYammerMetrics
import kafka.server.{BrokerTopicStats, RequestLocal}
import kafka.utils.TestUtils.meterCount
import org.apache.kafka.common.errors.{UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
Expand Down Expand Up @@ -1352,7 +1352,7 @@ class LogValidatorTest {
requestLocal = RequestLocal.withThreadConfinedCaching)
)

assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
assertTrue(e.recordErrors.nonEmpty)
assertEquals(e.recordErrors.size, 3)
}
Expand Down Expand Up @@ -1397,9 +1397,8 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
)
// if there is a mix of both regular InvalidRecordException and InvalidTimestampException,
// InvalidTimestampException is no longer takes precedence. The type of invalidException
// is unified as InvalidRecordException
assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
// InvalidTimestampException takes precedence
assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
Comment on lines 1399 to +1401
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since v2.4, (PR: #7612), we intentionally let InvalidTimestampException take precedence. I think we should keep the same behavior as before.

assertTrue(e.recordErrors.nonEmpty)
assertEquals(6, e.recordErrors.size)
}
Expand Down