KAFKA-10166: always write checkpoint before closing an (initialized) task #8926

Merged
guozhangwang merged 9 commits into apache:trunk from ableegoldman:10166-excessive-TaskCorruptedException-test
Jun 26, 2020

@ableegoldman
Member

@ableegoldman ableegoldman commented Jun 25, 2020

This should address at least some of the excessive TaskCorruptedExceptions we've been seeing lately. Basically, at the moment we only commit tasks if commitNeeded is true -- this seems obvious by definition. But the problem is we do some essential cleanup in postCommit that should always be done before a task is closed:

  1. clear the PartitionGroup
  2. write the checkpoint

Step 2 is actually fine to skip when commitNeeded = false with at-least-once (ALOS), as we will have already written a checkpoint during the last commit. But for exactly-once (EOS), we only write the checkpoint before a close -- so even if there is no new pending data since the last commit, we have to write the current offsets. If we don't, the task will be assumed dirty and we will run into our friend the TaskCorruptedException during (re)initialization.
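To make the ALOS/EOS difference concrete, here is a minimal sketch of the failure mode. It is a toy model, not the Kafka Streams code: `EosCheckpointSketch`, `LocalState`, `DirtyStateException`, and the `alwaysCheckpoint` switch are all hypothetical names, and the "state directory" is just an in-memory object.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model; these are not the Kafka Streams classes.
final class EosCheckpointSketch {
    enum Guarantee { ALOS, EOS }

    /** Stand-in for the exception raised when local state cannot be trusted. */
    static final class DirtyStateException extends RuntimeException {
        DirtyStateException(String message) { super(message); }
    }

    /** Stand-in for a task's state directory; a null checkpoint means "no checkpoint file". */
    static final class LocalState {
        boolean hasStoreData = false;
        Map<String, Long> checkpoint = null;
    }

    static final class Task {
        private final Guarantee guarantee;
        private final LocalState state;
        private final Map<String, Long> offsets = new HashMap<>();
        private boolean commitNeeded = false;

        Task(Guarantee guarantee, LocalState state) {
            this.guarantee = guarantee;
            this.state = state;
        }

        /** Under EOS, store data without a checkpoint is treated as dirty. */
        void initialize() {
            if (guarantee == Guarantee.EOS && state.hasStoreData && state.checkpoint == null) {
                throw new DirtyStateException("no checkpoint found for non-empty state");
            }
            if (state.checkpoint != null) {
                offsets.putAll(state.checkpoint);
            }
        }

        void process(String changelogPartition, long offset) {
            offsets.put(changelogPartition, offset);
            state.hasStoreData = true;
            commitNeeded = true;
        }

        /** In this model ALOS checkpoints on every commit; EOS does not. */
        void commit() {
            if (!commitNeeded) {
                return;
            }
            if (guarantee == Guarantee.ALOS) {
                writeCheckpoint();
            }
            commitNeeded = false;
        }

        /** alwaysCheckpoint = false models the old close path, true the one described above. */
        void close(boolean alwaysCheckpoint) {
            if (alwaysCheckpoint || commitNeeded) {
                writeCheckpoint();
            }
            commitNeeded = false;
        }

        private void writeCheckpoint() {
            state.checkpoint = new HashMap<>(offsets);
        }
    }

    /** Runs process -> commit -> close, then reports whether a fresh task can initialize. */
    static boolean survivesRestart(Guarantee guarantee, boolean alwaysCheckpoint) {
        LocalState state = new LocalState();
        Task first = new Task(guarantee, state);
        first.initialize();
        first.process("store-changelog-0", 42L);
        first.commit();
        first.close(alwaysCheckpoint);
        try {
            new Task(guarantee, state).initialize();
            return true;
        } catch (DirtyStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ALOS, old close: " + survivesRestart(Guarantee.ALOS, false));
        System.out.println("EOS,  old close: " + survivesRestart(Guarantee.EOS, false));
        System.out.println("EOS,  new close: " + survivesRestart(Guarantee.EOS, true));
    }
}
```

In this model only the EOS task closed through the old path fails to re-initialize, because nothing was pending at close time and so no checkpoint was ever written.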

To fix this, we should just always call prepareCommit and postCommit at the TaskManager level. Within the task, it can decide whether or not to actually do something in those methods based on commitNeeded.
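Roughly, the change in control flow looks like the sketch below. The method names `prepareCommit` and `postCommit` come from the description above; the `Task` class, the `closeOld`/`closeNew` helpers, and the string log are hypothetical stand-ins, and the real methods do considerably more.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; the names echo the description above but this is not the Kafka Streams code.
final class AlwaysCommitOnCloseSketch {

    static final class Task {
        final List<String> log = new ArrayList<>();
        private boolean commitNeeded;

        Task(boolean commitNeeded) {
            this.commitNeeded = commitNeeded;
        }

        /** The task itself decides whether there is anything to flush. */
        void prepareCommit() {
            if (commitNeeded) {
                log.add("flush");
            }
        }

        /** Cleanup that must precede a close runs whether or not a commit was needed. */
        void postCommit() {
            log.add("clearPartitionGroup");
            log.add("writeCheckpoint");
            commitNeeded = false;
        }

        void close() {
            log.add("close");
        }
    }

    /** Old shape: the manager skips both hooks when commitNeeded is false. */
    static void closeOld(Task task) {
        if (task.commitNeeded) {
            task.prepareCommit();
            task.postCommit();
        }
        task.close();
    }

    /** New shape: the manager always calls both hooks and lets the task decide. */
    static void closeNew(Task task) {
        task.prepareCommit();
        task.postCommit();
        task.close();
    }

    public static void main(String[] args) {
        Task idleOld = new Task(false);
        closeOld(idleOld);
        System.out.println("old: " + idleOld.log);

        Task idleNew = new Task(false);
        closeNew(idleNew);
        System.out.println("new: " + idleNew.log);
    }
}
```

For a task with nothing pending, the old path records only `close`, while the new path also clears the partition group and writes the checkpoint first.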

One subtle issue is that we still need to avoid checkpointing a task that was still in CREATED, to avoid potentially overwriting an existing checkpoint with uninitialized empty offsets. Unfortunately we always suspend a task before closing and committing, so we lose the information about whether the task was in CREATED or RUNNING/RESTORING by the time we get to the checkpoint. For this we introduce a special flag to keep track of whether a suspended task should actually be checkpointed or not.
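The flag idea can be sketched as follows. The state names follow the paragraph above; the flag name `checkpointNeeded`, the `Task` class, and the in-memory `checkpointFile` map are assumptions made for illustration and may not match what the PR actually uses.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; only the state names are taken from the description above.
final class CheckpointFlagSketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    static final class Task {
        private State state = State.CREATED;
        private final Map<String, Long> offsets = new HashMap<>();
        // Remembers, across suspend(), whether the task was ever initialized.
        private boolean checkpointNeeded = false;
        // Stand-in for a checkpoint file that may already exist on disk.
        Map<String, Long> checkpointFile;

        Task(Map<String, Long> existingCheckpoint) {
            this.checkpointFile = existingCheckpoint;
        }

        void initialize() {
            if (checkpointFile != null) {
                offsets.putAll(checkpointFile);
            }
            state = State.RUNNING;
        }

        void process(String changelogPartition, long offset) {
            offsets.put(changelogPartition, offset);
        }

        void suspend() {
            if (state == State.SUSPENDED) {
                return; // keep the flag recorded by the first suspend
            }
            // Record the answer before the state transition erases it.
            checkpointNeeded = state == State.RUNNING || state == State.RESTORING;
            state = State.SUSPENDED;
        }

        void postCommit() {
            // A task suspended straight from CREATED has empty offsets; leave the file alone.
            if (checkpointNeeded) {
                checkpointFile = new HashMap<>(offsets);
            }
        }

        void close() {
            suspend();
            postCommit();
            state = State.CLOSED;
        }
    }

    static Map<String, Long> existingCheckpoint() {
        Map<String, Long> checkpoint = new HashMap<>();
        checkpoint.put("store-changelog-0", 100L);
        return checkpoint;
    }

    public static void main(String[] args) {
        Task neverInitialized = new Task(existingCheckpoint());
        neverInitialized.close();
        System.out.println("never initialized: " + neverInitialized.checkpointFile);

        Task initialized = new Task(existingCheckpoint());
        initialized.initialize();
        initialized.process("store-changelog-0", 150L);
        initialized.close();
        System.out.println("initialized: " + initialized.checkpointFile);
    }
}
```

Without the flag, `postCommit` would see only `SUSPENDED` in both cases and could not tell that the first task's empty offsets must not replace the existing checkpoint.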

Comment on lines 248 to 249
Member Author

This was another "sort-of bug": if we hit an exception in handleRevocation we wouldn't finish committing the active tasks, so commitNeeded could still be true. But of course, if we hit an exception earlier, we would have thrown it up to ConsumerCoordinator which would only save the first exception, so this didn't really do anything

Member Author

We can actually simplify the standby task shutdown a LOT

Contributor

Makes sense.

@ableegoldman ableegoldman changed the title KAFKA-10166: always invoke postCommit before closing a task [WIP] KAFKA-10166: always invoke postCommit before closing a task Jun 25, 2020
Contributor

@guozhangwang guozhangwang left a comment

I made a pass over the code, overall it LGTM.

While working on another PR I realized that the stateMgr.flush actually does not need to be in prepareCommit, and postCommit is sufficient; and in that case we can just optionally call postCommit upon each commit. Anyways, just a quick FYI for something that's out of this scope.

Contributor

If we are not committing these tasks, should we call their postCommit?

Contributor

Makes sense.

@guozhangwang
Contributor

@ableegoldman please lmk when you want to trigger jenkins builds on this PR.

@ableegoldman ableegoldman changed the title [WIP] KAFKA-10166: always invoke postCommit before closing a task KAFKA-10166: always write checkpoint before closing an (initialized) task Jun 26, 2020
@abbccdda

retest this

@abbccdda

test this

@vvcephei
Contributor

Test this please

@vvcephei
Contributor

Ok to test

@vvcephei
Contributor

Retest this please

@vvcephei
Contributor

Oh, yeah... the magic touch ;)

@ableegoldman
Member Author

Seems like all the Topology testDriver tests failed, but I got a green build running locally. Do they not run with ./gradlew streams:test?

@guozhangwang
Contributor

> Seems like all the Topology testDriver tests failed, but I got a green build running locally. Do they not run with ./gradlew streams:test?

They are included in streams:test. Maybe try to rebase the branch and see if there are any missing commits?

@ableegoldman ableegoldman force-pushed the 10166-excessive-TaskCorruptedException-test branch from 424f7e1 to 4125b36 Compare June 26, 2020 16:59
@vvcephei
Contributor

Test this please

@ableegoldman
Member Author

Java 8 and 14 builds passed, Java 11 build failed with...zero failures?

@abbccdda

```
12:23:50 
12:23:50 * What went wrong:
12:23:50 Execution failed for task ':streams:unitTest'.
12:23:50 > Process 'Gradle Test Executor 9' finished with non-zero exit value 134
12:23:50   This problem might be caused by incorrect test process configuration.
12:23:50   Please refer to the test execution section in the User Manual at https://docs.gradle.org/6.5/userguide/java_testing.html#sec:test_execution
12:23:50 ```

@guozhangwang guozhangwang merged commit 30df089 into apache:trunk Jun 26, 2020
guozhangwang pushed a commit that referenced this pull request Jun 26, 2020
…task (#8926)

This should address at least some of the excessive TaskCorruptedExceptions we've been seeing lately. Basically, at the moment we only commit tasks if commitNeeded is true -- this seems obvious by definition. But the problem is we do some essential cleanup in postCommit that should always be done before a task is closed:

* clear the PartitionGroup
* write the checkpoint

The second is actually fine to skip when commitNeeded = false with ALOS, as we will have already written a checkpoint during the last commit. But for EOS, we only write the checkpoint before a close -- so even if there is no new pending data since the last commit, we have to write the current offsets. If we don't, the task will be assumed dirty and we will run into our friend the TaskCorruptedException during (re)initialization.

To fix this, we should just always call prepareCommit and postCommit at the TaskManager level. Within the task, it can decide whether or not to actually do something in those methods based on commitNeeded.

One subtle issue is that we still need to avoid checkpointing a task that was still in CREATED, to avoid potentially overwriting an existing checkpoint with uninitialized empty offsets. Unfortunately we always suspend a task before closing and committing, so we lose the information about whether the task was in CREATED or RUNNING/RESTORING by the time we get to the checkpoint. For this we introduce a special flag to keep track of whether a suspended task should actually be checkpointed or not.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
@guozhangwang
Contributor

Cherry-picked to 2.6 since it is a blocker, cc @rhauch

Kvicii pushed a commit to Kvicii/kafka that referenced this pull request Jun 27, 2020
* 'trunk' of github.com:apache/kafka:
  KAFKA-10180: Fix security_config caching in system tests (apache#8917)
  KAFKA-10173: Fix suppress changelog binary schema compatibility (apache#8905)
  KAFKA-10166: always write checkpoint before closing an (initialized) task (apache#8926)
  MINOR: Rename SslTransportLayer.State."NOT_INITALIZED" enum value to "NOT_INITIALIZED"
  MINOR: Update Scala to 2.13.3 (apache#8931)
  KAFKA-9076: support consumer sync across clusters in MM 2.0 (apache#7577)
  MINOR: Remove Diamond and code code Alignment (apache#8107)
  KAFKA-10198: guard against recycling dirty state (apache#8924)