103 changes: 75 additions & 28 deletions core/src/main/scala/kafka/log/LogValidator.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Time

import scala.collection.{Seq, mutable}
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

private[kafka] object LogValidator extends Logging {

@@ -147,14 +148,15 @@ private[kafka] object LogValidator extends Logging {
throw new UnsupportedForMessageFormatException(s"Idempotent records cannot be used with magic version $toMagic")
}

/**
* This method returns an Option object that potentially holds the reason why a record is rejected.
*/
private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, batchIndex: Int, now: Long,
timestampType: TimestampType, timestampDiffMaxMs: Long, compactedTopic: Boolean,
brokerTopicStats: BrokerTopicStats): Unit = {
brokerTopicStats: BrokerTopicStats): Option[String] = {
if (!record.hasMagic(batch.magic)) {
brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
throw new RecordValidationException(
new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition."),
List(new RecordError(batchIndex)))
return Some(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition.")
}

// verify the record-level CRC only if this is one of the deep entries of a compressed message
@@ -171,7 +173,10 @@ }
}
}

validateKey(record, batchIndex, topicPartition, compactedTopic, brokerTopicStats)
val result = validateKey(record, batchIndex, topicPartition, compactedTopic, brokerTopicStats)
if (result.isDefined)
return result

validateTimestamp(batch, record, batchIndex, now, timestampType, timestampDiffMaxMs)
}
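
To make the new control flow concrete, here is a minimal, self-contained sketch of the accumulate-then-throw pattern this patch introduces (the names SimpleRecordError, checkRecord, and validateAll are illustrative, not part of the Kafka code): each per-record check returns Option[String] instead of throwing, the caller collects failures, and an exception is raised only after the whole batch has been scanned.

import scala.collection.mutable.ListBuffer

final case class SimpleRecordError(batchIndex: Int, message: String)

// A stand-in for validateRecord/validateKey/validateTimestamp: None means the record passed.
def checkRecord(value: String, batchIndex: Int): Option[String] =
  if (value.isEmpty) Some(s"Record at index $batchIndex is empty") else None

def validateAll(values: Seq[String]): Unit = {
  val errors = ListBuffer[SimpleRecordError]()
  for ((value, batchIndex) <- values.zipWithIndex)
    checkRecord(value, batchIndex).foreach(msg => errors += SimpleRecordError(batchIndex, msg))
  // every record is inspected before failing, so callers see all errors at once
  if (errors.nonEmpty)
    throw new IllegalArgumentException(s"${errors.size} record(s) rejected: ${errors.mkString(", ")}")
}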

@@ -205,10 +210,16 @@
for (batch <- records.batches.asScala) {
validateBatch(topicPartition, firstBatch, batch, isFromClient, toMagicValue, brokerTopicStats)

val recordErrors = ListBuffer[RecordError]()
for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) match {
case Some(errorMessage) => recordErrors += new RecordError(batchIndex, errorMessage)
case None =>
}
builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
}

processRecordErrors(recordErrors)
}

val convertedRecords = builder.build()
@@ -247,8 +258,12 @@
var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
var offsetOfMaxBatchTimestamp = -1L

val recordErrors = ListBuffer[RecordError]()
for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) match {
case Some(errorMessage) => recordErrors += new RecordError(batchIndex, errorMessage)
case None =>
}

val offset = offsetCounter.getAndIncrement()
if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) {
@@ -257,6 +272,8 @@ }
}
}

processRecordErrors(recordErrors)

if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) {
maxTimestamp = maxBatchTimestamp
offsetOfMaxTimestamp = offsetOfMaxBatchTimestamp
@@ -354,30 +371,37 @@
batch.streamingIterator(BufferSupplier.NO_CACHING)

try {
val recordErrors = ListBuffer[RecordError]()
for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
if (sourceCodec != NoCompressionCodec && record.isCompressed)
throw new RecordValidationException(
new InvalidRecordException(s"Compressed outer record should not have an inner record with a compression attribute set: $record"),
List(new RecordError(batchIndex)))

validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
recordErrors += new RecordError(
batchIndex,
s"Compressed outer record should not have an inner record with a compression attribute set: $record"
)

validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) match {
case Some(errorMessage) => recordErrors += new RecordError(batchIndex, errorMessage)
case None =>
}

uncompressedSizeInBytes += record.sizeInBytes()
if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
// inner records offset should always be continuous
val expectedOffset = expectedInnerOffset.getAndIncrement()
if (record.offset != expectedOffset) {
brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
throw new RecordValidationException(
new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."),
List(new RecordError(batchIndex)))
recordErrors += new RecordError(
batchIndex,
s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."
)
}
if (record.timestamp > maxTimestamp)
maxTimestamp = record.timestamp
}

validatedRecords += record
}
processRecordErrors(recordErrors)
} finally {
recordsIterator.close()
}
@@ -465,37 +489,60 @@
recordConversionStats = recordConversionStats)
}

private def validateKey(record: Record, batchIndex: Int, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) {
/**
* This method returns an Option object that potentially holds the message explaining why a record
* was rejected for having no key in a compacted topic.
*/
private def validateKey(record: Record, batchIndex: Int, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats): Option[String] = {
if (compactedTopic && !record.hasKey) {
brokerTopicStats.allTopicsStats.noKeyCompactedTopicRecordsPerSec.mark()
throw new RecordValidationException(
new InvalidRecordException(s"Compacted topic cannot accept message without key in topic partition $topicPartition."),
List(new RecordError(batchIndex)))
return Some(s"Compacted topic cannot accept message without key in topic partition $topicPartition.")
}

None
}

/**
* This method validates the timestamp of a message.
* If the message uses create time, this method checks whether the timestamp is within the acceptable range.
* If a record has an invalid timestamp, or its timestamp falls outside the acceptable range, this method
* returns an Option object holding the error message.
*
* The decision to have this function return an Option object is based on KIP-467.
*/
private def validateTimestamp(batch: RecordBatch,
record: Record,
batchIndex: Int,
now: Long,
timestampType: TimestampType,
timestampDiffMaxMs: Long): Unit = {
timestampDiffMaxMs: Long): Option[String] = {
if (timestampType == TimestampType.CREATE_TIME
&& record.timestamp != RecordBatch.NO_TIMESTAMP
&& math.abs(record.timestamp - now) > timestampDiffMaxMs)
throw new RecordValidationException(
new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]"),
List(new RecordError(batchIndex)))
return Some(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")

if (batch.timestampType == TimestampType.LOG_APPEND_TIME)
throw new RecordValidationException(
new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
s"timestamp type to LogAppendTime."),
List(new RecordError(batchIndex)))
return Some(s"Invalid timestamp type in message $record. Producer should not set " +
s"timestamp type to LogAppendTime.")

None
}
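
As a worked example of the CREATE_TIME bound above (the values are illustrative, not from the patch): with now = 10000L and timestampDiffMaxMs = 1000L, the acceptable window is [9000, 11000], so a record timestamp of 8500 is rejected.

// The window is [now - timestampDiffMaxMs, now + timestampDiffMaxMs]; values are made up.
val now = 10000L
val timestampDiffMaxMs = 1000L
val recordTimestamp = 8500L
val outOfRange = math.abs(recordTimestamp - now) > timestampDiffMaxMs // 1500 > 1000, so true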

private def processRecordErrors(recordErrors: ListBuffer[RecordError]): Unit = {
if (recordErrors.nonEmpty) {
// if any of the accumulated record errors are timestamp-related, surface the failure
// as an InvalidTimestampException
if (recordErrors.exists(re => re.message.contains("Invalid timestamp") || re.message.contains("The timestamp"))) {
throw new RecordValidationException(
new InvalidTimestampException("One or more records have been rejected due to invalid timestamp"),
recordErrors.toList)
} else {
throw new RecordValidationException(
new InvalidRecordException("One or more records have been rejected"),
recordErrors.toList)
}
}
}
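
For callers, the net effect is that a single RecordValidationException now carries per-record detail instead of stopping at the first bad record. A sketch of consuming it (appendToLog is a hypothetical placeholder for whichever entry point runs validation):

try {
  appendToLog(records) // hypothetical entry point that triggers the validation above
} catch {
  case e: RecordValidationException =>
    // invalidException is InvalidTimestampException or InvalidRecordException;
    // recordErrors lists the batch index and message of every rejected record.
    e.recordErrors.foreach(re => println(s"batch index ${re.batchIndex}: ${re.message}"))
    throw e
}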

case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords,
33 changes: 20 additions & 13 deletions core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -31,7 +31,7 @@ import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
import kafka.utils._
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
@@ -1875,24 +1875,31 @@ class LogTest {
val logConfig = LogTest.createLogConfig(cleanupPolicy = LogConfig.Compact)
val log = createLog(logDir, logConfig)

try {
val errorMsgPrefix = "Compacted topic cannot accept message without key"

var e = intercept[RecordValidationException] {
log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0)
fail("Compacted topics cannot accept a message without a key.")
} catch {
case _: RecordValidationException => // this is good
}
try {
assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
assertEquals(1, e.recordErrors.size)
assertEquals(0, e.recordErrors.head.batchIndex)
assertTrue(e.recordErrors.head.message.startsWith(errorMsgPrefix))

e = intercept[RecordValidationException] {
log.appendAsLeader(messageSetWithOneUnkeyedMessage, leaderEpoch = 0)
fail("Compacted topics cannot accept a message without a key.")
} catch {
case _: RecordValidationException => // this is good
}
try {
assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
assertEquals(1, e.recordErrors.size)
assertEquals(0, e.recordErrors.head.batchIndex)
assertTrue(e.recordErrors.head.message.startsWith(errorMsgPrefix))

e = intercept[RecordValidationException] {
log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, leaderEpoch = 0)
fail("Compacted topics cannot accept a message without a key.")
} catch {
case _: RecordValidationException => // this is good
}
assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
assertEquals(1, e.recordErrors.size)
assertEquals(1, e.recordErrors.head.batchIndex) // batch index is 1
assertTrue(e.recordErrors.head.message.startsWith(errorMsgPrefix))

// check if metric for NoKeyCompactedTopicRecordsPerSec is logged
assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec}")), 1)
Expand Down
42 changes: 36 additions & 6 deletions core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -1276,9 +1276,7 @@ class LogValidatorTest {

assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
assertTrue(e.recordErrors.nonEmpty)
assertEquals(e.recordErrors.size, 1)
assertEquals(e.recordErrors.head.batchIndex, 0)
assertNull(e.recordErrors.head.message)
assertEquals(e.recordErrors.size, 3)
}

@Test
@@ -1289,11 +1287,43 @@
RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
}

assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
assertTrue(e.recordErrors.nonEmpty)
assertEquals(e.recordErrors.size, 1)
assertEquals(e.recordErrors.head.batchIndex, 0)
assertNull(e.recordErrors.head.message)
// recordsWithInvalidInnerMagic creates 20 records
assertEquals(e.recordErrors.size, 20)
e.recordErrors.foreach(error => assertNotNull(error.message))
}

@Test
def testBatchWithInvalidRecordsAndInvalidTimestamp(): Unit = {
val records = (0 until 5).map(id =>
LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 0L, null, id.toString.getBytes())
)

val buffer = ByteBuffer.allocate(1024)
val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP,
TimestampType.CREATE_TIME, 0L)
var offset = 0

// we want to mix in a record with invalid timestamp range
builder.appendUncheckedWithOffset(offset, LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1,
1200L, null, "timestamp".getBytes))
records.foreach { record =>
offset += 30
builder.appendUncheckedWithOffset(offset, record)
}
val invalidOffsetTimestampRecords = builder.build()

val e = intercept[RecordValidationException] {
validateMessages(invalidOffsetTimestampRecords,
RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
}
// when a batch produces both InvalidRecordException and InvalidTimestampException errors,
// InvalidTimestampException takes precedence
assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
assertTrue(e.recordErrors.nonEmpty)
assertEquals(6, e.recordErrors.size)
}

private def testBatchWithoutRecordsNotAllowed(sourceCodec: CompressionCodec, targetCodec: CompressionCodec): Unit = {
12 changes: 9 additions & 3 deletions core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -27,6 +27,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ProduceResponse.RecordError
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.junit.Assert._
import org.junit.Test
@@ -96,10 +97,15 @@ class ProduceRequestTest extends BaseRequestTest {
val (tp, partitionResponse) = produceResponse.responses.asScala.head
assertEquals(topicPartition, tp)
assertEquals(Errors.INVALID_TIMESTAMP, partitionResponse.error)
assertEquals(1, partitionResponse.recordErrors.size())
// the createRecords helper creates 3 records, each with an invalid timestamp
assertEquals(3, partitionResponse.recordErrors.size())
assertEquals(0, partitionResponse.recordErrors.get(0).batchIndex)
assertNull(partitionResponse.recordErrors.get(0).message)
assertNotNull(partitionResponse.errorMessage)
assertEquals(1, partitionResponse.recordErrors.get(1).batchIndex)
assertEquals(2, partitionResponse.recordErrors.get(2).batchIndex)
for (recordError <- partitionResponse.recordErrors.asScala) {
assertNotNull(recordError.message)
}
assertEquals("One or more records have been rejected due to invalid timestamp", partitionResponse.errorMessage)
}

@Test