diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java index efdc7da2afe1e..fc996453d6470 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java @@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest { */ public static final long EARLIEST_LOCAL_TIMESTAMP = -4L; + public static final long LATEST_TIERED_TIMESTAMP = -5L; + public static final int CONSUMER_REPLICA_ID = -1; public static final int DEBUGGING_REPLICA_ID = -2; diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index c0bb9d8cd66c6..f6198ce9d219e 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -41,7 +41,7 @@ import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard} import java.io.{File, IOException} import java.nio.file.{Files, Path} @@ -150,7 +150,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, def localLogStartOffset(): Long = _localLogStartOffset // This is the offset(inclusive) until which segments are copied to the remote storage. - @volatile private var highestOffsetInRemoteStorage: Long = -1L + @volatile private[kafka] var _highestOffsetInRemoteStorage: Long = -1L + + def highestOffsetInRemoteStorage(): Long = _highestOffsetInRemoteStorage locally { def updateLocalLogStartOffset(offset: Long): Unit = { @@ -544,8 +546,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, def updateHighestOffsetInRemoteStorage(offset: Long): Unit = { if (!remoteLogEnabled()) - warn(s"Unable to update the highest offset in remote storage with offset $offset since remote storage is not enabled. The existing highest offset is $highestOffsetInRemoteStorage.") - else if (offset > highestOffsetInRemoteStorage) highestOffsetInRemoteStorage = offset + warn(s"Unable to update the highest offset in remote storage with offset $offset since remote storage is not enabled. The existing highest offset is ${highestOffsetInRemoteStorage()}.") + else if (offset > highestOffsetInRemoteStorage()) _highestOffsetInRemoteStorage = offset } // Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be @@ -1279,7 +1281,6 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) && targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP && - targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP && targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP) throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " + s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " + @@ -1300,18 +1301,39 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() - val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => { - val epoch = cache.epochForOffset(curLocalLogStartOffset) - if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]() - }) - - val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset) - Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch) - else Optional.empty[Integer]() + val epochResult: Optional[Integer] = + if (leaderEpochCache.isDefined) { + val epochOpt = leaderEpochCache.get.epochForOffset(curLocalLogStartOffset) + if (epochOpt.isPresent) Optional.of(epochOpt.getAsInt) else Optional.empty() + } else { + Optional.empty() + } - Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt)) + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, latestEpochAsOptional(leaderEpochCache))) + } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) { + if (remoteLogEnabled()) { + val curHighestRemoteOffset = highestOffsetInRemoteStorage() + + val epochResult: Optional[Integer] = + if (leaderEpochCache.isDefined) { + val epochOpt = leaderEpochCache.get.epochForOffset(curHighestRemoteOffset) + if (epochOpt.isPresent) { + Optional.of(epochOpt.getAsInt) + } else if (curHighestRemoteOffset == -1) { + Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) + } else { + Optional.empty() + } + } else { + Optional.empty() + } + + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult)) + } else { + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1))) + } } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. @@ -1448,7 +1470,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, // 1. they are uploaded to the remote storage // 2. log-start-offset was incremented higher than the largest offset in the candidate segment if (remoteLogEnabled()) { - (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage) || + (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage()) || allowDeletionDueToLogStartOffsetIncremented } else { true @@ -1582,13 +1604,13 @@ class UnifiedLog(@volatile var logStartOffset: Long, * The log size in bytes for all segments that are only in local log but not yet in remote log. */ def onlyLocalLogSegmentsSize: Long = - UnifiedLog.sizeInBytes(logSegments.stream.filter(_.baseOffset >= highestOffsetInRemoteStorage).collect(Collectors.toList[LogSegment])) + UnifiedLog.sizeInBytes(logSegments.stream.filter(_.baseOffset >= highestOffsetInRemoteStorage()).collect(Collectors.toList[LogSegment])) /** * The number of segments that are only in local log but not yet in remote log. */ def onlyLocalLogSegmentsCount: Long = - logSegments.stream().filter(_.baseOffset >= highestOffsetInRemoteStorage).count() + logSegments.stream().filter(_.baseOffset >= highestOffsetInRemoteStorage()).count() /** * The offset of the next message that will be appended to the log diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 7b92a9e2df794..ffa585a460916 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -2127,6 +2127,92 @@ class UnifiedLogTest { log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) } + @Test + def testFetchLatestTieredTimestampNoRemoteStorage(): Unit = { + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) + val log = createLog(logDir, logConfig) + + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) + + val firstTimestamp = mockTime.milliseconds + val leaderEpoch = 0 + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = firstTimestamp), + leaderEpoch = leaderEpoch) + + val secondTimestamp = firstTimestamp + 1 + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = secondTimestamp), + leaderEpoch = leaderEpoch) + + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) + } + + @Test + def testFetchLatestTieredTimestampWithRemoteStorage(): Unit = { + val remoteLogManager = mock(classOf[RemoteLogManager]) + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1, + remoteLogStorageEnable = true) + val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager)) + when(remoteLogManager.findOffsetByTimestamp(log.topicPartition, 0, 0, log.leaderEpochCache.get)) + .thenReturn(Optional.empty[TimestampAndOffset]()) + + assertEquals(None, log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty())), + log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager))) + + val firstTimestamp = mockTime.milliseconds + val firstLeaderEpoch = 0 + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = firstTimestamp), + leaderEpoch = firstLeaderEpoch) + + val secondTimestamp = firstTimestamp + 1 + val secondLeaderEpoch = 1 + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = secondTimestamp), + leaderEpoch = secondLeaderEpoch) + + when(remoteLogManager.findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition), + anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache.get))) + .thenAnswer(ans => { + val timestamp = ans.getArgument(1).asInstanceOf[Long] + Optional.of(timestamp) + .filter(_ == firstTimestamp) + .map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L, Optional.of(firstLeaderEpoch))) + }) + log._localLogStartOffset = 1 + log._highestOffsetInRemoteStorage = 0 + + // In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage. + assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), + log.fetchOffsetByTimestamp(firstTimestamp, Some(remoteLogManager))) + assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), + log.fetchOffsetByTimestamp(secondTimestamp, Some(remoteLogManager))) + + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager))) + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Some(remoteLogManager))) + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager))) + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) + + // The cache can be updated directly after a leader change. + // The new latest offset should reflect the updated epoch. + log.maybeAssignEpochStartOffset(2, 2L) + + assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))), + log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) + } + /** * Test the Log truncate operations */