diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 23a88c277c969..84eb6f20ac5f7 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -355,18 +355,18 @@ public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingP */ public TimestampAndOffset largestTimestampAfter(int startingPosition) { long maxTimestamp = RecordBatch.NO_TIMESTAMP; - long offsetOfMaxTimestamp = -1L; + long shallowOffsetOfMaxTimestamp = -1L; int leaderEpochOfMaxTimestamp = RecordBatch.NO_PARTITION_LEADER_EPOCH; for (RecordBatch batch : batchesFrom(startingPosition)) { long timestamp = batch.maxTimestamp(); if (timestamp > maxTimestamp) { maxTimestamp = timestamp; - offsetOfMaxTimestamp = batch.lastOffset(); + shallowOffsetOfMaxTimestamp = batch.lastOffset(); leaderEpochOfMaxTimestamp = batch.partitionLeaderEpoch(); } } - return new TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp, + return new TimestampAndOffset(maxTimestamp, shallowOffsetOfMaxTimestamp, maybeLeaderEpoch(leaderEpochOfMaxTimestamp)); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 3ba60b09b30df..c5b43d4f88565 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -209,7 +209,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable this.maxTimestamp) { this.maxTimestamp = maxTimestamp; - this.offsetOfMaxTimestamp = offsetOfMaxTimestamp; + this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp; } this.maxOffset = Math.max(maxOffset, this.maxOffset); this.messagesRetained += messagesRetained; this.bytesRetained += bytesRetained; } - private void validateBatchMetadata(long maxTimestamp, long offsetOfMaxTimestamp, long maxOffset) { - if (maxTimestamp != RecordBatch.NO_TIMESTAMP && offsetOfMaxTimestamp < 0) - throw new IllegalArgumentException("offset undefined for maximum timestamp " + maxTimestamp); + private void validateBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset) { + if (maxTimestamp != RecordBatch.NO_TIMESTAMP && shallowOffsetOfMaxTimestamp < 0) + throw new IllegalArgumentException("shallowOffset undefined for maximum timestamp " + maxTimestamp); if (maxOffset < 0) throw new IllegalArgumentException("maxOffset undefined"); } @@ -458,8 +458,8 @@ public long maxTimestamp() { return maxTimestamp; } - public long offsetOfMaxTimestamp() { - return offsetOfMaxTimestamp; + public long shallowOffsetOfMaxTimestamp() { + return shallowOffsetOfMaxTimestamp; } } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 98f3bb9047655..b8c3fd53d212e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -240,25 +240,54 @@ public MemoryRecords build() { return builtRecords; } + /** - * Get the max timestamp and its offset. The details of the offset returned are a bit subtle. 
- * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp. - * - * If the log append time is used, the offset will be the first offset of the record. - * - * If create time is used, the offset will always be the offset of the record with the max timestamp. - * - * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records. - * - * @return The max timestamp and its offset + * There are three cases for the max timestamp to return: + * 1) version 0: The max timestamp is NO_TIMESTAMP (-1) + * 2) LogAppendTime: All records have the same timestamp, so the max timestamp is equal to logAppendTime + * 3) CreateTime: The max timestamp across all records + *

+ * Let's talk about the offsetOfMaxTimestamp. On some paths we don't try to find the offsetOfMaxTimestamp, in order + * to avoid expensive record iteration. Those paths include follower append and index recovery. To keep the time + * index consistent, we let all paths find the shallowOffsetOfMaxTimestamp instead of the offsetOfMaxTimestamp. + *

+ * Let's define the shallowOffsetOfMaxTimestamp: it is the last offset of the batch that has the max timestamp. If + * there are many batches with the same max timestamp, we pick the earliest batch. + *

+ * There are five cases for the shallowOffsetOfMaxTimestamp to return: + * 1) version 0: It is always -1 + * 2) LogAppendTime with single batch: It is the offset of the last record + * 3) LogAppendTime with many single-record batches: Those single-record batches have the same max timestamp, so we return + * the base offset, which is equal to the last offset of the earliest batch + * 4) CreateTime with single batch: We return the offset of the last record to follow the spec mentioned above. Of course, + * we do have the offsetOfMaxTimestamp for this case, but we want to make all paths + * find the shallowOffsetOfMaxTimestamp rather than the offsetOfMaxTimestamp + * 5) CreateTime with many single-record batches: Each batch is composed of a single record, and hence the offsetOfMaxTimestamp + * is equal to the last offset of the earliest batch with the max timestamp */ public RecordsInfo info() { if (timestampType == TimestampType.LOG_APPEND_TIME) { - return new RecordsInfo(logAppendTime, baseOffset); + if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) + // maxTimestamp => case 2 + // shallowOffsetOfMaxTimestamp => case 2 + return new RecordsInfo(logAppendTime, lastOffset); + else + // maxTimestamp => case 2 + // shallowOffsetOfMaxTimestamp => case 3 + return new RecordsInfo(logAppendTime, baseOffset); + } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) { + // maxTimestamp => case 1 + // shallowOffsetOfMaxTimestamp => case 1 + return new RecordsInfo(RecordBatch.NO_TIMESTAMP, -1); } else { - // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping - // If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1] - return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp); + if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) + // maxTimestamp => case 3 + // shallowOffsetOfMaxTimestamp => case 4 + return new RecordsInfo(maxTimestamp, lastOffset); + else + // maxTimestamp => case 3 + // shallowOffsetOfMaxTimestamp => case 5 + return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp); } } @@ -849,12 +878,12 @@ private long nextSequentialOffset() { public static class RecordsInfo { public final long maxTimestamp; - public final long offsetOfMaxTimestamp; + public final long shallowOffsetOfMaxTimestamp; public RecordsInfo(long maxTimestamp, - long offsetOfMaxTimestamp) { + long shallowOffsetOfMaxTimestamp) { this.maxTimestamp = maxTimestamp; - this.offsetOfMaxTimestamp = offsetOfMaxTimestamp; + this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp; } } diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index 7d231c1774367..e36beff08f2ac 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.Optional; import java.util.OptionalLong; /** @@ -245,4 +246,23 @@ public interface RecordBatch extends Iterable<Record> { * @return Whether this is a batch containing control records */ boolean isControlBatch(); + + /** + * Iterates all records to find the offset of the max timestamp.
+ * Note: + * 1) the earliest offset is returned if there are multiple records with the same (max) timestamp + * 2) it always returns an empty Optional if the {@link RecordBatch#magic()} is equal to {@link RecordBatch#MAGIC_VALUE_V0} + * @return the offset of the max timestamp + */ + default Optional<Long> offsetOfMaxTimestamp() { + if (magic() == RecordBatch.MAGIC_VALUE_V0) return Optional.empty(); + long maxTimestamp = maxTimestamp(); + try (CloseableIterator<Record> iter = streamingIterator(BufferSupplier.create())) { + while (iter.hasNext()) { + Record record = iter.next(); + if (maxTimestamp == record.timestamp()) return Optional.of(record.offset()); + } + } + return Optional.empty(); + } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index eaaa95ff673cf..e279e952e7eaa 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -377,8 +377,11 @@ public void buildUsingLogAppendTime(Args args) { MemoryRecordsBuilder.RecordsInfo info = builder.info(); assertEquals(logAppendTime, info.maxTimestamp); - // When logAppendTime is used, the first offset of the batch will be the offset of maxTimestamp - assertEquals(0L, info.offsetOfMaxTimestamp); + + if (args.compressionType == CompressionType.NONE && magic <= MAGIC_VALUE_V1) + assertEquals(0L, info.shallowOffsetOfMaxTimestamp); + else + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); for (RecordBatch batch : records.batches()) { if (magic == MAGIC_VALUE_V0) { @@ -413,10 +416,11 @@ public void buildUsingCreateTime(Args args) { } if (magic == MAGIC_VALUE_V0) - // in MAGIC_VALUE_V0's case, we don't have timestamp info in records, so always return -1.
- assertEquals(-1L, info.offsetOfMaxTimestamp); + assertEquals(-1, info.shallowOffsetOfMaxTimestamp); + else if (args.compressionType == CompressionType.NONE && magic == MAGIC_VALUE_V1) + assertEquals(1L, info.shallowOffsetOfMaxTimestamp); else - assertEquals(1L, info.offsetOfMaxTimestamp); + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); int i = 0; long[] expectedTimestamps = new long[] {0L, 2L, 1L}; @@ -494,11 +498,11 @@ public void writePastLimit(Args args) { MemoryRecordsBuilder.RecordsInfo info = builder.info(); if (magic == MAGIC_VALUE_V0) { + assertEquals(-1, info.shallowOffsetOfMaxTimestamp); assertEquals(-1, info.maxTimestamp); - assertEquals(-1L, info.offsetOfMaxTimestamp); } else { + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); assertEquals(2L, info.maxTimestamp); - assertEquals(2L, info.offsetOfMaxTimestamp); } long i = 0L; diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 9e688fc3ab695..3f0195bf5d149 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -352,7 +352,7 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { assertEquals(0, filterResult.messagesRetained()); assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained()); assertEquals(12, filterResult.maxTimestamp()); - assertEquals(baseOffset + 1, filterResult.offsetOfMaxTimestamp()); + assertEquals(baseOffset + 1, filterResult.shallowOffsetOfMaxTimestamp()); // Verify filtered records filtered.flip(); @@ -413,7 +413,7 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { assertEquals(0, filterResult.messagesRetained()); assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained()); assertEquals(timestamp, filterResult.maxTimestamp()); - assertEquals(baseOffset, filterResult.offsetOfMaxTimestamp()); + assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp()); assertTrue(filterResult.outputBuffer().position() > 0); // Verify filtered records @@ -893,7 +893,10 @@ public void testFilterTo(Args args) { assertEquals(filtered.limit(), result.bytesRetained()); if (magic > RecordBatch.MAGIC_VALUE_V0) { assertEquals(20L, result.maxTimestamp()); - assertEquals(4L, result.offsetOfMaxTimestamp()); + if (compression == CompressionType.NONE && magic < RecordBatch.MAGIC_VALUE_V2) + assertEquals(4L, result.shallowOffsetOfMaxTimestamp()); + else + assertEquals(5L, result.shallowOffsetOfMaxTimestamp()); } MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index a91bfae739e05..b2121f5312d7b 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -406,8 +406,8 @@ class LocalLog(@volatile private var _dir: File, } } - private[log] def append(lastOffset: Long, largestTimestamp: Long, offsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = { - segments.activeSegment.append(lastOffset, largestTimestamp, offsetOfMaxTimestamp, records) + private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = { + segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records) updateLogEndOffset(lastOffset 
+ 1) } diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 35dadc8672f08..45aee545f82ce 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -812,7 +812,7 @@ private[log] class Cleaner(val id: Int, val retained = MemoryRecords.readableRecords(outputBuffer) // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads // after `Log.replaceSegments` (which acquires the lock) is called - dest.append(result.maxOffset, result.maxTimestamp, result.offsetOfMaxTimestamp, retained) + dest.append(result.maxOffset, result.maxTimestamp, result.shallowOffsetOfMaxTimestamp(), retained) throttler.maybeThrottle(outputBuffer.limit()) } diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 0fdc236e720c2..9c71ff155cff8 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -819,7 +819,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, validRecords = validateAndOffsetAssignResult.validatedRecords appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs) - appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.offsetOfMaxTimestampMs) + appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp) appendInfo.setLastOffset(offset.value - 1) appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats) if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME) @@ -905,7 +905,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, // will be cleaned up after the log directory is recovered. Note that the end offset of the // ProducerStateManager will not be updated and the last stable offset will not advance // if the append to the transaction index fails. 
- localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.offsetOfMaxTimestamp, validRecords) + localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.shallowOffsetOfMaxTimestamp, validRecords) updateHighWatermarkWithLogEndOffset() // update the producer state @@ -1120,7 +1120,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, var sourceCompression = CompressionType.NONE var monotonic = true var maxTimestamp = RecordBatch.NO_TIMESTAMP - var offsetOfMaxTimestamp = -1L + var shallowOffsetOfMaxTimestamp = -1L var readFirstMessage = false var lastOffsetOfFirstBatch = -1L @@ -1171,7 +1171,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (batch.maxTimestamp > maxTimestamp) { maxTimestamp = batch.maxTimestamp - offsetOfMaxTimestamp = lastOffset + shallowOffsetOfMaxTimestamp = lastOffset } validBytesCount += batchSize @@ -1190,7 +1190,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, else OptionalInt.empty() - new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, offsetOfMaxTimestamp, + new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, shallowOffsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression, validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE) } @@ -1270,15 +1270,6 @@ class UnifiedLog(@volatile var logStartOffset: Long, maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") { debug(s"Searching offset for timestamp $targetTimestamp") - def latestEpochAsOptional(leaderEpochCache: Option[LeaderEpochFileCache]): Optional[Integer] = { - leaderEpochCache match { - case Some(cache) => - val latestEpoch = cache.latestEpoch() - if (latestEpoch.isPresent) Optional.of(latestEpoch.getAsInt) else Optional.empty[Integer]() - case None => Optional.empty[Integer]() - } - } - if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) && targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP && targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP) @@ -1311,7 +1302,13 @@ class UnifiedLog(@volatile var logStartOffset: Long, Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { - Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, latestEpochAsOptional(leaderEpochCache))) + val epoch = leaderEpochCache match { + case Some(cache) => + val latestEpoch = cache.latestEpoch() + if (latestEpoch.isPresent) Optional.of[Integer](latestEpoch.getAsInt) else Optional.empty[Integer]() + case None => Optional.empty[Integer]() + } + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) { if (remoteLogEnabled()) { val curHighestRemoteOffset = highestOffsetInRemoteStorage() @@ -1337,13 +1334,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. 
- val segmentsCopy = logSegments.asScala.toBuffer - val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) - val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - - Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) + val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) + // cache the timestamp and offset + val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar + // look up the position of the batch to avoid extra I/O + val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) + latestTimestampSegment.log.batchesFrom(position.position).asScala + .find(_.maxTimestamp() == maxTimestampSoFar.timestamp) + .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, + Optional.of[Integer](batch.partitionLeaderEpoch()).filter(_ >= 0)))) } else { // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one. if (remoteLogEnabled()) { diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index e5a26d8f118b9..62dbd1a0e393b 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -19,29 +19,33 @@ package kafka.admin import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig -import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers} import kafka.utils.TestUtils +import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers, tempDir, waitForAllReassignmentsToComplete} import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests.ListOffsetsResponse import org.apache.kafka.common.utils.{MockTime, Time, Utils} import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import java.util.Properties +import java.io.File +import java.util.{Optional, Properties} import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { - val topicName = "foo" - val topicNameWithCustomConfigs = "foo2" - var adminClient: Admin = _ - var setOldMessageFormat: Boolean = false - val mockTime: Time = new MockTime(1) + private val topicName = "foo" + private val topicNameWithCustomConfigs = "foo2" + private var adminClient: Admin = _ + private val mockTime: Time = new MockTime(1) + private var version = RecordBatch.MAGIC_VALUE_V2 + private val dataFolder = Seq(tempDir().getAbsolutePath, tempDir().getAbsolutePath) @BeforeEach override def setUp(testInfo: TestInfo): Unit = { @@ -56,11 +60,32 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @AfterEach override def tearDown(): Unit = { - setOldMessageFormat = false Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def
testListMaxTimestampWithEmptyLog(quorum: String): Unit = { + val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topicName) + assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, maxTimestampOffset.offset()) + assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, maxTimestampOffset.timestamp()) + } + + @ParameterizedTest + @ValueSource(strings = Array("zk")) + def testListVersion0(quorum: String): Unit = { + // create records for version 0 + createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V0) + produceMessagesInSeparateBatch() + + // update version to version 1 to list the offset for max timestamp + createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1) + // the offset of max timestamp is always -1 if the batch version is 0 + verifyListOffsets(expectedMaxTimestampOffset = -1) + } + + @ParameterizedTest @ValueSource(strings = Array("zk", "kraft")) def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = { @@ -107,7 +132,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @ParameterizedTest @ValueSource(strings = Array("zk")) def testThreeRecordsInOneBatchWithMessageConversion(quorum: String): Unit = { - createOldMessageFormatBrokers() + createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1) produceMessagesInOneBatch() verifyListOffsets() @@ -123,15 +148,15 @@ @ParameterizedTest @ValueSource(strings = Array("zk")) def testThreeRecordsInSeparateBatchWithMessageConversion(quorum: String): Unit = { - createOldMessageFormatBrokers() + createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1) produceMessagesInSeparateBatch() verifyListOffsets() // test LogAppendTime case setUpForLogAppendTimeCase() produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs) - // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch. - // So in this separate batch test, it'll be the last offset 2 + // In LogAppendTime's case, the maxTimestampOffset is the message in the last batch since we advance the time + // for each batch, so it'll be the last offset 2 verifyListOffsets(topic = topicNameWithCustomConfigs, 2) } @@ -164,8 +189,8 @@ // test LogAppendTime case setUpForLogAppendTimeCase() produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs) - // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
- // So in this separate batch test, it'll be the last offset 2 + // In LogAppendTime's case, the maxTimestampOffset is the message in the last batch since we advance the time + // for each batch, so it'll be the last offset 2 verifyListOffsets(topic = topicNameWithCustomConfigs, 2) } @@ -175,8 +200,8 @@ createTopicWithConfig(topicNameWithCustomConfigs, props) } - private def createOldMessageFormatBrokers(): Unit = { - setOldMessageFormat = true + private def createMessageFormatBrokers(recordVersion: Byte): Unit = { + version = recordVersion recreateBrokers(reconfigure = true, startup = true) Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") adminClient = Admin.create(Map[String, Object]( @@ -189,14 +214,62 @@ } private def verifyListOffsets(topic: String = topicName, expectedMaxTimestampOffset: Int = 1): Unit = { - val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic) - assertEquals(0, earliestOffset.offset()) - - val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic) - assertEquals(3, latestOffset.offset()) + def check(): Unit = { + val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic) + assertEquals(0, earliestOffset.offset()) + + val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic) + assertEquals(3, latestOffset.offset()) + + val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) + assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) + if (version >= RecordBatch.MAGIC_VALUE_V2) + // the epoch is related to the returned offset. + // Hence, it should be zero (the earliest leader epoch), regardless of new leader election + assertEquals(Optional.of(0), maxTimestampOffset.leaderEpoch()) + else + assertEquals(Optional.empty(), maxTimestampOffset.leaderEpoch()) + } - val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) - assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) + // case 0: test the offsets from the leader's append path + check() + + // case 1: test the offsets from the follower's append path. + // we make a follower become the new leader to handle the ListOffsetsRequest + def leader(): Int = adminClient.describeTopics(java.util.Collections.singletonList(topic)) + .allTopicNames().get().get(topic).partitions().get(0).leader().id() + + val previousLeader = leader() + val newLeader = brokers.map(_.config.brokerId).find(_ != previousLeader).get + + // change the leader to the new one + adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new TopicPartition(topic, 0), + Optional.of(new NewPartitionReassignment(java.util.Arrays.asList(newLeader))))).all().get() + // wait for all reassignments to complete + waitForAllReassignmentsToComplete(adminClient) + // make sure we are able to see the new leader + var lastLeader = -1 + TestUtils.waitUntilTrue(() => { + lastLeader = leader() + lastLeader == newLeader + }, s"expected leader: $newLeader but actual: $lastLeader") + check() + + // case 2: test the offsets from the recovery path.
+ // the server will rebuild the offset index from the log files if the index files are missing + val indexFiles = brokers.flatMap(_.config.logDirs).toSet + brokers.foreach(b => killBroker(b.config.brokerId)) + indexFiles.foreach { root => + val files = new File(s"$root/$topic-0").listFiles() + if (files != null) files.foreach { f => + if (f.getName.endsWith(".index")) f.delete() + } + } + restartDeadBrokers() + Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") + adminClient = Admin.create(java.util.Collections.singletonMap( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers().asInstanceOf[Object])) + check() } private def runFetchOffsets(adminClient: Admin, @@ -261,11 +334,19 @@ } def generateConfigs: Seq[KafkaConfig] = { - TestUtils.createBrokerConfigs(1, zkConnectOrNull).map{ props => - if (setOldMessageFormat) { + TestUtils.createBrokerConfigs(2, zkConnectOrNull).zipWithIndex.map{ case (props, index) => + if (version == RecordBatch.MAGIC_VALUE_V0) { + props.setProperty("log.message.format.version", "0.9.0") + props.setProperty("inter.broker.protocol.version", "0.9.0") + } + if (version == RecordBatch.MAGIC_VALUE_V1) { props.setProperty("log.message.format.version", "0.10.0") props.setProperty("inter.broker.protocol.version", "0.10.0") } + // We use a mock timer, so the records could get removed if the test env is too busy to complete the tests + // before log retention kicks in. Hence, we disable retention to avoid failing tests + props.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "-1") + props.setProperty(KafkaConfig.LogDirProp, dataFolder(index)) props }.map(KafkaConfig.fromProps) } diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala index bffd41156b357..29b5fd34f9091 100644 --- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala @@ -100,7 +100,7 @@ class LocalLogTest { initialOffset: Long = 0L): Unit = { log.append(lastOffset = initialOffset + records.size - 1, largestTimestamp = records.head.timestamp, - offsetOfMaxTimestamp = initialOffset, + shallowOffsetOfMaxTimestamp = initialOffset, records = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, 0, records.toList : _*)) } diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index bb0c85a858360..b4e278c380230 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -86,10 +86,10 @@ def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = { val seg = createSegment(baseOffset) val currentTime = Time.SYSTEM.milliseconds() - val offsetOfMaxTimestamp = largestOffset + val shallowOffsetOfMaxTimestamp = largestOffset val memoryRecords = records(0, "hello") assertThrows(classOf[LogSegmentOffsetOverflowException], () => { - seg.append(largestOffset, currentTime, offsetOfMaxTimestamp, memoryRecords) + seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, memoryRecords) }) } diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 53b385c62e86c..edaa8b8483122 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -131,6 +131,12 @@
class LogValidatorTest { PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier) } + @Test + def testLogAppendTimeNonCompressedV0(): Unit = { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V0) + } + + @Test def testLogAppendTimeNonCompressedV1(): Unit = { checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1) @@ -169,14 +175,18 @@ class LogValidatorTest { val validatedRecords = validatedResults.validatedRecords assertEquals(records.records.asScala.size, validatedRecords.records.asScala.size, "message set size should not change") val now = mockTime.milliseconds - validatedRecords.batches.forEach(batch => validateLogAppendTime(now, 1234L, batch)) - assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now") + if (magic >= RecordBatch.MAGIC_VALUE_V1) + validatedRecords.batches.forEach(batch => validateLogAppendTime(now, 1234L, batch)) + assertEquals(if (magic == RecordBatch.MAGIC_VALUE_V0) RecordBatch.NO_TIMESTAMP else now, validatedResults.maxTimestampMs) assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") // If it's LOG_APPEND_TIME, the offset will be the offset of the first record - val expectedMaxTimestampOffset = 0 - assertEquals(expectedMaxTimestampOffset, validatedResults.offsetOfMaxTimestampMs, - s"The offset of max timestamp should be $expectedMaxTimestampOffset") + val expectedMaxTimestampOffset = magic match { + case RecordBatch.MAGIC_VALUE_V0 => -1 + case RecordBatch.MAGIC_VALUE_V1 => 0 + case _ => 2 + } + assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp) verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, compressed = false) } @@ -219,8 +229,8 @@ class LogValidatorTest { "MessageSet should still valid") assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now") - assertEquals(0, validatedResults.offsetOfMaxTimestampMs, - s"The offset of max timestamp should be 0 if logAppendTime is used") + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, + s"The shallow offset of max timestamp should be 2 if logAppendTime is used") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size may have been changed") @@ -271,8 +281,8 @@ class LogValidatorTest { "MessageSet should still valid") assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now") - assertEquals(0, validatedResults.offsetOfMaxTimestampMs, - s"The offset of max timestamp should be 0 if logAppendTime is used") + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, + s"The shallow offset of max timestamp should be the last offset 2 if logAppendTime is used") assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") @@ -404,9 +414,7 @@ class LogValidatorTest { assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - val expectedOffsetOfMaxTimestamp = 1 - assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs, - s"Offset of max timestamp should be 1") + assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp) assertFalse(validatingResults.messageSizeMaybeChanged, "Message size should not have been changed") @@ -480,8 +488,8 @@ class LogValidatorTest { } assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - assertEquals(1, validatingResults.offsetOfMaxTimestampMs, - "Offset of max 
timestamp should be 1") + assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp, + "Shallow offset of max timestamp should be 2") assertTrue(validatingResults.messageSizeMaybeChanged, "Message size should have been changed") @@ -532,8 +540,7 @@ class LogValidatorTest { } assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP, s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}") - assertEquals(-1, validatedResults.offsetOfMaxTimestampMs, - s"Offset of max timestamp should be -1") + assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp) assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, @@ -579,8 +586,8 @@ class LogValidatorTest { assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence) } assertEquals(timestamp, validatedResults.maxTimestampMs) - assertEquals(0, validatedResults.offsetOfMaxTimestampMs, - s"Offset of max timestamp should be 0 when multiple records having the same max timestamp.") + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, + s"Offset of max timestamp should be the last offset 2.") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, @@ -651,9 +658,9 @@ class LogValidatorTest { } assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - val expectedOffsetOfMaxTimestamp = 1 - assertEquals(expectedOffsetOfMaxTimestamp, validatedResults.offsetOfMaxTimestampMs, - s"Offset of max timestamp should be 1") + val expectedShallowOffsetOfMaxTimestamp = 2 + assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestamp, + s"Shallow offset of max timestamp should be 2") assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index db551b180fcd5..5a559565fc0da 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -182,11 +182,13 @@ class ListOffsetsRequestTest extends BaseRequestTest { TestUtils.generateAndProduceMessages(servers, topic, 9) TestUtils.produceMessage(servers, topic, "test-10", System.currentTimeMillis() + 10L) - assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version = -1)) - assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1)) - assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1)) + val firstLeaderEpoch = TestUtils.findLeaderEpoch(firstLeaderId, partition, servers) + + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(firstLeaderId, 0L, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version = -1)) + assertEquals((10L, firstLeaderEpoch), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1)) + assertEquals((9L, firstLeaderEpoch), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1)) // Kill the first leader so that we can verify the epoch change when fetching the latest offset killBroker(firstLeaderId) @@ -197,18 +199,19 @@ val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, partition, servers) // No changes to written data - assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) // The latest offset reflects the updated epoch assertEquals((10L, secondLeaderEpoch, Errors.NONE.code), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1)) - assertEquals((9L, secondLeaderEpoch, Errors.NONE.code), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1)) + // No change of epoch, since the offset of max timestamp reflects the epoch of its batch + assertEquals((9L, firstLeaderEpoch, Errors.NONE.code), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1)) } @Test diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 6783e83ca4457..fdbbb70ad8a81 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -112,9 +112,7 @@ class LogOffsetTest extends BaseRequestTest { log.truncateTo(0) - val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) - assertEquals(0L, secondOffset.get.offset) - assertEquals(-1L, secondOffset.get.timestamp) + assertEquals(Option.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)) } @ParameterizedTest @@ -202,10 +200,8 @@ class LogOffsetTest extends BaseRequestTest { log.updateHighWatermark(log.logEndOffset) - val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) assertEquals(0L, log.logEndOffset) - assertEquals(0L, maxTimestampOffset.get.offset) - assertEquals(-1L, maxTimestampOffset.get.timestamp) + assertEquals(Option.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)) } @deprecated("legacyFetchOffsetsBefore", since = "") diff --git a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala index 49efa0a49ba45..30528a6729ddc 100644 --- a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala +++ b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala @@ -83,7 +83,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint, // Now check message's crc val batches = FetchResponse.recordsOrFail(partitionData).batches.asScala var maxTimestamp = RecordBatch.NO_TIMESTAMP - var offsetOfMaxTimestamp = -1L + var shallowOffsetOfMaxTimestamp = -1L var lastOffset = state.logEndOffset var lastEpoch: OptionalInt = OptionalInt.empty() @@ -91,7 +91,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint, batch.ensureValid() if (batch.maxTimestamp > maxTimestamp) { maxTimestamp = batch.maxTimestamp - offsetOfMaxTimestamp = batch.baseOffset + shallowOffsetOfMaxTimestamp = batch.baseOffset } state.log.append(batch) state.logEndOffset = batch.nextOffset @@ -106,7 +106,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint, lastOffset, lastEpoch, maxTimestamp, - offsetOfMaxTimestamp, + shallowOffsetOfMaxTimestamp, Time.SYSTEM.milliseconds(), state.logStartOffset, RecordValidationStats.EMPTY, diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java index 76b9bef7d2c64..9211140aa3882 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java @@ -37,7 +37,7 @@ public class LogAppendInfo { private long firstOffset; private long lastOffset; private long maxTimestamp; - private long offsetOfMaxTimestamp; + private long shallowOffsetOfMaxTimestamp; private long logAppendTime; private long logStartOffset; private RecordValidationStats recordValidationStats; @@ -52,31 +52,31 @@ public class LogAppendInfo { /** * Creates an instance with the given params. * - * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending - * to the follower. - * @param lastOffset The last offset in the message set - * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available. - * @param maxTimestamp The maximum timestamp of the message set. - * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp. - * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp - * @param logStartOffset The start offset of the log at the time of this append. - * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false` - * @param sourceCompression The source codec used in the message set (send by the producer) - * @param validBytes The number of valid bytes - * @param lastOffsetOfFirstBatch The last offset of the first batch + * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending + * to the follower. + * @param lastOffset The last offset in the message set + * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available. + * @param maxTimestamp The maximum timestamp of the message set. + * @param shallowOffsetOfMaxTimestamp The last offset of the batch with the maximum timestamp. 
+ * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp + * @param logStartOffset The start offset of the log at the time of this append. + * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false` + * @param sourceCompression The source codec used in the message set (send by the producer) + * @param validBytes The number of valid bytes + * @param lastOffsetOfFirstBatch The last offset of the first batch */ public LogAppendInfo(long firstOffset, long lastOffset, OptionalInt lastLeaderEpoch, long maxTimestamp, - long offsetOfMaxTimestamp, + long shallowOffsetOfMaxTimestamp, long logAppendTime, long logStartOffset, RecordValidationStats recordValidationStats, CompressionType sourceCompression, int validBytes, long lastOffsetOfFirstBatch) { - this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, offsetOfMaxTimestamp, logAppendTime, logStartOffset, + this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset, recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.emptyList(), LeaderHwChange.NONE); } @@ -84,27 +84,27 @@ public LogAppendInfo(long firstOffset, /** * Creates an instance with the given params. * - * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending - * to the follower. - * @param lastOffset The last offset in the message set - * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available. - * @param maxTimestamp The maximum timestamp of the message set. - * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp. - * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp - * @param logStartOffset The start offset of the log at the time of this append. - * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false` - * @param sourceCompression The source codec used in the message set (send by the producer) - * @param validBytes The number of valid bytes - * @param lastOffsetOfFirstBatch The last offset of the first batch - * @param recordErrors List of record errors that caused the respective batch to be dropped - * @param leaderHwChange Incremental if the high watermark needs to be increased after appending record - * Same if high watermark is not changed. None is the default value and it means append failed + * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending + * to the follower. + * @param lastOffset The last offset in the message set + * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available. + * @param maxTimestamp The maximum timestamp of the message set. + * @param shallowOffsetOfMaxTimestamp The last offset of the batch with the maximum timestamp. + * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp + * @param logStartOffset The start offset of the log at the time of this append. 
+ * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false` + * @param sourceCompression The source codec used in the message set (send by the producer) + * @param validBytes The number of valid bytes + * @param lastOffsetOfFirstBatch The last offset of the first batch + * @param recordErrors List of record errors that caused the respective batch to be dropped + * @param leaderHwChange Incremental if the high watermark needs to be increased after appending record + * Same if high watermark is not changed. None is the default value and it means append failed */ public LogAppendInfo(long firstOffset, long lastOffset, OptionalInt lastLeaderEpoch, long maxTimestamp, - long offsetOfMaxTimestamp, + long shallowOffsetOfMaxTimestamp, long logAppendTime, long logStartOffset, RecordValidationStats recordValidationStats, @@ -117,7 +117,7 @@ public LogAppendInfo(long firstOffset, this.lastOffset = lastOffset; this.lastLeaderEpoch = lastLeaderEpoch; this.maxTimestamp = maxTimestamp; - this.offsetOfMaxTimestamp = offsetOfMaxTimestamp; + this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp; this.logAppendTime = logAppendTime; this.logStartOffset = logStartOffset; this.recordValidationStats = recordValidationStats; @@ -156,12 +156,12 @@ public void setMaxTimestamp(long maxTimestamp) { this.maxTimestamp = maxTimestamp; } - public long offsetOfMaxTimestamp() { - return offsetOfMaxTimestamp; + public long shallowOffsetOfMaxTimestamp() { + return shallowOffsetOfMaxTimestamp; } - public void setOffsetOfMaxTimestamp(long offsetOfMaxTimestamp) { - this.offsetOfMaxTimestamp = offsetOfMaxTimestamp; + public void setShallowOffsetOfMaxTimestamp(long shallowOffsetOfMaxTimestamp) { + this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp; } public long logAppendTime() { @@ -233,7 +233,7 @@ public long numMessages() { * @return a new instance with the given LeaderHwChange */ public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) { - return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, offsetOfMaxTimestamp, logAppendTime, logStartOffset, recordValidationStats, + return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset, recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, recordErrors, newLeaderHwChange); } @@ -259,7 +259,7 @@ public String toString() { ", lastOffset=" + lastOffset + ", lastLeaderEpoch=" + lastLeaderEpoch + ", maxTimestamp=" + maxTimestamp + - ", offsetOfMaxTimestamp=" + offsetOfMaxTimestamp + + ", shallowOffsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp + ", logAppendTime=" + logAppendTime + ", logStartOffset=" + logStartOffset + ", recordConversionStats=" + recordValidationStats + diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java index 30d2a62a3c958..2f806bfcf32ed 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java @@ -89,7 +89,8 @@ public MetricName metricName(String name, Map tags) { // volatile for LogCleaner to see the update private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty(); - /* The maximum timestamp and offset we see so far */ + // The maximum timestamp and offset we see so far + // NOTED: the offset is the last offset 
of the batch that has the max timestamp. private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN; private long created; @@ -208,7 +209,7 @@ public long maxTimestampSoFar() throws IOException { /** * Note that this may result in time index materialization. */ - private long offsetOfMaxTimestampSoFar() throws IOException { + private long shallowOffsetOfMaxTimestampSoFar() throws IOException { return readMaxTimestampAndOffsetSoFar().offset; } @@ -232,17 +233,17 @@ private boolean canConvertToRelativeOffset(long offset) throws IOException { * * @param largestOffset The last offset in the message set * @param largestTimestampMs The largest timestamp in the message set. - * @param offsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append. + * @param shallowOffsetOfMaxTimestamp The last offset of the earliest batch with the max timestamp in the messages to append. * @param records The log entries to append. * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow */ public void append(long largestOffset, long largestTimestampMs, - long offsetOfMaxTimestamp, + long shallowOffsetOfMaxTimestamp, MemoryRecords records) throws IOException { if (records.sizeInBytes() > 0) { LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at offset {}", - records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, offsetOfMaxTimestamp); + records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp); int physicalPosition = log.sizeInBytes(); if (physicalPosition == 0) rollingBasedTimestamp = OptionalLong.of(largestTimestampMs); @@ -254,12 +255,12 @@ public void append(long largestOffset, LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset); // Update the in memory max timestamp and corresponding offset.
if (largestTimestampMs > maxTimestampSoFar()) { - maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, offsetOfMaxTimestamp); + maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp); } // append an entry to the index (if needed) if (bytesSinceLastIndexEntry > indexIntervalBytes) { offsetIndex().append(largestOffset, physicalPosition); - timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar()); + timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar()); bytesSinceLastIndexEntry = 0; } bytesSinceLastIndexEntry += records.sizeInBytes(); @@ -274,7 +275,7 @@ private void ensureOffsetInRange(long offset) throws IOException { private int appendChunkFromFile(FileRecords records, int position, BufferSupplier bufferSupplier) throws IOException { int bytesToAppend = 0; long maxTimestamp = Long.MIN_VALUE; - long offsetOfMaxTimestamp = Long.MIN_VALUE; + long shallowOffsetOfMaxTimestamp = Long.MIN_VALUE; long maxOffset = Long.MIN_VALUE; ByteBuffer readBuffer = bufferSupplier.get(1024 * 1024); @@ -285,7 +286,7 @@ private int appendChunkFromFile(FileRecords records, int position, BufferSupplie while ((batch = nextAppendableBatch(nextBatches, readBuffer, bytesToAppend)) != null) { if (batch.maxTimestamp() > maxTimestamp) { maxTimestamp = batch.maxTimestamp(); - offsetOfMaxTimestamp = batch.lastOffset(); + shallowOffsetOfMaxTimestamp = batch.lastOffset(); } maxOffset = batch.lastOffset(); bytesToAppend += batch.sizeInBytes(); @@ -299,7 +300,7 @@ private int appendChunkFromFile(FileRecords records, int position, BufferSupplie readBuffer.limit(bytesToAppend); records.readInto(readBuffer, position); - append(maxOffset, maxTimestamp, offsetOfMaxTimestamp, MemoryRecords.readableRecords(readBuffer)); + append(maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp, MemoryRecords.readableRecords(readBuffer)); } bufferSupplier.release(readBuffer); @@ -488,7 +489,7 @@ public int recover(ProducerStateManager producerStateManager, Optional indexIntervalBytes) { offsetIndex().append(batch.lastOffset(), validBytes); - timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar()); + timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar()); lastIndexEntry = validBytes; } validBytes += batch.sizeInBytes(); @@ -513,7 +514,7 @@ public int recover(ProducerStateManager producerStateManager, Optional findOffsetByTimestamp(long times @Override public void close() throws IOException { if (maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN) - Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar(), true)); + Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar(), true)); Utils.closeQuietly(lazyOffsetIndex, "offsetIndex", LOGGER); Utils.closeQuietly(lazyTimeIndex, "timeIndex", LOGGER); Utils.closeQuietly(log, "log", LOGGER); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index 9aa1e06633b6d..2f3d62b2e9334 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -68,17 +68,20 @@ public static class ValidationResult { public final long logAppendTimeMs; public final MemoryRecords validatedRecords; public 
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
index 9aa1e06633b6d..2f3d62b2e9334 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
@@ -68,17 +68,20 @@ public static class ValidationResult {
         public final long logAppendTimeMs;
         public final MemoryRecords validatedRecords;
         public final long maxTimestampMs;
-        public final long offsetOfMaxTimestampMs;
+        // We only maintain the batch-level offset for the max timestamp so that time index updates behave
+        // consistently across paths: follower append and replica recovery do not iterate over individual
+        // records, so they cannot know the record-level offset of the max timestamp.
+        public final long shallowOffsetOfMaxTimestamp;
         public final boolean messageSizeMaybeChanged;
         public final RecordValidationStats recordValidationStats;
 
         public ValidationResult(long logAppendTimeMs, MemoryRecords validatedRecords, long maxTimestampMs,
-                                long offsetOfMaxTimestampMs, boolean messageSizeMaybeChanged,
+                                long shallowOffsetOfMaxTimestamp, boolean messageSizeMaybeChanged,
                                 RecordValidationStats recordValidationStats) {
             this.logAppendTimeMs = logAppendTimeMs;
             this.validatedRecords = validatedRecords;
             this.maxTimestampMs = maxTimestampMs;
-            this.offsetOfMaxTimestampMs = offsetOfMaxTimestampMs;
+            this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
             this.messageSizeMaybeChanged = messageSizeMaybeChanged;
             this.recordValidationStats = recordValidationStats;
         }
@@ -149,7 +152,7 @@ public LogValidator(MemoryRecords records,
      * avoid expensive re-compression.
      *
      * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset
-     * of the message with the max timestamp and a boolean indicating whether the message sizes may have changed.
+     * of the batch holding the record with the max timestamp (the shallow offset) and a boolean indicating whether the message sizes may have changed.
      */
     public ValidationResult validateMessagesAndAssignOffsets(PrimitiveRef.LongRef offsetCounter,
                                                              MetricsRecorder metricsRecorder,
@@ -232,7 +235,7 @@ private ValidationResult convertAndAssignOffsetsNonCompressed(LongRef offsetCoun
             now,
             convertedRecords,
             info.maxTimestamp,
-            info.offsetOfMaxTimestamp,
+            info.shallowOffsetOfMaxTimestamp,
             true,
             recordValidationStats);
     }
@@ -242,7 +245,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
                                                        MetricsRecorder metricsRecorder) {
         long now = time.milliseconds();
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
-        long offsetOfMaxTimestamp = -1L;
+        long shallowOffsetOfMaxTimestamp = -1L;
         long initialOffset = offsetCounter.value;
 
         RecordBatch firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, CompressionType.NONE);
@@ -251,7 +254,6 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
             validateBatch(topicPartition, firstBatch, batch, origin, toMagic, metricsRecorder);
 
             long maxBatchTimestamp = RecordBatch.NO_TIMESTAMP;
-            long offsetOfMaxBatchTimestamp = -1L;
 
             List<ApiRecordError> recordErrors = new ArrayList<>(0);
             // This is a hot path and we want to avoid any unnecessary allocations.
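The loop in the next hunk assigns one offset per record from a running counter; after this change it no longer remembers which record had the max timestamp, only the batch-level result. A simplified sketch of that bookkeeping (names are illustrative, not the LogValidator API):

class OffsetAssignSketch {
    // Assigns offsets to one batch's records and returns the batch's last offset,
    // which is what the validator now reports as the shallow offset.
    static long assignOffsets(long startOffset, long[] recordTimestamps) {
        long counter = startOffset;
        long maxBatchTimestamp = -1L;
        for (long ts : recordTimestamps) {
            counter++;                  // one offset per record
            if (ts > maxBatchTimestamp)
                maxBatchTimestamp = ts; // the record-level offset is intentionally not tracked
        }
        return counter - 1;             // mirrors batch.setLastOffset(offsetCounter.value - 1)
    }

    public static void main(String[] args) {
        // Three records starting at offset 100 occupy offsets 100..102.
        System.out.println(assignOffsets(100L, new long[]{5L, 9L, 7L})); // prints 102
    }
}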
@@ -263,11 +265,9 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
                     recordIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic,
                     metricsRecorder);
                 recordError.ifPresent(recordErrors::add);
 
-                long offset = offsetCounter.value++;
-                if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && record.timestamp() > maxBatchTimestamp) {
+                offsetCounter.value++;
+                if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && record.timestamp() > maxBatchTimestamp)
                     maxBatchTimestamp = record.timestamp();
-                    offsetOfMaxBatchTimestamp = offset;
-                }
 
                 ++recordIndex;
             }
@@ -275,7 +275,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 
             if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) {
                 maxTimestamp = maxBatchTimestamp;
-                offsetOfMaxTimestamp = offsetOfMaxBatchTimestamp;
+                shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
             }
 
             batch.setLastOffset(offsetCounter.value - 1);
@@ -293,14 +293,30 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
             maxTimestamp = now;
-            offsetOfMaxTimestamp = initialOffset;
+            // These cases must stay in sync with MemoryRecordsBuilder#info.
+            switch (toMagic) {
+                case RecordBatch.MAGIC_VALUE_V0:
+                    maxTimestamp = RecordBatch.NO_TIMESTAMP;
+                    // value stays at the default: -1
+                    shallowOffsetOfMaxTimestamp = -1;
+                    break;
+                case RecordBatch.MAGIC_VALUE_V1:
+                    // Each v1 batch holds a single record, so all batches share the same max (log append)
+                    // timestamp; the initial offset equals the last offset of the earliest batch.
+                    shallowOffsetOfMaxTimestamp = initialOffset;
+                    break;
+                default:
+                    // There is only one batch, so use its last offset.
+                    shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
+                    break;
+            }
         }
 
         return new ValidationResult(
             now,
             records,
             maxTimestamp,
-            offsetOfMaxTimestamp,
+            shallowOffsetOfMaxTimestamp,
             false,
             RecordValidationStats.EMPTY);
     }
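Read on its own, the switch above encodes the three LOG_APPEND_TIME cases stated earlier (v0, log append time, create time). A standalone sketch of the same decision (constants inlined, method name illustrative):

class LogAppendTimeShallowOffsetSketch {
    // Mirrors the intent of the switch above for timestampType == LOG_APPEND_TIME.
    // nextOffset is the counter value after all records were assigned offsets.
    static long shallowOffset(byte toMagic, long initialOffset, long nextOffset) {
        switch (toMagic) {
            case 0:  return -1L;            // v0 records carry no timestamps at all
            case 1:  return initialOffset;  // v1: one record per batch, all batches share the
                                            // append time, so the earliest batch's last offset
                                            // is the initial offset
            default: return nextOffset - 1; // v2+: a single batch; use its last offset
        }
    }

    public static void main(String[] args) {
        System.out.println(shallowOffset((byte) 2, 100L, 103L)); // prints 102
    }
}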
@@ -326,8 +342,6 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
         LongRef expectedInnerOffset = PrimitiveRef.ofLong(0);
         List<Record> validatedRecords = new ArrayList<>();
-        long offsetOfMaxTimestamp = -1;
-        long initialOffset = offsetCounter.value;
 
         int uncompressedSizeInBytes = 0;
@@ -377,11 +391,8 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
                     && batch.magic() > RecordBatch.MAGIC_VALUE_V0
                     && toMagic > RecordBatch.MAGIC_VALUE_V0) {
-                    if (record.timestamp() > maxTimestamp) {
+                    if (record.timestamp() > maxTimestamp)
                         maxTimestamp = record.timestamp();
-                        // The offset is only increased when it is a valid record
-                        offsetOfMaxTimestamp = initialOffset + validatedRecords.size();
-                    }
 
                     // Some older clients do not implement the V1 internal offsets correctly.
                     // Historically the broker handled this by rewriting the batches rather
@@ -415,13 +426,12 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
             // we can update the batch only and write the compressed payload as is;
             // again we assume only one record batch within the compressed set
             offsetCounter.value += validatedRecords.size();
+            // There is only one batch in this path, so its last offset serves as shallowOffsetOfMaxTimestamp.
             long lastOffset = offsetCounter.value - 1;
             firstBatch.setLastOffset(lastOffset);
 
-            if (timestampType == TimestampType.LOG_APPEND_TIME) {
+            if (timestampType == TimestampType.LOG_APPEND_TIME)
                 maxTimestamp = now;
-                offsetOfMaxTimestamp = initialOffset;
-            }
 
             if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
                 firstBatch.setMaxTimestamp(timestampType, maxTimestamp);
@@ -434,7 +444,7 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
             now,
             records,
             maxTimestamp,
-            offsetOfMaxTimestamp,
+            lastOffset,
             false,
             recordValidationStats);
     }
@@ -476,7 +486,7 @@ private ValidationResult buildRecordsAndAssignOffsets(LongRef offsetCounter,
             logAppendTime,
             records,
             info.maxTimestamp,
-            info.offsetOfMaxTimestamp,
+            info.shallowOffsetOfMaxTimestamp,
             true,
             recordValidationStats);
     }
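To summarize the compressed path after these changes, a condensed sketch (assumed names, not the LogValidator API) of how the returned pair is formed: exactly one batch survives this path, so its last offset doubles as the shallow offset of the max timestamp.

class CompressedPathSketch {
    record Result(long maxTimestamp, long shallowOffsetOfMaxTimestamp) {}

    static Result finish(long nextOffsetAfterAssign, long maxTimestampFromRecords,
                         boolean logAppendTime, long now) {
        // Single batch: its last offset is the shallow offset reported to callers.
        long lastOffset = nextOffsetAfterAssign - 1;
        // Under LOG_APPEND_TIME every record carries the append time, so 'now' wins.
        long maxTimestamp = logAppendTime ? now : maxTimestampFromRecords;
        return new Result(maxTimestamp, lastOffset);
    }

    public static void main(String[] args) {
        System.out.println(finish(103L, 250L, false, 9999L));
        // prints Result[maxTimestamp=250, shallowOffsetOfMaxTimestamp=102]
    }
}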