KAFKA-7109: Close cached fetch sessions in the broker on consumer close (#5407)
stanislavkozlovski wants to merge 8 commits into apache:trunk from stanislavkozlovski:KAFKA-7109-close-fetch-sessions-on-consumer-close
Conversation
@cmccabe please review. No easy way to test this as far as I'm aware; integration tests should at least verify it doesn't break anything.
cc @cmccabe
We should also clear the sessionHandlers map here, just for tidiness.
Thanks for this, @stanislavkozlovski. Looks good. We need a test, though. Take a look at

Initially I couldn't find a way to test this in
Should we also check that there are no partitions contained in the request?
Also we should match the fetch session with the earlier one
Absolutely. Done
LGTM. Thanks, @stanislavkozlovski.

Retest this please

Retest this please
Don't we have to do something with the futures returned by send? How do we know if the requests completed? Also, cc @hachikuji.
We had discussed offline that there is no need to wait and ensure the requests completed. It makes sense to make it best effort; otherwise it can slow down client termination, and I assume people prefer that the client terminates quickly rather than guaranteeing no unused sessions remain on the broker.
We should always document things like that so that people reading the code are aware of it. My point was more subtle though, how do we make sure we actually sent the request?
More specifically, there's no point in having this if we close the network client before we even get to the point where the request has reached the network layer.
Oh, I did not know that was a possibility. I'll dive in.
Fair point with the documentation
It probably makes sense to wait a short time, but not very long, to send the request to the broker. I'm not sure how that interacts with the rest of the client close logic
Yeah, good catch @ijuma. This is the code for closing the KafkaConsumer:

```java
ClientUtils.closeQuietly(fetcher, "fetcher", firstException);
ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException);
ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
ClientUtils.closeQuietly(client, "consumer network client", firstException);
ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
```

Meaning there's a pretty big chance the client is closed before the requests are sent.
But then I read the Javadoc of `#send()`:

```java
/**
 * Send a new request. Note that the request is not actually transmitted on the
 * network until one of the {@link #poll(long)} variants is invoked. At this
 * point the request will either be transmitted successfully or will fail.
 * Use the returned future to obtain the result of the send. Note that there is no
 * need to check for disconnects explicitly on the {@link ClientResponse} object;
 * instead, the future will be failed with a {@link DisconnectException}.
 */
```

Meaning this code doesn't transmit any requests, since `poll` is not called at all...
I see two approaches to this:

1. Call `poll(long timeout)` with the `request.timeout.ms`-configured timeout after the for loop. This will give all requests a grand total of `timeout` time to be sent. This call will block for a total maximum of `request.timeout.ms` (the default is 30 seconds, which seems high to me for this use case). Maybe a total timeout of N seconds, where N is the number of requests?
2. Call `poll(RequestFuture<?> future, long timeout)` individually for each sent close request. This blocks on each call, so it's questionable what the timeout should be (1 second?).

In both cases, we will slow down the closing of the consumer a bit, as we will have to block as far as I can tell.

As a sidenote, this code would also need to handle exceptions, as it currently doesn't. My idea is to only catch & log the error.
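To make the buffered-send pitfall concrete, here is a minimal standalone sketch (the `MiniNetworkClient` class and its method names are hypothetical illustrations, not Kafka's actual `ConsumerNetworkClient`): requests queued by `send()` only go out when `poll()` runs, so a close path that never polls drops them silently.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a client whose send() only enqueues;
// nothing hits the wire until poll() drains the queue.
class MiniNetworkClient {
    private final Queue<String> pending = new ArrayDeque<>();
    int transmitted = 0;

    // Buffered only -- nothing is transmitted here.
    void send(String request) { pending.add(request); }

    // Drain the buffer, best effort, within the given time budget.
    void poll(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!pending.isEmpty() && System.currentTimeMillis() <= deadline) {
            pending.poll();
            transmitted++; // "sent" on poll, not on send
        }
    }

    public static void main(String[] args) {
        MiniNetworkClient client = new MiniNetworkClient();
        client.send("FetchSessionClose-broker0");
        client.send("FetchSessionClose-broker1");
        // Without this poll(), the close requests would never leave the client.
        client.poll(100);
        System.out.println("transmitted=" + client.transmitted);
    }
}
```

This mirrors the concern above: if the network client is closed before any `poll()` runs, the queued session-close requests are simply discarded.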
The consumer close code needs to block until the fetch sessions are closed. `KafkaConsumer` has `close()` and `close(long timeout, TimeUnit timeUnit)`. In the case of the latter, we don't want to wait longer than the specified time.
Retest this please
wonder if we should handle cases where this is less than 0
Can we not use the Timer that we pass to the Coordinator?
We should consider using the Timer abstraction in the Fetcher methods too.
Yes, agreed
Although if we use the same `Timer`, we ought to make the coordinator close with a timer of `timeoutMs`, not the min of `timeoutMs` and `requestTimeoutMs`, right?
@cmccabe I've addressed the latest changes and rebased onto trunk.
I don't think so, since it won't be able to pass the `Timer` class to `fetcher.close`. Maybe we could introduce a new interface called `CloseableWithTimeout`/`TimeoutCloseable` and have `closeQuietly` handle that as well?
Or have one that takes a lambda so that the caller can do the close. Similar to what we have for Scala.
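For illustration, one possible shape for the interface suggested above, as a standalone sketch (the names `TimeoutCloseable`, `CloseUtils`, and this `closeQuietly` overload are hypothetical, modeled on the discussion here, not Kafka's actual code). The functional-interface form also gives the lambda variant mentioned: the caller decides how to close.

```java
import java.time.Duration;

// Sketch of the proposed interface; hypothetical, not Kafka's actual API.
interface TimeoutCloseable {
    void close(Duration timeout) throws Exception;
}

class CloseUtils {
    // closeQuietly variant: report errors instead of throwing, so one
    // failing close does not prevent the remaining closes from running.
    static void closeQuietly(TimeoutCloseable closeable, String name, Duration timeout) {
        try {
            closeable.close(timeout);
        } catch (Exception e) {
            System.out.println("Failed to close " + name + ": " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        // Lambda-friendly, as suggested above.
        closeQuietly(t -> System.out.println("closed fetcher within " + t.toMillis() + "ms"),
                     "fetcher", Duration.ofSeconds(1));
        closeQuietly(t -> { throw new IllegalStateException("boom"); },
                     "coordinator", Duration.ofSeconds(1));
        System.out.println("done");
    }
}
```

The timeout parameter lets each close call receive its own bound, which is the property the `Timer`-passing discussion above is after.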
Why is the Math.min not done any longer?
That's what I asked in #5407 (comment).
Since the `Timer` will be shared across multiple `close()` methods, having it as short as one `requestTimeoutMs` may not give sufficient time for `fetcher.close()`. Do you think it's okay to use a shared timer with the minimum for both close calls?
We could update the timer so that the min was the requestTimeoutMs for the second close too.
In the end I think the timer for the second close should be the minimum of whatever time is left out of `timeoutMs` and `requestTimeoutMs`.
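The budget split being discussed can be sketched standalone (hypothetical helper names; this is not Kafka's `Timer` class): the first close is bounded by min(`timeoutMs`, `requestTimeoutMs`), and the second close by the min of the remaining overall budget and `requestTimeoutMs`.

```java
// Standalone sketch of the proposed close-budget arithmetic; names are illustrative.
class CloseBudget {
    static long firstCloseTimeout(long timeoutMs, long requestTimeoutMs) {
        return Math.min(timeoutMs, requestTimeoutMs);
    }

    // Second close: min of the time left from the overall budget
    // and a single request timeout.
    static long secondCloseTimeout(long timeoutMs, long elapsedMs, long requestTimeoutMs) {
        long remaining = Math.max(0, timeoutMs - elapsedMs);
        return Math.min(remaining, requestTimeoutMs);
    }

    public static void main(String[] args) {
        long timeoutMs = 60_000, requestTimeoutMs = 30_000;
        long first = firstCloseTimeout(timeoutMs, requestTimeoutMs);
        // Suppose the first close consumed 40s of the overall 60s budget:
        // only 20s remain, which is below requestTimeoutMs, so it wins.
        long second = secondCloseTimeout(timeoutMs, 40_000, requestTimeoutMs);
        System.out.println("first=" + first + " second=" + second);
    }
}
```

This way the overall `close(timeout)` bound is never exceeded, while each individual close still cannot consume more than one request timeout.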
Retest this please

Retest this please
Previously, the consumer's incremental fetch sessions would time out once the consumer was gone.
It may be worth reviving this; it requires a rebase.
@stanislavkozlovski, I encountered similar errors and I agree we should close the incremental fetch sessions when the consumer is closed. Are you able to complete this PR? If not, I can co-author with you to help complete it and merge this good improvement. Thank you.
@stanislavkozlovski, I'm going to take over this ticket tomorrow if you don't mind. Thank you.
@showuon yes, please feel free. Apologies for being slow to respond and missing the previous messages about reviving this. I will go ahead and close this PR now.
Thank you!