diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml index 18fd7938ad6b9..cc76ba539c4f3 100644 --- a/checkstyle/import-control-storage.xml +++ b/checkstyle/import-control-storage.xml @@ -73,6 +73,8 @@ + + diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala deleted file mode 100644 index ea2349059b02b..0000000000000 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ /dev/null @@ -1,1849 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.log - -import java.nio.ByteBuffer -import java.util.concurrent.TimeUnit -import kafka.server.{BrokerTopicStats, RequestLocal} -import kafka.utils.TestUtils.meterCount -import org.apache.kafka.common.compress.Compression -import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} -import org.apache.kafka.common.record._ -import org.apache.kafka.common.utils.{PrimitiveRef, Time} -import org.apache.kafka.common.{InvalidRecordException, TopicPartition} -import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult -import org.apache.kafka.server.metrics.KafkaYammerMetrics -import org.apache.kafka.server.util.MockTime -import org.apache.kafka.storage.internals.log.{AppendOrigin, LogValidator, RecordValidationException} -import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics -import org.apache.kafka.test.TestUtils -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.CsvSource - -import scala.jdk.CollectionConverters._ - -class LogValidatorTest { - - val time = Time.SYSTEM - val topicPartition = new TopicPartition("topic", 0) - val metricsKeySet = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala - val metricsRecorder = UnifiedLog.newValidatorMetricsRecorder(new BrokerTopicStats().allTopicsStats) - - @Test - def testOnlyOneBatch(): Unit = { - checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.gzip().build()) - checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.gzip().build()) - checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.gzip().build()) - checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.NONE) - checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.NONE) - checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.NONE) - checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.NONE) - checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.gzip().build()) - } - - @Test - def testAllowMultiBatch(): Unit = { - checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.NONE) - checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.NONE) - checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.gzip().build()) - checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.gzip().build()) - } - - @Test - def testValidationOfBatchesWithNonSequentialInnerOffsets(): Unit = { - def testMessageValidation(magicValue: Byte): Unit = { - val numRecords = 20 - val compression: Compression = Compression.gzip().build() - val invalidRecords = recordsWithNonSequentialInnerOffsets(magicValue, compression, numRecords) - - // Validation for v2 and above is strict for this case. For older formats, we fix invalid - // internal offsets by rewriting the batch. - if (magicValue >= RecordBatch.MAGIC_VALUE_V2) { - assertThrows(classOf[InvalidRecordException], - () => validateMessages(invalidRecords, magicValue, CompressionType.GZIP, compression) - ) - } else { - val result = validateMessages(invalidRecords, magicValue, CompressionType.GZIP, compression) - assertEquals(0 until numRecords, result.validatedRecords.records.asScala.map(_.offset)) - } - } - - for (version <- RecordVersion.values) { - testMessageValidation(version.value) - } - } - - @Test - def testMisMatchMagic(): Unit = { - val compression: Compression = Compression.gzip().build() - checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, compression) - checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V0, compression) - } - - private def checkOnlyOneBatch(magic: Byte, sourceCompression: Compression, targetCompression: Compression): Unit = { - assertThrows(classOf[InvalidRecordException], - () => validateMessages(createTwoBatchedRecords(magic, sourceCompression), magic, sourceCompression.`type`(), targetCompression) - ) - } - - private def checkAllowMultiBatch(magic: Byte, sourceCompression: Compression, targetCompression: Compression): Unit = { - validateMessages(createTwoBatchedRecords(magic, sourceCompression), magic, sourceCompression.`type`(), targetCompression) - } - - private def checkMismatchMagic(batchMagic: Byte, recordMagic: Byte, compression: Compression): Unit = { - assertThrows(classOf[RecordValidationException], - () => validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compression), batchMagic, compression.`type`(), compression) - ) - assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.INVALID_MAGIC_NUMBER_RECORDS_PER_SEC}")), 1) - assertTrue(meterCount(s"${BrokerTopicMetrics.INVALID_MAGIC_NUMBER_RECORDS_PER_SEC}") > 0) - } - - private def validateMessages(records: MemoryRecords, - magic: Byte, - sourceCompressionType: CompressionType, - targetCompression: Compression): ValidationResult = { - val mockTime = new MockTime(0L, 0L) - new LogValidator(records, - topicPartition, - mockTime, - sourceCompressionType, - targetCompression, - false, - magic, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - RecordBatch.NO_PRODUCER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.IBP_2_3_IV1 - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier) - } - - @Test - def testLogAppendTimeNonCompressedV0(): Unit = { - checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V0) - } - - - @Test - def testLogAppendTimeNonCompressedV1(): Unit = { - checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1) - } - - @Test - def testLogAppendTimeNonCompressedV2(): Unit = { - checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2) - } - - private def checkLogAppendTimeNonCompressed(magic: Byte): Unit = { - val mockTime = new MockTime - // The timestamps should be overwritten - val records = createRecords(magicValue = magic, timestamp = 1234L, codec = Compression.NONE) - val offsetCounter = PrimitiveRef.ofLong(0) - val validatedResults = new LogValidator(records, - topicPartition, - mockTime, - CompressionType.NONE, - Compression.NONE, - false, - magic, - TimestampType.LOG_APPEND_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - offsetCounter, - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - - assertEquals(offsetCounter.value, records.records.asScala.size) - val validatedRecords = validatedResults.validatedRecords - assertEquals(records.records.asScala.size, validatedRecords.records.asScala.size, "message set size should not change") - val now = mockTime.milliseconds - if (magic >= RecordBatch.MAGIC_VALUE_V1) - validatedRecords.batches.forEach(batch => validateLogAppendTime(now, 1234L, batch)) - assertEquals(if (magic == RecordBatch.MAGIC_VALUE_V0) RecordBatch.NO_TIMESTAMP else now, validatedResults.maxTimestampMs) - assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") - - // If it's LOG_APPEND_TIME, the offset will be the offset of the first record - val expectedMaxTimestampOffset = magic match { - case RecordBatch.MAGIC_VALUE_V0 => -1 - case RecordBatch.MAGIC_VALUE_V1 => 0 - case _ => 2 - } - assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp) - verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, - compressed = false) - } - - @Test - def testLogAppendTimeWithRecompressionV1(): Unit = { - checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V1) - } - - private def checkLogAppendTimeWithRecompression(targetMagic: Byte): Unit = { - val compression: Compression = Compression.gzip().build() - val mockTime = new MockTime - // The timestamps should be overwritten - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = compression) - val validatedResults = new LogValidator( - records, - topicPartition, - mockTime, - CompressionType.GZIP, - compression, - false, - targetMagic, - TimestampType.LOG_APPEND_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0), - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - - val validatedRecords = validatedResults.validatedRecords - assertEquals(records.records.asScala.size, validatedRecords.records.asScala.size, - "message set size should not change") - val now = mockTime.milliseconds() - validatedRecords.batches.forEach(batch => validateLogAppendTime(now, -1, batch)) - assertTrue(validatedRecords.batches.iterator.next().isValid, - "MessageSet should still valid") - assertEquals(now, validatedResults.maxTimestampMs, - s"Max timestamp should be $now") - assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, - s"The shallow offset of max timestamp should be 2 if logAppendTime is used") - assertTrue(validatedResults.messageSizeMaybeChanged, - "Message size may have been changed") - - val stats = validatedResults.recordValidationStats - verifyRecordValidationStats(stats, numConvertedRecords = 3, records, compressed = true) - } - - @Test - def testLogAppendTimeWithRecompressionV2(): Unit = { - checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V2) - } - - @Test - def testLogAppendTimeWithoutRecompressionV1(): Unit = { - checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V1) - } - - private def checkLogAppendTimeWithoutRecompression(magic: Byte): Unit = { - val compression: Compression = Compression.gzip().build() - val mockTime = new MockTime - // The timestamps should be overwritten - val records = createRecords(magicValue = magic, timestamp = 1234L, codec = compression) - val validatedResults = new LogValidator( - records, - topicPartition, - mockTime, - CompressionType.GZIP, - compression, - false, - magic, - TimestampType.LOG_APPEND_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0), - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - val validatedRecords = validatedResults.validatedRecords - - assertEquals(records.records.asScala.size, validatedRecords.records.asScala.size, - "message set size should not change") - val now = mockTime.milliseconds() - validatedRecords.batches.forEach(batch => validateLogAppendTime(now, 1234L, batch)) - assertTrue(validatedRecords.batches.iterator.next().isValid, - "MessageSet should still valid") - assertEquals(now, validatedResults.maxTimestampMs, - s"Max timestamp should be $now") - assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, - s"The shallow offset of max timestamp should be the last offset 2 if logAppendTime is used") - assertFalse(validatedResults.messageSizeMaybeChanged, - "Message size should not have been changed") - - verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, - compressed = true) - } - - @Test - def testInvalidOffsetRangeAndRecordCount(): Unit = { - // The batch to be written contains 3 records, so the correct lastOffsetDelta is 2 - validateRecordBatchWithCountOverrides(lastOffsetDelta = 2, count = 3) - - // Count and offset range are inconsistent or invalid - assertInvalidBatchCountOverrides(lastOffsetDelta = 0, count = 3) - assertInvalidBatchCountOverrides(lastOffsetDelta = 15, count = 3) - assertInvalidBatchCountOverrides(lastOffsetDelta = -3, count = 3) - assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = -3) - assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = 6) - assertInvalidBatchCountOverrides(lastOffsetDelta = 2, count = 0) - assertInvalidBatchCountOverrides(lastOffsetDelta = -3, count = -2) - - // Count and offset range are consistent, but do not match the actual number of records - assertInvalidBatchCountOverrides(lastOffsetDelta = 5, count = 6) - assertInvalidBatchCountOverrides(lastOffsetDelta = 1, count = 2) - } - - private def assertInvalidBatchCountOverrides(lastOffsetDelta: Int, count: Int): Unit = { - assertThrows(classOf[InvalidRecordException], - () => validateRecordBatchWithCountOverrides(lastOffsetDelta, count)) - } - - private def validateRecordBatchWithCountOverrides(lastOffsetDelta: Int, count: Int): Unit = { - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = 1234L, codec = Compression.NONE) - records.buffer.putInt(DefaultRecordBatch.RECORDS_COUNT_OFFSET, count) - records.buffer.putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta) - new LogValidator( - records, - topicPartition, - time, - CompressionType.GZIP, - Compression.gzip().build(), - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.LOG_APPEND_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0), - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - } - - @Test - def testLogAppendTimeWithoutRecompressionV2(): Unit = { - checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V2) - } - - @Test - def testNonCompressedV1(): Unit = { - checkNonCompressed(RecordBatch.MAGIC_VALUE_V1) - } - - private def checkNonCompressed(magic: Byte): Unit = { - val now = System.currentTimeMillis() - // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp - val timestampSeq = Seq(now - 1, now + 1, now) - - val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) = - if (magic >= RecordBatch.MAGIC_VALUE_V2) - (1324L, 10.toShort, 984, true, 40) - else - (RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, - RecordBatch.NO_PARTITION_LEADER_EPOCH) - - val recordList = List( - new SimpleRecord(timestampSeq(0), "hello".getBytes), - new SimpleRecord(timestampSeq(1), "there".getBytes), - new SimpleRecord(timestampSeq(2), "beautiful".getBytes) - ) - - val records = MemoryRecords.withRecords(magic, 0L, Compression.NONE, TimestampType.CREATE_TIME, producerId, - producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional, recordList: _*) - - val offsetCounter = PrimitiveRef.ofLong(0) - val validatingResults = new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.NONE, - false, - magic, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - partitionLeaderEpoch, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - offsetCounter, - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - - val validatedRecords = validatingResults.validatedRecords - - var i = 0 - for (batch <- validatedRecords.batches.asScala) { - assertTrue(batch.isValid) - assertEquals(batch.timestampType, TimestampType.CREATE_TIME) - maybeCheckBaseTimestamp(timestampSeq(0), batch) - assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max) - assertEquals(producerEpoch, batch.producerEpoch) - assertEquals(producerId, batch.producerId) - assertEquals(baseSequence, batch.baseSequence) - assertEquals(isTransactional, batch.isTransactional) - assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch) - for (record <- batch.asScala) { - record.ensureValid() - assertEquals(timestampSeq(i), record.timestamp) - i += 1 - } - } - - assertEquals(i, offsetCounter.value) - assertEquals(now + 1, validatingResults.maxTimestampMs, - s"Max timestamp should be ${now + 1}") - - // V2: Only one batch is in the records, so the shallow OffsetOfMaxTimestamp is the last offset of the single batch - // V1: 3 batches are in the records, so the shallow OffsetOfMaxTimestamp is the timestamp of batch-1 - if (magic >= RecordBatch.MAGIC_VALUE_V2) { - assertEquals(1, records.batches().asScala.size) - assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp) - } else { - assertEquals(3, records.batches().asScala.size) - assertEquals(1, validatingResults.shallowOffsetOfMaxTimestamp) - } - - assertFalse(validatingResults.messageSizeMaybeChanged, - "Message size should not have been changed") - verifyRecordValidationStats(validatingResults.recordValidationStats, numConvertedRecords = 0, records, - compressed = false) - } - - @Test - def testNonCompressedV2(): Unit = { - checkNonCompressed(RecordBatch.MAGIC_VALUE_V2) - } - - @Test - def testRecompressionV1(): Unit = { - checkRecompression(RecordBatch.MAGIC_VALUE_V1) - } - - private def checkRecompression(magic: Byte): Unit = { - val now = System.currentTimeMillis() - // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp - val timestampSeq = Seq(now - 1, now + 1, now) - - val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) = - if (magic >= RecordBatch.MAGIC_VALUE_V2) - (1324L, 10.toShort, 984, true, 40) - else - (RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, - RecordBatch.NO_PARTITION_LEADER_EPOCH) - - val records = MemoryRecords.withRecords(magic, 0L, Compression.NONE, TimestampType.CREATE_TIME, producerId, - producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional, - new SimpleRecord(timestampSeq(0), "hello".getBytes), - new SimpleRecord(timestampSeq(1), "there".getBytes), - new SimpleRecord(timestampSeq(2), "beautiful".getBytes)) - - // V2 has single batch, and other versions has many single-record batches - assertEquals(if (magic >= RecordBatch.MAGIC_VALUE_V2) 1 else 3, records.batches().asScala.size) - - val validatingResults = new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.gzip().build(), - false, - magic, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - partitionLeaderEpoch, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0), - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - val validatedRecords = validatingResults.validatedRecords - - var i = 0 - for (batch <- validatedRecords.batches.asScala) { - assertTrue(batch.isValid) - assertEquals(batch.timestampType, TimestampType.CREATE_TIME) - maybeCheckBaseTimestamp(timestampSeq(0), batch) - assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max) - assertEquals(producerEpoch, batch.producerEpoch) - assertEquals(producerId, batch.producerId) - assertEquals(baseSequence, batch.baseSequence) - assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch) - for (record <- batch.asScala) { - record.ensureValid() - assertEquals(timestampSeq(i), record.timestamp) - i += 1 - } - } - assertEquals(now + 1, validatingResults.maxTimestampMs, - s"Max timestamp should be ${now + 1}") - - // Both V2 and V1 has single batch in the validated records when compression is enable, and hence their shallow - // OffsetOfMaxTimestamp is the last offset of the single batch - assertEquals(1, validatedRecords.batches().asScala.size) - assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp) - assertTrue(validatingResults.messageSizeMaybeChanged, - "Message size should have been changed") - - verifyRecordValidationStats(validatingResults.recordValidationStats, numConvertedRecords = 3, records, - compressed = true) - } - - @Test - def testRecompressionV2(): Unit = { - checkRecompression(RecordBatch.MAGIC_VALUE_V2) - } - - @Test - def testCreateTimeUpConversionV0ToV1(): Unit = { - checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V1) - } - - private def checkCreateTimeUpConversionFromV0(toMagic: Byte): Unit = { - val compression: Compression = Compression.gzip().build() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = compression) - val validatedResults = new LogValidator(records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - toMagic, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0), - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - val validatedRecords = validatedResults.validatedRecords - - for (batch <- validatedRecords.batches.asScala) { - assertTrue(batch.isValid) - maybeCheckBaseTimestamp(RecordBatch.NO_TIMESTAMP, batch) - assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp) - assertEquals(TimestampType.CREATE_TIME, batch.timestampType) - assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch) - assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId) - assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence) - } - assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP, - s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}") - assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp) - assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") - - verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, - compressed = true) - } - - @Test - def testCreateTimeUpConversionV0ToV2(): Unit = { - checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V2) - } - - @Test - def testCreateTimeUpConversionV1ToV2(): Unit = { - val timestamp = System.currentTimeMillis() - val compression: Compression = Compression.gzip().build() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = compression, timestamp = timestamp) - val validatedResults = new LogValidator(records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting, - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0), - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - val validatedRecords = validatedResults.validatedRecords - - for (batch <- validatedRecords.batches.asScala) { - assertTrue(batch.isValid) - maybeCheckBaseTimestamp(timestamp, batch) - assertEquals(timestamp, batch.maxTimestamp) - assertEquals(TimestampType.CREATE_TIME, batch.timestampType) - assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch) - assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId) - assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence) - } - assertEquals(timestamp, validatedResults.maxTimestampMs) - assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, - s"Offset of max timestamp should be the last offset 2.") - assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") - - verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, - compressed = true) - } - - @Test - def testCompressedV1(): Unit = { - checkCompressed(RecordBatch.MAGIC_VALUE_V1) - } - - private def checkCompressed(magic: Byte): Unit = { - val now = System.currentTimeMillis() - // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp - val timestampSeq = Seq(now - 1, now + 1, now) - - val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) = - if (magic >= RecordBatch.MAGIC_VALUE_V2) - (1324L, 10.toShort, 984, true, 40) - else - (RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, - RecordBatch.NO_PARTITION_LEADER_EPOCH) - - val recordList = List( - new SimpleRecord(timestampSeq(0), "hello".getBytes), - new SimpleRecord(timestampSeq(1), "there".getBytes), - new SimpleRecord(timestampSeq(2), "beautiful".getBytes) - ) - - val records = MemoryRecords.withRecords(magic, 0L, Compression.gzip().build(), TimestampType.CREATE_TIME, producerId, - producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional, recordList: _*) - - val validatedResults = new LogValidator(records, - topicPartition, - time, - CompressionType.GZIP, - Compression.gzip().build(), - false, - magic, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - partitionLeaderEpoch, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0), - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - val validatedRecords = validatedResults.validatedRecords - - var i = 0 - for (batch <- validatedRecords.batches.asScala) { - assertTrue(batch.isValid) - assertEquals(batch.timestampType, TimestampType.CREATE_TIME) - maybeCheckBaseTimestamp(timestampSeq(0), batch) - assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max) - assertEquals(producerEpoch, batch.producerEpoch) - assertEquals(producerId, batch.producerId) - assertEquals(baseSequence, batch.baseSequence) - assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch) - for (record <- batch.asScala) { - record.ensureValid() - assertEquals(timestampSeq(i), record.timestamp) - i += 1 - } - } - assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - - val expectedShallowOffsetOfMaxTimestamp = 2 - assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestamp, - s"Shallow offset of max timestamp should be 2") - assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") - - verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, - compressed = true) - } - @ParameterizedTest - @CsvSource(Array("0,gzip", "1,gzip", "0,lz4", "1,lz4", "0,snappy", "1,snappy")) - def testInvalidChecksum(code: Byte, compression: String): Unit = { - checkInvalidChecksum(code, Compression.of(compression).build(), CompressionType.forName(compression)) - } - - private def checkInvalidChecksum(magic: Byte, compression: Compression , compressionType: CompressionType): Unit = { - val record: LegacyRecord = LegacyRecord.create(magic, 0L, null, "hello".getBytes) - val buf: ByteBuffer = record.buffer - - // enforce modify crc to make checksum error - buf.put(LegacyRecord.CRC_OFFSET, 0.toByte) - - val buffer: ByteBuffer = ByteBuffer.allocate(1024) - val builder: MemoryRecordsBuilder = MemoryRecords.builder(buffer, magic, compression, - TimestampType.CREATE_TIME, 0L) - builder.appendUncheckedWithOffset(0, record) - - val memoryRecords: MemoryRecords = builder.build - val logValidator: LogValidator = new LogValidator( - memoryRecords, - topicPartition, - time, - compressionType, - compression, - false, - magic, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ) - - assertThrows(classOf[CorruptRecordException], () => - logValidator.validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0), - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - ) - - assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}")), 1) - assertTrue(meterCount(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}") > 0) - } - - - @Test - def testCompressedV2(): Unit = { - checkCompressed(RecordBatch.MAGIC_VALUE_V2) - } - - @Test - def testInvalidCreateTimeNonCompressedV1(): Unit = { - val now = System.currentTimeMillis() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now - 1001L, - codec = Compression.NONE) - assertThrows(classOf[RecordValidationException], () => new LogValidator( - records, - topicPartition, - time, - CompressionType.NONE, - Compression.NONE, - false, - RecordBatch.MAGIC_VALUE_V1, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0), - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - )) - } - - @Test - def testInvalidCreateTimeNonCompressedV2(): Unit = { - val now = System.currentTimeMillis() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L, - codec = Compression.NONE) - assertThrows(classOf[RecordValidationException], () => new LogValidator( - records, - topicPartition, - time, - CompressionType.NONE, - Compression.NONE, - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0), - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - )) - } - - @Test - def testInvalidCreateTimeCompressedV1(): Unit = { - val now = System.currentTimeMillis() - val compression: Compression = Compression.gzip().build() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now - 1001L, - codec = compression) - assertThrows(classOf[RecordValidationException], () => new LogValidator( - records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V1, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0), - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - )) - } - - @Test - def testInvalidCreateTimeCompressedV2(): Unit = { - val now = System.currentTimeMillis() - val compression: Compression = Compression.gzip().build() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L, - codec = compression) - assertThrows(classOf[RecordValidationException], () => new LogValidator( - records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V1, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0), - metricsRecorder, - RequestLocal.withThreadConfinedCaching.bufferSupplier - )) - } - - @Test - def testAbsoluteOffsetAssignmentNonCompressed(): Unit = { - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = Compression.NONE) - val offset = 1234567 - checkOffsets(records, 0) - checkOffsets(new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.NONE, - false, - RecordBatch.MAGIC_VALUE_V0, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting, - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords, offset) - } - - @Test - def testAbsoluteOffsetAssignmentCompressed(): Unit = { - val compression: Compression = Compression.gzip().build() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = compression) - val offset = 1234567 - checkOffsets(records, 0) - checkOffsets(new LogValidator(records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V0, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords, offset) - } - - @Test - def testRelativeOffsetAssignmentNonCompressedV1(): Unit = { - val now = System.currentTimeMillis() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now, codec = Compression.NONE) - val offset = 1234567 - checkOffsets(records, 0) - val messageWithOffset = new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.NONE, - false, - RecordBatch.MAGIC_VALUE_V1, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords - checkOffsets(messageWithOffset, offset) - } - - @Test - def testRelativeOffsetAssignmentNonCompressedV2(): Unit = { - val now = System.currentTimeMillis() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now, codec = Compression.NONE) - val offset = 1234567 - checkOffsets(records, 0) - val messageWithOffset = new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.NONE, - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords - checkOffsets(messageWithOffset, offset) - } - - @Test - def testRelativeOffsetAssignmentCompressedV1(): Unit = { - val now = System.currentTimeMillis() - val compression: Compression = Compression.gzip().build() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now, codec = compression) - val offset = 1234567 - checkOffsets(records, 0) - val compressedMessagesWithOffset = new LogValidator( - records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V1, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords - checkOffsets(compressedMessagesWithOffset, offset) - } - - @Test - def testRelativeOffsetAssignmentCompressedV2(): Unit = { - val now = System.currentTimeMillis() - val compression: Compression = Compression.gzip().build() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now, codec = compression) - val offset = 1234567 - checkOffsets(records, 0) - val compressedMessagesWithOffset = new LogValidator( - records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords - checkOffsets(compressedMessagesWithOffset, offset) - } - - @Test - def testOffsetAssignmentAfterUpConversionV0ToV1NonCompressed(): Unit = { - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = Compression.NONE) - checkOffsets(records, 0) - val offset = 1234567 - val validatedResults = new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.NONE, - false, - RecordBatch.MAGIC_VALUE_V1, - TimestampType.LOG_APPEND_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - checkOffsets(validatedResults.validatedRecords, offset) - verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, - compressed = false) - } - - @Test - def testOffsetAssignmentAfterUpConversionV0ToV2NonCompressed(): Unit = { - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = Compression.NONE) - checkOffsets(records, 0) - val offset = 1234567 - val validatedResults = new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.NONE, - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.LOG_APPEND_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - checkOffsets(validatedResults.validatedRecords, offset) - verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, - compressed = false) - } - - @Test - def testOffsetAssignmentAfterUpConversionV0ToV1Compressed(): Unit = { - val compression: Compression = Compression.gzip().build() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = compression) - val offset = 1234567 - checkOffsets(records, 0) - val validatedResults = new LogValidator(records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V1, - TimestampType.LOG_APPEND_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - checkOffsets(validatedResults.validatedRecords, offset) - verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, - compressed = true) - } - - @Test - def testOffsetAssignmentAfterUpConversionV0ToV2Compressed(): Unit = { - val compression: Compression = Compression.gzip().build() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = compression) - val offset = 1234567 - checkOffsets(records, 0) - val validatedResults = new LogValidator(records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.LOG_APPEND_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - checkOffsets(validatedResults.validatedRecords, offset) - verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, - compressed = true) - } - - @Test - def testControlRecordsNotAllowedFromClients(): Unit = { - val offset = 1234567 - val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0) - val records = MemoryRecords.withEndTransactionMarker(23423L, 5, endTxnMarker) - assertThrows(classOf[InvalidRecordException], () => new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.NONE, - false, - RecordBatch.CURRENT_MAGIC_VALUE, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - )) - } - - @Test - def testControlRecordsNotCompressed(): Unit = { - val offset = 1234567 - val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0) - val records = MemoryRecords.withEndTransactionMarker(23423L, 5, endTxnMarker) - val result = new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.snappy().build(), - false, - RecordBatch.CURRENT_MAGIC_VALUE, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.COORDINATOR, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - val batches = TestUtils.toList(result.validatedRecords.batches) - assertEquals(1, batches.size) - val batch = batches.get(0) - assertFalse(batch.isCompressed) - } - - @Test - def testOffsetAssignmentAfterDownConversionV1ToV0NonCompressed(): Unit = { - val offset = 1234567 - val now = System.currentTimeMillis() - val records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, codec = Compression.NONE) - checkOffsets(records, 0) - checkOffsets(new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.NONE, - false, - RecordBatch.MAGIC_VALUE_V0, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords, offset) - } - - @Test - def testOffsetAssignmentAfterDownConversionV1ToV0Compressed(): Unit = { - val offset = 1234567 - val now = System.currentTimeMillis() - val compression: Compression = Compression.gzip().build() - val records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, compression) - checkOffsets(records, 0) - checkOffsets(new LogValidator(records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V0, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords, offset) - } - - @Test - def testOffsetAssignmentAfterUpConversionV1ToV2NonCompressed(): Unit = { - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = Compression.NONE) - checkOffsets(records, 0) - val offset = 1234567 - checkOffsets(new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.NONE, - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.LOG_APPEND_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords, offset) - } - - @Test - def testOffsetAssignmentAfterUpConversionV1ToV2Compressed(): Unit = { - val compression: Compression = Compression.gzip().build() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = compression) - val offset = 1234567 - checkOffsets(records, 0) - checkOffsets(new LogValidator(records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.LOG_APPEND_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords, offset) - } - - @Test - def testOffsetAssignmentAfterDownConversionV2ToV1NonCompressed(): Unit = { - val offset = 1234567 - val now = System.currentTimeMillis() - val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, codec = Compression.NONE) - checkOffsets(records, 0) - checkOffsets(new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.NONE, - false, - RecordBatch.MAGIC_VALUE_V1, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords, offset) - } - - @Test - def testOffsetAssignmentAfterDownConversionV2ToV1Compressed(): Unit = { - val offset = 1234567 - val now = System.currentTimeMillis() - val compression: Compression = Compression.gzip().build() - val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, compression) - checkOffsets(records, 0) - checkOffsets(new LogValidator(records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V1, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords, offset) - } - - @Test - def testDownConversionOfTransactionalRecordsNotPermitted(): Unit = { - val offset = 1234567 - val producerId = 1344L - val producerEpoch = 16.toShort - val sequence = 0 - val records = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, - new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes)) - assertThrows(classOf[UnsupportedForMessageFormatException], () => new LogValidator(records, - topicPartition, - time, - CompressionType.GZIP, - Compression.gzip().build(), - false, - RecordBatch.MAGIC_VALUE_V1, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - )) - } - - @Test - def testDownConversionOfIdempotentRecordsNotPermitted(): Unit = { - val offset = 1234567 - val producerId = 1344L - val producerEpoch = 16.toShort - val sequence = 0 - val records = MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, producerEpoch, sequence, - new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes)) - assertThrows(classOf[UnsupportedForMessageFormatException], () => new LogValidator(records, - topicPartition, - time, - CompressionType.GZIP, - Compression.gzip().build(), - false, - RecordBatch.MAGIC_VALUE_V1, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - )) - } - - @Test - def testOffsetAssignmentAfterDownConversionV2ToV0NonCompressed(): Unit = { - val offset = 1234567 - val now = System.currentTimeMillis() - val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, codec = Compression.NONE) - checkOffsets(records, 0) - checkOffsets(new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.NONE, - false, - RecordBatch.MAGIC_VALUE_V0, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords, offset) - } - - @Test - def testOffsetAssignmentAfterDownConversionV2ToV0Compressed(): Unit = { - val offset = 1234567 - val now = System.currentTimeMillis() - val compression: Compression = Compression.gzip().build() - val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, compression) - checkOffsets(records, 0) - checkOffsets(new LogValidator(records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V0, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ).validatedRecords, offset) - } - - @Test - def testNonIncreasingOffsetRecordBatchHasMetricsLogged(): Unit = { - val records = createNonIncreasingOffsetRecords(RecordBatch.MAGIC_VALUE_V2) - records.batches().asScala.head.setLastOffset(2) - assertThrows(classOf[InvalidRecordException], () => new LogValidator(records, - topicPartition, - time, - CompressionType.GZIP, - Compression.gzip().build(), - false, - RecordBatch.MAGIC_VALUE_V0, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - )) - assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicMetrics.INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC}")), 1) - assertTrue(meterCount(s"${BrokerTopicMetrics.INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC}") > 0) - } - - @Test - def testCompressedBatchWithoutRecordsNotAllowed(): Unit = { - testBatchWithoutRecordsNotAllowed(CompressionType.GZIP, Compression.gzip().build()) - } - - @Test - def testZStdCompressedWithUnavailableIBPVersion(): Unit = { - // The timestamps should be overwritten - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = 1234L, codec = Compression.NONE) - assertThrows(classOf[UnsupportedCompressionTypeException], () => new LogValidator(records, - topicPartition, - time, - CompressionType.NONE, - Compression.zstd().build(), - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.LOG_APPEND_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.IBP_2_0_IV1 - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - )) - } - - @Test - def testUncompressedBatchWithoutRecordsNotAllowed(): Unit = { - testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.NONE) - } - - @Test - def testRecompressedBatchWithoutRecordsNotAllowed(): Unit = { - testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.gzip().build()) - } - - @Test - def testInvalidTimestampExceptionHasBatchIndex(): Unit = { - val now = System.currentTimeMillis() - val compression: Compression = Compression.gzip().build() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L, - codec = compression) - val e = assertThrows(classOf[RecordValidationException], - () => new LogValidator( - records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V1, - TimestampType.CREATE_TIME, - 1000L, - 1000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - ) - - assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) - assertFalse(e.recordErrors.isEmpty) - assertEquals(e.recordErrors.size, 3) - } - - @Test - def testInvalidRecordExceptionHasBatchIndex(): Unit = { - val e = assertThrows(classOf[RecordValidationException], - () => { - val compression: Compression = Compression.gzip().build() - validateMessages(recordsWithInvalidInnerMagic( - RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, compression), - RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, compression) - } - ) - - assertTrue(e.invalidException.isInstanceOf[InvalidRecordException]) - assertFalse(e.recordErrors.isEmpty) - // recordsWithInvalidInnerMagic creates 20 records - assertEquals(e.recordErrors.size, 20) - e.recordErrors.asScala.foreach(assertNotNull(_)) - } - - @Test - def testBatchWithInvalidRecordsAndInvalidTimestamp(): Unit = { - val compression: Compression = Compression.gzip().build() - val records = (0 until 5).map(id => - LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 0L, null, id.toString.getBytes()) - ) - - val buffer = ByteBuffer.allocate(1024) - val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, compression, - TimestampType.CREATE_TIME, 0L) - var offset = 0 - - // we want to mix in a record with invalid timestamp range - builder.appendUncheckedWithOffset(offset, LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, - 1200L, null, "timestamp".getBytes)) - records.foreach { record => - offset += 30 - builder.appendUncheckedWithOffset(offset, record) - } - val invalidOffsetTimestampRecords = builder.build() - - val e = assertThrows(classOf[RecordValidationException], - () => validateMessages(invalidOffsetTimestampRecords, - RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, compression) - ) - // if there is a mix of both regular InvalidRecordException and InvalidTimestampException, - // InvalidTimestampException takes precedence - assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) - assertFalse(e.recordErrors.isEmpty) - assertEquals(6, e.recordErrors.size) - } - - @Test - def testRecordWithPastTimestampIsRejected(): Unit = { - val timestampBeforeMaxConfig = 24 * 60 * 60 * 1000L //24 hrs - val timestampAfterMaxConfig = 1 * 60 * 60 * 1000L //1 hr - val now = System.currentTimeMillis() - val fiveMinutesBeforeThreshold = now - timestampBeforeMaxConfig - (5 * 60 * 1000L) - val compression: Compression = Compression.gzip().build() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = fiveMinutesBeforeThreshold, - codec = compression) - val e = assertThrows(classOf[RecordValidationException], - () => new LogValidator( - records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.CREATE_TIME, - timestampBeforeMaxConfig, - timestampAfterMaxConfig, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - ) - - assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) - assertFalse(e.recordErrors.isEmpty) - assertEquals(e.recordErrors.size, 3) - } - - - @Test - def testRecordWithFutureTimestampIsRejected(): Unit = { - val timestampBeforeMaxConfig = 24 * 60 * 60 * 1000L //24 hrs - val timestampAfterMaxConfig = 1 * 60 * 60 * 1000L //1 hr - val now = System.currentTimeMillis() - val fiveMinutesAfterThreshold = now + timestampAfterMaxConfig + (5 * 60 * 1000L) - val compression: Compression = Compression.gzip().build() - val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = fiveMinutesAfterThreshold, - codec = compression) - val e = assertThrows(classOf[RecordValidationException], - () => new LogValidator( - records, - topicPartition, - time, - CompressionType.GZIP, - compression, - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.CREATE_TIME, - timestampBeforeMaxConfig, - timestampAfterMaxConfig, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - ) - - assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) - assertFalse(e.recordErrors.isEmpty) - assertEquals(e.recordErrors.size, 3) - } - - @Test - def testDifferentLevelDoesNotCauseRecompression(): Unit = { - val records = List( - List.fill(256)("some").mkString("").getBytes, - List.fill(256)("data").mkString("").getBytes - ) - // Records from the producer were created with gzip max level - val gzipMax: Compression = Compression.gzip().level(CompressionType.GZIP.maxLevel()).build() - val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax) - - // The topic is configured with gzip min level - val gzipMin: Compression = Compression.gzip().level(CompressionType.GZIP.minLevel()).build() - val recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMin) - - // ensure data compressed with gzip max and min is different - assertNotEquals(recordsGzipMax, recordsGzipMin) - val validator = new LogValidator(recordsGzipMax, - topicPartition, - time, - gzipMax.`type`(), - gzipMin, - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ) - val result = validator.validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - // ensure validated records have not been changed so they are the same as the producer records - assertEquals(recordsGzipMax, result.validatedRecords) - assertNotEquals(recordsGzipMin, result.validatedRecords) - } - - @Test - def testDifferentCodecCausesRecompression(): Unit = { - val records = List( - List.fill(256)("some").mkString("").getBytes, - List.fill(256)("data").mkString("").getBytes - ) - // Records from the producer were created with gzip max level - val gzipMax: Compression = Compression.gzip().level(CompressionType.GZIP.maxLevel()).build() - val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax) - - // The topic is configured with lz4 min level - val lz4Min: Compression = Compression.lz4().level(CompressionType.LZ4.minLevel()).build() - val recordsLz4Min = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, lz4Min) - - val validator = new LogValidator(recordsGzipMax, - topicPartition, - time, - gzipMax.`type`(), - lz4Min, - false, - RecordBatch.MAGIC_VALUE_V2, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ) - val result = validator.validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - ) - // ensure validated records have been recompressed and match lz4 min level - assertEquals(recordsLz4Min, result.validatedRecords) - } - - private def testBatchWithoutRecordsNotAllowed(sourceCompression: CompressionType, targetCompression: Compression): Unit = { - val offset = 1234567 - val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) = - (1324L, 10.toShort, 984, true, 40) - val buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD) - DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch, - baseSequence, 0L, 5L, partitionLeaderEpoch, TimestampType.CREATE_TIME, System.currentTimeMillis(), - isTransactional, false) - buffer.flip() - val records = MemoryRecords.readableRecords(buffer) - assertThrows(classOf[InvalidRecordException], () => new LogValidator(records, - topicPartition, - time, - sourceCompression, - targetCompression, - false, - RecordBatch.CURRENT_MAGIC_VALUE, - TimestampType.CREATE_TIME, - 5000L, - 5000L, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - AppendOrigin.CLIENT, - MetadataVersion.latestTesting - ).validateMessagesAndAssignOffsets( - PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier - )) - } - - private def createRecords(magicValue: Byte, - timestamp: Long = RecordBatch.NO_TIMESTAMP, - codec: Compression): MemoryRecords = { - val records = List("hello".getBytes, "there".getBytes, "beautiful".getBytes) - createRecords(records = records, magicValue = magicValue, timestamp = timestamp, codec = codec) - } - - private def createRecords(records: List[Array[Byte]], - magicValue: Byte, - timestamp: Long, - codec: Compression): MemoryRecords = { - val buf = ByteBuffer.allocate(512) - val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L) - records.indices.foreach { offset => - builder.appendWithOffset(offset, timestamp, null, records(offset)) - } - builder.build() - } - - private def createNonIncreasingOffsetRecords(magicValue: Byte, - timestamp: Long = RecordBatch.NO_TIMESTAMP, - codec: Compression = Compression.NONE): MemoryRecords = { - val buf = ByteBuffer.allocate(512) - val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L) - builder.appendWithOffset(0, timestamp, null, "hello".getBytes) - builder.appendWithOffset(2, timestamp, null, "there".getBytes) - builder.appendWithOffset(3, timestamp, null, "beautiful".getBytes) - builder.build() - } - - private def createTwoBatchedRecords(magicValue: Byte, codec: Compression): MemoryRecords = { - val buf = ByteBuffer.allocate(2048) - var builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L) - builder.append(10L, "1".getBytes(), "a".getBytes()) - builder.close() - builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 1L) - builder.append(11L, "2".getBytes(), "b".getBytes()) - builder.append(12L, "3".getBytes(), "c".getBytes()) - builder.close() - - buf.flip() - MemoryRecords.readableRecords(buf.slice()) - } - - /* check that offsets are assigned consecutively from the given base offset */ - def checkOffsets(records: MemoryRecords, baseOffset: Long): Unit = { - assertTrue(records.records.asScala.nonEmpty, "Message set should not be empty") - var offset = baseOffset - for (entry <- records.records.asScala) { - assertEquals(offset, entry.offset, "Unexpected offset in message set iterator") - offset += 1 - } - } - - private def recordsWithNonSequentialInnerOffsets(magicValue: Byte, - compression: Compression, - numRecords: Int): MemoryRecords = { - val records = (0 until numRecords).map { id => - new SimpleRecord(id.toString.getBytes) - } - - val buffer = ByteBuffer.allocate(1024) - val builder = MemoryRecords.builder(buffer, magicValue, compression, TimestampType.CREATE_TIME, 0L) - - records.foreach { record => - builder.appendUncheckedWithOffset(0, record) - } - - builder.build() - } - - private def recordsWithInvalidInnerMagic(batchMagicValue: Byte, - recordMagicValue: Byte, - codec: Compression): MemoryRecords = { - val records = (0 until 20).map(id => - LegacyRecord.create(recordMagicValue, - RecordBatch.NO_TIMESTAMP, - id.toString.getBytes, - id.toString.getBytes)) - - val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16)) - val builder = MemoryRecords.builder(buffer, batchMagicValue, codec, - TimestampType.CREATE_TIME, 0L) - - var offset = 1234567 - records.foreach { record => - builder.appendUncheckedWithOffset(offset, record) - offset += 1 - } - - builder.build() - } - - def maybeCheckBaseTimestamp(expected: Long, batch: RecordBatch): Unit = { - batch match { - case b: DefaultRecordBatch => - assertEquals(expected, b.baseTimestamp, s"Unexpected base timestamp of batch $batch") - case _ => // no-op - } - } - - /** - * expectedLogAppendTime is only checked if batch.magic is V2 or higher - */ - def validateLogAppendTime(expectedLogAppendTime: Long, expectedBaseTimestamp: Long, batch: RecordBatch): Unit = { - assertTrue(batch.isValid) - assertTrue(batch.timestampType == TimestampType.LOG_APPEND_TIME) - assertEquals(expectedLogAppendTime, batch.maxTimestamp, s"Unexpected max timestamp of batch $batch") - maybeCheckBaseTimestamp(expectedBaseTimestamp, batch) - for (record <- batch.asScala) { - record.ensureValid() - assertEquals(expectedLogAppendTime, record.timestamp, s"Unexpected timestamp of record $record") - } - } - - def verifyRecordValidationStats(stats: RecordValidationStats, numConvertedRecords: Int, records: MemoryRecords, - compressed: Boolean): Unit = { - assertNotNull(stats, "Records processing info is null") - assertEquals(numConvertedRecords, stats.numRecordsConverted) - if (numConvertedRecords > 0) { - assertTrue(stats.conversionTimeNanos >= 0, s"Conversion time not recorded $stats") - assertTrue(stats.conversionTimeNanos <= TimeUnit.MINUTES.toNanos(1), s"Conversion time not valid $stats") - } - val originalSize = records.sizeInBytes - val tempBytes = stats.temporaryMemoryBytes - if (numConvertedRecords > 0 && compressed) - assertTrue(tempBytes > originalSize, s"Temp bytes too small, orig=$originalSize actual=$tempBytes") - else if (numConvertedRecords > 0 || compressed) - assertTrue(tempBytes > 0, "Temp bytes not updated") - else - assertEquals(0, tempBytes) - } -} diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java new file mode 100644 index 0000000000000..7dc5268aa7832 --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java @@ -0,0 +1,2155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import kafka.server.RequestLocal; + +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidTimestampException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LogValidatorTest { + private final Time time = Time.SYSTEM; + private final TopicPartition topicPartition = new TopicPartition("topic", 0); + private final MetricsRecorder metricsRecorder = new MetricsRecorder(); + + static class MetricsRecorder implements LogValidator.MetricsRecorder { + public int recordInvalidMagicCount = 0; + public int recordInvalidOffsetCount = 0; + public int recordInvalidChecksumsCount = 0; + public int recordInvalidSequenceCount = 0; + public int recordNoKeyCompactedTopicCount = 0; + + public void recordInvalidMagic() { + recordInvalidMagicCount += 1; + } + + public void recordInvalidOffset() { + recordInvalidOffsetCount += 1; + } + + public void recordInvalidSequence() { + recordInvalidSequenceCount += 1; + } + + public void recordInvalidChecksums() { + recordInvalidChecksumsCount += 1; + } + + public void recordNoKeyCompactedTopic() { + recordNoKeyCompactedTopicCount += 1; + } + } + + @Test + public void testValidationOfBatchesWithNonSequentialInnerOffsets() { + Arrays.stream(RecordVersion.values()).forEach(version -> { + int numRecords = 20; + Compression compression = Compression.gzip().build(); + MemoryRecords invalidRecords = recordsWithNonSequentialInnerOffsets(version.value, compression, numRecords); + + // Validation for v2 and above is strict for this case. For older formats, we fix invalid + // internal offsets by rewriting the batch. + if (version.value >= RecordBatch.MAGIC_VALUE_V2) { + assertThrows(InvalidRecordException.class, () -> + validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression) + ); + } else { + ValidationResult result = validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression); + List recordsResult = new ArrayList<>(); + result.validatedRecords.records().forEach(s -> recordsResult.add(s.offset())); + assertEquals(LongStream.range(0, numRecords).boxed().collect(Collectors.toList()), recordsResult); + } + }); + } + + @ParameterizedTest + @CsvSource({ + "0,gzip,none", "1,gzip,none", "2,gzip,none", + "0,gzip,gzip", "1,gzip,gzip", "2,gzip,gzip", + "0,snappy,gzip", "1,snappy,gzip", "2,snappy,gzip", + "0,lz4,gzip", "1,lz4,gzip", "2,lz4,gzip", + "2,none,none", "2,none,gzip", + "2,zstd,gzip", + }) + public void checkOnlyOneBatch(Byte magic, String sourceCompression, + String targetCompression) { + assertThrows(InvalidRecordException.class, + () -> validateMessages(createTwoBatchedRecords(magic, Compression.of(sourceCompression).build()), + magic, CompressionType.forName(sourceCompression), Compression.of(targetCompression).build()) + ); + } + + + private static Stream testAllCompression() { + return Arrays.stream(CompressionType.values()).flatMap(source -> + Arrays.stream(CompressionType.values()).map(target -> + Arguments.of(source.name, target.name))); + } + + @ParameterizedTest + @MethodSource("testAllCompression") + public void testBatchWithoutRecordsNotAllowed(String sourceCompressionName, String targetCompressionName) { + long offset = 1234567; + long producerId = 1324L; + short producerEpoch = 10; + int baseSequence = 984; + boolean isTransactional = true; + int partitionLeaderEpoch = 40; + CompressionType sourceCompression = CompressionType.forName(sourceCompressionName); + Compression targetCompression = Compression.of(targetCompressionName).build(); + + + ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD); + DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch, + baseSequence, 0L, 5L, partitionLeaderEpoch, TimestampType.CREATE_TIME, System.currentTimeMillis(), + isTransactional, false); + buffer.flip(); + + MemoryRecords records = MemoryRecords.readableRecords(buffer); + + assertThrows(InvalidRecordException.class, () -> new LogValidator(records, + topicPartition, + time, + sourceCompression, + targetCompression, + false, + RecordBatch.CURRENT_MAGIC_VALUE, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + } + + @ParameterizedTest + @CsvSource({"0,1,gzip", "1,0,gzip"}) + public void checkMismatchMagic(byte batchMagic, byte recordMagic, String compressionName) { + Compression compression = Compression.of(compressionName).build(); + assertThrows(RecordValidationException.class, + () -> validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compression + ), batchMagic, compression.type(), compression)); + + assertTrue(metricsRecorder.recordInvalidMagicCount > 0); + } + @Test + public void testCreateTimeUpConversionV1ToV2() { + long timestamp = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, timestamp, compression); + + LogValidator validator = new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + + LogValidator.ValidationResult validatedResults = validator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatedResults.validatedRecords; + + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + maybeCheckBaseTimestamp(timestamp, batch); + assertEquals(timestamp, batch.maxTimestamp()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch()); + assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId()); + assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence()); + } + + assertEquals(timestamp, validatedResults.maxTimestampMs); + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, "Offset of max timestamp should be the last offset 2."); + assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed"); + + verifyRecordValidationStats( + validatedResults.recordValidationStats, + 3, + records, + true + ); + } + + @ParameterizedTest + @CsvSource({"1", "2"}) + public void checkCreateTimeUpConversionFromV0(byte toMagic) { + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, compression); + LogValidator logValidator = new LogValidator(records, + topicPartition, + time, + CompressionType.GZIP, + compression, + false, + toMagic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + LogValidator.ValidationResult validatedResults = logValidator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + MemoryRecords validatedRecords = validatedResults.validatedRecords; + + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + maybeCheckBaseTimestamp(RecordBatch.NO_TIMESTAMP, batch); + assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch()); + assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId()); + assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence()); + } + assertEquals(RecordBatch.NO_TIMESTAMP, validatedResults.maxTimestampMs, + "Max timestamp should be " + RecordBatch.NO_TIMESTAMP); + assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp); + assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed"); + + verifyRecordValidationStats(validatedResults.recordValidationStats, 3, records, true); + } + + @ParameterizedTest + @CsvSource({"1", "2"}) + public void checkRecompression(byte magic) { + long now = System.currentTimeMillis(); + // Set the timestamp of seq(1) (i.e. offset 1) as the max timestamp + List timestampSeq = Arrays.asList(now - 1, now + 1, now); + + long producerId; + short producerEpoch; + int baseSequence; + boolean isTransactional; + int partitionLeaderEpoch; + + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + producerId = 1324L; + producerEpoch = 10; + baseSequence = 984; + isTransactional = true; + partitionLeaderEpoch = 40; + } else { + producerId = RecordBatch.NO_PRODUCER_ID; + producerEpoch = RecordBatch.NO_PRODUCER_EPOCH; + baseSequence = RecordBatch.NO_SEQUENCE; + isTransactional = false; + partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH; + } + + MemoryRecords records = MemoryRecords.withRecords( + magic, + 0L, + Compression.NONE, + TimestampType.CREATE_TIME, + producerId, + producerEpoch, + baseSequence, + partitionLeaderEpoch, + isTransactional, + new SimpleRecord(timestampSeq.get(0), "hello".getBytes()), + new SimpleRecord(timestampSeq.get(1), "there".getBytes()), + new SimpleRecord(timestampSeq.get(2), "beautiful".getBytes()) + ); + + // V2 has single batch, and other versions have many single-record batches + assertEquals(magic >= RecordBatch.MAGIC_VALUE_V2 ? 1 : 3, iteratorSize(records.batches().iterator())); + + LogValidator.ValidationResult validatingResults = new LogValidator( + records, + topicPartition, + time, + CompressionType.NONE, + Compression.gzip().build(), + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + partitionLeaderEpoch, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatingResults.validatedRecords; + + int i = 0; + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + maybeCheckBaseTimestamp(timestampSeq.get(0), batch); + assertEquals(batch.maxTimestamp(), batch.maxTimestamp()); + assertEquals(producerEpoch, batch.producerEpoch()); + assertEquals(producerId, batch.producerId()); + assertEquals(baseSequence, batch.baseSequence()); + assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch()); + + for (Record record : batch) { + record.ensureValid(); + assertEquals((long) timestampSeq.get(i), record.timestamp()); + i++; + } + } + + assertEquals(now + 1, validatingResults.maxTimestampMs, + "Max timestamp should be " + (now + 1)); + + // Both V2 and V1 have single batch in the validated records when compression is enabled, and hence their shallow + // OffsetOfMaxTimestamp is the last offset of the single batch + assertEquals(1, iteratorSize(validatedRecords.batches().iterator())); + assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp); + assertTrue(validatingResults.messageSizeMaybeChanged, + "Message size should have been changed"); + + verifyRecordValidationStats(validatingResults.recordValidationStats, 3, records, true); + } + + private MemoryRecords recordsWithInvalidInnerMagic(byte batchMagicValue, byte recordMagicValue, Compression codec) { + List records = new ArrayList<>(); + + for (int id = 0; id < 20; id++) { + records.add(LegacyRecord.create( + recordMagicValue, + RecordBatch.NO_TIMESTAMP, + Integer.toString(id).getBytes(), + Integer.toString(id).getBytes() + )); + } + + ByteBuffer buffer = ByteBuffer.allocate(Math.min(Math.max( + records.stream().mapToInt(LegacyRecord::sizeInBytes).sum() / 2, 1024), 1 << 16)); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, batchMagicValue, codec, + TimestampType.CREATE_TIME, 0L); + + AtomicLong offset = new AtomicLong(1234567); + records.forEach(record -> { + builder.appendUncheckedWithOffset(offset.get(), record); + offset.incrementAndGet(); + }); + + return builder.build(); + } + + private MemoryRecords recordsWithNonSequentialInnerOffsets(Byte magicValue, Compression compression, int numRecords) { + List records = IntStream.range(0, numRecords) + .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes())) + .collect(Collectors.toList()); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magicValue, compression, TimestampType.CREATE_TIME, 0L); + + records.forEach(record -> + assertDoesNotThrow(() -> builder.appendUncheckedWithOffset(0, record)) + ); + + return builder.build(); + } + + @ParameterizedTest + @CsvSource({"0,none,none", "1,none,none", "0,none,gzip", "1,none,gzip"}) + public void checkAllowMultiBatch(Byte magic, String sourceCompression, String targetCompression) { + validateMessages(createTwoBatchedRecords(magic, Compression.of(sourceCompression).build()), magic, + CompressionType.forName(sourceCompression), Compression.of(targetCompression).build()); + } + + + private ValidationResult validateMessages(MemoryRecords records, + Byte magic, + CompressionType sourceCompressionType, + Compression targetCompressionType) { + MockTime mockTime = new MockTime(0L, 0L); + return new LogValidator(records, + topicPartition, + mockTime, + sourceCompressionType, + targetCompressionType, + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PRODUCER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.IBP_2_3_IV1 + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier()); + } + + private MemoryRecords createTwoBatchedRecords(Byte magicValue, + Compression codec) { + ByteBuffer buf = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L); + builder.append(10L, "1".getBytes(), "a".getBytes()); + builder.close(); + builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 1L); + builder.append(11L, "2".getBytes(), "b".getBytes()); + builder.append(12L, "3".getBytes(), "c".getBytes()); + builder.close(); + + buf.flip(); + return MemoryRecords.readableRecords(buf.slice()); + } + + private MemoryRecords createRecords(byte magicValue, + long timestamp, + Compression codec) { + List records = Arrays.asList("hello".getBytes(), "there".getBytes(), "beautiful".getBytes()); + return createRecords(records, magicValue, timestamp, codec); + } + + @ParameterizedTest + @CsvSource({"1", "2"}) + public void checkCompressed(byte magic) { + long now = System.currentTimeMillis(); + // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp + List timestampSeq = Arrays.asList(now - 1, now + 1, now); + + long producerId; + short producerEpoch; + int baseSequence; + boolean isTransactional; + int partitionLeaderEpoch; + + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + producerId = 1324L; + producerEpoch = 10; + baseSequence = 984; + isTransactional = true; + partitionLeaderEpoch = 40; + } else { + producerId = RecordBatch.NO_PRODUCER_ID; + producerEpoch = RecordBatch.NO_PRODUCER_EPOCH; + baseSequence = RecordBatch.NO_SEQUENCE; + isTransactional = false; + partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH; + } + + List recordList = Arrays.asList( + new SimpleRecord(timestampSeq.get(0), "hello".getBytes()), + new SimpleRecord(timestampSeq.get(1), "there".getBytes()), + new SimpleRecord(timestampSeq.get(2), "beautiful".getBytes()) + ); + + MemoryRecords records = MemoryRecords.withRecords( + magic, + 0L, + Compression.gzip().build(), + TimestampType.CREATE_TIME, + producerId, + producerEpoch, + baseSequence, + partitionLeaderEpoch, + isTransactional, + recordList.toArray(new SimpleRecord[0]) + ); + + LogValidator validator = new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + Compression.gzip().build(), + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + partitionLeaderEpoch, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + + LogValidator.ValidationResult validatedResults = validator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatedResults.validatedRecords; + + int i = 0; + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + assertEquals(batch.timestampType(), TimestampType.CREATE_TIME); + maybeCheckBaseTimestamp(timestampSeq.get(0), batch); + assertEquals(batch.maxTimestamp(), batch.maxTimestamp()); + assertEquals(producerEpoch, batch.producerEpoch()); + assertEquals(producerId, batch.producerId()); + assertEquals(baseSequence, batch.baseSequence()); + assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch()); + + for (Record record : batch) { + record.ensureValid(); + assertEquals((long) timestampSeq.get(i), record.timestamp()); + i++; + } + } + + assertEquals(now + 1, validatedResults.maxTimestampMs, "Max timestamp should be " + (now + 1)); + + int expectedShallowOffsetOfMaxTimestamp = 2; + assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestamp, "Shallow offset of max timestamp should be 2"); + assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed"); + + verifyRecordValidationStats(validatedResults.recordValidationStats, 0, records, true); + } + + private MemoryRecords createRecords(List records, + byte magicValue, + long timestamp, + Compression codec) { + ByteBuffer buf = ByteBuffer.allocate(512); + MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L); + + AtomicInteger offset = new AtomicInteger(0); + records.forEach(item -> + builder.appendWithOffset(offset.getAndIncrement(), timestamp, null, item)); + return builder.build(); + } + + @Test + void testRecordBatchWithCountOverrides() { + // The batch to be written contains 3 records, so the correct lastOffsetDelta is 2 + validateRecordBatchWithCountOverrides(2, 3); + } + + @ParameterizedTest + @CsvSource({"0,3", "15,3", "-3,3"}) + void testInconsistentCountAndOffset(int lastOffsetDelta, int count) { + // Count and offset range are inconsistent or invalid + assertInvalidBatchCountOverrides(lastOffsetDelta, count); + } + + @ParameterizedTest + @CsvSource({"5,6", "1,2"}) + void testUnmatchedNumberOfRecords(int lastOffsetDelta, int count) { + // Count and offset range are consistent, but do not match the actual number of records + assertInvalidBatchCountOverrides(lastOffsetDelta, count); + } + + @Test + void testInvalidCreateTimeNonCompressedV1() { + long now = System.currentTimeMillis(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, now - 1001L, + Compression.NONE); + assertThrows(RecordValidationException.class, () -> new LogValidator( + records, + topicPartition, + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + } + + @Test + public void testInvalidCreateTimeCompressedV1() { + long now = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords( + RecordBatch.MAGIC_VALUE_V1, + now - 1001L, + compression + ); + + assertThrows(RecordValidationException.class, () -> + new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ) + ); + } + + @Test + public void testInvalidCreateTimeNonCompressedV2() { + long now = System.currentTimeMillis(); + MemoryRecords records = createRecords( + RecordBatch.MAGIC_VALUE_V2, + now - 1001L, + Compression.NONE + ); + + assertThrows(RecordValidationException.class, () -> + new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ) + ); + } + + @ParameterizedTest + @CsvSource({ + "0,gzip,gzip", "1,gzip,gzip", + "0,lz4,lz4", "1,lz4,lz4", + "0,snappy,snappy", "1,snappy,snappy", + }) + public void checkInvalidChecksum(byte magic, String compressionName, String typeName) { + Compression compression = Compression.of(compressionName).build(); + CompressionType type = CompressionType.forName(typeName); + + LegacyRecord record = LegacyRecord.create(magic, 0L, null, "hello".getBytes()); + ByteBuffer buf = record.buffer(); + + // enforce modify crc to make checksum error + buf.put(LegacyRecord.CRC_OFFSET, (byte) 0); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, + TimestampType.CREATE_TIME, 0L); + builder.appendUncheckedWithOffset(0, record); + + MemoryRecords memoryRecords = builder.build(); + LogValidator logValidator = new LogValidator(memoryRecords, + topicPartition, + time, + type, + compression, + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + + + assertThrows(CorruptRecordException.class, () -> logValidator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + + assertTrue(metricsRecorder.recordInvalidChecksumsCount > 0); + } + + private static Stream testInvalidSequenceArguments() { + return Stream.of(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2) + .flatMap(magicValue -> Arrays.stream(CompressionType.values()).flatMap(source -> + Arrays.stream(CompressionType.values()).map(target -> + Arguments.of(magicValue, source.name, target.name)))); + } + + @ParameterizedTest + @MethodSource("testInvalidSequenceArguments") + public void checkInvalidSequence(byte magic, String compressionName, String typeName) { + long producerId = 1234; + short producerEpoch = 0; + int baseSequence = 0; + Compression compression = Compression.of(compressionName).build(); + CompressionType type = CompressionType.forName(typeName); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, compression, + 0L, producerId, producerEpoch, baseSequence, false); + builder.append(new SimpleRecord("hello".getBytes())); + + MemoryRecords memoryRecords = builder.build(); + ByteBuffer buf = memoryRecords.buffer(); + + // overwrite baseSequence to make InvalidSequence + // BASE_SEQUENCE_OFFSET is defined in DefaultRecordBatch and it is private + // so we write this number directly. + buf.putInt(53, -2); + + LogValidator logValidator = new LogValidator(memoryRecords, + topicPartition, + time, + type, + compression, + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + + + assertThrows(InvalidRecordException.class, () -> logValidator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + + assertTrue(metricsRecorder.recordInvalidSequenceCount > 0); + } + + @ParameterizedTest + @CsvSource({ + "0,gzip,gzip", "1,gzip,gzip", "2,gzip,gzip", + "0,lz4,lz4", "1,lz4,lz4", "2,lz4,lz4", + "0,snappy,snappy", "1,snappy,snappy", "2,snappy,snappy", + "2,zstd,zstd" + }) + public void checkNoKeyCompactedTopic(byte magic, String compressionName, String typeName) { + Compression codec = Compression.of(compressionName).build(); + CompressionType type = CompressionType.forName(typeName); + + MemoryRecords records = createRecords(magic, RecordBatch.NO_TIMESTAMP, codec); + Assertions.assertThrows(RecordValidationException.class, () -> new LogValidator( + records, + topicPartition, + time, + type, + codec, + true, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + + assertTrue(metricsRecorder.recordNoKeyCompactedTopicCount > 0); + } + + @Test + public void testInvalidCreateTimeCompressedV2() { + long now = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords( + RecordBatch.MAGIC_VALUE_V2, + now - 1001L, + compression + ); + + assertThrows(RecordValidationException.class, () -> + new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ) + ); + } + + @Test + public void testAbsoluteOffsetAssignmentNonCompressed() { + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, Compression.NONE); + long offset = 1234567; + + checkOffsets(records, 0); + + checkOffsets( + new LogValidator( + records, + topicPartition, + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V0, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords, offset + ); + } + + + @Test + public void testAbsoluteOffsetAssignmentCompressed() { + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, compression); + long offset = 1234567; + + checkOffsets(records, 0); + + checkOffsets( + new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V0, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords, + offset + ); + } + + @Test + public void testRelativeOffsetAssignmentNonCompressedV1() { + long now = System.currentTimeMillis(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, Compression.NONE); + long offset = 1234567; + + checkOffsets(records, 0); + + MemoryRecords messageWithOffset = new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords; + + checkOffsets(messageWithOffset, offset); + } + + @Test + public void testRelativeOffsetAssignmentNonCompressedV2() { + long now = System.currentTimeMillis(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, Compression.NONE); + long offset = 1234567; + + checkOffsets(records, 0); + + MemoryRecords messageWithOffset = new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords; + + checkOffsets(messageWithOffset, offset); + } + + @Test + public void testRelativeOffsetAssignmentCompressedV1() { + long now = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, compression); + long offset = 1234567; + + checkOffsets(records, 0); + + MemoryRecords compressedMessagesWithOffset = new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords; + + checkOffsets(compressedMessagesWithOffset, offset); + } + + @Test + public void testRelativeOffsetAssignmentCompressedV2() { + long now = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, compression); + long offset = 1234567; + + checkOffsets(records, 0); + + MemoryRecords compressedMessagesWithOffset = new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords; + + checkOffsets(compressedMessagesWithOffset, offset); + } + + @Test + public void testOffsetAssignmentAfterUpConversionV0ToV1NonCompressed() { + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, Compression.NONE); + checkOffsets(records, 0); + + long offset = 1234567; + + LogValidator.ValidationResult validatedResults = new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.LOG_APPEND_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + checkOffsets(validatedResults.validatedRecords, offset); + verifyRecordValidationStats( + validatedResults.recordValidationStats, + 3, // numConvertedRecords + records, + false // compressed + ); + } + + @Test + public void testOffsetAssignmentAfterUpConversionV0ToV2NonCompressed() { + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, Compression.NONE); + checkOffsets(records, 0); + + long offset = 1234567; + + LogValidator.ValidationResult validatedResults = new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.LOG_APPEND_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + checkOffsets(validatedResults.validatedRecords, offset); + verifyRecordValidationStats( + validatedResults.recordValidationStats, + 3, // numConvertedRecords + records, + false // compressed + ); + } + + @Test + public void testOffsetAssignmentAfterUpConversionV0ToV1Compressed() { + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, compression); + checkOffsets(records, 0); + + long offset = 1234567; + + LogValidator.ValidationResult validatedResults = new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.LOG_APPEND_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + checkOffsets(validatedResults.validatedRecords, offset); + verifyRecordValidationStats( + validatedResults.recordValidationStats, + 3, // numConvertedRecords + records, + true // compressed + ); + } + + @Test + public void testOffsetAssignmentAfterUpConversionV0ToV2Compressed() { + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, compression); + checkOffsets(records, 0); + + long offset = 1234567; + + LogValidator.ValidationResult validatedResults = new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.LOG_APPEND_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + checkOffsets(validatedResults.validatedRecords, offset); + verifyRecordValidationStats( + validatedResults.recordValidationStats, + 3, // numConvertedRecords + records, + true // compressed + ); + } + + @Test + public void testControlRecordsNotAllowedFromClients() { + long offset = 1234567; + EndTransactionMarker endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0); + MemoryRecords records = MemoryRecords.withEndTransactionMarker(23423L, (short) 5, endTxnMarker); + assertThrows(InvalidRecordException.class, () -> new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.CURRENT_MAGIC_VALUE, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + } + + @Test + public void testControlRecordsNotCompressed() { + long offset = 1234567; + EndTransactionMarker endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0); + MemoryRecords records = MemoryRecords.withEndTransactionMarker(23423L, (short) 5, endTxnMarker); + LogValidator.ValidationResult result = new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.NONE, + Compression.snappy().build(), + false, + RecordBatch.CURRENT_MAGIC_VALUE, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.COORDINATOR, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + MemoryRecords validatedRecords = result.validatedRecords; + assertEquals(1, TestUtils.toList(validatedRecords.batches()).size()); + assertFalse(TestUtils.toList(validatedRecords.batches()).get(0).isCompressed()); + } + + @Test + public void testOffsetAssignmentAfterDownConversionV1ToV0NonCompressed() { + long offset = 1234567; + long now = System.currentTimeMillis(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, Compression.NONE); + checkOffsets(records, 0); + checkOffsets(new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V0, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords, offset); + } + + @Test + public void testOffsetAssignmentAfterDownConversionV1ToV0Compressed() { + long offset = 1234567; + long now = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, compression); + checkOffsets(records, 0); + checkOffsets(new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V0, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords, offset); + } + + @Test + public void testOffsetAssignmentAfterUpConversionV1ToV2NonCompressed() { + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, RecordBatch.NO_TIMESTAMP, Compression.NONE); + checkOffsets(records, 0); + long offset = 1234567; + checkOffsets(new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.LOG_APPEND_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords, offset); + } + + @Test + public void testOffsetAssignmentAfterUpConversionV1ToV2Compressed() { + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, RecordBatch.NO_TIMESTAMP, compression); + long offset = 1234567; + checkOffsets(records, 0); + checkOffsets(new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.LOG_APPEND_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords, offset); + } + + @Test + public void testOffsetAssignmentAfterDownConversionV2ToV1NonCompressed() { + long offset = 1234567; + long now = System.currentTimeMillis(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, Compression.NONE); + checkOffsets(records, 0); + checkOffsets(new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords, offset); + } + + @Test + public void testOffsetAssignmentAfterDownConversionV2ToV1Compressed() { + long offset = 1234567; + long now = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, compression); + checkOffsets(records, 0); + checkOffsets(new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords, offset); + } + + @Test + public void testDownConversionOfTransactionalRecordsNotPermitted() { + long offset = 1234567; + long producerId = 1344L; + short producerEpoch = 16; + int sequence = 0; + MemoryRecords records = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, + new SimpleRecord("hello".getBytes()), new SimpleRecord("there".getBytes()), new SimpleRecord("beautiful".getBytes())); + assertThrows(UnsupportedForMessageFormatException.class, () -> new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + Compression.gzip().build(), + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + } + + + @Test + public void testDownConversionOfIdempotentRecordsNotPermitted() { + long offset = 1234567; + long producerId = 1344L; + short producerEpoch = 16; + int sequence = 0; + MemoryRecords records = MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, producerEpoch, sequence, + new SimpleRecord("hello".getBytes()), new SimpleRecord("there".getBytes()), new SimpleRecord("beautiful".getBytes())); + assertThrows(UnsupportedForMessageFormatException.class, () -> new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + Compression.gzip().build(), + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + } + + @Test + public void testOffsetAssignmentAfterDownConversionV2ToV0NonCompressed() { + long offset = 1234567; + long now = System.currentTimeMillis(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, Compression.NONE); + checkOffsets(records, 0); + checkOffsets(new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V0, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords, offset); + } + + + @Test + public void testOffsetAssignmentAfterDownConversionV2ToV0Compressed() { + long offset = 1234567; + long now = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, compression); + checkOffsets(records, 0); + checkOffsets(new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V0, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + ).validatedRecords, offset); + } + + @Test + public void testNonIncreasingOffsetRecordBatchHasMetricsLogged() { + ByteBuffer buf = ByteBuffer.allocate(512); + MemoryRecordsBuilder builder = MemoryRecords.builder(buf, RecordBatch.MAGIC_VALUE_V2, Compression.NONE, TimestampType.CREATE_TIME, 0L); + builder.appendWithOffset(0, RecordBatch.NO_TIMESTAMP, null, "hello".getBytes()); + builder.appendWithOffset(2, RecordBatch.NO_TIMESTAMP, null, "there".getBytes()); + builder.appendWithOffset(3, RecordBatch.NO_TIMESTAMP, null, "beautiful".getBytes()); + + MemoryRecords records = builder.build(); + records.batches().iterator().next().setLastOffset(2); + assertThrows(InvalidRecordException.class, () -> new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + Compression.gzip().build(), + false, + RecordBatch.MAGIC_VALUE_V0, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + + assertEquals(metricsRecorder.recordInvalidOffsetCount, 1); + } + + @Test + public void testZStdCompressedWithUnavailableIBPVersion() { + // The timestamps should be overwritten + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V2, 1234L, Compression.NONE); + assertThrows(UnsupportedCompressionTypeException.class, () -> + new LogValidator( + records, + topicPartition, + time, + CompressionType.NONE, + Compression.zstd().build(), + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.LOG_APPEND_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.IBP_2_0_IV1 + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + ) + ); + } + + @Test + public void testInvalidTimestampExceptionHasBatchIndex() { + long now = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V2, now - 1001L, compression); + + RecordValidationException e = assertThrows(RecordValidationException.class, () -> + new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + ) + ); + + assertInstanceOf(InvalidTimestampException.class, e.invalidException()); + assertFalse(e.recordErrors().isEmpty()); + assertEquals(3, e.recordErrors().size()); + } + + @Test + public void testInvalidRecordExceptionHasBatchIndex() { + RecordValidationException e = assertThrows(RecordValidationException.class, () -> { + Compression compression = Compression.gzip().build(); + validateMessages( + recordsWithInvalidInnerMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, compression), + RecordBatch.MAGIC_VALUE_V0, + CompressionType.GZIP, + compression + ); + }); + + assertInstanceOf(InvalidRecordException.class, e.invalidException()); + assertFalse(e.recordErrors().isEmpty()); + // recordsWithInvalidInnerMagic creates 20 records + assertEquals(20, e.recordErrors().size()); + for (Object error : e.recordErrors()) { + assertNotNull(error); + } + } + + @Test + public void testBatchWithInvalidRecordsAndInvalidTimestamp() { + Compression compression = Compression.gzip().build(); + List records = new ArrayList<>(); + for (int id = 0; id < 5; id++) { + records.add(LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 0L, null, String.valueOf(id).getBytes())); + } + + ByteBuffer buffer = ByteBuffer.allocate(1024); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, compression, + TimestampType.CREATE_TIME, 0L); + int offset = 0; + + // We want to mix in a record with an invalid timestamp range + builder.appendUncheckedWithOffset(offset, LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, + 1200L, null, "timestamp".getBytes())); + for (LegacyRecord record : records) { + offset += 30; + builder.appendUncheckedWithOffset(offset, record); + } + MemoryRecords invalidOffsetTimestampRecords = builder.build(); + + RecordValidationException e = assertThrows(RecordValidationException.class, () -> + validateMessages(invalidOffsetTimestampRecords, + RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, compression) + ); + // If there is a mix of both regular InvalidRecordException and InvalidTimestampException, + // InvalidTimestampException takes precedence + assertInstanceOf(InvalidTimestampException.class, e.invalidException()); + assertFalse(e.recordErrors().isEmpty()); + assertEquals(6, e.recordErrors().size()); + } + + @Test + public void testRecordWithPastTimestampIsRejected() { + long timestampBeforeMaxConfig = Duration.ofHours(24).toMillis(); // 24 hrs + long timestampAfterMaxConfig = Duration.ofHours(1).toMillis(); // 1 hr + long now = System.currentTimeMillis(); + long fiveMinutesBeforeThreshold = now - timestampBeforeMaxConfig - Duration.ofMinutes(5).toMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V2, fiveMinutesBeforeThreshold, compression); + RecordValidationException e = assertThrows(RecordValidationException.class, () -> + new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + timestampBeforeMaxConfig, + timestampAfterMaxConfig, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + ) + ); + + assertInstanceOf(InvalidTimestampException.class, e.invalidException()); + assertFalse(e.recordErrors().isEmpty()); + assertEquals(3, e.recordErrors().size()); + } + + @Test + public void testRecordWithFutureTimestampIsRejected() { + long timestampBeforeMaxConfig = Duration.ofHours(24).toMillis(); // 24 hrs + long timestampAfterMaxConfig = Duration.ofHours(1).toMillis(); // 1 hr + long now = System.currentTimeMillis(); + long fiveMinutesAfterThreshold = now + timestampAfterMaxConfig + Duration.ofMinutes(5).toMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V2, fiveMinutesAfterThreshold, compression); + RecordValidationException e = assertThrows(RecordValidationException.class, () -> + new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + timestampBeforeMaxConfig, + timestampAfterMaxConfig, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + ) + ); + + assertInstanceOf(InvalidTimestampException.class, e.invalidException()); + assertFalse(e.recordErrors().isEmpty()); + assertEquals(3, e.recordErrors().size()); + } + + + @Test + public void testDifferentLevelDoesNotCauseRecompression() { + List records = Arrays.asList( + String.join("", Collections.nCopies(256, "some")).getBytes(), + String.join("", Collections.nCopies(256, "data")).getBytes() + ); + + // Records from the producer were created with gzip max level + Compression gzipMax = Compression.gzip().level(CompressionType.GZIP.maxLevel()).build(); + MemoryRecords recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax); + + // The topic is configured with gzip min level + Compression gzipMin = Compression.gzip().level(CompressionType.GZIP.minLevel()).build(); + MemoryRecords recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMin); + + // Ensure data compressed with gzip max and min is different + assertNotEquals(recordsGzipMax, recordsGzipMin); + + LogValidator validator = new LogValidator(recordsGzipMax, + topicPartition, + time, + gzipMax.type(), + gzipMin, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + + LogValidator.ValidationResult result = validator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + // Ensure validated records have not been changed so they are the same as the producer records + assertEquals(recordsGzipMax, result.validatedRecords); + assertNotEquals(recordsGzipMin, result.validatedRecords); + } + + @Test + public void testDifferentCodecCausesRecompression() { + List records = Arrays.asList( + "somedata".getBytes(), + "moredata".getBytes() + ); + + // Records from the producer were created with gzip max level + Compression gzipMax = Compression.gzip().level(CompressionType.GZIP.maxLevel()).build(); + MemoryRecords recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, gzipMax); + + // The topic is configured with lz4 min level + Compression lz4Min = Compression.lz4().level(CompressionType.GZIP.minLevel()).build(); + MemoryRecords recordsLz4Min = createRecords(records, RecordBatch.MAGIC_VALUE_V2, RecordBatch.NO_TIMESTAMP, lz4Min); + + LogValidator validator = new LogValidator(recordsGzipMax, + topicPartition, + time, + gzipMax.type(), + lz4Min, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + + LogValidator.ValidationResult result = validator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + // Ensure validated records have been recompressed and match lz4 min level + assertEquals(recordsLz4Min, result.validatedRecords); + } + + @ParameterizedTest + @CsvSource({"1", "2"}) + public void checkNonCompressed(byte magic) { + long now = System.currentTimeMillis(); + // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp + long[] timestampSeq = new long[]{now - 1, now + 1, now}; + + long producerId; + short producerEpoch; + int baseSequence; + boolean isTransactional; + int partitionLeaderEpoch; + + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + producerId = 1324L; + producerEpoch = 10; + baseSequence = 984; + isTransactional = true; + partitionLeaderEpoch = 40; + } else { + producerId = RecordBatch.NO_PRODUCER_ID; + producerEpoch = RecordBatch.NO_PRODUCER_EPOCH; + baseSequence = RecordBatch.NO_SEQUENCE; + isTransactional = false; + partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH; + } + + List recordList = new ArrayList<>(); + recordList.add(new SimpleRecord(timestampSeq[0], "hello".getBytes())); + recordList.add(new SimpleRecord(timestampSeq[1], "there".getBytes())); + recordList.add(new SimpleRecord(timestampSeq[2], "beautiful".getBytes())); + + MemoryRecords records = MemoryRecords.withRecords(magic, 0L, Compression.NONE, TimestampType.CREATE_TIME, producerId, + producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional, recordList.toArray(new SimpleRecord[0])); + + PrimitiveRef.LongRef offsetCounter = PrimitiveRef.ofLong(0L); + LogValidator.ValidationResult validatingResults = new LogValidator( + records, + topicPartition, + time, + CompressionType.NONE, + Compression.NONE, + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + partitionLeaderEpoch, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + offsetCounter, + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatingResults.validatedRecords; + + int i = 0; + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + maybeCheckBaseTimestamp(timestampSeq[0], batch); + assertEquals(batch.maxTimestamp(), batch.maxTimestamp()); + + assertEquals(producerEpoch, batch.producerEpoch()); + assertEquals(producerId, batch.producerId()); + assertEquals(baseSequence, batch.baseSequence()); + assertEquals(isTransactional, batch.isTransactional()); + assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch()); + for (Record record : batch) { + record.ensureValid(); + assertEquals(timestampSeq[i], record.timestamp()); + i += 1; + } + } + + assertEquals(i, offsetCounter.value); + assertEquals(now + 1, validatingResults.maxTimestampMs, + "Max timestamp should be " + (now + 1)); + + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + assertEquals(1, iteratorSize(records.batches().iterator())); + assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp); + } else { + assertEquals(3, iteratorSize(records.batches().iterator())); + assertEquals(1, validatingResults.shallowOffsetOfMaxTimestamp); + } + + assertFalse(validatingResults.messageSizeMaybeChanged, + "Message size should not have been changed"); + verifyRecordValidationStats(validatingResults.recordValidationStats, 0, records, false); + } + + private void assertInvalidBatchCountOverrides(int lastOffsetDelta, int count) { + assertThrows(InvalidRecordException.class, + () -> validateRecordBatchWithCountOverrides(lastOffsetDelta, count)); + } + + private void validateRecordBatchWithCountOverrides(int lastOffsetDelta, int count) { + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V2, 1234L, Compression.NONE); + ByteBuffer buffer = records.buffer(); + buffer.putInt(DefaultRecordBatch.RECORDS_COUNT_OFFSET, count); + buffer.putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta); + + new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + Compression.gzip().build(), + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.LOG_APPEND_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + } + + @ParameterizedTest + @CsvSource({"1", "2"}) + public void checkLogAppendTimeWithoutRecompression(byte magic) { + Compression compression = Compression.gzip().build(); + MockTime mockTime = new MockTime(); + MemoryRecords records = createRecords(magic, 1234L, compression); + LogValidator.ValidationResult validatedResults = new LogValidator( + records, + topicPartition, + mockTime, + CompressionType.GZIP, + compression, + false, + magic, + TimestampType.LOG_APPEND_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatedResults.validatedRecords; + assertEquals(records.sizeInBytes(), validatedRecords.sizeInBytes(), + "message set size should not change"); + long now = mockTime.milliseconds(); + for (RecordBatch batch : validatedRecords.batches()) + validateLogAppendTime(now, 1234L, batch); + assertTrue(validatedRecords.batches().iterator().next().isValid(), + "MessageSet should still valid"); + assertEquals(now, validatedResults.maxTimestampMs, + "Max timestamp should be " + now); + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, + "The shallow offset of max timestamp should be the last offset 2 if logAppendTime is used"); + assertFalse(validatedResults.messageSizeMaybeChanged, + "Message size should not have been changed"); + + verifyRecordValidationStats(validatedResults.recordValidationStats, 0, records, true); + } + + @ParameterizedTest + @CsvSource({"1", "2"}) + public void checkLogAppendTimeWithRecompression(byte targetMagic) { + Compression compression = Compression.gzip().build(); + MockTime mockTime = new MockTime(); + // The timestamps should be overwritten + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, compression); + LogValidator.ValidationResult validatedResults = new LogValidator( + records, + topicPartition, + mockTime, + CompressionType.GZIP, + compression, + false, + targetMagic, + TimestampType.LOG_APPEND_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatedResults.validatedRecords; + assertEquals(iteratorSize(records.records().iterator()), iteratorSize(validatedRecords.records().iterator()), + "message set size should not change"); + long now = mockTime.milliseconds(); + validatedRecords.batches().forEach(batch -> validateLogAppendTime(now, -1, batch)); + assertTrue(validatedRecords.batches().iterator().next().isValid(), + "MessageSet should still valid"); + assertEquals(now, validatedResults.maxTimestampMs, String.format("Max timestamp should be %d", now)); + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, + "The shallow offset of max timestamp should be 2 if logAppendTime is used"); + assertTrue(validatedResults.messageSizeMaybeChanged, + "Message size may have been changed"); + + RecordValidationStats stats = validatedResults.recordValidationStats; + verifyRecordValidationStats(stats, 3, records, true); + } + + @ParameterizedTest + @CsvSource({"0", "1", "2"}) + public void checkLogAppendTimeNonCompressed(byte magic) { + MockTime mockTime = new MockTime(); + // The timestamps should be overwritten + MemoryRecords records = createRecords(magic, 1234L, Compression.NONE); + PrimitiveRef.LongRef offsetCounter = PrimitiveRef.ofLong(0); + LogValidator.ValidationResult validatedResults = new LogValidator(records, + topicPartition, + mockTime, + CompressionType.NONE, + Compression.NONE, + false, + magic, + TimestampType.LOG_APPEND_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + offsetCounter, + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + assertEquals(offsetCounter.value, iteratorSize(records.records().iterator())); + + MemoryRecords validatedRecords = validatedResults.validatedRecords; + assertEquals(iteratorSize(records.records().iterator()), iteratorSize(validatedRecords.records().iterator()), "message set size should not change"); + + long now = mockTime.milliseconds(); + + if (magic >= RecordBatch.MAGIC_VALUE_V1) { + validatedRecords.batches().forEach(batch -> validateLogAppendTime(now, 1234L, batch)); + } + + if (magic == RecordBatch.MAGIC_VALUE_V0) { + assertEquals(RecordBatch.NO_TIMESTAMP, validatedResults.maxTimestampMs); + } else { + assertEquals(now, validatedResults.maxTimestampMs); + } + + assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed"); + + int expectedMaxTimestampOffset; + switch (magic) { + case RecordBatch.MAGIC_VALUE_V0: + expectedMaxTimestampOffset = -1; + break; + case RecordBatch.MAGIC_VALUE_V1: + expectedMaxTimestampOffset = 0; + break; + default: + expectedMaxTimestampOffset = 2; + break; + } + assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp); + verifyRecordValidationStats(validatedResults.recordValidationStats, 0, records, false); + } + + /** + * expectedLogAppendTime is only checked if batch.magic is V2 or higher + */ + void validateLogAppendTime(long expectedLogAppendTime, long expectedBaseTimestamp, RecordBatch batch) { + assertTrue(batch.isValid()); + assertEquals(batch.timestampType(), TimestampType.LOG_APPEND_TIME); + assertEquals(expectedLogAppendTime, batch.maxTimestamp(), "Unexpected max timestamp of batch $batch"); + maybeCheckBaseTimestamp(expectedBaseTimestamp, batch); + batch.forEach(record -> { + record.ensureValid(); + assertEquals(expectedLogAppendTime, record.timestamp(), String.format("Unexpected timestamp of record %s", record)); + }); + + } + + // Check that offsets are assigned consecutively from the given base offset + private void checkOffsets(MemoryRecords records, long baseOffset) { + Assertions.assertTrue(iteratorSize(records.records().iterator()) != 0, "Message set should not be empty"); + + long offset = baseOffset; + + for (Record record : records.records()) { + Assertions.assertEquals(offset, record.offset(), "Unexpected offset in message set iterator"); + offset += 1; + } + } + + private void maybeCheckBaseTimestamp(long expected, RecordBatch batch) { + if (batch instanceof DefaultRecordBatch) { + DefaultRecordBatch b = (DefaultRecordBatch) batch; + assertEquals(expected, b.baseTimestamp(), "Unexpected base timestamp of batch " + batch); + } + } + + private static int iteratorSize(Iterator iterator) { + int counter = 0; + while (iterator.hasNext()) { + iterator.next(); + counter += 1; + } + return counter; + } + + public void verifyRecordValidationStats(RecordValidationStats stats, int numConvertedRecords, MemoryRecords records, + boolean compressed) { + assertNotNull(stats, "Records processing info is null"); + assertEquals(numConvertedRecords, stats.numRecordsConverted()); + if (numConvertedRecords > 0) { + assertTrue(stats.conversionTimeNanos() >= 0, "Conversion time not recorded " + stats); + assertTrue(stats.conversionTimeNanos() <= TimeUnit.MINUTES.toNanos(1), "Conversion time not valid " + stats); + } + int originalSize = records.sizeInBytes(); + long tempBytes = stats.temporaryMemoryBytes(); + if (numConvertedRecords > 0 && compressed) + assertTrue(tempBytes > originalSize, "Temp bytes too small, orig=" + originalSize + " actual=" + tempBytes); + else if (numConvertedRecords > 0 || compressed) + assertTrue(tempBytes > 0, "Temp bytes not updated"); + else + assertEquals(0, tempBytes); + } +} \ No newline at end of file