KAFKA-7211: MM should handle TimeoutException in commitSync #5492
huxihx wants to merge 5 commits into apache:trunk from huxihx:KAFKA-7211
Conversation
With KIP-266 introduced, MirrorMaker should handle TimeoutException thrown in commitSync(). Besides, MM should only commit offsets for existing topics.
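For orientation, the kind of handling this implies might look roughly like the following (a minimal sketch, not the merged patch; the commitOffsetsOnce helper and its parameters are hypothetical):

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException

// Hypothetical helper: commit a map of offsets once, surfacing the
// KIP-266 timeout instead of blocking forever.
def commitOffsetsOnce(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
                      offsets: Map[TopicPartition, Long]): Unit = {
  try {
    consumer.commitSync(offsets.map { case (tp, offset) =>
      (tp, new OffsetAndMetadata(offset))
    }.asJava)
  } catch {
    // With KIP-266, commitSync throws TimeoutException after
    // default.api.timeout.ms instead of retrying indefinitely.
    case e: TimeoutException =>
      System.err.println(s"Offset commit timed out: ${e.getMessage}")
  }
}
```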
@hachikuji Please review this patch. Thanks.

@hachikuji Could you take some time to review this patch? Thanks.
  def commit() {
-    consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset, "")) }.asJava)
+    val existingTopics: Set[String] = Try(consumer.listTopics) match {
Should we do this only in the case of a Timeout error, and clear the offsets map then? Checking on every commit may be costly.
@omkreddy Thanks for the response. Currently, commitSync will retry indefinitely when committing offsets for a nonexistent topic, and users can do nothing to get rid of it. That's why I always check that the topics exist before commitSync is called. Does that make sense?
With KIP-266, even commitSync will eventually time out (see the use of default.api.timeout.ms). I prefer @omkreddy's suggestion since listing all the topics in the cluster can be expensive.
With the trunk code, commitSync retried indefinitely instead of throwing TimeoutException when the topic was deleted. Here are the steps I used to reproduce it:
- Create a test topic named test
- Write a simple consumer that commits offsets for this topic every 3 seconds using commitSync
- Delete this topic
Later, the consumer complained "Offset commit failed on partition test-0 at offset...." forever. Did I miss anything?
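For reference, a repro along those lines might look like the following (a sketch only; the broker address, group id, and the CommitRepro wrapper are assumptions, not taken from the report):

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object CommitRepro extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.ByteArrayDeserializer")

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  val tp = new TopicPartition("test", 0)
  consumer.assign(Collections.singletonList(tp))

  // Delete the topic while this loop runs and observe whether commitSync
  // blocks forever or eventually throws TimeoutException.
  while (true) {
    consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(0L)))
    Thread.sleep(3000)
  }
}
```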
We should get a Timeout error after default.api.timeout.ms (default 60 secs).
Verified with trunk code and it is working as expected.
      case Failure(_) => Set.empty
    }
    if (existingTopics.nonEmpty)
      consumer.commitSync(offsets.filterKeys(tp => existingTopics.contains(tp.topic)).map { case (tp, offset) => (tp, new OffsetAndMetadata(offset, "")) }.asJava)
nit: we can use OffsetAndMetadata(long offset) constructor
Also, split to multiple lines. Typical line width is 120 characters.
    if (existingTopics.nonEmpty)
      consumer.commitSync(offsets.filterKeys(tp => existingTopics.contains(tp.topic)).map { case (tp, offset) => (tp, new OffsetAndMetadata(offset, "")) }.asJava)
    else
      consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset, "")) }.asJava)
nit: we can use the OffsetAndMetadata(long offset) constructor
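Applied together, the two nits above might give something like this (a sketch of the suggested shape only, not the committed change; offsets, existingTopics, and consumer are the names from the diff):

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.OffsetAndMetadata

// Single-arg constructor, with the long line split across multiple lines.
if (existingTopics.nonEmpty) {
  val toCommit = offsets
    .filterKeys(tp => existingTopics.contains(tp.topic))
    .map { case (tp, offset) => (tp, new OffsetAndMetadata(offset)) }
  consumer.commitSync(toCommit.asJava)
} else {
  consumer.commitSync(offsets.map { case (tp, offset) =>
    (tp, new OffsetAndMetadata(offset))
  }.asJava)
}
```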
hachikuji left a comment
Thanks for the patch. Do you have any ideas for how we can test the fix?
@hachikuji Please review again. Thanks.

@lindong28 You may want to review this since LinkedIn tends to trigger a bunch of MirrorMaker edge cases.

@ijuma Sure. I will review this PR.

@huxihx It seems that the 4 commits in this patch are interleaved with other unrelated commits. Can you rebase the patch so that all commits in this PR are at the top of the branch?
If there is a performance issue and the request queue time is more than 60 seconds, MM previously kept waiting until the offset commit succeeded. Now MM just times out the offset commit operation and continues to consume/produce messages. This may cause data duplication if commitOffsets(...) is called in onPartitionsRevoked() or when MM is shut down.
Would it make sense to change the default configuration of the consumers in MM so that default.api.timeout.ms is set to MAX_LONG, to keep the old behavior as the default behavior?
Going further along this line, in the scenario where an offset commit cannot finish within 60 seconds in onPartitionsRevoked(), what is the benefit of letting MM give up waiting and hand partition ownership to another consumer, instead of retrying/waiting indefinitely?
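For concreteness, the override being discussed might look roughly like this in MM's consumer properties (a sketch of the proposal, not merged behavior; note that default.api.timeout.ms is an int config, so Int.MaxValue stands in for MAX_LONG):

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig

val consumerProps = new Properties()
// Effectively restore the pre-KIP-266 "block until the commit succeeds"
// behavior by making the API timeout practically unbounded.
consumerProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Int.MaxValue.toString)
```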
Good question. I don't think this PR guarantees the absence of duplicate messages. Two possible selling points of this PR are: 1. explicitly capturing TimeoutException and warning gives users a hint that they might need to adjust the value of default.api.timeout.ms; 2. it makes MM able to remove offsets for already-deleted topics from offsets.
Because of point 1 above, I don't think setting default.api.timeout.ms to MAX_LONG is reasonable, since it was introduced precisely to give users a way to get rid of the indefinite waiting.
Does that make sense?
Hey @huxihx, just to clarify, you agree that we should set default.api.timeout.ms to MAX_LONG in MM, right?
BTW, if the default.api.timeout.ms is set to MAX_LONG and a topic is deleted, what is the behavior of offset commit?
@lindong28 Thanks for the review. Firstly, I don't quite agree with setting default.api.timeout.ms to MAX, since that is sort of against the intention behind introducing it. Secondly, the consumer will complain "Offset commit failed on partition test-0 at offset...." forever if default.api.timeout.ms is set to MAX and the topic is deleted.
Just to clarify, I am not suggesting that we need to set default.api.timeout.ms to MAX_LONG. I am currently thinking that we may want to keep the 60 seconds as the timeout value but additionally keep retrying the offset commit in e.g. onPartitionsRevoked(), and if MM cannot finish the offset commit after a certain number of retries, MM should fail fast.
We can discuss this based on the reason for the offset commit timeout. If the offset commit times out due to long request queue time in the broker, then even after the rebalance completes, the next consumer will also not be able to commit offsets, and eventually all consumed data will be duplicated. In this case it seems reasonable to just let MM fail fast and let SREs investigate the performance issue in the broker and possibly increase the default.api.timeout.ms value.
If the offset commit failed due to a persistent network error on the given MM host, then regardless of whether this MM host times out or keeps retrying, its behavior should not affect the behavior of the other MM hosts. And since this MM cannot communicate with other hosts, we would like it to fail fast, which the alternative approach achieves after a certain number of retries.
I have to admit that I cannot figure out a better solution than yours... How about we separate the commit in onPartitionsRevoked from the others? That is to say, only fail fast for the commit in onPartitionsRevoked, but keep the rest unchanged.
@huxihx There are currently three cases where MM commits offsets: 1) the regular offset commit, 2) onPartitionsRevoked(), and 3) MM shutdown.
It seems that users probably also want MM to retry more if the offset commit times out during MM shutdown, since otherwise it just means data duplication, which is not good. And it may be reasonable to ask users to explicitly kill -9 MM if they decide that the offset commit won't succeed and they are willing to tolerate message duplication.
The motivation for infinite offset commit retries for the regular offset commit is less strong, because if a regular offset commit times out, it does not directly mean message duplication. On the other hand, it does increase the impact of message duplication if the user finally decides the offset commit cannot pass due to a constant backlog on the broker side. I am inclined to also keep retrying in this case. The con is that MM may get stuck on offset commit forever if the broker's request queue time is constantly large. But the resulting degraded performance seems reasonable in the event that the broker's performance has degraded. On the other hand, message duplication is about correctness, and it may cause problems for downstream applications if MM accidentally duplicates hours' worth of data just due to e.g. misconfiguration or a broker performance issue. In this case correctness seems more important than performance.
Hey @huxihx, thanks for all the work! Do you think it is reasonable to retry the offset commit infinitely by default in MM, per the explanation above? I am happy to discuss more.
Sure, I will think about it and resubmit a patch later.
Related to the concern about duplication, would it be useful to also keep track of and log the time of the last successful commit in the warning message, so that SREs can gauge how much time's worth of data has been duplicated?
@lindong28 Thanks for the comments. Please review again.

@huxihx Could you rebase your commits so that all commits are consecutively appended on top of the latest trunk? It looks like there are a lot of commits between the first commit and the remaining commits of this patch.

@lindong28 Please review again.
  }

-  def commitOffsets(consumerWrapper: ConsumerWrapper) {
+  def commitOffsets(consumerWrapper: ConsumerWrapper, retry: Int = Integer.MAX_VALUE): Unit = {
It seems that MirrorMaker.commitOffsets() is public, and thus it is a public API of an existing tool. Maybe we need a KIP in order to change this API. For now it seems simpler to just keep the old, compatible behavior, such that MM will block indefinitely if the commit cannot pass due to something other than topic deletion. If there is a good use case for other behavior, then we can have a KIP. Does this sound OK?
        throw e

      case _: TimeoutException if retry > 0 =>
        if (retry == Integer.MAX_VALUE) { // only try to remove offsets for nonexistent topics once
What if the topic deletion happens after a few retries? It seems safer to filter topics on every retry. It is probably OK because the time used to filter topics in MM should be much smaller than the backoff time and the time needed to wait for the offset commit to pass, right?
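A sketch of what per-retry filtering could look like (a hypothetical helper under the suggestion above, not the merged code; the commitWithRefiltering name and parameters are assumptions):

```scala
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException

def commitWithRefiltering(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
                          initialOffsets: Map[TopicPartition, Long],
                          maxRetries: Int): Unit = {
  var remaining = initialOffsets
  var attempts = 0
  var committed = false
  while (!committed) {
    try {
      consumer.commitSync(remaining.map { case (tp, offset) =>
        (tp, new OffsetAndMetadata(offset))
      }.asJava)
      committed = true
    } catch {
      case e: TimeoutException =>
        attempts += 1
        if (attempts > maxRetries)
          throw e // fail fast once the retry budget is exhausted
        // Re-check topic existence on every retry, in case a topic was
        // deleted after a previous attempt.
        Try(consumer.listTopics) match {
          case Success(topics) =>
            val existing = topics.keySet.asScala.toSet
            remaining = remaining.filter { case (tp, _) => existing.contains(tp.topic) }
          case Failure(_) => // listing failed; keep the current offsets and retry as-is
        }
    }
  }
}
```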
Thanks for the update. LGTM. Left only one minor comment.
@hachikuji There is some change in the MM behavior since your last review. Do you want to take another look?
        retry += 1
        warn("Failed to commit offsets because the offset commit request processing can not be completed in time. " +
          s"If you see this regularly, it could indicate that you need to increase the consumer's ${ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG} " +
          s"Last successful offset commit timestamp=${lastSuccessfulCommitTime}, retry count=${retry}")
nits: we can replace ${lastSuccessfulCommitTime} with $lastSuccessfulCommitTime. Same for ${retry}
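For instance, a trivial sketch of the nit:

```scala
val lastSuccessfulCommitTime = System.currentTimeMillis()
val retry = 1
// Braces are only needed around compound expressions; bare identifiers
// can be interpolated directly.
println(s"Last successful offset commit timestamp=$lastSuccessfulCommitTime, retry count=$retry")
```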
@@ -69,6 +70,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
  private var offsetCommitIntervalMs = 0
Not related to this PR, but can we update the comment above:
"There are N mirror maker threads each having one KafkaConsumer instance"?
@huxihx Thanks much for the PR. LGTM. The PR is merged to trunk.
With KIP-266 introduced, MirrorMaker should handle TimeoutException thrown in commitSync(). Besides, MM should only commit offsets for existing topics.

Author: huxihx <huxi_2b@hotmail.com>
Reviewers: Dong Lin <lindong28@gmail.com>

Closes apache#5492 from huxihx/KAFKA-7211