KAFKA-5758: Don't fail fetch request if replica is no longer a follower for a partition #3954
Conversation
@junrao, does this look reasonable?
Force-pushed from 909c689 to 6c73665
retest this please
The line probably should be above line 792 now?
Good catch that this comment is in the wrong place. I updated the code a little and I think the comment doesn't add much after that, so I removed it. Let me know if you disagree.
This seems to be an existing problem, but we likely only need to call tryCompleteDelayedRequests() when the HW increments.
I think tryCompleteDelayedDeleteRecords is affected by leaderLWIncremented:

```scala
private def tryCompleteDelayedRequests() {
  val requestKey = new TopicPartitionOperationKey(topicPartition)
  replicaManager.tryCompleteDelayedFetch(requestKey)
  replicaManager.tryCompleteDelayedProduce(requestKey)
  replicaManager.tryCompleteDelayedDeleteRecords(requestKey)
}
```

We could make the tryComplete calls more specific depending on what changed, if you think that's worth it.
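As a sketch of what "more specific" completion could look like, the dispatch could be keyed on which watermark moved: an HW increment matters to delayed fetches and produces, while an LW (log start offset) increment matters to delayed DeleteRecords. The types below are simplified stand-ins for illustration, not Kafka's actual classes.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for Kafka's purgatory machinery (hypothetical,
// for illustration only).
case class TopicPartitionOperationKey(topic: String, partition: Int)

trait Purgatory {
  def tryComplete(key: TopicPartitionOperationKey): Unit
}

// Records which purgatories were poked, so the dispatch can be observed.
class RecordingPurgatory(name: String, completed: ArrayBuffer[String]) extends Purgatory {
  def tryComplete(key: TopicPartitionOperationKey): Unit = completed += name
}

// Complete only the purgatories affected by what actually changed:
// HW movement can unblock fetches/produces; LW movement can unblock
// DeleteRecords.
def tryCompleteDelayedRequests(key: TopicPartitionOperationKey,
                               leaderHWIncremented: Boolean,
                               leaderLWIncremented: Boolean,
                               fetch: Purgatory,
                               produce: Purgatory,
                               deleteRecords: Purgatory): Unit = {
  if (leaderHWIncremented) {
    fetch.tryComplete(key)
    produce.tryComplete(key)
  }
  if (leaderLWIncremented)
    deleteRecords.tryComplete(key)
}
```

The trade-off discussed above is real: the combined version is simpler and tryComplete calls on an unaffected purgatory are cheap no-ops, which is presumably why the patch kept them together.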
Shouldn't we set the broker.id property since ReplicaManager needs config.brokerId?
Yes, createBrokerConfig does it:

```scala
if (nodeId >= 0) props.put(KafkaConfig.BrokerIdProp, nodeId.toString)
```
If the replica is not in the assigned replica list, I am wondering if we should just send empty fetched data back.
Yes, I agree that it would be better; I was a bit unhappy about the wastefulness of sending a non-empty fetch response in that case. Will fix.
…er for a partition

We log a warning instead, which is what we also do if the partition hasn't been created yet.
Force-pushed from 84ffc4b to 45f52b3
retest this please
@junrao, I've addressed your comments.
@ijuma: Thanks for the updated patch. LGTM. I will let you merge it.
```scala
val logStartOffsets = assignedReplicas.collect {
  case replica if replicaManager.metadataCache.isBrokerAlive(replica.brokerId) => replica.logStartOffset
}
CoreUtils.min(logStartOffsets, 0L)
```
Wouldn't this return 0 in most cases?
No, it only returns 0 if the collection is empty.
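For context, `CoreUtils.min` is assumed here to mean "minimum of the collection, or the supplied default when the collection is empty", which is why an empty `logStartOffsets` yields 0. A minimal re-implementation of that assumed semantics:

```scala
// Hypothetical re-implementation of the assumed CoreUtils.min semantics:
// return the fallback only when the collection is empty, otherwise the
// true minimum of its elements.
def min(iterable: Iterable[Long], ifEmpty: Long): Long =
  if (iterable.isEmpty) ifEmpty else iterable.min
```

So with at least one live replica the result is the smallest live log start offset, regardless of the 0L fallback.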
Do not update LogReadResult after it is initially populated when returning fetches immediately (i.e. without hitting the purgatory). This was done in #3954 as an optimization so that the followers get the potentially updated high watermark. However, since many things can happen (like deleting old segments and advancing log start offset) between initial creation of LogReadResult and the update, we can hit issues like log start offset in fetch response being higher than the last offset in fetched records. Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
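The race described in that follow-up fix can be sketched as follows: if the response snapshots every field once at read time, a later advance of the log start offset (for example from old-segment deletion) cannot make the response internally inconsistent. The types below are simplified stand-ins, not Kafka's actual `LogReadResult`.

```scala
// Simplified stand-ins (not Kafka's actual classes) showing why a read
// result should be treated as an immutable snapshot.
case class FetchedRecords(lastOffset: Long)
case class LogReadResult(records: FetchedRecords, logStartOffset: Long, highWatermark: Long)

class Log(var logStartOffset: Long, var highWatermark: Long) {
  // Snapshot every field once, at read time; never re-read mutable log
  // state into an already-populated result.
  def read(): LogReadResult =
    LogReadResult(FetchedRecords(lastOffset = highWatermark - 1), logStartOffset, highWatermark)
}
```

Usage: after `val result = log.read()`, mutating `log.logStartOffset` (segments deleted) leaves `result` untouched, so `result.logStartOffset` can never exceed `result.records.lastOffset` the way a post-hoc update could.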
KAFKA-5758; Don't fail fetch request if replica is no longer a follower for a partition

We log a warning instead, which is what we also do if the partition hasn't been created yet.

A few other improvements:
- Return updated high watermark if fetch is returned immediately. This seems to be more intuitive and is consistent with the case where the fetch request is served from the purgatory.
- Centralise offline partition handling
- Remove unnecessary `tryCompleteDelayedProduce` that would already have been done by the called method
- A few other minor clean-ups

Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>

Closes apache#3954 from ijuma/kafka-5758-dont-fail-fetch-request-if-replica-is-not-follower