From def07134ab633e32f08d6cda0f751053f407b125 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Thu, 31 Oct 2019 14:32:14 -0700 Subject: [PATCH 1/3] add check for version --- core/src/main/scala/kafka/log/Log.scala | 1 + .../main/scala/kafka/log/LogValidator.scala | 22 +++++----- .../unit/kafka/log/LogValidatorTest.scala | 40 +++++++++++++++++++ 3 files changed, 54 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index ccf1d16763242..c2b26a61716cc 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1049,6 +1049,7 @@ class Log(@volatile var dir: File, val now = time.milliseconds val validateAndOffsetAssignResult = try { LogValidator.validateMessagesAndAssignOffsets(validRecords, + recordVersion, topicPartition, offset, time, diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 70bf3bf00da15..34a5348394ffc 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -24,7 +24,7 @@ import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec import kafka.server.BrokerTopicStats import kafka.utils.Logging import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} -import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} +import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, RecordVersion, TimestampType} import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.ProduceResponse.RecordError @@ -51,6 +51,7 @@ private[kafka] object 
LogValidator extends Logging { * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed. */ private[kafka] def validateMessagesAndAssignOffsets(records: MemoryRecords, + recordVersion: RecordVersion, topicPartition: TopicPartition, offsetCounter: LongRef, time: Time, @@ -72,7 +73,7 @@ private[kafka] object LogValidator extends Logging { timestampDiffMaxMs, magic, partitionLeaderEpoch, isFromClient, brokerTopicStats) else // Do in-place validation, offset assignment and maybe set timestamp - assignOffsetsNonCompressed(records, topicPartition, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs, + assignOffsetsNonCompressed(records, recordVersion, topicPartition, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient, magic, brokerTopicStats) } else { validateMessagesAndAssignOffsetsCompressed(records, topicPartition, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic, @@ -225,6 +226,7 @@ private[kafka] object LogValidator extends Logging { } private def assignOffsetsNonCompressed(records: MemoryRecords, + recordVersion: RecordVersion, topicPartition: TopicPartition, offsetCounter: LongRef, now: Long, @@ -251,14 +253,16 @@ private[kafka] object LogValidator extends Logging { for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) - val expectedOffset = expectedInnerOffset.getAndIncrement() + if (!recordVersion.precedes(RecordVersion.V2)) { + val expectedOffset = expectedInnerOffset.getAndIncrement() - // inner records offset should always be continuous - if (record.offset != expectedOffset) { - brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark() - throw new RecordValidationException( - new InvalidRecordException(s"Inner record $record inside the compressed record 
batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."), - List(new RecordError(batchIndex))) + // inner records offset should always be continuous + if (record.offset != expectedOffset) { + brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark() + throw new RecordValidationException( + new InvalidRecordException(s"Inner record $record inside the non-compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."), + List(new RecordError(batchIndex))) + } } val offset = offsetCounter.getAndIncrement() diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 923ae9185211f..4406bf23abbc2 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -90,6 +90,7 @@ class LogValidatorTest { private def validateMessages(records: MemoryRecords, magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType): Unit = { LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, new LongRef(0L), time, @@ -117,6 +118,7 @@ class LogValidatorTest { // The timestamps should be overwritten val records = createRecords(magicValue = magic, timestamp = 1234L, codec = CompressionType.NONE) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time= time, @@ -157,6 +159,7 @@ class LogValidatorTest { val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP) val validatedResults = LogValidator.validateMessagesAndAssignOffsets( records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -201,6 +204,7 @@ class LogValidatorTest { val records = createRecords(magicValue = 
magic, timestamp = 1234L, codec = CompressionType.GZIP) val validatedResults = LogValidator.validateMessagesAndAssignOffsets( records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -261,6 +265,7 @@ class LogValidatorTest { records.buffer.putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta) LogValidator.validateMessagesAndAssignOffsets( records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -305,6 +310,7 @@ class LogValidatorTest { new SimpleRecord(timestampSeq(2), "beautiful".getBytes)) val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -374,6 +380,7 @@ class LogValidatorTest { new SimpleRecord(timestampSeq(2), "beautiful".getBytes)) val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -427,6 +434,7 @@ class LogValidatorTest { private def checkCreateTimeUpConversionFromV0(toMagic: Byte): Unit = { val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -471,6 +479,7 @@ class LogValidatorTest { val timestamp = System.currentTimeMillis() val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = CompressionType.GZIP, timestamp = timestamp) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -528,6 +537,7 @@ class LogValidatorTest { new SimpleRecord(timestampSeq(2), "beautiful".getBytes)) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), 
time = time, @@ -581,6 +591,7 @@ class LogValidatorTest { codec = CompressionType.NONE) LogValidator.validateMessagesAndAssignOffsets( records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -604,6 +615,7 @@ class LogValidatorTest { codec = CompressionType.NONE) LogValidator.validateMessagesAndAssignOffsets( records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -627,6 +639,7 @@ class LogValidatorTest { codec = CompressionType.GZIP) LogValidator.validateMessagesAndAssignOffsets( records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -650,6 +663,7 @@ class LogValidatorTest { codec = CompressionType.GZIP) LogValidator.validateMessagesAndAssignOffsets( records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -672,6 +686,7 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -694,6 +709,7 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -717,6 +733,7 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(records, 0) val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -741,6 +758,7 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(records, 0) val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -766,6 +784,7 @@ class LogValidatorTest { checkOffsets(records, 0) val compressedMessagesWithOffset = 
LogValidator.validateMessagesAndAssignOffsets( records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -791,6 +810,7 @@ class LogValidatorTest { checkOffsets(records, 0) val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets( records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -814,6 +834,7 @@ class LogValidatorTest { checkOffsets(records, 0) val offset = 1234567 val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -839,6 +860,7 @@ class LogValidatorTest { checkOffsets(records, 0) val offset = 1234567 val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -864,6 +886,7 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(records, 0) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -889,6 +912,7 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(records, 0) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -914,6 +938,7 @@ class LogValidatorTest { val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0) val records = MemoryRecords.withEndTransactionMarker(23423L, 5, endTxnMarker) LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -936,6 +961,7 @@ class LogValidatorTest { val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0) val records = MemoryRecords.withEndTransactionMarker(23423L, 5, endTxnMarker) val result = 
LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -963,6 +989,7 @@ class LogValidatorTest { val records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, codec = CompressionType.NONE) checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -986,6 +1013,7 @@ class LogValidatorTest { val records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, CompressionType.GZIP) checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1008,6 +1036,7 @@ class LogValidatorTest { checkOffsets(records, 0) val offset = 1234567 checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1030,6 +1059,7 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1053,6 +1083,7 @@ class LogValidatorTest { val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, codec = CompressionType.NONE) checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1076,6 +1107,7 @@ class LogValidatorTest { val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, CompressionType.GZIP) checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1101,6 +1133,7 @@ class LogValidatorTest { val records = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes)) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1126,6 +1159,7 @@ class LogValidatorTest { val records = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes)) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1149,6 +1183,7 @@ class LogValidatorTest { val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, codec = CompressionType.NONE) checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1172,6 +1207,7 @@ class LogValidatorTest { val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, CompressionType.GZIP) checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1194,6 +1230,7 @@ class LogValidatorTest { records.batches().asScala.head.setLastOffset(2) assertThrows[InvalidRecordException] { LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0L), time = time, @@ -1224,6 +1261,7 @@ class LogValidatorTest { // The timestamps should be overwritten val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = 1234L, codec = CompressionType.NONE) LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new 
LongRef(0), time= time, @@ -1258,6 +1296,7 @@ class LogValidatorTest { val e = intercept[RecordValidationException] { LogValidator.validateMessagesAndAssignOffsets( records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -1307,6 +1346,7 @@ class LogValidatorTest { buffer.flip() val records = MemoryRecords.readableRecords(buffer) LogValidator.validateMessagesAndAssignOffsets(records, + RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, From b14a549f8288fbb7f1591f0b82692fa36cf18874 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Fri, 1 Nov 2019 13:11:52 -0700 Subject: [PATCH 2/3] Revert "add check for version" This reverts commit def07134ab633e32f08d6cda0f751053f407b125. --- core/src/main/scala/kafka/log/Log.scala | 1 - .../main/scala/kafka/log/LogValidator.scala | 22 +++++----- .../unit/kafka/log/LogValidatorTest.scala | 40 ------------------- 3 files changed, 9 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index c2b26a61716cc..ccf1d16763242 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1049,7 +1049,6 @@ class Log(@volatile var dir: File, val now = time.milliseconds val validateAndOffsetAssignResult = try { LogValidator.validateMessagesAndAssignOffsets(validRecords, - recordVersion, topicPartition, offset, time, diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 34a5348394ffc..70bf3bf00da15 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -24,7 +24,7 @@ import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec import kafka.server.BrokerTopicStats import kafka.utils.Logging import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, 
UnsupportedForMessageFormatException} -import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, RecordVersion, TimestampType} +import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.ProduceResponse.RecordError @@ -51,7 +51,6 @@ private[kafka] object LogValidator extends Logging { * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed. */ private[kafka] def validateMessagesAndAssignOffsets(records: MemoryRecords, - recordVersion: RecordVersion, topicPartition: TopicPartition, offsetCounter: LongRef, time: Time, @@ -73,7 +72,7 @@ private[kafka] object LogValidator extends Logging { timestampDiffMaxMs, magic, partitionLeaderEpoch, isFromClient, brokerTopicStats) else // Do in-place validation, offset assignment and maybe set timestamp - assignOffsetsNonCompressed(records, recordVersion, topicPartition, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs, + assignOffsetsNonCompressed(records, topicPartition, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient, magic, brokerTopicStats) } else { validateMessagesAndAssignOffsetsCompressed(records, topicPartition, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic, @@ -226,7 +225,6 @@ private[kafka] object LogValidator extends Logging { } private def assignOffsetsNonCompressed(records: MemoryRecords, - recordVersion: RecordVersion, topicPartition: TopicPartition, offsetCounter: LongRef, now: Long, @@ -253,16 +251,14 @@ private[kafka] object LogValidator extends Logging { for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { 
validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) - if (!recordVersion.precedes(RecordVersion.V2)) { - val expectedOffset = expectedInnerOffset.getAndIncrement() + val expectedOffset = expectedInnerOffset.getAndIncrement() - // inner records offset should always be continuous - if (record.offset != expectedOffset) { - brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark() - throw new RecordValidationException( - new InvalidRecordException(s"Inner record $record inside the non-compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."), - List(new RecordError(batchIndex))) - } + // inner records offset should always be continuous + if (record.offset != expectedOffset) { + brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark() + throw new RecordValidationException( + new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."), + List(new RecordError(batchIndex))) } val offset = offsetCounter.getAndIncrement() diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 4406bf23abbc2..923ae9185211f 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -90,7 +90,6 @@ class LogValidatorTest { private def validateMessages(records: MemoryRecords, magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType): Unit = { LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, new LongRef(0L), time, @@ -118,7 +117,6 @@ class LogValidatorTest { // The timestamps should be overwritten val records = createRecords(magicValue = magic, 
timestamp = 1234L, codec = CompressionType.NONE) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time= time, @@ -159,7 +157,6 @@ class LogValidatorTest { val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP) val validatedResults = LogValidator.validateMessagesAndAssignOffsets( records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -204,7 +201,6 @@ class LogValidatorTest { val records = createRecords(magicValue = magic, timestamp = 1234L, codec = CompressionType.GZIP) val validatedResults = LogValidator.validateMessagesAndAssignOffsets( records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -265,7 +261,6 @@ class LogValidatorTest { records.buffer.putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta) LogValidator.validateMessagesAndAssignOffsets( records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -310,7 +305,6 @@ class LogValidatorTest { new SimpleRecord(timestampSeq(2), "beautiful".getBytes)) val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -380,7 +374,6 @@ class LogValidatorTest { new SimpleRecord(timestampSeq(2), "beautiful".getBytes)) val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -434,7 +427,6 @@ class LogValidatorTest { private def checkCreateTimeUpConversionFromV0(toMagic: Byte): Unit = { val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -479,7 +471,6 @@ 
class LogValidatorTest { val timestamp = System.currentTimeMillis() val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = CompressionType.GZIP, timestamp = timestamp) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -537,7 +528,6 @@ class LogValidatorTest { new SimpleRecord(timestampSeq(2), "beautiful".getBytes)) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -591,7 +581,6 @@ class LogValidatorTest { codec = CompressionType.NONE) LogValidator.validateMessagesAndAssignOffsets( records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -615,7 +604,6 @@ class LogValidatorTest { codec = CompressionType.NONE) LogValidator.validateMessagesAndAssignOffsets( records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -639,7 +627,6 @@ class LogValidatorTest { codec = CompressionType.GZIP) LogValidator.validateMessagesAndAssignOffsets( records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -663,7 +650,6 @@ class LogValidatorTest { codec = CompressionType.GZIP) LogValidator.validateMessagesAndAssignOffsets( records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -686,7 +672,6 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -709,7 +694,6 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -733,7 +717,6 @@ class LogValidatorTest { val 
offset = 1234567 checkOffsets(records, 0) val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -758,7 +741,6 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(records, 0) val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -784,7 +766,6 @@ class LogValidatorTest { checkOffsets(records, 0) val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets( records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -810,7 +791,6 @@ class LogValidatorTest { checkOffsets(records, 0) val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets( records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -834,7 +814,6 @@ class LogValidatorTest { checkOffsets(records, 0) val offset = 1234567 val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -860,7 +839,6 @@ class LogValidatorTest { checkOffsets(records, 0) val offset = 1234567 val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -886,7 +864,6 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(records, 0) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -912,7 +889,6 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(records, 0) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -938,7 +914,6 @@ 
class LogValidatorTest { val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0) val records = MemoryRecords.withEndTransactionMarker(23423L, 5, endTxnMarker) LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -961,7 +936,6 @@ class LogValidatorTest { val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0) val records = MemoryRecords.withEndTransactionMarker(23423L, 5, endTxnMarker) val result = LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -989,7 +963,6 @@ class LogValidatorTest { val records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, codec = CompressionType.NONE) checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1013,7 +986,6 @@ class LogValidatorTest { val records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, CompressionType.GZIP) checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1036,7 +1008,6 @@ class LogValidatorTest { checkOffsets(records, 0) val offset = 1234567 checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1059,7 +1030,6 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1083,7 +1053,6 @@ class LogValidatorTest { val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, codec = CompressionType.NONE) checkOffsets(records, 0) 
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1107,7 +1076,6 @@ class LogValidatorTest { val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, CompressionType.GZIP) checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1133,7 +1101,6 @@ class LogValidatorTest { val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes)) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1159,7 +1126,6 @@ class LogValidatorTest { val records = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes)) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1183,7 +1149,6 @@ class LogValidatorTest { val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, codec = CompressionType.NONE) checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1207,7 +1172,6 @@ class LogValidatorTest { val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, CompressionType.GZIP) checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, @@ -1230,7 +1194,6 @@ class LogValidatorTest { 
records.batches().asScala.head.setLastOffset(2) assertThrows[InvalidRecordException] { LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0L), time = time, @@ -1261,7 +1224,6 @@ class LogValidatorTest { // The timestamps should be overwritten val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = 1234L, codec = CompressionType.NONE) LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time= time, @@ -1296,7 +1258,6 @@ class LogValidatorTest { val e = intercept[RecordValidationException] { LogValidator.validateMessagesAndAssignOffsets( records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(0), time = time, @@ -1346,7 +1307,6 @@ class LogValidatorTest { buffer.flip() val records = MemoryRecords.readableRecords(buffer) LogValidator.validateMessagesAndAssignOffsets(records, - RecordVersion.V2, topicPartition, offsetCounter = new LongRef(offset), time = time, From b9bfc2ed730fd59c9a176d058a27585fa8473d04 Mon Sep 17 00:00:00 2001 From: Tu Tran Date: Fri, 1 Nov 2019 13:38:29 -0700 Subject: [PATCH 3/3] remove check in assignOffsetsNonCompressed --- core/src/main/scala/kafka/log/LogValidator.scala | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 70bf3bf00da15..c4dda087af1e4 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -236,7 +236,6 @@ private[kafka] object LogValidator extends Logging { magic: Byte, brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = { var maxTimestamp = RecordBatch.NO_TIMESTAMP - val expectedInnerOffset = new LongRef(0) var offsetOfMaxTimestamp = -1L val initialOffset = offsetCounter.value @@ -251,16 +250,6 @@ private[kafka] object LogValidator extends Logging { for 
((record, batchIndex) <- batch.asScala.view.zipWithIndex) { validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) - val expectedOffset = expectedInnerOffset.getAndIncrement() - - // inner records offset should always be continuous - if (record.offset != expectedOffset) { - brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark() - throw new RecordValidationException( - new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."), - List(new RecordError(batchIndex))) - } - val offset = offsetCounter.getAndIncrement() if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) { maxBatchTimestamp = record.timestamp