KAFKA-7672: Restoring tasks need to be closed upon task suspension#6113
guozhangwang merged 7 commits into apache:trunk from
| needsRestoring.clear(); | ||
| endOffsets.clear(); | ||
| needsInitializing.clear(); | ||
| completedRestorers.clear(); |
Added some hack code to the listeners and the transformer init function before the next version release.
Triggered system test: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2255/
| return restoringByPartition.get(partition); | ||
| } | ||
|
|
||
| @Override |
The following three functions have no logical changes; I am just re-grouping all overridden functions at the top of AssignedTasks here.
| throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen."); | ||
| } | ||
|
|
||
| changelogReader.reset(); |
We should not reset changelogReader since we still need its restored offset when closing the restoring tasks.
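The ordering constraint in this comment can be illustrated with a minimal sketch. All class and method names below are hypothetical stand-ins, not the real Kafka Streams classes, and the sketch assumes the restored offsets are captured as a checkpoint when a restoring task is closed, which the thread does not spell out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; these are not the real Kafka Streams classes.
class RestoreOrderingSketch {

    // Tracks how far each changelog partition has been restored.
    static class ReaderSketch {
        private final Map<String, Long> restored = new HashMap<>();

        void recordRestored(String partition, long offset) {
            restored.put(partition, offset);
        }

        // Returns a snapshot so a later reset does not change what the caller saw.
        Map<String, Long> restoredOffsets() {
            return new HashMap<>(restored);
        }

        void reset() {
            restored.clear();
        }
    }

    // Assumption: closing a restoring task captures the restored offsets as its checkpoint.
    static Map<String, Long> closeRestoringTask(ReaderSketch reader) {
        return reader.restoredOffsets();
    }

    public static void main(String[] args) {
        ReaderSketch reader = new ReaderSketch();
        reader.recordRestored("store-changelog-0", 42L);

        // Close first, reset afterwards: the checkpoint still holds the offset.
        Map<String, Long> checkpoint = closeRestoringTask(reader);
        reader.reset();

        System.out.println("checkpoint = " + checkpoint);
        System.out.println("after reset = " + reader.restoredOffsets());
    }
}
```

If the reset ran before the close, the snapshot taken at close time would be empty, which is the situation the comment warns about.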
| // close all restoring tasks as well and then reset changelog reader; | ||
| // for those restoring and still assigned tasks, they will be re-created | ||
| // in addStreamTasks. | ||
| firstException.compareAndSet(null, active.closeAllRestoringTasks()); |
Why do we close those tasks instead of just suspending them? Could we close them only if they are not re-assigned?
The lifetime of a task is:
created -> [initializeStateStores] -> restoring (writes to the initialized state stores) -> [initializeTopology] -> running -> [closeTopology] -> suspended -> [closeStateManager] -> dead
I.e. restoring tasks do not have their topology initialized at all, whereas the suspend call only tries to closeTopology.
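The lifecycle described above can be sketched as a small state machine. This is only an illustration under stated assumptions: the class, the enum, and the guard logic are hypothetical, and only the state names and the bracketed transition names come from the comment. The direct restoring -> dead step via closeStateManager follows the PR description rather than the lifecycle line itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not the real StreamTask implementation.
class TaskLifecycleSketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, DEAD }

    State state = State.CREATED;
    final List<String> calls = new ArrayList<>();

    void initializeStateStores() {
        require(State.CREATED);
        calls.add("initializeStateStores");
        state = State.RESTORING;
    }

    void initializeTopology() {
        require(State.RESTORING);
        calls.add("initializeTopology");
        state = State.RUNNING;
    }

    void closeTopology() {
        // Only a running task has a topology to close.
        require(State.RUNNING);
        calls.add("closeTopology");
        state = State.SUSPENDED;
    }

    void closeStateManager() {
        // Allowed from SUSPENDED, and from RESTORING where no topology was ever initialized.
        if (state != State.SUSPENDED && state != State.RESTORING) {
            throw new IllegalStateException("cannot close state manager in state " + state);
        }
        calls.add("closeStateManager");
        state = State.DEAD;
    }

    private void require(State expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }

    public static void main(String[] args) {
        TaskLifecycleSketch restoring = new TaskLifecycleSketch();
        restoring.initializeStateStores();
        // A restoring task skips closeTopology and goes straight to closeStateManager.
        restoring.closeStateManager();
        System.out.println(restoring.state + " via " + restoring.calls);
    }
}
```

In this sketch, calling closeTopology on a restoring task is rejected, which mirrors the point that suspension has nothing to do for such a task.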
Ok, but why can't we keep restoring tasks open then, and hope they get reassigned so we can continue restoring them?
Yes we can; the problem is that today we clear all the store restorers, so a task kept open could not continue restoring.
We could, of course, optimize this by not closing restoring tasks and not clearing their corresponding restorers either, but that is out of the scope of this PR and I want to address it separately.
cc @vvcephei
@guozhangwang Can you create a Jira to track this cleanup?
retest this please
| final Iterator<StreamTask> restoringTaskIterator = restoring.values().iterator(); | ||
| while (restoringTaskIterator.hasNext()) { | ||
| final StreamTask task = restoringTaskIterator.next(); | ||
| log.debug("Closing restoring and not re-assigned task {}", task.id()); |
Sorry for my denseness... Why are these "not re-assigned"? They're part of a data structure called "assigned tasks", which seems to imply that they are assigned.
Terminology storm coming :)
The assigned tasks contain the following non-overlapping sets (see my other comment as well for details): created, restoring, running, suspended. Normally created should be empty, since once a task is created it should transit to either restoring or running immediately. This function is called below for suspendTasksAndState, in which case we should:
* created and suspended: no action, since they should be empty.
* running: transit to suspended.
* restoring: close immediately.
The comment itself is indeed misleading; I will change it to "Closing restoring task".
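The per-set handling listed above can be sketched as follows. This is a simplified illustration, not the actual AssignedTasks code: the class name, the use of plain string task ids, and the `closed` list are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the four non-overlapping task sets; task ids are plain strings here.
class AssignedTasksSketch {
    final Set<String> created = new TreeSet<>();
    final Set<String> restoring = new TreeSet<>();
    final Set<String> running = new TreeSet<>();
    final Set<String> suspended = new TreeSet<>();
    // Records which tasks were closed, purely for inspection in this sketch.
    final List<String> closed = new ArrayList<>();

    void suspend() {
        // created and suspended: no action.
        // running: transit to suspended.
        suspended.addAll(running);
        running.clear();
        // restoring: close immediately instead of suspending.
        closed.addAll(restoring);
        restoring.clear();
    }

    public static void main(String[] args) {
        AssignedTasksSketch tasks = new AssignedTasksSketch();
        tasks.running.add("0_0");
        tasks.restoring.add("0_1");
        tasks.suspend();
        System.out.println("suspended = " + tasks.suspended);
        System.out.println("closed = " + tasks.closed);
    }
}
```

Keeping the sets disjoint means each task is handled by exactly one branch, so a restoring task is never both suspended and closed.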
vvcephei left a comment
LGTM, @guozhangwang
I did ask one question about a comment, and I share @mjsax 's question about why we need to fully close stores that may be reassigned.
Failed tests are not relevant. Local run passed.
retest this please
…6113)
* In activeTasks.suspend, we should also close all restoring tasks as well. Closing restoring tasks would not require `task.close` as in `closeNonRunningTasks`, since the topology is not initialized yet, instead only state stores are initialized. So we only need to call `task.closeStateManager`.
* Also add @linyli001 's fix.
* Unit tests updated accordingly.
Reviewers: Matthias J. Sax <mjsax@apache.org>, John Roesler <john@confluent.io>
Cherry-picked to 2.2 as well.
In activeTasks.suspend, we should also close all restoring tasks. Closing restoring tasks does not require `task.close` as in `closeNonRunningTasks`, since the topology is not initialized yet; only the state stores are initialized. So we only need to call `task.closeStateManager`.
Also add @linyli001 's fix.
Unit tests updated accordingly.