HOTFIX: don't try to remove uninitialized changelogs from assignment & don't prematurely mark task closed#8140
Conversation
…changelogs for initialized tasks
|
|
||
| transitionTo(State.SUSPENDED); | ||
| log.info("Suspended running"); | ||
| log.info("Suspended active"); |
There was a problem hiding this comment.
Technically it may be running or restoring here
|
|
||
| // we can always let changelog reader try restoring in order to initialize the changelogs; | ||
| // if there's no active restoring or standby updating it would not try to fetch any data | ||
| changelogReader.restore(); |
There was a problem hiding this comment.
This isn't a necessary part of the fix, but I feel it makes more sense to move this here: the restore consumer's can only make progress on tasks that have been initialized, which only happens in taskManager.tryToCompleteRestoration
There was a problem hiding this comment.
I agree. Originally I put it in front intentionally since initializeIfNeeded is called at task creation not here, and we would only try to initialize the changelogs in restore and hence only check if the restoration can be completed.
Now that we've delayed the initializeIfNeeded in tryToComplete we should reorder the restore call as well.
| * @throws StreamsException if the store's change log does not contain the partition | ||
| */ | ||
| boolean checkForCompletedRestoration() { | ||
| boolean tryToCompleteRestoration() { |
There was a problem hiding this comment.
Also not necessary, but I feel it better suggests/reminds that it is taking some action here and not just checking a condition
vvcephei
left a comment
There was a problem hiding this comment.
Thanks, @ableegoldman !
Looks good to me. There's one logging mistake (below) that needs to be corrected, but otherwise I think it's ready to merge.
|
Ok to test. |
guozhangwang
left a comment
There was a problem hiding this comment.
Hi @ableegoldman Thanks for the hot-fix! I agree with you about fix 2) here, for fix 1) I have some alternative ideas since it is risky to add the partitions without setting their positions (we discussed in another PR @vvcephei and decide we may want to set the reset-policy to always none for restore consumer as well, which would cause NoOffsetsForPartition in the immediate poll call.
| ", this should not happen: " + changelogs); | ||
| } | ||
|
|
||
| addChangelogsToRestoreConsumer(Collections.singleton(partition)); |
There was a problem hiding this comment.
The reason I did not do this is that in consumer, even if a partition is paused, we would still try to fetch is fetching position in the poll call if it was not available. And that's why I have to defer this after we've found the starting position and initialize them with seek calls.
So I'm thinking maybe we would just remove the illegal-state check from the remoreFromRestoreConsumer call and just log it as a debug?
There was a problem hiding this comment.
Ah, I didn't realize the consumer would do anything with paused partitions. I'm not sure we want to remove the verification entirely, but we can check any "missing" partitions against the changelogs map instead.
|
|
||
| // we can always let changelog reader try restoring in order to initialize the changelogs; | ||
| // if there's no active restoring or standby updating it would not try to fetch any data | ||
| changelogReader.restore(); |
There was a problem hiding this comment.
I agree. Originally I put it in front intentionally since initializeIfNeeded is called at task creation not here, and we would only try to initialize the changelogs in restore and hence only check if the restoration can be completed.
Now that we've delayed the initializeIfNeeded in tryToComplete we should reorder the restore call as well.
| try { | ||
| task.closeClean(); | ||
| changelogReader.remove(task.changelogPartitions()); | ||
| task.closeClean(); |
There was a problem hiding this comment.
Yup! I also found this bug and fixed in another PR, but great to see it confirmed in yours as well :)
Co-Authored-By: John Roesler <vvcephei@users.noreply.github.com>
|
Ran the tests locally twice: one unrelated failure in |
guozhangwang
left a comment
There was a problem hiding this comment.
LGTM! Merging to trunk now.
* apache-github/trunk: (23 commits) KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (apache#8154) HOTFIX: fix NPE in Kafka Streams IQ (apache#8158) MINOR: set scala version automatically based on gradle.properties KAFKA-9577; SaslClientAuthenticator incorrectly negotiates SASL_HANDSHAKE version (apache#8142) KAFKA-9441: Add internal TransactionManager (apache#8105) MINOR: Document endpoints for connector topic tracking (KIP-558) MINOR: Standby task commit needed when offsets updated (apache#8146) KAFKA-9206; Throw KafkaException on CORRUPT_MESSAGE error in Fetch response (apache#8111) MINOR: Remove unwanted regexReplace on tests/kafkatest/__init__.py KAFKA-9586: Fix errored json filename in ops documentation KAFKA-9575: Mention ZooKeeper 3.5.7 upgrade KAFKA-9481: Graceful handling TaskMigrated and TaskCorrupted (apache#8058) HOTFIX: don't try to remove uninitialized changelogs from assignment & don't prematurely mark task closed (apache#8140) MINOR: Fix javadoc at org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion (apache#7337) MINOR: Improve EOS example exception handling (apache#8052) MINOR: Fix a number of warnings in clients test (apache#8073) MINOR: Update shell scripts to support z/OS system (apache#7913) MINOR: Wording fix in Streams DSL docs (apache#5692) MINOR: Add missing @test annotation to MetadataTest#testMetadataMerge (apache#8141) KAFKA-9533: ValueTransform forwards `null` values (apache#8108) ...
…etrics-common * confluent/master: (76 commits) KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (apache#8154) HOTFIX: fix NPE in Kafka Streams IQ (apache#8158) MINOR: set scala version automatically based on gradle.properties KAFKA-9577; SaslClientAuthenticator incorrectly negotiates SASL_HANDSHAKE version (apache#8142) KAFKA-9441: Add internal TransactionManager (apache#8105) MINOR: Document endpoints for connector topic tracking (KIP-558) MINOR: Standby task commit needed when offsets updated (apache#8146) Changes to migrate to Artifactory (#263) KAFKA-9206; Throw KafkaException on CORRUPT_MESSAGE error in Fetch response (apache#8111) MINOR: Remove unwanted regexReplace on tests/kafkatest/__init__.py KAFKA-9586: Fix errored json filename in ops documentation KAFKA-9575: Mention ZooKeeper 3.5.7 upgrade KAFKA-9481: Graceful handling TaskMigrated and TaskCorrupted (apache#8058) HOTFIX: don't try to remove uninitialized changelogs from assignment & don't prematurely mark task closed (apache#8140) MINOR: Fix javadoc at org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion (apache#7337) MINOR: Improve EOS example exception handling (apache#8052) MINOR: Fix a number of warnings in clients test (apache#8073) MINOR: Update shell scripts to support z/OS system (apache#7913) MINOR: Wording fix in Streams DSL docs (apache#5692) MINOR: Add missing @test annotation to MetadataTest#testMetadataMerge (apache#8141) ...
|
Why are there no new test cases for this bug in this PR? |
|
I'll make a note to add test cases for this in the next PR |
This fixes two issues which together caused the soak to crash/some test to fail occasionally.
What happened was:
In the main
StreamThreadloop we initialized a new task inTaskManager#checkForCompletedRestorationwhich includes registering, but not initializing, its changelogs. We then complete the loop and call poll, which resulted in a rebalance that revoked the newly-initialized task. InTaskManager#handleAssignmentwe then closed the task cleanly and go to remove the changelogs from theStoreChangelogReaderonly to get anIllegalStateExceptionbecause the changelog partitions were not in the restore consumer's assignment (due to being uninitialized).This by itself should^ be a recoverable error, as we catch exceptions here and retry closing the task as unclean. Of course the task actually was successfully closed (clean) so we now get an unexpected exception
Illegal state CLOSED while closing active taskThe fix(es) I'd propose are:
Keep the restore consumer's assignment in sync with the registered changelogs, ie the setedit: since the consumer does still perform some actions (gg fetches) on paused partitions, we should avoid adding uninitialized changelogs to the restore consumer's assignment. Instead, we should just skip them when removing.ChangelogReader#changelogsbut pause them until they are initializedStoreChangelogReader#removecall to before thetask.closeCleanso that the task is only marked as closed if everything was successful. We should do so regardless, as we should (attempt to) remove the changelogs even if the clean close failed and we must do unclean.