diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 70bf3bf00da15..c4dda087af1e4 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -236,7 +236,6 @@ private[kafka] object LogValidator extends Logging { magic: Byte, brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = { var maxTimestamp = RecordBatch.NO_TIMESTAMP - val expectedInnerOffset = new LongRef(0) var offsetOfMaxTimestamp = -1L val initialOffset = offsetCounter.value @@ -251,16 +250,6 @@ private[kafka] object LogValidator extends Logging { for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats) - val expectedOffset = expectedInnerOffset.getAndIncrement() - - // inner records offset should always be continuous - if (record.offset != expectedOffset) { - brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark() - throw new RecordValidationException( - new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."), - List(new RecordError(batchIndex))) - } - val offset = offsetCounter.getAndIncrement() if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) { maxBatchTimestamp = record.timestamp