From a7d62883b305a515caa836e2908d64268112dbcc Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Fri, 29 Mar 2024 04:21:25 +0800 Subject: [PATCH 01/31] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp anymore --- .../apache/kafka/common/record/RecordBatch.java | 17 +++++++++++++++++ core/src/main/scala/kafka/log/UnifiedLog.scala | 17 ++++++++++------- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index 7d231c1774367..f569bbe48c37b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.Optional; import java.util.OptionalLong; /** @@ -245,4 +246,20 @@ public interface RecordBatch extends Iterable { * @return Whether this is a batch containing control records */ boolean isControlBatch(); + + /** + * iterate all records to find the offset of max timestamp. + * noted that the earliest offset will return if there are multi records having same (max) timestamp + * @return offset of max timestamp + */ + default Optional offsetOfMaxTimestamp() { + long maxTimestamp = maxTimestamp(); + try (CloseableIterator iter = streamingIterator(BufferSupplier.create())) { + while (iter.hasNext()) { + Record record = iter.next(); + if (maxTimestamp == record.timestamp()) return Optional.of(record.offset()); + } + } + return Optional.empty(); + } } diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 0fdc236e720c2..4bca02897de32 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. - val segmentsCopy = logSegments.asScala.toBuffer - val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) - val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - - Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) + val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) + // cache the timestamp and offset + val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar + // lookup the position of batch to avoid extra I/O + val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) + latestTimestampSegment.log.batchesFrom(position.position).asScala + .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp) + .map(batch => new TimestampAndOffset(batch.maxTimestamp(), batch.offsetOfMaxTimestamp().orElse(-1), + latestEpochAsOptional(leaderEpochCache))) + .headOption } else { // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one. if (remoteLogEnabled()) { From 2c00cee4a72b273733deae258ab78c6f9ab1a47c Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Fri, 29 Mar 2024 17:27:21 +0800 Subject: [PATCH 02/31] add more tests --- .../src/main/scala/kafka/log/UnifiedLog.scala | 3 +- .../admin/ListOffsetsIntegrationTest.scala | 52 +++++++++++++++---- 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 4bca02897de32..ece4151c78314 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1343,10 +1343,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, // lookup the position of batch to avoid extra I/O val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) latestTimestampSegment.log.batchesFrom(position.position).asScala - .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp) + .find(_.maxTimestamp() == maxTimestampSoFar.timestamp) .map(batch => new TimestampAndOffset(batch.maxTimestamp(), batch.offsetOfMaxTimestamp().orElse(-1), latestEpochAsOptional(leaderEpochCache))) - .headOption } else { // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one. if (remoteLogEnabled()) { diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index 5362a1d5e35c9..b1c8f27539cd1 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -19,7 +19,7 @@ package kafka.admin import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig -import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers} +import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers, waitForAllReassignmentsToComplete} import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.producer.ProducerRecord @@ -31,7 +31,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import java.util.Properties +import java.io.File +import java.util.{Optional, Properties} import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ @@ -189,14 +190,47 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { } private def verifyListOffsets(topic: String = topicName, expectedMaxTimestampOffset: Int = 1): Unit = { - val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic) - assertEquals(0, earliestOffset.offset()) + def check(): Unit = { + val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic) + assertEquals(0, earliestOffset.offset()) - val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic) - assertEquals(3, latestOffset.offset()) + val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic) + assertEquals(3, latestOffset.offset()) - val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) - assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) + val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) + assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) + } + + // case 0: test the offsets from leader's append path + check() + + // case 1: test the offsets from follower's append path. + // we make a follower be the new leader to handle the ListOffsetRequest + val partitionAssignment = adminClient.describeTopics(java.util.Collections.singletonList(topic)) + .allTopicNames().get().get(topic).partitions().get(0) + val newLeader = brokers.map(_.config.brokerId).find(_ != partitionAssignment.leader().id()).get + adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new TopicPartition(topic, 0), + Optional.of(new NewPartitionReassignment(java.util.Arrays.asList(newLeader))))).all().get() + waitForAllReassignmentsToComplete(adminClient) + assertEquals(newLeader, adminClient.describeTopics(java.util.Collections.singletonList(topic)) + .allTopicNames().get().get(topic).partitions().get(0).leader().id()) + check() + + // case 2: test the offsets from recovery path. + // server will rebuild offset index according to log files if the index files are nonexistent + val indexFiles = brokers.flatMap(_.config.logDirs).toSet + brokers.foreach(b => killBroker(b.config.brokerId)) + indexFiles.foreach { root => + val files = new File(s"$root/$topic-0").listFiles() + if (files != null) files.foreach { f => + if (f.getName.endsWith(".index")) f.delete() + } + } + restartDeadBrokers() + Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") + adminClient = Admin.create(java.util.Collections.singletonMap( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers().asInstanceOf[Object])) + check() } private def runFetchOffsets(adminClient: Admin, @@ -261,7 +295,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { } def generateConfigs: Seq[KafkaConfig] = { - TestUtils.createBrokerConfigs(1, zkConnectOrNull).map{ props => + TestUtils.createBrokerConfigs(2, zkConnectOrNull).map{ props => if (setOldMessageFormat) { props.setProperty("log.message.format.version", "0.10.0") props.setProperty("inter.broker.protocol.version", "0.10.0") From 90d33e0dcc4197b73862e6185e562ab236f073ae Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sat, 30 Mar 2024 11:50:07 +0800 Subject: [PATCH 03/31] revert code and add comments --- .../kafka/common/record/MemoryRecords.java | 2 +- .../common/record/MemoryRecordsBuilder.java | 40 +++++++++++++------ .../record/MemoryRecordsBuilderTest.java | 23 ++++++----- .../src/main/scala/kafka/log/UnifiedLog.scala | 2 +- .../unit/kafka/log/LogValidatorTest.scala | 16 ++++---- .../storage/internals/log/LogValidator.java | 32 ++++++--------- 6 files changed, 61 insertions(+), 54 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 3ba60b09b30df..ac4e0e8c19ec6 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -209,7 +209,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable= RecordBatch.MAGIC_VALUE_V2) + // case 0: there is only one batch so use the last offset + return new RecordsInfo(logAppendTime, lastOffset); + else + // case 1: there is many single-record batches having same max timestamp, so the base offset is + // equal with the last offset of earliest batch + return new RecordsInfo(logAppendTime, baseOffset); + } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) { + return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset); } else { - // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping - // If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1] - return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp); + if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) + // ditto to case 0 + return new RecordsInfo(maxTimestamp, lastOffset); + else + // case 2: there is many single-record batches, and so offsetOfMaxTimestamp is equal with + // the last offset of batch which having max timestamp + return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp); } } @@ -851,12 +865,12 @@ private long nextSequentialOffset() { public static class RecordsInfo { public final long maxTimestamp; - public final long offsetOfMaxTimestamp; + public final long shallowOffsetOfMaxTimestamp; public RecordsInfo(long maxTimestamp, - long offsetOfMaxTimestamp) { + long shallowOffsetOfMaxTimestamp) { this.maxTimestamp = maxTimestamp; - this.offsetOfMaxTimestamp = offsetOfMaxTimestamp; + this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp; } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index eaaa95ff673cf..33a419e64ee9d 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -377,8 +377,11 @@ public void buildUsingLogAppendTime(Args args) { MemoryRecordsBuilder.RecordsInfo info = builder.info(); assertEquals(logAppendTime, info.maxTimestamp); - // When logAppendTime is used, the first offset of the batch will be the offset of maxTimestamp - assertEquals(0L, info.offsetOfMaxTimestamp); + + if (args.compressionType == CompressionType.NONE && magic <= MAGIC_VALUE_V1) + assertEquals(0L, info.shallowOffsetOfMaxTimestamp); + else + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); for (RecordBatch batch : records.batches()) { if (magic == MAGIC_VALUE_V0) { @@ -412,11 +415,10 @@ public void buildUsingCreateTime(Args args) { assertEquals(2L, info.maxTimestamp); } - if (magic == MAGIC_VALUE_V0) - // in MAGIC_VALUE_V0's case, we don't have timestamp info in records, so always return -1. - assertEquals(-1L, info.offsetOfMaxTimestamp); + if (args.compressionType == CompressionType.NONE && magic == MAGIC_VALUE_V1) + assertEquals(1L, info.shallowOffsetOfMaxTimestamp); else - assertEquals(1L, info.offsetOfMaxTimestamp); + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); int i = 0; long[] expectedTimestamps = new long[] {0L, 2L, 1L}; @@ -493,13 +495,12 @@ public void writePastLimit(Args args) { MemoryRecords records = builder.build(); MemoryRecordsBuilder.RecordsInfo info = builder.info(); - if (magic == MAGIC_VALUE_V0) { + if (magic == MAGIC_VALUE_V0) assertEquals(-1, info.maxTimestamp); - assertEquals(-1L, info.offsetOfMaxTimestamp); - } else { + else assertEquals(2L, info.maxTimestamp); - assertEquals(2L, info.offsetOfMaxTimestamp); - } + + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); long i = 0L; for (RecordBatch batch : records.batches()) { diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index ece4151c78314..251b6cd619db5 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -819,7 +819,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, validRecords = validateAndOffsetAssignResult.validatedRecords appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs) - appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.offsetOfMaxTimestampMs) + appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs) appendInfo.setLastOffset(offset.value - 1) appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats) if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME) diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 53b385c62e86c..0267832955f99 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -175,7 +175,7 @@ class LogValidatorTest { // If it's LOG_APPEND_TIME, the offset will be the offset of the first record val expectedMaxTimestampOffset = 0 - assertEquals(expectedMaxTimestampOffset, validatedResults.offsetOfMaxTimestampMs, + assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestampMs, s"The offset of max timestamp should be $expectedMaxTimestampOffset") verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, compressed = false) @@ -219,7 +219,7 @@ class LogValidatorTest { "MessageSet should still valid") assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now") - assertEquals(0, validatedResults.offsetOfMaxTimestampMs, + assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs, s"The offset of max timestamp should be 0 if logAppendTime is used") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size may have been changed") @@ -271,7 +271,7 @@ class LogValidatorTest { "MessageSet should still valid") assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now") - assertEquals(0, validatedResults.offsetOfMaxTimestampMs, + assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs, s"The offset of max timestamp should be 0 if logAppendTime is used") assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") @@ -405,7 +405,7 @@ class LogValidatorTest { s"Max timestamp should be ${now + 1}") val expectedOffsetOfMaxTimestamp = 1 - assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs, + assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.shallowOffsetOfMaxTimestampMs, s"Offset of max timestamp should be 1") assertFalse(validatingResults.messageSizeMaybeChanged, @@ -480,7 +480,7 @@ class LogValidatorTest { } assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - assertEquals(1, validatingResults.offsetOfMaxTimestampMs, + assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs, "Offset of max timestamp should be 1") assertTrue(validatingResults.messageSizeMaybeChanged, "Message size should have been changed") @@ -532,7 +532,7 @@ class LogValidatorTest { } assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP, s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}") - assertEquals(-1, validatedResults.offsetOfMaxTimestampMs, + assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestampMs, s"Offset of max timestamp should be -1") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") @@ -579,7 +579,7 @@ class LogValidatorTest { assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence) } assertEquals(timestamp, validatedResults.maxTimestampMs) - assertEquals(0, validatedResults.offsetOfMaxTimestampMs, + assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs, s"Offset of max timestamp should be 0 when multiple records having the same max timestamp.") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") @@ -652,7 +652,7 @@ class LogValidatorTest { assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") val expectedOffsetOfMaxTimestamp = 1 - assertEquals(expectedOffsetOfMaxTimestamp, validatedResults.offsetOfMaxTimestampMs, + assertEquals(expectedOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestampMs, s"Offset of max timestamp should be 1") assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index 9aa1e06633b6d..24c80864fc528 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -68,17 +68,17 @@ public static class ValidationResult { public final long logAppendTimeMs; public final MemoryRecords validatedRecords; public final long maxTimestampMs; - public final long offsetOfMaxTimestampMs; + public final long shallowOffsetOfMaxTimestampMs; public final boolean messageSizeMaybeChanged; public final RecordValidationStats recordValidationStats; public ValidationResult(long logAppendTimeMs, MemoryRecords validatedRecords, long maxTimestampMs, - long offsetOfMaxTimestampMs, boolean messageSizeMaybeChanged, + long shallowOffsetOfMaxTimestampMs, boolean messageSizeMaybeChanged, RecordValidationStats recordValidationStats) { this.logAppendTimeMs = logAppendTimeMs; this.validatedRecords = validatedRecords; this.maxTimestampMs = maxTimestampMs; - this.offsetOfMaxTimestampMs = offsetOfMaxTimestampMs; + this.shallowOffsetOfMaxTimestampMs = shallowOffsetOfMaxTimestampMs; this.messageSizeMaybeChanged = messageSizeMaybeChanged; this.recordValidationStats = recordValidationStats; } @@ -232,7 +232,7 @@ private ValidationResult convertAndAssignOffsetsNonCompressed(LongRef offsetCoun now, convertedRecords, info.maxTimestamp, - info.offsetOfMaxTimestamp, + info.shallowOffsetOfMaxTimestamp, true, recordValidationStats); } @@ -242,8 +242,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, MetricsRecorder metricsRecorder) { long now = time.milliseconds(); long maxTimestamp = RecordBatch.NO_TIMESTAMP; - long offsetOfMaxTimestamp = -1L; - long initialOffset = offsetCounter.value; + long shallowOffsetOfMaxTimestamp = -1L; RecordBatch firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, CompressionType.NONE); @@ -275,7 +274,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) { maxTimestamp = maxBatchTimestamp; - offsetOfMaxTimestamp = offsetOfMaxBatchTimestamp; + shallowOffsetOfMaxTimestamp = offsetOfMaxBatchTimestamp; } batch.setLastOffset(offsetCounter.value - 1); @@ -293,14 +292,13 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, if (timestampType == TimestampType.LOG_APPEND_TIME) { maxTimestamp = now; - offsetOfMaxTimestamp = initialOffset; } return new ValidationResult( now, records, maxTimestamp, - offsetOfMaxTimestamp, + shallowOffsetOfMaxTimestamp, false, RecordValidationStats.EMPTY); } @@ -326,8 +324,6 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse long maxTimestamp = RecordBatch.NO_TIMESTAMP; LongRef expectedInnerOffset = PrimitiveRef.ofLong(0); List validatedRecords = new ArrayList<>(); - long offsetOfMaxTimestamp = -1; - long initialOffset = offsetCounter.value; int uncompressedSizeInBytes = 0; @@ -377,11 +373,8 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse && batch.magic() > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) { - if (record.timestamp() > maxTimestamp) { + if (record.timestamp() > maxTimestamp) maxTimestamp = record.timestamp(); - // The offset is only increased when it is a valid record - offsetOfMaxTimestamp = initialOffset + validatedRecords.size(); - } // Some older clients do not implement the V1 internal offsets correctly. // Historically the broker handled this by rewriting the batches rather @@ -418,10 +411,8 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse long lastOffset = offsetCounter.value - 1; firstBatch.setLastOffset(lastOffset); - if (timestampType == TimestampType.LOG_APPEND_TIME) { + if (timestampType == TimestampType.LOG_APPEND_TIME) maxTimestamp = now; - offsetOfMaxTimestamp = initialOffset; - } if (toMagic >= RecordBatch.MAGIC_VALUE_V1) firstBatch.setMaxTimestamp(timestampType, maxTimestamp); @@ -434,7 +425,8 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse now, records, maxTimestamp, - offsetOfMaxTimestamp, + // there is only one batch in this path, so last offset can be viewed as shallowOffsetOfMaxTimestampMs + lastOffset, false, recordValidationStats); } @@ -476,7 +468,7 @@ private ValidationResult buildRecordsAndAssignOffsets(LongRef offsetCounter, logAppendTime, records, info.maxTimestamp, - info.offsetOfMaxTimestamp, + info.shallowOffsetOfMaxTimestamp, true, recordValidationStats); } From 5180f2dad88aebae8ac7ad91583fc6cbdedce508 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sat, 30 Mar 2024 12:09:40 +0800 Subject: [PATCH 04/31] adjust comments --- .../apache/kafka/common/record/MemoryRecordsBuilder.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 07610ded5d5ab..dc094a4500685 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -260,7 +260,7 @@ public RecordsInfo info() { // case 0: there is only one batch so use the last offset return new RecordsInfo(logAppendTime, lastOffset); else - // case 1: there is many single-record batches having same max timestamp, so the base offset is + // case 1: there are many single-record batches having same max timestamp, so the base offset is // equal with the last offset of earliest batch return new RecordsInfo(logAppendTime, baseOffset); } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) { @@ -270,8 +270,8 @@ public RecordsInfo info() { // ditto to case 0 return new RecordsInfo(maxTimestamp, lastOffset); else - // case 2: there is many single-record batches, and so offsetOfMaxTimestamp is equal with - // the last offset of batch which having max timestamp + // case 2: Each batch is composed of single record, and offsetOfMaxTimestamp points to the record having + // max timestamp. Hence, offsetOfMaxTimestamp is equal to the last offset of earliest batch (record) return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp); } } From 79561cded3511e2983604e4b6ff97374a74a5939 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sat, 30 Mar 2024 12:11:07 +0800 Subject: [PATCH 05/31] adjust comments --- .../org/apache/kafka/common/record/MemoryRecordsBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index dc094a4500685..45607c6a9c1bd 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -243,7 +243,7 @@ public MemoryRecords build() { /** * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery) - * The definition of shallowOffsetOfMaxTimestamp is the "last offset" of a batch which having max timestamp. + * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch which having max timestamp. * If there are many batches having same max timestamp, we pick up the earliest batch. * * If the log append time is used, the offset will be the last offset unless no compression is used and From 726de99cc4caa47204bd499d68f1ee56c00826ca Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sat, 30 Mar 2024 12:11:30 +0800 Subject: [PATCH 06/31] adjust comments --- .../org/apache/kafka/common/record/MemoryRecordsBuilder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 45607c6a9c1bd..8f074135d7714 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -245,10 +245,10 @@ public MemoryRecords build() { * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery) * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch which having max timestamp. * If there are many batches having same max timestamp, we pick up the earliest batch. - * + *

