15 changes: 15 additions & 0 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.requests.FetchRequest.PartitionData

@@ -77,6 +78,7 @@ class DelayedFetch(delayMs: Long,
* Case E: This broker is the leader, but the requested epoch is now fenced
 * Case F: The fetch offset is not on the last segment of the log
 * Case G: The accumulated bytes from all the fetching partitions exceed the minimum bytes
 * Case H: A diverging epoch was found; return a response to trigger truncation
* Upon completion, should return whatever data is available for each valid partition
*/
override def tryComplete(): Boolean = {
@@ -118,6 +120,19 @@ class DelayedFetch(delayMs: Long,
accumulatedSize += bytesAvailable
}
}

        // Case H: If truncation has caused a diverging epoch while this request was in purgatory, return to trigger truncation
fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch =>
val epochEndOffset = partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false)
if (epochEndOffset.error != Errors.NONE || epochEndOffset.hasUndefinedEpochOrOffset) {
debug(s"Could not obtain last offset for leader epoch for partition $topicPartition, epochEndOffset=$epochEndOffset.")
return forceComplete()
} else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) {
debug(s"Satisfying fetch $fetchMetadata since it has diverging epoch requiring truncation for partition " +
s"$topicPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.")
return forceComplete()
}
}
}
} catch {
case _: NotLeaderOrFollowerException => // Case A or Case B
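The heart of Case H is the comparison against the leader's epoch end offset. Below is a minimal, self-contained sketch of that predicate; `DivergenceCheck`, `LeaderEpochEndOffset`, and `hasDivergingEpoch` are illustrative stand-ins for the PR's use of `org.apache.kafka.common.requests.EpochEndOffset`, not the PR's code.

```scala
object DivergenceCheck {
  // Simplified stand-in for EpochEndOffset (illustrative only).
  final case class LeaderEpochEndOffset(leaderEpoch: Int, endOffset: Long)

  // The follower's log diverges from the leader's if the leader's history for
  // the follower's lastFetchedEpoch either ends in an older epoch or ends
  // before the offset the follower wants to fetch from.
  def hasDivergingEpoch(lastFetchedEpoch: Int,
                        fetchOffset: Long,
                        leaderEnd: LeaderEpochEndOffset): Boolean =
    leaderEnd.leaderEpoch < lastFetchedEpoch || leaderEnd.endOffset < fetchOffset

  // Mirrors testDivergingEpoch below: the leader's epoch 9 ends at offset 499,
  // one before the follower's fetch offset 500, so truncation is required.
  // hasDivergingEpoch(9, 500L, LeaderEpochEndOffset(9, 499L)) == true
}
```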
12 changes: 7 additions & 5 deletions core/src/main/scala/kafka/server/FetchSession.scala
@@ -77,7 +77,8 @@ class CachedPartition(val topic: String,
var highWatermark: Long,
var leaderEpoch: Optional[Integer],
var fetcherLogStartOffset: Long,
var localLogStartOffset: Long)
var localLogStartOffset: Long,
var lastFetchedEpoch: Optional[Integer])
extends ImplicitLinkedHashCollection.Element {

var cachedNext: Int = ImplicitLinkedHashCollection.INVALID_INDEX
@@ -89,28 +90,29 @@ class CachedPartition(val topic: String,
override def setPrev(prev: Int): Unit = this.cachedPrev = prev

def this(topic: String, partition: Int) =
this(topic, partition, -1, -1, -1, Optional.empty(), -1, -1)
this(topic, partition, -1, -1, -1, Optional.empty(), -1, -1, Optional.empty[Integer])

def this(part: TopicPartition) =
this(part.topic, part.partition)

def this(part: TopicPartition, reqData: FetchRequest.PartitionData) =
this(part.topic, part.partition, reqData.maxBytes, reqData.fetchOffset, -1,
reqData.currentLeaderEpoch, reqData.logStartOffset, -1)
reqData.currentLeaderEpoch, reqData.logStartOffset, -1, reqData.lastFetchedEpoch)

def this(part: TopicPartition, reqData: FetchRequest.PartitionData,
respData: FetchResponse.PartitionData[Records]) =
this(part.topic, part.partition, reqData.maxBytes, reqData.fetchOffset, respData.highWatermark,
reqData.currentLeaderEpoch, reqData.logStartOffset, respData.logStartOffset)
reqData.currentLeaderEpoch, reqData.logStartOffset, respData.logStartOffset, reqData.lastFetchedEpoch)

def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch)
def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)

def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
// Update our cached request parameters.
maxBytes = reqData.maxBytes
fetchOffset = reqData.fetchOffset
fetcherLogStartOffset = reqData.logStartOffset
leaderEpoch = reqData.currentLeaderEpoch
lastFetchedEpoch = reqData.lastFetchedEpoch
}

/**
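Why cache `lastFetchedEpoch` at all? Incremental fetch sessions omit unchanged partitions from follow-up requests, so the broker rebuilds each omitted partition's request data from the session cache via `reqData`; without this field the divergence check would silently stop applying after the first request of a session. A rough sketch of that round trip, with illustrative names rather than the PR's classes:

```scala
import java.util.Optional

// Sketch: the minimal state a session entry must carry so a full
// PartitionData can be rebuilt, lastFetchedEpoch included (illustrative).
final class SessionEntry(var fetchOffset: Long,
                         var lastFetchedEpoch: Optional[Integer]) {
  // Analogue of updateRequestParams: refresh cached fields on each request.
  def update(offset: Long, epoch: Optional[Integer]): Unit = {
    fetchOffset = offset
    lastFetchedEpoch = epoch
  }
  // Analogue of reqData: the effective request for a partition the
  // client omitted from an incremental fetch.
  def effectiveRequest: (Long, Optional[Integer]) = (fetchOffset, lastFetchedEpoch)
}
```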
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1048,13 +1048,16 @@ class ReplicaManager(val config: KafkaConfig,
// check if this fetch request can be satisfied right away
var bytesReadable: Long = 0
var errorReadingData = false
var hasDivergingEpoch = false
val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
logReadResults.foreach { case (topicPartition, logReadResult) =>
brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()

if (logReadResult.error != Errors.NONE)
errorReadingData = true
if (logReadResult.divergingEpoch.nonEmpty)
hasDivergingEpoch = true
bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
logReadResultMap.put(topicPartition, logReadResult)
}
@@ -1063,7 +1066,8 @@
// 2) fetch request does not require any data
// 3) has enough data to respond
// 4) some error happens while reading data
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
// 5) we found a diverging epoch
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
val isReassignmentFetch = isFromFollower && isAddingReplica(tp, replicaId)
tp -> FetchPartitionData(
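The change to `fetchMessages` adds a fifth reason to bypass the fetch purgatory entirely. Condensed as a standalone predicate (a sketch of the condition with an illustrative name, not the PR's code):

```scala
object FetchResponsePolicy {
  // Sketch of the immediate-response check in fetchMessages after this change.
  def respondImmediately(timeout: Long,
                         numPartitions: Int,
                         bytesReadable: Long,
                         fetchMinBytes: Int,
                         errorReadingData: Boolean,
                         hasDivergingEpoch: Boolean): Boolean =
    timeout <= 0 ||                   // 1) the request does not want to wait
    numPartitions == 0 ||             // 2) the request requires no data
    bytesReadable >= fetchMinBytes || // 3) enough data has accumulated
    errorReadingData ||               // 4) an error occurred while reading
    hasDivergingEpoch                 // 5) a diverging epoch needs truncation (new)
}
```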
66 changes: 56 additions & 10 deletions core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -20,11 +20,12 @@ import java.util.Optional

import scala.collection.Seq
import kafka.cluster.Partition
import kafka.log.LogOffsetSnapshot
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{FencedLeaderEpochException, NotLeaderOrFollowerException}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest}
import org.easymock.{EasyMock, EasyMockSupport}
import org.junit.Test
import org.junit.Assert._
@@ -70,7 +71,7 @@ class DelayedFetchTest extends EasyMockSupport {
.andThrow(new FencedLeaderEpochException("Requested epoch has been fenced"))
EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)

expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.FENCED_LEADER_EPOCH)
expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.FENCED_LEADER_EPOCH)

replayAll()

@@ -110,7 +111,7 @@

EasyMock.expect(replicaManager.getPartitionOrException(topicPartition))
.andThrow(new NotLeaderOrFollowerException(s"Replica for $topicPartition not available"))
expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NOT_LEADER_OR_FOLLOWER)
expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NOT_LEADER_OR_FOLLOWER)
EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)

replayAll()
@@ -120,6 +121,51 @@
assertTrue(fetchResultOpt.isDefined)
}

@Test
def testDivergingEpoch(): Unit = {
val topicPartition = new TopicPartition("topic", 0)
val fetchOffset = 500L
val logStartOffset = 0L
val currentLeaderEpoch = Optional.of[Integer](10)
val lastFetchedEpoch = Optional.of[Integer](9)
val replicaId = 1

val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = LogOffsetMetadata(fetchOffset),
fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch))
val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus)

var fetchResultOpt: Option[FetchPartitionData] = None
def callback(responses: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
fetchResultOpt = Some(responses.head._2)
}

val delayedFetch = new DelayedFetch(
delayMs = 500,
fetchMetadata = fetchMetadata,
replicaManager = replicaManager,
quota = replicaQuota,
clientMetadata = None,
responseCallback = callback)

val partition: Partition = mock(classOf[Partition])
EasyMock.expect(replicaManager.getPartitionOrException(topicPartition)).andReturn(partition)
val endOffsetMetadata = LogOffsetMetadata(messageOffset = 500L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
EasyMock.expect(partition.fetchOffsetSnapshot(
currentLeaderEpoch,
fetchOnlyFromLeader = true))
.andReturn(LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
EasyMock.expect(partition.lastOffsetForLeaderEpoch(currentLeaderEpoch, lastFetchedEpoch.get, fetchOnlyFromLeader = false))
.andReturn(new EpochEndOffset(Errors.NONE, lastFetchedEpoch.get, fetchOffset - 1))
EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)
expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NONE)
replayAll()

assertTrue(delayedFetch.tryComplete())
assertTrue(delayedFetch.isCompleted)
assertTrue(fetchResultOpt.isDefined)
}

private def buildFetchMetadata(replicaId: Int,
topicPartition: TopicPartition,
fetchStatus: FetchPartitionStatus): FetchMetadata = {
@@ -133,10 +179,10 @@
fetchPartitionStatus = Seq((topicPartition, fetchStatus)))
}

private def expectReadFromReplicaWithError(replicaId: Int,
topicPartition: TopicPartition,
fetchPartitionData: FetchRequest.PartitionData,
error: Errors): Unit = {
private def expectReadFromReplica(replicaId: Int,
topicPartition: TopicPartition,
fetchPartitionData: FetchRequest.PartitionData,
error: Errors): Unit = {
EasyMock.expect(replicaManager.readFromLocalLog(
replicaId = replicaId,
fetchOnlyFromLeader = true,
Expand All @@ -146,12 +192,12 @@ class DelayedFetchTest extends EasyMockSupport {
readPartitionInfo = Seq((topicPartition, fetchPartitionData)),
clientMetadata = None,
quota = replicaQuota))
.andReturn(Seq((topicPartition, buildReadResultWithError(error))))
.andReturn(Seq((topicPartition, buildReadResult(error))))
}

private def buildReadResultWithError(error: Errors): LogReadResult = {
private def buildReadResult(error: Errors): LogReadResult = {
LogReadResult(
exception = Some(error.exception),
exception = if (error != Errors.NONE) Some(error.exception) else None,
info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = -1L,
61 changes: 61 additions & 0 deletions core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -188,6 +188,67 @@ class FetchSessionTest {
assertEquals(Optional.of(3), epochs3(tp2))
}

@Test
def testLastFetchedEpoch(): Unit = {
val time = new MockTime()
val cache = new FetchSessionCache(10, 1000)
val fetchManager = new FetchManager(time, cache)

val tp0 = new TopicPartition("foo", 0)
val tp1 = new TopicPartition("foo", 1)
val tp2 = new TopicPartition("bar", 1)

def cachedLeaderEpochs(context: FetchContext): Map[TopicPartition, Optional[Integer]] = {
val mapBuilder = Map.newBuilder[TopicPartition, Optional[Integer]]
context.foreachPartition((tp, data) => mapBuilder += tp -> data.currentLeaderEpoch)
mapBuilder.result()
}

def cachedLastFetchedEpochs(context: FetchContext): Map[TopicPartition, Optional[Integer]] = {
val mapBuilder = Map.newBuilder[TopicPartition, Optional[Integer]]
context.foreachPartition((tp, data) => mapBuilder += tp -> data.lastFetchedEpoch)
mapBuilder.result()
}

val request1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
request1.put(tp0, new FetchRequest.PartitionData(0, 0, 100, Optional.empty[Integer], Optional.empty[Integer]))
request1.put(tp1, new FetchRequest.PartitionData(10, 0, 100, Optional.of(1), Optional.empty[Integer]))
request1.put(tp2, new FetchRequest.PartitionData(10, 0, 100, Optional.of(2), Optional.of(1)))

val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, request1, EMPTY_PART_LIST, false)
assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.of(1), tp2 -> Optional.of(2)),
cachedLeaderEpochs(context1))
assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.empty, tp2 -> Optional.of(1)),
cachedLastFetchedEpochs(context1))

val response = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
response.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null))
response.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 10, 10, 10, null, null))
response.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 5, 5, 5, null, null))

val sessionId = context1.updateAndGenerateResponseData(response).sessionId()

// With no changes, the cached epochs should remain the same
val request2 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
val context2 = fetchManager.newContext(new JFetchMetadata(sessionId, 1), request2, EMPTY_PART_LIST, false)
assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.of(1), tp2 -> Optional.of(2)), cachedLeaderEpochs(context2))
assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.empty, tp2 -> Optional.of(1)),
cachedLastFetchedEpochs(context2))
context2.updateAndGenerateResponseData(response).sessionId()

// Now verify we can change the leader epoch and the context is updated
val request3 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
request3.put(tp0, new FetchRequest.PartitionData(0, 0, 100, Optional.of(6), Optional.of(5)))
request3.put(tp1, new FetchRequest.PartitionData(10, 0, 100, Optional.empty[Integer], Optional.empty[Integer]))
request3.put(tp2, new FetchRequest.PartitionData(10, 0, 100, Optional.of(3), Optional.of(3)))

val context3 = fetchManager.newContext(new JFetchMetadata(sessionId, 2), request3, EMPTY_PART_LIST, false)
assertEquals(Map(tp0 -> Optional.of(6), tp1 -> Optional.empty, tp2 -> Optional.of(3)),
cachedLeaderEpochs(context3))
assertEquals(Map(tp0 -> Optional.of(5), tp1 -> Optional.empty, tp2 -> Optional.of(3)),
      cachedLastFetchedEpochs(context3))
}

@Test
def testFetchRequests(): Unit = {
val time = new MockTime()