From 5ea4672525aef29aabd6f785643481a950eb44a9 Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Mon, 16 Apr 2018 12:48:03 -0700 Subject: [PATCH 01/24] KAFKA-6361: Fix log divergence between leader and follower after fast leader fail over --- .../kafka/common/protocol/CommonFields.java | 1 + .../kafka/common/requests/EpochEndOffset.java | 23 ++-- .../OffsetsForLeaderEpochRequest.java | 8 +- .../OffsetsForLeaderEpochResponse.java | 23 +++- .../common/requests/RequestResponseTest.java | 6 +- .../main/scala/kafka/cluster/Partition.scala | 9 +- .../consumer/ConsumerFetcherThread.scala | 4 +- .../kafka/server/AbstractFetcherThread.scala | 13 ++- .../server/ReplicaAlterLogDirsThread.scala | 58 +++++++--- .../kafka/server/ReplicaFetcherThread.scala | 78 +++++++++++--- .../scala/kafka/server/ReplicaManager.scala | 4 +- .../server/epoch/LeaderEpochFileCache.scala | 36 ++++--- .../server/AbstractFetcherThreadTest.scala | 3 +- .../server/ReplicaFetcherThreadTest.scala | 100 +++++++++++++++--- ...venReplicationProtocolAcceptanceTest.scala | 84 ++++++++++++++- .../epoch/LeaderEpochFileCacheTest.scala | 55 +++++----- .../epoch/OffsetsForLeaderEpochTest.scala | 12 +-- .../util/ReplicaFetcherMockBlockingSend.scala | 18 +++- 18 files changed, 413 insertions(+), 122 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java index a436dff1d03e1..7f43caf869632 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java @@ -26,6 +26,7 @@ public class CommonFields { public static final Field.Int32 PARTITION_ID = new Field.Int32("partition", "Topic partition id"); public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Response error code"); public static final Field.NullableStr ERROR_MESSAGE = new Field.NullableStr("error_message", "Response error 
message"); + public static final Field.Int32 LEADER_EPOCH = new Field.Int32("leader_epoch", "The epoch"); // Group APIs public static final Field.Str GROUP_ID = new Field.Str("group_id", "The unique group identifier"); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java index 0965e3612d8aa..b6a973e37c34e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java @@ -20,6 +20,8 @@ import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH; +import java.util.Objects; + /** * The offset, fetched from a leader, for a particular partition. */ @@ -29,15 +31,18 @@ public class EpochEndOffset { public static final int UNDEFINED_EPOCH = -1; private Errors error; + private int leaderEpoch; // introduced in V1 private long endOffset; - public EpochEndOffset(Errors error, long endOffset) { + public EpochEndOffset(Errors error, int leaderEpoch, long endOffset) { this.error = error; + this.leaderEpoch = leaderEpoch; this.endOffset = endOffset; } - public EpochEndOffset(long endOffset) { + public EpochEndOffset(int leaderEpoch, long endOffset) { this.error = Errors.NONE; + this.leaderEpoch = leaderEpoch; this.endOffset = endOffset; } @@ -53,10 +58,15 @@ public long endOffset() { return endOffset; } + public int leaderEpoch() { + return leaderEpoch; + } + @Override public String toString() { return "EpochEndOffset{" + "error=" + error + + ", leaderEpoch=" + leaderEpoch + ", endOffset=" + endOffset + '}'; } @@ -68,14 +78,13 @@ public boolean equals(Object o) { EpochEndOffset that = (EpochEndOffset) o; - if (error != that.error) return false; - return endOffset == that.endOffset; + return Objects.equals(error, that.error) + && Objects.equals(leaderEpoch, that.leaderEpoch) + && Objects.equals(endOffset, that.endOffset); } @Override public 
int hashCode() { - int result = (int) error.code(); - result = 31 * result + (int) (endOffset ^ (endOffset >>> 32)); - return result; + return Objects.hash(error, leaderEpoch, endOffset); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java index d0585bed6d552..0df15241b1e57 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java @@ -50,8 +50,11 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest { private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V0 = new Schema( new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0), "An array of topics to get epochs for")); + /* v1 request is the same as v0. Per-partition leader epoch has been added to response */ + private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V1 = OFFSET_FOR_LEADER_EPOCH_REQUEST_V0; + public static Schema[] schemaVersions() { - return new Schema[]{OFFSET_FOR_LEADER_EPOCH_REQUEST_V0}; + return new Schema[]{OFFSET_FOR_LEADER_EPOCH_REQUEST_V0, OFFSET_FOR_LEADER_EPOCH_REQUEST_V1}; } private Map epochsByPartition; @@ -150,7 +153,8 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { Errors error = Errors.forException(e); Map errorResponse = new HashMap<>(); for (TopicPartition tp : epochsByPartition.keySet()) { - errorResponse.put(tp, new EpochEndOffset(error, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)); + errorResponse.put(tp, new EpochEndOffset( + error, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)); } return new OffsetsForLeaderEpochResponse(errorResponse); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java index 4a91533938d84..a7f08acf3fadf 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; @@ -34,6 +35,7 @@ import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.CommonFields.LEADER_EPOCH; import static org.apache.kafka.common.protocol.types.Type.INT64; public class OffsetsForLeaderEpochResponse extends AbstractResponse { @@ -52,8 +54,23 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse { new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0), "An array of topics for which we have leader offsets for some requested Partition Leader Epoch")); + + // OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1 added a per-partition leader epoch field, + // which specifies which leader epoch the end offset belongs to + private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1 = new Schema( + ERROR_CODE, + PARTITION_ID, + LEADER_EPOCH, + new Field(END_OFFSET_KEY_NAME, INT64, "The end offset")); + private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V1 = new Schema( + TOPIC_NAME, + new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1))); + private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1 = new Schema( + new 
Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V1), + "An array of topics for which we have leader offsets for some requested Partition Leader Epoch")); + public static Schema[] schemaVersions() { - return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0}; + return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0, OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1}; } private Map epochEndOffsetsByPartition; @@ -68,8 +85,9 @@ public OffsetsForLeaderEpochResponse(Struct struct) { Errors error = Errors.forCode(partitionAndEpoch.get(ERROR_CODE)); int partitionId = partitionAndEpoch.get(PARTITION_ID); TopicPartition tp = new TopicPartition(topic, partitionId); + int leaderEpoch = partitionAndEpoch.getOrElse(LEADER_EPOCH, RecordBatch.NO_PARTITION_LEADER_EPOCH); long endOffset = partitionAndEpoch.getLong(END_OFFSET_KEY_NAME); - epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, endOffset)); + epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, leaderEpoch, endOffset)); } } } @@ -110,6 +128,7 @@ protected Struct toStruct(short version) { Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME); partitionStruct.set(ERROR_CODE, partitionEndOffset.getValue().error().code()); partitionStruct.set(PARTITION_ID, partitionEndOffset.getKey()); + partitionStruct.setIfExists(LEADER_EPOCH, partitionEndOffset.getValue().leaderEpoch()); partitionStruct.set(END_OFFSET_KEY_NAME, partitionEndOffset.getValue().endOffset()); partitions.add(partitionStruct); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index c63cecdda2813..ecb5aa3f48678 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1026,9 +1026,9 @@ private OffsetsForLeaderEpochRequest createLeaderEpochRequest() { private 
OffsetsForLeaderEpochResponse createLeaderEpochResponse() { Map epochs = new HashMap<>(); - epochs.put(new TopicPartition("topic1", 0), new EpochEndOffset(Errors.NONE, 0)); - epochs.put(new TopicPartition("topic1", 1), new EpochEndOffset(Errors.NONE, 1)); - epochs.put(new TopicPartition("topic2", 2), new EpochEndOffset(Errors.NONE, 2)); + epochs.put(new TopicPartition("topic1", 0), new EpochEndOffset(Errors.NONE, 1, 0)); + epochs.put(new TopicPartition("topic1", 1), new EpochEndOffset(Errors.NONE, 1, 1)); + epochs.put(new TopicPartition("topic2", 2), new EpochEndOffset(Errors.NONE, 1, 2)); return new OffsetsForLeaderEpochResponse(epochs); } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 93377bad00ede..d9d62449c5af7 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -647,15 +647,18 @@ class Partition(val topic: String, /** * @param leaderEpoch Requested leader epoch - * @return The last offset of messages published under this leader epoch. 
+ * @return The last offset of messages published under this leader epoch, or if the + * requested leader epoch is unknown, the last offset of the largest epoch less than + * the requested epoch */ def lastOffsetForLeaderEpoch(leaderEpoch: Int): EpochEndOffset = { inReadLock(leaderIsrUpdateLock) { leaderReplicaIfLocal match { case Some(leaderReplica) => - new EpochEndOffset(NONE, leaderReplica.epochs.get.endOffsetFor(leaderEpoch)) + val epochAndOffset = leaderReplica.epochs.get.endOffsetFor(leaderEpoch) + new EpochEndOffset(NONE, epochAndOffset._1, epochAndOffset._2) case None => - new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET) + new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) } } } diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index ac83fa17a767b..9426884629670 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -20,7 +20,7 @@ package kafka.consumer import kafka.api.{FetchRequestBuilder, FetchResponsePartitionData, OffsetRequest, Request} import kafka.cluster.BrokerEndPoint import kafka.message.ByteBufferMessageSet -import kafka.server.{AbstractFetcherThread, PartitionFetchState} +import kafka.server.{AbstractFetcherThread, PartitionFetchState, OffsetTruncationState} import AbstractFetcherThread.ResultWithPartitions import kafka.common.{ErrorMapping, TopicAndPartition} @@ -129,7 +129,7 @@ class ConsumerFetcherThread(consumerIdString: String, override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() } - override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = { + override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition,
OffsetTruncationState]] = { ResultWithPartitions(Map(), Set()) } } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index f919ddf017c92..d39ba631f5c2d 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -76,7 +76,7 @@ abstract class AbstractFetcherThread(name: String, protected def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] - protected def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] + protected def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[REQ] @@ -276,11 +276,11 @@ abstract class AbstractFetcherThread(name: String, * * @param fetchOffsets the partitions to mark truncation complete */ - private def markTruncationCompleteAndUpdateFetchOffset(fetchOffsets: Map[TopicPartition, Long]) { + private def markTruncationCompleteAndUpdateFetchOffset(fetchOffsets: Map[TopicPartition, OffsetTruncationState]) { val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStates.asScala .map { state => val maybeTruncationComplete = fetchOffsets.get(state.topicPartition()) match { - case Some(offset) => PartitionFetchState(offset, state.value.delay, truncatingLog = false) + case Some(offsetTruncationState) => PartitionFetchState(offsetTruncationState.offset, state.value.delay, truncatingLog = !offsetTruncationState.truncationCompleted) case None => state.value() } (state.topicPartition(), maybeTruncationComplete) @@ -446,3 +446,10 @@ case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem, truncating override def toString = 
"offset:%d-isReadyForFetch:%b-isTruncatingLog:%b".format(fetchOffset, isReadyForFetch, truncatingLog) } + +case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) { + + def this (offset: Long) = this(offset, true) + + override def toString = "offset:%d-truncationCompleted:%b".format(offset, truncationCompleted) +} diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 0faf5dc383804..eba4493ebe502 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import java.util import AbstractFetcherThread.ResultWithPartitions -import kafka.cluster.BrokerEndPoint +import kafka.cluster.{Replica, BrokerEndPoint} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, FetchRequest => JFetchRequest} @@ -36,7 +36,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords} import scala.collection.JavaConverters._ import scala.collection.{Map, Seq, Set, mutable} - +import scala.math._ class ReplicaAlterLogDirsThread(name: String, sourceBroker: BrokerEndPoint, @@ -164,19 +164,29 @@ class ReplicaAlterLogDirsThread(name: String, def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { partitions.map { case (tp, epoch) => try { - tp -> new EpochEndOffset(Errors.NONE, replicaMgr.getReplicaOrException(tp).epochs.get.endOffsetFor(epoch)) + val epochAndOffset = replicaMgr.getReplicaOrException(tp).epochs.get.endOffsetFor(epoch) + tp -> new EpochEndOffset(Errors.NONE, epochAndOffset._1, epochAndOffset._2) } catch { case t: Throwable => warn(s"Error when getting EpochEndOffset for $tp", t) - tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH_OFFSET) + 
tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) } } } - def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = { - val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, Long] + def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = { + val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, OffsetTruncationState] val partitionsWithError = mutable.Set[TopicPartition]() + def finalFetchLeaderEpochOffset(offsetToTruncateTo: Long, futureReplica: Replica): OffsetTruncationState = { + val fetchOffset = + if (offsetToTruncateTo >= futureReplica.logEndOffset.messageOffset) + futureReplica.logEndOffset.messageOffset + else + offsetToTruncateTo + OffsetTruncationState(fetchOffset, truncationCompleted = true) + } + fetchedEpochs.foreach { case (topicPartition, epochOffset) => try { val futureReplica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId) @@ -186,16 +196,32 @@ class ReplicaAlterLogDirsThread(name: String, info(s"Retrying leaderEpoch request for partition $topicPartition as the current replica reported an error: ${epochOffset.error}") partitionsWithError += topicPartition } else { - val fetchOffset = - if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) - partitionStates.stateValue(topicPartition).fetchOffset - else if (epochOffset.endOffset >= futureReplica.logEndOffset.messageOffset) - futureReplica.logEndOffset.messageOffset - else - epochOffset.endOffset - - partition.truncateTo(fetchOffset, isFuture = true) - fetchOffsets.put(topicPartition, fetchOffset) + val offsetTruncationState = + if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) { + OffsetTruncationState(partitionStates.stateValue(topicPartition).fetchOffset, truncationCompleted = true) + } else if (epochOffset.leaderEpoch == UNDEFINED_EPOCH) { + 
// this may happen if the leader used version 0 of OffsetForLeaderEpoch request/response + finalFetchLeaderEpochOffset(epochOffset.endOffset, futureReplica) + } else { + // get (leader epoch, end offset) pair that corresponds to the largest leader epoch + // less than or equal to the requested epoch. + val epochAndOffset = futureReplica.epochs.get.endOffsetFor(epochOffset.leaderEpoch) + if (epochAndOffset._2 == UNDEFINED_EPOCH_OFFSET) { + // This can happen if replica was not tracking offsets at that point (before the + // upgrade, or if this broker is new). + finalFetchLeaderEpochOffset(epochOffset.endOffset, futureReplica) + } else if (epochAndOffset._1 != epochOffset.leaderEpoch) { + // the replica does not know about the epoch that leader replied with + val intermediateOffsetToTruncateTo = min(epochAndOffset._2, futureReplica.logEndOffset.messageOffset) + OffsetTruncationState(intermediateOffsetToTruncateTo, truncationCompleted = false) + } else { + val offsetToTruncateTo = min(epochAndOffset._2, epochOffset.endOffset) + finalFetchLeaderEpochOffset(offsetToTruncateTo, futureReplica) + } + } + + partition.truncateTo(offsetTruncationState.offset, isFuture = true) + fetchOffsets.put(topicPartition, offsetTruncationState) } } catch { case e: KafkaStorageException => diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 8344d5beb349a..77b3ae1d3e898 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -39,6 +39,7 @@ import org.apache.kafka.common.utils.{LogContext, Time} import scala.collection.JavaConverters._ import scala.collection.{Map, mutable} +import scala.math.min class ReplicaFetcherThread(name: String, fetcherId: Int, @@ -290,11 +291,26 @@ class ReplicaFetcherThread(name: String, * - If the leader's offset is greater, we stick with the Log End Offset * otherwise we truncate to the 
leaders offset. * - If the leader replied with undefined epoch offset we must use the high watermark + * - If the leader replied with leader epoch not known to this follower, we truncate to the + * end offset of the largest epoch that is smaller than the epoch the leader replied with, + * and send offset for leader epoch request with that leader epoch. */ - override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = { - val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, Long] + override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = { + val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, OffsetTruncationState] val partitionsWithError = mutable.Set[TopicPartition]() + // Called when 'offsetToTruncateTo' is the final offset to truncate to. + // Returns log end offset if given 'offsetToTruncateTo' is equal or larger than log end + // offset and logs the message that truncation is not needed. 
Otherwise returns given 'offsetToTruncateTo' + def finalFetchLeaderEpochOffset(offsetToTruncateTo: Long, replica: Replica): OffsetTruncationState = { + val fetchOffset = + if (offsetToTruncateTo >= replica.logEndOffset.messageOffset) + logEndOffset(replica, offsetToTruncateTo) + else + offsetToTruncateTo + OffsetTruncationState(fetchOffset, truncationCompleted = true) + } + fetchedEpochs.foreach { case (tp, epochOffset) => try { val replica = replicaMgr.getReplicaOrException(tp) @@ -304,19 +320,49 @@ class ReplicaFetcherThread(name: String, info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${epochOffset.error}") partitionsWithError += tp } else { - val fetchOffset = + val offsetTruncationState = if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) { warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " + s"The initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.") - partitionStates.stateValue(tp).fetchOffset - } else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset) - logEndOffset(replica, epochOffset) - else - epochOffset.endOffset - - partition.truncateTo(fetchOffset, isFuture = false) - replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, fetchOffset) - fetchOffsets.put(tp, fetchOffset) + OffsetTruncationState(partitionStates.stateValue(tp).fetchOffset, truncationCompleted = true) + } else if (epochOffset.leaderEpoch == UNDEFINED_EPOCH) { + // this may happen if the leader used version 0 of OffsetForLeaderEpoch + // request/response + warn(s"Based on follower's leader epoch, leader replied with an unknown leader epoch in ${replica.topicPartition}. 
" + + s"The leader's offset only ${epochOffset.endOffset} will be used for truncation.") + finalFetchLeaderEpochOffset(epochOffset.endOffset, replica) + } else { + // get (leader epoch, end offset) pair that corresponds to the largest leader epoch + // less than or equal to the requested epoch. + val epochAndOffset = replica.epochs.get.endOffsetFor(epochOffset.leaderEpoch) + if (epochAndOffset._2 == UNDEFINED_EPOCH_OFFSET) { + // This can happen if replica was not tracking offsets at that point (before the + // upgrade, or if this broker is new). + // I think we then should truncate to start offset of epoch that we sent + // initially? but it's possible that the leader just wasn't tracking, so we + // cannot distinguish this situation? Unless this is epoch 0? Special case? + warn(s"Based on follower's leader epoch, leader replied with epoch ${epochOffset.leaderEpoch} " + + s"below any follower's tracked epochs for ${replica.topicPartition}. " + + s"The leader's offset only ${epochOffset.endOffset} will be used for truncation.") + finalFetchLeaderEpochOffset(epochOffset.endOffset, replica) + } else if (epochAndOffset._1 != epochOffset.leaderEpoch) { + // the replica does not know about the epoch that leader replied with + // we truncate to the end offset of the largest epoch that is smaller than the + // epoch the leader replied with, and send another offset for leader epoch request + val intermediateOffsetToTruncateTo = min(epochAndOffset._2, replica.logEndOffset.messageOffset) + warn(s"Based on follower's leader epoch, leader replied with epoch ${epochOffset.leaderEpoch} " + + s"unknown to the follower for ${replica.topicPartition}.
" + + s"Will truncate to $intermediateOffsetToTruncateTo and send another leader epoch request to the leader.") + OffsetTruncationState(intermediateOffsetToTruncateTo, truncationCompleted = false) + } else { + val offsetToTruncateTo = min(epochAndOffset._2, epochOffset.endOffset) + finalFetchLeaderEpochOffset(offsetToTruncateTo, replica) + } + } + + partition.truncateTo(offsetTruncationState.offset, isFuture = false) + replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, offsetTruncationState.offset) + fetchOffsets.put(tp, offsetTruncationState) } } catch { case e: KafkaStorageException => @@ -355,22 +401,22 @@ class ReplicaFetcherThread(name: String, // if we get any unexpected exception, mark all partitions with an error result = partitions.map { case (tp, _) => - tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH_OFFSET) + tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) } } } else { // just generate a response with no error but UNDEFINED_OFFSET so that we can fall back to truncating using // high watermark in maybeTruncate() result = partitions.map { case (tp, _) => - tp -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET) + tp -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) } } result } - private def logEndOffset(replica: Replica, epochOffset: EpochEndOffset): Long = { + private def logEndOffset(replica: Replica, epochOffset: Long): Long = { val logEndOffset = replica.logEndOffset.messageOffset - info(s"Based on follower's leader epoch, leader replied with an offset ${epochOffset.endOffset} >= the " + + info(s"Based on follower's leader epoch, leader replied with an offset $epochOffset >= the " + s"follower's log end offset $logEndOffset in ${replica.topicPartition}. 
No truncation needed.") logEndOffset } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index da501174acdef..0895319f2a42b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1475,11 +1475,11 @@ class ReplicaManager(val config: KafkaConfig, val epochEndOffset = getPartition(tp) match { case Some(partition) => if (partition eq ReplicaManager.OfflinePartition) - new EpochEndOffset(KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH_OFFSET) + new EpochEndOffset(KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) else partition.lastOffsetForLeaderEpoch(leaderEpoch) case None => - new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET) + new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) } tp -> epochEndOffset } diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala index 220432d32c0f9..fe870ad0d184b 100644 --- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala +++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala @@ -29,7 +29,7 @@ import scala.collection.mutable.ListBuffer trait LeaderEpochCache { def assign(leaderEpoch: Int, offset: Long) def latestEpoch(): Int - def endOffsetFor(epoch: Int): Long + def endOffsetFor(epoch: Int): (Int, Long) def clearAndFlushLatest(offset: Long) def clearAndFlushEarliest(offset: Long) def clearAndFlush() @@ -81,36 +81,42 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM } /** - * Returns the End Offset for a requested Leader Epoch. + * Returns the Leader Epoch and the End Offset for a requested Leader Epoch. * - * This is defined as the start offset of the first Leader Epoch larger than the - * Leader Epoch requested, or else the Log End Offset if the latest epoch was requested. 
+ * The Leader Epoch returned is the largest epoch less than or equal to the requested Leader + * Epoch. The End Offset is the end offset of this epoch, which is defined as the start offset + * of the first Leader Epoch larger than the Leader Epoch requested, or else the Log End + * Offset if the latest epoch was requested. * * During the upgrade phase, where there are existing messages may not have a leader epoch, * if requestedEpoch is < the first epoch cached, UNSUPPORTED_EPOCH_OFFSET will be returned * so that the follower falls back to High Water Mark. * - * @param requestedEpoch - * @return offset + * @param requestedEpoch requested leader epoch + * @return leader epoch and offset */ - override def endOffsetFor(requestedEpoch: Int): Long = { + override def endOffsetFor(requestedEpoch: Int): (Int, Long) = { inReadLock(lock) { - val offset = + val epochAndOffset = if (requestedEpoch == UNDEFINED_EPOCH) { // this may happen if a bootstrapping follower sends a request with undefined epoch or // a follower is on the older message format where leader epochs are not recorded - UNDEFINED_EPOCH_OFFSET + (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) } else if (requestedEpoch == latestEpoch) { - leo().messageOffset + (requestedEpoch, leo().messageOffset) } else { val subsequentEpochs = epochs.filter(e => e.epoch > requestedEpoch) if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch) - UNDEFINED_EPOCH_OFFSET - else - subsequentEpochs.head.startOffset + (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) + else { + // we must get at least one element in previous epochs list, because if we are here, + // it means that requestedEpoch >= epochs.head.epoch -- so at least the first epoch is in that list + val previousEpochs = epochs.filter(e => e.epoch <= requestedEpoch) + (previousEpochs.last.epoch, subsequentEpochs.head.startOffset) + } } - debug(s"Processed offset for epoch request for partition ${topicPartition} epoch:$requestedEpoch and returning offset $offset from epoch list of size
${epochs.size}") - offset + debug(s"Processed offset for epoch request for partition ${topicPartition} epoch:$requestedEpoch and returning epoch ${epochAndOffset._1} and offset ${epochAndOffset._2} from epoch list of size ${epochs.size}") + epochAndOffset } } diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index b95f66cba55ca..bf6db2fc3cfc8 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -21,6 +21,7 @@ import AbstractFetcherThread._ import com.yammer.metrics.Metrics import kafka.cluster.BrokerEndPoint import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData} +import kafka.server.OffsetTruncationState import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors @@ -131,7 +132,7 @@ class AbstractFetcherThreadTest { override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() } - override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = { + override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = { ResultWithPartitions(Map(), Set()) } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 2074044d0e129..f2344a0b474f2 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -64,8 +64,8 @@ class ReplicaFetcherThreadTest { val result = thread.fetchEpochsFromLeader(Map(t1p0 -> 0, t1p1 -> 0)) val expected = Map( - t1p0 -> new EpochEndOffset(Errors.NONE, 
UNDEFINED_EPOCH_OFFSET), - t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET) + t1p0 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), + t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) ) assertEquals("results from leader epoch request should have undefined offset", expected, result) @@ -95,8 +95,8 @@ class ReplicaFetcherThreadTest { val result = thread.fetchEpochsFromLeader(Map(t1p0 -> 0, t1p1 -> 0)) val expected = Map( - t1p0 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH_OFFSET), - t1p1 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH_OFFSET) + t1p0 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), + t1p1 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) ) assertEquals("results from leader epoch request should have undefined offset", expected, result) @@ -104,7 +104,7 @@ class ReplicaFetcherThreadTest { } @Test - def shouldFetchLeaderEpochOnFirstFetchOnly(): Unit = { + def shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBoth(): Unit = { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) //Setup all dependencies @@ -120,6 +120,7 @@ class ReplicaFetcherThreadTest { expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes() expect(leaderEpochs.latestEpoch).andReturn(5) + expect(leaderEpochs.endOffsetFor(5)).andReturn((5, 0)).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() stub(replica, partition, replicaManager) @@ -131,7 +132,7 @@ class ReplicaFetcherThreadTest { replay(leaderEpochs, replicaManager, logManager, quota, replica) //Define the offsets for the OffsetsForLeaderEpochResponse - val offsets = Map(t1p0 -> new 
EpochEndOffset(1), t1p1 -> new EpochEndOffset(1)).asJava + val offsets = Map(t1p0 -> new EpochEndOffset(5, 1), t1p1 -> new EpochEndOffset(5, 1)).asJava //Create the fetcher thread val endPoint = new BrokerEndPoint(0, "localhost", 1000) @@ -181,6 +182,7 @@ class ReplicaFetcherThreadTest { expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes() expect(leaderEpochs.latestEpoch).andReturn(5).anyTimes() + expect(leaderEpochs.endOffsetFor(5)).andReturn((5, initialLEO)).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() stub(replica, partition, replicaManager) @@ -188,7 +190,7 @@ class ReplicaFetcherThreadTest { replay(leaderEpochs, replicaManager, logManager, quota, replica, partition) //Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation - val offsetsReply = Map(t1p0 -> new EpochEndOffset(156), t2p1 -> new EpochEndOffset(172)).asJava + val offsetsReply = Map(t1p0 -> new EpochEndOffset(5, 156), t2p1 -> new EpochEndOffset(5, 172)).asJava //Create the thread val endPoint = new BrokerEndPoint(0, "localhost", 1000) @@ -204,6 +206,72 @@ class ReplicaFetcherThreadTest { assertTrue(truncateToCapture.getValues.asScala.contains(172)) } + @Test + def shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower(): Unit = { + + // Create a capture to track what partitions/offsets are truncated + val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL) + + val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) + + // Setup all dependencies + val quota = createNiceMock(classOf[ReplicationQuotaManager]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val logManager = createMock(classOf[LogManager]) + val replicaAlterLogDirsManager = 
createMock(classOf[ReplicaAlterLogDirsManager]) + val replica = createNiceMock(classOf[Replica]) + val partition = createMock(classOf[Partition]) + val replicaManager = createMock(classOf[ReplicaManager]) + + val initialLEO = 200 + + // Stubs + expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes() + expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() + expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes() + expect(leaderEpochs.latestEpoch).andReturn(5) + expect(leaderEpochs.endOffsetFor(4)).andReturn((3, 120)).anyTimes() + expect(leaderEpochs.endOffsetFor(3)).andReturn((3, 120)).anyTimes() + expect(replicaManager.logManager).andReturn(logManager).anyTimes() + expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() + stub(replica, partition, replicaManager) + + replay(leaderEpochs, replicaManager, logManager, quota, replica, partition) + + // Define the offsets for the OffsetsForLeaderEpochResponse + val offsets = Map(t1p0 -> new EpochEndOffset(4, 155), t1p1 -> new EpochEndOffset(4, 143)).asJava + + // Create the fetcher thread + val endPoint = new BrokerEndPoint(0, "localhost", 1000) + val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, endPoint, new SystemTime()) + val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) + thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0)) + + // Loop 1 -- both topic partitions will need to fetch another leader epoch + thread.doWork() + assertEquals(1, mockNetwork.epochFetchCount) + assertEquals(0, mockNetwork.fetchCount) + + // Loop 2 should do the second fetch for both topic partitions because the leader replied with + // epoch 4 while follower knows only about epoch 3 + val nextOffsets = Map(t1p0 -> new EpochEndOffset(3, 101), t1p1 -> new EpochEndOffset(3, 102)).asJava + mockNetwork.setOffsetsForNextResponse(nextOffsets) + 
thread.doWork() + assertEquals(2, mockNetwork.epochFetchCount) + assertEquals(1, mockNetwork.fetchCount) + + //Loop 3 we should not fetch epochs + thread.doWork() + assertEquals(2, mockNetwork.epochFetchCount) + assertEquals(2, mockNetwork.fetchCount) + + + //We should have truncated to the offsets in the second response + assertTrue("Capture trancate to values " + truncateToCapture.getValues, + truncateToCapture.getValues.asScala.contains(101)) + assertTrue(truncateToCapture.getValues.asScala.contains(102)) + } + @Test def shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset(): Unit = { @@ -234,7 +302,7 @@ class ReplicaFetcherThreadTest { replay(leaderEpochs, replicaManager, logManager, quota, replica, partition) //Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation - val offsetsReply = Map(t1p0 -> new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH_OFFSET)).asJava + val offsetsReply = Map(t1p0 -> new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)).asJava //Create the thread val endPoint = new BrokerEndPoint(0, "localhost", 1000) @@ -274,6 +342,8 @@ class ReplicaFetcherThreadTest { expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes() expect(leaderEpochs.latestEpoch).andReturn(5) + // this is for the last reply with EpochEndOffset(5, 156) + expect(leaderEpochs.endOffsetFor(5)).andReturn((5, initialLeo)).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() stub(replica, partition, replicaManager) @@ -281,8 +351,8 @@ class ReplicaFetcherThreadTest { //Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation val offsetsReply = mutable.Map( - t1p0 -> new EpochEndOffset(NOT_LEADER_FOR_PARTITION, EpochEndOffset.UNDEFINED_EPOCH_OFFSET), - 
t1p1 -> new EpochEndOffset(UNKNOWN_SERVER_ERROR, EpochEndOffset.UNDEFINED_EPOCH_OFFSET) + t1p0 -> new EpochEndOffset(NOT_LEADER_FOR_PARTITION, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET), + t1p1 -> new EpochEndOffset(UNKNOWN_SERVER_ERROR, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET) ).asJava //Create the thread @@ -300,7 +370,7 @@ class ReplicaFetcherThreadTest { assertEquals(0, truncated.getValues.size()) //New leader elected and replies - offsetsReply.put(t1p0, new EpochEndOffset(156)) + offsetsReply.put(t1p0, new EpochEndOffset(5, 156)) thread.doWork() @@ -325,6 +395,7 @@ class ReplicaFetcherThreadTest { expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes() expect(leaderEpochs.latestEpoch).andReturn(5) + expect(leaderEpochs.endOffsetFor(5)).andReturn((5, 0)).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() stub(replica, partition, replicaManager) @@ -333,7 +404,7 @@ class ReplicaFetcherThreadTest { //Define the offsets for the OffsetsForLeaderEpochResponse val offsetsReply = Map( - t1p0 -> new EpochEndOffset(1), t1p1 -> new EpochEndOffset(1) + t1p0 -> new EpochEndOffset(5, 1), t1p1 -> new EpochEndOffset(5, 1) ).asJava //Create the fetcher thread @@ -374,6 +445,7 @@ class ReplicaFetcherThreadTest { expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes() expect(leaderEpochs.latestEpoch).andReturn(5) + expect(leaderEpochs.endOffsetFor(5)).andReturn((5, initialLEO)).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() stub(replica, partition, replicaManager) @@ -382,7 +454,7 @@ class 
ReplicaFetcherThreadTest { //Define the offsets for the OffsetsForLeaderEpochResponse val offsetsReply = Map( - t1p0 -> new EpochEndOffset(52), t1p1 -> new EpochEndOffset(49) + t1p0 -> new EpochEndOffset(5, 52), t1p1 -> new EpochEndOffset(5, 49) ).asJava //Create the fetcher thread @@ -417,4 +489,4 @@ class ReplicaFetcherThreadTest { expect(replicaManager.getReplicaOrException(t2p1)).andReturn(replica).anyTimes() expect(replicaManager.getPartition(t2p1)).andReturn(Some(partition)).anyTimes() } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala index 6288d8faf1d94..7b07c6f1be7be 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala @@ -38,6 +38,7 @@ import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ import scala.collection.mutable.{ListBuffer => Buffer} +import scala.collection.Seq /** * These tests were written to assert the addition of leader epochs to the replication protocol fix the problems @@ -298,6 +299,86 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness } } + @Test + def logsShouldNotDivergeOnUncleanLeaderElections(): Unit = { + + // Given two brokers, unclean leader election is enabled + brokers = (100 to 101).map(createBroker(_, enableUncleanLeaderElection = true)) + + // A single partition topic with 2 replicas, min.isr = 1 + adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK( + topic, Map(0 -> Seq(100, 101)), config = CoreUtils.propsWith((KafkaConfig.MinInSyncReplicasProp, "1")) + ) + producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1) + + // Write one message while both brokers are up + (0 until 1).foreach { 
i => + producer.send(new ProducerRecord(topic, 0, null, msg)) + producer.flush()} + + // Since we use producer with acks = 1, make sure that logs match for the first epoch + waitForLogsToMatch(brokers(0), brokers(1)) + + // shutdown broker 100 + brokers(0).shutdown() + + //Write 1 message + (0 until 1).foreach { i => + producer.send(new ProducerRecord(topic, 0, null, msg)) + producer.flush()} + + brokers(1).shutdown() + brokers(0).startup() + + //Bounce the producer (this is required, although I'm unsure as to why?) + producer.close() + producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1) + + //Write 3 messages + (0 until 3).foreach { i => + producer.send(new ProducerRecord(topic, 0, null, msgBigger)) + producer.flush()} + + brokers(0).shutdown() + brokers(1).startup() + + //Bounce the producer (this is required, although I'm unsure as to why?) + producer.close() + producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1) + + //Write 1 message + (0 until 1).foreach { i => + producer.send(new ProducerRecord(topic, 0, null, msg)) + producer.flush()} + + brokers(1).shutdown() + brokers(0).startup() + + //Bounce the producer (this is required, although I'm unsure as to why?) 
+ producer.close() + producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1) + + //Write 2 messages + (0 until 2).foreach { i => + producer.send(new ProducerRecord(topic, 0, null, msgBigger)) + producer.flush()} + + printSegments() + + brokers(1).startup() + + waitForLogsToMatch(brokers(0), brokers(1)) + printSegments() + + def crcSeq(broker: KafkaServer, partition: Int = 0): Seq[Long] = { + val batches = getLog(broker, partition).activeSegment.read(0, None, Integer.MAX_VALUE) + .records.batches().asScala.toSeq + batches.map(_.checksum) + } + assertTrue(s"Logs on Broker 100 and Broker 101 should match", + crcSeq(brokers(0)) == crcSeq(brokers(1))) + } + private def log(leader: KafkaServer, follower: KafkaServer): Unit = { info(s"Bounce complete for follower ${follower.config.brokerId}") info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries()) @@ -389,12 +470,13 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness brokers.filter(_.config.brokerId != leader)(0) } - private def createBroker(id: Int): KafkaServer = { + private def createBroker(id: Int, enableUncleanLeaderElection: Boolean = false): KafkaServer = { val config = createBrokerConfig(id, zkConnect) if(!KIP_101_ENABLED) { config.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_0_11_0_IV1.version) config.setProperty(KafkaConfig.LogMessageFormatVersionProp, KAFKA_0_11_0_IV1.version) } + config.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, enableUncleanLeaderElection.toString) createServer(fromProps(config)) } diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index 4a8df11f8a367..657856b151059 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -50,7 +50,7 @@ class LeaderEpochFileCacheTest { //Then assertEquals(2, cache.latestEpoch()) assertEquals(EpochEntry(2, 10), cache.epochEntries()(0)) - assertEquals(11, cache.endOffsetFor(2)) //should match leo + assertEquals((2, 11), cache.endOffsetFor(2)) //should match leo } @Test @@ -67,7 +67,7 @@ class LeaderEpochFileCacheTest { leo = 14 //Then - assertEquals(14, cache.endOffsetFor(2)) + assertEquals((2, 14), cache.endOffsetFor(2)) } @Test @@ -80,10 +80,10 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 3, offset = 12) //When (say a bootstraping follower) sends request for UNDEFINED_EPOCH - val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH) + val epochAndOffsetFor = cache.endOffsetFor(UNDEFINED_EPOCH) //Then - assertEquals(UNDEFINED_EPOCH_OFFSET, offsetFor) + assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), epochAndOffsetFor) } @Test @@ -140,7 +140,7 @@ class LeaderEpochFileCacheTest { val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) //Then - assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(0)) + assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(0)) } @Test @@ -170,10 +170,10 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 7, offset = 13) //When - val offset = cache.endOffsetFor(5 - 1) + val epochAndOffset = cache.endOffsetFor(5 - 1) //Then - assertEquals(UNDEFINED_EPOCH_OFFSET, offset) + assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), epochAndOffset) } @Test @@ -194,7 +194,7 @@ class LeaderEpochFileCacheTest { leo = 17 //Then get the start offset of the next epoch - assertEquals(15, cache.endOffsetFor(2)) + assertEquals((2, 15), cache.endOffsetFor(2)) } @Test @@ -210,8 +210,9 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 4, offset = 17) //Then - assertEquals(13, cache.endOffsetFor(requestedEpoch = 1)) - assertEquals(17, cache.endOffsetFor(requestedEpoch = 2)) + assertEquals((0, 13), 
cache.endOffsetFor(requestedEpoch = 1)) + assertEquals((2, 17), cache.endOffsetFor(requestedEpoch = 2)) + assertEquals((2, 17), cache.endOffsetFor(requestedEpoch = 3)) } @Test @@ -242,7 +243,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 2, offset = 100) //Then - assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(3)) + assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(3)) } @Test @@ -258,7 +259,7 @@ class LeaderEpochFileCacheTest { leo = 7 //Then - assertEquals(leo, cache.endOffsetFor(2)) + assertEquals((2, leo), cache.endOffsetFor(2)) assertEquals(1, cache.epochEntries.size) assertEquals(EpochEntry(2, 6), cache.epochEntries()(0)) } @@ -300,10 +301,10 @@ class LeaderEpochFileCacheTest { assertEquals(2, cache.latestEpoch()) //Then end offset for epoch 1 shouldn't have changed - assertEquals(6, cache.endOffsetFor(1)) + assertEquals((1, 6), cache.endOffsetFor(1)) //Then end offset for epoch 2 has to be the offset of the epoch 1 message (I can't thing of a better option) - assertEquals(8, cache.endOffsetFor(2)) + assertEquals((2, 8), cache.endOffsetFor(2)) //Epoch history shouldn't have changed assertEquals(EpochEntry(1, 5), cache.epochEntries()(0)) @@ -340,17 +341,17 @@ class LeaderEpochFileCacheTest { //Then epoch should go up assertEquals(1, cache.latestEpoch()) //offset for 1 should still be 0 - assertEquals(0, cache.endOffsetFor(1)) + assertEquals((1, 0), cache.endOffsetFor(1)) //offset for epoch 0 should still be 0 - assertEquals(0, cache.endOffsetFor(0)) + assertEquals((0, 0), cache.endOffsetFor(0)) //When we write 5 messages as epoch 1 leo = 5 //Then end offset for epoch(1) should be leo => 5 - assertEquals(5, cache.endOffsetFor(1)) + assertEquals((1, 5), cache.endOffsetFor(1)) //Epoch 0 should still be at offset 0 - assertEquals(0, cache.endOffsetFor(0)) + assertEquals((0, 0), cache.endOffsetFor(0)) //When cache.assign(epoch = 2, offset = 5) //leo=5 @@ -358,13 +359,13 @@ class LeaderEpochFileCacheTest { leo = 10 
//write another 5 messages //Then end offset for epoch(2) should be leo => 10 - assertEquals(10, cache.endOffsetFor(2)) + assertEquals((2, 10), cache.endOffsetFor(2)) //end offset for epoch(1) should be the start offset of epoch(2) => 5 - assertEquals(5, cache.endOffsetFor(1)) + assertEquals((1, 5), cache.endOffsetFor(1)) //epoch (0) should still be 0 - assertEquals(0, cache.endOffsetFor(0)) + assertEquals((0, 0), cache.endOffsetFor(0)) } @Test @@ -382,7 +383,7 @@ class LeaderEpochFileCacheTest { //Then epoch should stay, offsets should grow assertEquals(0, cache.latestEpoch()) - assertEquals(leo, cache.endOffsetFor(0)) + assertEquals((0, leo), cache.endOffsetFor(0)) //When messages arrive with greater epoch cache.assign(epoch = 1, offset = 3); leo = 4 @@ -390,7 +391,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 1, offset = 5); leo = 6 assertEquals(1, cache.latestEpoch()) - assertEquals(leo, cache.endOffsetFor(1)) + assertEquals((1, leo), cache.endOffsetFor(1)) //When cache.assign(epoch = 2, offset = 6); leo = 7 @@ -398,11 +399,11 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 2, offset = 8); leo = 9 assertEquals(2, cache.latestEpoch()) - assertEquals(leo, cache.endOffsetFor(2)) + assertEquals((2, leo), cache.endOffsetFor(2)) //Older epochs should return the start offset of the first message in the subsequent epoch. 
- assertEquals(3, cache.endOffsetFor(0)) - assertEquals(6, cache.endOffsetFor(1)) + assertEquals((0, 3), cache.endOffsetFor(0)) + assertEquals((1, 6), cache.endOffsetFor(1)) } @Test @@ -648,7 +649,7 @@ class LeaderEpochFileCacheTest { val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) //Then - assertEquals(-1, cache.endOffsetFor(7)) + assertEquals((UNDEFINED_EPOCH, -1), cache.endOffsetFor(7)) } @Test diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala index 1c01d622438eb..5c60c0017db38 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala @@ -41,7 +41,7 @@ class OffsetsForLeaderEpochTest { @Test def shouldGetEpochsFromReplica(): Unit = { //Given - val offset = 42 + val epochAndOffset = (5, 42L) val epochRequested: Integer = 5 val request = Map(tp -> epochRequested) @@ -49,7 +49,7 @@ class OffsetsForLeaderEpochTest { val mockLog = createNiceMock(classOf[kafka.log.Log]) val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache]) val logManager = createNiceMock(classOf[kafka.log.LogManager]) - expect(mockCache.endOffsetFor(epochRequested)).andReturn(offset) + expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset) expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes() expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes() replay(mockCache, mockLog, logManager) @@ -67,7 +67,7 @@ class OffsetsForLeaderEpochTest { val response = replicaManager.lastOffsetForLeaderEpoch(request) //Then - assertEquals(new EpochEndOffset(Errors.NONE, offset), response(tp)) + assertEquals(new EpochEndOffset(Errors.NONE, epochAndOffset._1, epochAndOffset._2), response(tp)) } @Test @@ -90,7 +90,7 @@ class OffsetsForLeaderEpochTest { val response = 
replicaManager.lastOffsetForLeaderEpoch(request) //Then - assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET), response(tp)) + assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp)) } @Test @@ -112,6 +112,6 @@ class OffsetsForLeaderEpochTest { val response = replicaManager.lastOffsetForLeaderEpoch(request) //Then - assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET), response(tp)) + assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp)) } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala index 1f5bec1cf8220..a0585a276297f 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala @@ -28,17 +28,27 @@ import org.apache.kafka.common.utils.{SystemTime, Time} /** * Stub network client used for testing the ReplicaFetcher, wraps the MockClient used for consumer testing + * + * The common case is that there is only one OFFSET_FOR_LEADER_EPOCH request/response. So, the + * response to OFFSET_FOR_LEADER_EPOCH is 'offsets' map. 
If the test needs to set another round of + * OFFSET_FOR_LEADER_EPOCH with different offsets in response, it should update offsets using + * setOffsetsForNextResponse */ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset], destination: BrokerEndPoint, time: Time) extends BlockingSend { private val client = new MockClient(new SystemTime) var fetchCount = 0 var epochFetchCount = 0 var callback: Option[() => Unit] = None + var updatedOffsetsOpt: Option[java.util.Map[TopicPartition, EpochEndOffset]] = None def setEpochRequestCallback(postEpochFunction: () => Unit){ callback = Some(postEpochFunction) } + def setOffsetsForNextResponse(newOffsets: java.util.Map[TopicPartition, EpochEndOffset]): Unit = { + updatedOffsetsOpt = Some(newOffsets) + } + override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = { //Send the request to the mock client @@ -50,7 +60,11 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc case ApiKeys.OFFSET_FOR_LEADER_EPOCH => callback.foreach(_.apply()) epochFetchCount += 1 - new OffsetsForLeaderEpochResponse(offsets) + val offsetsForResponse = updatedOffsetsOpt match { + case Some(updatedOffsets) => updatedOffsets + case None => offsets + } + new OffsetsForLeaderEpochResponse(offsetsForResponse) case ApiKeys.FETCH => fetchCount += 1 @@ -75,4 +89,4 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc } override def close(): Unit = {} -} \ No newline at end of file +} From 00bca8153951a376185816226e6abb78f79756fa Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Mon, 16 Apr 2018 12:59:38 -0700 Subject: [PATCH 02/24] set EpochEndOffset.UNDEFINED_EPOCH default to NO_PARTITION_LEADER_EPOCH (same -1 as before) --- .../java/org/apache/kafka/common/requests/EpochEndOffset.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java 
b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java index b6a973e37c34e..ce938aad4f192 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java @@ -28,7 +28,7 @@ public class EpochEndOffset { public static final long UNDEFINED_EPOCH_OFFSET = NO_PARTITION_LEADER_EPOCH; - public static final int UNDEFINED_EPOCH = -1; + public static final int UNDEFINED_EPOCH = NO_PARTITION_LEADER_EPOCH; private Errors error; private int leaderEpoch; // introduced in V1 From bc514780bfa78a36819220a413d2857c5671af44 Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Tue, 24 Apr 2018 21:16:24 -0700 Subject: [PATCH 03/24] some cleanup and tests --- .../main/scala/kafka/cluster/Partition.scala | 4 +- .../kafka/server/ReplicaFetcherThread.scala | 10 +- .../server/epoch/LeaderEpochFileCache.scala | 6 +- .../server/ReplicaFetcherThreadTest.scala | 129 ++++++++++++------ .../epoch/LeaderEpochFileCacheTest.scala | 18 ++- 5 files changed, 113 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index d9d62449c5af7..2179f90eb2b3a 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -655,8 +655,8 @@ class Partition(val topic: String, inReadLock(leaderIsrUpdateLock) { leaderReplicaIfLocal match { case Some(leaderReplica) => - val epochAndOffset = leaderReplica.epochs.get.endOffsetFor(leaderEpoch) - new EpochEndOffset(NONE, epochAndOffset._1, epochAndOffset._2) + val (epoch, offset) = leaderReplica.epochs.get.endOffsetFor(leaderEpoch) + new EpochEndOffset(NONE, epoch, offset) case None => new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 77b3ae1d3e898..502c144093b34 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -334,8 +334,8 @@ class ReplicaFetcherThread(name: String, } else { // get (leader epoch, end offset) pair that corresponds to the largest leader epoch // less than or equal to the requested epoch. - val epochAndOffset = replica.epochs.get.endOffsetFor(epochOffset.leaderEpoch) - if (epochAndOffset._2 == UNDEFINED_EPOCH_OFFSET) { + val (replicaLeaderEpoch, replicaEndOffset) = replica.epochs.get.endOffsetFor (epochOffset.leaderEpoch) + if (replicaEndOffset == UNDEFINED_EPOCH_OFFSET) { // This can happen if replica was not tracking offsets at that point (before the // upgrade, or if this broker is new). // I think we then should truncate to start offset of epoch that we sent @@ -345,17 +345,17 @@ class ReplicaFetcherThread(name: String, s"below any follower's tracked epochs for ${replica.topicPartition}. " + s"The leader's offset only ${epochOffset.endOffset} will be used for truncation.") finalFetchLeaderEpochOffset(epochOffset.endOffset, replica) - } else if (epochAndOffset._1 != epochOffset.leaderEpoch) { + } else if (replicaLeaderEpoch != epochOffset.leaderEpoch) { // the replica does not know about the epoch that leader replied with // we truncate to the end offset of the largest epoch that is smaller than the // epoch the leader replied with, and send another offset for leader epoch request - val intermediateOffsetToTruncateTo = min(epochAndOffset._2, replica.logEndOffset.messageOffset) + val intermediateOffsetToTruncateTo = min(replicaEndOffset, replica.logEndOffset.messageOffset) warn(s"Based on follower's leader epoch, leader replied with epoch ${epochOffset.leaderEpoch} " + s"unknown to the follower for ${replica.topicPartition}. 
" + s"Will truncate to $intermediateOffsetToTruncateTo and send another leader epoch request to the leader.") OffsetTruncationState(intermediateOffsetToTruncateTo, truncationCompleted = false) } else { - val offsetToTruncateTo = min(epochAndOffset._2, epochOffset.endOffset) + val offsetToTruncateTo = min(replicaEndOffset, epochOffset.endOffset) finalFetchLeaderEpochOffset(offsetToTruncateTo, replica) } } diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala index fe870ad0d184b..23a53056f3209 100644 --- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala +++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala @@ -20,7 +20,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import kafka.server.LogOffsetMetadata import kafka.server.checkpoints.LeaderEpochCheckpoint -import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} +import org.apache.kafka.common.requests.EpochEndOffset._ import kafka.utils.CoreUtils._ import kafka.utils.Logging import org.apache.kafka.common.TopicPartition @@ -105,13 +105,13 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM } else if (requestedEpoch == latestEpoch) { (requestedEpoch, leo().messageOffset) } else { - val subsequentEpochs = epochs.filter(e => e.epoch > requestedEpoch) + val (subsequentEpochs, previousEpochs) = epochs.partition { e => e.epoch > requestedEpoch} if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch) + // no epochs recorded or requested epoch < the first epoch cached (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) else { // we must get at least one element in previous epochs list, because if we are here, // it means that requestedEpoch >= epochs.head.epoch -- so at least the first epoch is - val previousEpochs = epochs.filter(e => e.epoch <= requestedEpoch) (previousEpochs.last.epoch, 
subsequentEpochs.head.startOffset) } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index f2344a0b474f2..dd6499be6ca9d 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -28,6 +28,7 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.requests.EpochEndOffset +import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.utils.SystemTime import org.easymock.EasyMock._ import org.easymock.{Capture, CaptureType} @@ -43,17 +44,18 @@ class ReplicaFetcherThreadTest { private val t1p1 = new TopicPartition("topic1", 1) private val t2p1 = new TopicPartition("topic2", 1) + private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000) + @Test def shouldNotIssueLeaderEpochRequestIfInterbrokerVersionBelow11(): Unit = { val props = TestUtils.createBrokerConfig(1, "localhost:1234") props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2") props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2") val config = KafkaConfig.fromProps(props) - val endPoint = new BrokerEndPoint(0, "localhost", 1000) val thread = new ReplicaFetcherThread( name = "bob", fetcherId = 0, - sourceBroker = endPoint, + sourceBroker = brokerEndPoint, brokerConfig = config, replicaMgr = null, metrics = new Metrics(), @@ -75,7 +77,6 @@ class ReplicaFetcherThreadTest { def shouldHandleExceptionFromBlockingSend(): Unit = { val props = TestUtils.createBrokerConfig(1, "localhost:1234") val config = KafkaConfig.fromProps(props) - val endPoint = new BrokerEndPoint(0, "localhost", 1000) val mockBlockingSend = createMock(classOf[BlockingSend]) expect(mockBlockingSend.sendRequest(anyObject())).andThrow(new NullPointerException).once() @@ -84,7 
+85,7 @@ class ReplicaFetcherThreadTest { val thread = new ReplicaFetcherThread( name = "bob", fetcherId = 0, - sourceBroker = endPoint, + sourceBroker = brokerEndPoint, brokerConfig = config, replicaMgr = null, metrics = new Metrics(), @@ -116,28 +117,28 @@ class ReplicaFetcherThreadTest { val partition = createMock(classOf[Partition]) val replicaManager = createMock(classOf[ReplicaManager]) + val leaderEpoch = 5 + //Stubs expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes() - expect(leaderEpochs.latestEpoch).andReturn(5) - expect(leaderEpochs.endOffsetFor(5)).andReturn((5, 0)).anyTimes() + expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch) + expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 0)).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() stub(replica, partition, replicaManager) - //Expectations expect(partition.truncateTo(anyLong(), anyBoolean())).once replay(leaderEpochs, replicaManager, logManager, quota, replica) //Define the offsets for the OffsetsForLeaderEpochResponse - val offsets = Map(t1p0 -> new EpochEndOffset(5, 1), t1p1 -> new EpochEndOffset(5, 1)).asJava + val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1), t1p1 -> new EpochEndOffset(leaderEpoch, 1)).asJava //Create the fetcher thread - val endPoint = new BrokerEndPoint(0, "localhost", 1000) - val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, endPoint, new SystemTime()) - val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) + val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime()) + val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, 
Some(mockNetwork)) thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0)) //Loop 1 @@ -175,14 +176,15 @@ class ReplicaFetcherThreadTest { val partition = createMock(classOf[Partition]) val replicaManager = createMock(classOf[ReplicaManager]) + val leaderEpoch = 5 val initialLEO = 200 //Stubs expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes() expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes() - expect(leaderEpochs.latestEpoch).andReturn(5).anyTimes() - expect(leaderEpochs.endOffsetFor(5)).andReturn((5, initialLEO)).anyTimes() + expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).anyTimes() + expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, initialLEO)).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() stub(replica, partition, replicaManager) @@ -190,12 +192,11 @@ class ReplicaFetcherThreadTest { replay(leaderEpochs, replicaManager, logManager, quota, replica, partition) //Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation - val offsetsReply = Map(t1p0 -> new EpochEndOffset(5, 156), t2p1 -> new EpochEndOffset(5, 172)).asJava + val offsetsReply = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 156), t2p1 -> new EpochEndOffset(leaderEpoch, 172)).asJava //Create the thread - val endPoint = new BrokerEndPoint(0, "localhost", 1000) - val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime()) - val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) + val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime()) + val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), replicaManager, new 
Metrics(), new SystemTime(), quota, Some(mockNetwork)) thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0)) //Run it @@ -206,6 +207,56 @@ class ReplicaFetcherThreadTest { assertTrue(truncateToCapture.getValues.asScala.contains(172)) } + @Test + def shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs(): Unit = { + // Create a capture to track what partitions/offsets are truncated + val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL) + + val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) + + // Setup all the dependencies + val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps) + val quota = createNiceMock(classOf[ReplicationQuotaManager]) + val leaderEpochs = createMock(classOf[LeaderEpochCache]) + val logManager = createMock(classOf[LogManager]) + val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) + val replica = createNiceMock(classOf[Replica]) + val partition = createMock(classOf[Partition]) + val replicaManager = createMock(classOf[ReplicaManager]) + + val leaderEpochAtFollower = 5 + val leaderEpochAtLeader = 4 + val initialLEO = 200 + + //Stubs + expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes() + expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() + expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes() + expect(leaderEpochs.latestEpoch).andReturn(leaderEpochAtFollower).anyTimes() + expect(leaderEpochs.endOffsetFor(leaderEpochAtLeader)).andReturn((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)).anyTimes() + expect(replicaManager.logManager).andReturn(logManager).anyTimes() + expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() + stub(replica, partition, replicaManager) + + replay(leaderEpochs, replicaManager, logManager, quota, replica, partition) + + //Define the offsets for the OffsetsForLeaderEpochResponse, 
these are used for truncation + val offsetsReply = Map(t1p0 -> new EpochEndOffset(leaderEpochAtLeader, 156), + t2p1 -> new EpochEndOffset(leaderEpochAtLeader, 202)).asJava + + //Create the thread + val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime()) + val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) + thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0)) + + //Run it + thread.doWork() + + //We should have truncated to the offsets in the response + assertTrue(truncateToCapture.getValues.asScala.contains(156)) + assertTrue(truncateToCapture.getValues.asScala.contains(initialLEO)) + } + @Test def shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower(): Unit = { @@ -242,9 +293,8 @@ class ReplicaFetcherThreadTest { val offsets = Map(t1p0 -> new EpochEndOffset(4, 155), t1p1 -> new EpochEndOffset(4, 143)).asJava // Create the fetcher thread - val endPoint = new BrokerEndPoint(0, "localhost", 1000) - val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, endPoint, new SystemTime()) - val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) + val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime()) + val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0)) // Loop 1 -- both topic partitions will need to fetch another leader epoch @@ -268,7 +318,7 @@ class ReplicaFetcherThreadTest { //We should have truncated to the offsets in the second response assertTrue("Capture trancate to values " + truncateToCapture.getValues, - truncateToCapture.getValues.asScala.contains(101)) + truncateToCapture.getValues.asScala.contains(102)) 
assertTrue(truncateToCapture.getValues.asScala.contains(102)) } @@ -305,15 +355,14 @@ class ReplicaFetcherThreadTest { val offsetsReply = Map(t1p0 -> new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)).asJava //Create the thread - val endPoint = new BrokerEndPoint(0, "localhost", 1000) - val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime()) - val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) + val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime()) + val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) thread.addPartitions(Map(t1p0 -> initialFetchOffset)) //Run it thread.doWork() - //We should have truncated to the highwatermark for partitino 2 only + //We should have truncated to initial fetch offset assertEquals(initialFetchOffset, truncated.getValue) } @@ -333,6 +382,7 @@ class ReplicaFetcherThreadTest { val partition = createMock(classOf[Partition]) val replicaManager = createMock(classOf[kafka.server.ReplicaManager]) + val leaderEpoch = 5 val highWaterMark = 100 val initialLeo = 300 @@ -341,9 +391,9 @@ class ReplicaFetcherThreadTest { expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes() expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes() - expect(leaderEpochs.latestEpoch).andReturn(5) + expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch) // this is for the last reply with EpochEndOffset(5, 156) - expect(leaderEpochs.endOffsetFor(5)).andReturn((5, initialLeo)).anyTimes() + expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, initialLeo)).anyTimes() 
expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() stub(replica, partition, replicaManager) @@ -356,9 +406,8 @@ class ReplicaFetcherThreadTest { ).asJava //Create the thread - val endPoint = new BrokerEndPoint(0, "localhost", 1000) - val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime()) - val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) + val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime()) + val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0)) //Run thread 3 times @@ -370,7 +419,7 @@ class ReplicaFetcherThreadTest { assertEquals(0, truncated.getValues.size()) //New leader elected and replies - offsetsReply.put(t1p0, new EpochEndOffset(5, 156)) + offsetsReply.put(t1p0, new EpochEndOffset(leaderEpoch, 156)) thread.doWork() @@ -391,11 +440,13 @@ class ReplicaFetcherThreadTest { val partition = createMock(classOf[Partition]) val replicaManager = createNiceMock(classOf[ReplicaManager]) + val leaderEpoch = 4 + //Stub return values expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes() - expect(leaderEpochs.latestEpoch).andReturn(5) - expect(leaderEpochs.endOffsetFor(5)).andReturn((5, 0)).anyTimes() + expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch) + expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 0)).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() stub(replica, partition, replicaManager) @@ 
-404,13 +455,12 @@ class ReplicaFetcherThreadTest { //Define the offsets for the OffsetsForLeaderEpochResponse val offsetsReply = Map( - t1p0 -> new EpochEndOffset(5, 1), t1p1 -> new EpochEndOffset(5, 1) + t1p0 -> new EpochEndOffset(leaderEpoch, 1), t1p1 -> new EpochEndOffset(leaderEpoch, 1) ).asJava //Create the fetcher thread - val endPoint = new BrokerEndPoint(0, "localhost", 1000) - val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime()) - val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) + val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime()) + val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) //When thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0)) @@ -458,9 +508,8 @@ class ReplicaFetcherThreadTest { ).asJava //Create the fetcher thread - val endPoint = new BrokerEndPoint(0, "localhost", 1000) - val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime()) - val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) + val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime()) + val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) //When thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0)) diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index 657856b151059..bae43c896a15e 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala 
@@ -50,7 +50,7 @@ class LeaderEpochFileCacheTest { //Then assertEquals(2, cache.latestEpoch()) assertEquals(EpochEntry(2, 10), cache.epochEntries()(0)) - assertEquals((2, 11), cache.endOffsetFor(2)) //should match leo + assertEquals((2, leo), cache.endOffsetFor(2)) //should match leo } @Test @@ -67,15 +67,24 @@ class LeaderEpochFileCacheTest { leo = 14 //Then - assertEquals((2, 14), cache.endOffsetFor(2)) + assertEquals((leo, 14), cache.endOffsetFor(2)) } @Test def shouldReturnUndefinedOffsetIfUndefinedEpochRequested() = { def leoFinder() = new LogOffsetMetadata(0) + val expectedEpochEndOffset = (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) //Given cache with some data on leader val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) + + // when requesting undefined offset from empty cache + val epochAndOffsetForUndefined = cache.endOffsetFor(UNDEFINED_EPOCH) + // should return undefined offset + assertEquals("Expected undefined epoch and offset if undefined epoch requested. Empty cache.", + expectedEpochEndOffset, epochAndOffsetForUndefined) + + // assign couple of epochs cache.assign(epoch = 2, offset = 11) cache.assign(epoch = 3, offset = 12) @@ -83,7 +92,8 @@ class LeaderEpochFileCacheTest { val epochAndOffsetFor = cache.endOffsetFor(UNDEFINED_EPOCH) //Then - assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), epochAndOffsetFor) + assertEquals("Expected undefined epoch and offset if undefined epoch requested. 
Cache not empty.", + expectedEpochEndOffset, epochAndOffsetFor) } @Test @@ -303,7 +313,7 @@ class LeaderEpochFileCacheTest { //Then end offset for epoch 1 shouldn't have changed assertEquals((1, 6), cache.endOffsetFor(1)) - //Then end offset for epoch 2 has to be the offset of the epoch 1 message (I can't thing of a better option) + //Then end offset for epoch 2 has to be the offset of the epoch 1 message (I can't think of a better option) assertEquals((2, 8), cache.endOffsetFor(2)) //Epoch history shouldn't have changed From 4fce82fd5263c8eb69295f2a4db127cb5f2166cb Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Wed, 25 Apr 2018 16:09:16 -0700 Subject: [PATCH 04/24] Few small fixes to truncation logic --- .../kafka/server/AbstractFetcherThread.scala | 31 ++++++++++++++++- .../server/ReplicaAlterLogDirsThread.scala | 33 ++++++++----------- .../kafka/server/ReplicaFetcherThread.scala | 30 ++++------------- 3 files changed, 50 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index d39ba631f5c2d..c302b7293d4c3 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -19,7 +19,7 @@ package kafka.server import java.util.concurrent.locks.ReentrantLock -import kafka.cluster.BrokerEndPoint +import kafka.cluster.{Replica, BrokerEndPoint} import kafka.utils.{DelayedItem, Pool, ShutdownableThread} import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException} import kafka.common.{ClientIdAndBroker, KafkaException} @@ -288,6 +288,35 @@ abstract class AbstractFetcherThread(name: String, partitionStates.set(newStates.asJava) } + /** + * Called from ReplicaFetcherThread and ReplicaAlterLogDirsThread maybeTruncate when + * 'offsetToTruncateTo' is the final offset to truncate to. 
+ * @param offsetToTruncateTo Final offset to truncate to + * @param replica Follower's replica, which is either local replica (ReplicaFetcherThread) + * or future replica (ReplicaAlterLogDirsThread) + * @return Log end offset if given 'offsetToTruncateTo' is equal or larger + * than log end offset and logs the message that truncation is not + * needed. Otherwise, returns given 'offsetToTruncateTo' + */ + def finalFetchLeaderEpochOffset(offsetToTruncateTo: Long, replica: Replica, isFutureReplica: Boolean = false): OffsetTruncationState = { + val fetchOffset = + if (offsetToTruncateTo >= replica.logEndOffset.messageOffset) { + val logEndOffset = replica.logEndOffset.messageOffset + // to make sure we can distinguish log output for fetching from remote leader or local replica + val followerName = + if (isFutureReplica) + "future replica" + else + "follower" + info( + s"Based on $followerName's leader epoch, leader replied with an offset $logEndOffset >= the " + + s"$followerName's log end offset $logEndOffset in ${replica.topicPartition}. 
No truncation needed.") + logEndOffset + } else + offsetToTruncateTo + OffsetTruncationState(fetchOffset, truncationCompleted = true) + } + def delayPartitions(partitions: Iterable[TopicPartition], delay: Long) { partitionMapLock.lockInterruptibly() try { diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index eba4493ebe502..cd0adef0e39a5 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -178,15 +178,6 @@ class ReplicaAlterLogDirsThread(name: String, val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, OffsetTruncationState] val partitionsWithError = mutable.Set[TopicPartition]() - def finalFetchLeaderEpochOffset(offsetToTruncateTo: Long, futureReplica: Replica): OffsetTruncationState = { - val fetchOffset = - if (offsetToTruncateTo >= futureReplica.logEndOffset.messageOffset) - futureReplica.logEndOffset.messageOffset - else - offsetToTruncateTo - OffsetTruncationState(fetchOffset, truncationCompleted = true) - } - fetchedEpochs.foreach { case (topicPartition, epochOffset) => try { val futureReplica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId) @@ -198,25 +189,29 @@ class ReplicaAlterLogDirsThread(name: String, } else { val offsetTruncationState = if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) { + // since both replicas are on the same broker, they are using same version of OffsetForLeaderEpoch + // request/response, so we cannot get a valid epoch with undefined offset; log warn + // in case that happens so we know we made a wrong assumption somewhere + if (epochOffset.leaderEpoch != UNDEFINED_EPOCH) + info(s"Replica responded with undefined offset but a valid leader epoch ${epochOffset.leaderEpoch} for $topicPartition") OffsetTruncationState(partitionStates.stateValue(topicPartition).fetchOffset, 
truncationCompleted = true) - } else if (epochOffset.leaderEpoch == UNDEFINED_EPOCH) { - // this may happen if the leader used version 0 of OffsetForLeaderEpoch request/response - finalFetchLeaderEpochOffset(epochOffset.endOffset, futureReplica) } else { // get (leader epoch, end offset) pair that corresponds to the largest leader epoch // less than or equal to the requested epoch. - val epochAndOffset = futureReplica.epochs.get.endOffsetFor(epochOffset.leaderEpoch) - if (epochAndOffset._2 == UNDEFINED_EPOCH_OFFSET) { + val (futureLeaderEpoch, futureEndOffset) = futureReplica.epochs.get.endOffsetFor (epochOffset.leaderEpoch) + if (futureEndOffset == UNDEFINED_EPOCH_OFFSET) { // This can happen if replica was not tracking offsets at that point (before the // upgrade, or if this broker is new). - finalFetchLeaderEpochOffset(epochOffset.endOffset, futureReplica) - } else if (epochAndOffset._1 != epochOffset.leaderEpoch) { + // to be safe, we truncate to min of start offset and leader's end offset + val offsetToTruncateTo = min(partitionStates.stateValue(topicPartition).fetchOffset, epochOffset.endOffset) + finalFetchLeaderEpochOffset(offsetToTruncateTo, futureReplica, isFutureReplica = true) + } else if (futureLeaderEpoch != epochOffset.leaderEpoch) { // the replica does not know about the epoch that leader replied with - val intermediateOffsetToTruncateTo = min(epochAndOffset._2, futureReplica.logEndOffset.messageOffset) + val intermediateOffsetToTruncateTo = min(futureEndOffset, futureReplica.logEndOffset.messageOffset) OffsetTruncationState(intermediateOffsetToTruncateTo, truncationCompleted = false) } else { - val offsetToTruncateTo = min(epochAndOffset._2, epochOffset.endOffset) - finalFetchLeaderEpochOffset(offsetToTruncateTo, futureReplica) + val offsetToTruncateTo = min(futureEndOffset, epochOffset.endOffset) + finalFetchLeaderEpochOffset(offsetToTruncateTo, futureReplica, isFutureReplica = true) } } diff --git 
a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 502c144093b34..6d9505171e684 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -299,18 +299,6 @@ class ReplicaFetcherThread(name: String, val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, OffsetTruncationState] val partitionsWithError = mutable.Set[TopicPartition]() - // Called when 'offsetToTruncateTo' is the final offset to truncate to. - // Returns log end offset if given 'offsetToTruncateTo' is equal or larger than log end - // offset and logs the message that truncation is not needed. Otherwise returns given 'offsetToTruncateTo' - def finalFetchLeaderEpochOffset(offsetToTruncateTo: Long, replica: Replica): OffsetTruncationState = { - val fetchOffset = - if (offsetToTruncateTo >= replica.logEndOffset.messageOffset) - logEndOffset(replica, offsetToTruncateTo) - else - offsetToTruncateTo - OffsetTruncationState(fetchOffset, truncationCompleted = true) - } - fetchedEpochs.foreach { case (tp, epochOffset) => try { val replica = replicaMgr.getReplicaOrException(tp) @@ -337,10 +325,9 @@ class ReplicaFetcherThread(name: String, val (replicaLeaderEpoch, replicaEndOffset) = replica.epochs.get.endOffsetFor (epochOffset.leaderEpoch) if (replicaEndOffset == UNDEFINED_EPOCH_OFFSET) { // This can happen if replica was not tracking offsets at that point (before the - // upgrade, or if this broker is new). - // I think we then should truncate to start offset of epoch that we sent - // initially? but it's possible that the leader just wasn't trucking, so we - // cannot distinguish this situation? Unless this is epoch 0? Special case? + // upgrade, or if this broker is new). 
Since the leader replied with epoch <= + // requested epoch from follower, so should be safe to truncate to leader's + // offset (this is the same behavior as post-KIP-101 and pre-KIP-279) warn(s"Based on follower's leader epoch, leader replied with epoch ${epochOffset.leaderEpoch} " + s"below any follower's tracked epochs for ${replica.topicPartition}. " + s"The leader's offset only ${epochOffset.endOffset} will be used for truncation.") @@ -361,7 +348,9 @@ class ReplicaFetcherThread(name: String, } partition.truncateTo(offsetTruncationState.offset, isFuture = false) - replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, offsetTruncationState.offset) + // mark the future replica for truncation only when we do last truncation + if (offsetTruncationState.truncationCompleted) + replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, offsetTruncationState.offset) fetchOffsets.put(tp, offsetTruncationState) } } catch { @@ -414,13 +403,6 @@ class ReplicaFetcherThread(name: String, result } - private def logEndOffset(replica: Replica, epochOffset: Long): Long = { - val logEndOffset = replica.logEndOffset.messageOffset - info(s"Based on follower's leader epoch, leader replied with an offset $epochOffset >= the " + - s"follower's log end offset $logEndOffset in ${replica.topicPartition}. No truncation needed.") - logEndOffset - } - /** * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, * the quota is exceeded and the replica is not in sync. 
From 2f6cf3b31e6c11a24a2aeb6b13af4a46fbf34100 Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Wed, 25 Apr 2018 16:21:51 -0700 Subject: [PATCH 05/24] fixed merge errors --- .../ReplicaAlterLogDirsThreadTest.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index a0f1dae8c76e0..c78ee7f50f3c6 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -61,7 +61,7 @@ class ReplicaAlterLogDirsThreadTest { //Stubs expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() - expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(leo).anyTimes() + expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, leo)).anyTimes() stub(replica, replica, futureReplica, partition, replicaManager) replay(leaderEpochs, replicaManager, replica) @@ -78,8 +78,8 @@ class ReplicaAlterLogDirsThreadTest { val result = thread.fetchEpochsFromLeader(Map(t1p0 -> leaderEpoch, t1p1 -> leaderEpoch)) val expected = Map( - t1p0 -> new EpochEndOffset(Errors.NONE, leo), - t1p1 -> new EpochEndOffset(Errors.NONE, leo) + t1p0 -> new EpochEndOffset(Errors.NONE, leaderEpoch, leo), + t1p1 -> new EpochEndOffset(Errors.NONE, leaderEpoch, leo) ) assertEquals("results from leader epoch request should have offset from local replica", @@ -101,7 +101,7 @@ class ReplicaAlterLogDirsThreadTest { //Stubs expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() - expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(leo).anyTimes() + expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, leo)).anyTimes() expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replica).anyTimes() expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes() 
expect(replicaManager.getReplicaOrException(t1p1)).andThrow(new KafkaStorageException).once() @@ -121,8 +121,8 @@ class ReplicaAlterLogDirsThreadTest { val result = thread.fetchEpochsFromLeader(Map(t1p0 -> leaderEpoch, t1p1 -> leaderEpoch)) val expected = Map( - t1p0 -> new EpochEndOffset(Errors.NONE, leo), - t1p1 -> new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH_OFFSET) + t1p0 -> new EpochEndOffset(Errors.NONE, leaderEpoch, leo), + t1p1 -> new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) ) assertEquals(expected, result) @@ -161,8 +161,8 @@ class ReplicaAlterLogDirsThreadTest { expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes() expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes() expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch).anyTimes() - expect(leaderEpochsT1p0.endOffsetFor(leaderEpoch)).andReturn(replicaT1p0LEO).anyTimes() - expect(leaderEpochsT1p1.endOffsetFor(leaderEpoch)).andReturn(replicaT1p1LEO).anyTimes() + expect(leaderEpochsT1p0.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, replicaT1p0LEO)).anyTimes() + expect(leaderEpochsT1p1.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, replicaT1p1LEO)).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes() stubWithFetchMessages(replicaT1p0, replicaT1p1, futureReplica, partition, replicaManager, responseCallback) @@ -221,7 +221,7 @@ class ReplicaAlterLogDirsThreadTest { // since UNDEFINED_EPOCH is -1 wich will be lower than any valid leader epoch, the method // will return UNDEFINED_EPOCH_OFFSET if requested epoch is lower than the first epoch cached - expect(leaderEpochs.endOffsetFor(UNDEFINED_EPOCH)).andReturn(UNDEFINED_EPOCH_OFFSET).anyTimes() + expect(leaderEpochs.endOffsetFor(UNDEFINED_EPOCH)).andReturn((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)).anyTimes() stubWithFetchMessages(replica, replica, futureReplica, partition, 
replicaManager, responseCallback) replay(replicaManager, logManager, quotaManager, leaderEpochs, futureReplicaLeaderEpochs, replica, futureReplica, partition) @@ -273,7 +273,7 @@ class ReplicaAlterLogDirsThreadTest { expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes() expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(futureReplicaLeaderEpoch).anyTimes() - expect(leaderEpochs.endOffsetFor(futureReplicaLeaderEpoch)).andReturn(replicaLEO).anyTimes() + expect(leaderEpochs.endOffsetFor(futureReplicaLeaderEpoch)).andReturn((futureReplicaLeaderEpoch, replicaLEO)).anyTimes() expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes() expect(replicaManager.getReplica(t1p0)).andReturn(Some(replica)).anyTimes() @@ -355,7 +355,7 @@ class ReplicaAlterLogDirsThreadTest { expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes() expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch) - expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(replicaLEO) + expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, replicaLEO)) expect(replicaManager.logManager).andReturn(logManager).anyTimes() stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, responseCallback) From c69e85eee24209eb2e89c70ca312fa51415d6094 Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Mon, 30 Apr 2018 10:37:23 -0700 Subject: [PATCH 06/24] Fixed build failures and fixed logging --- .../src/main/scala/kafka/server/AbstractFetcherThread.scala | 6 ++++-- .../main/scala/kafka/server/ReplicaAlterLogDirsThread.scala | 4 ++-- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | 6 +++--- .../unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala | 4 ++++ .../unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala | 2 +- 5 files changed, 14 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index c302b7293d4c3..71637ede70e70 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -292,13 +292,15 @@ abstract class AbstractFetcherThread(name: String, * Called from ReplicaFetcherThread and ReplicaAlterLogDirsThread maybeTruncate when * 'offsetToTruncateTo' is the final offset to truncate to. * @param offsetToTruncateTo Final offset to truncate to + * @param offsetFromLeader Offset received from the leader, could be same as + * offsetToTruncateTo, this parameter is used only for logging * @param replica Follower's replica, which is either local replica (ReplicaFetcherThread) * or future replica (ReplicaAlterLogDirsThread) * @return Log end offset if given 'offsetToTruncateTo' is equal or larger * than log end offset and logs the message that truncation is not * needed. Otherwise, returns given 'offsetToTruncateTo' */ - def finalFetchLeaderEpochOffset(offsetToTruncateTo: Long, replica: Replica, isFutureReplica: Boolean = false): OffsetTruncationState = { + def finalFetchLeaderEpochOffset(offsetToTruncateTo: Long, offsetFromLeader: Long, replica: Replica, isFutureReplica: Boolean = false): OffsetTruncationState = { val fetchOffset = if (offsetToTruncateTo >= replica.logEndOffset.messageOffset) { val logEndOffset = replica.logEndOffset.messageOffset @@ -309,7 +311,7 @@ abstract class AbstractFetcherThread(name: String, else "follower" info( - s"Based on $followerName's leader epoch, leader replied with an offset $logEndOffset >= the " + + s"Based on $followerName's leader epoch, leader replied with an offset $offsetFromLeader. Min(leader offset, leader epoch end offset on the follower) $offsetToTruncateTo >= the " + s"$followerName's log end offset $logEndOffset in ${replica.topicPartition}. 
No truncation needed.") logEndOffset } else diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index cd0adef0e39a5..14d0ea85cc844 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -204,14 +204,14 @@ class ReplicaAlterLogDirsThread(name: String, // upgrade, or if this broker is new). // to be safe, we truncate to min of start offset and leader's end offset val offsetToTruncateTo = min(partitionStates.stateValue(topicPartition).fetchOffset, epochOffset.endOffset) - finalFetchLeaderEpochOffset(offsetToTruncateTo, futureReplica, isFutureReplica = true) + finalFetchLeaderEpochOffset(offsetToTruncateTo, epochOffset.endOffset, futureReplica, isFutureReplica = true) } else if (futureLeaderEpoch != epochOffset.leaderEpoch) { // the replica does not know about the epoch that leader replied with val intermediateOffsetToTruncateTo = min(futureEndOffset, futureReplica.logEndOffset.messageOffset) OffsetTruncationState(intermediateOffsetToTruncateTo, truncationCompleted = false) } else { val offsetToTruncateTo = min(futureEndOffset, epochOffset.endOffset) - finalFetchLeaderEpochOffset(offsetToTruncateTo, futureReplica, isFutureReplica = true) + finalFetchLeaderEpochOffset(offsetToTruncateTo, epochOffset.endOffset, futureReplica, isFutureReplica = true) } } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 6d9505171e684..af5b14e0f4f18 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -318,7 +318,7 @@ class ReplicaFetcherThread(name: String, // request/response warn(s"Based on follower's leader epoch, leader replied with an unknown leader epoch in ${replica.topicPartition}. 
" + s"The leader's offset only ${epochOffset.endOffset} will be used for truncation.") - finalFetchLeaderEpochOffset(epochOffset.endOffset, replica) + finalFetchLeaderEpochOffset(epochOffset.endOffset, epochOffset.endOffset, replica) } else { // get (leader epoch, end offset) pair that corresponds to the largest leader epoch // less than or equal to the requested epoch. @@ -331,7 +331,7 @@ class ReplicaFetcherThread(name: String, warn(s"Based on follower's leader epoch, leader replied with epoch ${epochOffset.leaderEpoch} " + s"below any follower's tracked epochs for ${replica.topicPartition}. " + s"The leader's offset only ${epochOffset.endOffset} will be used for truncation.") - finalFetchLeaderEpochOffset(epochOffset.endOffset, replica) + finalFetchLeaderEpochOffset(epochOffset.endOffset, epochOffset.endOffset, replica) } else if (replicaLeaderEpoch != epochOffset.leaderEpoch) { // the replica does not know about the epoch that leader replied with // we truncate to the end offset of the largest epoch that is smaller than the @@ -343,7 +343,7 @@ class ReplicaFetcherThread(name: String, OffsetTruncationState(intermediateOffsetToTruncateTo, truncationCompleted = false) } else { val offsetToTruncateTo = min(replicaEndOffset, epochOffset.endOffset) - finalFetchLeaderEpochOffset(offsetToTruncateTo, replica) + finalFetchLeaderEpochOffset(offsetToTruncateTo, epochOffset.endOffset, replica) } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index c78ee7f50f3c6..b064d4e805b50 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -163,6 +163,8 @@ class ReplicaAlterLogDirsThreadTest { expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch).anyTimes() expect(leaderEpochsT1p0.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 
replicaT1p0LEO)).anyTimes() expect(leaderEpochsT1p1.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, replicaT1p1LEO)).anyTimes() + expect(futureReplicaLeaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, futureReplicaLEO)).anyTimes() + expect(replicaManager.logManager).andReturn(logManager).anyTimes() stubWithFetchMessages(replicaT1p0, replicaT1p1, futureReplica, partition, replicaManager, responseCallback) @@ -274,6 +276,7 @@ class ReplicaAlterLogDirsThreadTest { expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(futureReplicaLeaderEpoch).anyTimes() expect(leaderEpochs.endOffsetFor(futureReplicaLeaderEpoch)).andReturn((futureReplicaLeaderEpoch, replicaLEO)).anyTimes() + expect(futureReplicaLeaderEpochs.endOffsetFor(futureReplicaLeaderEpoch)).andReturn((futureReplicaLeaderEpoch, futureReplicaLEO)).anyTimes() expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes() expect(replicaManager.getReplica(t1p0)).andReturn(Some(replica)).anyTimes() @@ -356,6 +359,7 @@ class ReplicaAlterLogDirsThreadTest { expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes() expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch) expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, replicaLEO)) + expect(futureReplicaLeaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, futureReplicaLEO)) expect(replicaManager.logManager).andReturn(logManager).anyTimes() stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, responseCallback) diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index bae43c896a15e..3390061c5b503 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -67,7 +67,7 @@ class 
LeaderEpochFileCacheTest { leo = 14 //Then - assertEquals((leo, 14), cache.endOffsetFor(2)) + assertEquals((2, leo), cache.endOffsetFor(2)) } @Test From ab86ba30015e242520be05b455c588e8c85654ae Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Mon, 30 Apr 2018 11:00:25 -0700 Subject: [PATCH 07/24] Updated comments based on review comments --- .../kafka/server/ReplicaFetcherThread.scala | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index af5b14e0f4f18..dfbb16732bdeb 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -287,14 +287,21 @@ class ReplicaFetcherThread(name: String, } /** - * - Truncate the log to the leader's offset for each partition's epoch. - * - If the leader's offset is greater, we stick with the Log End Offset - * otherwise we truncate to the leaders offset. - * - If the leader replied with undefined epoch offset we must use the high watermark - * - If the leader replied with leader epoch not known to this follower, we truncate to the - * end offset of the largest epoch that is smaller than the epoch the leader replied with, - * and send offset for leader epoch request with that leader epoch. - */ + * Truncate the log for each partition's epoch based on leader's returned epoch and offset. + * -- If the leader replied with undefined epoch offset, we must use the watermark. This can + * happen if 1) the leader is on the protocol version < KAFKA_0_11_0_IV2; 2) the follower + * requested leader epoch < the first leader epoch known to the leader. + * -- If the leader replied with the valid offset but undefined leader epoch, we truncate to + * leader's offset if it is lower than follower's Log End Offset. 
This may happen if the + * leader in on the protocol version < KAFKA_1_1_IV0 + * -- If the leader replied with leader epoch not known to the follower, we truncate to the + * end offset of the largest epoch that is smaller than the epoch the leader replied with, and + * send offset for leader epoch request with that leader epoch. In a more rare case, where the + * follower was not tracking epochs smaller than the epoch the leader replied with, we + * truncate the leader's offset (and do not send any more leader epoch requesrs). + * -- Otherwise, truncate to min(leader's offset, end offset on the follower for epoch that + * leader replied with, follower's Log End Offset). + */ override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = { val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, OffsetTruncationState] val partitionsWithError = mutable.Set[TopicPartition]() @@ -310,6 +317,7 @@ class ReplicaFetcherThread(name: String, } else { val offsetTruncationState = if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) { + // truncate to initial offset which is the high watermark warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " + s"The initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.") OffsetTruncationState(partitionStates.stateValue(tp).fetchOffset, truncationCompleted = true) @@ -325,7 +333,7 @@ class ReplicaFetcherThread(name: String, val (replicaLeaderEpoch, replicaEndOffset) = replica.epochs.get.endOffsetFor (epochOffset.leaderEpoch) if (replicaEndOffset == UNDEFINED_EPOCH_OFFSET) { // This can happen if replica was not tracking offsets at that point (before the - // upgrade, or if this broker is new). Since the leader replied with epoch <= + // upgrade, or if this broker is new). 
Since the leader replied with epoch < // requested epoch from follower, so should be safe to truncate to leader's // offset (this is the same behavior as post-KIP-101 and pre-KIP-279) warn(s"Based on follower's leader epoch, leader replied with epoch ${epochOffset.leaderEpoch} " + From b2be14dea06ed551556ba52278a5bc397e4a596c Mon Sep 17 00:00:00 2001 From: Anna Povzner Date: Tue, 1 May 2018 11:16:06 -0700 Subject: [PATCH 08/24] Updated the upgrade doc and minor fixes in comments --- .../src/main/scala/kafka/api/ApiVersion.scala | 1 + .../kafka/server/ReplicaFetcherThread.scala | 42 +++++++-------- docs/upgrade.html | 52 +++++++++++++++++++ 3 files changed, 74 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index 62b91a0eb8b5a..f38435633f2d1 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -77,6 +77,7 @@ object ApiVersion { // and KafkaStorageException for fetch requests. "1.1-IV0" -> KAFKA_1_1_IV0, "1.1" -> KAFKA_1_1_IV0, + // Introduced OffsetsForLeaderEpochRequest/OffsetsForLeaderEpochResponse V1 via KIP-279 "2.0-IV0" -> KAFKA_2_0_IV0, "2.0" -> KAFKA_2_0_IV0 ) diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index dfbb16732bdeb..7fcc2178e13e1 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -288,15 +288,15 @@ class ReplicaFetcherThread(name: String, /** * Truncate the log for each partition's epoch based on leader's returned epoch and offset. - * -- If the leader replied with undefined epoch offset, we must use the watermark. This can - * happen if 1) the leader is on the protocol version < KAFKA_0_11_0_IV2; 2) the follower + * -- If the leader replied with undefined epoch offset, we must use the high watermark. 
This can + * happen if 1) the leader is still using message format older than KAFKA_0_11_0_IV2; 2) the follower * requested leader epoch < the first leader epoch known to the leader. * -- If the leader replied with the valid offset but undefined leader epoch, we truncate to * leader's offset if it is lower than follower's Log End Offset. This may happen if the - * leader in on the protocol version < KAFKA_1_1_IV0 + * leader is on the inter-broker protocol version < KAFKA_2_0_IV0 * -- If the leader replied with leader epoch not known to the follower, we truncate to the * end offset of the largest epoch that is smaller than the epoch the leader replied with, and - * send offset for leader epoch request with that leader epoch. In a more rare case, where the + * send OffsetsForLeaderEpochRequest with that leader epoch. In a more rare case, where the * follower was not tracking epochs smaller than the epoch the leader replied with, we * truncate the leader's offset (and do not send any more leader epoch requesrs). 
* -- Otherwise, truncate to min(leader's offset, end offset on the follower for epoch that @@ -306,52 +306,52 @@ class ReplicaFetcherThread(name: String, val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, OffsetTruncationState] val partitionsWithError = mutable.Set[TopicPartition]() - fetchedEpochs.foreach { case (tp, epochOffset) => + fetchedEpochs.foreach { case (tp, leaderEpochOffset) => try { val replica = replicaMgr.getReplicaOrException(tp) val partition = replicaMgr.getPartition(tp).get - if (epochOffset.hasError) { - info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${epochOffset.error}") + if (leaderEpochOffset.hasError) { + info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${leaderEpochOffset.error}") partitionsWithError += tp } else { val offsetTruncationState = - if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) { + if (leaderEpochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) { // truncate to initial offset which is the high watermark warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " + s"The initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.") OffsetTruncationState(partitionStates.stateValue(tp).fetchOffset, truncationCompleted = true) - } else if (epochOffset.leaderEpoch == UNDEFINED_EPOCH) { - // this may happen if the leader used version 0 of OffsetForLeaderEpoch - // request/response + } else if (leaderEpochOffset.leaderEpoch == UNDEFINED_EPOCH) { + // this may happen if the leader uses inter-broker protocol version < KAFKA_2_0_IV0 + // (version 0 of OffsetForLeaderEpoch request/response) warn(s"Based on follower's leader epoch, leader replied with an unknown leader epoch in ${replica.topicPartition}. 
" + - s"The leader's offset only ${epochOffset.endOffset} will be used for truncation.") - finalFetchLeaderEpochOffset(epochOffset.endOffset, epochOffset.endOffset, replica) + s"The leader's offset ${leaderEpochOffset.endOffset} will be used for truncation.") + finalFetchLeaderEpochOffset(leaderEpochOffset.endOffset, leaderEpochOffset.endOffset, replica) } else { // get (leader epoch, end offset) pair that corresponds to the largest leader epoch // less than or equal to the requested epoch. - val (replicaLeaderEpoch, replicaEndOffset) = replica.epochs.get.endOffsetFor (epochOffset.leaderEpoch) + val (replicaLeaderEpoch, replicaEndOffset) = replica.epochs.get.endOffsetFor(leaderEpochOffset.leaderEpoch) if (replicaEndOffset == UNDEFINED_EPOCH_OFFSET) { // This can happen if replica was not tracking offsets at that point (before the // upgrade, or if this broker is new). Since the leader replied with epoch < // requested epoch from follower, so should be safe to truncate to leader's // offset (this is the same behavior as post-KIP-101 and pre-KIP-279) - warn(s"Based on follower's leader epoch, leader replied with epoch ${epochOffset.leaderEpoch} " + + warn(s"Based on follower's leader epoch, leader replied with epoch ${leaderEpochOffset.leaderEpoch} " + s"below any follower's tracked epochs for ${replica.topicPartition}. 
" + - s"The leader's offset only ${epochOffset.endOffset} will be used for truncation.") - finalFetchLeaderEpochOffset(epochOffset.endOffset, epochOffset.endOffset, replica) - } else if (replicaLeaderEpoch != epochOffset.leaderEpoch) { + s"The leader's offset only ${leaderEpochOffset.endOffset} will be used for truncation.") + finalFetchLeaderEpochOffset(leaderEpochOffset.endOffset, leaderEpochOffset.endOffset, replica) + } else if (replicaLeaderEpoch != leaderEpochOffset.leaderEpoch) { // the replica does not know about the epoch that leader replied with // we truncate to the end offset of the largest epoch that is smaller than the // epoch the leader replied with, and send another offset for leader epoch request val intermediateOffsetToTruncateTo = min(replicaEndOffset, replica.logEndOffset.messageOffset) - warn(s"Based on follower's leader epoch, leader replied with epoch ${epochOffset.leaderEpoch} " + + warn(s"Based on follower's leader epoch, leader replied with epoch ${leaderEpochOffset.leaderEpoch} " + s"unknown to the follower for ${replica.topicPartition}. " + s"Will truncate to $intermediateOffsetToTruncateTo and send another leader epoch request to the leader.") OffsetTruncationState(intermediateOffsetToTruncateTo, truncationCompleted = false) } else { - val offsetToTruncateTo = min(replicaEndOffset, epochOffset.endOffset) - finalFetchLeaderEpochOffset(offsetToTruncateTo, epochOffset.endOffset, replica) + val offsetToTruncateTo = min(replicaEndOffset, leaderEpochOffset.endOffset) + finalFetchLeaderEpochOffset(offsetToTruncateTo, leaderEpochOffset.endOffset, replica) } } diff --git a/docs/upgrade.html b/docs/upgrade.html index 8bfc61ef480d6..736d135919bdc 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -19,6 +19,58 @@