KAFKA-9727: cleanup the state store for standby task dirty close and check null for changelogs #8307
Nice catch, this is indeed possible.
A standby task should never be in RESTORING, since we always transition from CREATED -> RESTORING -> RUNNING in one call. Did you observe otherwise in the failed system tests? Even in the unclean close case you described, I do not see how it could be possible.
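To make the point about transitions concrete, here is a minimal sketch of a task state machine in which initialization moves through RESTORING to RUNNING inside a single call. It assumes the order CREATED, then RESTORING, then RUNNING. `TaskStateSketch`, `SketchStandbyTask`, and the transition table are hypothetical names invented for illustration; this is not the Kafka Streams `StandbyTask` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch for illustration only; not the Kafka Streams implementation.
final class TaskStateSketch {

    enum State {
        CREATED, RESTORING, RUNNING, CLOSED;

        // Assumed transition table: each state lists the states it may move to.
        boolean canTransitionTo(final State next) {
            switch (this) {
                case CREATED:   return next == RESTORING || next == CLOSED;
                case RESTORING: return next == RUNNING || next == CLOSED;
                case RUNNING:   return next == CLOSED;
                default:        return false; // CLOSED is terminal
            }
        }
    }

    static final class SketchStandbyTask {
        private State state = State.CREATED;
        private final List<State> history = new ArrayList<>();

        SketchStandbyTask() {
            history.add(state);
        }

        State state() {
            return state;
        }

        List<State> history() {
            return history;
        }

        // Rejects any move that the transition table does not allow.
        void transitionTo(final State next) {
            if (!state.canTransitionTo(next)) {
                throw new IllegalStateException("Invalid transition from " + state + " to " + next);
            }
            state = next;
            history.add(next);
        }

        // Both transitions happen inside one call, so a caller only ever
        // observes CREATED before the call and RUNNING after it.
        void initializeIfNeeded() {
            if (state == State.CREATED) {
                transitionTo(State.RESTORING);
                transitionTo(State.RUNNING);
            }
        }
    }

    public static void main(final String[] args) {
        final SketchStandbyTask task = new SketchStandbyTask();
        task.initializeIfNeeded();
        System.out.println(task.state());   // prints RUNNING
        System.out.println(task.history()); // prints [CREATED, RESTORING, RUNNING]
    }
}
```

In this sketch a later attempt to move from RUNNING back to RESTORING throws an `IllegalStateException`, which is the kind of failure a wrong transition would surface as.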
I think this is what the below TODO (191) was added for, Thanks :) Please feel free to remove that TODO marker then.
guozhangwang left a comment
LGTM! I agree that 2)/3) are bugs, not sure about 1) -- we can discuss more about this.
Could you add some tests as well?
test this please
Discussed offline with @guozhangwang: fix 1 was not correct, and the true issue was due to the state transition. We call
public void closeClean(final Map<TopicPartition, Long> checkpoint) {
    Objects.requireNonNull(checkpoint);
    close(true, checkpoint);
    close(true);
Not for this PR: we can clean up the task-manager code to not pass in the checkpoint at all.
this.time = time;
this.recordCollector = recordCollector;
eosDisabled = !StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
This part will have some conflicts with @mjsax's PR, just a note.
Yea, one of us probably needs to rebase
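For readers following the `eosDisabled` / `eosEnabled` lines in the hunk above, a minimal sketch of deriving a positive flag from the processing guarantee may help. It uses a plain `Map` instead of `StreamsConfig` so that it runs without Kafka on the classpath. `EosFlagSketch` is a hypothetical name, and the two string literals are assumed to match the values of `StreamsConfig.PROCESSING_GUARANTEE_CONFIG` and `StreamsConfig.EXACTLY_ONCE`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch for illustration only; not the code under review.
final class EosFlagSketch {

    // Assumed to mirror StreamsConfig.PROCESSING_GUARANTEE_CONFIG and StreamsConfig.EXACTLY_ONCE.
    static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
    static final String EXACTLY_ONCE = "exactly_once";

    // Calling equals on the constant keeps the check null-safe when the key is absent.
    static boolean eosEnabled(final Map<String, String> config) {
        return EXACTLY_ONCE.equals(config.get(PROCESSING_GUARANTEE_CONFIG));
    }

    public static void main(final String[] args) {
        final Map<String, String> config = new HashMap<>();
        System.out.println(eosEnabled(config)); // prints false (key not set)

        config.put(PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
        System.out.println(eosEnabled(config)); // prints true
    }
}
```

A positive flag avoids double negatives such as `!eosDisabled` at call sites, which is one plausible motivation for the rename shown in the hunk.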
waitForCondition(() -> streamInstanceTwo.state().equals(KafkaStreams.State.RUNNING),
                 "Stream instance one should be up and running by now");

streamInstanceOne.close(Duration.ofSeconds(30));
For my own education: before the fix, would this integration test fail when instance-2 is started?
Yes, actually either instance-1 or instance-2 would fail, depending on which box gets the standby assignment. There would be an IllegalState + NPE exception sequence.
This PR fixes three things:
The sequence to reproduce the system test failure: