From 6de88c3419b49d94d033c04153fc90073e4dff83 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 10 Jul 2024 06:46:44 +0800 Subject: [PATCH 1/6] fix InvalidMessageCrcRecordsPerSec is not updated in validating LegacyRecord --- .../unit/kafka/log/LogValidatorTest.scala | 53 ++++++++++++++++++- .../storage/internals/log/LogValidator.java | 2 +- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index c2b29bcc6d6f2..67406d9551e72 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit import kafka.server.{BrokerTopicStats, RequestLocal} import kafka.utils.TestUtils.meterCount import org.apache.kafka.common.compress.{Compression, GzipCompression, Lz4Compression} -import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{PrimitiveRef, Time} import org.apache.kafka.common.{InvalidRecordException, TopicPartition} @@ -687,6 +687,57 @@ class LogValidatorTest { verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, compressed = true) } + @Test + def testInvalidChecksum(): Unit = { + checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), CompressionType.GZIP) + checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(),CompressionType.GZIP) + checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V0, Compression.lz4().build(), CompressionType.LZ4) + checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V1, 
Compression.lz4().build(), CompressionType.LZ4) + checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V0, Compression.snappy().build(), CompressionType.SNAPPY) + checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V1, Compression.snappy().build(), CompressionType.SNAPPY) + } + + private def checkInvalidChecksum(magic: Byte, compression: Compression , compressionType: CompressionType): Unit = { + val record: LegacyRecord = LegacyRecord.create(magic, 0L, null, "hello".getBytes) + val buf: ByteBuffer = record.buffer() + + // enforce modify crc to make checksum error + buf.put(LegacyRecord.CRC_OFFSET, 0.toByte) + + val buffer: ByteBuffer = ByteBuffer.allocate(1024) + val builder: MemoryRecordsBuilder = MemoryRecords.builder(buffer, magic, compression, + TimestampType.CREATE_TIME, 0L) + builder.appendUncheckedWithOffset(0, record) + + val memoryRecords: MemoryRecords = builder.build() + val logValidator: LogValidator = new LogValidator( + memoryRecords, + topicPartition, + time, + compressionType, + compression, + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting + ) + + assertThrows(classOf[CorruptRecordException], () => + logValidator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching.bufferSupplier + ) + ) + + assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}")), 1) + assertTrue(meterCount(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}") > 0) + } + @Test def testCompressedV2(): Unit = { diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index c42b550a03e0d..77f8a57aa64ab 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -574,7 +574,7 @@ private static Optional validateRecord(RecordBatch batch, if (batch.magic() <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed()) { try { record.ensureValid(); - } catch (InvalidRecordException e) { + } catch (InvalidRecordException | CorruptRecordException e) { metricsRecorder.recordInvalidChecksums(); throw new CorruptRecordException(e.getMessage() + " in topic partition " + topicPartition); } From b8e02ac7d688fc32e7e044f9eb153e6423ba1360 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 10 Jul 2024 06:50:41 +0800 Subject: [PATCH 2/6] fix code style --- core/src/test/scala/unit/kafka/log/LogValidatorTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 67406d9551e72..0cba0bf3e68d2 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -699,7 +699,7 @@ class LogValidatorTest { private def checkInvalidChecksum(magic: Byte, compression: Compression , compressionType: CompressionType): Unit = { val record: LegacyRecord = LegacyRecord.create(magic, 0L, null, "hello".getBytes) - val buf: ByteBuffer = record.buffer() + val buf: ByteBuffer = record.buffer // enforce modify crc to make checksum error buf.put(LegacyRecord.CRC_OFFSET, 0.toByte) @@ -709,7 +709,7 @@ class LogValidatorTest { TimestampType.CREATE_TIME, 0L) builder.appendUncheckedWithOffset(0, record) - val memoryRecords: MemoryRecords = builder.build() + val memoryRecords: MemoryRecords = builder.build val logValidator: LogValidator = new LogValidator( memoryRecords, topicPartition, From 6a3d825eb86cd2074d23d10d728f7eeb4696d036 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 17 Jul 2024 05:29:25 +0800 Subject: [PATCH 3/6] Use csvSource; --- 
.../scala/unit/kafka/log/LogValidatorTest.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 0cba0bf3e68d2..61f2f2feb1eb3 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -33,6 +33,8 @@ import org.apache.kafka.storage.internals.log.{AppendOrigin, LogValidator, Recor import org.apache.kafka.test.TestUtils import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.CsvSource import scala.jdk.CollectionConverters._ @@ -687,14 +689,10 @@ class LogValidatorTest { verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, compressed = true) } - @Test - def testInvalidChecksum(): Unit = { - checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), CompressionType.GZIP) - checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(),CompressionType.GZIP) - checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V0, Compression.lz4().build(), CompressionType.LZ4) - checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V1, Compression.lz4().build(), CompressionType.LZ4) - checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V0, Compression.snappy().build(), CompressionType.SNAPPY) - checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V1, Compression.snappy().build(), CompressionType.SNAPPY) + @ParameterizedTest + @CsvSource(Array("0,gzip", "1,gzip", "0,lz4", "1,lz4", "0,snappy", "1,snappy")) + def testInvalidChecksum(code: Byte, compression: String): Unit = { + checkInvalidChecksum(code, Compression.of(compression).build(), CompressionType.forName(compression)) } private def checkInvalidChecksum(magic: Byte, compression: Compression , compressionType: CompressionType): Unit = { From 
144d9f2c848457f1b2abee97753c49aee28b7937 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 17 Jul 2024 06:24:44 +0800 Subject: [PATCH 4/6] Fix doc --- .../main/java/org/apache/kafka/common/record/LegacyRecord.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java index e14a92d86511c..0b54e50fc5886 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java @@ -124,7 +124,7 @@ public boolean isValid() { } /** - * Throw an InvalidRecordException if isValid is false for this record + * Throw a CorruptRecordException if isValid is false for this record */ public void ensureValid() { if (sizeInBytes() < RECORD_OVERHEAD_V0) From 03c3866a61a43ec58cd2c3a6a8a1740b7785b063 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 18 Jul 2024 17:56:25 +0800 Subject: [PATCH 5/6] Trigger CI From 46f90ba52a522584abcbc3ae28e64a62652ee57d Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 24 Jul 2024 09:07:20 +0800 Subject: [PATCH 6/6] address comments --- .../org/apache/kafka/storage/internals/log/LogValidator.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index 77f8a57aa64ab..ea4723098d8ff 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -574,7 +574,10 @@ private static Optional validateRecord(RecordBatch batch, if (batch.magic() <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed()) { try { record.ensureValid(); - } catch (InvalidRecordException | CorruptRecordException e) { + } catch (CorruptRecordException e) { 
+ metricsRecorder.recordInvalidChecksums(); + throw e; + } catch (InvalidRecordException e) { metricsRecorder.recordInvalidChecksums(); throw new CorruptRecordException(e.getMessage() + " in topic partition " + topicPartition); }