Skip to content

KAFKA-9274: handle TimeoutException on task reset#10000

Merged
mjsax merged 5 commits intoapache:trunkfrom
mjsax:kafka-9274-kip-572-reset-offsets
Feb 6, 2021
Merged

KAFKA-9274: handle TimeoutException on task reset#10000
mjsax merged 5 commits intoapache:trunkfrom
mjsax:kafka-9274-kip-572-reset-offsets

Conversation

@mjsax
Copy link
Copy Markdown
Member

@mjsax mjsax commented Jan 29, 2021

  • part of KIP-572

This changes move the offset reset for the internal "main consumer" when we revive a corrupted task, from the "task cleanup" code path, to the "task init" code path. For this case, we have already logic in place to handle TimeoutException that might be thrown by consumer#committed() method call.

@mjsax mjsax added streams kip Requires or implements a KIP labels Jan 29, 2021
@chia7712
Copy link
Copy Markdown
Member

congratulations on PR 10,000 :)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was considering to maybe merge this method into initMetadata() but it might convolute different code path, and we should execute this method rarely anyway so I don't think we should have concerns about calling mainConsumer.committed twice for rare cases.

Let me know what you think.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm...I'm not necessarily that concerned about calling mainConsumer.committed twice in rare cases (although maybe that would not be so good, since those rare cases happen to be those in which this is probably more likely to time out, right?)
But personally, just coming into this code from the outside, it's super confusing to have two different methods for initializing the offsets. It seems more convoluted that way, to me. Also maybe I am missing some context here but why do we call initOffsetsIfNeeded from initializeIfNeeded rather than from completeRestoration in the first place? We don't need to initialize main consumer offsets until it transitions to running

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this if is not strictly required, however, it allows us to just pass null as offsetResetter in tests, so might be worth it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just pass in a no-op lambda instead? I'd rather avoid special handling for null input that isn't supposed to be null, just so we can use null in the tests (which are therefore not realistic tests since it should never be null, no?)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough.

@mjsax mjsax force-pushed the kafka-9274-kip-572-reset-offsets branch from 5a457f0 to ce7eeb4 Compare February 5, 2021 04:12
@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Feb 5, 2021

Updated this PR.


case RESTORING:
initializeMetadata();
resetOffsetsIfNeededAndInitializeMetadata(offsetResetter);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool, thanks, this seems much cleaner to me

Copy link
Copy Markdown
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I think this LG -- but I'll be happy to get this soaking asap

@mjsax mjsax merged commit 0bc394c into apache:trunk Feb 6, 2021
abbccdda pushed a commit to abbccdda/kafka that referenced this pull request Feb 6, 2021
@mjsax mjsax deleted the kafka-9274-kip-572-reset-offsets branch February 23, 2021 03:21
abbccdda pushed a commit to abbccdda/kafka that referenced this pull request Mar 22, 2021
…he#10000)

This PR was removed by accident in trunk and 2.8, bringing it back.
abbccdda pushed a commit that referenced this pull request Mar 22, 2021
…) (#10372)

This PR was removed by accident in trunk and 2.8, bringing it back.

Co-authored-by: Matthias J. Sax <matthias@confluent.io>
Reviewers: Matthias J. Sax <matthias@confluent.io>
lct45 pushed a commit to confluentinc/kafka that referenced this pull request Mar 22, 2021
…he#10000) (apache#10372)

This PR was removed by accident in trunk and 2.8, bringing it back.

Co-authored-by: Matthias J. Sax <matthias@confluent.io>
Reviewers: Matthias J. Sax <matthias@confluent.io>
abbccdda pushed a commit to abbccdda/kafka that referenced this pull request Mar 23, 2021
…he#10000)

This PR was removed by accident in trunk and 2.8, bringing it back.
abbccdda pushed a commit that referenced this pull request Mar 23, 2021
…) (#10374)

This PR was removed by accident in trunk and 2.8, bringing it back.

Co-authored-by: Matthias J. Sax <matthias@confluent.io>

Reviewers: Matthias J. Sax <matthias@confluent.io>
Terrdi pushed a commit to Terrdi/kafka that referenced this pull request Apr 1, 2021
…he#10000) (apache#10372)

This PR was removed by accident in trunk and 2.8, bringing it back.

Co-authored-by: Matthias J. Sax <matthias@confluent.io>
Reviewers: Matthias J. Sax <matthias@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants