Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public boolean isValid() {
}

/**
* Throw an InvalidRecordException if isValid is false for this record
* Throw an CorruptRecordException if isValid is false for this record
*/
public void ensureValid() {
if (sizeInBytes() < RECORD_OVERHEAD_V0)
Expand Down
51 changes: 50 additions & 1 deletion core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
import kafka.server.{BrokerTopicStats, RequestLocal}
import kafka.utils.TestUtils.meterCount
import org.apache.kafka.common.compress.{Compression, GzipCompression, Lz4Compression}
import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{PrimitiveRef, Time}
import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
Expand All @@ -33,6 +33,8 @@ import org.apache.kafka.storage.internals.log.{AppendOrigin, LogValidator, Recor
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -687,6 +689,53 @@ class LogValidatorTest {
verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,
compressed = true)
}
@ParameterizedTest
@CsvSource(Array("0,gzip", "1,gzip", "0,lz4", "1,lz4", "0,snappy", "1,snappy"))
def testInvalidChecksum(code: Byte, compression: String): Unit = {
checkInvalidChecksum(code, Compression.of(compression).build(), CompressionType.forName(compression))
}

private def checkInvalidChecksum(magic: Byte, compression: Compression , compressionType: CompressionType): Unit = {
val record: LegacyRecord = LegacyRecord.create(magic, 0L, null, "hello".getBytes)
val buf: ByteBuffer = record.buffer

// enforce modify crc to make checksum error
buf.put(LegacyRecord.CRC_OFFSET, 0.toByte)

val buffer: ByteBuffer = ByteBuffer.allocate(1024)
val builder: MemoryRecordsBuilder = MemoryRecords.builder(buffer, magic, compression,
TimestampType.CREATE_TIME, 0L)
builder.appendUncheckedWithOffset(0, record)

val memoryRecords: MemoryRecords = builder.build
val logValidator: LogValidator = new LogValidator(
memoryRecords,
topicPartition,
time,
compressionType,
compression,
false,
magic,
TimestampType.CREATE_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latestTesting
)

assertThrows(classOf[CorruptRecordException], () =>
logValidator.validateMessagesAndAssignOffsets(
PrimitiveRef.ofLong(0),
metricsRecorder,
RequestLocal.withThreadConfinedCaching.bufferSupplier
)
)

assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}")), 1)
assertTrue(meterCount(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}") > 0)
}


@Test
def testCompressedV2(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,9 @@ private static Optional<ApiRecordError> validateRecord(RecordBatch batch,
if (batch.magic() <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed()) {
try {
record.ensureValid();
} catch (CorruptRecordException e) {
metricsRecorder.recordInvalidChecksums();
throw e;
} catch (InvalidRecordException e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to keep consistency, we should rethrow CorruptRecordException here. for example:

            } catch (CorruptRecordException e) {
                metricsRecorder.recordInvalidChecksums();
                throw e;
            } catch (InvalidRecordException e) {
                metricsRecorder.recordInvalidChecksums();
                throw new CorruptRecordException(e.getMessage() + " in topic partition " + topicPartition);
            }

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks for this suggestion.

metricsRecorder.recordInvalidChecksums();
throw new CorruptRecordException(e.getMessage() + " in topic partition " + topicPartition);
Expand Down