KAFKA-9274: Gracefully handle timeout exception#8060
KAFKA-9274: Gracefully handle timeout exception#8060guozhangwang merged 2 commits intoapache:trunkfrom
Conversation
mjsax
left a comment
There was a problem hiding this comment.
Some initial comments and questions
There was a problem hiding this comment.
If we change the semantic, we should update the class JavaDocs accordingly -- also, do we need to state that "It means all tasks belonging to this thread have been migrated" ?
There was a problem hiding this comment.
Why do we need to clear the store now but not before?
There was a problem hiding this comment.
Because before we never "revive" a task: once it's closed it's dead; but now it is possible and we may re-initialize the stores and if we do not clear it would cause an illegal-state exception.
There was a problem hiding this comment.
Well -- we could also "buffer" the record and try to send it later? In the mean time we would need to pause the corresponding task though to not process more input records (or course, we would need to let the task finish processing the current input record what might lead to more output records that we would need to buffer, too). -- This is just a wild thought and we could also handle this case later if required.
There was a problem hiding this comment.
I thought about the buffering mechanism here, but decided it may not worth since we've not seen timeout from partitionsFor -- it should be quite rare because in most cases the producer already got the partition metadata cached locally. If we found this call timing out become an issue we can revisit the buffering, wdyt?
There was a problem hiding this comment.
Does the consumer not log this already within assign(...) ?
There was a problem hiding this comment.
Yes but it does not log the "delta" :) Joking aside, I found that logging the added / removed partitions are important and making the debugging easier.
There was a problem hiding this comment.
Same question as above (this question comes up for more log statements below -- don't add a comment each time)
There was a problem hiding this comment.
Hmm for pause / resume I think I buy your argument -- I can remove these two from the changelog reader.
There was a problem hiding this comment.
After some more thoughts I feel it's better to still keep it since in many cases (e.g. in unit test trouble shooting) we usually only enable debugging on sub-packages like o.a.k.streams instead of everything, so we cannot always rely on the embedded client's log4j entries.
There was a problem hiding this comment.
Why do we throw TimeoutException directly but not wrap it?
There was a problem hiding this comment.
Because on the caller TaskManager we would swallow TimeoutException anyways.
There was a problem hiding this comment.
To what extent do we skip? Seems the method just executes as always?
There was a problem hiding this comment.
We skip adding records to it. After reviewed your PR I think I would refactor this logic a bit more, stay tuned.
There was a problem hiding this comment.
I moved this logic from addRecords to isProcessible so we would still add records for closing tasks but would skip processing them.
There was a problem hiding this comment.
Do we need to distinguish StreamsException and KafkaException (StreamsException is a KafkaException and both are fatal)`?
Actually similar question about KafkaException and Exception? The different error messages don't seems to provide much value?
There was a problem hiding this comment.
Yeah that's a good question.. My original plan is that we should not expect KafkaException to be thrown here since we already wrap all of them as StreamsException so if there's any thrown it may be a bug. But since either case we re-throw it anyways now I'm not feeling so strong (I've also thinking if we just do not re-throw exception if StreamsException since it is expected, but that is a behavior change since user's registered handler would not trigger then). I can just collapse all of them into one capture and just log the actual exception as well, wdyt?
There was a problem hiding this comment.
Should we keep this check and add final else that throws an IllegalStateException instead?
There was a problem hiding this comment.
Yeah that sounds better :)
There was a problem hiding this comment.
Why do we only remove the task if it was active now?
There was a problem hiding this comment.
We previously also only remove the task (#8058 (comment)), the original motivation to only remove the task is that standby tasks are likely to go back and even if they do not they can still be closed in the next rebalance's onAssignment call, so we just keep them a bit longer hoping it worth the "wait" :)
5adea85 to
74cfbf1
Compare
| client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE)); | ||
|
|
||
| // retry initialization should work | ||
| producer.initTransactions(); |
There was a problem hiding this comment.
This is just to verify that initTransactions can indeed be retried. cc @hachikuji
guozhangwang
left a comment
There was a problem hiding this comment.
Extracted the timeout handling logic from #8058.
|
LGTM. There are still a few cases in the |
|
@mjsax thanks for pointing it out, I plan to do another cleanup to merge |
Delay the initialization (producer.initTxn) from construction to maybeInitialize; if it times out we just swallow and retry in the next iteration.
If completeRestoration (consumer.committed) times out, just swallow and retry in the next iteration.
For other calls (producer.partitionsFor, producer.commitTxn, consumer.commit), treat the timeout exception as fatal.
Committer Checklist (excluded from commit message)