Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,17 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;

import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
Expand Down Expand Up @@ -71,23 +65,10 @@ public static Builder forConsumer(OffsetForLeaderTopicCollection epochsByPartiti
return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), data);
}

public static Builder forFollower(short version, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
public static Builder forFollower(short version, OffsetForLeaderTopicCollection epochsByPartition, int replicaId) {
OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData();
data.setReplicaId(replicaId);

epochsByPartition.forEach((partitionKey, partitionValue) -> {
OffsetForLeaderTopic topic = data.topics().find(partitionKey.topic());
if (topic == null) {
topic = new OffsetForLeaderTopic().setTopic(partitionKey.topic());
data.topics().add(topic);
}
topic.partitions().add(new OffsetForLeaderPartition()
.setPartition(partitionKey.partition())
.setLeaderEpoch(partitionValue.leaderEpoch)
.setCurrentLeaderEpoch(partitionValue.currentLeaderEpoch
.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
);
});
data.setTopics(epochsByPartition);
return new Builder(version, version, data);
}

Expand Down Expand Up @@ -143,25 +124,6 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new OffsetsForLeaderEpochResponse(responseData);
}

/**
 * Immutable pair of leader-epoch values describing one partition in an
 * OffsetsForLeaderEpoch request.
 *
 * <p>{@code currentLeaderEpoch} is an {@link Optional} and may be empty;
 * {@code leaderEpoch} is always a plain {@code int}.
 */
public static class PartitionData {
    public final Optional<Integer> currentLeaderEpoch;
    public final int leaderEpoch;

    /**
     * @param currentLeaderEpoch the current leader epoch, stored as given
     * @param leaderEpoch        the leader epoch, stored as given
     */
    public PartitionData(Optional<Integer> currentLeaderEpoch, int leaderEpoch) {
        this.leaderEpoch = leaderEpoch;
        this.currentLeaderEpoch = currentLeaderEpoch;
    }

    /**
     * Renders both fields as {@code (currentLeaderEpoch=..., leaderEpoch=...)}.
     */
    @Override
    public String toString() {
        return "(currentLeaderEpoch=" + currentLeaderEpoch
            + ", leaderEpoch=" + leaderEpoch
            + ")";
    }
}

/**
* Check whether a broker allows Topic-level permissions in order to use the
* OffsetForLeaderEpoch API. Old versions require Cluster permission.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public final class RequestUtils {

private RequestUtils() {}

static Optional<Integer> getLeaderEpoch(int leaderEpoch) {
public static Optional<Integer> getLeaderEpoch(int leaderEpoch) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 Thanks for pointing this out. I think we should tackle this in follow-up PRs, as it is not strictly related to this change. Is that OK with you?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is OK with me :)

return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
Optional.empty() : Optional.of(leaderEpoch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.kafka.common.protocol.ApiKeys;
import org.junit.Test;

import java.util.Collections;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

Expand All @@ -47,7 +45,7 @@ public void testDefaultReplicaId() {
for (short version = 0; version < ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(); version++) {
int replicaId = 1;
OffsetsForLeaderEpochRequest.Builder builder = OffsetsForLeaderEpochRequest.Builder.forFollower(
version, Collections.emptyMap(), replicaId);
version, new OffsetForLeaderTopicCollection(), replicaId);
OffsetsForLeaderEpochRequest request = builder.build();
OffsetsForLeaderEpochRequest parsed = OffsetsForLeaderEpochRequest.parse(request.serialize(), version);
if (version < 3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1777,17 +1777,6 @@ private InitProducerIdResponse createInitPidResponse() {
return new InitProducerIdResponse(responseData);
}

/**
 * Builds a fixed test fixture mapping three partitions to their epoch data:
 * topic1-0 and topic1-1 carry a current leader epoch of 0 and a leader epoch of 1,
 * while topic2-2 has no current leader epoch and a leader epoch of 3.
 *
 * @return a new mutable map holding the three entries
 */
private Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> createOffsetForLeaderEpochPartitionData() {
    Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochsByPartition = new HashMap<>();

    TopicPartition topic1Partition0 = new TopicPartition("topic1", 0);
    epochsByPartition.put(topic1Partition0,
        new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(0), 1));

    TopicPartition topic1Partition1 = new TopicPartition("topic1", 1);
    epochsByPartition.put(topic1Partition1,
        new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(0), 1));

    TopicPartition topic2Partition2 = new TopicPartition("topic2", 2);
    epochsByPartition.put(topic2Partition2,
        new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3));

    return epochsByPartition;
}

