Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ class Log(@volatile private var _dir: File,

def newLeaderEpochFileCache(): LeaderEpochFileCache = {
val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
new LeaderEpochFileCache(topicPartition, () => logEndOffset, checkpointFile)
new LeaderEpochFileCache(topicPartition, checkpointFile)
}

if (recordVersion.precedes(RecordVersion.V2)) {
Expand Down Expand Up @@ -1348,7 +1348,7 @@ class Log(@volatile private var _dir: File,

def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
leaderEpochCache.flatMap { cache =>
val (foundEpoch, foundOffset) = cache.endOffsetFor(leaderEpoch)
val (foundEpoch, foundOffset) = cache.endOffsetFor(leaderEpoch, logEndOffset)
if (foundOffset == UNDEFINED_EPOCH_OFFSET)
None
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@ import scala.jdk.CollectionConverters._
*
* @param topicPartition the associated topic partition
* @param checkpoint the checkpoint file
* @param logEndOffset function to fetch the current log end offset
*/
class LeaderEpochFileCache(topicPartition: TopicPartition,
logEndOffset: () => Long,
checkpoint: LeaderEpochCheckpoint) extends Logging {
this.logIdent = s"[LeaderEpochCache $topicPartition] "

Expand Down Expand Up @@ -184,9 +182,10 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
* so that the follower falls back to High Water Mark.
*
* @param requestedEpoch requested leader epoch
* @return found leader epoch and end offset
* @param logEndOffset the existing Log End Offset
* @return found leader epoch and end offset
*/
def endOffsetFor(requestedEpoch: Int): (Int, Long) = {
def endOffsetFor(requestedEpoch: Int, logEndOffset: Long): (Int, Long) = {
inReadLock(lock) {
val epochAndOffset =
if (requestedEpoch == UNDEFINED_EPOCH) {
Expand All @@ -198,7 +197,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
// Followers should not have any reason to query for the end offset of the current epoch, but a consumer
// might if it is verifying its committed offset following a group rebalance. In this case, we return
// the current log end offset which makes the truncation check work as expected.
(requestedEpoch, logEndOffset())
(requestedEpoch, logEndOffset)
} else {
val higherEntry = epochs.higherEntry(requestedEpoch)
if (higherEntry == null) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ class LogSegmentTest {
override def read(): Seq[EpochEntry] = this.epochs
}

val cache = new LeaderEpochFileCache(topicPartition, () => seg.readNextOffset, checkpoint)
val cache = new LeaderEpochFileCache(topicPartition, checkpoint)
seg.append(largestOffset = 105L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
shallowOffsetOfMaxTimestamp = 104L, records = MemoryRecords.withRecords(104L, CompressionType.NONE, 0,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,12 @@ import org.junit.jupiter.api.Test
*/
class LeaderEpochFileCacheTest {
val tp = new TopicPartition("TestTopic", 5)
private var logEndOffset = 0L
private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs: Seq[EpochEntry] = Seq()
override def write(epochs: Iterable[EpochEntry]): Unit = this.epochs = epochs.toSeq
override def read(): Seq[EpochEntry] = this.epochs
}
private val cache = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint)
private val cache = new LeaderEpochFileCache(tp, checkpoint)

@Test
def testPreviousEpoch(): Unit = {
Expand All @@ -63,23 +62,23 @@ class LeaderEpochFileCacheTest {
def shouldAddEpochAndMessageOffsetToCache() = {
//When
cache.assign(epoch = 2, startOffset = 10)
logEndOffset = 11
val logEndOffset = 11

//Then
assertEquals(Some(2), cache.latestEpoch)
assertEquals(EpochEntry(2, 10), cache.epochEntries(0))
assertEquals((2, logEndOffset), cache.endOffsetFor(2)) //should match logEndOffset
assertEquals((2, logEndOffset), cache.endOffsetFor(2, logEndOffset)) //should match logEndOffset
}

@Test
def shouldReturnLogEndOffsetIfLatestEpochRequested() = {
//When just one epoch
cache.assign(epoch = 2, startOffset = 11)
cache.assign(epoch = 2, startOffset = 12)
logEndOffset = 14
val logEndOffset = 14

//Then
assertEquals((2, logEndOffset), cache.endOffsetFor(2))
assertEquals((2, logEndOffset), cache.endOffsetFor(2, logEndOffset))
}

@Test
Expand All @@ -91,7 +90,7 @@ class LeaderEpochFileCacheTest {
cache.assign(epoch = 3, startOffset = 12)

//When (say a bootstraping follower) sends request for UNDEFINED_EPOCH
val epochAndOffsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
val epochAndOffsetFor = cache.endOffsetFor(UNDEFINED_EPOCH, 0L)

//Then
assertEquals(expectedEpochEndOffset,
Expand All @@ -101,7 +100,7 @@ class LeaderEpochFileCacheTest {
@Test
def shouldNotOverwriteLogEndOffsetForALeaderEpochOnceItHasBeenAssigned() = {
//Given
logEndOffset = 9
val logEndOffset = 9

cache.assign(2, logEndOffset)

Expand Down Expand Up @@ -139,15 +138,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldReturnUnsupportedIfNoEpochRecorded(): Unit = {
//Then
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(0))
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(0, 0L))
}

@Test
def shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested(): Unit = {
logEndOffset = 73

//When (say a follower on older message format version) sends request for UNDEFINED_EPOCH
val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH, 73)

//Then
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET),
Expand All @@ -161,7 +158,7 @@ class LeaderEpochFileCacheTest {
cache.assign(epoch = 7, startOffset = 13)

//When
val epochAndOffset = cache.endOffsetFor(4)
val epochAndOffset = cache.endOffsetFor(4, 0L)

//Then
assertEquals((4, 11), epochAndOffset)
Expand All @@ -176,8 +173,8 @@ class LeaderEpochFileCacheTest {
// epoch 7 starts at an earlier offset
cache.assign(epoch = 7, startOffset = 12)

assertEquals((5, 12), cache.endOffsetFor(5))
assertEquals((5, 12), cache.endOffsetFor(6))
assertEquals((5, 12), cache.endOffsetFor(5, 0L))
assertEquals((5, 12), cache.endOffsetFor(6, 0L))
}

@Test
Expand All @@ -189,10 +186,9 @@ class LeaderEpochFileCacheTest {
cache.assign(epoch = 2, startOffset = 14)
cache.assign(epoch = 3, startOffset = 15)
cache.assign(epoch = 3, startOffset = 16)
logEndOffset = 17

//Then get the start offset of the next epoch
assertEquals((2, 15), cache.endOffsetFor(2))
assertEquals((2, 15), cache.endOffsetFor(2, 17))
}

@Test
Expand All @@ -203,9 +199,9 @@ class LeaderEpochFileCacheTest {
cache.assign(epoch = 4, startOffset = 17)

//Then
assertEquals((0, 13), cache.endOffsetFor(requestedEpoch = 1))
assertEquals((2, 17), cache.endOffsetFor(requestedEpoch = 2))
assertEquals((2, 17), cache.endOffsetFor(requestedEpoch = 3))
assertEquals((0, 13), cache.endOffsetFor(1, 0L))
assertEquals((2, 17), cache.endOffsetFor(2, 0L))
assertEquals((2, 17), cache.endOffsetFor(3, 0L))
}

@Test
Expand All @@ -221,23 +217,21 @@ class LeaderEpochFileCacheTest {

@Test
def shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked(): Unit = {
logEndOffset = 100

//When
cache.assign(epoch = 2, startOffset = 100)

//Then
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(3))
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(3, 100))
}

@Test
def shouldSupportEpochsThatDoNotStartFromZero(): Unit = {
//When
cache.assign(epoch = 2, startOffset = 6)
logEndOffset = 7
val logEndOffset = 7

//Then
assertEquals((2, logEndOffset), cache.endOffsetFor(2))
assertEquals((2, logEndOffset), cache.endOffsetFor(2, logEndOffset))
assertEquals(1, cache.epochEntries.size)
assertEquals(EpochEntry(2, 6), cache.epochEntries(0))
}
Expand All @@ -248,12 +242,12 @@ class LeaderEpochFileCacheTest {
val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath))

//Given
val cache = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint)
val cache = new LeaderEpochFileCache(tp, checkpoint)
cache.assign(epoch = 2, startOffset = 6)

//When
val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath))
val cache2 = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint2)
val cache2 = new LeaderEpochFileCache(tp, checkpoint2)

//Then
assertEquals(1, cache2.epochEntries.size)
Expand All @@ -263,24 +257,27 @@ class LeaderEpochFileCacheTest {
@Test
def shouldEnforceMonotonicallyIncreasingEpochs(): Unit = {
//Given
cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6
cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7
cache.assign(epoch = 1, startOffset = 5);
var logEndOffset = 6
cache.assign(epoch = 2, startOffset = 6);
logEndOffset = 7

//When we update an epoch in the past with a different offset, the log has already reached
//an inconsistent state. Our options are either to raise an error, ignore the new append,
//or truncate the cached epochs to the point of conflict. We take this latter approach in
//order to guarantee that epochs and offsets in the cache increase monotonically, which makes
//the search logic simpler to reason about.
cache.assign(epoch = 1, startOffset = 7); logEndOffset = 8
cache.assign(epoch = 1, startOffset = 7);
logEndOffset = 8

//Then later epochs will be removed
assertEquals(Some(1), cache.latestEpoch)

//Then end offset for epoch 1 will have changed
assertEquals((1, 8), cache.endOffsetFor(1))
assertEquals((1, 8), cache.endOffsetFor(1, logEndOffset))

//Then end offset for epoch 2 is now undefined
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(2))
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(2, logEndOffset))
assertEquals(EpochEntry(1, 7), cache.epochEntries(0))
}

Expand All @@ -296,6 +293,8 @@ class LeaderEpochFileCacheTest {

@Test
def shouldIncreaseAndTrackEpochsAsLeadersChangeManyTimes(): Unit = {
var logEndOffset = 0L

//Given
cache.assign(epoch = 0, startOffset = 0) //logEndOffset=0

Expand All @@ -305,63 +304,72 @@ class LeaderEpochFileCacheTest {
//Then epoch should go up
assertEquals(Some(1), cache.latestEpoch)
//offset for 1 should still be 0
assertEquals((1, 0), cache.endOffsetFor(1))
assertEquals((1, 0), cache.endOffsetFor(1, logEndOffset))
//offset for epoch 0 should still be 0
assertEquals((0, 0), cache.endOffsetFor(0))
assertEquals((0, 0), cache.endOffsetFor(0, logEndOffset))

//When we write 5 messages as epoch 1
logEndOffset = 5
logEndOffset = 5L

//Then end offset for epoch(1) should be logEndOffset => 5
assertEquals((1, 5), cache.endOffsetFor(1))
assertEquals((1, 5), cache.endOffsetFor(1, logEndOffset))
//Epoch 0 should still be at offset 0
assertEquals((0, 0), cache.endOffsetFor(0))
assertEquals((0, 0), cache.endOffsetFor(0, logEndOffset))

//When
cache.assign(epoch = 2, startOffset = 5) //logEndOffset=5

logEndOffset = 10 //write another 5 messages

//Then end offset for epoch(2) should be logEndOffset => 10
assertEquals((2, 10), cache.endOffsetFor(2))
assertEquals((2, 10), cache.endOffsetFor(2, logEndOffset))

//end offset for epoch(1) should be the start offset of epoch(2) => 5
assertEquals((1, 5), cache.endOffsetFor(1))
assertEquals((1, 5), cache.endOffsetFor(1, logEndOffset))

//epoch (0) should still be 0
assertEquals((0, 0), cache.endOffsetFor(0))
assertEquals((0, 0), cache.endOffsetFor(0, logEndOffset))
}

@Test
def shouldIncreaseAndTrackEpochsAsFollowerReceivesManyMessages(): Unit = {
//When Messages come in
cache.assign(epoch = 0, startOffset = 0); logEndOffset = 1
cache.assign(epoch = 0, startOffset = 1); logEndOffset = 2
cache.assign(epoch = 0, startOffset = 2); logEndOffset = 3
cache.assign(epoch = 0, startOffset = 0);
var logEndOffset = 1
cache.assign(epoch = 0, startOffset = 1);
logEndOffset = 2
cache.assign(epoch = 0, startOffset = 2);
logEndOffset = 3

//Then epoch should stay, offsets should grow
assertEquals(Some(0), cache.latestEpoch)
assertEquals((0, logEndOffset), cache.endOffsetFor(0))
assertEquals((0, logEndOffset), cache.endOffsetFor(0, logEndOffset))

//When messages arrive with greater epoch
cache.assign(epoch = 1, startOffset = 3); logEndOffset = 4
cache.assign(epoch = 1, startOffset = 4); logEndOffset = 5
cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6
cache.assign(epoch = 1, startOffset = 3);
logEndOffset = 4
cache.assign(epoch = 1, startOffset = 4);
logEndOffset = 5
cache.assign(epoch = 1, startOffset = 5);
logEndOffset = 6

assertEquals(Some(1), cache.latestEpoch)
assertEquals((1, logEndOffset), cache.endOffsetFor(1))
assertEquals((1, logEndOffset), cache.endOffsetFor(1, logEndOffset))

//When
cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7
cache.assign(epoch = 2, startOffset = 7); logEndOffset = 8
cache.assign(epoch = 2, startOffset = 8); logEndOffset = 9
cache.assign(epoch = 2, startOffset = 6);
logEndOffset = 7
cache.assign(epoch = 2, startOffset = 7);
logEndOffset = 8
cache.assign(epoch = 2, startOffset = 8);
logEndOffset = 9

assertEquals(Some(2), cache.latestEpoch)
assertEquals((2, logEndOffset), cache.endOffsetFor(2))
assertEquals((2, logEndOffset), cache.endOffsetFor(2, logEndOffset))

//Older epochs should return the start offset of the first message in the subsequent epoch.
assertEquals((0, 3), cache.endOffsetFor(0))
assertEquals((1, 6), cache.endOffsetFor(1))
assertEquals((0, 3), cache.endOffsetFor(0, logEndOffset))
assertEquals((1, 6), cache.endOffsetFor(1, logEndOffset))
}

@Test
Expand Down Expand Up @@ -556,7 +564,7 @@ class LeaderEpochFileCacheTest {
@Test
def shouldFetchEndOffsetOfEmptyCache(): Unit = {
//Then
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(7))
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(7, 0L))
}

@Test
Expand Down