
KAFKA-6361: Fix log divergence between leader and follower after fast leader fail over#4882

Merged
junrao merged 24 commits into apache:trunk from apovzner:kafka-6361
May 10, 2018

Conversation

@apovzner
Contributor

@apovzner apovzner commented Apr 16, 2018

Implementation of KIP-279 as described here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over

In summary:

  • Added leader_epoch to OFFSET_FOR_LEADER_EPOCH_RESPONSE
  • The leader replies with the pair (largest epoch less than or equal to the requested epoch, the end offset of that epoch)
  • If the follower does not know about the leader epoch that the leader replied with, it truncates to the end offset of its largest leader epoch less than the epoch the leader replied with, and sends another OffsetForLeaderEpoch request containing that epoch.
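The handshake described in these bullets can be sketched as follows (a simplified Python model, not the actual Scala implementation; epoch caches are modeled as sorted (epoch, startOffset) lists, and edge cases such as empty caches or v0 responses are omitted):

```python
UNDEFINED_EPOCH = -1

def end_offset_for(epoch_cache, log_end_offset, requested_epoch):
    """Return (largest epoch <= requested_epoch, that epoch's end offset).

    epoch_cache is a sorted list of (epoch, start_offset); an epoch's end
    offset is the start offset of the next cached epoch, or the log end
    offset for the latest epoch.
    """
    for i in reversed(range(len(epoch_cache))):
        epoch, _ = epoch_cache[i]
        if epoch <= requested_epoch:
            end = epoch_cache[i + 1][1] if i + 1 < len(epoch_cache) else log_end_offset
            return epoch, end
    return UNDEFINED_EPOCH, -1

def find_truncation_offset(leader_cache, leader_leo, follower_cache, follower_leo):
    """Iterate the OffsetsForLeaderEpoch exchange until both sides agree on
    an epoch. Both caches are assumed to share their oldest epoch, so the
    loop always converges."""
    requested = follower_cache[-1][0]  # start from the follower's latest epoch
    while True:
        # leader's answer to OffsetsForLeaderEpoch(requested)
        l_epoch, l_end = end_offset_for(leader_cache, leader_leo, requested)
        # follower looks up its own end offset for the epoch the leader returned
        f_epoch, f_end = end_offset_for(follower_cache, follower_leo, l_epoch)
        if f_epoch == l_epoch:
            return min(l_end, f_end, follower_leo)
        requested = f_epoch  # follower doesn't know l_epoch: retry with a smaller epoch
```

For example, a leader with epochs [(0, 0), (2, 10)] and a follower that instead recorded epoch 1 (or epoch 3) starting at offset 10 converge on truncation offset 10, the point where the logs diverged.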

Added an integration test, EpochDrivenReplicationProtocolAcceptanceTest.logsShouldNotDivergeOnUncleanLeaderElections, that performs three fast leader changes with unclean leader election enabled and min ISR set to 1. The test failed before the fix was implemented.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@apovzner
Contributor Author

apovzner commented Apr 26, 2018

@lindong28 and @junrao Regarding the truncation logic for the future replica (ReplicaAlterLogDirsThread): I thought about this a bit more, and I think we don't need the same truncation logic as in the replica fetcher. We can implement much simpler logic based on the truncation offset of the local replica. Here are the main points:

  1. Currently (and in the original implementation), if the replica becomes a follower, it does a truncation, forces the truncation state on the future replica, and provides it with its truncation offset. The original implementation and this PR currently do not use that offset except when the future or current replica does not have any epochs recorded.
  2. I think it is safe for the future replica to truncate and fetch from the offset that the local replica truncated to, i.e., the min of that offset and the future replica's log end offset. The reason is that the future replica only ever fetches from a single source, the local replica. So, the only way the future replica's and local replica's logs may diverge is if the local replica truncates and fetches different offsets from the new leader. In that case, we already force truncation of the future replica and provide the new truncation offset of the local replica.
  3. One possible argument against this: what if we "lose" the future replica's epoch sequence file or entries from it? The leader epoch truncation logic may help if we lose some recent entries, because it will truncate to the end offset of the last known epoch. However, it does not guarantee proper recovery in many cases. Plus, I understand that the future replica will do the same recovery logic as a normal replica if, say, something gets corrupted or we lose the leader epoch file, e.g., someone deletes it (it gets rebuilt from the log).

Based on the above, unless I missed something or my assumptions are wrong, I think we should truncate future replica to the truncation offset of the local replica. This will result in a simpler code/logic which is easier to reason about and debug in the future.

The PR is also ready to review.

@lindong28
Member

@apovzner Thanks for the patch! I will finish the first round of review this week.

@lindong28 lindong28 self-requested a review April 26, 2018 08:22
Member

@lindong28 lindong28 left a comment

Thanks for the patch! LGTM. I only have some minor comments regarding the Java doc and the consistency between ReplicaFetcherThread and ReplicaAlterDirThread.

Member

nits: leader replied with an offset $offsetToTruncateTo not (logEndOffset)

Contributor Author

Oh, that's a good catch, and it also made me realize the log message is not completely correct for all cases; will fix.

Member

Since we are changing this Java doc, can we make it a bit more consistent with the actual implementation? For example, "If the leader replied with undefined epoch" can probably be the first case.

Member

nits: This is not related to this patch. Can we add one comment saying that the initial offset in this case will be the high watermark, so that it is more consistent with the Java doc?

Contributor Author

Added comment above the logging

Member

nits: Would it be a bit more accurate to replace <= with <?

Member

nits: It probably does not affect the correctness of the code. But I am wondering if we can make ReplicaFetcherThread.maybeTruncate() a bit more consistent with the ReplicaAlterDirThread.maybeTruncate(). Currently ReplicaFetcherThread.maybeTruncate() will specifically handle the scenario that epochOffset.leaderEpoch == UNDEFINED_EPOCH whereas ReplicaAlterDirThread.maybeTruncate() handles the scenario through futureEndOffset == UNDEFINED_EPOCH_OFFSET.

Also, should we consider taking the min with the initial offset here, just like this patch does in ReplicaAlterDirThread.maybeTruncate()? I am wondering whether we can make them more consistent, or whether there is a reason that the initial offset is needed in only one of them.

Contributor Author

About the question in the second paragraph: it would be incorrect to take the min with the initial offset (high watermark) here, because that falls back to the pre-KIP-101 implementation and we can actually lose a committed message (see scenario 1 in KIP-101). This particular case can happen if the leader is on a protocol version prior to this KIP but post-KIP-101, so it replies with a valid offset but an invalid leader epoch. In this case, we want the KIP-101 behavior of truncating to the leader's offset, rather than falling back to the pre-KIP-101 implementation.

Regarding the bigger question of making ReplicaAlterLogDirsThread more consistent with ReplicaFetcherThread, I wanted to discuss the possibility of ReplicaAlterLogDirsThread using only the initial offset (which is the truncation offset of the main replica) for truncation, instead of following the offset-for-leader-epoch logic. I left a comment earlier in this PR and wanted to get your opinion.
-- The initial offset in ReplicaAlterLogDirsThread is the main replica's truncation offset (if the main replica is a follower) or the main replica's high watermark (if the main replica is the leader), which differs from the initial offset in ReplicaFetcherThread, which is the high watermark.
-- There is no way the future replica needs to truncate further back than the initial offset, because it is always a follower of the main replica; and if the main replica truncated and re-fetched offsets from the leader, causing temporary log divergence with the future replica, we already force truncation on the future replica, setting the future replica's initial offset to the main replica's truncation offset.

Otherwise, I will change ReplicaAlterDirThread.maybeTruncate() to be more consistent with ReplicaFetcherThread.maybeTruncate() in the case mentioned above. The current implementation is still correct, because falling back to the initial offset is safe for the future replica (unlike for a follower replica), since the future replica is always a follower. The whole reason for KIP-101 and KIP-279 is to deal with replicas changing their leader/follower status, while the future replica is always a follower.

Member

@apovzner Thanks for the explanation. The case where the leader is on a protocol version prior to this KIP but post-KIP-101 AND this patch is used on some broker can only happen while the Kafka cluster is being upgraded to use this patch. The time window of this state is very small, and maybe we do not need to take care of this scenario.

Regarding the possibility of ReplicaAlterLogDirThread using only initial offset, Jun has provided a very good example. Basically this approach is not reliable if the future replica is offline when leader replica is truncated.

Contributor Author

Actually, to be more precise, both leader and follower could be on the pre-this-KIP protocol version if the user upgrades the brokers but does not bump the protocol version. So I think we want the post-KIP-101 behavior, which is what's implemented, vs. going back to pre-KIP-101.

OK, offline future replica is a good example I did not consider. I agree we should use the same algorithm in ReplicaAlterLogDirThread. I will make it more consistent.

Contributor

@junrao junrao left a comment

@apovzner : Thanks for the patch. Left a few comments below.

Contributor

Since this affects inter broker protocol, we need to (1) document this api change for "2.0-IV0" in ApiVersion.scala, (2) update the upgrade section in the doc, (3) only use the new protocol if the inter broker protocol is 2.0-IV0 or above.

Contributor Author

Done all three.

Contributor Author

Regarding (3), the fetcher falls back to the KIP-101 logic if the inter-broker protocol version is < KAFKA_2_0_IV0 (it ignores the leader epoch returned in the response and uses the end offset).
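As an illustrative sketch of that fallback decision (hypothetical Python names, not the actual Kafka code; the inter-broker protocol version is modeled as a tuple):

```python
UNDEFINED_EPOCH = -1
KAFKA_2_0_IV0 = (2, 0)

def truncation_offset(inter_broker_protocol, leader_epoch, leader_end_offset,
                      follower_log_end_offset):
    """With a pre-2.0 IBP, or a v0 response (no epoch field), ignore the
    leader epoch and truncate to min(leader's end offset, follower's LEO),
    i.e. the KIP-101 behavior."""
    if inter_broker_protocol < KAFKA_2_0_IV0 or leader_epoch == UNDEFINED_EPOCH:
        return min(leader_end_offset, follower_log_end_offset)
    # Otherwise the full KIP-279 epoch-based truncation applies (omitted here).
    raise NotImplementedError("KIP-279 path not shown in this sketch")
```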

Contributor

watermark => high watermark

Contributor

To be more precise, 1) should be "the leader is still using message format older than KAFKA_0_11_0_IV2".

Contributor

KAFKA_1_1_IV0 should be KAFKA_2_0_IV0

Contributor

I think in this case, it's probably better to fall back to high watermark. That way, if the leader epoch logic doesn't apply, we always consistently fall back to the old method.

Contributor Author

My concern about falling back to the high watermark in this particular case is that post-KIP-101 (and pre-2.0) code behaves exactly as described: since the leader does not send a leader epoch, we don't check it and use the leader's offset to truncate. The same applies if brokers upgrade to 2.0 but do not bump the protocol version; then, once we upgrade to the 2.0 protocol version, we would be back to the high watermark in this case.

Contributor

The above logic may still be needed in the following sequence: (1) future replica copies data above HW from current replica; (2) future replica goes offline (e.g. disk failure); (3) current replica truncates data above HW and re-replicates new data from the leader at the truncated offsets. To avoid duplicated code, perhaps we can share the code between ReplicaFetcherThread and here?

Contributor Author

That's a good example I did not consider. In that case, I agree, we need the leader epoch logic. I will try to move this code out into a common method that both fetchers re-use.

Contributor

typo "the the". Also, we are now returning the leader epoch and the end offset.

Contributor

Would it be simpler to just initialize updatedOffsetsOpt to offsets and make it a non-Option?

Contributor

This is probably because the broker port changes on restart?

Contributor

Change -1 to UNDEFINED_EPOCH_OFFSET?

@apovzner
Contributor Author

apovzner commented May 1, 2018

@junrao and @lindong28 Thanks a lot for your comments. I addressed all of them.

Based on the use case of the future replica being offline and missing the "mark for truncation" event, I agree that ReplicaAlterLogDirsThread should use the same leader epoch logic for truncation as in ReplicaFetcherThread. I moved the common logic that finds the truncation offset to AbstractFetcherThread.getOffsetTruncationState, so now the truncation logic in both fetchers is consistent.

Contributor

@junrao junrao left a comment

@apovzner : Thanks for the updated patch. A few more comments below. A couple of other things.

  1. Could you also run the system tests?
  2. This is not an issue directly related to this patch. But I noticed that in Log.truncateTo(), if the truncation point is in the middle of a message set, we will actually be truncating to the first offset of the message set. In that case, the replica fetcher thread should adjust the fetch offset to the actual truncated offset. Typically, the truncation point should never be in the middle of a message set. However, this could potentially happen during message format upgrade. We can tighten this up in a separate jira.

Contributor

Hmm, in ReplicaManager.alterReplicaLogDirs(), the initial offset for the future replica is also set to its HW. We update the future replica's HW in ReplicaAlterLogDirsThread.processPartitionData(). We probably want to bound it by the future replica's log end offset.

Contributor Author

I fixed the comment to say it is either high watermark for future replica, or current replica's truncation offset.
Also, changed ReplicaAlterLogDirsThread.processPartitionData() to bound future replica's high watermark to its log end offset.

Contributor

Do we need to test useLeaderEpochInResponse? The only case we want to cover here is that the follower uses version 0 of OffsetForLeaderEpoch.

Contributor Author

I am testing it in ReplicaFetcherThreadTest.shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20

Contributor

not tracking offsets => not tracking leader epochs ?

Contributor

This is a normal behavior. So the logging probably should be info.

Contributor

Now that we can truncate in more than 1 step, it's probably useful to always bound the truncation point by the replica's log end offset.

Contributor

We want to mention that we are returning both the epoch and the offset.

Contributor

This exists in line 23 already.

Contributor Author

removed dup from line 23

Contributor

This test should actually fail right now since the version of the leaderEpochRequest is always version 1. So, we probably want to check the latestAllowedVersion() in the builder in ReplicaFetcherMockBlockingSend.

Contributor Author

Right, this test ended up testing a local broker on 0.11 and a remote broker on the latest version, which actually does not fail because we don't check the leader epoch in leaderEpochResponse, and the truncation is done using the KIP-101 approach (which is what this test verifies). I will update the test to use an undefined leader epoch in the response to simulate the other broker also being on the older protocol version.

Contributor

replicaLeaderEpoch and leaderEpochOffset may be confusing. How about followerEpoch and leaderEpochOffset?

Contributor

KAFKA_0_11_0_IV2 => KAFKA_0_11_0

Member

@lindong28 lindong28 left a comment

Thanks for the update! Left a few comments

private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V0 = new Schema(
new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0), "An array of topics to get epochs for"));

/* v2 request is the same as v1. Per-partition leader epoch has been added to response */
Member

typo. Probably should be v1 instead of v2.

// OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1 added a per-partition leader epoch field,
// which specifies which leader epoch the end offset belongs to
private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1 = new Schema(
ERROR_CODE,
Member

nits: can we make the indentation the same as the existing indentation in this file?

// and KafkaStorageException for fetch requests.
"1.1-IV0" -> KAFKA_1_1_IV0,
"1.1" -> KAFKA_1_1_IV0,
// Introduced OffsetsForLeaderEpochRequest/OffsetsForLeaderEpochResponse V1 via KIP-279
Member

nits: to be more consistent with the existing comment, we can just say Introduced OffsetsForLeaderEpochRequest V1 via KIP-279

/**
* @param leaderEpoch Requested leader epoch
* @return The last offset of messages published under this leader epoch.
* @return The requested leader epoch and the last offset of messages published under this
Member

It looks like the existing Java doc (prior to this patch) of this method is not correct. According to Java doc of LeaderEpochFileCache.endOffsetFor(...), it says The End Offset is the end offset of this epoch, which is defined as the start offset of the first Leader Epoch larger than the Leader Epoch requested, or else the Log End Offset if the latest epoch was requested.

Contributor Author

Yes, I think the prior description is more of a shortcut, which is actually not correct. I just realized that we should use "end offset" instead of "the last offset of messages published" here -- the description in LeaderEpochFileCache is more precise. I will update this comment accordingly.
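To illustrate the distinction under discussion (a hedged toy example, not the LeaderEpochFileCache implementation; the cache is modeled as sorted (epoch, start_offset) pairs):

```python
# Hypothetical epoch cache: sorted (epoch, start_offset) pairs.
cache = [(0, 0), (2, 10)]
log_end_offset = 20

def end_offset_for(cache, leo, requested_epoch):
    """Start offset of the first epoch larger than the requested one,
    or the log end offset if no larger epoch exists."""
    for epoch, start in cache:
        if epoch > requested_epoch:
            return start
    return leo

# Epoch 0's end offset is 10 (the start of epoch 2), i.e. one past the
# last offset (9) actually written under epoch 0 -- an "end offset",
# not "the last offset of messages published".
print(end_offset_for(cache, log_end_offset, 0))  # 10
print(end_offset_for(cache, log_end_offset, 2))  # 20
```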

val followerName = if (isFutureReplica) "future replica" else "follower"

// Called when 'offsetToTruncateTo' is the final offset to truncate to.
def finalFetchLeaderEpochOffset(offsetToTruncateTo: Long, offsetFromLeader: Long): OffsetTruncationState = {
Member

nits: now that we don't have any logging in finalFetchLeaderEpochOffset(), we can probably remove this method and replace its usage with one line. For example, finalFetchLeaderEpochOffset(leaderEpochOffset.endOffset, leaderEpochOffset.endOffset) is equivalent to OffsetTruncationState(math.min(offsetToTruncateTo, replica.logEndOffset.messageOffset), truncationCompleted = true)

isInterruptible = false,
includeLogTruncation = true) {
includeLogTruncation = true,
useLeaderEpochInResponse = brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV0) {
Member

In ReplicaFetcherThread.fetchEpochsFromLeader(), the version of OffsetsForLeaderEpochRequest should probably be determined based on the interBrokerProtocolVersion. We can use OffsetsForLeaderEpochRequest V1 only if the interBrokerProtocolVersion >= KAFKA_2_0_IV0. Otherwise, when we rolling-bounce the cluster to upgrade the code, the leader may still be running the old code and not recognize OffsetsForLeaderEpochRequest V1.

Contributor Author

The OffsetsForLeaderEpoch request is exactly the same in v0 and v1, so we don't need to explicitly check the protocol version when building requests. If the leader is on an older version, it will send a v0 response, which will not include the leader epoch field; this is handled in the OffsetsForLeaderEpochResponse constructor by setting the leader epoch field to undefined. In the fetcher thread, we handle this case (where the leader epoch is undefined) in maybeTruncate() and fall back to KIP-101 behavior, the same as when this broker is on the older protocol version.

Member

@lindong28 lindong28 May 8, 2018

Hmm.. my understanding is that the version of the response should always match the version of the request. Thus in order to receive OffsetsForLeaderEpochResponse V1, the broker needs to send OffsetsForLeaderEpochRequest V1. And the broker should reject the request if the version of the request is not recognized. Did I miss something?

Contributor Author

Yes, correct. I meant there is nothing different to do in the fetcher thread. If I understood the code correctly, ReplicaFetcherThread.fetchEpochsFromLeader() passes the OffsetsForLeaderEpochRequest.Builder to sendRequest(), and then build() is called on that builder with a version in NetworkClient.doSend. It looks like the proper version will be used in that case.

Member

@lindong28 lindong28 May 8, 2018

Currently, if we do not explicitly specify the version for AbstractRequest.Builder(), the latest version of the request, as determined by ApiKeys.latestVersion(), will be used. The latest version of OffsetsForLeaderEpochRequest will be V1 after this patch. We probably need to explicitly pass the version (determined by the IBP) to OffsetsForLeaderEpochRequest.Builder, similar to what we do for UpdateMetadataRequest.Builder() in ControllerBrokerRequestBatch.sendRequestsToBrokers().

Contributor Author

Oh I see, thank you, let me take a look.

Contributor Author

Thanks a lot for your help, I updated the code to use the OffsetsForLeaderEpochRequest version when building a request.
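A minimal sketch of the version selection agreed on above (hypothetical helper name, not Kafka's actual API; the inter-broker protocol version is modeled as a tuple):

```python
KAFKA_2_0_IV0 = (2, 0)

def offsets_for_leader_epoch_request_version(inter_broker_protocol):
    # Use V1 (which adds a per-partition leader epoch to the response)
    # only once every broker understands it; otherwise stay on V0.
    return 1 if inter_broker_protocol >= KAFKA_2_0_IV0 else 0
```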

*/
def getOffsetTruncationState(tp: TopicPartition, leaderEpochOffset: EpochEndOffset, replica: Replica, isFutureReplica: Boolean = false): OffsetTruncationState = {
// to make sure we can distinguish log output for fetching from remote leader or local replica
val followerName = if (isFutureReplica) "future replica" else "follower"
Member

nits: this replica can be either follower or future replica. Maybe the variable can be named replicaName?

Contributor Author

Yeah, I already went back and forth a couple of times regarding "replica" vs. "follower" (also re: your comment below). Jun commented (in this PR) that "replica" is also confusing, in that the leader is also a replica. And the future replica is also a follower, just of a different type. I propose to keep this name as is, but replace replicaEndOffset with followerEndOffset re: your comment below.

} else {
// get (leader epoch, end offset) pair that corresponds to the largest leader epoch
// less than or equal to the requested epoch.
val (followerEpoch, replicaEndOffset) = replica.epochs.get.endOffsetFor(leaderEpochOffset.leaderEpoch)
Member

nits: would the name replicaEpoch be more consistent with the name replicaEndOffset?


case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) {

def this (offset: Long) = this(offset, true)
Member

nits: can we remove the space after this?

isInterruptible: Boolean = true,
includeLogTruncation: Boolean)
includeLogTruncation: Boolean,
useLeaderEpochInResponse: Boolean = true)
Member

My personal opinion is that it may be more general to just pass the interBrokerProtocolVersion to the constructor of AbstractFetcherThread, and use this variable to determine the version of OffsetsForLeaderEpochRequest when we actually generate the builder for OffsetsForLeaderEpochRequest. It is more consistent with the existing usage of KafkaConfig.interBrokerProtocolVersion in the code base. And if in the future some other logic in AbstractFetcherThread relies on the inter-broker protocol, we won't need to add more variables to the constructor.

Contributor Author

I agree. However, I just tried it, and it requires changes to ConsumerFetcherThread constructor, and then ConsumerFetcherManager, and so on. I think it would be easy to change later when we need more logic dependent on inter broker protocol version, and especially once we remove old consumer fetcher code.

Contributor

@junrao junrao left a comment

@apovzner : Thanks for the latest patch. LGTM. Just a few more minor comments below.

* @param fetchOffsets the partitions to mark truncation complete
*/
private def markTruncationCompleteAndUpdateFetchOffset(fetchOffsets: Map[TopicPartition, Long]) {
private def markTruncationCompleteAndUpdateFetchOffset(fetchOffsets: Map[TopicPartition, OffsetTruncationState]) {
Contributor

The method name now is not very accurate. It doesn't always mark truncation as completed.

warn(s"Based on $followerName's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " +
s"The initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.")
OffsetTruncationState(partitionStates.stateValue(tp).fetchOffset, truncationCompleted = true)
} else if (leaderEpochOffset.leaderEpoch == UNDEFINED_EPOCH || !useLeaderEpochInResponse) {
Contributor

It seems that we don't really need the flag useLeaderEpochInResponse. If interBrokerProtocolVersion < KAFKA_2_0_IV0, it's guaranteed that leaderEpochOffset.leaderEpoch is UNDEFINED_EPOCH.

Member

@junrao I have thought about this as well. But for the future replica, even if interBrokerProtocolVersion < KAFKA_2_0_IV0, ReplicaAlterLogDirsThread.fetchEpochsFromLeader() may still return an EpochEndOffset whose leaderEpoch is not UNDEFINED_EPOCH. Maybe this method should also return UNDEFINED_EPOCH if interBrokerProtocolVersion < KAFKA_2_0_IV0?

Contributor Author

Oh, thanks for raising this, Dong. I think we should then make ReplicaAlterLogDirsThread.fetchEpochsFromLeader() return a response with UNDEFINED_EPOCH to match the older protocol response.

* truncate the leader's offset (and do not send any more leader epoch requests).
* -- Otherwise, truncate to min(leader's offset, end offset on the follower for epoch that
* leader replied with, follower's Log End Offset).
*/
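As context for the rule quoted in the comment above, here is a rough Python sketch of the truncation decision. This is illustrative only: the actual logic is Scala in AbstractFetcherThread.getOffsetTruncationState, and the helper names and return shape here are made up:

```python
# Illustrative model of the KIP-279 truncation decision (not Kafka's code).
UNDEFINED_EPOCH = -1
UNDEFINED_EPOCH_OFFSET = -1

def get_offset_truncation_state(leader_epoch, leader_end_offset,
                                follower_end_offset_for, follower_leo):
    """Returns (truncation offset, truncation_completed).
    follower_end_offset_for(epoch) models the follower's epoch cache lookup:
    it returns (largest known epoch <= epoch, end offset of that epoch)."""
    if leader_epoch == UNDEFINED_EPOCH:
        # Leader replied without an epoch (older protocol): fall back to
        # truncating to the leader's offset.
        return (min(leader_end_offset, follower_leo), True)
    follower_epoch, follower_end_offset = follower_end_offset_for(leader_epoch)
    if follower_end_offset == UNDEFINED_EPOCH_OFFSET:
        # Follower was not tracking epochs at that point: truncate to the
        # leader's offset and stop.
        return (min(leader_end_offset, follower_leo), True)
    if follower_epoch != leader_epoch:
        # Follower does not know the epoch the leader replied with: truncate
        # to the end offset of its largest epoch <= the leader's epoch, and
        # send another OffsetsForLeaderEpoch request (not completed yet).
        return (min(follower_end_offset, follower_leo), False)
    # Epochs match: truncate to min(leader's offset, follower's end offset
    # for that epoch, follower's log end offset), and we are done.
    return (min(follower_end_offset, leader_end_offset, follower_leo), True)
```

The "not completed" branch is what drives the iterative back-and-forth until leader and follower find a consistent point.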
Contributor

It seems that this comment is really for AbstractFetcherThread.getOffsetTruncationState(). If we move the comment there, we can also simplify the comment in ReplicaAlterLogDirsThread.maybeTruncate().

// less than or equal to the requested epoch.
val (followerEpoch, followerEndOffset) = replica.epochs.get.endOffsetFor(leaderEpochOffset.leaderEpoch)
if (followerEndOffset == UNDEFINED_EPOCH_OFFSET) {
// This can happen if replica was not tracking leader epochs at that point (before the
Contributor

Since the code uses follower, perhaps we can say "if follower was not"

//We should have truncated to the offsets in the response
assertTrue(truncateToCapture.getValues.asScala.contains(156))
assertTrue(truncateToCapture.getValues.asScala.contains(172))
assertTrue("Expected offset 156 in captured truncation offsets " + truncateToCapture.getValues,
Contributor

Perhaps we can change the text to something like "Expect partition t1p0 to truncate to offset 156".



//We should have truncated to the offsets in the first response
assertTrue("Expected offset 155 in captured truncation offsets " + truncateToCapture.getValues,
Contributor

Should we further assert that the builder for OFFSET_FOR_LEADER_EPOCH in ReplicaFetcherMockBlockingSend.sendRequest() is set with the right version?

Contributor Author

I modified ReplicaFetcherMockBlockingSend to save the version of OffsetsForLeaderEpochRequest and added a couple of checks in the test.

Contributor

@junrao junrao left a comment

@apovzner : Thanks for the new patch. A couple of more comments.

tp -> new EpochEndOffset(Errors.NONE, replicaMgr.getReplicaOrException(tp).epochs.get.endOffsetFor(epoch))
val (leaderEpoch, leaderOffset) = replicaMgr.getReplicaOrException(tp).epochs.get.endOffsetFor(epoch)
val leaderEpochInResponse: Int =
if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV0) leaderEpoch
Contributor

Do we need this check? Since we are getting the leader epoch from the current replica's log directly, even when IBP < KAFKA_2_0_IV0, it seems that we can just return leaderEpoch.

Contributor Author

If we are on protocol < 2.0, then the local replica will be fetching from the leader based on the older protocol (not using leader epochs). If we don't check here, the future replica will be fetching from the local replica based on leader epochs. Seems inconsistent? On the other hand, it should still work for the future replica to truncate using the leader epoch in that case too.

Contributor

Yes, preserving the leader epoch always gives a better outcome, so if we can do it, there is no reason to switch to a worse method. We have no choice between the follower and the leader because of the IBP. However, here everything is local, so there is no need to be constrained by the IBP.

Member

If we keep the leader epoch here for better outcome, should we still check useLeaderEpochInResponse in getOffsetTruncationState() so that it returns OffsetTruncationState(min(leaderEpochOffset.endOffset, replica.logEndOffset.messageOffset), truncationCompleted = true) if useLeaderEpochInResponse is false?

Contributor Author

If we use the leader epoch, then we should go all the way with the new protocol, i.e., continue truncating until we find the consistent point.
OK, I will change back to using the leader epoch, if available, for the future replica.

OffsetTruncationState(intermediateOffsetToTruncateTo, truncationCompleted = false)
} else {
val offsetToTruncateTo = min(followerEndOffset, leaderEpochOffset.endOffset)
OffsetTruncationState(min(offsetToTruncateTo, replica.logEndOffset.messageOffset), truncationCompleted = true)
Contributor

In general, we don't expect the truncation point to be < local HW, so it would be useful to log a warning when this happens. Not sure what the easiest way is, since now we can have intermediate truncation points.

Contributor

@junrao junrao left a comment

@apovzner : Thanks for the update. A minor comment below. Also, (1) have you added the logic to warn if the truncation point < local HW? (2) have you run all system tests?

import java.util

import AbstractFetcherThread.ResultWithPartitions
import kafka.api._
Contributor

unused import

@apovzner
Contributor Author

apovzner commented May 9, 2018

@junrao I added the warning about truncating below the HW to ReplicaFetcherThread.maybeTruncate. I explicitly compare replica.highWatermark to the offset we are truncating to. If we truncate several times, and more than once below the HW, we will output the warning multiple times, which I think is OK.

I ran system tests yesterday (https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1746/) and there was only one failure, in kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=streams-join.scale=1, which was due to the streams test process taking too long to exit. I don't think it is related to any changes in this PR.
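The check described above might look roughly like this. This is an illustrative Python sketch, not the actual Scala in ReplicaFetcherThread.maybeTruncate; the function name is made up:

```python
import logging

log = logging.getLogger("ReplicaFetcherThread")

def warn_if_truncating_below_hw(tp, truncation_offset, high_watermark):
    """Sketch of the warning check: truncating below the local high
    watermark is unexpected (committed data may be lost, e.g. under
    unclean leader election), so surface a warning when it happens.
    Returns True if a warning was emitted."""
    below_hw = truncation_offset < high_watermark
    if below_hw:
        log.warning("Truncating %s to offset %d below high watermark %d",
                    tp, truncation_offset, high_watermark)
    return below_hw
```

Since the comparison runs on every truncation, repeated truncations below the HW produce repeated warnings, as noted above.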

Contributor

@junrao junrao left a comment

@apovzner : Thanks for the patch. LGTM.

@junrao junrao merged commit 9679c44 into apache:trunk May 10, 2018
Comment thread on docs/upgrade.html

<script id="upgrade-template" type="text/x-handlebars-template">

<h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 1.2.x to 2.0.0</a></h4>
Contributor

@apovzner @junrao while working on another PR, I realized this one duplicated part of upgrade_2_0_0 with upgrade_1_2_0 (we renamed 1.2 to 2.0). If there is no new content added, I'll go ahead and remove the duplicated section in my PR.

ijuma added a commit to ijuma/kafka that referenced this pull request May 11, 2018
…-record-version

* apache-github/trunk:
  KAFKA-6894: Improve err msg when connecting processor with global store (apache#5000)
  KAFKA-6893; Create processors before starting acceptor in SocketServer (apache#4999)
  MINOR: Fix typo in ConsumerRebalanceListener JavaDoc (apache#4996)
  MINOR: Remove deprecated valueTransformer.punctuate (apache#4993)
  MINOR: Update dynamic broker configuration doc for truststore update (apache#4954)
  KAFKA-6870 Concurrency conflicts in SampledStat (apache#4985)
  KAFKA-6361: Fix log divergence between leader and follower after fast leader fail over (apache#4882)
  KAFKA-6813: Remove deprecated APIs in KIP-182, Part II (apache#4976)
  KAFKA-6878 Switch the order of underlying.init and initInternal (apache#4988)
  KAFKA-6299; Fix AdminClient error handling when metadata changes (apache#4295)
  KAFKA-6878: NPE when querying global state store not in READY state (apache#4978)
  KAFKA 6673: Implemented missing override equals method (apache#4745)
  KAFKA-6834: Handle compaction with batches bigger than max.message.bytes (apache#4953)
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
…t leader fail over (apache#4882)

Implementation of KIP-279 as described here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over

In summary:
- Added leader_epoch to OFFSET_FOR_LEADER_EPOCH_RESPONSE
- Leader replies with the pair( largest epoch less than or equal to the requested epoch, the end offset of this epoch)
- If Follower does not know about the leader epoch that leader replies with, it truncates to the end offset of largest leader epoch less than leader epoch that leader replied with, and sends another OffsetForLeaderEpoch request. That request contains the largest leader epoch less than leader epoch that leader replied with.

Reviewers: Dong Lin <lindong28@gmail.com>, Jun Rao <junrao@gmail.com>