
KAFKA-10150: task state transitions/management and committing cleanup #8856

Merged
guozhangwang merged 14 commits into apache:trunk from ableegoldman:10150-allow-close-CREATED-tasks
Jun 16, 2020

Conversation

@ableegoldman
Member

@ableegoldman ableegoldman commented Jun 12, 2020

  1. KAFKA-10150:
    • always transition to SUSPENDED during suspend, no matter the current state
    • only call prepareCommit before closing if task.commitNeeded is true
  2. Don't commit any consumed offsets during handleAssignment -- revoked active tasks (and any others that need committing) will be committed during handleRevocation so we only need to worry about cleaning them up in handleAssignment
  3. KAFKA-10152: when recycling a task we should always commit consumed offsets (if any), but don't need to write the checkpoint (since changelog offsets are preserved across task transitions)
  4. Make sure we close all tasks during shutdown, even if an exception is thrown during commit
  5. In-flight records were skipped when we saved the checkpointableOffsets before flushing in prepareCommit. (This was the root cause of KAFKA-10151 -- we now don't save the offsets at all during prepareCommit and just write the current offsets during postCommit.)

Must be cherry-picked to 2.6
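The state-transition cleanup in points 1 and 4 can be sketched roughly as follows. This is a minimal toy model, not the actual Streams Task API; all names here are illustrative:

```java
// Toy sketch: suspend() always lands in SUSPENDED regardless of the starting
// state (except CLOSED), and a commit is only attempted when commitNeeded.
enum TaskState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

class TaskSketch {
    TaskState state = TaskState.CREATED;
    boolean commitNeeded = false;
    boolean committed = false;

    void suspend() {
        if (state == TaskState.CLOSED) {
            throw new IllegalStateException("cannot suspend a CLOSED task");
        }
        state = TaskState.SUSPENDED; // always transition, no matter the current state
    }

    void closeClean() {
        if (commitNeeded) {
            committed = true; // stand-in for prepareCommit()/postCommit()
        }
        state = TaskState.CLOSED;
    }
}
```

The point of the sketch is that a task created but never started (the title's "close CREATED tasks") can now go CREATED -> SUSPENDED -> CLOSED without tripping a state check.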

@ableegoldman
Member Author

@guozhangwang
Contributor

test this please

@ableegoldman
Member Author

Also failed with unrelated (extremely) flaky ReassignPartitionsUnitTest.testModifyBrokerThrottles and SslSelectorTest.testCloseOldestConnection (tickets created)

Contributor

@guozhangwang guozhangwang left a comment

Have a few comments, but otherwise looks great to me.

@ableegoldman lmk if you think the test failures are transient, I can re-trigger it.

Member

@mjsax mjsax left a comment

Thanks for the fix @ableegoldman!

Comment thread streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java Outdated
@guozhangwang
Contributor

test this please


Contributor

@vvcephei vvcephei left a comment

Thanks for this @ableegoldman ! I only had time for a partial review. Here's what I thought so far.

Contributor

I get that standbys should never really be in RESTORING state, but it still doesn't seem like it's philosophically any more illegal to suspend from RESTORING than it is from RUNNING. I'd vote to legalize RESTORING here. It does seem like a useful sanity check for CLOSED to be illegal, though.

Member Author

I agree 100%, but at some point in the past we started checking for RESTORING and throwing IllegalStateException all over StandbyTask. I wanted to keep the changes here to a minimum and figured we should at least be consistent with the current pattern elsewhere

Comment thread streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java Outdated
Member Author

This was a test I added a little while back in response to a bugfix, but it no longer makes sense in the current context. In fact, it's currently not really testing anything at all: the original point was to make sure the changelog reader partitions were cleaned up, but that's not even the responsibility of the TaskManager anymore.

@mjsax
Member

mjsax commented Jun 12, 2020

Retest this please.


@ableegoldman ableegoldman force-pushed the 10150-allow-close-CREATED-tasks branch 2 times, most recently from 7b86055 to 080a106 Compare June 12, 2020 23:28
@mjsax
Member

mjsax commented Jun 13, 2020

Retest this please.


@mjsax
Member

mjsax commented Jun 13, 2020

Mr.J. does not like me recently... Will retry later.

@mjsax
Member

mjsax commented Jun 13, 2020

Haha. After complaining Mr.J. changes his mind :D

@ableegoldman
Member Author

The sad thing is it's doomed to fail (connect:runtime:compileTestJava is broken)

Member

So far, we did not allow idempotent state transitions in the state machine itself, but handled them caller-side. It seems inconsistent to allow SUSPENDED -> SUSPENDED but not CREATED -> CREATED etc.

I would recommend keeping the current pattern and avoiding calling transiteState() if the task is already in the target state. I would also be happy to change it, but in that case we should change it for all states. However, that would enlarge the scope of this PR, and I think it's better not to do it here.

Member Author

I see. I was just thinking we should make the idempotency explicit for each state by allowing/disallowing the transition, but I agree we can do that in a followup PR
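The caller-side pattern being kept here might look like this minimal sketch, assuming a state machine that rejects self-transitions (names are illustrative, not the actual Streams code):

```java
// Sketch: the state machine throws on self-transitions, so the caller guards
// with an explicit state check before calling transitionTo().
enum State { CREATED, RUNNING, SUSPENDED }

class StateMachine {
    State state = State.RUNNING;

    void transitionTo(final State target) {
        if (state == target) {
            throw new IllegalStateException("self-transition to " + target);
        }
        state = target;
    }

    void suspendIdempotently() {
        // caller-side guard keeps repeated suspend() calls safe
        if (state != State.SUSPENDED) {
            transitionTo(State.SUSPENDED);
        }
    }
}
```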

Member

Why remove the < arrow? We can still transit from RESTORING to SUSPENDED.

Super-nit: +---->| Suspended -> +---> | Suspended

Member Author

The diff makes it hard to tell, but I "merged" the path to SUSPENDED from CREATED and RESTORING. I find it a bit easier to follow when all the arrows are unidirectional

Member

If we hit an exception in handleRevocation, why would we continue here? Are we still in a "clean enough" state to actually continue?

Below we call completeTaskCloseClean(task), which seems incorrect for this case, as it might close the task clean even if we did not successfully commit before.

Member Author

If we hit an exception in handleRevocation on some task, then we would skip out on suspending the rest of the tasks, i.e., the set of not-suspended tasks does not contain the task that threw (of course, if one task threw an exception then it's likely others will too, but that's not guaranteed).

But maybe it's cleaner to catch exceptions during handleRevocation and at least make sure every task gets suspended? I'll try that

On a related note, if we always have to commit before closing (or at least attempt to), should we just remove the writeCheckpointIfNeeded call from closeClean? It seems like pre/postCommit should be responsible for deciding whether to checkpoint, not close. In that case, it's completely fine to attempt a clean close of a dirty task, as closeClean may just throw, in which case we can close dirty. WDYT?
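The "catch exceptions during handleRevocation and still suspend every task" idea could be sketched like this (hypothetical stub types, not the real TaskManager code):

```java
import java.util.List;

// Hypothetical stand-in for the real Task interface.
interface RevokedTask {
    void prepareCommit();
    void suspend();
}

class RevocationSketch {
    // Commit what we can, suspend everything, and surface the first failure
    // only after every task has been moved to SUSPENDED.
    static void handleRevocation(final List<RevokedTask> revoked) {
        RuntimeException firstException = null;
        for (final RevokedTask task : revoked) {
            try {
                task.prepareCommit();
            } catch (final RuntimeException e) {
                if (firstException == null) {
                    firstException = e; // remember, but keep going
                }
            } finally {
                task.suspend(); // every task ends up suspended regardless
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }
}
```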

Member Author

Instead of checkpointing, we can check if clean && commitNeeded && checkpoint != null and then throw an exception on closeClean (which would result in calling closeDirty).

Contributor

I re-read the current code structure and have some questions:

  1. We collect the checkpoint from prepareCommit and check that it is not null in postCommit, but the actual checkpoint value itself is always collectable after the commit; all we really need to know is whether we need to write a checkpoint file or not. Previously this had to be decided up front, since we might transition the state in between, but now from the source code it seems we only ever call prepare/post around suspend/close, so this is no longer required -- i.e., we can decide whether we need to checkpoint, then collect the checkpoint map and write the file if needed in a single call. Is that right?
  2. I think I agree with you that it is cleaner to make sure that in handleRevocation we still transition the revoked partitions' corresponding tasks to SUSPENDED even if some of their commit calls failed.

Member Author

  1. I think you're right, we don't need to keep track of the current checkpoint offsets at all and can just write the current checkpointableOffsets in postCommit
  2. done

Member

@mjsax mjsax Jun 16, 2020

postCommit only writes a checkpoint for non-eos. Thus, we still need to write a checkpoint in close() for the eos case (even if just blindly for all cases, as we do atm).

Member Author

postCommit will always write the checkpoint if the task is in SUSPENDED, which it should always be before being closed

Member

Why do we do the try-catch as the outer layer? If an exception occurs, we stop looping through the tasks to call postCommit() -- is this intended? If yes, why?

Member Author

Well if commit throws an exception, then we shouldn't call postCommit right?

Or are you saying if commit succeeds but postCommit throws for one task, we should still loop through and try to postCommit all the other tasks?

Contributor

Yeah, I think if the actual consumer.commit call failed, then we should not trigger postCommit for anyone.

As for postCommit, I think it should never fail (we swallow any IOException that happens, because for non-EOS it is just fine, and for EOS we would bootstrap from scratch).

Member

I meant the latter. And I agree that if commit fails, we should not call postCommit().

For a failure in postCommit: we would be making assumptions about the current code, which seems dangerous (i.e., not future-proof). IMHO, if postCommit fails, we need to close the corresponding task dirty and either recreate it or rebalance, but we should also continue to call postCommit() for all other tasks?

Member Author

I see. Then I think it makes sense to always attempt to write the checkpoint/call postCommit for a task that was successfully committed, regardless of whether something went wrong during postCommit with a different task

And I agree, we should not make assumptions about the current code not throwing, unless it's explicitly in the contract of the method that it will never throw (which is not the case for postCommit).

Contributor

Sounds good, in that case the nested try-catch would be necessary.
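The nested try-catch agreed on above might be sketched as follows (stand-in types; the real commit and postCommit logic is more involved):

```java
import java.util.List;

// Hypothetical stand-in for a task that has already been committed.
interface CommittedTask {
    void postCommit();
}

class CommitSketch {
    // If the commit itself throws, it propagates immediately and postCommit is
    // skipped for everyone. If postCommit throws for one task, we still call it
    // for the others and rethrow the first error afterwards.
    static void commitAndPostCommit(final Runnable commitOffsetsOrTransaction,
                                    final List<CommittedTask> tasks) {
        commitOffsetsOrTransaction.run(); // may throw: no postCommit for anyone

        RuntimeException firstException = null;
        for (final CommittedTask task : tasks) {
            try {
                task.postCommit(); // e.g. write the checkpoint file
            } catch (final RuntimeException e) {
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }
}
```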

Contributor

@guozhangwang guozhangwang left a comment

Made another pass on the PR, left some meta comments.



Member Author

@ableegoldman ableegoldman left a comment

Addressed all current comments, ready for testing & final review


@ableegoldman ableegoldman force-pushed the 10150-allow-close-CREATED-tasks branch from f6cbdad to 292cfbd Compare June 16, 2020 00:06
@mjsax
Member

mjsax commented Jun 16, 2020

Retest this please.

@guozhangwang
Contributor

test this


Member

I think we still need to make this call -- in eager rebalancing, we suspend a task when we get a partition revoked. In this case, we "forget" the current offset within the consumer and thus need to clear the partition grouper. Otherwise, we might read the data a second time if the partition is reassigned (which would violate EOS).

Member Author

I see. So we should only clear it here, and not in close

Just curious, why do we "forget" the current offset? I mean, haven't we just committed the current offset before suspending (and if that failed we would close all tasks right away). Maybe I'm misunderstanding what you mean

Member

The consumer tracks offsets internally; however, we buffer data in our internal queue. Thus, the offset tracked by the consumer might be larger than the offset we commit (we take the offset we commit not from the consumer, but base it on the records we did take out of the queue and process).

In eager rebalancing, the consumer clears its internal state if a partition is revoked (while we only suspend the task), including the tracked offsets. If the partition is re-assigned, the consumer fetches the last committed offset to start fetching from. Thus, if we don't clear the queue, we might fetch data that is already in the queue a second time.

Does this make sense?
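A toy illustration of the buffering issue described above (purely illustrative, no real consumer involved): records sit in the task's queue ahead of the committed offset, so after an eager revocation the consumer refetches from the committed offset, and an uncleared queue ends up holding duplicates.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class BufferSketch {
    // Returns how many records end up buffered twice after a revoke/reassign
    // cycle, depending on whether the queue was cleared on revocation.
    static int duplicates(final boolean clearQueueOnRevocation) {
        final Deque<Long> queue = new ArrayDeque<>();
        for (long offset = 5; offset < 10; offset++) {
            queue.add(offset);          // fetched but not yet processed
        }
        final long committedOffset = 5; // what we actually committed

        if (clearQueueOnRevocation) {
            queue.clear();              // what clearing the partition group does
        }
        // reassigned: the consumer resumes fetching from the committed offset
        int dups = 0;
        for (long offset = committedOffset; offset < 10; offset++) {
            if (queue.contains(offset)) {
                dups++;                 // same record now buffered twice
            }
            queue.add(offset);
        }
        return dups;
    }
}
```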

Contributor

Ah that's a good catch. Makes sense to me.

Member Author

Got it, thanks for the explanation. I'll move it back to postCommit with a note

Member Author

I'm not 100% certain that the Consumer does clear its internal buffer on revocation. At least, I couldn't find it in the code, but maybe I'm looking in the wrong place.

Not arguing we shouldn't clear the partition group here, was just wondering about this for my own sake. Hm

Member

Not 100% familiar with the consumer code, but in SubscriptionState#assignFromSubscribed new TopicPartitionState objects are created with position = null.

Member Author

Ah that seems right. Thanks!

Member

@mjsax mjsax Jun 16, 2020

Thinking about punctuation: should we actually call commitOffsetsOrTransaction() unconditionally (i.e., not consider whether consumedOffsetsAndMetadataPerTask is empty or not)?

We can still move the check on consumedOffsetsAndMetadataPerTask inside, but for EOS there might be pending writes from punctuation that we still need to commit?

This would apply to all calls of commitOffsetsOrTransaction?

Member Author

Hm. So in the punctuation case -- where commitNeeded is true but consumedOffsets is empty -- we still need to call commitOffsetsOrTransaction (and postCommit) because the punctuation may for example write to a state store and generate changelog records. So we would need to commit that transaction, and also write the checkpoint file.
Makes sense

Contributor

Committable offsets here should contain consumed offsets, and punctuation itself should never update those consumed offsets right?

I think we can skip the call if consumedOffsetsAndMetadataPerTask is empty.

Member

@mjsax mjsax Jun 16, 2020

Committable offsets here should contain consumed offsets, and punctuation itself should never update those consumed offsets right

Yes.

I think we can skip the call if consumedOffsetsAndMetadataPerTask is empty.

For non-eos, yes, because for non-eos commitOffsetsOrTransaction() would only commit offsets via the consumer (and this can be skipped if empty). However, for eos (alpha and beta), we might have a pending transaction that we need to commit on the producer, too.
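The resulting decision rule could be condensed into a sketch like this (a simplification of the discussion, not the actual code; all names are illustrative):

```java
class CommitDecisionSketch {
    // Under non-eos the call only commits consumer offsets, so it can be
    // skipped when nothing was consumed. Under eos, a punctuation may have
    // left pending writes in the open transaction, so we must commit whenever
    // any task reports commitNeeded, even with no consumed offsets.
    static boolean shouldCommit(final boolean eosEnabled,
                                final boolean anyTaskCommitNeeded,
                                final boolean hasConsumedOffsets) {
        if (hasConsumedOffsets) {
            return true; // always commit consumed offsets
        }
        return eosEnabled && anyTaskCommitNeeded; // pending punctuation writes
    }
}
```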

@ableegoldman ableegoldman force-pushed the 10150-allow-close-CREATED-tasks branch from 78274fe to 3ff79ea Compare June 16, 2020 20:00
@mjsax
Member

mjsax commented Jun 16, 2020

Retest this please.


    if (task.commitNeeded()) {
        tasksToCommit.add(task);
        final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
        if (task.isActive()) {
Member Author

As @mjsax pointed out, we should still commit even if there are no consumed offsets. However, we should not commit the offsets/transaction if there are no active tasks that need committing
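The rule described here, commit whenever some active task needs it, might be sketched as follows (TaskInfo is a hypothetical stand-in for the real Task interface):

```java
import java.util.List;

class ActiveCommitSketch {
    static final class TaskInfo {
        final boolean active;
        final boolean commitNeeded;
        TaskInfo(final boolean active, final boolean commitNeeded) {
            this.active = active;
            this.commitNeeded = commitNeeded;
        }
    }

    // Commit the offsets/transaction only when at least one *active* task
    // needs committing; standby tasks contribute no consumed offsets.
    static boolean shouldCommitOffsetsOrTransaction(final List<TaskInfo> tasks) {
        return tasks.stream().anyMatch(t -> t.active && t.commitNeeded);
    }
}
```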

@ableegoldman
Member Author

Failures were all due to known-flaky/completely broken ReassignPartitionsUnitTest.testModifyBrokerThrottles (PR for that test is still waiting to be merged -- https://issues.apache.org/jira/browse/KAFKA-10147)

@guozhangwang guozhangwang merged commit 2239004 into apache:trunk Jun 16, 2020
guozhangwang pushed a commit that referenced this pull request Jun 16, 2020
…#8856)

* KAFKA-10150: always transition to SUSPENDED during suspend, no matter the current state; only call prepareCommit before closing if task.commitNeeded is true

* Don't commit any consumed offsets during handleAssignment -- revoked active tasks (and any others that need committing) will be committed during handleRevocation so we only need to worry about cleaning them up in handleAssignment

* KAFKA-10152: when recycling a task we should always commit consumed offsets (if any), but don't need to write the checkpoint (since changelog offsets are preserved across task transitions)

* Make sure we close all tasks during shutdown, even if an exception is thrown during commit

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
@guozhangwang
Contributor

Cherry-picked to 2.6.

Kvicii pushed a commit to Kvicii/kafka that referenced this pull request Jun 17, 2020
* 'trunk' of github.com:apache/kafka:
  KAFKA-10150: task state transitions/management and committing cleanup (apache#8856)
  KAFKA-10169: Error message when transit to Aborting / AbortableError / FatalError (apache#8880)
  KAFKA-9974: Fix flaky test by removing unneeded asserts (apache#8646)
  MINOR: Documentation for KIP-585 (apache#8839)
@ableegoldman ableegoldman deleted the 10150-allow-close-CREATED-tasks branch June 26, 2020 22:40