* If the log append time is used, the offset will be the last offset unless no compression is used and * the message format version is 0 or 1, in which case, it will be the first offset. - * + *

* If create time is used, the offset will be the last offset unless no compression is used and the message * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp. * From 29137ae0214098cc21f29dc54702e8409ca8fb35 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sat, 30 Mar 2024 12:26:09 +0800 Subject: [PATCH 07/31] ranem offsetOfMaxTimestampSoFar to shallowOffsetOfMaxTimestampSoFar --- .../kafka/storage/internals/log/LogSegment.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java index 464858b4cd783..1dc0ef85a19f8 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java @@ -89,7 +89,8 @@ public MetricName metricName(String name, Map tags) { // volatile for LogCleaner to see the update private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty(); - /* The maximum timestamp and offset we see so far */ + // The maximum timestamp and offset we see so far + // NOTED: the offset is the last offset of batch having the max timestamp. private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN; private long created; @@ -208,7 +209,7 @@ public long maxTimestampSoFar() throws IOException { /** * Note that this may result in time index materialization. */ - private long offsetOfMaxTimestampSoFar() throws IOException { + private long shallowOffsetOfMaxTimestampSoFar() throws IOException { return readMaxTimestampAndOffsetSoFar().offset; } @@ -259,7 +260,7 @@ public void append(long largestOffset, // append an entry to the index (if needed) if (bytesSinceLastIndexEntry > indexIntervalBytes) { offsetIndex().append(largestOffset, physicalPosition); - timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar()); + timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar()); bytesSinceLastIndexEntry = 0; } bytesSinceLastIndexEntry += records.sizeInBytes(); @@ -488,7 +489,7 @@ public int recover(ProducerStateManager producerStateManager, Optional indexIntervalBytes) { offsetIndex().append(batch.lastOffset(), validBytes); - timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar()); + timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar()); lastIndexEntry = validBytes; } validBytes += batch.sizeInBytes(); @@ -513,7 +514,7 @@ public int recover(ProducerStateManager producerStateManager, Optional findOffsetByTimestamp(long times @Override public void close() throws IOException { if (maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN) - Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar(), true)); + Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar(), true)); Utils.closeQuietly(lazyOffsetIndex, "offsetIndex", LOGGER); Utils.closeQuietly(lazyTimeIndex, "timeIndex", LOGGER); Utils.closeQuietly(log, "log", LOGGER); From 2297b521a8399185bc06d56b8a1b25b784b7199c Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Mon, 1 Apr 2024 13:58:06 +0800 Subject: [PATCH 08/31] apply luke patch --- .../kafka/common/record/MemoryRecords.java | 18 +++++------ .../common/record/MemoryRecordsTest.java | 9 ++++-- core/src/main/scala/kafka/log/LocalLog.scala | 4 +-- .../src/main/scala/kafka/log/LogCleaner.scala | 2 +- .../scala/unit/kafka/log/LocalLogTest.scala | 2 +- .../scala/unit/kafka/log/LogSegmentTest.scala | 4 +-- .../unit/kafka/log/LogValidatorTest.scala | 30 +++++++++---------- .../storage/internals/log/LogSegment.java | 8 ++--- .../storage/internals/log/LogValidator.java | 2 +- 9 files changed, 41 insertions(+), 38 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index ac4e0e8c19ec6..c5b43d4f88565 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -399,7 +399,7 @@ public static class FilterResult { private int bytesRetained = 0; private long maxOffset = -1L; private long maxTimestamp = RecordBatch.NO_TIMESTAMP; - private long offsetOfMaxTimestamp = -1L; + private long shallowOffsetOfMaxTimestamp = -1L; private FilterResult(ByteBuffer outputBuffer) { this.outputBuffer = outputBuffer; @@ -411,21 +411,21 @@ private void updateRetainedBatchMetadata(MutableRecordBatch retainedBatch, int n retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained); } - private void updateRetainedBatchMetadata(long maxTimestamp, long offsetOfMaxTimestamp, long maxOffset, + private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset, int messagesRetained, int bytesRetained) { - validateBatchMetadata(maxTimestamp, offsetOfMaxTimestamp, maxOffset); + validateBatchMetadata(maxTimestamp, shallowOffsetOfMaxTimestamp, maxOffset); if (maxTimestamp > this.maxTimestamp) { this.maxTimestamp = maxTimestamp; - this.offsetOfMaxTimestamp = offsetOfMaxTimestamp; + this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp; } this.maxOffset = Math.max(maxOffset, this.maxOffset); this.messagesRetained += messagesRetained; this.bytesRetained += bytesRetained; } - private void validateBatchMetadata(long maxTimestamp, long offsetOfMaxTimestamp, long maxOffset) { - if (maxTimestamp != RecordBatch.NO_TIMESTAMP && offsetOfMaxTimestamp < 0) - throw new IllegalArgumentException("offset undefined for maximum timestamp " + maxTimestamp); + private void validateBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset) { + if (maxTimestamp != RecordBatch.NO_TIMESTAMP && shallowOffsetOfMaxTimestamp < 0) + throw new IllegalArgumentException("shallowOffset undefined for maximum timestamp " + maxTimestamp); if (maxOffset < 0) throw new IllegalArgumentException("maxOffset undefined"); } @@ -458,8 +458,8 @@ public long maxTimestamp() { return maxTimestamp; } - public long offsetOfMaxTimestamp() { - return offsetOfMaxTimestamp; + public long shallowOffsetOfMaxTimestamp() { + return shallowOffsetOfMaxTimestamp; } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 9e688fc3ab695..3f0195bf5d149 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -352,7 +352,7 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { assertEquals(0, filterResult.messagesRetained()); assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained()); assertEquals(12, filterResult.maxTimestamp()); - assertEquals(baseOffset + 1, filterResult.offsetOfMaxTimestamp()); + assertEquals(baseOffset + 1, filterResult.shallowOffsetOfMaxTimestamp()); // Verify filtered records filtered.flip(); @@ -413,7 +413,7 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { assertEquals(0, filterResult.messagesRetained()); assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained()); assertEquals(timestamp, filterResult.maxTimestamp()); - assertEquals(baseOffset, filterResult.offsetOfMaxTimestamp()); + assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp()); assertTrue(filterResult.outputBuffer().position() > 0); // Verify filtered records @@ -893,7 +893,10 @@ public void testFilterTo(Args args) { assertEquals(filtered.limit(), result.bytesRetained()); if (magic > RecordBatch.MAGIC_VALUE_V0) { assertEquals(20L, result.maxTimestamp()); - assertEquals(4L, result.offsetOfMaxTimestamp()); + if (compression == CompressionType.NONE && magic < RecordBatch.MAGIC_VALUE_V2) + assertEquals(4L, result.shallowOffsetOfMaxTimestamp()); + else + assertEquals(5L, result.shallowOffsetOfMaxTimestamp()); } MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index a91bfae739e05..b2121f5312d7b 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -406,8 +406,8 @@ class LocalLog(@volatile private var _dir: File, } } - private[log] def append(lastOffset: Long, largestTimestamp: Long, offsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = { - segments.activeSegment.append(lastOffset, largestTimestamp, offsetOfMaxTimestamp, records) + private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = { + segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records) updateLogEndOffset(lastOffset + 1) } diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 35dadc8672f08..45aee545f82ce 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -812,7 +812,7 @@ private[log] class Cleaner(val id: Int, val retained = MemoryRecords.readableRecords(outputBuffer) // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads // after `Log.replaceSegments` (which acquires the lock) is called - dest.append(result.maxOffset, result.maxTimestamp, result.offsetOfMaxTimestamp, retained) + dest.append(result.maxOffset, result.maxTimestamp, result.shallowOffsetOfMaxTimestamp(), retained) throttler.maybeThrottle(outputBuffer.limit()) } diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala index bffd41156b357..29b5fd34f9091 100644 --- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala @@ -100,7 +100,7 @@ class LocalLogTest { initialOffset: Long = 0L): Unit = { log.append(lastOffset = initialOffset + records.size - 1, largestTimestamp = records.head.timestamp, - offsetOfMaxTimestamp = initialOffset, + shallowOffsetOfMaxTimestamp = initialOffset, records = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, 0, records.toList : _*)) } diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index bb0c85a858360..b4e278c380230 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -86,10 +86,10 @@ class LogSegmentTest { def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = { val seg = createSegment(baseOffset) val currentTime = Time.SYSTEM.milliseconds() - val offsetOfMaxTimestamp = largestOffset + val shallowOffsetOfMaxTimestamp = largestOffset val memoryRecords = records(0, "hello") assertThrows(classOf[LogSegmentOffsetOverflowException], () => { - seg.append(largestOffset, currentTime, offsetOfMaxTimestamp, memoryRecords) + seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, memoryRecords) }) } diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 0267832955f99..7327308cd164c 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -219,8 +219,8 @@ class LogValidatorTest { "MessageSet should still valid") assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now") - assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs, - s"The offset of max timestamp should be 0 if logAppendTime is used") + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestampMs, + s"The shallow offset of max timestamp should be 2 if logAppendTime is used") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size may have been changed") @@ -271,8 +271,8 @@ class LogValidatorTest { "MessageSet should still valid") assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now") - assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs, - s"The offset of max timestamp should be 0 if logAppendTime is used") + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestampMs, + s"The shallow offset of max timestamp should be the last offset 2 if logAppendTime is used") assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") @@ -404,8 +404,8 @@ class LogValidatorTest { assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - val expectedOffsetOfMaxTimestamp = 1 - assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.shallowOffsetOfMaxTimestampMs, + val expectedShallowOffsetOfMaxTimestamp = 1 + assertEquals(expectedShallowOffsetOfMaxTimestamp, validatingResults.shallowOffsetOfMaxTimestampMs, s"Offset of max timestamp should be 1") assertFalse(validatingResults.messageSizeMaybeChanged, @@ -480,8 +480,8 @@ class LogValidatorTest { } assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs, - "Offset of max timestamp should be 1") + assertEquals(2, validatingResults.shallowOffsetOfMaxTimestampMs, + "Shallow offset of max timestamp should be 2") assertTrue(validatingResults.messageSizeMaybeChanged, "Message size should have been changed") @@ -532,8 +532,8 @@ class LogValidatorTest { } assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP, s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}") - assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestampMs, - s"Offset of max timestamp should be -1") + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestampMs, + s"Offset of max timestamp should be the last offset 2") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, @@ -579,8 +579,8 @@ class LogValidatorTest { assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence) } assertEquals(timestamp, validatedResults.maxTimestampMs) - assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs, - s"Offset of max timestamp should be 0 when multiple records having the same max timestamp.") + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestampMs, + s"Offset of max timestamp should be the last offset 2.") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, @@ -651,9 +651,9 @@ class LogValidatorTest { } assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - val expectedOffsetOfMaxTimestamp = 1 - assertEquals(expectedOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestampMs, - s"Offset of max timestamp should be 1") + val expectedShallowOffsetOfMaxTimestamp = 2 + assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestampMs, + s"Shallow offset of max timestamp should be 2") assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java index 1dc0ef85a19f8..ce8587a8ffa77 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java @@ -233,17 +233,17 @@ private boolean canConvertToRelativeOffset(long offset) throws IOException { * * @param largestOffset The last offset in the message set * @param largestTimestampMs The largest timestamp in the message set. - * @param offsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append. + * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append. * @param records The log entries to append. * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow */ public void append(long largestOffset, long largestTimestampMs, - long offsetOfMaxTimestamp, + long shallowOffsetOfMaxTimestamp, MemoryRecords records) throws IOException { if (records.sizeInBytes() > 0) { LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at offset {}", - records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, offsetOfMaxTimestamp); + records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp); int physicalPosition = log.sizeInBytes(); if (physicalPosition == 0) rollingBasedTimestamp = OptionalLong.of(largestTimestampMs); @@ -255,7 +255,7 @@ public void append(long largestOffset, LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset); // Update the in memory max timestamp and corresponding offset. if (largestTimestampMs > maxTimestampSoFar()) { - maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, offsetOfMaxTimestamp); + maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp); } // append an entry to the index (if needed) if (bytesSinceLastIndexEntry > indexIntervalBytes) { diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index 24c80864fc528..7729e25922461 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -149,7 +149,7 @@ public LogValidator(MemoryRecords records, * avoid expensive re-compression. * * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset - * of the message with the max timestamp and a boolean indicating whether the message sizes may have changed. + * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed. */ public ValidationResult validateMessagesAndAssignOffsets(PrimitiveRef.LongRef offsetCounter, MetricsRecorder metricsRecorder, From b4b13325f929eea8bf592f497dfd0062584eac75 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Tue, 2 Apr 2024 03:53:45 +0800 Subject: [PATCH 09/31] address comments --- .../common/record/MemoryRecordsBuilder.java | 7 ++++--- .../common/record/MemoryRecordsBuilderTest.java | 13 ++++++++----- core/src/main/scala/kafka/log/UnifiedLog.scala | 4 ++-- .../storage/internals/log/LogAppendInfo.java | 16 ++++++++-------- .../kafka/storage/internals/log/LogSegment.java | 2 +- .../storage/internals/log/LogValidator.java | 11 +++++++++++ 6 files changed, 34 insertions(+), 19 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 8f074135d7714..a9ebd6850c62d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -260,18 +260,19 @@ public RecordsInfo info() { // case 0: there is only one batch so use the last offset return new RecordsInfo(logAppendTime, lastOffset); else - // case 1: there are many single-record batches having same max timestamp, so the base offset is + // case 1: Those single-record batches have same max timestamp, so the base offset is // equal with the last offset of earliest batch return new RecordsInfo(logAppendTime, baseOffset); } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) { - return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset); + // it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1] + return new RecordsInfo(RecordBatch.NO_TIMESTAMP, -1); } else { if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) // ditto to case 0 return new RecordsInfo(maxTimestamp, lastOffset); else // case 2: Each batch is composed of single record, and offsetOfMaxTimestamp points to the record having - // max timestamp. Hence, offsetOfMaxTimestamp is equal to the last offset of earliest batch (record) + // max timestamp. Hence, offsetOfMaxTimestamp is equal to the last offset of earliest batch with max timestamp return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp); } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index 33a419e64ee9d..e279e952e7eaa 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -415,7 +415,9 @@ public void buildUsingCreateTime(Args args) { assertEquals(2L, info.maxTimestamp); } - if (args.compressionType == CompressionType.NONE && magic == MAGIC_VALUE_V1) + if (magic == MAGIC_VALUE_V0) + assertEquals(-1, info.shallowOffsetOfMaxTimestamp); + else if (args.compressionType == CompressionType.NONE && magic == MAGIC_VALUE_V1) assertEquals(1L, info.shallowOffsetOfMaxTimestamp); else assertEquals(2L, info.shallowOffsetOfMaxTimestamp); @@ -495,12 +497,13 @@ public void writePastLimit(Args args) { MemoryRecords records = builder.build(); MemoryRecordsBuilder.RecordsInfo info = builder.info(); - if (magic == MAGIC_VALUE_V0) + if (magic == MAGIC_VALUE_V0) { + assertEquals(-1, info.shallowOffsetOfMaxTimestamp); assertEquals(-1, info.maxTimestamp); - else + } else { + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); assertEquals(2L, info.maxTimestamp); - - assertEquals(2L, info.shallowOffsetOfMaxTimestamp); + } long i = 0L; for (RecordBatch batch : records.batches()) { diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 251b6cd619db5..8313367fd41c2 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -819,7 +819,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, validRecords = validateAndOffsetAssignResult.validatedRecords appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs) - appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs) + appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs) appendInfo.setLastOffset(offset.value - 1) appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats) if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME) @@ -905,7 +905,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, // will be cleaned up after the log directory is recovered. Note that the end offset of the // ProducerStateManager will not be updated and the last stable offset will not advance // if the append to the transaction index fails. - localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.offsetOfMaxTimestamp, validRecords) + localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.shallowOffsetOfMaxTimestamp, validRecords) updateHighWatermarkWithLogEndOffset() // update the producer state diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java index 76b9bef7d2c64..b1055236cf769 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java @@ -37,7 +37,7 @@ public class LogAppendInfo { private long firstOffset; private long lastOffset; private long maxTimestamp; - private long offsetOfMaxTimestamp; + private long shallowOffsetOfMaxTimestamp; private long logAppendTime; private long logStartOffset; private RecordValidationStats recordValidationStats; @@ -117,7 +117,7 @@ public LogAppendInfo(long firstOffset, this.lastOffset = lastOffset; this.lastLeaderEpoch = lastLeaderEpoch; this.maxTimestamp = maxTimestamp; - this.offsetOfMaxTimestamp = offsetOfMaxTimestamp; + this.shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp; this.logAppendTime = logAppendTime; this.logStartOffset = logStartOffset; this.recordValidationStats = recordValidationStats; @@ -156,12 +156,12 @@ public void setMaxTimestamp(long maxTimestamp) { this.maxTimestamp = maxTimestamp; } - public long offsetOfMaxTimestamp() { - return offsetOfMaxTimestamp; + public long shallowOffsetOfMaxTimestamp() { + return shallowOffsetOfMaxTimestamp; } - public void setOffsetOfMaxTimestamp(long offsetOfMaxTimestamp) { - this.offsetOfMaxTimestamp = offsetOfMaxTimestamp; + public void setShallowOffsetOfMaxTimestamp(long shallowOffsetOfMaxTimestamp) { + this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp; } public long logAppendTime() { @@ -233,7 +233,7 @@ public long numMessages() { * @return a new instance with the given LeaderHwChange */ public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) { - return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, offsetOfMaxTimestamp, logAppendTime, logStartOffset, recordValidationStats, + return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset, recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, recordErrors, newLeaderHwChange); } @@ -259,7 +259,7 @@ public String toString() { ", lastOffset=" + lastOffset + ", lastLeaderEpoch=" + lastLeaderEpoch + ", maxTimestamp=" + maxTimestamp + - ", offsetOfMaxTimestamp=" + offsetOfMaxTimestamp + + ", offsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp + ", logAppendTime=" + logAppendTime + ", logStartOffset=" + logStartOffset + ", recordConversionStats=" + recordValidationStats + diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java index ce8587a8ffa77..d57a6340aebd1 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java @@ -233,7 +233,7 @@ private boolean canConvertToRelativeOffset(long offset) throws IOException { * * @param largestOffset The last offset in the message set * @param largestTimestampMs The largest timestamp in the message set. - * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append. + * @param shallowOffsetOfMaxTimestamp The last offset of earliest batch with max timestamp in the messages to append. * @param records The log entries to append. * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow */ diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index 7729e25922461..8ac9de4b26ce0 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -68,6 +68,9 @@ public static class ValidationResult { public final long logAppendTimeMs; public final MemoryRecords validatedRecords; public final long maxTimestampMs; + // we only maintain batch level offset for max timestamp since we want to align the behavior of updating time + // indexing entries. The paths of follower sync and replica recovery do not iterate all records, so they have no + // idea about record level offset for max timestamp. public final long shallowOffsetOfMaxTimestampMs; public final boolean messageSizeMaybeChanged; public final RecordValidationStats recordValidationStats; @@ -243,6 +246,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, long now = time.milliseconds(); long maxTimestamp = RecordBatch.NO_TIMESTAMP; long shallowOffsetOfMaxTimestamp = -1L; + long initialOffset = offsetCounter.value; RecordBatch firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, CompressionType.NONE); @@ -292,6 +296,13 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, if (timestampType == TimestampType.LOG_APPEND_TIME) { maxTimestamp = now; + if (toMagic >= RecordBatch.MAGIC_VALUE_V2) + // case 0: there is only one batch so use the last offset + shallowOffsetOfMaxTimestamp = offsetCounter.value - 1; + else + // case 1: Those single-record batches have same max timestamp, so the initial offset is equal with + // the last offset of earliest batch + shallowOffsetOfMaxTimestamp = initialOffset; } return new ValidationResult( From 609b6abeee408346d963734c2ddaabac7b47c9ba Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Tue, 2 Apr 2024 08:22:57 +0800 Subject: [PATCH 10/31] address comments --- .../src/main/scala/kafka/log/UnifiedLog.scala | 6 ++-- .../unit/kafka/log/LogValidatorTest.scala | 25 ++++++++----- .../storage/internals/log/LogAppendInfo.java | 36 +++++++++---------- .../storage/internals/log/LogValidator.java | 34 ++++++++++-------- 4 files changed, 57 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 8313367fd41c2..e8cfea907ef48 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1120,7 +1120,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, var sourceCompression = CompressionType.NONE var monotonic = true var maxTimestamp = RecordBatch.NO_TIMESTAMP - var offsetOfMaxTimestamp = -1L + var shallowOffsetOfMaxTimestamp = -1L var readFirstMessage = false var lastOffsetOfFirstBatch = -1L @@ -1171,7 +1171,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (batch.maxTimestamp > maxTimestamp) { maxTimestamp = batch.maxTimestamp - offsetOfMaxTimestamp = lastOffset + shallowOffsetOfMaxTimestamp = lastOffset } validBytesCount += batchSize @@ -1190,7 +1190,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, else OptionalInt.empty() - new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, offsetOfMaxTimestamp, + new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, shallowOffsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression, validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE) } diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 7327308cd164c..080901fac74e9 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -131,6 +131,12 @@ class LogValidatorTest { PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier) } + @Test + def testLogAppendTimeNonCompressedV0(): Unit = { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V0) + } + + @Test def testLogAppendTimeNonCompressedV1(): Unit = { checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1) @@ -169,14 +175,18 @@ class LogValidatorTest { val validatedRecords = validatedResults.validatedRecords assertEquals(records.records.asScala.size, validatedRecords.records.asScala.size, "message set size should not change") val now = mockTime.milliseconds - validatedRecords.batches.forEach(batch => validateLogAppendTime(now, 1234L, batch)) + if (magic >= RecordBatch.MAGIC_VALUE_V1) + validatedRecords.batches.forEach(batch => validateLogAppendTime(now, 1234L, batch)) assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now") assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") // If it's LOG_APPEND_TIME, the offset will be the offset of the first record - val expectedMaxTimestampOffset = 0 - assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestampMs, - s"The offset of max timestamp should be $expectedMaxTimestampOffset") + val expectedMaxTimestampOffset = magic match { + case RecordBatch.MAGIC_VALUE_V0 => -1 + case RecordBatch.MAGIC_VALUE_V1 => 0 + case _ => 2 + } + assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestampMs) verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, compressed = false) } @@ -404,9 +414,7 @@ class LogValidatorTest { assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - val expectedShallowOffsetOfMaxTimestamp = 1 - assertEquals(expectedShallowOffsetOfMaxTimestamp, validatingResults.shallowOffsetOfMaxTimestampMs, - s"Offset of max timestamp should be 1") + assertEquals(2, validatingResults.shallowOffsetOfMaxTimestampMs) assertFalse(validatingResults.messageSizeMaybeChanged, "Message size should not have been changed") @@ -532,8 +540,7 @@ class LogValidatorTest { } assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP, s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}") - assertEquals(2, validatedResults.shallowOffsetOfMaxTimestampMs, - s"Offset of max timestamp should be the last offset 2") + assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestampMs) assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java index b1055236cf769..6a3d00cae5e69 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java @@ -84,27 +84,27 @@ public LogAppendInfo(long firstOffset, /** * Creates an instance with the given params. * - * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending - * to the follower. - * @param lastOffset The last offset in the message set - * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available. - * @param maxTimestamp The maximum timestamp of the message set. - * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp. - * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp - * @param logStartOffset The start offset of the log at the time of this append. - * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false` - * @param sourceCompression The source codec used in the message set (send by the producer) - * @param validBytes The number of valid bytes - * @param lastOffsetOfFirstBatch The last offset of the first batch - * @param recordErrors List of record errors that caused the respective batch to be dropped - * @param leaderHwChange Incremental if the high watermark needs to be increased after appending record - * Same if high watermark is not changed. None is the default value and it means append failed + * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending + * to the follower. + * @param lastOffset The last offset in the message set + * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available. + * @param maxTimestamp The maximum timestamp of the message set. + * @param shallowOffsetOfMaxTimestamp The last offset of the batch with the maximum timestamp. + * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp + * @param logStartOffset The start offset of the log at the time of this append. + * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false` + * @param sourceCompression The source codec used in the message set (send by the producer) + * @param validBytes The number of valid bytes + * @param lastOffsetOfFirstBatch The last offset of the first batch + * @param recordErrors List of record errors that caused the respective batch to be dropped + * @param leaderHwChange Incremental if the high watermark needs to be increased after appending record + * Same if high watermark is not changed. None is the default value and it means append failed */ public LogAppendInfo(long firstOffset, long lastOffset, OptionalInt lastLeaderEpoch, long maxTimestamp, - long offsetOfMaxTimestamp, + long shallowOffsetOfMaxTimestamp, long logAppendTime, long logStartOffset, RecordValidationStats recordValidationStats, @@ -117,7 +117,7 @@ public LogAppendInfo(long firstOffset, this.lastOffset = lastOffset; this.lastLeaderEpoch = lastLeaderEpoch; this.maxTimestamp = maxTimestamp; - this.shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp; + this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp; this.logAppendTime = logAppendTime; this.logStartOffset = logStartOffset; this.recordValidationStats = recordValidationStats; @@ -259,7 +259,7 @@ public String toString() { ", lastOffset=" + lastOffset + ", lastLeaderEpoch=" + lastLeaderEpoch + ", maxTimestamp=" + maxTimestamp + - ", offsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp + + ", shallowOffsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp + ", logAppendTime=" + logAppendTime + ", logStartOffset=" + logStartOffset + ", recordConversionStats=" + recordValidationStats + diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index 8ac9de4b26ce0..fb25db1373bff 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -69,7 +69,7 @@ public static class ValidationResult { public final MemoryRecords validatedRecords; public final long maxTimestampMs; // we only maintain batch level offset for max timestamp since we want to align the behavior of updating time - // indexing entries. The paths of follower sync and replica recovery do not iterate all records, so they have no + // indexing entries. The paths of follower append and replica recovery do not iterate all records, so they have no // idea about record level offset for max timestamp. public final long shallowOffsetOfMaxTimestampMs; public final boolean messageSizeMaybeChanged; @@ -254,7 +254,6 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, validateBatch(topicPartition, firstBatch, batch, origin, toMagic, metricsRecorder); long maxBatchTimestamp = RecordBatch.NO_TIMESTAMP; - long offsetOfMaxBatchTimestamp = -1L; List recordErrors = new ArrayList<>(0); // This is a hot path and we want to avoid any unnecessary allocations. @@ -266,11 +265,9 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, recordIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder); recordError.ifPresent(recordErrors::add); - long offset = offsetCounter.value++; - if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && record.timestamp() > maxBatchTimestamp) { + offsetCounter.value++; + if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && record.timestamp() > maxBatchTimestamp) maxBatchTimestamp = record.timestamp(); - offsetOfMaxBatchTimestamp = offset; - } ++recordIndex; } @@ -278,7 +275,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) { maxTimestamp = maxBatchTimestamp; - shallowOffsetOfMaxTimestamp = offsetOfMaxBatchTimestamp; + shallowOffsetOfMaxTimestamp = batch.lastOffset(); } batch.setLastOffset(offsetCounter.value - 1); @@ -296,13 +293,22 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, if (timestampType == TimestampType.LOG_APPEND_TIME) { maxTimestamp = now; - if (toMagic >= RecordBatch.MAGIC_VALUE_V2) - // case 0: there is only one batch so use the last offset - shallowOffsetOfMaxTimestamp = offsetCounter.value - 1; - else - // case 1: Those single-record batches have same max timestamp, so the initial offset is equal with - // the last offset of earliest batch - shallowOffsetOfMaxTimestamp = initialOffset; + // those checks should be equal to MemoryRecordsBuilder#info + switch(toMagic) { + case RecordBatch.MAGIC_VALUE_V0: + // value will be the default value -1 + shallowOffsetOfMaxTimestamp = -1; + break; + case RecordBatch.MAGIC_VALUE_V1: + // Those single-record batches have same max timestamp, so the initial offset is equal with + // the last offset of earliest batch + shallowOffsetOfMaxTimestamp = initialOffset; + break; + default: + // there is only one batch so use the last offset + shallowOffsetOfMaxTimestamp = offsetCounter.value - 1; + break; + } } return new ValidationResult( From a535095669c0ce43311da002eb0500374cefb207 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Tue, 2 Apr 2024 08:28:19 +0800 Subject: [PATCH 11/31] tweak --- .../kafka/common/record/FileRecords.java | 6 ++-- .../src/main/scala/kafka/log/UnifiedLog.scala | 2 +- .../unit/kafka/log/LogValidatorTest.scala | 16 +++++------ .../unit/kafka/server/MockFetcherThread.scala | 6 ++-- .../storage/internals/log/LogAppendInfo.java | 28 +++++++++---------- .../storage/internals/log/LogSegment.java | 6 ++-- .../storage/internals/log/LogValidator.java | 8 +++--- 7 files changed, 36 insertions(+), 36 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 23a88c277c969..84eb6f20ac5f7 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -355,18 +355,18 @@ public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingP */ public TimestampAndOffset largestTimestampAfter(int startingPosition) { long maxTimestamp = RecordBatch.NO_TIMESTAMP; - long offsetOfMaxTimestamp = -1L; + long shallowOffsetOfMaxTimestamp = -1L; int leaderEpochOfMaxTimestamp = RecordBatch.NO_PARTITION_LEADER_EPOCH; for (RecordBatch batch : batchesFrom(startingPosition)) { long timestamp = batch.maxTimestamp(); if (timestamp > maxTimestamp) { maxTimestamp = timestamp; - offsetOfMaxTimestamp = batch.lastOffset(); + shallowOffsetOfMaxTimestamp = batch.lastOffset(); leaderEpochOfMaxTimestamp = batch.partitionLeaderEpoch(); } } - return new TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp, + return new TimestampAndOffset(maxTimestamp, shallowOffsetOfMaxTimestamp, maybeLeaderEpoch(leaderEpochOfMaxTimestamp)); } diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index e8cfea907ef48..5817d1c0a4aad 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -819,7 +819,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, validRecords = validateAndOffsetAssignResult.validatedRecords appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs) - appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs) + appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp) appendInfo.setLastOffset(offset.value - 1) appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats) if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME) diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 080901fac74e9..f95b4be519050 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -186,7 +186,7 @@ class LogValidatorTest { case RecordBatch.MAGIC_VALUE_V1 => 0 case _ => 2 } - assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestampMs) + assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp) verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, compressed = false) } @@ -229,7 +229,7 @@ class LogValidatorTest { "MessageSet should still valid") assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now") - assertEquals(2, validatedResults.shallowOffsetOfMaxTimestampMs, + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, s"The shallow offset of max timestamp should be 2 if logAppendTime is used") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size may have been changed") @@ -281,7 +281,7 @@ class LogValidatorTest { "MessageSet should still valid") assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now") - assertEquals(2, validatedResults.shallowOffsetOfMaxTimestampMs, + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, s"The shallow offset of max timestamp should be the last offset 2 if logAppendTime is used") assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") @@ -414,7 +414,7 @@ class LogValidatorTest { assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - assertEquals(2, validatingResults.shallowOffsetOfMaxTimestampMs) + assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp) assertFalse(validatingResults.messageSizeMaybeChanged, "Message size should not have been changed") @@ -488,7 +488,7 @@ class LogValidatorTest { } assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - assertEquals(2, validatingResults.shallowOffsetOfMaxTimestampMs, + assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp, "Shallow offset of max timestamp should be 2") assertTrue(validatingResults.messageSizeMaybeChanged, "Message size should have been changed") @@ -540,7 +540,7 @@ class LogValidatorTest { } assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP, s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}") - assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestampMs) + assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp) assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, @@ -586,7 +586,7 @@ class LogValidatorTest { assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence) } assertEquals(timestamp, validatedResults.maxTimestampMs) - assertEquals(2, validatedResults.shallowOffsetOfMaxTimestampMs, + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, s"Offset of max timestamp should be the last offset 2.") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") @@ -659,7 +659,7 @@ class LogValidatorTest { assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") val expectedShallowOffsetOfMaxTimestamp = 2 - assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestampMs, + assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestamp, s"Shallow offset of max timestamp should be 2") assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") diff --git a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala index 49efa0a49ba45..30528a6729ddc 100644 --- a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala +++ b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala @@ -83,7 +83,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint, // Now check message's crc val batches = FetchResponse.recordsOrFail(partitionData).batches.asScala var maxTimestamp = RecordBatch.NO_TIMESTAMP - var offsetOfMaxTimestamp = -1L + var shallowOffsetOfMaxTimestamp = -1L var lastOffset = state.logEndOffset var lastEpoch: OptionalInt = OptionalInt.empty() @@ -91,7 +91,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint, batch.ensureValid() if (batch.maxTimestamp > maxTimestamp) { maxTimestamp = batch.maxTimestamp - offsetOfMaxTimestamp = batch.baseOffset + shallowOffsetOfMaxTimestamp = batch.baseOffset } state.log.append(batch) state.logEndOffset = batch.nextOffset @@ -106,7 +106,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint, lastOffset, lastEpoch, maxTimestamp, - offsetOfMaxTimestamp, + shallowOffsetOfMaxTimestamp, Time.SYSTEM.milliseconds(), state.logStartOffset, RecordValidationStats.EMPTY, diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java index 6a3d00cae5e69..9211140aa3882 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java @@ -52,31 +52,31 @@ public class LogAppendInfo { /** * Creates an instance with the given params. * - * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending - * to the follower. - * @param lastOffset The last offset in the message set - * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available. - * @param maxTimestamp The maximum timestamp of the message set. - * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp. - * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp - * @param logStartOffset The start offset of the log at the time of this append. - * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false` - * @param sourceCompression The source codec used in the message set (send by the producer) - * @param validBytes The number of valid bytes - * @param lastOffsetOfFirstBatch The last offset of the first batch + * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending + * to the follower. + * @param lastOffset The last offset in the message set + * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available. + * @param maxTimestamp The maximum timestamp of the message set. + * @param shallowOffsetOfMaxTimestamp The last offset of the batch with the maximum timestamp. + * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp + * @param logStartOffset The start offset of the log at the time of this append. + * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false` + * @param sourceCompression The source codec used in the message set (send by the producer) + * @param validBytes The number of valid bytes + * @param lastOffsetOfFirstBatch The last offset of the first batch */ public LogAppendInfo(long firstOffset, long lastOffset, OptionalInt lastLeaderEpoch, long maxTimestamp, - long offsetOfMaxTimestamp, + long shallowOffsetOfMaxTimestamp, long logAppendTime, long logStartOffset, RecordValidationStats recordValidationStats, CompressionType sourceCompression, int validBytes, long lastOffsetOfFirstBatch) { - this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, offsetOfMaxTimestamp, logAppendTime, logStartOffset, + this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset, recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.emptyList(), LeaderHwChange.NONE); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java index d57a6340aebd1..727b6d184b657 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java @@ -275,7 +275,7 @@ private void ensureOffsetInRange(long offset) throws IOException { private int appendChunkFromFile(FileRecords records, int position, BufferSupplier bufferSupplier) throws IOException { int bytesToAppend = 0; long maxTimestamp = Long.MIN_VALUE; - long offsetOfMaxTimestamp = Long.MIN_VALUE; + long shallowOffsetOfMaxTimestamp = Long.MIN_VALUE; long maxOffset = Long.MIN_VALUE; ByteBuffer readBuffer = bufferSupplier.get(1024 * 1024); @@ -286,7 +286,7 @@ private int appendChunkFromFile(FileRecords records, int position, BufferSupplie while ((batch = nextAppendableBatch(nextBatches, readBuffer, bytesToAppend)) != null) { if (batch.maxTimestamp() > maxTimestamp) { maxTimestamp = batch.maxTimestamp(); - offsetOfMaxTimestamp = batch.lastOffset(); + shallowOffsetOfMaxTimestamp = batch.lastOffset(); } maxOffset = batch.lastOffset(); bytesToAppend += batch.sizeInBytes(); @@ -300,7 +300,7 @@ private int appendChunkFromFile(FileRecords records, int position, BufferSupplie readBuffer.limit(bytesToAppend); records.readInto(readBuffer, position); - append(maxOffset, maxTimestamp, offsetOfMaxTimestamp, MemoryRecords.readableRecords(readBuffer)); + append(maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp, MemoryRecords.readableRecords(readBuffer)); } bufferSupplier.release(readBuffer); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index fb25db1373bff..0767e44fdc9bd 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -71,17 +71,17 @@ public static class ValidationResult { // we only maintain batch level offset for max timestamp since we want to align the behavior of updating time // indexing entries. The paths of follower append and replica recovery do not iterate all records, so they have no // idea about record level offset for max timestamp. - public final long shallowOffsetOfMaxTimestampMs; + public final long shallowOffsetOfMaxTimestamp; public final boolean messageSizeMaybeChanged; public final RecordValidationStats recordValidationStats; public ValidationResult(long logAppendTimeMs, MemoryRecords validatedRecords, long maxTimestampMs, - long shallowOffsetOfMaxTimestampMs, boolean messageSizeMaybeChanged, + long shallowOffsetOfMaxTimestamp, boolean messageSizeMaybeChanged, RecordValidationStats recordValidationStats) { this.logAppendTimeMs = logAppendTimeMs; this.validatedRecords = validatedRecords; this.maxTimestampMs = maxTimestampMs; - this.shallowOffsetOfMaxTimestampMs = shallowOffsetOfMaxTimestampMs; + this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp; this.messageSizeMaybeChanged = messageSizeMaybeChanged; this.recordValidationStats = recordValidationStats; } @@ -442,7 +442,7 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse now, records, maxTimestamp, - // there is only one batch in this path, so last offset can be viewed as shallowOffsetOfMaxTimestampMs + // there is only one batch in this path, so last offset can be viewed as shallowOffsetOfMaxTimestamp lastOffset, false, recordValidationStats); From 3bd14513f07959dbdc05826c4e775257ecca2b89 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Tue, 2 Apr 2024 08:33:22 +0800 Subject: [PATCH 12/31] fix build --- .../org/apache/kafka/storage/internals/log/LogValidator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index 0767e44fdc9bd..c4b33a7f9801a 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -294,9 +294,9 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, if (timestampType == TimestampType.LOG_APPEND_TIME) { maxTimestamp = now; // those checks should be equal to MemoryRecordsBuilder#info - switch(toMagic) { + switch (toMagic) { case RecordBatch.MAGIC_VALUE_V0: - // value will be the default value -1 + // value will be the default value: -1 shallowOffsetOfMaxTimestamp = -1; break; case RecordBatch.MAGIC_VALUE_V1: From 8e1878a5377624316eaf6482ae7e12b6ebff5d0b Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 3 Apr 2024 00:27:57 +0800 Subject: [PATCH 13/31] fix bug --- .../org/apache/kafka/storage/internals/log/LogValidator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index c4b33a7f9801a..0d1aa6505173b 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -275,7 +275,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) { maxTimestamp = maxBatchTimestamp; - shallowOffsetOfMaxTimestamp = batch.lastOffset(); + shallowOffsetOfMaxTimestamp = offsetCounter.value - 1; } batch.setLastOffset(offsetCounter.value - 1); From e78d09afc81c432c50bacc1d43f1e16c01626b63 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 3 Apr 2024 01:14:59 +0800 Subject: [PATCH 14/31] address comments --- .../org/apache/kafka/common/record/MemoryRecordsBuilder.java | 4 ++-- core/src/test/scala/unit/kafka/log/LogValidatorTest.scala | 2 +- .../org/apache/kafka/storage/internals/log/LogValidator.java | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index a9ebd6850c62d..85e9e3f3bbd0e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -243,11 +243,11 @@ public MemoryRecords build() { /** * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery) - * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch which having max timestamp. + * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch having max timestamp. * If there are many batches having same max timestamp, we pick up the earliest batch. *

* If the log append time is used, the offset will be the last offset unless no compression is used and - * the message format version is 0 or 1, in which case, it will be the first offset. + * the message format version is 0 or 1, in which case, it will be -1. *

* If create time is used, the offset will be the last offset unless no compression is used and the message * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp. diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index f95b4be519050..edaa8b8483122 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -177,7 +177,7 @@ class LogValidatorTest { val now = mockTime.milliseconds if (magic >= RecordBatch.MAGIC_VALUE_V1) validatedRecords.batches.forEach(batch => validateLogAppendTime(now, 1234L, batch)) - assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now") + assertEquals(if (magic == RecordBatch.MAGIC_VALUE_V0) RecordBatch.NO_TIMESTAMP else now, validatedResults.maxTimestampMs) assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") // If it's LOG_APPEND_TIME, the offset will be the offset of the first record diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index 0d1aa6505173b..2f3d62b2e9334 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -296,6 +296,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, // those checks should be equal to MemoryRecordsBuilder#info switch (toMagic) { case RecordBatch.MAGIC_VALUE_V0: + maxTimestamp = RecordBatch.NO_TIMESTAMP; // value will be the default value: -1 shallowOffsetOfMaxTimestamp = -1; break; @@ -425,6 +426,7 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse // we can update the batch only and write the compressed payload as is; // again we assume only one record batch within the compressed set offsetCounter.value += validatedRecords.size(); + // there is only one batch in this path, so last offset can be viewed as shallowOffsetOfMaxTimestamp long lastOffset = offsetCounter.value - 1; firstBatch.setLastOffset(lastOffset); @@ -442,7 +444,6 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse now, records, maxTimestamp, - // there is only one batch in this path, so last offset can be viewed as shallowOffsetOfMaxTimestamp lastOffset, false, recordValidationStats); From f084c892f4216fab144e0d69dec8466c69de8a07 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 3 Apr 2024 02:41:15 +0800 Subject: [PATCH 15/31] revise comment --- .../common/record/MemoryRecordsBuilder.java | 46 ++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 85e9e3f3bbd0e..34d23843771d4 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -242,37 +242,51 @@ public MemoryRecords build() { /** - * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery) - * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch having max timestamp. - * If there are many batches having same max timestamp, we pick up the earliest batch. + * There are three cases of finding max timestamp to return: + * 1) version 0: The max timestamp is NO_TIMESTAMP (-1) + * 2) LogAppendTime: All records have same timestamp, and so the max timestamp is equal to logAppendTime + * 3) CreateTime: The max timestamp of record *

- * If the log append time is used, the offset will be the last offset unless no compression is used and - * the message format version is 0 or 1, in which case, it will be -1. + * Let's talk about OffsetOfMaxTimestamp. There are some paths that we don't try to find the OffsetOfMaxTimestamp + * to avoid expensive records iteration. Those paths include follower append and index recovery. In order to + * avoid inconsistent time index, we let all paths find shallowOffsetOfMaxTimestamp instead of OffsetOfMaxTimestamp. *

- * If create time is used, the offset will be the last offset unless no compression is used and the message - * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp. - * - * @return The max timestamp and its offset + * Let's define the shallowOffsetOfMaxTimestamp: It is last offset of the batch having max timestamp. If there are + * many batches having same max timestamp, we pick up the earliest batch. + *

+ * There are five cases of finding shallowOffsetOfMaxTimestamp to return: + * 1) version 0: It is always the -1 + * 2) LogAppendTime with single batch: It is the offset of last record + * 3) LogAppendTime with many single-record batches: Those single-record batches have same max timestamp, so the + * base offset is equal with the last offset of earliest batch + * 4) CreateTime with single batch: We return offset of last record to follow the spec we mentioned above. Of course, + * we do have the OffsetOfMaxTimestamp for this case, but we want to make all paths + * find the shallowOffsetOfMaxTimestamp rather than offsetOfMaxTimestamp + * 5) CreateTime with many single-record batches: Each batch is composed of single record, and hence offsetOfMaxTimestamp + * is equal to the last offset of earliest batch with max timestamp */ public RecordsInfo info() { if (timestampType == TimestampType.LOG_APPEND_TIME) { if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) - // case 0: there is only one batch so use the last offset + // maxTimestamp => case 2 + // shallowOffsetOfMaxTimestamp => case 2 return new RecordsInfo(logAppendTime, lastOffset); else - // case 1: Those single-record batches have same max timestamp, so the base offset is - // equal with the last offset of earliest batch + // maxTimestamp => case 2 + // shallowOffsetOfMaxTimestamp => case 3 return new RecordsInfo(logAppendTime, baseOffset); } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) { - // it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1] + // maxTimestamp => case 1 + // shallowOffsetOfMaxTimestamp => case 1 return new RecordsInfo(RecordBatch.NO_TIMESTAMP, -1); } else { if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) - // ditto to case 0 + // maxTimestamp => case 3 + // shallowOffsetOfMaxTimestamp => case 4 return new RecordsInfo(maxTimestamp, lastOffset); else - // case 2: Each batch is composed of single record, and offsetOfMaxTimestamp points to the record having - // max timestamp. Hence, offsetOfMaxTimestamp is equal to the last offset of earliest batch with max timestamp + // maxTimestamp => case 3 + // shallowOffsetOfMaxTimestamp => case 5 return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp); } } From d0f6a9a70c53d32ab07e6be18fcd09efc3209388 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 3 Apr 2024 03:03:13 +0800 Subject: [PATCH 16/31] fix failed test --- core/src/main/scala/kafka/log/UnifiedLog.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 5817d1c0a4aad..743a9adb45eaf 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1342,10 +1342,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar // lookup the position of batch to avoid extra I/O val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) - latestTimestampSegment.log.batchesFrom(position.position).asScala + val lpc = latestEpochAsOptional(leaderEpochCache) + Some(latestTimestampSegment.log.batchesFrom(position.position).asScala .find(_.maxTimestamp() == maxTimestampSoFar.timestamp) - .map(batch => new TimestampAndOffset(batch.maxTimestamp(), batch.offsetOfMaxTimestamp().orElse(-1), - latestEpochAsOptional(leaderEpochCache))) + .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc))) + .getOrElse(new TimestampAndOffset(-1, 0, lpc))) // always return something for backward compatibility } else { // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one. if (remoteLogEnabled()) { From 7aebd43fb56f568d56277e4b9afca116a7eafe4b Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 3 Apr 2024 03:07:40 +0800 Subject: [PATCH 17/31] address comment --- .../org/apache/kafka/common/record/MemoryRecordsBuilder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 34d23843771d4..5adf0246d81e4 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -257,8 +257,8 @@ public MemoryRecords build() { * There are five cases of finding shallowOffsetOfMaxTimestamp to return: * 1) version 0: It is always the -1 * 2) LogAppendTime with single batch: It is the offset of last record - * 3) LogAppendTime with many single-record batches: Those single-record batches have same max timestamp, so the - * base offset is equal with the last offset of earliest batch + * 3) LogAppendTime with many single-record batches: Those single-record batches have same max timestamp, so we return + * the base offset, which is equal to the last offset of earliest batch * 4) CreateTime with single batch: We return offset of last record to follow the spec we mentioned above. Of course, * we do have the OffsetOfMaxTimestamp for this case, but we want to make all paths * find the shallowOffsetOfMaxTimestamp rather than offsetOfMaxTimestamp From 88f8c6a9faf35d745ca7f778ba4540e9aec1c6ca Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 3 Apr 2024 03:49:21 +0800 Subject: [PATCH 18/31] fix --- core/src/main/scala/kafka/log/UnifiedLog.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 743a9adb45eaf..e138670ad9bd0 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1346,7 +1346,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, Some(latestTimestampSegment.log.batchesFrom(position.position).asScala .find(_.maxTimestamp() == maxTimestampSoFar.timestamp) .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc))) - .getOrElse(new TimestampAndOffset(-1, 0, lpc))) // always return something for backward compatibility + // return the base offset for backward compatibility if there is no batches + .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, latestTimestampSegment.baseOffset(), lpc))) } else { // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one. if (remoteLogEnabled()) { From 8a7ed30692bd070fb4160a6cbc76a868484529c3 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 3 Apr 2024 08:18:51 +0800 Subject: [PATCH 19/31] return noneif no batch --- core/src/main/scala/kafka/log/UnifiedLog.scala | 4 +--- .../kafka/admin/ListOffsetsIntegrationTest.scala | 9 +++++++++ .../src/test/scala/unit/kafka/server/LogOffsetTest.scala | 8 ++------ 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index e138670ad9bd0..87d187ef92cd7 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1343,11 +1343,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, // lookup the position of batch to avoid extra I/O val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) val lpc = latestEpochAsOptional(leaderEpochCache) - Some(latestTimestampSegment.log.batchesFrom(position.position).asScala + latestTimestampSegment.log.batchesFrom(position.position).asScala .find(_.maxTimestamp() == maxTimestampSoFar.timestamp) .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc))) - // return the base offset for backward compatibility if there is no batches - .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, latestTimestampSegment.baseOffset(), lpc))) } else { // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one. if (remoteLogEnabled()) { diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index b1c8f27539cd1..3f5eb70064d83 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.requests.ListOffsetsResponse import org.apache.kafka.common.utils.{MockTime, Time, Utils} import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} @@ -62,6 +63,14 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { super.tearDown() } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testListMaxTimestampWithEmptyLog(quorum: String): Unit = { + val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topicName) + assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, maxTimestampOffset.offset()) + assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, maxTimestampOffset.timestamp()) + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 6783e83ca4457..fdbbb70ad8a81 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -112,9 +112,7 @@ class LogOffsetTest extends BaseRequestTest { log.truncateTo(0) - val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) - assertEquals(0L, secondOffset.get.offset) - assertEquals(-1L, secondOffset.get.timestamp) + assertEquals(Option.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)) } @ParameterizedTest @@ -202,10 +200,8 @@ class LogOffsetTest extends BaseRequestTest { log.updateHighWatermark(log.logEndOffset) - val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) assertEquals(0L, log.logEndOffset) - assertEquals(0L, maxTimestampOffset.get.offset) - assertEquals(-1L, maxTimestampOffset.get.timestamp) + assertEquals(Option.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)) } @deprecated("legacyFetchOffsetsBefore", since = "") From 4785371c54e2fc2c540895ffe2f94829449937e6 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Thu, 4 Apr 2024 02:36:56 +0800 Subject: [PATCH 20/31] address comment --- .../kafka/common/record/RecordBatch.java | 5 ++- .../admin/ListOffsetsIntegrationTest.scala | 44 +++++++++++++++---- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index f569bbe48c37b..bcdfbe66dcd6b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -249,10 +249,13 @@ public interface RecordBatch extends Iterable { /** * iterate all records to find the offset of max timestamp. - * noted that the earliest offset will return if there are multi records having same (max) timestamp + * noted: + * 1) that the earliest offset will return if there are multi records having same (max) timestamp + * 2) it always return -1 if the {@link RecordBatch#magic()} is equal to {@link RecordBatch#MAGIC_VALUE_V0} * @return offset of max timestamp */ default Optional offsetOfMaxTimestamp() { + if (magic() == RecordBatch.MAGIC_VALUE_V0) return Optional.empty(); long maxTimestamp = maxTimestamp(); try (CloseableIterator iter = streamingIterator(BufferSupplier.create())) { while (iter.hasNext()) { diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index 3f5eb70064d83..98e1b772b9b43 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -19,12 +19,13 @@ package kafka.admin import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig -import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers, waitForAllReassignmentsToComplete} +import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers, tempDir, waitForAllReassignmentsToComplete} import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.ListOffsetsResponse import org.apache.kafka.common.utils.{MockTime, Time, Utils} import org.junit.jupiter.api.Assertions.assertEquals @@ -42,8 +43,9 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val topicName = "foo" val topicNameWithCustomConfigs = "foo2" var adminClient: Admin = _ - var setOldMessageFormat: Boolean = false val mockTime: Time = new MockTime(1) + var version = RecordBatch.MAGIC_VALUE_V2 + var dataFolder = Seq.empty[String] @BeforeEach override def setUp(testInfo: TestInfo): Unit = { @@ -58,7 +60,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @AfterEach override def tearDown(): Unit = { - setOldMessageFormat = false + version = RecordBatch.MAGIC_VALUE_V2 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } @@ -71,6 +73,25 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, maxTimestampOffset.timestamp()) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk")) + def testListVersion0(quorum: String): Unit = { + // make sure the data is not deleted after restarting cluster + dataFolder = Seq(tempDir().getAbsolutePath, tempDir().getAbsolutePath) + try { + // create records for version 0 + createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V0) + produceMessagesInSeparateBatch() + + // update version to version 1 to list offset for max timestamp + createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1) + // the offset of max timestamp is always -1 if the batch version is 0 + verifyListOffsets(expectedMaxTimestampOffset = -1) + + } finally dataFolder = Seq.empty + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = { @@ -117,7 +138,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk")) def testThreeRecordsInOneBatchWithMessageConversion(quorum: String): Unit = { - createOldMessageFormatBrokers() + createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1) produceMessagesInOneBatch() verifyListOffsets() @@ -133,7 +154,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk")) def testThreeRecordsInSeparateBatchWithMessageConversion(quorum: String): Unit = { - createOldMessageFormatBrokers() + createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1) produceMessagesInSeparateBatch() verifyListOffsets() @@ -185,8 +206,8 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { createTopicWithConfig(topicNameWithCustomConfigs, props) } - private def createOldMessageFormatBrokers(): Unit = { - setOldMessageFormat = true + private def createMessageFormatBrokers(recordVersion: Byte): Unit = { + version = recordVersion recreateBrokers(reconfigure = true, startup = true) Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") adminClient = Admin.create(Map[String, Object]( @@ -304,11 +325,16 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { } def generateConfigs: Seq[KafkaConfig] = { - TestUtils.createBrokerConfigs(2, zkConnectOrNull).map{ props => - if (setOldMessageFormat) { + TestUtils.createBrokerConfigs(2, zkConnectOrNull).zipWithIndex.map{ case (props, index) => + if (version == RecordBatch.MAGIC_VALUE_V0) { + props.setProperty("log.message.format.version", "0.9.0") + props.setProperty("inter.broker.protocol.version", "0.9.0") + } + if (version == RecordBatch.MAGIC_VALUE_V1) { props.setProperty("log.message.format.version", "0.10.0") props.setProperty("inter.broker.protocol.version", "0.10.0") } + if (dataFolder.size > index) props.setProperty(KafkaConfig.LogDirProp, dataFolder(index)) props }.map(KafkaConfig.fromProps) } From 79b5226a7bcfcaa58ef64a4a5b022ef09ffde171 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Thu, 4 Apr 2024 04:20:29 +0800 Subject: [PATCH 21/31] address reviews --- .../main/java/org/apache/kafka/common/record/RecordBatch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index bcdfbe66dcd6b..e36beff08f2ac 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -251,7 +251,7 @@ public interface RecordBatch extends Iterable { * iterate all records to find the offset of max timestamp. * noted: * 1) that the earliest offset will return if there are multi records having same (max) timestamp - * 2) it always return -1 if the {@link RecordBatch#magic()} is equal to {@link RecordBatch#MAGIC_VALUE_V0} + * 2) it always returns None if the {@link RecordBatch#magic()} is equal to {@link RecordBatch#MAGIC_VALUE_V0} * @return offset of max timestamp */ default Optional offsetOfMaxTimestamp() { From e6fa99fb361c5e407b19c8133d839ef002a1d445 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Thu, 4 Apr 2024 15:04:32 +0800 Subject: [PATCH 22/31] give more time to sync --- .../integration/kafka/admin/ListOffsetsIntegrationTest.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index 98e1b772b9b43..1f938583844d4 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -242,7 +242,9 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new TopicPartition(topic, 0), Optional.of(new NewPartitionReassignment(java.util.Arrays.asList(newLeader))))).all().get() waitForAllReassignmentsToComplete(adminClient) - assertEquals(newLeader, adminClient.describeTopics(java.util.Collections.singletonList(topic)) + TestUtils.waitUntilTrue(() => newLeader == adminClient.describeTopics(java.util.Collections.singletonList(topic)) + .allTopicNames().get().get(topic).partitions().get(0).leader().id(), "expected leader: " + newLeader + + ", but actual leader: " + adminClient.describeTopics(java.util.Collections.singletonList(topic)) .allTopicNames().get().get(topic).partitions().get(0).leader().id()) check() From 3debf0b3b7d9db9079709303f20085b79124312e Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Fri, 5 Apr 2024 00:42:26 +0800 Subject: [PATCH 23/31] revise test --- .../admin/ListOffsetsIntegrationTest.scala | 50 +++++++++---------- .../storage/internals/log/LogSegment.java | 1 + 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index 1f938583844d4..321acac57c9a3 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -40,12 +40,12 @@ import scala.jdk.CollectionConverters._ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { - val topicName = "foo" - val topicNameWithCustomConfigs = "foo2" - var adminClient: Admin = _ - val mockTime: Time = new MockTime(1) - var version = RecordBatch.MAGIC_VALUE_V2 - var dataFolder = Seq.empty[String] + private val topicName = "foo" + private val topicNameWithCustomConfigs = "foo2" + private var adminClient: Admin = _ + private val mockTime: Time = new MockTime(1) + private var version = RecordBatch.MAGIC_VALUE_V2 + private val dataFolder = Seq(tempDir().getAbsolutePath, tempDir().getAbsolutePath) @BeforeEach override def setUp(testInfo: TestInfo): Unit = { @@ -60,7 +60,6 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @AfterEach override def tearDown(): Unit = { - version = RecordBatch.MAGIC_VALUE_V2 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } @@ -76,19 +75,15 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk")) def testListVersion0(quorum: String): Unit = { - // make sure the data is not deleted after restarting cluster - dataFolder = Seq(tempDir().getAbsolutePath, tempDir().getAbsolutePath) - try { - // create records for version 0 - createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V0) - produceMessagesInSeparateBatch() + // create records for version 0 + createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V0) + produceMessagesInSeparateBatch() - // update version to version 1 to list offset for max timestamp - createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1) - // the offset of max timestamp is always -1 if the batch version is 0 - verifyListOffsets(expectedMaxTimestampOffset = -1) + // update version to version 1 to list offset for max timestamp + createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1) + // the offset of max timestamp is always -1 if the batch version is 0 + verifyListOffsets(expectedMaxTimestampOffset = -1) - } finally dataFolder = Seq.empty } @@ -236,16 +231,19 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { // case 1: test the offsets from follower's append path. // we make a follower be the new leader to handle the ListOffsetRequest - val partitionAssignment = adminClient.describeTopics(java.util.Collections.singletonList(topic)) - .allTopicNames().get().get(topic).partitions().get(0) - val newLeader = brokers.map(_.config.brokerId).find(_ != partitionAssignment.leader().id()).get + def leader(): Int = adminClient.describeTopics(java.util.Collections.singletonList(topic)) + .allTopicNames().get().get(topic).partitions().get(0).leader().id() + + val previousLeader = leader() + val newLeader = brokers.map(_.config.brokerId).find(_ != previousLeader).get + + // change the leader to new one adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new TopicPartition(topic, 0), Optional.of(new NewPartitionReassignment(java.util.Arrays.asList(newLeader))))).all().get() + // wait for all reassignments get completed waitForAllReassignmentsToComplete(adminClient) - TestUtils.waitUntilTrue(() => newLeader == adminClient.describeTopics(java.util.Collections.singletonList(topic)) - .allTopicNames().get().get(topic).partitions().get(0).leader().id(), "expected leader: " + newLeader - + ", but actual leader: " + adminClient.describeTopics(java.util.Collections.singletonList(topic)) - .allTopicNames().get().get(topic).partitions().get(0).leader().id()) + // make sure we are able to see the new leader + TestUtils.waitUntilTrue(() => newLeader == leader(), s"expected leader: $newLeader but actual: ${leader()}") check() // case 2: test the offsets from recovery path. @@ -336,7 +334,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { props.setProperty("log.message.format.version", "0.10.0") props.setProperty("inter.broker.protocol.version", "0.10.0") } - if (dataFolder.size > index) props.setProperty(KafkaConfig.LogDirProp, dataFolder(index)) + props.setProperty(KafkaConfig.LogDirProp, dataFolder(index)) props }.map(KafkaConfig.fromProps) } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java index 727b6d184b657..b8dcf522b4bb2 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java @@ -484,6 +484,7 @@ public int recover(ProducerStateManager producerStateManager, Optional maxTimestampSoFar()) { maxTimestampAndOffsetSoFar = new TimestampOffset(batch.maxTimestamp(), batch.lastOffset()); + System.out.println("[CHIA] recovery: " + maxTimestampAndOffsetSoFar); } // Build offset index From 20fa22f4ebf4c40c76aa35046c58244470220f8b Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Fri, 5 Apr 2024 03:34:35 +0800 Subject: [PATCH 24/31] revise test --- .../kafka/admin/ListOffsetsIntegrationTest.scala | 6 +++++- .../org/apache/kafka/storage/internals/log/LogSegment.java | 1 - 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index 321acac57c9a3..e7d5f152f7f69 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -243,7 +243,11 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { // wait for all reassignments get completed waitForAllReassignmentsToComplete(adminClient) // make sure we are able to see the new leader - TestUtils.waitUntilTrue(() => newLeader == leader(), s"expected leader: $newLeader but actual: ${leader()}") + var lastLeader = leader() + TestUtils.waitUntilTrue(() => { + lastLeader = leader() + lastLeader == newLeader + }, s"expected leader: $newLeader but actual: $lastLeader") check() // case 2: test the offsets from recovery path. diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java index b8dcf522b4bb2..727b6d184b657 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java @@ -484,7 +484,6 @@ public int recover(ProducerStateManager producerStateManager, Optional maxTimestampSoFar()) { maxTimestampAndOffsetSoFar = new TimestampOffset(batch.maxTimestamp(), batch.lastOffset()); - System.out.println("[CHIA] recovery: " + maxTimestampAndOffsetSoFar); } // Build offset index From b06dfccc7ef3e2df4ea4591203a5676173497d0d Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Fri, 5 Apr 2024 04:59:50 +0800 Subject: [PATCH 25/31] set lastLeader to -1 --- .../integration/kafka/admin/ListOffsetsIntegrationTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index e7d5f152f7f69..fdeee8168d7d4 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -243,7 +243,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { // wait for all reassignments get completed waitForAllReassignmentsToComplete(adminClient) // make sure we are able to see the new leader - var lastLeader = leader() + var lastLeader = -1 TestUtils.waitUntilTrue(() => { lastLeader = leader() lastLeader == newLeader From ace94e44bedd05ba6283573fe4785a943430f8c8 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sat, 6 Apr 2024 01:46:29 +0800 Subject: [PATCH 26/31] set log.retention.ms --- .../integration/kafka/admin/ListOffsetsIntegrationTest.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index fdeee8168d7d4..3e243cb5094fb 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -338,6 +338,9 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { props.setProperty("log.message.format.version", "0.10.0") props.setProperty("inter.broker.protocol.version", "0.10.0") } + // We use mock timer so the records can get removed if the test env is too busy to complete + // tests before kafka-log-retention. Hence, we disable the retention to avoid failed tests + props.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "-1") props.setProperty(KafkaConfig.LogDirProp, dataFolder(index)) props }.map(KafkaConfig.fromProps) From 8b1005e4385dae14901d5b07fcf365e49bf93127 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sun, 7 Apr 2024 00:46:18 +0800 Subject: [PATCH 27/31] address comments --- .../src/main/scala/kafka/log/UnifiedLog.scala | 21 ++++++++----------- .../admin/ListOffsetsIntegrationTest.scala | 3 +++ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 87d187ef92cd7..9c71ff155cff8 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1270,15 +1270,6 @@ class UnifiedLog(@volatile var logStartOffset: Long, maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") { debug(s"Searching offset for timestamp $targetTimestamp") - def latestEpochAsOptional(leaderEpochCache: Option[LeaderEpochFileCache]): Optional[Integer] = { - leaderEpochCache match { - case Some(cache) => - val latestEpoch = cache.latestEpoch() - if (latestEpoch.isPresent) Optional.of(latestEpoch.getAsInt) else Optional.empty[Integer]() - case None => Optional.empty[Integer]() - } - } - if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) && targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP && targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP) @@ -1311,7 +1302,13 @@ class UnifiedLog(@volatile var logStartOffset: Long, Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { - Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, latestEpochAsOptional(leaderEpochCache))) + val epoch = leaderEpochCache match { + case Some(cache) => + val latestEpoch = cache.latestEpoch() + if (latestEpoch.isPresent) Optional.of[Integer](latestEpoch.getAsInt) else Optional.empty[Integer]() + case None => Optional.empty[Integer]() + } + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) { if (remoteLogEnabled()) { val curHighestRemoteOffset = highestOffsetInRemoteStorage() @@ -1342,10 +1339,10 @@ class UnifiedLog(@volatile var logStartOffset: Long, val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar // lookup the position of batch to avoid extra I/O val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) - val lpc = latestEpochAsOptional(leaderEpochCache) latestTimestampSegment.log.batchesFrom(position.position).asScala .find(_.maxTimestamp() == maxTimestampSoFar.timestamp) - .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc))) + .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, + Optional.of[Integer](batch.partitionLeaderEpoch()).filter(_ >= 0)))) } else { // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one. if (remoteLogEnabled()) { diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index 3e243cb5094fb..a215665b28c07 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -224,6 +224,9 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) + // the epoch is related to the returned offset. + // Hence, it should be zero (the earliest leader epoch), regardless of new leader election + assertEquals(Optional.of(0), maxTimestampOffset.leaderEpoch()) } // case 0: test the offsets from leader's append path From 728e7bba18b94eec288d7d4f70b5a277e5971cc6 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sun, 7 Apr 2024 16:09:19 +0800 Subject: [PATCH 28/31] fix failed tests --- .../kafka/admin/ListOffsetsIntegrationTest.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index a215665b28c07..564590b48be81 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -224,9 +224,12 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) - // the epoch is related to the returned offset. - // Hence, it should be zero (the earliest leader epoch), regardless of new leader election - assertEquals(Optional.of(0), maxTimestampOffset.leaderEpoch()) + if (version >= RecordBatch.MAGIC_VALUE_V2) + // the epoch is related to the returned offset. + // Hence, it should be zero (the earliest leader epoch), regardless of new leader election + assertEquals(Optional.of(0), maxTimestampOffset.leaderEpoch()) + else + assertEquals(Optional.empty(), maxTimestampOffset.leaderEpoch()) } // case 0: test the offsets from leader's append path From fe2d7c9265a24f8e3e9bf16ce8cfb55cf631ee7f Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Mon, 8 Apr 2024 07:51:45 +0800 Subject: [PATCH 29/31] fix testResponseIncludesLeaderEpoch --- .../test/scala/unit/kafka/server/ListOffsetsRequestTest.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index db551b180fcd5..3d8efd664955c 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -208,7 +208,8 @@ class ListOffsetsRequestTest extends BaseRequestTest { // The latest offset reflects the updated epoch assertEquals((10L, secondLeaderEpoch, Errors.NONE.code), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1)) - assertEquals((9L, secondLeaderEpoch, Errors.NONE.code), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1)) + // The offset of max timestamp reflects the epoch of batch + assertEquals((9L, firstLeaderId, Errors.NONE.code), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1)) } @Test From 2dee70375deebcec92e02c583258625ffef29544 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Mon, 8 Apr 2024 16:20:31 +0800 Subject: [PATCH 30/31] fix failed test --- .../kafka/server/ListOffsetsRequestTest.scala | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index 3d8efd664955c..5a559565fc0da 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -182,11 +182,13 @@ class ListOffsetsRequestTest extends BaseRequestTest { TestUtils.generateAndProduceMessages(servers, topic, 9) TestUtils.produceMessage(servers, topic, "test-10", System.currentTimeMillis() + 10L) - assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version = -1)) - assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1)) - assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1)) + val firstLeaderEpoch = TestUtils.findLeaderEpoch(firstLeaderId, partition, servers) + + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(firstLeaderId, 0L, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version = -1)) + assertEquals((10L, firstLeaderEpoch), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1)) + assertEquals((9L, firstLeaderEpoch), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1)) // Kill the first leader so that we can verify the epoch change when fetching the latest offset killBroker(firstLeaderId) @@ -197,19 +199,19 @@ class ListOffsetsRequestTest extends BaseRequestTest { val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, partition, servers) // No changes to written data - assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) - assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, 0L, -1)) + assertEquals((0L, firstLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1)) // The latest offset reflects the updated epoch assertEquals((10L, secondLeaderEpoch, Errors.NONE.code), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1)) - // The offset of max timestamp reflects the epoch of batch - assertEquals((9L, firstLeaderId, Errors.NONE.code), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1)) + // No changes of epoch since the offset of max timestamp reflects the epoch of batch + assertEquals((9L, firstLeaderEpoch, Errors.NONE.code), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1)) } @Test From 581242c1fa6c005bf91a7ced96932774c2c02cd9 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Tue, 9 Apr 2024 01:17:14 +0800 Subject: [PATCH 31/31] address comments --- .../kafka/admin/ListOffsetsIntegrationTest.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala index ae8251b5fd307..62dbd1a0e393b 100644 --- a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala @@ -83,7 +83,6 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1) // the offset of max timestamp is always -1 if the batch version is 0 verifyListOffsets(expectedMaxTimestampOffset = -1) - } @@ -156,8 +155,8 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { // test LogAppendTime case setUpForLogAppendTimeCase() produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs) - // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch. - // So in this separate batch test, it'll be the last offset 2 + // In LogAppendTime's case, the maxTimestampOffset is the message in the last batch since we advance the time + // for each batch, So it'll be the last offset 2 verifyListOffsets(topic = topicNameWithCustomConfigs, 2) } @@ -190,8 +189,8 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { // test LogAppendTime case setUpForLogAppendTimeCase() produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs) - // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch. - // So in this separate batch test, it'll be the last offset 2 + // In LogAppendTime's case, the maxTimestampOffset is the message in the last batch since we advance the time + // for each batch, So it'll be the last offset 2 verifyListOffsets(topic = topicNameWithCustomConfigs, 2) }