Skip to content

KAFKA-12497: Skip periodic offset commits for failed source tasks#10528

Merged
C0urante merged 4 commits intoapache:trunkfrom
C0urante:kafka-12497
Oct 13, 2022
Merged

KAFKA-12497: Skip periodic offset commits for failed source tasks#10528
C0urante merged 4 commits intoapache:trunkfrom
C0urante:kafka-12497

Conversation

@C0urante
Copy link
Copy Markdown
Contributor

Jira

This change serves two purposes:

  1. Eliminate unnecessary log messages for offset commit of tasks that don't need to perform offset commits (e.g., a task that has failed and for which all data has been flushed and committed)
  2. Stop blocking the offset commit thread unnecessarily for flushes that will never succeed because the task's producer has failed to send a record in the current batch with a non-retriable error

Existing unit tests for the OffsetStorageWriter are tweaked to verify the small change made to it. Several new unit tests are added for the WorkerSourceTask that cover various cases where offset commits should not be attempted, and some existing tests are modified to cover cases where offset commits either should or should not be attempted.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@C0urante
Copy link
Copy Markdown
Contributor Author

@ncliang @gharris1727 @kpatelatwork @ddasarathan could one or two of you take a look at this when you have time?

@kpatelatwork
Copy link
Copy Markdown
Contributor

kpatelatwork commented Apr 15, 2021

@C0urante overall looks good and very good job on the tests but as I am new to the code, I tried my best to review but I recommend getting a LGTM from one more reviewer also who knows this code better than me to ensure we don't miss something obvious?

Copy link
Copy Markdown
Contributor

@ncliang ncliang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @C0urante . I also agree with @kpatelatwork that it is better to encapsulate the should offset commit logic in WorkerSourceTask . Otherwise I think generally LGTM.

@C0urante
Copy link
Copy Markdown
Contributor Author

C0urante commented Oct 7, 2021

@rhauch @tombentley could either of you take a look? It'd be nice to get this merged in time for the upcoming 3.1 release; I know I've seen plenty of people led astray by continued offset commit messages for failed tasks and it'd be great if we could improve their experience.

Copy link
Copy Markdown
Member

@tombentley tombentley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@C0urante I've taken a very quick initial look. Given the JIRA is explicitly about logging it would be nice if there were assertions in the tests that directly checked for logging. Could we use a LogCaptureAppender to make such assertions?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method used in the if is called shouldCommitOffsets, not areThereOffsetsToBeCommitted, so is this message accurate, or perhaps the method name is slightly misleading? Maybe amending the message to "...there are no offsets that should be committed"?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, the message rewording is fair. Renaming to areThereOffsetsToBeCommitted is a little verbose and although it does technically capture the case where the task's producer has failed to send a record with a non-retriable error, I think it may still be a little misleading.

Comment on lines 219 to 221
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't assertEquals(shouldFlush, writer.willFlush()) and assertEquals(shouldFlush, writer.beginFlush()) be clearer?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good point. Thought it would make messages clearer for failed assertions but the difference isn't worth the unreadable code.

@C0urante
Copy link
Copy Markdown
Contributor Author

Thanks @tombentley. Interesting point about the log capture appender but it's a little tricky with the current setup. I've hacked together a prototype that tries to provide realistic coverage by simulating genuine offset commits with calls to SourceTaskOffsetCommitter::commit without constructing a full-on integration test; LMK what you think.

If the test cases themselves look acceptable, I suspect we may want to either develop a Connect-specific log capture appender (similar to how Connect has its own embedded testing framework that was largely copied from Streams) or move the Streams-specific LogCaptureAppender into a more general-use location, possibly in the clients module. The current prototype's dependency on Streams' testing artifacts isn't desirable as it makes partial builds (which I personally do quite frequently while testing Connect-specific changes) less effective.

@tombentley
Copy link
Copy Markdown
Member

I suspect we may want to either develop a Connect-specific log capture appender (similar to how Connect has its own embedded testing framework that was largely copied from Streams) or move the Streams-specific LogCaptureAppender into a more general-use location, possibly in the clients module. The current prototype's dependency on Streams' testing artifacts isn't desirable as it makes partial builds (which I personally do quite frequently while testing Connect-specific changes) less effective.

Agreed about he undesirability of the dependency, and if possible a common LogCaptureAppender would be better than have several copies floating about.

Copy link
Copy Markdown
Member

@tombentley tombentley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@C0urante the extra assertions in the test look reasonable to me.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to use multi-resource try-with-resources?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "timed out" part of the error is a little misleading now.

@C0urante
Copy link
Copy Markdown
Contributor Author

I've rebased onto the latest trunk. I think with the latest changes (especially these logging improvements) most of the changes in this PR were made redundant. The only remaining room for improvement IMO is skipping log messages for failed tasks; the other issues (squatting on the source task offset commit thread too long for failed messages to be acknowledged, and misleading users with messages about flushing 0 records) have already been addressed.

