KAFKA-10555: Improve client state machine#9720
KAFKA-10555: Improve client state machine#9720mjsax merged 20 commits intoapache:trunkfrom wcarlson5:wcarlson-fsm-changes
Conversation
There was a problem hiding this comment.
This will not be needed with the new error definition
There was a problem hiding this comment.
we will be doing the same thing but closing the client for now. Maybe a replace globalThread will be added later
There was a problem hiding this comment.
We want to make close() idempotent and not throw an exception but we will log a warning, but only for close so that is why these logs are not in the setState() method.
There was a problem hiding this comment.
This will let Streams shutdown uncleanly when in EOS mode
There was a problem hiding this comment.
This test is for the functionality we are removing
There was a problem hiding this comment.
Just wait for ERROR because you don't close crashed
There was a problem hiding this comment.
No longer do you need to close crashed
There was a problem hiding this comment.
Why this change? client2 should transit from PENDING_ERROR to ERROR (not from PENDING_SHUTDOWN to CLOSED, and thus the new CLOSE check seem not to be right? Or do I miss something?
There was a problem hiding this comment.
My thoughts where that we are calling close and so its is going to just be closed as the state transitions are cleared. This is the test that made me want you to review the PR I was worried that I might have broken the integrity of the tests as it is not behaving as I expected. However no other test was having this problem so I was not sure if I was understanding it correctly.
There was a problem hiding this comment.
From my understanding/ according to the new state transition diagram, if there is an error we go to PENDING_ERROR and if we call close() we transit PENDING_ERROR -> ERROR?
There was a problem hiding this comment.
the call to close() will have no effect. The PENDING_ERROR -> ERROR transition is like the PENDING_SHUTDOWN -> NOT_RUNNING where the transition from the PENDING... state is automatic
There was a problem hiding this comment.
remove deprecated handler
There was a problem hiding this comment.
When the client is shutdown it now goes to ERROR
There was a problem hiding this comment.
super nit: fix indention of comment
Also please update the state diagram in the comment above.
There was a problem hiding this comment.
Should we return true there? I understand that we are not in NOT_RUNNING state, but in the end we are in a terminal state and we did cleanup all resources. -- I guess, I am raising the question if we should relax the definition of this return value?
There was a problem hiding this comment.
I don't think that we should return true. A user would then expect that that the state would then change to NOT_RUNNING and could be stuck waiting on that. Where if we return false in then they might retry and get stuck there.
I don't know which is a better problem to have but I think that changing the meaning of this return value won't add any more clarity to the situation.
There was a problem hiding this comment.
Fair point. Might be good to get input from somebody else in addition. \cc @cadonna
There was a problem hiding this comment.
As far as I can see, where the return value is used the javadoc says
true if all threads were successfully stopped, false if the timeout was reached.
Since all threads were successfully stopped, I would return true. We clearly document that ERROR is a terminal state, so I do not see why somebody should wait for NOT_RUNNING when the client is in ERROR and close() returns true.
There was a problem hiding this comment.
Alright I adjusted the close response to align with this.
There was a problem hiding this comment.
If the global thread dies, should we not transit to ERROR at the end?
There was a problem hiding this comment.
It will. We don't strictly need to test it here as that is tested elsewhere but we can add it for clarity
There was a problem hiding this comment.
I guess it would not hurt. Leave it up to you than.
There was a problem hiding this comment.
Why not just change this from CLOSE_CRASHED to CRASH ?
There was a problem hiding this comment.
Should we extend it to have two state transitions, RUNNING -> PENDING_ERROR and PENDING_ERROR -> ERROR ?
There was a problem hiding this comment.
Actually it could probably be just PENDING_ERROR -> ERROR because it could be in RUNNING or REBALNCING previously
There was a problem hiding this comment.
Nit: the error message is weird -> Streams didn't transit to ERROR state.
There was a problem hiding this comment.
I don't think that we should return true. A user would then expect that that the state would then change to NOT_RUNNING and could be stuck waiting on that. Where if we return false in then they might retry and get stuck there.
I don't know which is a better problem to have but I think that changing the meaning of this return value won't add any more clarity to the situation.
There was a problem hiding this comment.
It will. We don't strictly need to test it here as that is tested elsewhere but we can add it for clarity
There was a problem hiding this comment.
It made it so that the new default was not used until we updated the Error transition as we are doing in this PR. :)
There was a problem hiding this comment.
My thoughts where that we are calling close and so its is going to just be closed as the state transitions are cleared. This is the test that made me want you to review the PR I was worried that I might have broken the integrity of the tests as it is not behaving as I expected. However no other test was having this problem so I was not sure if I was understanding it correctly.
There was a problem hiding this comment.
I thought we always need to call close? If an error happens, we call the handler, and if the handler return shutdown, we transit to PENDING_ERROR. On close() we transit from PENDING_ERROR -> ERROR?
Or do I have some misconception?
There was a problem hiding this comment.
the handler will call close, but the user will not need to. The PENDING_ERROR state is indicating the resources are closing before the transition to ERROR after which no more work will be done. We made it so the user can call close on PENDING_ERROR or ERROR but it will only log a warning
There was a problem hiding this comment.
@mjsax I am still looking into the shouldUpgradeFromEosAlphaToEosBeta test. but not making a ton of progress I will try to carve out a little time tomorrow for it.
I think there was a bit of miss-understanding about what PENDING_ERROR means but I hope I cleared it up the comments below.
There was a problem hiding this comment.
the handler will call close, but the user will not need to. The PENDING_ERROR state is indicating the resources are closing before the transition to ERROR after which no more work will be done. We made it so the user can call close on PENDING_ERROR or ERROR but it will only log a warning
There was a problem hiding this comment.
the call to close() will have no effect. The PENDING_ERROR -> ERROR transition is like the PENDING_SHUTDOWN -> NOT_RUNNING where the transition from the PENDING... state is automatic
| Arrays.asList( | ||
| KeyValue.pair(KafkaStreams.State.ERROR, KafkaStreams.State.PENDING_SHUTDOWN), | ||
| KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, KafkaStreams.State.NOT_RUNNING) | ||
| KeyValue.pair(State.PENDING_ERROR, State.ERROR) |
There was a problem hiding this comment.
Using just PENDING_ERROR -> ERROR because the transition to PENDING_ERROR can be from multiple sources. Also that transition is already tested so this check implies it
mjsax
left a comment
There was a problem hiding this comment.
Just some final nits. Overall LGTM.
| () -> !observed.isEmpty() && observed.get(observed.size() - 1).value.equals(State.RUNNING), | ||
| MAX_WAIT_TIME_MS, | ||
| () -> "Client did not startup on time. Observers transitions: " + observed | ||
| () -> "Client did not have the expected state transition on time. Observers transitions: " + observed |
There was a problem hiding this comment.
Why this change? We do wait for RUNNING?
There was a problem hiding this comment.
We do wait for running, I was thinking of bringing it to match with the other methods below but that doesn't make it anymore useful so I will revet it.
|
Btw: to what extend do we need to update the docs? We should at least add a section to |
|
I extracted the https://github.com/apache/kafka/pull/9720/files#r562203226 fix to #9948 |
|
@mjsax it looks like there is one unrelated failure |
|
Am I correct in believing this change was released in Kafka 2.8.0? I can't find it in the release notes. We just had a minor outage because we didn't realize when upgrading to v2.8 that some error handling/shutdown code in our Kafka Streams app couldn't assume that it only had to kick in when the state transitioned to |

Removes the transition to error for when there are no threads and makes Error terminal
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)