Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest {
*/
public static final long EARLIEST_LOCAL_TIMESTAMP = -4L;

public static final long LATEST_TIERED_TIMESTAMP = -5L;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically, if we add a new targetTimestamp value, we will need to bump up the version of the ListOffsetsRequest. See https://github.com/apache/kafka/pull/10760/files. Otherwise, a client could be setting LATEST_TIERED_TIMESTAMP and assuming that the server supports it, but the server actually does not.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, thanks a lot for bringing this up in the mailing list and here — I will open a pull request to amend this omission!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for following up, @clolov !


public static final int CONSUMER_REPLICA_ID = -1;
public static final int DEBUGGING_REPLICA_ID = -2;

Expand Down
56 changes: 39 additions & 17 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard}

import java.io.{File, IOException}
import java.nio.file.{Files, Path}
Expand Down Expand Up @@ -150,7 +150,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
def localLogStartOffset(): Long = _localLogStartOffset

// This is the offset(inclusive) until which segments are copied to the remote storage.
@volatile private var highestOffsetInRemoteStorage: Long = -1L
@volatile private[kafka] var _highestOffsetInRemoteStorage: Long = -1L

def highestOffsetInRemoteStorage(): Long = _highestOffsetInRemoteStorage

locally {
def updateLocalLogStartOffset(offset: Long): Unit = {
Expand Down Expand Up @@ -544,8 +546,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,

/**
 * Advances the highest offset known to be copied to remote storage.
 *
 * Only meaningful when tiered storage is enabled for this log; otherwise the request is
 * logged as a warning and ignored. The tracked value is monotonically non-decreasing:
 * an `offset` lower than the current value is silently dropped.
 *
 * NOTE(review): the original extracted span contained both the pre-change and post-change
 * diff lines fused together; this is the post-change version (accessor `highestOffsetInRemoteStorage()`
 * backed by the `_highestOffsetInRemoteStorage` field).
 *
 * @param offset inclusive offset up to which segments have been copied to remote storage
 */
def updateHighestOffsetInRemoteStorage(offset: Long): Unit = {
  if (!remoteLogEnabled())
    warn(s"Unable to update the highest offset in remote storage with offset $offset since remote storage is not enabled. The existing highest offset is ${highestOffsetInRemoteStorage()}.")
  else if (offset > highestOffsetInRemoteStorage()) _highestOffsetInRemoteStorage = offset
}

// Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be
Expand Down Expand Up @@ -1279,7 +1281,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,

if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) &&
targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP &&
targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP &&
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this is a bug fix, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, Kamal called this out

targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP)
throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " +
s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
Expand All @@ -1300,18 +1301,39 @@ class UnifiedLog(@volatile var logStartOffset: Long,
} else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
val curLocalLogStartOffset = localLogStartOffset()

val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
val epoch = cache.epochForOffset(curLocalLogStartOffset)
if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
})

