KAFKA-3388: Fix expiration of batches sitting in the accumulator #1056
becketqin wants to merge 9 commits into apache:trunk
Conversation
```java
 * if the leader is unavailable.
 * A batch whose metadata is not available should be expired if one of the following is true:
 * <ol>
 * <li> the batch is not in retry AND request timeout has elapsed after it is ready. (We need to see if a batch is
```

```java
boolean maybeStaleMetadata = metadata.lastRefresh() > metadata.lastSuccessfulUpdate();
// We will check all the batches if metadata might be old, this is to avoid the case that no batches will
// be timeout out when all the brokers are done and producer keeps the stale metadata without timing out
// the batches.
```
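The expiration rule described in the Javadoc above can be sketched as a standalone predicate. This is an illustrative model only, not the actual RecordAccumulator code; the class, method, and parameter names here are assumptions.

```java
// Illustrative sketch (not the real RecordAccumulator API): a batch whose
// metadata is unavailable is expired when it is NOT in retry AND the request
// timeout has elapsed since the batch became ready to send.
final class ExpiryCheck {
    static boolean shouldExpire(boolean inRetry, long nowMs, long readyAtMs, long requestTimeoutMs) {
        return !inRetry && nowMs - readyAtMs > requestTimeoutMs;
    }
}
```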
Hmm, can you please try to clarify this comment? There is a typo in will be timeout out and what does it mean to say that brokers are done? Do you mean down?
Sorry for the confusion... Yes, it should be "down" instead of "done"...
@becketqin : Since we will be cutting the 0.10.0 release today, I recommend that we just leave this out of 0.10.0 if we don't have a patch ready in the next few days.

@junrao Sure, I will try to make the patch ready as soon as possible. But I agree this probably should not block the 0.10.0 release. I will change the ticket to critical instead of blocker.

@junrao I updated the patch based on your suggestion. I did not add a method to NetworkClient because it is sort of a public API. So instead I passed the client into the RecordAccumulator.
```java
} else {
    if (!batch.inRetry()) {
        break;
// We will check if the batch should be expired if one of the following is true:
// 1. The leader is unknown.
// 2. The leader broker is disconnected.
Node leader = cluster.leaderFor(entry.getKey());
if (leader == null || client.connectionFailed(leader)) {
```
Is just checking connectionFailed enough? For example, the connection may be fine, but a batch can't be sent to leader because the max inflight requests to leader has been reached. In this case, it seems that we can timeout a batch in the accumulator before those that are still in flight.
Another related thing is that the send ordering is not really guaranteed with retry when max.inflight.requests > 1. For example, when retried batches are added back to the accumulator, they may not be in the send order. So, in this case, the sequence of timing out retried batches is not necessarily guaranteed to be ordered.

@junrao I guess if max.in.flight.requests > 1, the timeout order does not really matter at that point either. In general, when there are exceptions, the callback order is only guaranteed if max.in.flight.requests = 1. Otherwise, a later batch could hit a non-retriable exception and get failed immediately while an earlier batch is waiting for a retry.

@becketqin - if this passes review (and is well tested!) before next Monday, we can include this in the next RC. Otherwise, it may need to go into trunk and wait for the next release (but you guys are running trunk anyway...)
```java
try {
    leaderNotConnected = client.connectionFailed(leader);
} catch (IllegalArgumentException e) {
    // This means the client does not know the leader node. So it is not connected.
```
It would be better to either introduce a method to verify this condition or to change connectionFailed to return false if there is no ConnectionState for this node. The former seems safer.
@ijuma I actually went back and forth a bit on this. I was thinking about adding a isKnownNode method to the NetworkClient. But KafkaClient interface seems more or less a public interface. I don't want to change it at this point. That is why I simply catch the exception here.
NetworkClient is clearly marked as an internal class in the documentation. And we added and removed methods during the 0.9.0.0 release. It looks like we haven't marked KafkaClient as internal, but we should IMO.
```java
    // This means the client does not know the leader node. So it is not connected.
}

if (leader == null || leaderNotConnected) {
```
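One of the alternatives suggested in the thread is to make connectionFailed return false when there is no ConnectionState for the node, instead of throwing. Here is a minimal sketch of that idea; the class and state names are illustrative assumptions, not the real NetworkClient internals.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the reviewer's suggestion: an unknown node (no ConnectionState
// recorded) yields false rather than an IllegalArgumentException.
final class ConnectionStates {
    enum State { CONNECTING, CONNECTED, DISCONNECTED }

    private final Map<String, State> states = new HashMap<>();

    void put(String nodeId, State s) { states.put(nodeId, s); }

    // True only when we have a state for the node and it is DISCONNECTED;
    // a missing entry compares unequal to DISCONNECTED and returns false.
    boolean connectionFailed(String nodeId) {
        return states.get(nodeId) == State.DISCONNECTED;
    }
}
```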
Is just checking leaderNotConnected enough? For example, the leader connection may be fine, but a batch can't be sent to leader because the max inflight requests to leader has been reached. In this case, it seems that we can timeout a batch in the accumulator before those that are still in flight.
Also, would it be simpler to implement this based on muted partitions? If a partition is muted, we know there is still an outstanding request for the partition and we can disable expiring batches from that partition. This only applies when guaranteeMessageOrder is true, but is probably what we care about anyway.
@junrao If the leader connection is fine, we will not time out a batch in the accumulator, right? If the max in-flight requests to the leader has been reached, it will eventually be released and the batches can still go through.
I am a little worried about a corner case with using muted partitions. For example, if we happened to send a TopicMetadataRequest to a broker which is the leader of a partition, then before the TopicMetadataResponse is returned, the node would not be ready for sending. In that case the partition won't be muted, so the batches of that partition in the accumulator could potentially be timed out even though they can still make progress after the TopicMetadataResponse is returned or timed out.
BTW, I fixed an NPE after this comment.
How about the following case? Initially leader is not available. Batch 1 is full at time T. Batch 2 is full at time T + 1 sec. At time T + 10 sec, leader is available. Batch 1 is drained and batch 2 is still in accumulator since inflight request is 1. Batch 2 times out at T + 31 sec. Batch 1 later times out at T + 10 + 30 secs.
For your concern on pending TopicMetadataRequest, in that case, the batch has been sitting in accumulator for at least 30 secs. So timing it out also seems reasonable and it preserves the timeout ordering.
@junrao Not sure if I missed something. In order to time out Batch 2 at T + 31 secs, the NetworkClient has to be disconnected from the leader broker at that point. When the NetworkClient disconnects from a broker, it will clear the in-flight requests in processDisconnection(). And in Sender.handleProduceResponse(), the sender will re-enqueue the failed batch when ClientResponse.disconnected is true, if retries are not exhausted.
That means if NetworkClient.connectionFailed() returns true, there is no in-flight request to that broker.
So in your example, at T + 31 sec (before Batch 1 hits its request timeout at T + 10 + 30 secs), if the NetworkClient disconnects from the broker, Batch 1 will be re-enqueued right after the NetworkClient updates the node status to DISCONNECTED. So in the next poll(), when the partition sees its leader is disconnected, Batch 1 should have already been re-enqueued. Since Batch 1 has just been re-enqueued, it won't be timed out immediately. And because we break out of the expiration-checking while loop once we see a batch that should not be timed out, Batch 2 will not be timed out either.
You are right that from a timeout-ordering perspective, it is fine to use the muted partitions. And it is also simpler. I was just trying to see if we can make the producer more robust to server failures, because that was also the original motivation of adding the request timeout.
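The invariant argued in the comment above (connectionFailed implies no in-flight requests, because disconnection clears them and their batches get re-enqueued) can be modeled with a tiny sketch. All names here are illustrative assumptions, not the real NetworkClient internals.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of one broker connection: when the node is marked disconnected,
// its in-flight requests are drained (their batches would be re-enqueued for
// retry), so connectionFailed() == true implies an empty in-flight queue.
final class InFlightModel {
    private final Deque<String> inFlight = new ArrayDeque<>();
    private boolean disconnected = false;

    void send(String request) { inFlight.add(request); }

    // Mirrors the role of processDisconnection(): mark the node failed and
    // return the cleared in-flight requests for re-enqueueing.
    Deque<String> processDisconnection() {
        disconnected = true;
        Deque<String> cleared = new ArrayDeque<>(inFlight);
        inFlight.clear();
        return cleared;
    }

    boolean connectionFailed() { return disconnected; }

    int inFlightCount() { return inFlight.size(); }
}
```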
@junrao Will you have time to take a further look? My only concern (which is not very strong) about using muted partitions is that it seems in some cases we might fail a batch if a broker failed, even though the producer would recover after timeout and retry. But I agree that is rare. So using muted partitions also sounds good to me. What do you think? Thanks.
@becketqin : Yes, you are right. It does seem that checking connectionFail state achieves the same thing. It's just that using muted partitions seems simpler and avoids having to pass in networkClient to recordAccumulator, which is a bit weird.
@junrao Thanks a lot for the review and comments. I updated the patch using muted partitions. Can you take another look? Thanks.
```java
TopicPartition tp = entry.getKey();
Node leader = cluster.leaderFor(tp);

if (leader == null || !muted.contains(tp)) {
```
It seems that just checking muted is enough?
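The muted-partition guard in the final patch can be sketched as a pure function. This is an illustrative simplification (String stands in for TopicPartition, Object for Node), not the actual accumulator code.

```java
import java.util.Set;

// Sketch of the guard quoted above: a batch may be considered for expiration
// only when the leader is unknown, or when no request for its partition is
// currently in flight (the partition is not muted).
final class MutedGuard {
    static boolean mayExpire(String tp, Object leader, Set<String> muted) {
        return leader == null || !muted.contains(tp);
    }
}
```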
@becketqin : Thanks for the patch. The main logic and the tests make a lot of sense to me now. LGTM