KAFKA-7672 : force write checkpoint during StreamTask #suspend#6115

Merged
guozhangwang merged 12 commits into apache:trunk from abbccdda:bug_fix on Feb 23, 2019
Conversation

@abbccdda

This fix aims at issue #2 pointed out in https://issues.apache.org/jira/browse/KAFKA-7672
In the current setup, the offset checkpoint file write during #suspend is conditional on EOS, which introduces a potential race condition during the StateManager #closeSuspend call. To mitigate the problem, we now always write the checkpoint file in the #suspend call.
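The gist of the change can be illustrated with a minimal, self-contained sketch. All class and method names below (ToyStreamTask etc.) are illustrative stand-ins, not the actual Kafka Streams classes:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration of the fix: suspend() now writes the checkpoint
// unconditionally, instead of making the write conditional on EOS (the old
// behavior that raced with the later closeSuspended call).
class ToyStreamTask {
    private final boolean eosEnabled;
    private final Map<String, Long> checkpointFile = new HashMap<>(); // stands in for the on-disk OffsetCheckpoint
    private final Map<String, Long> ackedOffsets = new HashMap<>();

    ToyStreamTask(final boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    void process(final String partition, final long offset) {
        ackedOffsets.put(partition, offset);
    }

    // Before the fix, the write below was guarded by an EOS check; now it is
    // forced on every suspend, regardless of eosEnabled.
    void suspend() {
        checkpointFile.putAll(ackedOffsets);
    }

    Map<String, Long> readCheckpoint() {
        return new HashMap<>(checkpointFile);
    }
}
```

Even with EOS enabled, a suspend after processing leaves a readable checkpoint behind in this sketch, which is the invariant the PR establishes.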

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@abbccdda force-pushed the bug_fix branch 6 times, most recently from df7ddbe to 61824db on January 14, 2019 04:10
@mjsax mjsax added the streams label Jan 14, 2019
@abbccdda force-pushed the bug_fix branch 2 times, most recently from 85648ae to d76d00d on January 18, 2019 02:01
@abbccdda
Author

@mjsax @guozhangwang Could you take another look to see if this makes sense?

@abbccdda force-pushed the bug_fix branch 6 times, most recently from 342d6e0 to 8dbd301 on January 22, 2019 17:57
@abbccdda
Author

@guozhangwang @mjsax Maybe another look?

@abbccdda
Author

abbccdda commented Feb 2, 2019

@guozhangwang @mjsax Could you take a look when you get time?

@abbccdda
Author

abbccdda commented Feb 7, 2019

@mjsax Mind taking another look?

Member

@mjsax mjsax left a comment


The change itself LGTM.

I would recommend updating the comment a little bit.

Call for second review @guozhangwang @bbejeck @vvcephei @ableegoldman

Can we add a test that exposes the issue, i.e., forces the race condition? (Could maybe also be done as a follow-up.) This is a critical bug fix and we should get it into 2.2 (i.e., it needs to be merged by 2/15).

Member


Which check? I don't see any if clause.

Member


I was thinking the same. Did you mean that the other change in this PR eliminates the chance of double checkpoint file writes?

Author


Oh, the check is just pointing to eosEnabled. Let me update the comment.

@mjsax
Member

mjsax commented Feb 12, 2019

@abbccdda One more thing: can we also add

completedRestorers.clear();

to StoreChangelogReader#reset() as pointed out on the ticket (and add a test if possible).

@mjsax
Member

mjsax commented Feb 12, 2019

Another thought that just crossed my mind while digging into this further: at the moment, there is a race condition between writing and reading the checkpoint file. Wouldn't it be simpler to avoid this race condition by moving

        // load the checkpoint information
        checkpointableOffsets.putAll(checkpoint.read());
        if (eosEnabled) {
            // delete the checkpoint file after finish loading its stored offsets
            checkpoint.delete();
            checkpoint = null;
        }

from the ProcessorStateManager constructor to ProcessorStateManager#register()?

This would simplify the logic IMHO.
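The suggested move could look roughly like the following sketch. These are toy stand-ins with illustrative names, not the actual ProcessorStateManager:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: load -- and, under EOS, "delete" -- the checkpoint file lazily on
// the first register() call instead of in the constructor, so reads and
// writes of the file no longer race across construction and suspension.
class ToyStateManager {
    private final boolean eosEnabled;
    private final Map<String, Long> checkpointableOffsets = new HashMap<>();
    private Map<String, Long> checkpoint; // stands in for the on-disk OffsetCheckpoint
    private boolean checkpointLoaded = false;

    ToyStateManager(final boolean eosEnabled, final Map<String, Long> checkpointFile) {
        this.eosEnabled = eosEnabled;
        this.checkpoint = checkpointFile;
    }

    void register(final String storeName) {
        if (!checkpointLoaded) {
            // load the checkpoint information on first use, not at construction
            checkpointableOffsets.putAll(checkpoint);
            if (eosEnabled) {
                // drop the checkpoint after loading its stored offsets
                checkpoint = null;
            }
            checkpointLoaded = true;
        }
        // ... register the store itself ...
    }

    Long checkpointedOffsetFor(final String storeName) {
        return checkpointableOffsets.get(storeName);
    }
}
```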

@vvcephei
Contributor

Actually, @mjsax, after reviewing the ticket, I think #6113 is the actual bugfix that must go in. So I don't think that @abbccdda needs to add the clear in this PR.

If we can merge 6115 as well, which contains a significant performance improvement for restore in some cases, then it would be ideal.

@bbejeck
Member

bbejeck commented Feb 12, 2019

If we can merge 6115 as well, which contains a significant performance improvement for restore in some cases, then it would be ideal.

#6115 includes the completedRestorers.clear() call so I think we just need to make sure both PRs get in.

Contributor

@vvcephei vvcephei left a comment


Hi @abbccdda ,

Thanks for the PR.

I had one comment: it seems like we should update more of the state manager APIs to indicate that commit is no longer a guaranteed part of close. Otherwise, we could easily introduce a bug later, calling the method with acked offsets, not realizing that they will actually just be ignored.

Thanks,
-John

Contributor


If we're going to ignore the ackedOffsets argument here, it seems like we should revisit the interface.

There are multiple paths that lead to this method, and it's not analytically obvious why it's ok to just ignore the argument and skip checkpointing here.

If we want to move the checkpoint up in the lifecycle to suspend, then it seems like we should do so holistically and remove the checkpointable offsets from the parameters of close. What do you think?

Author


@vvcephei @bbejeck I checked the callers of the close(ackedOffsets) function:

  1. AbstractTask is OK, since our fix makes sure the state manager can checkpoint when suspending.
  2. GlobalStateUpdateTask is also fine, since the global state manager (GlobalStateManagerImpl) overrides this function and does the offset checkpoint itself.

So I think we are safe to ignore this in the base class, but keep the acked-offsets parameter so the subclass can checkpoint for now, which minimizes the risk of this PR. Thank you!

Member


What "subclasses" are you referring to? Both classes you mention are internal and not extended. I don't see any risk that we need to minimize?

Author


I mean that GlobalStateManagerImpl implements close(offsets) @mjsax

Member


Could we update the code in GlobalStateUpdateTask to:

    public void close() throws IOException {
        stateMgr.checkpoint(offsets);
        stateMgr.close();
    }

Author


Oh, my point is that the stateMgr in GlobalStateUpdateTask is a GlobalStateManagerImpl, whose close(offsets) already does the checkpoint operation. So I guess we don't need to call checkpoint explicitly here, right? @mjsax

Author


@mjsax Thoughts?

Contributor


@mjsax @abbccdda @bbejeck I left a comment below for this issue. Please take a look.

Member

@bbejeck bbejeck left a comment


LGTM, but I've left some comments regarding unused method parameters as a result of this change.


Member


One note: with the removal of this line, ackedOffsets is no longer used at all in this method. However, it's part of the StateManager#close interface, and it's not safe to remove since GlobalStateManager also implements the StateManager interface. To me, that brings up two questions:

  1. Do we need to apply the same approach for global state stores?
  2. Should we refactor the StateManager interface to have a no-args close() method?

Given the time constraints, I don't think this should hold up the PR, but it's something that IMHO we shouldn't let slip.

Contributor

@guozhangwang guozhangwang left a comment


Just laying out the context here (admittedly, this piece of logic is a bit hard to understand due to messy code structure, and we have a TODO task to clean up this tech debt soon):

StreamTask:

  1. With EOS turned off, today we are actually double-checkpointing unnecessarily: once in suspend and once in close. In fact, only one checkpoint is needed. This is a known issue but did not incur any correctness issue, just unnecessary overhead.

  2. With EOS turned on, we do not checkpoint on suspend, only on close.

StandbyTask:

We always write the checkpoint in flushAndCheckpointState, which is called in both commit and suspend. Note that although we pass an empty map into the checkpoint call, that is okay since it will be updated with the committed offsets internally. In fact, in commit it would be okay to just pass in null and NOT checkpoint at all, since there should be no offset change between the latest flushAndCheckpointState and the close call during normal processing (i.e., we are also duplicate-checkpointing here).

GlobalStateUpdateTask:

We pass in offsets in both stateConsumer.pollAndUpdate() and stateConsumer.close(), but again this is the same duplicate-checkpointing issue as in StreamTask case 1, because the offsets never change between the previous checkpoint call and close during a normal run. And if there is an exception in between -- e.g., you checkpointed offset 100 in the latest pollAndUpdate, and in the next pollAndUpdate call you get an exception at offset 105 and hence jump into the finally block to call close -- we should not checkpoint 105, since the data may not have been flushed to the store at all; rather, we should keep the checkpoint file at 100 to maintain at-least-once semantics.

With all this, let's do the following:

  1. In StreamTask, the principle is that we always checkpoint on flush, and never checkpoint on close anymore, regardless of EOS. Because of that:
  2. Remove final Map<TopicPartition, Long> ackedOffsets from the close call, as @bbejeck suggested. In the StreamTask caller, we can safely remove the parameter since we now never checkpoint on close.
  3. Remove the parameter in AbstractTask#closeStateManager(final boolean writeCheckpoint) as well, and also remove the condition that decides whether to write the checkpoint. Again, no matter whether we are closing cleanly or committing successfully, we would not write checkpoint files anymore. We can also remove the corresponding parameters like clean and commitSuccesfully in its ancestor call trace.
  4. Remove the parameter in stateMgr.close(offsets) in GlobalStateUpdateTask#close().

WDYT?
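The interface cleanup in points 2-4 amounts to narrowing close() to a no-args method. A hedged sketch with illustrative names (not the actual StateManager interface):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed API shape: since the checkpoint is now always
// written on flush/suspend, close() no longer takes offsets and never
// writes the checkpoint file.
interface ToyStateManagerApi {
    void checkpoint(Map<String, Long> offsets); // called on flush / suspend
    void close();                               // no-args: releases resources only
}

class ToyStateManagerImpl implements ToyStateManagerApi {
    final Map<String, Long> checkpointFile = new HashMap<>();
    boolean closed = false;

    @Override
    public void checkpoint(final Map<String, Long> offsets) {
        checkpointFile.putAll(offsets);
    }

    @Override
    public void close() {
        closed = true; // deliberately no checkpoint write here
    }
}
```

With this shape, callers can no longer accidentally pass acked offsets into close() and have them silently ignored, which was the concern raised earlier in the review.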

@guozhangwang
Copy link
Copy Markdown
Contributor

Actually, one more thing about the above: since we now write the checkpoint file at suspend as well when EOS is turned on, upon resumption we should also delete the checkpoint file, as we do at construction time:

if (eosEnabled) {
    // delete the checkpoint file after finishing loading its stored offsets
    checkpoint.delete();
    checkpoint = null;
}

so that the semantics are guaranteed: after resumption, if we get a crash, we enforce bootstrapping from the beginning.
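A toy sketch of the proposed resume-time deletion (illustrative names, not the real task classes):

```java
import java.util.HashMap;
import java.util.Map;

// Under EOS, deleting the checkpoint file on resume means a crash after
// resumption forces a full bootstrap from the changelog, rather than
// trusting a checkpoint that no longer reflects uncommitted state.
class ToyResumableTask {
    private final boolean eosEnabled;
    private Map<String, Long> checkpointFile; // null models "file deleted"

    ToyResumableTask(final boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
        this.checkpointFile = new HashMap<>();
    }

    void suspend(final Map<String, Long> offsets) {
        if (checkpointFile == null) {
            checkpointFile = new HashMap<>();
        }
        checkpointFile.putAll(offsets); // checkpoint is always written on suspend now
    }

    void resume() {
        if (eosEnabled) {
            checkpointFile = null; // delete the file, mirroring the constructor-time logic
        }
    }

    boolean hasCheckpoint() {
        return checkpointFile != null;
    }
}
```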

@guozhangwang
Contributor

EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState
GlobalStateManagerImplTest.shouldCheckpointRestoredOffsetsToFile
GlobalStateManagerImplTest.shouldWriteCheckpointsOnClose
GlobalStateTaskTest.shouldCloseStateManagerWithOffsets
StreamTaskTest.shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled

Those tests failed locally.

@abbccdda
Author

Among the tests, I think shouldCheckpointRestoredOffsetsToFile, shouldWriteCheckpointsOnClose, shouldCloseStateManagerWithOffsets, and shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled could be removed, because our change basically breaks the fundamental assumption behind them.
I will keep looking at EosIntegrationTest to figure out whether we should do something to make it work.

@abbccdda
Author

Thanks @guozhangwang for the bug fix! I also rebased my test-removal changes on top.

@mjsax
Member

mjsax commented Feb 22, 2019

checkstyle error -- can you update the PR to fix it? @abbccdda

Member


nit: remove this

Member

@mjsax mjsax Feb 23, 2019


To make the semantics clearer, I am wondering if we should use two nested ifs:

if (eosEnabled && !clean) {
    try {
        if (checkpoint != null) { ... }
    } catch (...) { ... }
}

This makes it clear that it's an EOS condition (the first variable to be checked), and that for EOS we do something if !clean.

The checkpoint != null check is just a guard against an NPE and has nothing to do with the actual logic.

Member


Why do we need to include clean here? (Was this another bug?) I thought for standbys this does not matter?

Contributor


Good point, since standby tasks do not have EOS anyway today.

Member


nit: "Did not find checkpoint..."

@mjsax
Member

mjsax commented Feb 23, 2019

Thanks for the follow-ups @guozhangwang. LGTM.

@guozhangwang guozhangwang merged commit 1f9aa01 into apache:trunk Feb 23, 2019
guozhangwang pushed a commit that referenced this pull request Feb 23, 2019
This fix is aiming for #2 issue pointed out within https://issues.apache.org/jira/browse/KAFKA-7672
In the current setup, we do offset checkpoint file write when EOS is turned on during #suspend, which introduces the potential race condition during StateManager #closeSuspend call. To mitigate the problem, we attempt to always write checkpoint file in #suspend call.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>,  John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
@guozhangwang
Contributor

Cherry-picked to 2.2 as well.

@abbccdda
Author

@guozhangwang Thanks a lot for making the fix work!

jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* AK/trunk: (36 commits)
  KAFKA-7962: Avoid NPE for StickyAssignor (apache#6308)
  Address flakiness of CustomQuotaCallbackTest#testCustomQuotaCallback (apache#6330)
  KAFKA-7918: Inline generic parameters Pt. II: RocksDB Bytes Store and Memory LRU Caches (apache#6327)
  MINOR: fix parameter naming (apache#6316)
  KAFKA-7956 In ShutdownableThread, immediately complete the shutdown if the thread has not been started (apache#6218)
  MINOR: Refactor replica log dir fetching for improved logging (apache#6313)
  [TRIVIAL] Remove unused StreamsGraphNode#repartitionRequired (apache#6227)
  MINOR: Increase produce timeout to 120 seconds (apache#6326)
  KAFKA-7918: Inline generic parameters Pt. I: in-memory key-value store (apache#6293)
  MINOR: Fix line break issue in upgrade notes (apache#6320)
  KAFKA-7972: Use automatic RPC generation in SaslHandshake
  MINOR: Enable capture of full stack trace in StreamTask#process (apache#6310)
  KAFKA-7938: Fix test flakiness in DeleteConsumerGroupsTest (apache#6312)
  KAFKA-7937: Fix Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup (apache#6311)
  MINOR: Update docs to say 2.2 (apache#6315)
  KAFKA-7672 : force write checkpoint during StreamTask #suspend (apache#6115)
  KAFKA-7961; Ignore assignment for un-subscribed partitions (apache#6304)
  KAFKA-7672: Restoring tasks need to be closed upon task suspension (apache#6113)
  KAFKA-7864; validate partitions are 0-based (apache#6246)
  KAFKA-7492 : Updated javadocs for aggregate and reduce methods returning null behavior. (apache#6285)
  ...
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…he#6115)

This fix is aiming for #2 issue pointed out within https://issues.apache.org/jira/browse/KAFKA-7672
In the current setup, we do offset checkpoint file write when EOS is turned on during #suspend, which introduces the potential race condition during StateManager #closeSuspend call. To mitigate the problem, we attempt to always write checkpoint file in #suspend call.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>,  John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>