KAFKA-9441: remove prepareClose() to simplify task management (#8833)
mjsax merged 8 commits into apache:trunk
Conversation
When we suspend, we always want to commit.
This logic is now followed in suspendCleanAndPrepareCommit() that must be called before a task can be closed.
This is now done via suspendAndPrepareCommit()
The "old" suspend() was called after committing, the "new" suspend() is now called before committing!
The old suspend logic is now handled via postCommit and close
Instead of "blindly" writing a checkpoint in postCommit(), we only do it if a checkpoint gets scheduled.
Mainly rewritten to use a switch now -- however, we now return a proper non-empty checkpoint on SUSPEND.
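To illustrate the "schedule then write" idea discussed in this thread, here is a minimal sketch. All names (CheckpointSketch, scheduleCheckpoint, maybeWriteCheckpoint) and the use of plain String keys instead of TopicPartition are illustrative assumptions, not the actual Kafka Streams API: the commit path schedules the offsets to checkpoint, and the post-commit path only writes the file if something was scheduled.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a checkpoint is only written in postCommit() if one
// was scheduled earlier, instead of being written "blindly" every time.
class CheckpointSketch {
    private Map<String, Long> scheduledCheckpoint = null;  // null = nothing scheduled
    private final Map<String, Long> checkpointFile = new HashMap<>();

    // Called from the commit path (e.g. on SUSPEND) with the offsets to persist.
    void scheduleCheckpoint(final Map<String, Long> offsets) {
        scheduledCheckpoint = new HashMap<>(offsets);
    }

    // Called from the post-commit path: write only if a checkpoint was scheduled.
    boolean maybeWriteCheckpoint() {
        if (scheduledCheckpoint == null) {
            return false;  // nothing scheduled -> no write
        }
        checkpointFile.putAll(scheduledCheckpoint);
        scheduledCheckpoint = null;
        return true;
    }

    Map<String, Long> checkpointed() {
        return Collections.unmodifiableMap(checkpointFile);
    }
}
```

The point of the pattern is that scheduling and writing are decoupled, so the state-dependent decision ("should this state checkpoint at all?") lives in one place.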
abbccdda
left a comment
Thanks for the PR, left some comments.
Should update the exception text accordingly, e.g. to "while scheduling checkpoint".
Should be if no commit is needed (ie, if we need to commit, we need to commit first! -- writing a checkpoint as long as a commit is needed implies we write the checkpoint too early).
Seems applied to both RUNNING and SUSPENDED?
For SUSPENDING, we should always write the checkpoint. Fixing.
Do we want to throw here if the current state is CLOSED?
My proposal is to keep the methods idempotent.
nit: we can throw illegal-state if the state() == RESTORING since it should never happen.
Why not just make suspend a no-op if the task is RESTORING? That seems more in line with how we handle things elsewhere
For StandbyTasks, we never restore. When we do the state transition, we always make two transitions directly after each other, from CREATED -> RESTORING -> RUNNING -- thus, state RESTORING is an invalid state for standby tasks.
Right, by "check for RESTORING" I meant "throw an exception if state is restoring". It seems odd to check for RESTORING during suspend but not in any other StandbyTask method. Either it can never be in RESTORING and we are completely sure of that, and shouldn't check for RESTORING, or we should always check whether it's RESTORING and not just during suspend (eg also in postCommit)
Just to clarify, I would support doing the former, ie don't check whether it's RESTORING here at all. But we should at least be consistent
Ah, I see. I guess we check it in almost all methods though. (We just missed initializeIfNeeded and resume() -- will add it there.)
We do this check implicitly for some cases already, ie:

if (state() == State.RUNNING) {
    // ...
} else {
    throw new IllegalStateException(...);
}

ie, only RUNNING is a valid state, and all others are invalid. Thus, it seems consistent to add those checks elsewhere (or, what would be odd, to exclude RESTORING from those implicit checks).
Similar here, maybe we could leverage transitionTo to help throw the exception.
The partitionGroup.clear and partitionGroup.close are interchangeable right now, should we just consolidate both?
Prefer to throw a different illegal-state exception here rather than adding comments.
Maybe not necessary after a second thought. However, one more question: why not making closeAndRecycleState idempotent as well?
I see your point, but for this case, I would prefer to introduce a new state -- atm, closeAndRecycleState transits to CLOSED state, which is the same as when we actually close a task -- however, the stateMgr would be closed for a proper CLOSED state, while for recycling the stateMgr is not closed -- so in general, the CLOSED state is not a "safe" state to provide idempotence. Thoughts?
I can see that we are trying to maintain the same behavior, but still: why would a restoring task not need to close its topology?
Because the topology is only initialized when restoring is finished.
Sg, should we move the log on L811 inside the if statement?
nit: since this is a private function only called by suspend, we can modify the caller such that we only call this in RUNNING not in RESTORING, and then inside this function we do not need this check anymore.
guozhangwang
left a comment
@mjsax thanks for the cleanup! I left some meta comment about the Task interface thinking they can further be simplified, lmk wdyt.
Will make a thorough pass on the other code once we have the meta approach nailed down.
I'm wondering if we could merge committableOffsetsAndMetadata with prepareCommit as well, letting the latter return the map? See my other comment aside.
That is certainly possible. Good catch!
It seems to me that the reason we want to have two suspends and also merging the suspendClean with prepareCommit is that for StreamTask, if state SUSPENDED we want to skip prepareCommit. I feel it is a tad cleaner to separate them further into one suspend which does not try to call prepareCommit, and rely on whether prepareCommit should do anything or not based on both state (i.e. only running/restoring/suspended need to commit) and commitNeeded flag.
With that we can convert the callers as follows:
- suspendDirty(): just call suspend(), do not call prepareCommit().
- suspendCleanAndPrepareCommit():
2.a) from task.closeAndRecycleState: call suspend(), and then call prepareCommit(); the second would check commitNeeded and, if it was false, we would not try to flush / commit. Hence, if the task just transited from other states to suspended, then commitNeeded should still be true.
2.b) from taskManager directly: same as above, but for this call we always follow up with a committableOffsetsAndMetadata call getting the map of offsets, so I'm thinking we can merge prepareCommit with committableOffsetsAndMetadata as well: if the state is right and commitNeeded is set, execute the prepare-commit procedure and accumulate the offsets, otherwise return null, indicating no offsets need to be committed.
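A minimal sketch of the proposal above, under stated assumptions: the class and method names (TaskSketch, addConsumed) are hypothetical, and plain String partitions stand in for TopicPartition to keep the example self-contained. suspend() only transitions state, while prepareCommit() absorbs the role of committableOffsetsAndMetadata() and returns null when no commit is needed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the reviewer's proposal: suspend() never commits;
// prepareCommit() decides based on state and commitNeeded whether anything
// needs committing, and returns the offsets (or null if nothing to commit).
class TaskSketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private State state = State.RUNNING;
    private boolean commitNeeded = false;
    private final Map<String, Long> consumedOffsets = new HashMap<>();

    void addConsumed(final String partition, final long offset) {
        consumedOffsets.put(partition, offset);
        commitNeeded = true;
    }

    // suspend() only transitions state; it does not try to commit.
    void suspend() {
        state = State.SUSPENDED;
    }

    // Merged prepareCommit + committableOffsetsAndMetadata: returns null
    // when the state does not allow committing or commitNeeded is false.
    Map<String, Long> prepareCommit() {
        final boolean stateAllowsCommit =
            state == State.RUNNING || state == State.RESTORING || state == State.SUSPENDED;
        if (!stateAllowsCommit || !commitNeeded) {
            return null;
        }
        commitNeeded = false;
        return new HashMap<>(consumedOffsets);
    }
}
```

With this shape, both call sites (2.a and 2.b) can call the same pair of methods and simply skip the commit when prepareCommit() returns null.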
We also want to swallow exceptions in closeTopology() but we can work around this.
Updates this PR.
case CREATED:
    // the task is created and not initialized, just re-write the checkpoint file
    scheduleCheckpoint(emptyMap());
case RESTORING:
Could we merge RESTORING and SUSPENDED?
+1, IDEA also suggests it :)
 * @throws StreamsException fatal error, should close the thread
 */
-void prepareCommit();
+Map<TopicPartition, OffsetAndMetadata> prepareCommit();
-task.prepareCloseDirty();
+try {
+    task.suspend();
+} catch (final RuntimeException swallow) {
I could file a follow-up newbie ticket, but it seems that we have a couple of catch-and-swallow cases in the task manager with a clean flag; does it make sense to extract executeAndMaybeSwallow to the TaskManager class and share it between the cases?
SGTM. Can you create a ticket?
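A sketch of what such a shared helper could look like. The signature and the logging are illustrative assumptions, not necessarily what Kafka ended up with: on a clean close the error is rethrown, on a dirty close it is swallowed and logged.

```java
// Hypothetical shared helper: run an action, rethrow on a clean close,
// swallow and log on a dirty one.
class SwallowSketch {
    static void executeAndMaybeSwallow(final boolean clean,
                                       final Runnable action,
                                       final String name) {
        try {
            action.run();
        } catch (final RuntimeException fatalOrSwallow) {
            if (clean) {
                throw fatalOrSwallow;  // clean close: surface the error
            }
            // dirty close: swallow, since we are giving up on the task anyway
            System.err.println("Ignoring error during unclean " + name + ": " + fatalOrSwallow);
        }
    }
}
```

Each catch-and-swallow site in the task manager would then collapse to one call, e.g. `executeAndMaybeSwallow(clean, task::suspend, "suspend")`.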
@Test
public void shouldRecycleTask() {
    EasyMock.expectLastCall();
guozhangwang
left a comment
Made a thorough pass over the code.
prepareCommit();
-if (state() == State.CREATED || state() == State.RUNNING) {
+if (state() == State.CREATED || state() == State.SUSPENDED) {
The comment below is not accurate anymore: we do not actually write a checkpoint during recycle.
EDIT: actually, the updated offsetSnapshotSinceLastCommit seems unused, since after this function we would create a new StreamTask and in between we do not check commitNeeded at all. Could we remove line 175 then?
switch (state()) {
    case CREATED:
    case RUNNING:
    case SUSPENDED:
Ditto for line 195: we do not need to update the snapshot since we are closing the task already.
log.info("Suspended restoring");
case RUNNING:
    try {
        closeTopology();
Maybe we can skip calling this if we are in RESTORING; I have another comment below.
Also, could we add javadoc on top explaining which exceptions can be thrown?
Yes. RESTORING above is its own "case" branch now (before, RUNNING and RESTORING shared the code).
case RESTORING:
    commitNeeded = false;
case RUNNING:
case SUSPENDED:
What about 1) moving lines 387/388 out of the switch, and line 400 after the switch block, and 2) then making these three states separate branches, so that we can avoid a mix of switch / if-else?
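A toy sketch of the suggested restructuring, with the shared pre-work hoisted before the switch and the shared post-work after it. The state names follow the PR, but the action strings and the method are purely illustrative stand-ins for the real suspend logic:

```java
// Hypothetical sketch: shared work sits outside the switch, each state gets
// its own branch, and invalid states throw instead of relying on comments.
class SuspendSketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    static String suspendActions(final State state) {
        // shared pre-work hoisted out of the switch
        final StringBuilder actions = new StringBuilder("checkpoint;");
        switch (state) {
            case RESTORING:
                actions.append("clearCommitNeeded;");
                break;
            case RUNNING:
                actions.append("closeTopology;");
                break;
            case SUSPENDED:
                // already suspended: nothing extra to do
                break;
            default:
                throw new IllegalStateException("Illegal state " + state + " while suspending");
        }
        // shared post-work after the switch
        actions.append("transitionToSuspended");
        return actions.toString();
    }
}
```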
stateMgr.checkpoint(checkpointableOffsets());
if (state() == State.SUSPENDED) {
    partitionGroup.clear();
Hmm... this reads a bit weird to me. Can we call this in suspend instead? Also in that case we do not need to call this in close and closeAndRecycle.
We cannot call it in suspend, because we would lose the partition-time information that we need in prepareCommit() (which is called after suspend()).
Got it, makes sense.
Could you copy-paste the above as a comment to remind other readers?
Sure -- but if somebody changes it (I originally made this change, too), a unit test fails anyway :)
-prepareClose(true);
+suspend();
+prepareCommit();
I think we actually do not need to commit (including writing the checkpoint) in closeAndRecycle, and only need to suspend the task before recycling it. But this is out of scope and we can discuss it in another PR (cc @ableegoldman).
Assuming rebalancing does not happen often (in a stable deployment), it might be premature optimization?
 * Currently only changelog topic offsets need to be checkpointed.
 */
private Map<TopicPartition, Long> checkpointableOffsets() {
    if (state() == State.RESTORING) {
Should we return emptyMap if we are SUSPENDED as well?
EDIT: actually, I think we need to accumulate the consumed offsets when we just transited to suspend and then called prepareCommit, but if we are already in suspended then it is actually okay to return an emptyMap. However since we do not know if we have just transited to suspended and the below code should not be a big overhead, we can just keep it as-is.
So please ignore my previous comment :)
Is this logic necessary? I don't think we would populate data in record collector or consumed offsets until we start processing?
-task.prepareCloseClean();
+final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task
+    .committableOffsetsAndMetadata();
+task.suspend();
question: I cannot remember why we need to commit those still owned tasks during handle-assignment, is that necessary? Or is that just an optimization: since we are going to commit anyways, let's just commit everyone.
If that's the case, we can refresh the last-commit timestamp as well.
Even in that case, in line 205 we could check task.commitNeeded() && task.isActive right?
For eos-beta, if we commit, we always need to commit all tasks. And to not distinguish between non-eos/eos-alpha vs eos-beta, we decided to just commit all tasks in all cases.
And we don't own StreamThread#lastCommitMs so we cannot update it.
For L205: the outer if already checks task.isActive
@mjsax @guozhangwang why do we need to commit at all during handleAssignment? Shouldn't we have already committed all tasks that need to be committed during handleRevocation?
That's not exactly a bug, I'm just wondering if it's necessary?
We may not call handleRevocation before calling handleAssignment, so the tasks to close may not be in SUSPENDED state yet, and hence to close them we need to commit their states. The other tasks do not necessarily need committing, but I think the point was that since we are going to send one commit request anyway, we just commit for everyone --- note that flushing can indeed be skipped, which is what KAFKA-9450 covers.
Updated.
case SUSPENDED:
    final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes();
    committableOffsets = new HashMap<>(consumedOffsets.size());
+1, this seems not really necessary atm.
LGTM. Please feel free to merge after addressing @abbccdda's comments above.
 public void closeAndRecycleState() {
-    prepareClose(true);
+    suspend();
+    prepareCommit();
Why call prepareCommit (or suspend for that matter)?
Both now do what prepareClose() did before.
I just mean, why not inline that? I'm just imagining coming back to this code in a few months and wondering why we need to suspend a task before recycling, or why we call prepareCommit but don't then actually commit, etc
Nevermind, I see that's the pattern we follow everywhere else
Reviewers: Boyang Chen <boyang@confluent.io>, Guozhang Wang <guozhang@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>
Merged to trunk.
- prepareCloseClean() and prepareCloseDirty() are removed.
- The direct transition RUNNING -> CLOSED is removed (tasks must be suspended before closing now).
- suspend() is replaced with suspendDirty() and suspendAndPrepareCommit().

Call for review @guozhangwang @abbccdda @vvcephei @ableegoldman @cadonna