KAFKA-13370: add errors when commit offsets failed and add tests #11413
showuon wants to merge 1 commit into apache:trunk
Side fix: the error could be a timeout or "cancelled". Include that in the log message.
Sometimes stopping the worker task takes more than 1 second. Increase the timeout to 10 seconds to make the test reliable.
@chia7712, could you please take a look? Thanks.
    double failurePercentage = metrics.currentMetricValueAsDouble(taskGroup, "offset-commit-failure-percentage");
    double successPercentage = metrics.currentMetricValueAsDouble(taskGroup, "offset-commit-success-percentage");

    if (!isCommitSucceeded) {
        assertTrue(failurePercentage > 0);
        assertTrue(successPercentage == 0);
    } else {
        assertTrue(failurePercentage == 0);
        assertTrue(successPercentage > 0);
Add verification of the `offset-commit-failure-percentage` and `offset-commit-success-percentage` metrics.
    // Very rare case: offsets were unserializable, and unable to store any data
    log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
    finishFailedFlush();
    recordCommitFailure(time.milliseconds() - started, error);
`doFlush` returns null after invoking the callback with the error attached. Handle the failed flush here, since this is where we know which error was thrown.
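To illustrate the pattern described in this comment, here is a minimal sketch, not the Connect code itself. `SketchOffsetWriter`, `SketchCommitter`, and their fields are hypothetical names, and `BiConsumer<Throwable, Void>` stands in for the real callback type. It assumes the flush reports a serialization failure through the callback and then returns null, so only the callback sees the actual error.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

/** Hypothetical writer: reports a failure via the callback, then returns null. */
class SketchOffsetWriter {
    private final boolean failSerialization;

    SketchOffsetWriter(boolean failSerialization) {
        this.failSerialization = failSerialization;
    }

    Future<Void> doFlush(BiConsumer<Throwable, Void> callback) {
        if (failSerialization) {
            // The error is only visible to the callback; the caller just gets null.
            callback.accept(new IllegalStateException("offsets were unserializable"), null);
            return null;
        }
        callback.accept(null, null);
        return CompletableFuture.completedFuture(null);
    }
}

/** Hypothetical committer: records the failure where the error is known. */
class SketchCommitter {
    int failures;
    int successes;
    Throwable recordedError;

    /** Returns true if the flush was started, false if doFlush returned null. */
    boolean commit(SketchOffsetWriter writer) {
        Future<Void> flush = writer.doFlush((error, ignored) -> {
            if (error != null) {
                // Record the failure together with its cause, inside the callback.
                failures++;
                recordedError = error;
            } else {
                successes++;
            }
        });
        return flush != null;
    }
}
```

With this shape, the code that sees the null return value does not need to guess at a cause, because the callback has already recorded it.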
@UnityLung @chia7712 @rhauch, please help review this PR when available. Thank you.
@UnityLung @chia7712 @rhauch, please help review this PR. Thank you.
|
@showuon, thanks for trying to fix this issue. But I think the best course of action here is actually to revert #9642, for a few reasons.
So I think I'm going to revert #9642, but your additional unit tests here are also very useful. Would you mind creating a new PR with those unit test improvements? Thanks!
In #9642, we removed the unnecessary `success` parameter and used the `error` as the key to identify whether the commit succeeded or failed. That's a good improvement. However, there are some cases where we passed `success` as `false` but without an `error` value. I think we should always pass the `error` value on failure. Fix it and add tests. After this fix, every `recordCommitFailure` call will pass an error.
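The invariant described above, that a failed commit always carries an error, can be sketched as follows. This is an illustration under stated assumptions: `CommitOutcomeTracker` and its methods are hypothetical, and the simple ratio counters are a stand-in for the real metrics, not how Kafka Connect computes them.

```java
import java.util.Objects;

/** Hypothetical tracker: the presence of an error is what marks a commit as failed. */
class CommitOutcomeTracker {
    private long successes;
    private long failures;
    private long totalDurationMs;
    private Throwable lastError;

    void recordCommitSuccess(long durationMs) {
        successes++;
        totalDurationMs += durationMs;
    }

    void recordCommitFailure(long durationMs, Throwable error) {
        // Reject a failure without a cause before touching any state.
        this.lastError = Objects.requireNonNull(error, "a failed commit must carry an error");
        failures++;
        totalDurationMs += durationMs;
    }

    /** Fraction of recorded commits that failed, or 0 if none were recorded. */
    double failurePercentage() {
        long total = successes + failures;
        return total == 0 ? 0.0 : (double) failures / total;
    }

    /** Fraction of recorded commits that succeeded, or 0 if none were recorded. */
    double successPercentage() {
        long total = successes + failures;
        return total == 0 ? 0.0 : (double) successes / total;
    }

    Throwable lastError() {
        return lastError;
    }
}
```

Rejecting a null error at the call site makes the "failed but no error" case fail fast in tests instead of silently recording an unexplained failure.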