I've force-pushed a single commit that brings this PR up to date with the latest trunk; going to push an additional commit later this week that addresses the review comments that have been left on it.

@C0urante
Copy link
Copy Markdown
Contributor Author

Thanks for the review @tombentley, and apologies for the delay. I've addressed your comments and moved the LogCaptureAppender class to the clients module where it can be used by both Connect and Streams.

I hope this looks alright given the changes in #11323 that drastically alter the logic touched on here.

We're probably past the point for 3.1 so this isn't particularly urgent anymore but I believe it'd still be useful to have at some point if you can spare the time. No rush, though.

@C0urante
Copy link
Copy Markdown
Contributor Author

C0urante commented Nov 17, 2021

CC @vvcephei -- this touches on the LogCaptureAppender currently used by Streams; you may want to take a look if you have the time.

Copy link
Copy Markdown
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @C0urante !

I just had some small comments. It would be nice to clean those up before merging, but otherwise it LGTM.

Comment thread build.gradle Outdated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you've moved the util, do you still need this dependency?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, do not need this!

Comment thread build.gradle Outdated
Comment thread checkstyle/import-control.xml Outdated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. We had an import restriction on pulling in a Streams dependency?

Actually, it looks like the util isn't in this package anymore at all, so this is probably not needed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, good call, thanks for catching this.

I could be wrong but I think the import control logic requires everything to be explicitly allowed; don't think there's an explicit restriction on the Streams package name.

Copy link
Copy Markdown
Member

@tombentley tombentley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @C0urante. I guess we might be able to refactor kafka.utils.LogCaptureAppender to use the moved LogCaptureAppender, but if so we can do that in another PR.

@mimaison
Copy link
Copy Markdown
Member

I see 2 committers approved the changes but this was not merged.
@C0urante can you rebase to fix the conflicts ? Thanks

@C0urante
Copy link
Copy Markdown
Contributor Author

This will create other conflicts with #11780. Would it be possible to prioritize that PR and then, once it's merged, fix the conflicts on this one and merge it?

@mimaison
Copy link
Copy Markdown
Member

Thanks, that makes sense. I've started reviewing #11780

@C0urante C0urante changed the title KAFKA-12497: Skip unnecessary source task offset commits KAFKA-12497: Skip periodic offset commits for failed source tasks Jul 25, 2022
@C0urante
Copy link
Copy Markdown
Contributor Author

@mimaison I've finally gotten around to rebasing this one, mind taking another look?

@mimaison
Copy link
Copy Markdown
Member

@tombentley @vvcephei Can you take another look?

@C0urante
Copy link
Copy Markdown
Contributor Author

@mimaison @vvcephei @tombentley I'd like to merge this in order to unblock #12434, which adds a second use case for the LogCaptureAppender in the Connect unit tests. Would it be possible to give this another pass sometime next week? Thanks!

@C0urante
Copy link
Copy Markdown
Contributor Author

C0urante commented Aug 1, 2022

It's also been proposed in #12434 that we eliminate the Java LogCaptureAppender class entirely and replace it with the Scala-based version that's used in core.

Any thoughts?

@C0urante
Copy link
Copy Markdown
Contributor Author

Test failures are unrelated; going to merge this as-is. @vvcephei @tombentley Please let me know if you'd like to follow up on this at any point.

@C0urante C0urante merged commit 18e60cb into apache:trunk Oct 13, 2022
@C0urante C0urante deleted the kafka-12497 branch October 13, 2022 14:15
guozhangwang pushed a commit to guozhangwang/kafka that referenced this pull request Jan 25, 2023
…ache#10528)

Also moves the Streams LogCaptureAppender class into the clients module so that it can be used by both Streams and Connect.

Reviewers: Nigel Liang <nigel@nigelliang.com>, Kalpesh Patel <kpatel@confluent.io>, John Roesler <vvcephei@apache.org>, Tom Bentley <tbentley@redhat.com>
rutvijmehta-harness pushed a commit to rutvijmehta-harness/kafka that referenced this pull request Feb 9, 2024
…ache#10528)

Also moves the Streams LogCaptureAppender class into the clients module so that it can be used by both Streams and Connect.

Reviewers: Nigel Liang <nigel@nigelliang.com>, Kalpesh Patel <kpatel@confluent.io>, John Roesler <vvcephei@apache.org>, Tom Bentley <tbentley@redhat.com>
rutvijmehta-harness added a commit to rutvijmehta-harness/kafka that referenced this pull request Feb 9, 2024
…ache#10528) (#55)

Also moves the Streams LogCaptureAppender class into the clients module so that it can be used by both Streams and Connect.

Reviewers: Nigel Liang <nigel@nigelliang.com>, Kalpesh Patel <kpatel@confluent.io>, John Roesler <vvcephei@apache.org>, Tom Bentley <tbentley@redhat.com>

Co-authored-by: Chris Egerton <chrise@aiven.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants