KAFKA-7941: Catch TimeoutException in KafkaBasedLog worker thread#6283
Conversation
- When calling readLogToEnd(), the KafkaBasedLog worker thread should catch TimeoutException and log a warning, which can occur if brokers are unavailable, otherwise the worker thread terminates. - Includes an enhancement to MockConsumer that allows simulating exceptions not just when polling but also when querying for offsets, which is necessary for testing the fix.
|
This is my first contribution and I haven't exactly poured over the entirety of the Connect runtime, so I'll be curious to see if I'm missing something entirely. A few things I'd like reviewers to note that I thought about myself:
|
|
retest this please |
|
Ping @mjsax @kkonstantine @wicknicks @rhauch (not totally sure, but I've seen these folks reviewing other connect PRs) |
rhauch
left a comment
There was a problem hiding this comment.
Thanks, @pgwhalen -- nice catch! I think this was not considered an issue is because the internal poll(Integer.MAX_VALUE) call in readToEnd() should never time out. But as you pointed out, there are other method invocations, and timeouts on those should be handled as well.
I have a minor suggestion to help make the log message more readable, and (if you're in the code) another to move the two setter methods on MockConsumer together. Otherwise, this looks great.
| readToLogEnd(); | ||
| log.trace("Finished read to end log for topic {}", topic); | ||
| } catch (TimeoutException e) { | ||
| log.warn("Timeout while reading log to end {}", e.getMessage()); |
There was a problem hiding this comment.
How about putting the topic name in the message (like on line 315), denoting that this will be retried automatically, and maybe why this might happen.
| log.warn("Timeout while reading log to end {}", e.getMessage()); | |
| log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. This may occur when brokers are unavailable or unreachable.", topic, e.getMessage()); |
There was a problem hiding this comment.
Good idea, though looks like this suggested change will exclude the message (doesn't have another {}); I'll fix that.
| public synchronized void setOffsetsException(KafkaException exception) { | ||
| this.offsetsException = exception; | ||
| } | ||
|
|
There was a problem hiding this comment.
Nit: It's probably a bit better to have this be near the setPollException(...) method.
There was a problem hiding this comment.
Agreed, not sure why I put it there.
|
It is a bit surprising it hasn't come up more since AK 2.0 was released, but it would only when brokers become unavailable. It looks like KAFKA-6608 changed the I documented this on KAFKA-7941, but one workaround is to set the This fix should be ported back to AK 2.0. |
- Enhance logging on TimeoutException - Move setOffsetsException method to sensible place in MockConsumer
|
Thanks for picking up the review @rhauch ! I'm surprised it hasn't come up more as well actually, my team experiences it enough that we went through the trouble to add some additional out of process monitoring specifically to catch this error, and now restart our instance of connect nightly in order to get it back in a good state if it hits this. Regardless, glad we're fixing it now! For back porting to 2.0, is that something I should help with by making other PRs or will you/others handle it while merging? |
|
@pgwhalen, my comment about backporting was for posterity. PRs should be based on |
| } | ||
|
|
||
| public synchronized void setException(KafkaException exception) { | ||
| this.exception = exception; |
There was a problem hiding this comment.
@pgwhalen, after thinking about this some more, I think we should keep this older signature, deprecate it, and have it call setPollException(exception). Although MockConsumer is strictly speaking a non-public API, doing this will help avoid problems should people use the MockConsumer class in their tests. And it'd probably help to add JavaDoc that says to use setPollException(...) instead.
There was a problem hiding this comment.
Good point, I forgot it was a mildly public API. I added it back, should be good to merge now unless anything else stands out.
| try { | ||
| readToLogEnd(); | ||
| log.trace("Finished read to end log for topic {}", topic); | ||
| } catch (TimeoutException e) { |
There was a problem hiding this comment.
there are a couple of other places where we call readToEnd() and poll() (links below). should we catch this exception in those places also?
There was a problem hiding this comment.
Thanks for the additional eyes @wicknicks !
The readToLogEnd() call is inside start()so my thinking was that failure would happen on startup which seems okay (if not desirable). If there were broker availability issues it seems reasonable to me for Connect not to start. If we want to try to be more robust we could come up with some retrying strategy, but I bet that would require changes in a number of other places.
The poll() method contains its own catch (KafkaException e) which is a super class of TimeoutException so we ought to be okay there:
|
👍 this affects us, please merge! |
|
can we please merge this ? |
rhauch
left a comment
There was a problem hiding this comment.
Thanks, @pgwhalen. This looks good to me, but since we want to backport this all the way back to the 2.0 branch, I've been waiting to merge until we get out of the freeze on the 2.3 branch.
At least approving this. I'll merge this and backport it as soon as we have a vote to release AK 2.3.0.
|
Wondering what is the timeline for this to be integrated/backported into the production release. It's affecting our current project. Sorry if this is not the right place to ask this question. |
|
Giving this one another bump as 2.3 has been out for a bit now. |
| } catch (TimeoutException e) { | ||
| log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " + | ||
| "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage()); | ||
| continue; |
There was a problem hiding this comment.
I would suggest to backoff the retry, as the timeout probably means the unavailability of the resource (it might be the kafka broker itself, or the network, or something else), an immediate retry likely will not be success.
There was a problem hiding this comment.
In practice, actual retries to the network will still be beholden to the request.timeout.ms and retry.backoff.ms inside the consumer, so that isn't an issue here.
There was a problem hiding this comment.
That's true, thanks for refreshing my mind. Then no need to backoff here.
) When calling readLogToEnd(), the KafkaBasedLog worker thread should catch TimeoutException and log a warning, which can occur if brokers are unavailable, otherwise the worker thread terminates. Includes an enhancement to MockConsumer that allows simulating exceptions not just when polling but also when querying for offsets, which is necessary for testing the fix. Author: Paul Whalen <pgwhalen@gmail.com> Reviewers: Randall Hauch <rhauch@gmail.com>, Arjun Satish <arjun@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>
) When calling readLogToEnd(), the KafkaBasedLog worker thread should catch TimeoutException and log a warning, which can occur if brokers are unavailable, otherwise the worker thread terminates. Includes an enhancement to MockConsumer that allows simulating exceptions not just when polling but also when querying for offsets, which is necessary for testing the fix. Author: Paul Whalen <pgwhalen@gmail.com> Reviewers: Randall Hauch <rhauch@gmail.com>, Arjun Satish <arjun@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>
) When calling readLogToEnd(), the KafkaBasedLog worker thread should catch TimeoutException and log a warning, which can occur if brokers are unavailable, otherwise the worker thread terminates. Includes an enhancement to MockConsumer that allows simulating exceptions not just when polling but also when querying for offsets, which is necessary for testing the fix. Author: Paul Whalen <pgwhalen@gmail.com> Reviewers: Randall Hauch <rhauch@gmail.com>, Arjun Satish <arjun@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>
|
Thanks for the contribution and the effort of merging it! @pgwhalen @rhauch What's your thoughts? I think it will be especially helpful when workers' lifecycle is managed by some resource orchestration (such as mesos, k8s, or else), which is our case. |
) When calling readLogToEnd(), the KafkaBasedLog worker thread should catch TimeoutException and log a warning, which can occur if brokers are unavailable, otherwise the worker thread terminates. Includes an enhancement to MockConsumer that allows simulating exceptions not just when polling but also when querying for offsets, which is necessary for testing the fix. Author: Paul Whalen <pgwhalen@gmail.com> Reviewers: Randall Hauch <rhauch@gmail.com>, Arjun Satish <arjun@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>
…aBasedLog worker thread (apache#6283) TICKET = KAFKA-7941 LI_DESCRIPTION = EXIT_CRITERIA = HASH [2426926] ORIGINAL_DESCRIPTION = When calling readLogToEnd(), the KafkaBasedLog worker thread should catch TimeoutException and log a warning, which can occur if brokers are unavailable, otherwise the worker thread terminates. Includes an enhancement to MockConsumer that allows simulating exceptions not just when polling but also when querying for offsets, which is necessary for testing the fix. Author: Paul Whalen <pgwhalen@gmail.com> Reviewers: Randall Hauch <rhauch@gmail.com>, Arjun Satish <arjun@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com> (cherry picked from commit 2426926)
catch TimeoutException and log a warning, which can occur if brokers
are unavailable, otherwise the worker thread terminates.
exceptions not just when polling but also when querying for offsets,
which is necessary for testing the fix.
Committer Checklist (excluded from commit message)