val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
else Optional.empty[Integer]()
val epochResult: Optional[Integer] =
if (leaderEpochCache.isDefined) {
val epochOpt = leaderEpochCache.get.epochForOffset(curLocalLogStartOffset)
if (epochOpt.isPresent) Optional.of(epochOpt.getAsInt) else Optional.empty()
} else {
Optional.empty()
}

Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt))
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult))
} else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, latestEpochAsOptional(leaderEpochCache)))
} else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
if (remoteLogEnabled()) {
val curHighestRemoteOffset = highestOffsetInRemoteStorage()

val epochResult: Optional[Integer] =
if (leaderEpochCache.isDefined) {
val epochOpt = leaderEpochCache.get.epochForOffset(curHighestRemoteOffset)
if (epochOpt.isPresent) {
Optional.of(epochOpt.getAsInt)
} else if (curHighestRemoteOffset == -1) {
Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)
} else {
Optional.empty()
}
} else {
Optional.empty()
}

Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult))
} else {
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)))
}
} else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
Expand Down Expand Up @@ -1448,7 +1470,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// 1. they are uploaded to the remote storage
// 2. log-start-offset was incremented higher than the largest offset in the candidate segment
if (remoteLogEnabled()) {
(upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage) ||
(upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage()) ||
allowDeletionDueToLogStartOffsetIncremented
} else {
true
Expand Down Expand Up @@ -1582,13 +1604,13 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* The log size in bytes for all segments that are only in local log but not yet in remote log.
*/
def onlyLocalLogSegmentsSize: Long =
UnifiedLog.sizeInBytes(logSegments.stream.filter(_.baseOffset >= highestOffsetInRemoteStorage).collect(Collectors.toList[LogSegment]))
UnifiedLog.sizeInBytes(logSegments.stream.filter(_.baseOffset >= highestOffsetInRemoteStorage()).collect(Collectors.toList[LogSegment]))

/**
* The number of segments that are only in local log but not yet in remote log.
*/
def onlyLocalLogSegmentsCount: Long =
logSegments.stream().filter(_.baseOffset >= highestOffsetInRemoteStorage).count()
logSegments.stream().filter(_.baseOffset >= highestOffsetInRemoteStorage()).count()

/**
* The offset of the next message that will be appended to the log
Expand Down
86 changes: 86 additions & 0 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2127,6 +2127,92 @@ class UnifiedLogTest {
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
}

@Test
def testFetchLatestTieredTimestampNoRemoteStorage(): Unit = {
  // When tiered storage is disabled, LATEST_TIERED_TIMESTAMP must resolve to
  // offset -1 with epoch -1, whether the local log is empty or populated.
  val log = createLog(logDir, LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1))
  val expected = Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)))

  // Empty log: nothing tiered, expect the sentinel result.
  assertEquals(expected, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))

  // Append two records with increasing timestamps under leader epoch 0.
  val baseTimestamp = mockTime.milliseconds
  val epoch = 0
  Seq(baseTimestamp, baseTimestamp + 1).foreach { ts =>
    log.appendAsLeader(
      TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = ts),
      leaderEpoch = epoch)
  }

  // Local records exist but remote storage is still disabled, so the answer is unchanged.
  assertEquals(expected, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
}

@Test
def testFetchLatestTieredTimestampWithRemoteStorage(): Unit = {
  // Verifies ListOffsets timestamp resolution when one segment has been tiered:
  // offset 0 (first timestamp) lives in remote storage, offset 1 (second timestamp)
  // lives only in local storage.
  val remoteLogManager = mock(classOf[RemoteLogManager])
  val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1,
    remoteLogStorageEnable = true)
  val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
  when(remoteLogManager.findOffsetByTimestamp(log.topicPartition, 0, 0, log.leaderEpochCache.get))
    .thenReturn(Optional.empty[TimestampAndOffset]())

  // Empty log: no match by timestamp, and EARLIEST_LOCAL resolves to offset 0 with no epoch.
  assertEquals(None, log.fetchOffsetByTimestamp(0L, Some(remoteLogManager)))
  assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty())),
    log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))

  val firstTimestamp = mockTime.milliseconds
  val firstLeaderEpoch = 0
  log.appendAsLeader(TestUtils.singletonRecords(
    value = TestUtils.randomBytes(10),
    timestamp = firstTimestamp),
    leaderEpoch = firstLeaderEpoch)

  val secondTimestamp = firstTimestamp + 1
  val secondLeaderEpoch = 1
  log.appendAsLeader(TestUtils.singletonRecords(
    value = TestUtils.randomBytes(10),
    timestamp = secondTimestamp),
    leaderEpoch = secondLeaderEpoch)

  // The mocked remote manager only knows about the first timestamp (the tiered record).
  when(remoteLogManager.findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
    anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache.get)))
    .thenAnswer(ans => {
      val timestamp = ans.getArgument(1).asInstanceOf[Long]
      Optional.of(timestamp)
        .filter(_ == firstTimestamp)
        .map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L, Optional.of(firstLeaderEpoch)))
    })
  // Simulate that offset 0 has been uploaded and local retention has moved past it.
  log._localLogStartOffset = 1
  log._highestOffsetInRemoteStorage = 0

  // In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage.
  assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
    log.fetchOffsetByTimestamp(firstTimestamp, Some(remoteLogManager)))
  // This lookup searches both remote and local storage; the second timestamp is found locally.
  assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
    log.fetchOffsetByTimestamp(secondTimestamp, Some(remoteLogManager)))

  assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
    log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager)))
  assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
    log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Some(remoteLogManager)))
  assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
    log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
  assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
    log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))

  // The cache can be updated directly after a leader change.
  // The new latest offset should reflect the updated epoch.
  log.maybeAssignEpochStartOffset(2, 2L)

  assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
    log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
}

/**
* Test the Log truncate operations
*/
Expand Down