From f4b48bc38d71af7fdf3bca38b19be63356208dd2 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 14 Oct 2020 17:05:10 +0100 Subject: [PATCH 1/4] MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch --- .../scala/kafka/server/DelayedFetch.scala | 11 +++- .../scala/kafka/server/FetchSession.scala | 10 +-- .../scala/kafka/server/ReplicaManager.scala | 2 +- .../kafka/server/DelayedFetchTest.scala | 63 +++++++++++++++---- .../unit/kafka/server/FetchSessionTest.scala | 61 ++++++++++++++++++ .../server/ReplicaManagerQuotasTest.scala | 2 +- 6 files changed, 131 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index f34b8e781f081..ca54318d96281 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -27,11 +27,12 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData import scala.collection._ -case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData) { +case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData, hasDivergingEpoch: Boolean) { override def toString: String = { "[startOffsetMetadata: " + startOffsetMetadata + ", fetchInfo: " + fetchInfo + + ", hasDivergingEpoch: " + hasDivergingEpoch + "]" } } @@ -77,6 +78,7 @@ class DelayedFetch(delayMs: Long, * Case E: This broker is the leader, but the requested epoch is now fenced * Case F: The fetch offset locates not on the last segment of the log * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes + * Case H: A diverging epoch was found, return response to trigger truncation * Upon completion, should return whatever data is available for each valid partition */ override def tryComplete(): Boolean = { @@ -88,6 +90,13 @@ class DelayedFetch(delayMs: Long, try { if (fetchOffset != 
LogOffsetMetadata.UnknownOffsetMetadata) { val partition = replicaManager.getPartitionOrException(topicPartition) + + // Case H: Return diverging epoch in response to trigger truncation + if (fetchStatus.hasDivergingEpoch) { + debug(s"Satisfying fetch $fetchMetadata since it has diverging epoch requiring truncation for partition $topicPartition.") + return forceComplete() + } + val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader) val endOffset = fetchMetadata.fetchIsolation match { diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala index ce579a6e56c6c..de5fae0d57de2 100644 --- a/core/src/main/scala/kafka/server/FetchSession.scala +++ b/core/src/main/scala/kafka/server/FetchSession.scala @@ -77,7 +77,8 @@ class CachedPartition(val topic: String, var highWatermark: Long, var leaderEpoch: Optional[Integer], var fetcherLogStartOffset: Long, - var localLogStartOffset: Long) + var localLogStartOffset: Long, + var lastFetchedEpoch: Optional[Integer] = Optional.empty[Integer]) extends ImplicitLinkedHashCollection.Element { var cachedNext: Int = ImplicitLinkedHashCollection.INVALID_INDEX @@ -96,14 +97,14 @@ class CachedPartition(val topic: String, def this(part: TopicPartition, reqData: FetchRequest.PartitionData) = this(part.topic, part.partition, reqData.maxBytes, reqData.fetchOffset, -1, - reqData.currentLeaderEpoch, reqData.logStartOffset, -1) + reqData.currentLeaderEpoch, reqData.logStartOffset, -1, reqData.lastFetchedEpoch) def this(part: TopicPartition, reqData: FetchRequest.PartitionData, respData: FetchResponse.PartitionData[Records]) = this(part.topic, part.partition, reqData.maxBytes, reqData.fetchOffset, respData.highWatermark, - reqData.currentLeaderEpoch, reqData.logStartOffset, respData.logStartOffset) + reqData.currentLeaderEpoch, reqData.logStartOffset, respData.logStartOffset, reqData.lastFetchedEpoch) - def reqData = new 
FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch) + def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch) def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = { // Update our cached request parameters. @@ -111,6 +112,7 @@ class CachedPartition(val topic: String, fetchOffset = reqData.fetchOffset fetcherLogStartOffset = reqData.logStartOffset leaderEpoch = reqData.currentLeaderEpoch + lastFetchedEpoch = reqData.lastFetchedEpoch } /** diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 9c7307b2f0feb..5527704530915 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1084,7 +1084,7 @@ class ReplicaManager(val config: KafkaConfig, fetchInfos.foreach { case (topicPartition, partitionData) => logReadResultMap.get(topicPartition).foreach(logReadResult => { val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata - fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) + fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData, logReadResult.divergingEpoch.nonEmpty)) }) } val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index 01ebfd15cbfe2..18f0a5229af31 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -44,7 +44,9 @@ class DelayedFetchTest extends EasyMockSupport { val fetchStatus = FetchPartitionStatus( startOffsetMetadata = LogOffsetMetadata(fetchOffset), - fetchInfo = new 
FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch), + hasDivergingEpoch = false + ) val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus) var fetchResultOpt: Option[FetchPartitionData] = None @@ -70,7 +72,7 @@ class DelayedFetchTest extends EasyMockSupport { .andThrow(new FencedLeaderEpochException("Requested epoch has been fenced")) EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false) - expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.FENCED_LEADER_EPOCH) + expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.FENCED_LEADER_EPOCH) replayAll() @@ -92,7 +94,8 @@ class DelayedFetchTest extends EasyMockSupport { val fetchStatus = FetchPartitionStatus( startOffsetMetadata = LogOffsetMetadata(fetchOffset), - fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch), + hasDivergingEpoch = false) val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus) var fetchResultOpt: Option[FetchPartitionData] = None @@ -110,7 +113,7 @@ class DelayedFetchTest extends EasyMockSupport { EasyMock.expect(replicaManager.getPartitionOrException(topicPartition)) .andThrow(new NotLeaderOrFollowerException(s"Replica for $topicPartition not available")) - expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NOT_LEADER_OR_FOLLOWER) + expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NOT_LEADER_OR_FOLLOWER) EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false) replayAll() @@ -120,6 +123,44 @@ class DelayedFetchTest extends 
EasyMockSupport { assertTrue(fetchResultOpt.isDefined) } + @Test + def testDivergingEpoch(): Unit = { + val topicPartition = new TopicPartition("topic", 0) + val fetchOffset = 500L + val logStartOffset = 0L + val currentLeaderEpoch = Optional.of[Integer](10) + val replicaId = 1 + + val fetchStatus = FetchPartitionStatus( + startOffsetMetadata = LogOffsetMetadata(fetchOffset), + fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch), + hasDivergingEpoch = true) + val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus) + + var fetchResultOpt: Option[FetchPartitionData] = None + def callback(responses: Seq[(TopicPartition, FetchPartitionData)]): Unit = { + fetchResultOpt = Some(responses.head._2) + } + + val delayedFetch = new DelayedFetch( + delayMs = 500, + fetchMetadata = fetchMetadata, + replicaManager = replicaManager, + quota = replicaQuota, + clientMetadata = None, + responseCallback = callback) + + val partition: Partition = mock(classOf[Partition]) + EasyMock.expect(replicaManager.getPartitionOrException(topicPartition)).andReturn(partition) + EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false) + expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NONE) + replayAll() + + assertTrue(delayedFetch.tryComplete()) + assertTrue(delayedFetch.isCompleted) + assertTrue(fetchResultOpt.isDefined) + } + private def buildFetchMetadata(replicaId: Int, topicPartition: TopicPartition, fetchStatus: FetchPartitionStatus): FetchMetadata = { @@ -133,10 +174,10 @@ class DelayedFetchTest extends EasyMockSupport { fetchPartitionStatus = Seq((topicPartition, fetchStatus))) } - private def expectReadFromReplicaWithError(replicaId: Int, - topicPartition: TopicPartition, - fetchPartitionData: FetchRequest.PartitionData, - error: Errors): Unit = { + private def expectReadFromReplica(replicaId: Int, + topicPartition: TopicPartition, + 
fetchPartitionData: FetchRequest.PartitionData, + error: Errors): Unit = { EasyMock.expect(replicaManager.readFromLocalLog( replicaId = replicaId, fetchOnlyFromLeader = true, @@ -146,12 +187,12 @@ class DelayedFetchTest extends EasyMockSupport { readPartitionInfo = Seq((topicPartition, fetchPartitionData)), clientMetadata = None, quota = replicaQuota)) - .andReturn(Seq((topicPartition, buildReadResultWithError(error)))) + .andReturn(Seq((topicPartition, buildReadResult(error)))) } - private def buildReadResultWithError(error: Errors): LogReadResult = { + private def buildReadResult(error: Errors): LogReadResult = { LogReadResult( - exception = Some(error.exception), + exception = if (error != Errors.NONE) Some(error.exception) else None, info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), divergingEpoch = None, highWatermark = -1L, diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala index c876b9056d2fc..01c5aaddd7466 100755 --- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala @@ -188,6 +188,67 @@ class FetchSessionTest { assertEquals(Optional.of(3), epochs3(tp2)) } + @Test + def testLastFetchedEpoch(): Unit = { + val time = new MockTime() + val cache = new FetchSessionCache(10, 1000) + val fetchManager = new FetchManager(time, cache) + + val tp0 = new TopicPartition("foo", 0) + val tp1 = new TopicPartition("foo", 1) + val tp2 = new TopicPartition("bar", 1) + + def cachedLeaderEpochs(context: FetchContext): Map[TopicPartition, Optional[Integer]] = { + val mapBuilder = Map.newBuilder[TopicPartition, Optional[Integer]] + context.foreachPartition((tp, data) => mapBuilder += tp -> data.currentLeaderEpoch) + mapBuilder.result() + } + + def cachedLastFetchedEpochs(context: FetchContext): Map[TopicPartition, Optional[Integer]] = { + val mapBuilder = 
Map.newBuilder[TopicPartition, Optional[Integer]] + context.foreachPartition((tp, data) => mapBuilder += tp -> data.lastFetchedEpoch) + mapBuilder.result() + } + + val request1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + request1.put(tp0, new FetchRequest.PartitionData(0, 0, 100, Optional.empty, Optional.empty)) + request1.put(tp1, new FetchRequest.PartitionData(10, 0, 100, Optional.of(1), Optional.empty)) + request1.put(tp2, new FetchRequest.PartitionData(10, 0, 100, Optional.of(2), Optional.of(1))) + + val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, request1, EMPTY_PART_LIST, false) + assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.of(1), tp2 -> Optional.of(2)), + cachedLeaderEpochs(context1)) + assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.empty, tp2 -> Optional.of(1)), + cachedLastFetchedEpochs(context1)) + + val response = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]] + response.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null)) + response.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 10, 10, 10, null, null)) + response.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 5, 5, 5, null, null)) + + val sessionId = context1.updateAndGenerateResponseData(response).sessionId() + + // With no changes, the cached epochs should remain the same + val request2 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + val context2 = fetchManager.newContext(new JFetchMetadata(sessionId, 1), request2, EMPTY_PART_LIST, false) + assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.of(1), tp2 -> Optional.of(2)), cachedLeaderEpochs(context2)) + assertEquals(Map(tp0 -> Optional.empty, tp1 -> Optional.empty, tp2 -> Optional.of(1)), + cachedLastFetchedEpochs(context2)) + context2.updateAndGenerateResponseData(response).sessionId() + + // Now verify we can change the leader epoch and the context is updated + val request3 = new 
util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + request3.put(tp0, new FetchRequest.PartitionData(0, 0, 100, Optional.of(6), Optional.of(5))) + request3.put(tp1, new FetchRequest.PartitionData(10, 0, 100, Optional.empty, Optional.empty)) + request3.put(tp2, new FetchRequest.PartitionData(10, 0, 100, Optional.of(3), Optional.of(3))) + + val context3 = fetchManager.newContext(new JFetchMetadata(sessionId, 2), request3, EMPTY_PART_LIST, false) + assertEquals(Map(tp0 -> Optional.of(6), tp1 -> Optional.empty, tp2 -> Optional.of(3)), + cachedLeaderEpochs(context3)) + assertEquals(Map(tp0 -> Optional.of(5), tp1 -> Optional.empty, tp2 -> Optional.of(3)), + cachedLastFetchedEpochs(context3)) + } + @Test def testFetchRequests(): Unit = { val time = new MockTime() diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 76cc470674d14..e229bb902c708 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -175,7 +175,7 @@ class ReplicaManagerQuotasTest { val tp = new TopicPartition("t1", 0) val fetchPartitionStatus = FetchPartitionStatus(LogOffsetMetadata(messageOffset = 50L, segmentBaseOffset = 0L, - relativePositionInSegment = 250), new PartitionData(50, 0, 1, Optional.empty())) + relativePositionInSegment = 250), new PartitionData(50, 0, 1, Optional.empty()), hasDivergingEpoch = false) val fetchMetadata = FetchMetadata(fetchMinBytes = 1, fetchMaxBytes = 1000, hardMaxBytesLimit = true, From 3d554d91fc2b1bbf1b38049d0b4b9d2c87e9b14a Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 14 Oct 2020 21:02:10 +0100 Subject: [PATCH 2/4] Address review comments --- .../main/scala/kafka/cluster/Partition.scala | 7 +++++++ .../main/scala/kafka/server/DelayedFetch.scala | 18 ++++++++++-------- .../scala/kafka/server/ReplicaManager.scala | 8 ++++++-- 
.../kafka/server/DelayedFetchTest.scala | 18 +++++++++++------- .../server/ReplicaManagerQuotasTest.scala | 2 +- 5 files changed, 35 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 7b5d6b715f634..fcc9d331ce0f1 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -1162,6 +1162,13 @@ class Partition(val topicPartition: TopicPartition, localLog.fetchOffsetSnapshot } + def hasDivergingEpoch(currentLeaderEpoch: Optional[Integer], + lastFetchedEpoch: Int, + fetchOffset: Long): Boolean = { + val epochEndOffset = lastOffsetForLeaderEpoch(currentLeaderEpoch, lastFetchedEpoch, fetchOnlyFromLeader = false) + epochEndOffset.leaderEpoch < lastFetchedEpoch || epochEndOffset.endOffset < fetchOffset + } + def legacyFetchOffsetsForTimestamp(timestamp: Long, maxNumOffsets: Int, isFromConsumer: Boolean, diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index ca54318d96281..5d4df4854ef99 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -27,12 +27,11 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData import scala.collection._ -case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData, hasDivergingEpoch: Boolean) { +case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData) { override def toString: String = { "[startOffsetMetadata: " + startOffsetMetadata + ", fetchInfo: " + fetchInfo + - ", hasDivergingEpoch: " + hasDivergingEpoch + "]" } } @@ -91,12 +90,6 @@ class DelayedFetch(delayMs: Long, if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) { val partition = replicaManager.getPartitionOrException(topicPartition) - // Case H: Return diverging epoch in response to 
trigger truncation - if (fetchStatus.hasDivergingEpoch) { - debug(s"Satisfying fetch $fetchMetadata since it has diverging epoch requiring truncation for partition $topicPartition.") - return forceComplete() - } - val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader) val endOffset = fetchMetadata.fetchIsolation match { @@ -105,6 +98,7 @@ class DelayedFetch(delayMs: Long, case FetchTxnCommitted => offsetSnapshot.lastStableOffset } + // Go directly to the check for Case G if the message offsets are the same. If the log segment // has just rolled, then the high watermark offset will remain the same but be on the old segment, // which would incorrectly be seen as an instance of Case F. @@ -127,6 +121,14 @@ class DelayedFetch(delayMs: Long, accumulatedSize += bytesAvailable } } + + // Case H: If truncation has caused diverging epoch while this request was in purgatory, return to trigger truncation + fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch => + if (partition.hasDivergingEpoch(fetchLeaderEpoch, fetchEpoch, fetchStatus.fetchInfo.fetchOffset)) { + debug(s"Satisfying fetch $fetchMetadata since it has diverging epoch requiring truncation for partition $topicPartition.") + return forceComplete() + } + } } } catch { case _: NotLeaderOrFollowerException => // Case A or Case B diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 5527704530915..b9487fe0a5ee4 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1048,6 +1048,7 @@ class ReplicaManager(val config: KafkaConfig, // check if this fetch request can be satisfied right away var bytesReadable: Long = 0 var errorReadingData = false + var hasDivergingEpoch = false val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult] logReadResults.foreach { case (topicPartition, logReadResult) => 
brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark() @@ -1055,6 +1056,8 @@ class ReplicaManager(val config: KafkaConfig, if (logReadResult.error != Errors.NONE) errorReadingData = true + if (logReadResult.divergingEpoch.nonEmpty) + hasDivergingEpoch = true bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes logReadResultMap.put(topicPartition, logReadResult) } @@ -1063,7 +1066,8 @@ class ReplicaManager(val config: KafkaConfig, // 2) fetch request does not require any data // 3) has enough data to respond // 4) some error happens while reading data - if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) { + // 5) we found a diverging epoch + if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) { val fetchPartitionData = logReadResults.map { case (tp, result) => val isReassignmentFetch = isFromFollower && isAddingReplica(tp, replicaId) tp -> FetchPartitionData( @@ -1084,7 +1088,7 @@ class ReplicaManager(val config: KafkaConfig, fetchInfos.foreach { case (topicPartition, partitionData) => logReadResultMap.get(topicPartition).foreach(logReadResult => { val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata - fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData, logReadResult.divergingEpoch.nonEmpty)) + fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index 18f0a5229af31..a5d3f2705ba62 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -20,6 +20,7 @@ import 
java.util.Optional import scala.collection.Seq import kafka.cluster.Partition +import kafka.log.LogOffsetSnapshot import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{FencedLeaderEpochException, NotLeaderOrFollowerException} import org.apache.kafka.common.protocol.Errors @@ -44,9 +45,7 @@ class DelayedFetchTest extends EasyMockSupport { val fetchStatus = FetchPartitionStatus( startOffsetMetadata = LogOffsetMetadata(fetchOffset), - fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch), - hasDivergingEpoch = false - ) + fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus) var fetchResultOpt: Option[FetchPartitionData] = None @@ -94,8 +93,7 @@ class DelayedFetchTest extends EasyMockSupport { val fetchStatus = FetchPartitionStatus( startOffsetMetadata = LogOffsetMetadata(fetchOffset), - fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch), - hasDivergingEpoch = false) + fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus) var fetchResultOpt: Option[FetchPartitionData] = None @@ -129,12 +127,12 @@ class DelayedFetchTest extends EasyMockSupport { val fetchOffset = 500L val logStartOffset = 0L val currentLeaderEpoch = Optional.of[Integer](10) + val lastFetchedEpoch = Optional.of[Integer](9) val replicaId = 1 val fetchStatus = FetchPartitionStatus( startOffsetMetadata = LogOffsetMetadata(fetchOffset), - fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch), - hasDivergingEpoch = true) + fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch)) val fetchMetadata = 
buildFetchMetadata(replicaId, topicPartition, fetchStatus) var fetchResultOpt: Option[FetchPartitionData] = None @@ -152,6 +150,12 @@ class DelayedFetchTest extends EasyMockSupport { val partition: Partition = mock(classOf[Partition]) EasyMock.expect(replicaManager.getPartitionOrException(topicPartition)).andReturn(partition) + val endOffsetMetadata = LogOffsetMetadata(messageOffset = 500L, segmentBaseOffset = 0L, relativePositionInSegment = 500) + EasyMock.expect(partition.fetchOffsetSnapshot( + currentLeaderEpoch, + fetchOnlyFromLeader = true)) + .andReturn(LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) + EasyMock.expect(partition.hasDivergingEpoch(currentLeaderEpoch, lastFetchedEpoch.get, fetchOffset)).andReturn(true) EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false) expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NONE) replayAll() diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index e229bb902c708..76cc470674d14 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -175,7 +175,7 @@ class ReplicaManagerQuotasTest { val tp = new TopicPartition("t1", 0) val fetchPartitionStatus = FetchPartitionStatus(LogOffsetMetadata(messageOffset = 50L, segmentBaseOffset = 0L, - relativePositionInSegment = 250), new PartitionData(50, 0, 1, Optional.empty()), hasDivergingEpoch = false) + relativePositionInSegment = 250), new PartitionData(50, 0, 1, Optional.empty())) val fetchMetadata = FetchMetadata(fetchMinBytes = 1, fetchMaxBytes = 1000, hardMaxBytesLimit = true, From 38ce7eda1aab049d4008ba1881ea26ba1dac294d Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Thu, 15 Oct 2020 10:38:10 +0100 Subject: [PATCH 3/4] Fix test to compile with scala 2.12 --- 
.../src/test/scala/unit/kafka/server/FetchSessionTest.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala index 01c5aaddd7466..5ef3105e3a0ea 100755 --- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala @@ -211,8 +211,8 @@ class FetchSessionTest { } val request1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - request1.put(tp0, new FetchRequest.PartitionData(0, 0, 100, Optional.empty, Optional.empty)) - request1.put(tp1, new FetchRequest.PartitionData(10, 0, 100, Optional.of(1), Optional.empty)) + request1.put(tp0, new FetchRequest.PartitionData(0, 0, 100, Optional.empty[Integer], Optional.empty[Integer])) + request1.put(tp1, new FetchRequest.PartitionData(10, 0, 100, Optional.of(1), Optional.empty[Integer])) request1.put(tp2, new FetchRequest.PartitionData(10, 0, 100, Optional.of(2), Optional.of(1))) val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, request1, EMPTY_PART_LIST, false) @@ -239,7 +239,7 @@ class FetchSessionTest { // Now verify we can change the leader epoch and the context is updated val request3 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] request3.put(tp0, new FetchRequest.PartitionData(0, 0, 100, Optional.of(6), Optional.of(5))) - request3.put(tp1, new FetchRequest.PartitionData(10, 0, 100, Optional.empty, Optional.empty)) + request3.put(tp1, new FetchRequest.PartitionData(10, 0, 100, Optional.empty[Integer], Optional.empty[Integer])) request3.put(tp2, new FetchRequest.PartitionData(10, 0, 100, Optional.of(3), Optional.of(3))) val context3 = fetchManager.newContext(new JFetchMetadata(sessionId, 2), request3, EMPTY_PART_LIST, false) From cda32dc16f228c2a1e90ad0a9d161b8cec6fd446 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Thu, 15 Oct 2020 18:27:43 +0100 
Subject: [PATCH 4/4] Address review comments --- core/src/main/scala/kafka/cluster/Partition.scala | 7 ------- core/src/main/scala/kafka/server/DelayedFetch.scala | 12 ++++++++---- core/src/main/scala/kafka/server/FetchSession.scala | 4 ++-- .../integration/kafka/server/DelayedFetchTest.scala | 5 +++-- 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index fcc9d331ce0f1..7b5d6b715f634 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -1162,13 +1162,6 @@ class Partition(val topicPartition: TopicPartition, localLog.fetchOffsetSnapshot } - def hasDivergingEpoch(currentLeaderEpoch: Optional[Integer], - lastFetchedEpoch: Int, - fetchOffset: Long): Boolean = { - val epochEndOffset = lastOffsetForLeaderEpoch(currentLeaderEpoch, lastFetchedEpoch, fetchOnlyFromLeader = false) - epochEndOffset.leaderEpoch < lastFetchedEpoch || epochEndOffset.endOffset < fetchOffset - } - def legacyFetchOffsetsForTimestamp(timestamp: Long, maxNumOffsets: Int, isFromConsumer: Boolean, diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 5d4df4854ef99..fb07077d1725b 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors._ +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.replica.ClientMetadata import org.apache.kafka.common.requests.FetchRequest.PartitionData @@ -89,7 +90,6 @@ class DelayedFetch(delayMs: Long, try { if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) { val partition = replicaManager.getPartitionOrException(topicPartition) - val offsetSnapshot = 
partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader) val endOffset = fetchMetadata.fetchIsolation match { @@ -98,7 +98,6 @@ class DelayedFetch(delayMs: Long, case FetchTxnCommitted => offsetSnapshot.lastStableOffset } - // Go directly to the check for Case G if the message offsets are the same. If the log segment // has just rolled, then the high watermark offset will remain the same but be on the old segment, // which would incorrectly be seen as an instance of Case F. @@ -124,8 +123,13 @@ class DelayedFetch(delayMs: Long, // Case H: If truncation has caused diverging epoch while this request was in purgatory, return to trigger truncation fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch => - if (partition.hasDivergingEpoch(fetchLeaderEpoch, fetchEpoch, fetchStatus.fetchInfo.fetchOffset)) { - debug(s"Satisfying fetch $fetchMetadata since it has diverging epoch requiring truncation for partition $topicPartition.") + val epochEndOffset = partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false) + if (epochEndOffset.error != Errors.NONE || epochEndOffset.hasUndefinedEpochOrOffset) { + debug(s"Could not obtain last offset for leader epoch for partition $topicPartition, epochEndOffset=$epochEndOffset.") + return forceComplete() + } else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) { + debug(s"Satisfying fetch $fetchMetadata since it has diverging epoch requiring truncation for partition " + + s"$topicPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.") return forceComplete() } } diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala index de5fae0d57de2..140fba0f0a57d 100644 --- a/core/src/main/scala/kafka/server/FetchSession.scala +++ b/core/src/main/scala/kafka/server/FetchSession.scala @@ -78,7 +78,7 @@ 
class CachedPartition(val topic: String, var leaderEpoch: Optional[Integer], var fetcherLogStartOffset: Long, var localLogStartOffset: Long, - var lastFetchedEpoch: Optional[Integer] = Optional.empty[Integer]) + var lastFetchedEpoch: Optional[Integer]) extends ImplicitLinkedHashCollection.Element { var cachedNext: Int = ImplicitLinkedHashCollection.INVALID_INDEX @@ -90,7 +90,7 @@ class CachedPartition(val topic: String, override def setPrev(prev: Int): Unit = this.cachedPrev = prev def this(topic: String, partition: Int) = - this(topic, partition, -1, -1, -1, Optional.empty(), -1, -1) + this(topic, partition, -1, -1, -1, Optional.empty(), -1, -1, Optional.empty[Integer]) def this(part: TopicPartition) = this(part.topic, part.partition) diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index a5d3f2705ba62..5b9e056a18c63 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -25,7 +25,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{FencedLeaderEpochException, NotLeaderOrFollowerException} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.MemoryRecords -import org.apache.kafka.common.requests.FetchRequest +import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest} import org.easymock.{EasyMock, EasyMockSupport} import org.junit.Test import org.junit.Assert._ @@ -155,7 +155,8 @@ class DelayedFetchTest extends EasyMockSupport { currentLeaderEpoch, fetchOnlyFromLeader = true)) .andReturn(LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) - EasyMock.expect(partition.hasDivergingEpoch(currentLeaderEpoch, lastFetchedEpoch.get, fetchOffset)).andReturn(true) + EasyMock.expect(partition.lastOffsetForLeaderEpoch(currentLeaderEpoch, lastFetchedEpoch.get, 
fetchOnlyFromLeader = false)) + .andReturn(new EpochEndOffset(Errors.NONE, lastFetchedEpoch.get, fetchOffset - 1)) EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false) expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NONE) replayAll()