diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java index 727c7087c439a..f9f4f23c00aa1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java @@ -16,11 +16,8 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData; -import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition; -import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic; import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection; import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; @@ -28,11 +25,8 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.record.RecordBatch; import java.nio.ByteBuffer; -import java.util.Map; -import java.util.Optional; import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH; import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; @@ -71,23 +65,10 @@ public static Builder forConsumer(OffsetForLeaderTopicCollection epochsByPartiti return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), data); } - public static Builder forFollower(short version, Map epochsByPartition, int replicaId) { + public static Builder forFollower(short version, OffsetForLeaderTopicCollection epochsByPartition, int replicaId) { OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData(); data.setReplicaId(replicaId); - - epochsByPartition.forEach((partitionKey, partitionValue) -> { - OffsetForLeaderTopic topic = data.topics().find(partitionKey.topic()); - if (topic == null) { - topic = new OffsetForLeaderTopic().setTopic(partitionKey.topic()); - data.topics().add(topic); - } - topic.partitions().add(new OffsetForLeaderPartition() - .setPartition(partitionKey.partition()) - .setLeaderEpoch(partitionValue.leaderEpoch) - .setCurrentLeaderEpoch(partitionValue.currentLeaderEpoch - .orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH)) - ); - }); + data.setTopics(epochsByPartition); return new Builder(version, version, data); } @@ -143,25 +124,6 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { return new OffsetsForLeaderEpochResponse(responseData); } - public static class PartitionData { - public final Optional currentLeaderEpoch; - public final int leaderEpoch; - - public PartitionData(Optional currentLeaderEpoch, int leaderEpoch) { - this.currentLeaderEpoch = currentLeaderEpoch; - this.leaderEpoch = leaderEpoch; - } - - @Override - public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(currentLeaderEpoch=").append(currentLeaderEpoch). - append(", leaderEpoch=").append(leaderEpoch). - append(")"); - return bld.toString(); - } - } - /** * Check whether a broker allows Topic-level permissions in order to use the * OffsetForLeaderEpoch API. Old versions require Cluster permission. diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java index 929eb2fd2a0cb..2b781baf150b9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java @@ -33,7 +33,7 @@ public final class RequestUtils { private RequestUtils() {} - static Optional getLeaderEpoch(int leaderEpoch) { + public static Optional getLeaderEpoch(int leaderEpoch) { return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? Optional.empty() : Optional.of(leaderEpoch); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequestTest.java index 9754161eddde8..ffef9476b2032 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequestTest.java @@ -21,8 +21,6 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.junit.Test; -import java.util.Collections; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; @@ -47,7 +45,7 @@ public void testDefaultReplicaId() { for (short version = 0; version < ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(); version++) { int replicaId = 1; OffsetsForLeaderEpochRequest.Builder builder = OffsetsForLeaderEpochRequest.Builder.forFollower( - version, Collections.emptyMap(), replicaId); + version, new OffsetForLeaderTopicCollection(), replicaId); OffsetsForLeaderEpochRequest request = builder.build(); OffsetsForLeaderEpochRequest parsed = OffsetsForLeaderEpochRequest.parse(request.serialize(), version); if (version < 3) diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 886cd2a3c7434..f70c7d08fe48a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1777,17 +1777,6 @@ private InitProducerIdResponse createInitPidResponse() { return new InitProducerIdResponse(responseData); } - private Map createOffsetForLeaderEpochPartitionData() { - Map epochs = new HashMap<>(); - epochs.put(new TopicPartition("topic1", 0), - new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(0), 1)); - epochs.put(new TopicPartition("topic1", 1), - new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(0), 1)); - epochs.put(new TopicPartition("topic2", 2), - new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3)); - return epochs; - } - private OffsetForLeaderTopicCollection createOffsetForLeaderTopicCollection() { OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection(); topics.add(new OffsetForLeaderTopic() @@ -1817,7 +1806,7 @@ private OffsetsForLeaderEpochRequest createLeaderEpochRequestForConsumer() { } private OffsetsForLeaderEpochRequest createLeaderEpochRequestForReplica(int version, int replicaId) { - Map epochs = createOffsetForLeaderEpochPartitionData(); + OffsetForLeaderTopicCollection epochs = createOffsetForLeaderTopicCollection(); return OffsetsForLeaderEpochRequest.Builder.forFollower((short) version, epochs, replicaId).build(); } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 9af23c6473be6..5e716e29e94f0 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -32,6 +32,7 @@ import kafka.utils.CoreUtils.inLock import org.apache.kafka.common.protocol.Errors import scala.collection.{Map, Set, mutable} +import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong @@ -41,6 +42,7 @@ import kafka.server.AbstractFetcherThread.ReplicaFetch import kafka.server.AbstractFetcherThread.ResultWithPartitions import org.apache.kafka.common.{InvalidRecordException, TopicPartition} import org.apache.kafka.common.internals.PartitionStates +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records} import org.apache.kafka.common.requests._ @@ -61,7 +63,7 @@ abstract class AbstractFetcherThread(name: String, extends ShutdownableThread(name, isInterruptible) { type FetchData = FetchResponse.PartitionData[Records] - type EpochData = OffsetsForLeaderEpochRequest.PartitionData + type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition private val partitionStates = new PartitionStates[PartitionFetchState] protected val partitionMapLock = new ReentrantLock @@ -160,7 +162,10 @@ abstract class AbstractFetcherThread(name: String, if (state.isTruncating) { latestEpoch(tp) match { case Some(epoch) if isOffsetForLeaderEpochSupported => - partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch) + partitionsWithEpochs += tp -> new EpochData() + .setPartition(tp.partition) + .setCurrentLeaderEpoch(state.currentLeaderEpoch) + .setLeaderEpoch(epoch) case _ => partitionsWithoutEpochs += tp } @@ -218,7 +223,7 @@ abstract class AbstractFetcherThread(name: String, throw new IllegalStateException( s"Leader replied with partition $tp not requested in OffsetsForLeaderEpoch request") }) - val leaderEpochInRequest = partitionEpochRequest.currentLeaderEpoch.get + val leaderEpochInRequest = partitionEpochRequest.currentLeaderEpoch curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch } @@ -268,11 +273,10 @@ abstract class AbstractFetcherThread(name: String, fetchOffsets.put(tp, offsetTruncationState) case Errors.FENCED_LEADER_EPOCH => - if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap { - p => - if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get()) - else None - })) partitionsWithError += tp + val currentLeaderEpoch = latestEpochsForPartitions.get(tp) + .map(epochEndOffset => Int.box(epochEndOffset.currentLeaderEpoch)).asJava + if (onPartitionFenced(tp, currentLeaderEpoch)) + partitionsWithError += tp case error => info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error") @@ -287,10 +291,10 @@ abstract class AbstractFetcherThread(name: String, * remove the partition if the partition state is NOT updated. Otherwise, keep the partition active. * @return true if the epoch in this thread is updated. otherwise, false */ - private def onPartitionFenced(tp: TopicPartition, requestEpoch: Option[Int]): Boolean = inLock(partitionMapLock) { + private def onPartitionFenced(tp: TopicPartition, requestEpoch: Optional[Integer]): Boolean = inLock(partitionMapLock) { Option(partitionStates.stateValue(tp)).exists { currentFetchState => val currentLeaderEpoch = currentFetchState.currentLeaderEpoch - if (requestEpoch.contains(currentLeaderEpoch)) { + if (requestEpoch.isPresent && requestEpoch.get == currentLeaderEpoch) { info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " + s"the new LeaderAndIsr state before resuming fetching.") markPartitionFailed(tp) @@ -336,7 +340,6 @@ abstract class AbstractFetcherThread(name: String, // the current offset is the same as the offset requested. val fetchPartitionData = sessionPartitions.get(topicPartition) if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) { - val requestEpoch = if (fetchPartitionData.currentLeaderEpoch.isPresent) Some(fetchPartitionData.currentLeaderEpoch.get().toInt) else None partitionData.error match { case Errors.NONE => try { @@ -390,7 +393,7 @@ abstract class AbstractFetcherThread(name: String, markPartitionFailed(topicPartition) } case Errors.OFFSET_OUT_OF_RANGE => - if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch)) + if (handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch)) partitionsWithError += topicPartition case Errors.UNKNOWN_LEADER_EPOCH => @@ -399,7 +402,8 @@ abstract class AbstractFetcherThread(name: String, partitionsWithError += topicPartition case Errors.FENCED_LEADER_EPOCH => - if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition + if (onPartitionFenced(topicPartition, fetchPartitionData.currentLeaderEpoch)) + partitionsWithError += topicPartition case Errors.NOT_LEADER_OR_FOLLOWER => debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " + @@ -600,7 +604,7 @@ abstract class AbstractFetcherThread(name: String, */ private def handleOutOfRangeError(topicPartition: TopicPartition, fetchState: PartitionFetchState, - requestEpoch: Option[Int]): Boolean = { + requestEpoch: Optional[Integer]): Boolean = { try { val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch) partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 3315f30ee130c..7ad95c48e0040 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -34,6 +34,7 @@ import org.apache.kafka.common.record.Records import org.apache.kafka.common.requests.FetchResponse.PartitionData import org.apache.kafka.common.requests.{FetchRequest, FetchResponse} import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH +import org.apache.kafka.common.requests.RequestUtils import scala.jdk.CollectionConverters._ import scala.collection.{Map, Seq, Set, mutable} @@ -172,7 +173,7 @@ class ReplicaAlterLogDirsThread(name: String, } else { val partition = replicaMgr.getPartitionOrException(tp) partition.lastOffsetForLeaderEpoch( - currentLeaderEpoch = epochData.currentLeaderEpoch, + currentLeaderEpoch = RequestUtils.getLeaderEpoch(epochData.currentLeaderEpoch), leaderEpoch = epochData.leaderEpoch, fetchOnlyFromLeader = false) } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 1d3a38a0c2b57..c440ea7d18544 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -30,6 +30,8 @@ import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors @@ -335,7 +337,18 @@ class ReplicaFetcherThread(name: String, return Map.empty } - val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower(offsetForLeaderEpochRequestVersion, partitions.asJava, brokerConfig.brokerId) + val topics = new OffsetForLeaderTopicCollection(partitions.size) + partitions.forKeyValue { (topicPartition, epochData) => + var topic = topics.find(topicPartition.topic) + if (topic == null) { + topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic) + topics.add(topic) + } + topic.partitions.add(epochData) + } + + val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower( + offsetForLeaderEpochRequestVersion, topics, brokerConfig.brokerId) debug(s"Sending offset for leader epoch request $epochRequest") try { @@ -343,7 +356,7 @@ class ReplicaFetcherThread(name: String, val responseBody = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse] debug(s"Received leaderEpoch response $response") responseBody.data.topics.asScala.flatMap { offsetForLeaderTopicResult => - offsetForLeaderTopicResult.partitions().asScala.map { offsetForLeaderPartitionResult => + offsetForLeaderTopicResult.partitions.asScala.map { offsetForLeaderPartitionResult => val tp = new TopicPartition(offsetForLeaderTopicResult.topic, offsetForLeaderPartitionResult.partition) tp -> offsetForLeaderPartitionResult } diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index 29ddebc362810..d58abdb2ed539 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -27,6 +27,7 @@ import kafka.message.NoCompressionCodec import kafka.metrics.KafkaYammerMetrics import kafka.server.AbstractFetcherThread.ReplicaFetch import kafka.server.AbstractFetcherThread.ResultWithPartitions +import kafka.utils.Implicits.MapExtensionMethods import kafka.utils.TestUtils import org.apache.kafka.common.KafkaException import org.apache.kafka.common.TopicPartition @@ -640,7 +641,7 @@ class AbstractFetcherThreadTest { } private def testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader: Int): Unit = { - val partition = new TopicPartition("topic", 0) + val partition = new TopicPartition("topic", 1) val initialLeaderEpochOnFollower = 0 val nextLeaderEpochOnFollower = initialLeaderEpochOnFollower + 1 @@ -1006,7 +1007,9 @@ class AbstractFetcherThreadTest { override def logEndOffset(topicPartition: TopicPartition): Long = replicaPartitionState(topicPartition).logEndOffset override def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = { - val epochData = new EpochData(Optional.empty[Integer](), epoch) + val epochData = new EpochData() + .setPartition(topicPartition.partition) + .setLeaderEpoch(epoch) val result = lookupEndOffsetForEpoch(topicPartition, epochData, replicaPartitionState(topicPartition)) if (result.endOffset == UNDEFINED_EPOCH_OFFSET) None @@ -1017,7 +1020,15 @@ class AbstractFetcherThreadTest { private def checkExpectedLeaderEpoch(expectedEpochOpt: Optional[Integer], partitionState: PartitionState): Option[Errors] = { if (expectedEpochOpt.isPresent) { - val expectedEpoch = expectedEpochOpt.get + checkExpectedLeaderEpoch(expectedEpochOpt.get, partitionState) + } else { + None + } + } + + private def checkExpectedLeaderEpoch(expectedEpoch: Int, + partitionState: PartitionState): Option[Errors] = { + if (expectedEpoch != RecordBatch.NO_PARTITION_LEADER_EPOCH) { if (expectedEpoch < partitionState.leaderEpoch) Some(Errors.FENCED_LEADER_EPOCH) else if (expectedEpoch > partitionState.leaderEpoch) @@ -1036,12 +1047,15 @@ class AbstractFetcherThreadTest { } } - private def divergingEpochAndOffset(partition: TopicPartition, + private def divergingEpochAndOffset(topicPartition: TopicPartition, lastFetchedEpoch: Optional[Integer], fetchOffset: Long, partitionState: PartitionState): Option[FetchResponseData.EpochEndOffset] = { lastFetchedEpoch.asScala.flatMap { fetchEpoch => - val epochEndOffset = fetchEpochEndOffsets(Map(partition -> new EpochData(Optional.empty[Integer], fetchEpoch)))(partition) + val epochEndOffset = fetchEpochEndOffsets( + Map(topicPartition -> new EpochData() + .setPartition(topicPartition.partition) + .setLeaderEpoch(fetchEpoch)))(topicPartition) if (partitionState.log.isEmpty || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET @@ -1091,7 +1105,9 @@ class AbstractFetcherThreadTest { override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]() - partitions.foreach { case (partition, epochData) => + partitions.forKeyValue { (partition, epochData) => + assert(partition.partition == epochData.partition, + "Partition must be consistent between TopicPartition and EpochData") val leaderState = leaderPartitionState(partition) val epochEndOffset = lookupEndOffsetForEpoch(partition, epochData, leaderState) endOffsets.put(partition, epochEndOffset) @@ -1137,7 +1153,7 @@ class AbstractFetcherThreadTest { } private def checkLeaderEpochAndThrow(expectedEpoch: Int, partitionState: PartitionState): Unit = { - checkExpectedLeaderEpoch(Optional.of[Integer](expectedEpoch), partitionState).foreach { error => + checkExpectedLeaderEpoch(expectedEpoch, partitionState).foreach { error => throw error.exception() } } diff --git a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala index c04cb5c172995..6d0059c046fab 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala @@ -20,7 +20,11 @@ import java.util.Optional import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse} import org.junit.Assert._ import org.junit.Test @@ -33,8 +37,8 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest { def testOffsetsForLeaderEpochErrorCodes(): Unit = { val topic = "topic" val partition = new TopicPartition(topic, 0) + val epochs = offsetForLeaderTopicCollectionFor(partition, 0, RecordBatch.NO_PARTITION_LEADER_EPOCH) - val epochs = Map(partition -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty[Integer], 0)).asJava val request = OffsetsForLeaderEpochRequest.Builder.forFollower( ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs, 1).build() @@ -60,8 +64,8 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest { val firstLeaderId = partitionToLeader(topicPartition.partition) def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = { - val epochs = Map(topicPartition -> new OffsetsForLeaderEpochRequest.PartitionData( - currentLeaderEpoch, 0)).asJava + val epochs = offsetForLeaderTopicCollectionFor(topicPartition, 0, + currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH)) val request = OffsetsForLeaderEpochRequest.Builder.forFollower( ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs, 1).build() assertResponseError(error, brokerId, request) @@ -87,6 +91,22 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest { assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1)) } + private def offsetForLeaderTopicCollectionFor( + topicPartition: TopicPartition, + leaderEpoch: Int, + currentLeaderEpoch: Int + ): OffsetForLeaderTopicCollection = { + new OffsetForLeaderTopicCollection(List( + new OffsetForLeaderTopic() + .setTopic(topicPartition.topic) + .setPartitions(List( + new OffsetForLeaderPartition() + .setPartition(topicPartition.partition) + .setLeaderEpoch(leaderEpoch) + .setCurrentLeaderEpoch(currentLeaderEpoch) + ).asJava)).iterator.asJava) + } + private def assertResponseError(error: Errors, brokerId: Int, request: OffsetsForLeaderEpochRequest): Unit = { val response = sendRequest(brokerId, request) assertEquals(request.data.topics.size, response.data.topics.size) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index fd783273c9bac..6a5dd765d5535 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -25,10 +25,11 @@ import kafka.server.AbstractFetcherThread.ResultWithPartitions import kafka.server.QuotaFactory.UnboundedQuota import kafka.utils.{DelayedItem, TestUtils} import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.MemoryRecords -import org.apache.kafka.common.requests.{FetchRequest, OffsetsForLeaderEpochRequest} +import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.{IsolationLevel, TopicPartition} import org.easymock.EasyMock._ import org.easymock.{Capture, CaptureType, EasyMock, IExpectationSetters} @@ -321,8 +322,12 @@ class ReplicaAlterLogDirsThreadTest { brokerTopicStats = null) val result = thread.fetchEpochEndOffsets(Map( - t1p0 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpochT1p0), - t1p1 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpochT1p1))) + t1p0 -> new OffsetForLeaderPartition() + .setPartition(t1p0.partition) + .setLeaderEpoch(leaderEpochT1p0), + t1p1 -> new OffsetForLeaderPartition() + .setPartition(t1p1.partition) + .setLeaderEpoch(leaderEpochT1p1))) val expected = Map( t1p0 -> new EpochEndOffset() @@ -382,8 +387,12 @@ class ReplicaAlterLogDirsThreadTest { brokerTopicStats = null) val result = thread.fetchEpochEndOffsets(Map( - t1p0 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpoch), - t1p1 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpoch))) + t1p0 -> new OffsetForLeaderPartition() + .setPartition(t1p0.partition) + .setLeaderEpoch(leaderEpoch), + t1p1 -> new OffsetForLeaderPartition() + .setPartition(t1p1.partition) + .setLeaderEpoch(leaderEpoch))) val expected = Map( t1p0 -> new EpochEndOffset() diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 51370178c4cc6..0d346b9b68f1c 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -27,13 +27,14 @@ import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.message.FetchResponseData +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord} import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} -import org.apache.kafka.common.requests.{FetchResponse, OffsetsForLeaderEpochRequest} +import org.apache.kafka.common.requests.FetchResponse import org.apache.kafka.common.utils.SystemTime import org.easymock.EasyMock._ import org.easymock.{Capture, CaptureType} @@ -210,8 +211,12 @@ class ReplicaFetcherThreadTest { leaderEndpointBlockingSend = Some(mockBlockingSend)) val result = thread.fetchEpochEndOffsets(Map( - t1p0 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 0), - t1p1 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 0))) + t1p0 -> new OffsetForLeaderPartition() + .setPartition(t1p0.partition) + .setLeaderEpoch(0), + t1p1 -> new OffsetForLeaderPartition() + .setPartition(t1p1.partition) + .setLeaderEpoch(0))) val expected = Map( t1p0 -> newOffsetForLeaderPartitionResult(t1p0, Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala index 8a49aac987b6a..9585587b0f8d9 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala @@ -16,10 +16,9 @@ */ package kafka.server.epoch -import java.util.Optional - import kafka.server.KafkaConfig._ import kafka.server.{BlockingSend, KafkaServer, ReplicaFetcherBlockingSend} +import kafka.utils.Implicits._ import kafka.utils.TestUtils._ import kafka.utils.{Logging, TestUtils} import kafka.zk.ZooKeeperTestHarness @@ -29,6 +28,9 @@ import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.utils.{LogContext, SystemTime} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse} @@ -274,12 +276,20 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging { private[epoch] class TestFetcherThread(sender: BlockingSend) extends Logging { def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { - val partitionData = partitions.map { case (k, v) => - k -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), v) + val topics = new OffsetForLeaderTopicCollection(partitions.size) + partitions.forKeyValue { (topicPartition, leaderEpoch) => + var topic = topics.find(topicPartition.topic) + if (topic == null) { + topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic) + topics.add(topic) + } + topic.partitions.add(new OffsetForLeaderPartition() + .setPartition(topicPartition.partition) + .setLeaderEpoch(leaderEpoch)) } val request = OffsetsForLeaderEpochRequest.Builder.forFollower( - ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, partitionData.asJava, 1) + ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, topics, 1) val response = sender.sendRequest(request) response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].data.topics.asScala.flatMap { topic => topic.partitions.asScala.map { partition => diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index ca3625a684539..a0a33873b3cc8 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -46,6 +46,7 @@ import kafka.utils.Pool; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.LeaderAndIsrRequestData; +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition; import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; @@ -54,7 +55,6 @@ import org.apache.kafka.common.record.RecordsSend; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.mockito.Mockito; @@ -305,7 +305,7 @@ public long fetchEarliestOffsetFromLeader(TopicPartition topicPartition, int cur } @Override - public Map fetchEpochEndOffsets(Map partitions) { + public Map fetchEpochEndOffsets(Map partitions) { scala.collection.mutable.Map endOffsets = new scala.collection.mutable.HashMap<>(); Iterator iterator = partitions.keys().iterator(); while (iterator.hasNext()) {