diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index f34b8e781f081..fb07077d1725b 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors._ +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.replica.ClientMetadata import org.apache.kafka.common.requests.FetchRequest.PartitionData @@ -77,6 +78,7 @@ class DelayedFetch(delayMs: Long, * Case E: This broker is the leader, but the requested epoch is now fenced * Case F: The fetch offset locates not on the last segment of the log * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes + * Case H: A diverging epoch was found, return response to trigger truncation * Upon completion, should return whatever data is available for each valid partition */ override def tryComplete(): Boolean = { @@ -118,6 +120,19 @@ class DelayedFetch(delayMs: Long, accumulatedSize += bytesAvailable } } + + // Case H: If truncation has caused diverging epoch while this request was in purgatory, return to trigger truncation + fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch => + val epochEndOffset = partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false) + if (epochEndOffset.error != Errors.NONE || epochEndOffset.hasUndefinedEpochOrOffset) { + debug(s"Could not obtain last offset for leader epoch for partition $topicPartition, epochEndOffset=$epochEndOffset.") + return forceComplete() + } else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) { + debug(s"Satisfying fetch $fetchMetadata since it has diverging epoch requiring truncation for partition " + + s"$topicPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.") + return forceComplete() + } + } } } catch { case _: NotLeaderOrFollowerException => // Case A or Case B diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala index ce579a6e56c6c..140fba0f0a57d 100644 --- a/core/src/main/scala/kafka/server/FetchSession.scala +++ b/core/src/main/scala/kafka/server/FetchSession.scala @@ -77,7 +77,8 @@ class CachedPartition(val topic: String, var highWatermark: Long, var leaderEpoch: Optional[Integer], var fetcherLogStartOffset: Long, - var localLogStartOffset: Long) + var localLogStartOffset: Long, + var lastFetchedEpoch: Optional[Integer]) extends ImplicitLinkedHashCollection.Element { var cachedNext: Int = ImplicitLinkedHashCollection.INVALID_INDEX @@ -89,21 +90,21 @@ class CachedPartition(val topic: String, override def setPrev(prev: Int): Unit = this.cachedPrev = prev def this(topic: String, partition: Int) = - this(topic, partition, -1, -1, -1, Optional.empty(), -1, -1) + this(topic, partition, -1, -1, -1, Optional.empty(), -1, -1, Optional.empty[Integer]) def this(part: TopicPartition) = this(part.topic, part.partition) def this(part: TopicPartition, reqData: FetchRequest.PartitionData) = this(part.topic, part.partition, reqData.maxBytes, reqData.fetchOffset, -1, - reqData.currentLeaderEpoch, reqData.logStartOffset, -1) + reqData.currentLeaderEpoch, reqData.logStartOffset, -1, reqData.lastFetchedEpoch) def this(part: TopicPartition, reqData: FetchRequest.PartitionData, respData: FetchResponse.PartitionData[Records]) = this(part.topic, part.partition, reqData.maxBytes, reqData.fetchOffset, respData.highWatermark, - reqData.currentLeaderEpoch, reqData.logStartOffset, respData.logStartOffset) + reqData.currentLeaderEpoch, reqData.logStartOffset, respData.logStartOffset, reqData.lastFetchedEpoch) - def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch) + def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch) def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = { // Update our cached request parameters. @@ -111,6 +112,7 @@ class CachedPartition(val topic: String, fetchOffset = reqData.fetchOffset fetcherLogStartOffset = reqData.logStartOffset leaderEpoch = reqData.currentLeaderEpoch + lastFetchedEpoch = reqData.lastFetchedEpoch } /** diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 9c7307b2f0feb..b9487fe0a5ee4 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1048,6 +1048,7 @@ class ReplicaManager(val config: KafkaConfig, // check if this fetch request can be satisfied right away var bytesReadable: Long = 0 var errorReadingData = false + var hasDivergingEpoch = false val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult] logReadResults.foreach { case (topicPartition, logReadResult) => brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark() @@ -1055,6 +1056,8 @@ class ReplicaManager(val config: KafkaConfig, if (logReadResult.error != Errors.NONE) errorReadingData = true + if (logReadResult.divergingEpoch.nonEmpty) + hasDivergingEpoch = true bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes logReadResultMap.put(topicPartition, logReadResult) } @@ -1063,7 +1066,8 @@ class ReplicaManager(val config: KafkaConfig, // 2) fetch request does not require any data // 3) has enough data to respond // 4) some error happens while reading data - if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) { + // 5) we found a diverging epoch + if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) { val fetchPartitionData = logReadResults.map { case (tp, result) => val isReassignmentFetch = isFromFollower && isAddingReplica(tp, replicaId) tp -> FetchPartitionData( diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index 01ebfd15cbfe2..5b9e056a18c63 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -20,11 +20,12 @@ import java.util.Optional import scala.collection.Seq import kafka.cluster.Partition +import kafka.log.LogOffsetSnapshot import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{FencedLeaderEpochException, NotLeaderOrFollowerException} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.MemoryRecords -import org.apache.kafka.common.requests.FetchRequest +import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest} import org.easymock.{EasyMock, EasyMockSupport} import org.junit.Test import org.junit.Assert._ @@ -70,7 +71,7 @@ class DelayedFetchTest extends EasyMockSupport { .andThrow(new FencedLeaderEpochException("Requested epoch has been fenced")) EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false) - expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.FENCED_LEADER_EPOCH) + expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.FENCED_LEADER_EPOCH) replayAll() @@ -110,7 +111,7 @@ class DelayedFetchTest extends EasyMockSupport { EasyMock.expect(replicaManager.getPartitionOrException(topicPartition)) .andThrow(new NotLeaderOrFollowerException(s"Replica for $topicPartition not available")) - expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NOT_LEADER_OR_FOLLOWER) + expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NOT_LEADER_OR_FOLLOWER) EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false) replayAll() @@ -120,6 +121,51 @@ class DelayedFetchTest extends EasyMockSupport { assertTrue(fetchResultOpt.isDefined) } + @Test + def testDivergingEpoch(): Unit = { + val topicPartition = new TopicPartition("topic", 0) + val fetchOffset = 500L + val logStartOffset = 0L + val currentLeaderEpoch = Optional.of[Integer](10) + val lastFetchedEpoch = Optional.of[Integer](9) + val replicaId = 1 + + val fetchStatus = FetchPartitionStatus( + startOffsetMetadata = LogOffsetMetadata(fetchOffset), + fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch)) + val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus) + + var fetchResultOpt: Option[FetchPartitionData] = None + def callback(responses: Seq[(TopicPartition, FetchPartitionData)]): Unit = { + fetchResultOpt = Some(responses.head._2) + } + + val delayedFetch = new DelayedFetch( + delayMs = 500, + fetchMetadata = fetchMetadata, + replicaManager = replicaManager, + quota = replicaQuota, + clientMetadata = None, + responseCallback = callback) + + val partition: Partition = mock(classOf[Partition]) + EasyMock.expect(replicaManager.getPartitionOrException(topicPartition)).andReturn(partition) + val endOffsetMetadata = LogOffsetMetadata(messageOffset = 500L, segmentBaseOffset = 0L, relativePositionInSegment = 500) + EasyMock.expect(partition.fetchOffsetSnapshot( + currentLeaderEpoch, + fetchOnlyFromLeader = true)) + .andReturn(LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) + EasyMock.expect(partition.lastOffsetForLeaderEpoch(currentLeaderEpoch, lastFetchedEpoch.get, fetchOnlyFromLeader = false)) + .andReturn(new EpochEndOffset(Errors.NONE, lastFetchedEpoch.get, fetchOffset - 1)) + EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false) + expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NONE) + replayAll() + + assertTrue(delayedFetch.tryComplete()) + assertTrue(delayedFetch.isCompleted) + assertTrue(fetchResultOpt.isDefined) + } + private def buildFetchMetadata(replicaId: Int, topicPartition: TopicPartition, fetchStatus: FetchPartitionStatus): FetchMetadata = { @@ -133,10 +179,10 @@ class DelayedFetchTest extends EasyMockSupport { fetchPartitionStatus = Seq((topicPartition, fetchStatus))) } - private def expectReadFromReplicaWithError(replicaId: Int, - topicPartition: TopicPartition, - fetchPartitionData: FetchRequest.PartitionData, - error: Errors): Unit = { + private def expectReadFromReplica(replicaId: Int, + topicPartition: TopicPartition, + fetchPartitionData: FetchRequest.PartitionData, + error: Errors): Unit = { EasyMock.expect(replicaManager.readFromLocalLog( replicaId = replicaId, fetchOnlyFromLeader = true, @@ -146,12 +192,12 @@ class DelayedFetchTest extends EasyMockSupport { readPartitionInfo = Seq((topicPartition, fetchPartitionData)), clientMetadata = None, quota = replicaQuota)) - .andReturn(Seq((topicPartition, buildReadResultWithError(error)))) + .andReturn(Seq((topicPartition, buildReadResult(error)))) } - private def buildReadResultWithError(error: Errors): LogReadResult = { + private def buildReadResult(error: Errors): LogReadResult = { LogReadResult( - exception = Some(error.exception), + exception = if (error != Errors.NONE) Some(error.exception) else None, info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), divergingEpoch = None, highWatermark = -1L, diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala index c876b9056d2fc..5ef3105e3a0ea 100755 --- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala @@ -188,6 +188,67 @@ class FetchSessionTest { assertEquals(Optional.of(3), epochs3(tp2)) } + @Test + def testLastFetchedEpoch(): Unit = { + val time = new MockTime() + val cache = new FetchSessionCache(10, 1000) + val fetchManager = new FetchManager(time, cache) + + val tp0 = new TopicPartition("foo", 0) + val tp1 = new TopicPartition("foo", 1) + val tp2 = new TopicPartition("bar", 1) + + def cachedLeaderEpochs(context: FetchContext): Map[TopicPartition, Optional[Integer]] = { + val mapBuilder = Map.newBuilder[TopicPartition, Optional[Integer]] + context.foreachPartition((tp, data) => mapBuilder += tp -> data.currentLeaderEpoch) + mapBuilder.result() + } + + def cachedLastFetchedEpochs(context: FetchContext): Map[TopicPartition, Optional[Integer]] = { + val mapBuilder = Map.newBuilder[TopicPartition, Optional[Integer]] + context.foreachPartition((tp, data) => mapBuilder += tp -> data.lastFetchedEpoch) + mapBuilder.result() + } + + val request1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + request1.put(tp0, new FetchRequest.PartitionData(0, 0, 100, Optional.empty[Integer], Optional.empty[Integer])) + request1.put(tp1, new FetchRequest.PartitionData(10, 0, 100, Optional.of(1), Optional.empty[Integer])) + request1.put(tp2, new FetchRequest.PartitionData(10, 0, 100, Optional.of(2), Optional.of(1))) + + val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, request1, EMPTY_PART_LIST, false) + assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.of(1), tp2 -> Optional.of(2)), + cachedLeaderEpochs(context1)) + assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.empty, tp2 -> Optional.of(1)), + cachedLastFetchedEpochs(context1)) + + val response = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]] + response.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null)) + response.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 10, 10, 10, null, null)) + response.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 5, 5, 5, null, null)) + + val sessionId = context1.updateAndGenerateResponseData(response).sessionId() + + // With no changes, the cached epochs should remain the same + val request2 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + val context2 = fetchManager.newContext(new JFetchMetadata(sessionId, 1), request2, EMPTY_PART_LIST, false) + assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.of(1), tp2 -> Optional.of(2)), cachedLeaderEpochs(context2)) + assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.empty, tp2 -> Optional.of(1)), + cachedLastFetchedEpochs(context2)) + context2.updateAndGenerateResponseData(response).sessionId() + + // Now verify we can change the leader epoch and the context is updated + val request3 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + request3.put(tp0, new FetchRequest.PartitionData(0, 0, 100, Optional.of(6), Optional.of(5))) + request3.put(tp1, new FetchRequest.PartitionData(10, 0, 100, Optional.empty[Integer], Optional.empty[Integer])) + request3.put(tp2, new FetchRequest.PartitionData(10, 0, 100, Optional.of(3), Optional.of(3))) + + val context3 = fetchManager.newContext(new JFetchMetadata(sessionId, 2), request3, EMPTY_PART_LIST, false) + assertEquals(Map(tp0 -> Optional.of(6), tp1 -> Optional.empty, tp2 -> Optional.of(3)), + cachedLeaderEpochs(context3)) + assertEquals(Map(tp0 -> Optional.of(5), tp1 -> Optional.empty, tp2 -> Optional.of(3)), + cachedLastFetchedEpochs(context2)) + } + @Test def testFetchRequests(): Unit = { val time = new MockTime()