
KAFKA-9999: Make internal topic creation error non-fatal#8677

Closed
abbccdda wants to merge 4 commits into apache:trunk from abbccdda:KAFKA-9999

Conversation

@abbccdda

As of today, an internal topic creation failure can shut down a stream thread. Instead of failing hard, we could take a more conservative approach: trigger another rebalance and retry the topic creation, so that a thread does not die just because the broker is temporarily unavailable.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

    "You can increase admin client config `retries` to be resilient against this error.", retries);
    log.error(timeoutAndRetryError);
-   throw new StreamsException(timeoutAndRetryError);
+   throw new TaskMigratedException("Time out for creating internal topics", new TimeoutException(timeoutAndRetryError));
Member


For KIP-441 we added a nextScheduledRebalance field to the assignment in order to signal when a followup rebalance is needed. Can we leverage that here as well, so we don't have to go through the whole ordeal of onPartitionsLost?
Check out the call to fetchEndOffsets in StreamsPartitionAssignor#populateClientStatesMap, where we schedule a followup rebalance on the leader if the listOffsets request fails. I think we can reuse the same logic/code path and keep track of a general flag like adminClientRequestSuccessful so the assignor can still finish the assignment.
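For illustration, a minimal hypothetical sketch of that flag-based approach (the class and method names here are invented for this example; only `nextScheduledRebalance` and `adminClientRequestSuccessful` come from the discussion, and this is not the actual StreamsPartitionAssignor code):

```java
// Hypothetical sketch: instead of throwing when an admin-client request
// fails during assignment, record the failure and schedule an immediate
// followup rebalance so the assignor can still finish the assignment.
public class AssignmentSketch {
    // Sentinel meaning "no followup rebalance needed".
    static final long NO_FOLLOWUP = Long.MAX_VALUE;

    // If topic creation (or listOffsets) failed, ask the members to rejoin
    // immediately; otherwise leave the schedule empty.
    public static long nextScheduledRebalanceMs(boolean adminClientRequestSuccessful,
                                                long nowMs) {
        return adminClientRequestSuccessful ? NO_FOLLOWUP : nowMs;
    }
}
```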

Author


Sounds good, let me check that.

Author


I took a look at the code path, and I have some doubts about whether we would be able to complete the assignment: if the internal topic creation failed, we don't have valid subtopology partitions to assign, correct?

Member


Good point... what if we just fall back to the FallbackPriorTaskAssignor, as we do when listOffsets fails, remove any tasks that involve internal topics we failed to create, and schedule the followup rebalance for "immediately"?

Member


Actually, I'm not sure we necessarily even need to call the FallbackPriorTaskAssignor; we just need to schedule the followup and remove the affected tasks from the assignment.

Contributor


I was tempted to say we should just return an empty assignment, which would prompt everyone to rejoin again immediately, but I think the FallbackPriorTaskAssignor is a preferable alternative.

IIUC, we should be able to rely on the precondition that any previously assigned tasks were correctly initialized before they were first assigned, right? So we know they are all safe to keep working (if possible) while we wait a suitable backoff period before trying to create these topics again.

I can see the appeal of just removing any tasks we couldn't initialize instead of calling the FallbackPriorTaskAssignor, but if I'm reading this code right, we might merely have failed to verify that the topics exist, not only failed to create topics we know didn't exist. So we might actually remove tasks that were previously assigned.

It's not clear which strategy is better, since it depends on the exact nature of the failure, but at a very high level it's probably better to continue processing existing work and delay starting new work than to start new work but delay processing existing work.

Or we could try for the "best of both worlds", where we assign the union of all previously assigned tasks and any new tasks we were able to set up.

Finally, even if we re-assign previously assigned tasks, I'm not sure if we actually need/want to use the FallbackPriorTaskAssignor in particular. There doesn't seem to be anything wrong with just computing a new assignment for a subset of the tasks while we also schedule a re-attempt to set up the rest of the tasks after a back-off period.
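The "best of both worlds" option could look something like this sketch (task ids are modeled as plain strings and the class name is invented; this is not the actual assignor code):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: assign the union of all previously assigned tasks
// (safe to keep running, since their topics were validated when they were
// first assigned) and any new tasks whose internal topics we did set up.
public class UnionAssignment {
    public static Set<String> tasksToAssign(Set<String> previouslyAssigned,
                                            Set<String> newlyInitialized) {
        Set<String> union = new HashSet<>(previouslyAssigned);
        union.addAll(newlyInitialized);
        return union;
    }
}
```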

Member


> assign the union of all previously assigned tasks and any new tasks we were able to set up

I was worrying about the case where some internal topics got deleted, and we would cause trouble for the previous owner of the corresponding task. But I suppose if the topic was deleted randomly in the middle of processing, the thread would die anyway, so the odds of the original owner not dying on an internal topic deletion are pretty low.

With this strategy, we would at least contain the blast radius to just the current owner, since once it dies, that task has no previous owner and would not be assigned. So I'm pretty strongly in favor of this idea. Arguably we could just incorporate this into the existing FallbackPriorTaskAssignor, since it would reduce to the current behavior in the case where all topics have been validated. I'm not sure if that would be more or less work, though.

@abbccdda
Author

I have updated the PR with my current understanding of the proposal, @vvcephei @ableegoldman. The part that needs more discussion is the prepareRepartitionTopics case, which could also fail to create internal topics. Should we continue in that case?

@vvcephei
Contributor

Hey @abbccdda, I've recently been investigating these timeouts as part of #8738, and we're also planning to implement KIP-572 as a general solution to all timeouts that can happen in Streams.

Given the complexities that came to light in the discussion above, and all the edge cases that can happen, I'm wondering if we should really try to be this smart in the assignor.

What do you think about just leaving the current behavior as-is, and then in the future, changing it to throw the TimeoutException out of assign() so that the KIP-572 logic can catch it and gracefully retry from the outer loop? The downside of that approach is that all the instances would be blocked for the whole poll interval, and then they would have to repeat their attempt to join the group.
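As a rough illustration of that outer-loop recovery (a generic sketch under the KIP-572 idea, not the actual Streams code; all names here are invented):

```java
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: let assign() throw TimeoutException, and have the
// outer loop catch it and retry (in Streams this would mean waiting out
// the poll interval and rejoining the group) instead of killing the thread.
public class RetryLoop {
    interface Assignor<T> {
        T assign() throws TimeoutException;
    }

    public static <T> T assignWithRetry(Assignor<T> assignor, int maxAttempts)
            throws TimeoutException {
        TimeoutException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return assignor.assign();
            } catch (TimeoutException e) {
                last = e; // back off and retry from the outer loop
            }
        }
        throw last; // give up after maxAttempts, surfacing the original error
    }
}
```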

I'm just concerned that it doesn't sound from the above like we're very sure that any specific choice of tasks is going to be the right one, and if we leave some tasks out of the assignment, it's going to be harder to debug than if we just let the thread crash (for now) or recover holistically (after KIP-572).

WDYT?

@abbccdda
Author

abbccdda commented Jun 3, 2020

@vvcephei Sounds good, let's wait for KIP-572 PR.

@abbccdda abbccdda closed this Nov 3, 2020

