KAFKA-12194: use stateListener to catch each state change#9888
KAFKA-12194: use stateListener to catch each state change#9888chia7712 merged 5 commits intoapache:trunkfrom
Conversation
|
@wcarlson5 @ableegoldman @cadonna , could you help review this PR? Thanks. |
|
oh, we are working at same issue again (#9887) :( |
| try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) { | ||
| StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams); |
There was a problem hiding this comment.
Why replacing the try-with-resources with an explicit close()? This did not make the test flaky, as far as I can see. Can't we add the Streams state listener as the first statement in the try-with-resources block?
There was a problem hiding this comment.
Yes, you're right. I just thought there are some duplicate codes and want to clean them up. But you're right, maybe not better. Fixed.
| @Rule | ||
| public TestName testName = new TestName(); | ||
|
|
||
| private final List<KafkaStreams.State> stateToTransitions = new ArrayList<>(); |
There was a problem hiding this comment.
I think stateHistory or stateTransitionHistory would be a more meaningful name for this variable.
| waitForStateTransition(KafkaStreams.State.REBALANCING); | ||
| waitForStateTransition(KafkaStreams.State.RUNNING); |
There was a problem hiding this comment.
I think it would be better to wait until the Kafka Streams client id in state RUNNING and then verify if the history of the states transitions after adding the stream thread is first REBALANCING and then RUNNING. Currently, the order is not verified as far as I can see.
There was a problem hiding this comment.
Good suggestion! Added a hasStateTransition method to verify that. Thanks.
|
@showuon These changes look good. Thanks for shoring up these tests |
| waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); | ||
|
|
||
| waitForStateTransition(KafkaStreams.State.RUNNING); | ||
| assertTrue(hasStateTransition(KafkaStreams.State.REBALANCING, KafkaStreams.State.RUNNING)); |
There was a problem hiding this comment.
We normally use assertThat() in new and refactored code. Please also change the other occurrences.
| assertTrue(hasStateTransition(KafkaStreams.State.REBALANCING, KafkaStreams.State.RUNNING)); | |
| assertThat(hasStateTransition(KafkaStreams.State.REBALANCING, KafkaStreams.State.RUNNING), is(true)); |
| // should have at least 2 states in history | ||
| if (stateTransitionHistory.size() < 2) { | ||
| return false; | ||
| } | ||
|
|
||
| for (int i = 0; i < stateTransitionHistory.size() - 1; i++) { | ||
| if (stateTransitionHistory.get(i).equals(before) && stateTransitionHistory.get(i + 1).equals(after)) { | ||
| return true; | ||
| } | ||
| } | ||
| return false; |
There was a problem hiding this comment.
Why do we need a for-loop here? Wouldn't it suffice to verify the last two elements of the history and check if those two elements are a REBALANCING followed by a RUNNING?
There was a problem hiding this comment.
I use for loop is because I think there could be cases that there are some other state changes after RUNNING, ex: DEAD. But after your question, I think if that happened, the test should also fail as well. So, check the last 2 elements is good. Updated. Thanks.
| private void waitForStateTransition(final KafkaStreams.State expected) throws InterruptedException { | ||
| waitForCondition( | ||
| () -> !stateTransitionHistory.isEmpty() && stateTransitionHistory.contains(expected), | ||
| DEFAULT_DURATION.toMillis(), | ||
| () -> String.format("Client did not change to the %s state in time. Observed new state transitions: %s", | ||
| expected, stateTransitionHistory) | ||
| ); | ||
| } |
There was a problem hiding this comment.
Couldn't we simply wait for the current state to become RUNNING?
| private void waitForStateTransition(final KafkaStreams.State expected) throws InterruptedException { | |
| waitForCondition( | |
| () -> !stateTransitionHistory.isEmpty() && stateTransitionHistory.contains(expected), | |
| DEFAULT_DURATION.toMillis(), | |
| () -> String.format("Client did not change to the %s state in time. Observed new state transitions: %s", | |
| expected, stateTransitionHistory) | |
| ); | |
| } | |
| private void waitForRunning() throws Exception { | |
| waitForCondition( | |
| () -> kafkaStreams.state() == KafkaStreams.State.RUNNING, | |
| DEFAULT_DURATION.toMillis(), | |
| () -> String.format("Client did not transit to state %s in %d seconds", expected, DEFAULT_DURATION.toMillis() / 1000) | |
| ); | |
| } |
There was a problem hiding this comment.
We can't just check the current state to become RUNNING because after we add/remove threads, the state won't change immediately. That is, if we check if the state is RUNNING after adding/removing threads, the check will pass, but the rebalance is not happening, yet, which will cause the test fail. So I still use stateTransitionHistory to check the state, and also, I checked the last state of the history to see if it is RUNNING. That should be better.
|
Test will fail, will work it later. Don't review yet. Thanks. |
| if (historySize >= 2 && stateTransitionHistory.get(historySize - 2).equals(before) && | ||
| stateTransitionHistory.get(historySize - 1).equals(after)) { | ||
| return true; | ||
| } |
There was a problem hiding this comment.
nit: just to better visually separate condition from if-block
| if (historySize >= 2 && stateTransitionHistory.get(historySize - 2).equals(before) && | |
| stateTransitionHistory.get(historySize - 1).equals(after)) { | |
| return true; | |
| } | |
| if (historySize >= 2 && stateTransitionHistory.get(historySize - 2).equals(before) && | |
| stateTransitionHistory.get(historySize - 1).equals(after)) { | |
| return true; | |
| } |
| } | ||
|
|
||
| private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) { | ||
| // we store each new state in state transition so that we won't miss any state change |
There was a problem hiding this comment.
Could you please remove this comment? I do not think it is needed. The code is clear enough.
| // verify if state change from "before" state into "after" state | ||
| private boolean hasStateTransition(final KafkaStreams.State before, final KafkaStreams.State after) { | ||
| final int historySize = stateTransitionHistory.size(); | ||
| // should have at least 2 states in history |
There was a problem hiding this comment.
I think this comment is also not needed. Could you remove it?
| ); | ||
| } | ||
|
|
||
| // verify if state change from "before" state into "after" state |
There was a problem hiding this comment.
This comment seems incomplete. But I would also remove it. Sorry that I am a bit picky about inline comments, but inline comment tend to lie after a while when the code they should describe changes but the comments do not. I would rather focus on giving meaning names to variables and methods. For example, I would rename this method to lastStateTransitionFromRebalancingToRunning(), remove the argumetns, and hard code the states.
There was a problem hiding this comment.
Classic Bruno, can't sneak any inline comments past him :P
| waitForRunning(); | ||
| assertThat(hasStateTransition(KafkaStreams.State.REBALANCING, KafkaStreams.State.RUNNING), is(true)); |
There was a problem hiding this comment.
What do you think of combining these two checks to one and call it waitForTransitionFromRebalancingToRunning(). They are always used together.
|
@cadonna @ableegoldman , Thanks for the comments. I've updated in this commit: 4161096. Thanks. |
|
All tests passed. |
| oldThreadCount = kafkaStreams.localThreadsMetadata().size(); | ||
| stateTransitionHistory.clear(); | ||
|
|
||
| // remove a thread |
| waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.REBALANCING, DEFAULT_DURATION); | ||
| waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); | ||
|
|
||
| assertThat(waitForTransitionFromRebalancingToRunning(), is(true)); |
There was a problem hiding this comment.
It seems to me the method waitForTransitionFromRebalancingToRunning can do the assert as well because we always call assertThat(waitForTransitionFromRebalancingToRunning(), is(true) in this test.
|
|
||
| @After | ||
| public void teardown() throws IOException { | ||
| stateTransitionHistory.clear(); |
There was a problem hiding this comment.
This is unnecessary as junit always create a new test class for each test case.
|
|
||
| stateTransitionHistory.clear(); | ||
|
|
||
| // add a new thread again |
|
@showuon Thanks for updating code. +1 again. |
|
LGTM too. @chia7712 please feel free to merge and cherry-pick. |
|
merge to trunk only as this issue happens only in 2.8. |
The tests are flaky because we used the
waitForApplicationStateto wait for a state.waitForApplicationStateis using poll to check the current stream state, which might miss some state changes.Ex:
I use
StateListenerto keep the new state of each state change. So when we verify a specific state, we can always find it if existed. Also have small refactor.Committer Checklist (excluded from commit message)