Add AlterISR RPC and use it for ISR modifications #9100
hachikuji merged 47 commits into apache:trunk
Conversation
* Make AlterIsr block until the controller processes it fully
* Batch AlterIsr requests for a short while on broker side
* Don't attempt to restrict to a single in-flight request
def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
  val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()

  alterIsrRequest.topics().forEach(topicReq => topicReq.partitions().forEach(partitionReq => {
It might be simpler just to use AlterIsrRequestData and AlterIsrResponseData throughout this code (rather than converting to Map[TopicPartition, LeaderAndIsr] and Map[TopicPartition, Errors])
The conversion logic is a tad annoying, but it makes the rest of the code nicer. I'm ok with it. That said, could we use scala conventions, e.g.:
alterIsrRequest.topics.forEach { topicReq =>
topicReq.partitions.forEach { partitionReq =>
hachikuji left a comment
Thanks. Left a few comments. I haven't looked closely at the controller logic yet.
  })
})

def responseHandler(response: ClientResponse): Unit = {
I think the basic approach here is to ignore successful responses and wait for the LeaderAndIsr update. I am wondering how we should handle the case when the update failed. Say for example that our update fails with the INVALID_VERSION error. Inside Partition, we will still have the pendingIsr set. Do we need to clear it? How about other errors?
I found a race during the system tests when a follower is shutting down. The controller handles the shut down before it handles an AlterIsr. If the proposed ISR includes the now-offline replica, the controller refuses to update that ISR change and returns an error for that partition. It then sends out the current LeaderAndIsr.
The problem is that the broker ignores this LeaderAndIsr since it has the same leader epoch. This is easy enough to fix, we can bump the leader epoch in the controller (and ZK) before sending it out.
However, there's still the case of failing to update ZK. I think we should probably treat this the same way as an offline replica. If we simply return an error in AlterIsr response and let the leader reset the pending ISR state, the leader will just retry with stale metadata and the update will fail again.
I think in all these error cases we must bump the leader epoch to force the leader to accept the new LeaderAndIsr. Thoughts?
override def clearPending(topicPartition: TopicPartition): Unit = {
  pendingIsrUpdates synchronized {
    // when we get a new LeaderAndIsr, we clear out any pending requests
    pendingIsrUpdates.remove(topicPartition)
Removal from this set won't prevent BrokerToControllerRequestThread from retrying in-flight requests. I'm considering whether we should have a way to cancel requests that we are still awaiting.
With the latest changes to prevent multiple in-flight requests, I don't think this should happen for a given partition. Even if it did, the retried in-flight request from BrokerToControllerRequestThread would fail on the controller with an old version.
I'm wondering if we even need this clearPending behavior. Since I changed the AlterIsr request to fire at most after 50ms, it's a narrow window between enqueueing an ISR update and receiving a LeaderAndIsr.
Update: after some discussion and looking over failed system tests, we ended up with the following error handling:
- REPLICA_NOT_AVAILABLE and INVALID_REPLICA_ASSIGNMENT will clear the pending ISR to let the leader retry. This covers a case where a leader tries to add a replica to the ISR which is offline because it (the follower) just finished shutdown.
- INVALID_UPDATE_VERSION will not clear the pending ISR since the broker has stale metadata.
- FENCED_LEADER_EPOCH, NOT_LEADER_OR_FOLLOWER, and UNKNOWN_TOPIC_OR_PARTITION will not clear the pending state and therefore will not retry. We presume here that the controller is correct and the leader has old metadata. By not clearing the pending ISR, the leader will await a LeaderAndIsr before attempting any further ISR changes.
- Other unspecified errors: clear the pending state and let the leader retry. Not sure what cases could cause other errors, but it is probably better to be in a retry loop than to be completely stuck.
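The error dispatch described in the list above can be sketched as a simple match. This is a hedged illustration only: the `Error` and `Action` types below are simplified stand-ins, not the PR's actual `Errors` enum or handler code.

```scala
object AlterIsrErrorHandling {
  // What the leader does with its pending ISR state after a failed AlterIsr
  sealed trait Action
  case object ClearPendingAndRetry extends Action // clear pending ISR, let the leader retry
  case object AwaitLeaderAndIsr extends Action    // keep pending state, wait for LeaderAndIsr

  // Simplified stand-in for org.apache.kafka.common.protocol.Errors
  sealed trait Error
  case object ReplicaNotAvailable extends Error
  case object InvalidReplicaAssignment extends Error
  case object InvalidUpdateVersion extends Error
  case object FencedLeaderEpoch extends Error
  case object NotLeaderOrFollower extends Error
  case object UnknownTopicOrPartition extends Error
  case object OtherError extends Error

  def actionFor(error: Error): Action = error match {
    // Offline-replica races (e.g. a follower just finished shutdown): retry
    case ReplicaNotAvailable | InvalidReplicaAssignment => ClearPendingAndRetry
    // The broker has stale metadata; the controller is presumed correct,
    // so wait for the incoming LeaderAndIsr instead of retrying
    case InvalidUpdateVersion | FencedLeaderEpoch |
         NotLeaderOrFollower | UnknownTopicOrPartition => AwaitLeaderAndIsr
    // Unknown errors: better to be in a retry loop than completely stuck
    case _ => ClearPendingAndRetry
  }
}
```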
Also fix some short circuit bugs in controller
Recent changes include:
def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
  val alterIsrRequest = request.body[AlterIsrRequest]

  if (!controller.isActive) {
I think authorization should probably be the first thing we do.
This is actually a really good point. I filed a JIRA to fix this in other places in KafkaApis https://issues.apache.org/jira/browse/KAFKA-10491
if (requestLeaderEpoch > currentLeaderEpoch) {
val currentZkVersion = partition.getZkVersion
val requestZkVersion = partitionState.zkVersion
if (requestLeaderEpoch > currentLeaderEpoch || requestZkVersion > currentZkVersion) {
Do we still need this change? I think we are trying to keep the current approach where the controller bumps the leader epoch for any controller-initiated change.
No, we don't need this anymore. This was added so a LeaderAndIsr could update the Partition state without a leader epoch bump, but we don't have that flow anymore so we can revert this.
* Lots of cleanup, logging improvements
* Changes to AlterIsr RPC field names
* Consolidate on maximal ISR name
* Introduce sealed trait for different ISR states
* Tweaks to some error handling
// if it is not in the ISR yet
if (!inSyncReplicaIds.contains(followerId))
// Check if this in-sync replica needs to be added to the ISR. We look at the "maximal" ISR here so we don't
// send an additional Alter ISR request for the same replica
Another possibility is that the replica is pending removal in which case another AlterIsr will be needed. I think it might be more intuitive to make this check:
if (!isrState.inflight && !isrState.isr.contains(followerId))
Yeah, checking the maximal set isn't needed anymore since adding the sealed trait. I'll just update this to simply call maybeExpandIsr, which will do the check you propose here.
private def needsExpandIsr(followerReplica: Replica): Boolean = {
  leaderLogIfLocal.exists { leaderLog =>
  !hasInFlightAlterIsr && leaderLogIfLocal.exists { leaderLog =>
I think we can refactor this a little bit to avoid some duplication and inconsistency. We have the following logic above when updating follower state:

if (!isrState.maximalIsr.contains(followerId))
  maybeExpandIsr(followerReplica, followerFetchTimeMs)

This is a little inconsistent because here we are checking isrState.isr. I'd suggest splitting this method into something like the following:

def hasReachedHighWatermark(follower: Replica): Boolean = {
  leaderLogIfLocal.exists { leaderLog =>
    val leaderHighwatermark = leaderLog.highWatermark
    isFollowerInSync(follower, leaderHighwatermark)
  }
}

def canAddToIsr(followerId: Int): Boolean = {
  val current = isrState
  !current.inflight && !current.isr.contains(followerId)
}

def needsExpandIsr(follower: Replica): Boolean = {
  canAddToIsr(follower.brokerId) && hasReachedHighWatermark(follower)
}

Then we can change the logic in maybeExpandIsr to the following:

val needsIsrUpdate = canAddToIsr(followerReplica) && inReadLock(leaderIsrUpdateLock) {
  ...
}

val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
if (outOfSyncReplicaIds.nonEmpty) {
  val newInSyncReplicaIds = inSyncReplicaIds -- outOfSyncReplicaIds
  val newInSyncReplicaIds = isrState.isr -- outOfSyncReplicaIds
Seems like we do not have a check for inflight AlterIsr after the write lock has been acquired.
 *
 **/
val candidateReplicaIds = inSyncReplicaIds - localBrokerId
val candidateReplicaIds = isrState.maximalIsr - localBrokerId
This is related to my comment above for the ISR expansion case, but it is a bit confusing to use maximal ISR when the expectation is that we will not shrink as long as we have a pending update inflight. Would it be better to check for inflights and document that this method will return an empty set as long as there is a pending AlterIsr request?
Makes sense, that will also satisfy your other comment about not checking for inflight requests within the write lock
Also, yes it's confusing to refer to maximalIsr here even though it should always equal the committed ISR at this point (assuming we check for inflight first).
partition.maybeShrinkIsr()
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(10L, partition.localLogOrException.highWatermark)
assertEquals(alterIsrManager.isrUpdates.size, 1)
I may have missed it, but do we have tests which verify error handling? I see tests which verify requests get sent, but at a quick glance I didn't see tests of responses.
* Cleaned up expand/shrink ISR methods
* Style stuff
* Logging stuff
hachikuji left a comment
@mumrah One additional thought occurred to me. After the controller receives an AlterIsr request and updates the partition state, I think we need to call onPartitionReassignment to see if there is a pending reassignment which can be completed.
@hachikuji yea, good catch. This works today using a ZK watch on the partition "/state" znode which is still getting triggered with this PR. We can modify the new ISR update path to explicitly call onPartitionReassignment.
Yeah, I'm ok leaving that for https://issues.apache.org/jira/browse/KAFKA-10521.
val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match {
  case Some(leaderIsrAndControllerEpoch) =>
    val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
    if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
Consider the following scenario:
1. broker sends AlterIsr
2. the update succeeds but the response is lost
3. broker retries AlterIsr
Currently the leader will be stuck after 3) because it has no way to get the latest LeaderAndIsr state if the first attempt fails. To handle this, I think we need to add an idempotence check here. After we have validated the leader epoch, if the intended state matches the current state, then we can just return the current state.
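The proposed idempotence check can be sketched as follows. This is a hedged sketch with simplified stand-in types: `AlterIsrIdempotence`, its `Result` cases, and this minimal `LeaderAndIsr` are illustrative, not the controller's actual code.

```scala
// Minimal stand-in for the controller's LeaderAndIsr state
case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int) {
  // Compare intended state while ignoring the ZK version: a retried request
  // cannot know the version that the earlier, successful write was assigned
  def equalsIgnoreZk(other: LeaderAndIsr): Boolean =
    leader == other.leader && leaderEpoch == other.leaderEpoch && isr == other.isr
}

object AlterIsrIdempotence {
  sealed trait Result
  case class Accepted(state: LeaderAndIsr) extends Result
  case class Rejected(error: String) extends Result

  def handle(current: LeaderAndIsr, proposed: LeaderAndIsr): Result = {
    if (proposed.leaderEpoch < current.leaderEpoch)
      Rejected("FENCED_LEADER_EPOCH")   // zombie leader: fenced by the epoch check
    else if (proposed.equalsIgnoreZk(current))
      Accepted(current)                 // retry of a lost-but-successful update: return current state
    else
      Accepted(proposed.copy(zkVersion = current.zkVersion + 1)) // normal path: persist and bump version
  }
}
```

With this check, step 3 in the scenario above gets the already-committed state back instead of an error, so the leader is not stuck.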
I was trying to think of some kind of race with a zombie leader trying to update the ISR; however, this would get fenced by the leader epoch. This should be pretty easy to add.
debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
case Errors.INVALID_UPDATE_VERSION =>
  debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to invalid zk version. Retrying.")
  sendAlterIsrRequest(isrState)
It might make more sense to handle this case similarly to FENCED_LEADER_EPOCH. Retrying won't help since we know our version will be rejected. Come to think of it, this would be kind of a strange error to hit in the current implementation, which only allows one request in flight at a time. For controller-initiated changes, we'd expect to hit FENCED_LEADER_EPOCH. Anyway, I think it's still worth keeping the error.
sendAlterIsrRequest(isrState)
case _ =>
  warn(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to $error. Retrying.")
  sendAlterIsrRequest(isrState)
Is there any way that we could end up retrying after the pending ISR state had already been reset? I know we have AlterIsrManager.clearPending, but that only removes the partition from the unsent queue. How do we handle inflight AlterIsr requests after the state has been reset. Seems like it might be worth adding a check here to validate whether the request is still needed.
True, we could see a new ISR from controller initiated changes via LeaderAndIsr while our request is in-flight. We have a check for this on successful responses, but we should also check here. Since our request failed, we don't have a leaderEpoch to check against so I think the best we can do is see if isrState is still pending before re-sending the request
* Make AlterIsr idempotent in the controller
* Tweak some error handling
* Add a few tests
* Style feedback
hachikuji left a comment
LGTM. I left a few more nits which can be saved for a follow-up.
warn(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to $error. Retrying.")
sendAlterIsrRequest(isrState)
} else {
  warn(s"Ignoring failed ISR update to ${proposedIsr.mkString(",")} since due to $error since we have a committed ISR.")
nit (for follow-up): fix grammar "since due"
}

/**
 * This is called for each partition in the body of an AlterIsr response. For errors which are non-retryable we simply
nit: conventionally we prefer "retriable"
partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
None
} else if (newLeaderAndIsr.equalsIgnoreZk(currentLeaderAndIsr)) {
  // If a partition is already in the desired state, just return it
It might be worth mentioning that this could happen in the case of a retry after a successful update.
alterIsrRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else if (!controller.isActive) {
  sendResponseMaybeThrottle(request, requestThrottleMs =>
    alterIsrRequest.getErrorResponse(requestThrottleMs, Errors.NOT_CONTROLLER.exception()))
nit: leave off parenthesis after exception
I think the failing tests are known flakes and should be fixed by https://issues.apache.org/jira/browse/KAFKA-10514.
I have a thought: there is a scenario where a leader can see its followers but cannot see ZooKeeper. Previously, the leader would effectively be fenced when it attempted to shrink or expand the ISR, because it holds the leaderIsrUpdateLock. Now the leader can't be fenced that way, because it just sends the AlterIsr message and continues processing normally. With acks=1, it will keep accepting messages, and those messages can be lost. Should we reject produce requests when AlterIsr finds the broker is not in liveBrokerIdAndEpochs?
@jacky1193610322 I missed this comment before. It's a good question. In general, the leader will continue in its current state as long as possible. As you say, as soon as it needs to shrink/expand the ISR, it grabs the leaderAndIsr update and attempts to synchronously update the state. If Zookeeper can't be reached, then the thread gets stuck. Eventually this causes the broker to effectively deadlock, which has the side effect of preventing any Produce requests (and any other requests) from getting through. I think it's a fair point that this affords some protection for acks=1 requests, but I think we tend to view the side effect of deadlocking the broker as worse than any benefit.

In KIP-500, we have an alternative approach for self-fencing. The analogous case is when the leader cannot reach the controller. We use a heartbeating mechanism to maintain liveness in the cluster. Unlike with Zookeeper, we do not rely on the session expiration event in order to tell that a broker has been declared dead. Instead, if we do not get a heartbeat response from the controller before some timeout, then we will stop accepting Produce requests.

I have been thinking a little bit about your suggestion to self-fence after getting an invalid version error from AlterIsr. It might help in the interim before KIP-500 is complete. I think our expectation here was that if we get an invalid version error, then the LeaderAndIsr with the updated state should soon be on the way. I suppose we could come up with reasons why that assumption might fail, so it might make sense to be a little more defensive. I will file a jira about this and we can see what others think. Thanks for the suggestion!
Thanks for your reply. I have also read KIP-500 and KIP-631, and the fencing there looks good, but it will not be released for a few months. Before then, I think we should still try our best to fence the broker when the controller already thinks it has died. In other words, we should fence both ways.
Background
On the broker side, the ISR changes in the following cases: when a follower falls behind and must be removed from the ISR (shrink), and when a lagging follower catches back up to the leader (expand).
Previously, we would synchronously update the partition state in ZK, which includes the current leader and epoch as well as the ISR. The ZK version is used for concurrency control and is known to the broker (kept in the LeaderAndIsr struct). In order to notify the controller of ISR changes, the leader would periodically write to a special "isr_change_notification" znode, which the controller kept a ZK watch on. A list of modified partitions was written to this znode so the controller would only reload the necessary ISRs.
With KIP-497, we introduce an asynchronous ISR workflow that makes the controller the only component which can change the ISR state.
TLDR
Broker changes
Now when the broker wants to modify the ISR, it will send an AlterIsr request to the controller. Since this request can include ISR modifications for any number of partitions, we want to allow batching of ISR updates. This is done by adding a small delay (50ms) before sending the request to the controller after an ISR change is requested. By adding this delay, other ISR changes can accumulate before getting sent off as a single AlterIsr request.
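The batching behavior described above can be sketched roughly as follows. This is a hedged sketch: `BatchingAlterIsrSender` and its `sendRequest` callback are hypothetical names standing in for the PR's `AlterIsrManager` and the `BrokerToControllerChannelManager` send path.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// The first enqueued ISR change schedules a flush after `delayMs`; further
// changes accumulate until the flush fires, so they ship as one request.
class BatchingAlterIsrSender(sendRequest: Map[String, List[Int]] => Unit,
                             delayMs: Long = 50) {
  private val pending = new ConcurrentHashMap[String, List[Int]]()
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  @volatile private var flushScheduled = false

  def enqueue(topicPartition: String, newIsr: List[Int]): Unit = synchronized {
    pending.put(topicPartition, newIsr)
    if (!flushScheduled) {
      flushScheduled = true
      scheduler.schedule(new Runnable {
        def run(): Unit = flush()
      }, delayMs, TimeUnit.MILLISECONDS)
    }
  }

  private def flush(): Unit = synchronized {
    import scala.jdk.CollectionConverters._
    val batch = pending.asScala.toMap
    pending.clear()
    flushScheduled = false
    if (batch.nonEmpty) sendRequest(batch) // one AlterIsr request for the whole batch
  }

  def shutdown(): Unit = scheduler.shutdown()
}
```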
Once the controller makes the update, it will return the updated ISR to the leader and send UpdateMetadata requests to the other interested brokers. As specified in the KIP, while a leader is waiting on the controller to respond it will assume the largest ISR (i.e., the union of the actual ISR and the pending ISR). For ISR expansions, this allows the leader to assume the new ISR right away. For ISR shrinks, however, the leader must wait for confirmation from the controller that the ISR has been persisted (see the KIP for more details). We call this oversubscribed ISR the "maximal" ISR.
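The "maximal" ISR rule can be illustrated with a small sketch modeled on the sealed ISR-state trait introduced in this PR; the exact case classes and fields below are simplified assumptions, not the real `Partition` code.

```scala
// While an AlterIsr is in flight, the leader acts on the union of the
// committed ISR and the pending change: expansions take effect immediately,
// while shrinks keep the full committed ISR until the controller confirms.
sealed trait IsrState {
  def isr: Set[Int]        // the committed ISR
  def maximalIsr: Set[Int] // the ISR the leader acts on
}

case class CommittedIsr(isr: Set[Int]) extends IsrState {
  def maximalIsr: Set[Int] = isr
}

case class PendingExpandIsr(isr: Set[Int], newInSyncReplicaId: Int) extends IsrState {
  // Expansion: assume the new replica right away
  def maximalIsr: Set[Int] = isr + newInSyncReplicaId
}

case class PendingShrinkIsr(isr: Set[Int], outOfSyncReplicaIds: Set[Int]) extends IsrState {
  // Shrink: keep acting on the full committed ISR until persisted
  def maximalIsr: Set[Int] = isr
}
```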
A new AlterIsrManager class is added which accumulates ISR changes and uses a background BrokerToControllerChannelManager thread to send AlterIsr requests to the controller. The use of AlterIsr is only enabled when a broker is configured with IBP version 2.7-IV2 or greater.
Controller changes
A new AlterIsrReceived controller event is added along with supporting methods for handling AlterIsr requests from KafkaApis as well as actually processing the request and modifying ZooKeeper. The proposed ISRs are validated against the controller's metadata before writing to ZK. Once all of the ISRs have been written out (or failed), the controller will return the AlterIsr response which includes the updated ISRs and ZK versions. After returning the response, the controller will then send out UpdateMetadata requests for all partitions included in the AlterIsr request.
In order to support rolling upgrades and brokers on older IBP, the controller will accept AlterIsr and monitor the ZK watch for ISR changes. Once we fully migrate away from ZK, AlterIsr will be the only mechanism for updating the ISR.
Configs
TODO
Metrics
TODO