private OffsetForLeaderTopicCollection createOffsetForLeaderTopicCollection() {
OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection();
topics.add(new OffsetForLeaderTopic()
Expand Down Expand Up @@ -1817,7 +1806,7 @@ private OffsetsForLeaderEpochRequest createLeaderEpochRequestForConsumer() {
}

private OffsetsForLeaderEpochRequest createLeaderEpochRequestForReplica(int version, int replicaId) {
Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = createOffsetForLeaderEpochPartitionData();
OffsetForLeaderTopicCollection epochs = createOffsetForLeaderTopicCollection();
return OffsetsForLeaderEpochRequest.Builder.forFollower((short) version, epochs, replicaId).build();
}

Expand Down
32 changes: 18 additions & 14 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import kafka.utils.CoreUtils.inLock
import org.apache.kafka.common.protocol.Errors

import scala.collection.{Map, Set, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
Expand All @@ -41,6 +42,7 @@ import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
import org.apache.kafka.common.internals.PartitionStates
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
import org.apache.kafka.common.requests._
Expand All @@ -61,7 +63,7 @@ abstract class AbstractFetcherThread(name: String,
extends ShutdownableThread(name, isInterruptible) {

type FetchData = FetchResponse.PartitionData[Records]
type EpochData = OffsetsForLeaderEpochRequest.PartitionData
type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition

private val partitionStates = new PartitionStates[PartitionFetchState]
protected val partitionMapLock = new ReentrantLock
Expand Down Expand Up @@ -160,7 +162,10 @@ abstract class AbstractFetcherThread(name: String,
if (state.isTruncating) {
latestEpoch(tp) match {
case Some(epoch) if isOffsetForLeaderEpochSupported =>
partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch)
partitionsWithEpochs += tp -> new EpochData()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it call setPartition?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Let me fix this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated tests to catch this.

.setPartition(tp.partition)
.setCurrentLeaderEpoch(state.currentLeaderEpoch)
.setLeaderEpoch(epoch)
case _ =>
partitionsWithoutEpochs += tp
}
Expand Down Expand Up @@ -218,7 +223,7 @@ abstract class AbstractFetcherThread(name: String,
throw new IllegalStateException(
s"Leader replied with partition $tp not requested in OffsetsForLeaderEpoch request")
})
val leaderEpochInRequest = partitionEpochRequest.currentLeaderEpoch.get
val leaderEpochInRequest = partitionEpochRequest.currentLeaderEpoch
curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
}

Expand Down Expand Up @@ -268,11 +273,10 @@ abstract class AbstractFetcherThread(name: String,
fetchOffsets.put(tp, offsetTruncationState)

case Errors.FENCED_LEADER_EPOCH =>
if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
p =>
if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
else None
})) partitionsWithError += tp
val currentLeaderEpoch = latestEpochsForPartitions.get(tp)
.map(epochEndOffset => Int.box(epochEndOffset.currentLeaderEpoch)).asJava
if (onPartitionFenced(tp, currentLeaderEpoch))
partitionsWithError += tp

case error =>
info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error")
Expand All @@ -287,10 +291,10 @@ abstract class AbstractFetcherThread(name: String,
* remove the partition if the partition state is NOT updated. Otherwise, keep the partition active.
* @return true if the epoch in this thread is updated. otherwise, false
*/
private def onPartitionFenced(tp: TopicPartition, requestEpoch: Option[Int]): Boolean = inLock(partitionMapLock) {
private def onPartitionFenced(tp: TopicPartition, requestEpoch: Optional[Integer]): Boolean = inLock(partitionMapLock) {
Option(partitionStates.stateValue(tp)).exists { currentFetchState =>
val currentLeaderEpoch = currentFetchState.currentLeaderEpoch
if (requestEpoch.contains(currentLeaderEpoch)) {
if (requestEpoch.isPresent && requestEpoch.get == currentLeaderEpoch) {
info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " +
s"the new LeaderAndIsr state before resuming fetching.")
markPartitionFailed(tp)
Expand Down Expand Up @@ -336,7 +340,6 @@ abstract class AbstractFetcherThread(name: String,
// the current offset is the same as the offset requested.
val fetchPartitionData = sessionPartitions.get(topicPartition)
if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
val requestEpoch = if (fetchPartitionData.currentLeaderEpoch.isPresent) Some(fetchPartitionData.currentLeaderEpoch.get().toInt) else None
partitionData.error match {
case Errors.NONE =>
try {
Expand Down Expand Up @@ -390,7 +393,7 @@ abstract class AbstractFetcherThread(name: String,
markPartitionFailed(topicPartition)
}
case Errors.OFFSET_OUT_OF_RANGE =>
if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))
if (handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
partitionsWithError += topicPartition

case Errors.UNKNOWN_LEADER_EPOCH =>
Expand All @@ -399,7 +402,8 @@ abstract class AbstractFetcherThread(name: String,
partitionsWithError += topicPartition

case Errors.FENCED_LEADER_EPOCH =>
if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition
if (onPartitionFenced(topicPartition, fetchPartitionData.currentLeaderEpoch))
partitionsWithError += topicPartition

case Errors.NOT_LEADER_OR_FOLLOWER =>
debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
Expand Down Expand Up @@ -600,7 +604,7 @@ abstract class AbstractFetcherThread(name: String,
*/
private def handleOutOfRangeError(topicPartition: TopicPartition,
fetchState: PartitionFetchState,
requestEpoch: Option[Int]): Boolean = {
requestEpoch: Optional[Integer]): Boolean = {
try {
val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kafka.common.record.Records
import org.apache.kafka.common.requests.FetchResponse.PartitionData
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH
import org.apache.kafka.common.requests.RequestUtils

import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq, Set, mutable}
Expand Down Expand Up @@ -172,7 +173,7 @@ class ReplicaAlterLogDirsThread(name: String,
} else {
val partition = replicaMgr.getPartitionOrException(tp)
partition.lastOffsetForLeaderEpoch(
currentLeaderEpoch = epochData.currentLeaderEpoch,
currentLeaderEpoch = RequestUtils.getLeaderEpoch(epochData.currentLeaderEpoch),
leaderEpoch = epochData.leaderEpoch,
fetchOnlyFromLeader = false)
}
Expand Down
17 changes: 15 additions & 2 deletions core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
Expand Down Expand Up @@ -335,15 +337,26 @@ class ReplicaFetcherThread(name: String,
return Map.empty
}

val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower(offsetForLeaderEpochRequestVersion, partitions.asJava, brokerConfig.brokerId)
val topics = new OffsetForLeaderTopicCollection(partitions.size)
partitions.forKeyValue { (topicPartition, epochData) =>
var topic = topics.find(topicPartition.topic)
if (topic == null) {
topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic)
topics.add(topic)
}
topic.partitions.add(epochData)
}

val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower(
offsetForLeaderEpochRequestVersion, topics, brokerConfig.brokerId)
debug(s"Sending offset for leader epoch request $epochRequest")

try {
val response = leaderEndpoint.sendRequest(epochRequest)
val responseBody = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse]
debug(s"Received leaderEpoch response $response")
responseBody.data.topics.asScala.flatMap { offsetForLeaderTopicResult =>
offsetForLeaderTopicResult.partitions().asScala.map { offsetForLeaderPartitionResult =>
offsetForLeaderTopicResult.partitions.asScala.map { offsetForLeaderPartitionResult =>
val tp = new TopicPartition(offsetForLeaderTopicResult.topic, offsetForLeaderPartitionResult.partition)
tp -> offsetForLeaderPartitionResult
}
Expand Down
Loading