From a929b4b4a817c3fcc783c2e3a95b3bf41695402a Mon Sep 17 00:00:00 2001 From: Christo Date: Wed, 17 Jan 2024 13:07:01 +0000 Subject: [PATCH 1/3] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP --- .../common/requests/ListOffsetsRequest.java | 2 + .../src/main/scala/kafka/log/UnifiedLog.scala | 40 ++++++--- .../scala/unit/kafka/log/UnifiedLogTest.scala | 88 +++++++++++++++++++ 3 files changed, 116 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java index efdc7da2afe1e..fc996453d6470 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java @@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest { */ public static final long EARLIEST_LOCAL_TIMESTAMP = -4L; + public static final long LATEST_TIERED_TIMESTAMP = -5L; + public static final int CONSUMER_REPLICA_ID = -1; public static final int DEBUGGING_REPLICA_ID = -2; diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index c0bb9d8cd66c6..98e5fc6b1fbe8 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -41,7 +41,7 @@ import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard} import java.io.{File, IOException} import java.nio.file.{Files, Path} @@ -150,7 +150,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, def localLogStartOffset(): Long = _localLogStartOffset // This is the offset(inclusive) until which segments are copied to the remote storage. - @volatile private var highestOffsetInRemoteStorage: Long = -1L + @volatile private[kafka] var _highestOffsetInRemoteStorage: Long = -1L + + def highestOffsetInRemoteStorage(): Long = _highestOffsetInRemoteStorage locally { def updateLocalLogStartOffset(offset: Long): Unit = { @@ -544,8 +546,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, def updateHighestOffsetInRemoteStorage(offset: Long): Unit = { if (!remoteLogEnabled()) - warn(s"Unable to update the highest offset in remote storage with offset $offset since remote storage is not enabled. The existing highest offset is $highestOffsetInRemoteStorage.") - else if (offset > highestOffsetInRemoteStorage) highestOffsetInRemoteStorage = offset + warn(s"Unable to update the highest offset in remote storage with offset $offset since remote storage is not enabled. The existing highest offset is ${highestOffsetInRemoteStorage()}.") + else if (offset > highestOffsetInRemoteStorage()) _highestOffsetInRemoteStorage = offset } // Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be @@ -1280,7 +1282,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) && targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP && targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP && - targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP) + targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP && + targetTimestamp != ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " + s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " + s"required version $IBP_0_10_0_IV0") @@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() - val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => { + val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => { val epoch = cache.epochForOffset(curLocalLogStartOffset) - if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]() + if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]() }) - val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset) - Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch) - else Optional.empty[Integer]() - Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, latestEpochAsOptional(leaderEpochCache))) + } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) { + if (remoteLogEnabled()) { + val curHighestRemoteOffset = highestOffsetInRemoteStorage() + + val optEpoch: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => { + val epoch = cache.epochForOffset(curHighestRemoteOffset) + if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]() + }) + + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, highestOffsetInRemoteStorage(), optEpoch)) + } else { + Option.empty + } } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. @@ -1448,7 +1460,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, // 1. they are uploaded to the remote storage // 2. log-start-offset was incremented higher than the largest offset in the candidate segment if (remoteLogEnabled()) { - (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage) || + (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage()) || allowDeletionDueToLogStartOffsetIncremented } else { true @@ -1582,13 +1594,13 @@ class UnifiedLog(@volatile var logStartOffset: Long, * The log size in bytes for all segments that are only in local log but not yet in remote log. */ def onlyLocalLogSegmentsSize: Long = - UnifiedLog.sizeInBytes(logSegments.stream.filter(_.baseOffset >= highestOffsetInRemoteStorage).collect(Collectors.toList[LogSegment])) + UnifiedLog.sizeInBytes(logSegments.stream.filter(_.baseOffset >= highestOffsetInRemoteStorage()).collect(Collectors.toList[LogSegment])) /** * The number of segments that are only in local log but not yet in remote log. */ def onlyLocalLogSegmentsCount: Long = - logSegments.stream().filter(_.baseOffset >= highestOffsetInRemoteStorage).count() + logSegments.stream().filter(_.baseOffset >= highestOffsetInRemoteStorage()).count() /** * The offset of the next message that will be appended to the log diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 7b92a9e2df794..5065339308bde 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -2127,6 +2127,94 @@ class UnifiedLogTest { log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) } + @Test + def testFetchLatestTieredTimestampNoRemoteStorage(): Unit = { + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) + val log = createLog(logDir, logConfig) + + assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) + + val firstTimestamp = mockTime.milliseconds + val leaderEpoch = 0 + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = firstTimestamp), + leaderEpoch = leaderEpoch) + + val secondTimestamp = firstTimestamp + 1 + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = secondTimestamp), + leaderEpoch = leaderEpoch) + + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = firstTimestamp), + leaderEpoch = leaderEpoch) + + assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) + } + + @Test + def testFetchLatestTieredTimestampWithRemoteStorage(): Unit = { + val remoteLogManager = mock(classOf[RemoteLogManager]) + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1, + remoteLogStorageEnable = true) + val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager)) + when(remoteLogManager.findOffsetByTimestamp(log.topicPartition, 0, 0, log.leaderEpochCache.get)) + .thenReturn(Optional.empty[TimestampAndOffset]()) + + assertEquals(None, log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty())), + log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager))) + + val firstTimestamp = mockTime.milliseconds + val firstLeaderEpoch = 0 + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = firstTimestamp), + leaderEpoch = firstLeaderEpoch) + + val secondTimestamp = firstTimestamp + 1 + val secondLeaderEpoch = 1 + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = secondTimestamp), + leaderEpoch = secondLeaderEpoch) + + when(remoteLogManager.findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition), + anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache.get))) + .thenAnswer(ans => { + val timestamp = ans.getArgument(1).asInstanceOf[Long] + Optional.of(timestamp) + .filter(_ == firstTimestamp) + .map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L, Optional.of(firstLeaderEpoch))) + }) + log._localLogStartOffset = 1 + log._highestOffsetInRemoteStorage = 0 + + assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), + log.fetchOffsetByTimestamp(firstTimestamp, Some(remoteLogManager))) + assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), + log.fetchOffsetByTimestamp(secondTimestamp, Some(remoteLogManager))) + + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager))) + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Some(remoteLogManager))) + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager))) + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) + + // The cache can be updated directly after a leader change. + // The new latest offset should reflect the updated epoch. + log.maybeAssignEpochStartOffset(2, 2L) + + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) + } + /** * Test the Log truncate operations */ From 7270351554d33e9daa10c0ce49e63ad024002b58 Mon Sep 17 00:00:00 2001 From: Christo Date: Tue, 23 Jan 2024 11:52:47 +0000 Subject: [PATCH 2/3] Address comments from first round of reviews --- .../src/main/scala/kafka/log/UnifiedLog.scala | 28 +++++++++---------- .../scala/unit/kafka/log/UnifiedLogTest.scala | 14 ++++------ 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 98e5fc6b1fbe8..c8a3c5eaf2f59 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1281,9 +1281,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) && targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP && - targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP && - targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP && - targetTimestamp != ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) + targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP) throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " + s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " + s"required version $IBP_0_10_0_IV0") @@ -1303,26 +1301,28 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() - val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => { - val epoch = cache.epochForOffset(curLocalLogStartOffset) - if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]() - }) + var epochResult: Optional[Integer] = Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) + if (leaderEpochCache.isDefined) { + val epochOpt = leaderEpochCache.get.epochForOffset(curLocalLogStartOffset) + if (epochOpt.isPresent) epochResult = Optional.of(epochOpt.getAsInt) + } - Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt)) + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, latestEpochAsOptional(leaderEpochCache))) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) { if (remoteLogEnabled()) { val curHighestRemoteOffset = highestOffsetInRemoteStorage() - val optEpoch: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => { - val epoch = cache.epochForOffset(curHighestRemoteOffset) - if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]() - }) + var epochResult: Optional[Integer] = Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) + if (leaderEpochCache.isDefined) { + val epochOpt = leaderEpochCache.get.epochForOffset(curHighestRemoteOffset) + if (epochOpt.isPresent) epochResult = Optional.of(epochOpt.getAsInt) + } - Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, highestOffsetInRemoteStorage(), optEpoch)) + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult)) } else { - Option.empty + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1))) } } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 5065339308bde..9f6cebf50c205 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -2132,7 +2132,8 @@ class UnifiedLogTest { val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) val log = createLog(logDir, logConfig) - assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) val firstTimestamp = mockTime.milliseconds val leaderEpoch = 0 @@ -2147,12 +2148,8 @@ class UnifiedLogTest { timestamp = secondTimestamp), leaderEpoch = leaderEpoch) - log.appendAsLeader(TestUtils.singletonRecords( - value = TestUtils.randomBytes(10), - timestamp = firstTimestamp), - leaderEpoch = leaderEpoch) - - assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) } @Test @@ -2165,7 +2162,7 @@ class UnifiedLogTest { .thenReturn(Optional.empty[TimestampAndOffset]()) assertEquals(None, log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty())), + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.of(-1))), log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager))) val firstTimestamp = mockTime.milliseconds @@ -2193,6 +2190,7 @@ class UnifiedLogTest { log._localLogStartOffset = 1 log._highestOffsetInRemoteStorage = 0 + // In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage. assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), log.fetchOffsetByTimestamp(firstTimestamp, Some(remoteLogManager))) assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), From ac2014133d778eea5a6f4fcff56d70fecdb65cec Mon Sep 17 00:00:00 2001 From: Christo Date: Wed, 7 Feb 2024 14:20:05 +0000 Subject: [PATCH 3/3] Address comments from second round of reviews --- .../src/main/scala/kafka/log/UnifiedLog.scala | 30 ++++++++++++------- .../scala/unit/kafka/log/UnifiedLogTest.scala | 2 +- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index c8a3c5eaf2f59..f6198ce9d219e 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1301,11 +1301,13 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() - var epochResult: Optional[Integer] = Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) - if (leaderEpochCache.isDefined) { - val epochOpt = leaderEpochCache.get.epochForOffset(curLocalLogStartOffset) - if (epochOpt.isPresent) epochResult = Optional.of(epochOpt.getAsInt) - } + val epochResult: Optional[Integer] = + if (leaderEpochCache.isDefined) { + val epochOpt = leaderEpochCache.get.epochForOffset(curLocalLogStartOffset) + if (epochOpt.isPresent) Optional.of(epochOpt.getAsInt) else Optional.empty() + } else { + Optional.empty() + } Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { @@ -1314,11 +1316,19 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (remoteLogEnabled()) { val curHighestRemoteOffset = highestOffsetInRemoteStorage() - var epochResult: Optional[Integer] = Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) - if (leaderEpochCache.isDefined) { - val epochOpt = leaderEpochCache.get.epochForOffset(curHighestRemoteOffset) - if (epochOpt.isPresent) epochResult = Optional.of(epochOpt.getAsInt) - } + val epochResult: Optional[Integer] = + if (leaderEpochCache.isDefined) { + val epochOpt = leaderEpochCache.get.epochForOffset(curHighestRemoteOffset) + if (epochOpt.isPresent) { + Optional.of(epochOpt.getAsInt) + } else if (curHighestRemoteOffset == -1) { + Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) + } else { + Optional.empty() + } + } else { + Optional.empty() + } Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult)) } else { diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 9f6cebf50c205..ffa585a460916 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -2162,7 +2162,7 @@ class UnifiedLogTest { .thenReturn(Optional.empty[TimestampAndOffset]()) assertEquals(None, log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) - assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.of(-1))), + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty())), log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager))) val firstTimestamp = mockTime.milliseconds