KAFKA-9274: Remove retries from InternalTopicManager#9060

Merged
mjsax merged 5 commits into apache:trunk from mjsax:kafka-9274-kip-572-internal
Aug 6, 2020
Conversation

Member

@mjsax mjsax commented Jul 23, 2020

  • part of KIP-572
  • replace retries in InternalTopicManager with infinite retries plus
    a new timeout, based on consumer config MAX_POLL_INTERVAL_MS
  • if the new timeout hits, we don't throw StreamsException any longer (as we did when exceeding retries), but send INCOMPLETE_SOURCE_TOPIC_METADATA error code to let all instances shut down

Third PR for KIP-572 (cf #8864 and #9047)

Call for review @vvcephei
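The change described above boils down to a retry-until-deadline loop in place of a fixed retry count. A minimal, self-contained sketch of that pattern (all names here are hypothetical; the real logic lives in InternalTopicManager):

```java
// Sketch of retry-until-deadline, replacing a fixed `retries` count.
// Names are illustrative, not the actual Kafka Streams code.
public class RetryUntilDeadline {
    interface Attempt { boolean tryOnce(); }

    // Retries indefinitely until success or until the (simulated) clock
    // passes deadlineMs. clock[0] is a fake clock so the sketch runs
    // without sleeping.
    static boolean run(Attempt attempt, long deadlineMs, long[] clock) {
        while (clock[0] < deadlineMs) {
            if (attempt.tryOnce()) {
                return true; // topics created/validated
            }
            clock[0] += 100L; // stand-in for a retry.backoff.ms pause
        }
        // deadline exceeded -> caller signals INCOMPLETE_SOURCE_TOPIC_METADATA
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        long[] clock = {0L};
        // succeeds on the third attempt, well before the 1s deadline
        boolean ok = run(() -> ++calls[0] >= 3, 1_000L, clock);
        System.out.println(ok + " after " + calls[0] + " attempts");
    }
}
```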

Member Author

Minor side fix: fetchEndOffset can never throw TimeoutException because it already catches all RuntimeExceptions and converts them into StreamsException

Member Author

First TODO removed

Member Author

Second TODO removed

Member Author

Third TODO removed -- as we don't pass retries and retry.backoff.ms via admin config any longer, this test is not needed any more

Member Author

Fourth TODO removed

Member Author

@vvcephei this is an open question

\cc @guozhangwang

Contributor

I'm thinking that moving forward we should try not to create internal topics during rebalance but pre-create them at startup; but for now, assuming this is still the case, I think letting the whole application die is fine --- i.e., treat it the same as missing source topics. Hence I'm leaning towards encoding INCOMPLETE_SOURCE_TOPIC_METADATA to shut down the whole app, across all clients.

Member Author

Ack

@mjsax mjsax changed the title KAFKA-9274: Remove retries from InternalTopicManager Jul 23, 2020
Member Author

mjsax commented Jul 23, 2020

Member

@dajac dajac Jul 23, 2020

If we want to guarantee that the deadlineMs is respected, I think that we must set the timeout of the AdminClient's call accordingly: CreateTopicsOptions.timeoutMs. With the default, I think that the call could be longer than half of MAX_POLL_INTERVAL_MS_CONFIG.

Member Author

Good question. Default max.poll.interval.ms is 5 minutes (i.e., the deadline is set to 2.5 minutes by default) while default api.default.timeout.ms is 1 minute. Thus we might be ok?

Member

That's right. I misread the default value of max.poll.interval.ms, too many zeros for my eyes ;). The default works fine then. Do we want to protect ourselves if the user changes the default? Or shall we just call out that api.default.timeout.ms should be lower than max.poll.interval.ms somewhere?

Member Author

I am happy to add a check in StreamsConfig and either throw or log a WARN depending how strict we want to be.
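A sketch of what such a check could look like (hypothetical names; not the actual StreamsConfig code): a single admin call bounded by api.default.timeout.ms should fit within the rebalance deadline of max.poll.interval.ms / 2, and we warn otherwise.

```java
// Hypothetical sketch of the StreamsConfig sanity check discussed above.
// Assumption: the deadline is max.poll.interval.ms / 2, so any single
// admin call bounded by api.default.timeout.ms should fit within it.
public class AdminTimeoutCheck {
    static boolean fitsDeadline(int apiDefaultTimeoutMs, int maxPollIntervalMs) {
        return apiDefaultTimeoutMs <= maxPollIntervalMs / 2;
    }

    public static void main(String[] args) {
        // Defaults: api.default.timeout.ms = 60s, max.poll.interval.ms = 5 min.
        int apiDefaultTimeoutMs = 60_000;
        int maxPollIntervalMs = 300_000;
        if (!fitsDeadline(apiDefaultTimeoutMs, maxPollIntervalMs)) {
            System.out.println("WARN: api.default.timeout.ms exceeds max.poll.interval.ms / 2");
        } else {
            System.out.println("ok");
        }
    }
}
```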

Member

Thinking a bit more about this, with the defaults, you may end up not honouring the deadline: createTopics can take up to 1m, so if you invoke one when less than 1m is remaining before the deadline, you may not honour it. It may not be that important though.

If we want to strictly enforce it, we could calculate the maximum timeout for each call, something like deadline - now, and set it with CreateTopicsOptions.timeoutMs.
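The per-call cap suggested here is essentially one line of arithmetic; a rough sketch (names illustrative, not the actual code):

```java
// Sketch of the per-call timeout proposed above: cap each AdminClient call
// at the time remaining before the deadline, which the real code would
// pass along roughly as
//   adminClient.createTopics(topics, new CreateTopicsOptions().timeoutMs(cap));
public class RemainingTimeout {
    // Clamp to zero so an already-passed deadline yields an immediate timeout.
    static int remainingTimeoutMs(long deadlineMs, long nowMs) {
        return (int) Math.max(0L, deadlineMs - nowMs);
    }

    public static void main(String[] args) {
        // 2.5-minute deadline, 2 minutes already elapsed -> 30s left for the call
        System.out.println(remainingTimeoutMs(150_000L, 120_000L));
    }
}
```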

Member Author

mjsax commented Jul 23, 2020

Only the StreamsStandbyTask.test_standby_tasks_rebalance system test failed, and it's known to be buggy.

Member Author

mjsax commented Jul 23, 2020

Jenkins failed on known flaky tests only.

@abbccdda abbccdda left a comment

Overall LGTM


Why do we reduce max poll interval?

Member Author

@mjsax mjsax Aug 4, 2020

In this PR, we change the "deadline" in the group leader to create/verify all internal topics from "counting retries" to a timeout of max.poll.interval.ms / 2, and we reduce the default of 5 minutes to speed up this test.

cf https://github.com/apache/kafka/pull/9060/files#diff-d3963e433c59b08688bb4481faa20e97R79


abbccdda commented Aug 4, 2020

Also, do we have new unit test coverage for the changes?

Member Author

mjsax commented Aug 4, 2020

Updated. Call for review @abbccdda

Also, do we have new unit test coverage for the changes?

We did not really change much, only switching from retries to a "timeout"; thus, existing unit tests (which I updated accordingly, e.g., different configs and/or different exception types) should be sufficient?

The only other thing is the new error code and I just added 2 unit tests for this case.


Is this part of the initiative to throw a different exception? Could we update the summary of this PR?


Could we update the meta comment to explain when a TaskAssignmentException is thrown?

Member Author

@mjsax mjsax Aug 4, 2020

TaskAssignmentException is not a checked exception and it's just a courtesy declaration... (I can also remove throws TaskAssignmentException if you prefer.)

Don't see any need to document anything further -- the code makes it clear (in fact, this method does not even throw the exception itself; it just bubbles up from internalTopicManager.makeReady and the code documents itself):
https://github.com/apache/kafka/pull/9060/files#diff-d3963e433c59b08688bb4481faa20e97R179-R184


Comments for thrown exception


s/temporary/temporarily

Member Author

mjsax commented Aug 4, 2020

@abbccdda I updated the PR. I also realized that it might be better to throw a TimeoutException instead of a TaskAssignmentException if we hit the new timeout. Updated the code and tests accordingly.

@abbccdda abbccdda left a comment

Just one minor comment.


Since we could throw different exceptions here, it would be good to add a log to indicate which type of exception is thrown.

Member Author

There are already corresponding log.error statements before those exceptions are thrown. No need to double-log, IMHO?


Cool, sg.

mjsax added 5 commits August 5, 2020 14:16
 - part of KIP-572
 - replace `retries` in InternalTopicManager with infinite retries plus
   a new timeout, based on consumer config MAX_POLL_INTERVAL_MS
@mjsax mjsax force-pushed the kafka-9274-kip-572-internal branch from bf49314 to 05b0faa August 5, 2020 21:30
@abbccdda abbccdda left a comment

LGTM

@mjsax mjsax merged commit 9903013 into apache:trunk Aug 6, 2020
@mjsax mjsax deleted the kafka-9274-kip-572-internal branch August 7, 2020 03:48
// need to add mandatory configs; otherwise `QuietConsumerConfig` throws
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
retryTimeoutMs = new QuietConsumerConfig(consumerConfig).getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG) / 2L;
Member

Hey @mjsax, am I reading this PR correctly? Do we now only allow a single member to retry topic creation/validation for up to half of the poll interval, after which we shut down the entire application? That sounds like the opposite of resiliency... what if the brokers are temporarily unavailable? Before this we would just let the single thread die, and the internal topic creation/validation would be retried on the subsequent rebalance. That wasn't ideal, but given the upcoming work to allow reviving/recreating a dead thread, that seems preferable to permanently ending the application?

Sorry if I'm misreading this, was just going over all the PRs in the last month or so to produce a diff+summary of the important ones, and want to make sure I actually understand all the changes we've made

Member

Apologies if this was touched on in the KIP, it's been a while and the discussion thread was quite long so I may have missed something there

Member Author

Note that the previous default was "zero retries", and thus the new default is more resilient with a 5-minute default max.poll.interval.ms. -- But yes, we shut down the whole app for this case now, as proposed by @guozhangwang (IIRC).

Contributor

Today, since we do not have a way to partially create tasks, we'd have to create all topics to make sure all tasks are "complete" within each rebalance. If we cannot successfully create the topics within the poll.interval (i.e., we'd need to complete that rebalance within the poll.interval, and I guess halving it is to be more conservative), then killing that thread is not very useful anyway, since we cannot proceed with the initializable tasks.

That being said, with the upcoming work I'd agree that just shutting down the thread and allowing users to optionally retry the rebalance with new threads would be preferable.

Contributor

cc @cadonna @wcarlson5 to bring to your attention.

Member

Yeah, I think we should remove the shutdown error code in case of a TimeoutException during internal topic validation before 2.7. I'll create a ticket so we don't lose track -- I think even just letting it kill the one thread is better than killing all of them.

Member Author

SGTM.


Labels

kip (Requires or implements a KIP), streams
