KAFKA-12523: handle TaskCorruption/TimeoutException during handleCorruption and handleRevocation#10407
Conversation
|
Ready for review @guozhangwang @vvcephei @mjsax @cadonna |
5aea7dd to
549a5f7
Compare
85b900c to
7282884
Compare
| corruptedActive.setChangelogOffsets(singletonMap(t1p0, 0L)); | ||
| taskManager.handleCorruption(singleton(taskId00)); | ||
|
|
||
| assertThat(corruptedActive.commitPrepared, is(true)); |
There was a problem hiding this comment.
The corrupted tasks should be revived? Those this case, should this flag be reset?
There was a problem hiding this comment.
Seems like we don't reset the commitPrepared during revive, good point. I guess we should reset all of those in the StateMachineTask#revive
edit: actually I think for commitPrepared at least we should not reset it, since we just use this to verify that we did, indeed, prepare a commit. But commitNeeded should probably be cleared in StateMachineTask#revive (and ultimately in StateMachineTask#close but I don't want to mess with this in this PR since it's used very heavily in these tests, see below)
There was a problem hiding this comment.
Hm...seems like we actually may not even clear commitNeeded (or the other commit-related flags) in the actual StreamTask's close or revive. We need to be clearing those in revive or closeDirty (closeClean would have cleared during postCommit) This has probably been a long lurking bug, although a minor one
There was a problem hiding this comment.
Side note: seems weird that StateMachineTask has its own commitNeeded field rather than making the one in StreamTask protected and use it for active tasks (edit: there's actually a valid-ish reason for this, the StateMachineTask just mocks the behavior of most methods and rarely calls the super's method, so even if we used the same commitNeeded flag across the field we'd still have to remember to manually set/clear it in the same way in StateMachineTask any time we do so in Abstract/StreamTask.).
Looks like we use commitNeeded in kind of a risky way in the tests, eg to indirectly indicate that it was closed clean, or infer that we successfully committed, etc Cleaner/safer to not reuse this variable to mean so many different things and just introduce a closedClean, commitSuccessful, etc wherever needed...
But I don't want to mess with it in this PR so I'll just file a ticket to clean this up later if that makes sense.
| } | ||
|
|
||
| @Test | ||
| public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommit() { |
There was a problem hiding this comment.
The test says: should close and revive
How do we exactly verify this? Maybe we do, but it's not clear to me from the code. Can you elaborate?
There was a problem hiding this comment.
We verify the revive by asserting that it went from RUNNING back to CREATED
| } | ||
|
|
||
| @Test | ||
| public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocation() { |
There was a problem hiding this comment.
Same question as above. (Or is the fact that we don't crash good enough as criteria?)
There was a problem hiding this comment.
ditto the above (back to CREATED is the key verification, but also it should not crash)
| /** | ||
| * @param consumedOffsetsAndMetadataPerTask an empty map that will be filled in with the prepared offsets | ||
| */ | ||
| private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final Collection<Task> tasksToCommit, |
There was a problem hiding this comment.
Just pulled all the actual contents, excluding the TimeoutException + maybeInitTaskTimeoutOrThrow handling, so we could use it in handleCorruption without that stuff
mjsax
left a comment
There was a problem hiding this comment.
LGTM. Feel free to merge after Jenkins passed.
guozhangwang
left a comment
There was a problem hiding this comment.
Had a quick question about the closeDirtyAndRevive in catch block, otherwise lgtm. Thanks for the added test coverage!
BTW, the more I review the code now, the more I feel like removing eos-alpha and having the timeout handling simpler and assume it would always affect all tasks that the thread owns :)
| final Task task = tasks.task(taskId); | ||
| if (task.isActive()) { | ||
| corruptedActiveTasks.put(task, task.changelogPartitions()); | ||
| corruptedActiveTasks.add(task); |
There was a problem hiding this comment.
Thanks for the cleanup! I think in the past we may only mark some subset of changelog partitions as corrupted, but later we would always just mark all of them as corrupted. Just following that thought, maybe in task.markChangelogAsCorrupted we do not need to pass in parameters either but just mark all changelog partitions as corrupted?
There was a problem hiding this comment.
I didn't want to go all the way with consolidating this logic since eventually we may want to have it so that only the subset of partitions/stores which are actually corrupted will need to be wiped out. So I'd prefer to leave this as-is for now and keep the places in which we infer the changelogs from the task restricted to just the TaskManager for now
| final Collection<Task> uncorruptedTasks = new HashSet<>(tasks.activeTasks()); | ||
| uncorruptedTasks.removeAll(corruptedActiveTasks); | ||
| // Those tasks which just timed out can just be closed dirty without marking changelogs as corrupted | ||
| closeDirtyAndRevive(uncorruptedTasks, false); |
There was a problem hiding this comment.
If closeDirtyAndRevive throws here, then the next closeDirtyAndRevive would not be triggered. Is that okay, or do we guarantee that closeDirtyAndRevive would not throw at all now?
There was a problem hiding this comment.
It seems we guarantee that closeDirtyAndRevive does not throw -- this isn't a new assumption, since prior to this it was possible for closeDirtyAndRevive to throw for standby tasks which means we would not invoke it for active tasks. We're just doing the same thing here. (Even if we did throw I think it would be ok under both ALOS or EOS, as for EOS this would cause an unclean shutdown which would mean wiping the store anyway, and for ALOS we would just be closing dirty which again is what we were about to do anyway)
|
Only one test failed during the last build, |
|
Merged to trunk and cherrypicking to 2.8 once tests pass @vvcephei |
…Corruption and handleRevocation (#10407) Need to handle TaskCorruptedException and TimeoutException that can be thrown from offset commit during handleRevocation or handleCorruption Reviewers: Matthias J. Sax <mjsax@confluent.org>, Guozhang Wang <guozhang@confluent.io>
|
Done! |
|
@guozhangwang FYI I filed https://issues.apache.org/jira/browse/KAFKA-12574 to deprecate eos-alpha, and hopefully we can remove it soon-ish |
|
Thanks @ableegoldman ! |
…rrupted (#10444) Minor followup to #10407 -- we need to extract the rebalanceInProgress check down into the commitAndFillInConsumedOffsetsAndMetadataPerTaskMap method which is invoked during handleCorrupted, otherwise we may attempt to commit during a a rebalance which will fail Reviewers: Matthias J. Sax <mjsax@confluent.io>
…rrupted (#10444) Minor followup to #10407 -- we need to extract the rebalanceInProgress check down into the commitAndFillInConsumedOffsetsAndMetadataPerTaskMap method which is invoked during handleCorrupted, otherwise we may attempt to commit during a a rebalance which will fail Reviewers: Matthias J. Sax <mjsax@confluent.io>
…Corruption and handleRevocation (apache#10407) Need to handle TaskCorruptedException and TimeoutException that can be thrown from offset commit during handleRevocation or handleCorruption Reviewers: Matthias J. Sax <mjsax@confluent.org>, Guozhang Wang <guozhang@confluent.io>
…rrupted (apache#10444) Minor followup to apache#10407 -- we need to extract the rebalanceInProgress check down into the commitAndFillInConsumedOffsetsAndMetadataPerTaskMap method which is invoked during handleCorrupted, otherwise we may attempt to commit during a a rebalance which will fail Reviewers: Matthias J. Sax <mjsax@confluent.io>
Clean up handling of TaskCorruptedException in
handleRevocation: if we try to commit and get a TaskCorrupted, we should just immediately clean up the affected tasks instead of bubbling the TaskCorruptedException up through poll and trying to deal with any corrupted tasks which have since been revokedhandleCorrupted: if we get a TaskCorrupted when trying to commit the clean tasks before closing and reviving the corrupted ones, we should just include these tasks in the subsequentcloseAndReviveLeft some things as followup work to keep the changes minimal and low-risk for the 2.8 release. If it looks good I'll file tickets for any TODOs and add the ticket # in the TODO before merging
Should be cherrypicked to 2.8 @vvcephei