diff --git a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java index e14a92d86511c..0b54e50fc5886 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java @@ -124,7 +124,7 @@ public boolean isValid() { } /** - * Throw an InvalidRecordException if isValid is false for this record + * Throw an CorruptRecordException if isValid is false for this record */ public void ensureValid() { if (sizeInBytes() < RECORD_OVERHEAD_V0) diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index c2b29bcc6d6f2..61f2f2feb1eb3 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit import kafka.server.{BrokerTopicStats, RequestLocal} import kafka.utils.TestUtils.meterCount import org.apache.kafka.common.compress.{Compression, GzipCompression, Lz4Compression} -import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{PrimitiveRef, Time} import org.apache.kafka.common.{InvalidRecordException, TopicPartition} @@ -33,6 +33,8 @@ import org.apache.kafka.storage.internals.log.{AppendOrigin, LogValidator, Recor import org.apache.kafka.test.TestUtils import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.CsvSource import scala.jdk.CollectionConverters._ @@ -687,6 +689,53 @@ class LogValidatorTest { verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, compressed = true) } + @ParameterizedTest + @CsvSource(Array("0,gzip", "1,gzip", "0,lz4", "1,lz4", "0,snappy", "1,snappy")) + def testInvalidChecksum(code: Byte, compression: String): Unit = { + checkInvalidChecksum(code, Compression.of(compression).build(), CompressionType.forName(compression)) + } + + private def checkInvalidChecksum(magic: Byte, compression: Compression , compressionType: CompressionType): Unit = { + val record: LegacyRecord = LegacyRecord.create(magic, 0L, null, "hello".getBytes) + val buf: ByteBuffer = record.buffer + + // enforce modify crc to make checksum error + buf.put(LegacyRecord.CRC_OFFSET, 0.toByte) + + val buffer: ByteBuffer = ByteBuffer.allocate(1024) + val builder: MemoryRecordsBuilder = MemoryRecords.builder(buffer, magic, compression, + TimestampType.CREATE_TIME, 0L) + builder.appendUncheckedWithOffset(0, record) + + val memoryRecords: MemoryRecords = builder.build + val logValidator: LogValidator = new LogValidator( + memoryRecords, + topicPartition, + time, + compressionType, + compression, + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting + ) + + assertThrows(classOf[CorruptRecordException], () => + logValidator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching.bufferSupplier + ) + ) + + assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}")), 1) + assertTrue(meterCount(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}") > 0) + } + @Test def testCompressedV2(): Unit = { diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index c42b550a03e0d..ea4723098d8ff 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -574,6 +574,9 @@ private static Optional validateRecord(RecordBatch batch, if (batch.magic() <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed()) { try { record.ensureValid(); + } catch (CorruptRecordException e) { + metricsRecorder.recordInvalidChecksums(); + throw e; } catch (InvalidRecordException e) { metricsRecorder.recordInvalidChecksums(); throw new CorruptRecordException(e.getMessage() + " in topic partition " + topicPartition);