KAFKA-10017: fix 2 issues in EosBetaUpgradeIntegrationTest #9733

Merged
mjsax merged 5 commits into apache:trunk from showuon:KAFKA-10017-store-error
Jan 19, 2021
Conversation

@showuon
Member

@showuon showuon commented Dec 11, 2020

The PR follows #9688, to make EosBetaUpgradeIntegrationTest more stable. It fixes the following issues:

  1. Test failed with the error message:
org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store store because the stream thread is PARTITIONS_ASSIGNED, not RUNNING

https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk8/274/testReport/org.apache.kafka.streams.integration/EosBetaUpgradeIntegrationTest/shouldUpgradeFromEosAlphaToEosBeta_true_/
After investigation, I found it's because we actually rebalance twice after the new stream starts: once for adding the new member, and once for the leader re-joining the group during the Stable phase. So, if we only wait for the state to be RUNNING, it might enter the REBALANCING state again later, causing us to fail to get the store.

  2. We set an UncaughtExceptionHandler, but didn't handle it well. Before, we expected only 1 uncaught exception, but we actually get 4 here (2 injected exceptions, 2 injected commit exceptions). So we saw many messages in the stderr output, which is not good for debugging. After the fix, we only output to stderr, for debugging use, when the exception is not what we expected.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@showuon
Member Author

showuon commented Dec 11, 2020

@mjsax @ableegoldman @guozhangwang , could you help review this PR? Thanks.

Member

@mjsax mjsax left a comment

Thanks for the PR!

The build result is not available any longer. Where was the InvalidStateStoreException thrown? I'm wondering because keysFromInstance should retry internally. Did we exceed the timeout?

We should already have a guard in place for multiple rebalances, though; thus I am not sure atm that I understand the root cause of the issue.

@@ -256,8 +261,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
streams2Alpha.cleanUp();
streams2Alpha.start();
assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
Member

To avoid the issue you point out, we actually use this line. As a matter of fact, there is no guarantee that there won't be even more than two rebalances. \cc @ableegoldman might be able to provide more input?

Member Author

@showuon showuon Dec 16, 2020

@mjsax
The stacktrace is as follows. The line numbers don't map to the master branch correctly, but you can tell where it failed from the method names. It failed while trying to get all the state store data and checking whether the current stream is in the RUNNING state, but it was still rebalancing. And this exception doesn't trigger any retry.

Stacktrace

org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store store because the stream thread is PARTITIONS_ASSIGNED, not RUNNING
	at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
	at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
	at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.all(CompositeReadOnlyKeyValueStore.java:119)
	at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.keysFromInstance(EosBetaUpgradeIntegrationTest.java:1112)
	at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:494)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

And, yes, we make sure the rebalance completed before checking the running state, but as I mentioned, there will be 2 rebalances (one for adding the new member, one for the leader re-joining the group during Stable), and we only wait for 1 rebalance to complete, so there might be another rebalance later. The state transition logs of the 2 streams look like this:

stateTransitions1:
[KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]  

stateTransitions2: 
[KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]

So, as you can see, we might enter the next step while we are at step 2 (KeyValue(REBALANCING, RUNNING)), and there will be another rebalance soon after. That's why I wait explicitly for this transition pair: [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]
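The transition-pair idea above can be sketched roughly as follows. This is a minimal standalone illustration, not the actual test code: the class, the countTransitions helper, and the string-pair representation of transitions are all hypothetical stand-ins for the test's KeyValue transition log.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class TransitionWait {
    // Count how many times the pair (from, to) occurs in the recorded transition log.
    static int countTransitions(List<String[]> transitions, String from, String to) {
        int count = 0;
        for (String[] t : transitions) {
            if (t[0].equals(from) && t[1].equals(to)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // The transition log observed for stream 1, per the comment above.
        List<String[]> stateTransitions1 = new CopyOnWriteArrayList<>();
        stateTransitions1.add(new String[] {"CREATED", "REBALANCING"});
        stateTransitions1.add(new String[] {"REBALANCING", "RUNNING"});   // unstable
        stateTransitions1.add(new String[] {"RUNNING", "REBALANCING"});
        stateTransitions1.add(new String[] {"REBALANCING", "RUNNING"});   // stable

        // Waiting for RUNNING alone would already succeed after the first (unstable)
        // rebalance; counting REBALANCING -> RUNNING pairs distinguishes the two.
        int runs = countTransitions(stateTransitions1, "REBALANCING", "RUNNING");
        System.out.println("REBALANCING->RUNNING seen " + runs + " times");
    }
}
```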

Member

The line number is not mapped to master branch correctly, but you can know what it is from the method name.

Well, this seems to imply that the run was on an older version that did not contain the retry yet -- note: the retry was added as part of the latest PR for this test that we merged: https://github.com/apache/kafka/pull/9688/files#diff-86a5136ae170df067137442b5eae05fa5fd9d1e02aca85bac8a251b7d2557b0eR1089

Thus, I would assume that the whole test failure you observed happened before the last fix.

there will be 2 rebalance happened

Correct, but we actually "cut off" the unstable rebalances (there might actually be more than 2...) via assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);

To me, it seems that you observed the test failure before the PR got merged -- otherwise the line numbers for keysFromInstance would match.

Member Author

You're right! I didn't notice the retry had already been added on master. I'll revert my change for it. Thank you!

// The exception won't cause the test fail since we actually "expected" exception thrown and failed the stream.
// So, log to stderr for debugging when the exception is not what we expected
e.printStackTrace(System.err);
fail("Should only get one uncaught exception from Streams.");
Member

@mjsax mjsax Dec 16, 2020

Seems this fail did not work as expected? Otherwise the test would have failed all the time. Maybe we should rather set a boolean flag that we evaluate outside of the callback to let the test fail?

Also, we have one run with zero exceptions and one run with 2 exceptions (one of each exception type) -- not 4. Thus, we need to handle the error-injection run and the "clean run" differently, depending on the boolean test flag.

Member Author

@showuon showuon Dec 16, 2020

Seems this fail did not work as expected? Otherwise the test would have failed all the time?

Yes, the fail() throws an AssertionError to fail the stream, but that's exactly what we expected -- failing the stream, just not with our injected exception. So it won't fail the test.

we need to handle this differently for the error-injection and the "clean run" differently depending on the boolean test flag.

I see, will update it. Thanks.

Member

Well, but the AssertionError does not mark the test as failed, it seems, because it is thrown on a different thread? We would need to set a boolean flag, and add an assertThat to the main test code to see if the flag fired?
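The flag pattern suggested here can be sketched like this. This is an illustrative sketch only, not the test's actual handler: the class, thread setup, and exception-type check are hypothetical; the real test records the flag in its UncaughtExceptionHandler and asserts on it from the main test thread.

```java
public class FlagExample {
    // volatile: written by the stream/handler thread, read by the main test thread
    static volatile boolean hasUnexpectedError = false;

    public static void main(String[] args) throws InterruptedException {
        Thread streamThread = new Thread(() -> {
            try {
                throw new IllegalStateException("injected");
            } catch (Throwable t) {
                // An AssertionError thrown here would die with this thread and
                // never reach the test runner; record a flag instead.
                if (!(t instanceof RuntimeException)) {
                    hasUnexpectedError = true;   // unexpected exception type
                }
            }
        });
        streamThread.start();
        streamThread.join();

        // The main thread evaluates the flag, so a failure actually fails the test.
        if (hasUnexpectedError) {
            throw new AssertionError("unexpected exception on stream thread");
        }
        System.out.println("no unexpected error");
    }
}
```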

@showuon
Member Author

showuon commented Dec 21, 2020

@mjsax , as mentioned in #9690 (comment), I've updated the commit. My changes are:

  1. Before my change, when we started a stream while the other one was running, we just waitForRunning for both streams. But we actually expect more than one rebalance here (unstable + stable assignments), which could make us run the following test steps under the wrong stream state. For example, when stream1 starts, it might go through:
[CREATED -> REBALANCING], [REBALANCING -> RUNNING] (unstable), [RUNNING -> REBALANCING], [REBALANCING -> RUNNING] (stable)

So, if we only waitForRunning, we might proceed right after the unstable rebalance. Though we have waitForStableAssignment, it still doesn't guarantee the stable rebalance has completed. That is, we need to wait for a specific number of [REBALANCING -> RUNNING] state transitions so we can be sure all necessary rebalances have completed. That specific number can be counted via the onAssignmentComplete callback.

Note: I only waitForNumRebalancingToRunning for the newly started stream, because I found that if the stream runs fast enough, the already-running stream might not reach the expected number of [REBALANCING -> RUNNING] state transitions. The reason is this line:

[appDir2-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED 

Basically, the already-running stream thread should go through: [RUNNING to PARTITIONS_REVOKED], [PARTITIONS_REVOKED to PARTITIONS_ASSIGNED] (unstable), [PARTITIONS_ASSIGNED to RUNNING], [RUNNING to PARTITIONS_ASSIGNED] (stable), [PARTITIONS_ASSIGNED to RUNNING]. Because it needs one more PARTITIONS_REVOKED step, it might be under 2 PARTITIONS_ASSIGNED states at the same time.

  2. Fail the test when too many exceptions, or unexpected exceptions, are thrown, via the hasUnexpectedError flag.

Thank you.
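The counting idea described in point 1 can be sketched roughly as follows. This is illustrative only: the class and helper names here are hypothetical; the real test uses assignmentListener.numTotalAssignments() and waitForNumRebalancingToRunning (with actual blocking waits) rather than this simplified check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class AssignmentCounter {
    static final AtomicInteger numTotalAssignments = new AtomicInteger();

    // Invoked by the assignment listener on each completed assignment.
    static void onAssignmentComplete() {
        numTotalAssignments.incrementAndGet();
    }

    // Count observed REBALANCING -> RUNNING transitions in the log.
    static int countRebalancingToRunning(List<String[]> transitions) {
        int n = 0;
        for (String[] t : transitions) {
            if ("REBALANCING".equals(t[0]) && "RUNNING".equals(t[1])) n++;
        }
        return n;
    }

    public static void main(String[] args) {
        int prev = numTotalAssignments.get();
        onAssignmentComplete();   // unstable assignment
        onAssignmentComplete();   // stable assignment
        int expected = numTotalAssignments.get() - prev;   // 2 expected transitions

        List<String[]> transitions = new ArrayList<>();
        transitions.add(new String[] {"REBALANCING", "RUNNING"});
        // Only 1 of the expected 2 transitions seen so far -> keep waiting.
        System.out.println(countRebalancingToRunning(transitions) >= expected);
        transitions.add(new String[] {"RUNNING", "REBALANCING"});
        transitions.add(new String[] {"REBALANCING", "RUNNING"});
        // Both expected transitions observed -> safe to proceed.
        System.out.println(countRebalancingToRunning(transitions) >= expected);
    }
}
```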

@showuon showuon force-pushed the KAFKA-10017-store-error branch from c032fd3 to ee037dd Compare December 21, 2020 08:50
private final static String APP_DIR_1 = "appDir1";
private final static String APP_DIR_2 = "appDir2";
private final static String UNEXPECTED_EXCEPTION_MSG = "Fail the test since we got an unexpected exception or" +
"there are too many exceptions thrown, please check standard error log for more info.";
Member

nit: missing space between or (line above) and there

private Throwable uncaughtException;

private int testNumber = 0;
private boolean hasUnexpectedError = false;
Member

This variable should be declared volatile because the callback is executed on a different thread.

Member Author

Nice catch! Fixed.

Member

@mjsax mjsax left a comment

Thanks for updating the PR. I am still not sure if I understand your reasoning. Also, let's back up for a second.

You originally observed a test failure based on missing retries when trying to get the state store. This should be fixed already.

Did you encounter any other test failures? I could not reproduce any test failures locally, so I am still wondering why we would need the changes (besides the fix in the exception callback).

I see that you changed the StableAssignmentListener, but I don't understand why we need this: we first block until we get a stable assignment. For this case, we know that the state must be REBALANCING (because Kafka Streams would transit to REBALANCING before the StableAssignmentListener is called). Thus, after waitForStableAssignment returns, we only need to wait until we transit to state RUNNING -- I don't see any advantage in tracking the exact number of rebalances. Just waiting until the last state is RUNNING seems sufficient.

Do I miss something?

Again: did you observe more test failures that indicate that we need the proposed changes?
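The simpler alternative described above -- block on the stable assignment, then just wait until the latest observed state is RUNNING -- could be sketched like this. The class and helper names are hypothetical, and the real test would poll with a timeout rather than check once.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class LastStateWait {
    // True once the most recent recorded transition landed in RUNNING.
    static boolean lastStateIsRunning(List<String[]> transitions) {
        if (transitions.isEmpty()) return false;
        return "RUNNING".equals(transitions.get(transitions.size() - 1)[1]);
    }

    public static void main(String[] args) {
        List<String[]> transitions = new CopyOnWriteArrayList<>();
        transitions.add(new String[] {"RUNNING", "REBALANCING"});
        // Mid-rebalance: not done yet.
        System.out.println(lastStateIsRunning(transitions));
        transitions.add(new String[] {"REBALANCING", "RUNNING"});
        // Latest state is RUNNING: done, without counting rebalances.
        System.out.println(lastStateIsRunning(transitions));
    }
}
```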

@showuon
Member Author

showuon commented Dec 23, 2020

@mjsax , thanks for your comments. Let me answer your questions below.

You originally observed a test failure based on missing retries when trying to get the state store. This should be fixed already.

--> Right! The failure when trying to get the state store is fixed now.

did you observe more test failures that indicate that we need the proposed changes?

--> Yes, I observed test failures on your PR to 2.6 (i.e. #9690 (comment)). This test failure should also happen on the trunk branch; it's caused by the stream still being under an unstable assignment, not a stable one yet.

For this case, we know that the state must be REBALANCING (because Kafka Streams would transit to REBALANCING before the StableAssignmentListener is called)

--> No, that's not what I observed. What I observed is that the StableAssignmentListener only completes the assignment; the streams haven't handled the assignment yet. Please check the following logs:

2020-12-23T11:35:01.361+0800 [DEBUG] [TestEventLogger]     !!! onAssignmentComplete
...
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger]     [2020-12-23 11:35:19,703] INFO stream-thread [appDir2-StreamThread-1] Handle new assignment with:
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger]          New active tasks: [0_0, 0_2]
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger]          New standby tasks: []
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger]          Existing active tasks: []
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger]          Existing standby tasks: [0_0, 0_2] (org.apache.kafka.streams.processor.internals.TaskManager:255)
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger]     [2020-12-23 11:35:19,703] INFO stream-thread [appDir1-StreamThread-1] Handle new assignment with:
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger]          New active tasks: [0_1, 0_3]
2020-12-23T11:35:19.703+0800 [DEBUG] [TestEventLogger]          New standby tasks: []
2020-12-23T11:35:19.704+0800 [DEBUG] [TestEventLogger]          Existing active tasks: [0_1, 0_3]
2020-12-23T11:35:19.704+0800 [DEBUG] [TestEventLogger]          Existing standby tasks: [] (org.apache.kafka.streams.processor.internals.TaskManager:255)
...
2020-12-23T11:35:19.704+0800 [DEBUG] [TestEventLogger]     [2020-12-23 11:35:19,703] INFO stream-thread [appDir1-StreamThread-1] State transition from RUNNING to PARTITIONS_ASSIGNED (org.apache.kafka.streams.processor.internals.StreamThread:224)
2020-12-23T11:35:19.704+0800 [DEBUG] [TestEventLogger]     [2020-12-23 11:35:19,703] INFO stream-client [appDir1] State transition from RUNNING to REBALANCING (org.apache.kafka.streams.KafkaStreams:298)
...
2020-12-23T11:35:19.711+0800 [DEBUG] [TestEventLogger]     [2020-12-23 11:35:19,711] INFO stream-thread [appDir2-StreamThread-1] State transition from RUNNING to PARTITIONS_ASSIGNED (org.apache.kafka.streams.processor.internals.StreamThread:224)
2020-12-23T11:35:19.711+0800 [DEBUG] [TestEventLogger]     [2020-12-23 11:35:19,711] INFO stream-client [appDir2] State transition from RUNNING to REBALANCING (org.apache.kafka.streams.KafkaStreams:298)
...
2020-12-23T11:35:19.776+0800 [DEBUG] [TestEventLogger]     [2020-12-23 11:35:19,776] INFO stream-thread [appDir1-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING (org.apache.kafka.streams.processor.internals.StreamThread:224)
2020-12-23T11:35:19.776+0800 [DEBUG] [TestEventLogger]     [2020-12-23 11:35:19,776] INFO stream-client [appDir1] State transition from REBALANCING to RUNNING (org.apache.kafka.streams.KafkaStreams:298)
...
2020-12-23T11:35:20.028+0800 [DEBUG] [TestEventLogger]     [2020-12-23 11:35:20,028] INFO stream-thread [appDir2-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING (org.apache.kafka.streams.processor.internals.StreamThread:224)
2020-12-23T11:35:20.028+0800 [DEBUG] [TestEventLogger]     [2020-12-23 11:35:20,028] INFO stream-client [appDir2] State transition from REBALANCING to RUNNING (org.apache.kafka.streams.KafkaStreams:298)

The line !!! onAssignmentComplete is the callback we get in onAssignmentComplete (which I added). Then the stream threads handle the new assignments; next, each stream does the state transition (to REBALANCING); and finally, they end up in the RUNNING state after the rebalance completes.

And that's the reason why the PR to 2.6 (#9690) keeps encountering test failures: we're still under the "unstable" assignment (that is, onAssignmentComplete has just fired, and we haven't entered the stable rebalancing stage yet). And in 2.6, the unstable assignment will sometimes be an empty assignment, which is why the test is flaky. Though we haven't seen any test failure in 2.7 and later so far, the stream could still not be in a stable RUNNING state, and the assignment might change after the stable assignment. That's why I think we should fix this issue on the trunk branch, too.

Also, I think we should apply this fix to PR #9690, too, to fix the failed-test issue.

Thank you. Happy Holidays~

@mjsax
Member

mjsax commented Dec 23, 2020

Thanks for the input! The test behavior is slightly different in 2.6 compared to 2.7/trunk, though, and thus I am not sure we can apply the same reasoning (that is also the reason why there is a separate PR for 2.6). I did not have time to look into the 2.6 PR yet, as I wanted to resolve this PR first.

assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
expectedNumAssignments = assignmentListener.numTotalAssignments() - prevNumAssignments;
waitForNumRebalancingToRunning(stateTransitions2, expectedNumAssignments);
waitForRunning(stateTransitions1);
Member

If your reasoning is right, do we need to use waitForNumRebalancingToRunning here, too? If there are multiple rebalances, both clients would participate?

Also, your PR does not update all stages where we wait for state changes. Why? Would we not need to apply the logic each time?

Member Author

@showuon showuon Dec 24, 2020

Thanks for your questions. I'll answer them below:

Also, you PR does not update all stages when we wait for state changes. Why? Would we not need to apply to logic each time?

--> Yes, I should do that. What I did for now focuses only on the case of 1 newly started stream + 1 already-running stream, which has more failures. But you're right, I should put the change in all stages.

If you reasoning is right, do we need to use waitForNumRebalancingToRunning here, too? If there are multiple rebalances, both clients would participate?

--> Good question! I explained this in a previous comment (#9733 (comment)), but I can explain it again, since I know it wasn't clear before why I made this change.

I only waitForNumRebalancingToRunning for the newly started stream, not for the already-running stream, because I found that if the stream runs fast enough, the already-running stream might not reach the expected number of [REBALANCING -> RUNNING] state transitions. The reason is this line:

[appDir2-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED 

Basically, the already-running stream thread should go through: [RUNNING to PARTITIONS_REVOKED], [PARTITIONS_REVOKED to PARTITIONS_ASSIGNED] (unstable), [PARTITIONS_ASSIGNED to RUNNING], [RUNNING to PARTITIONS_ASSIGNED] (stable), [PARTITIONS_ASSIGNED to RUNNING]. Because it needs one more PARTITIONS_REVOKED step, it might be under 2 PARTITIONS_ASSIGNED states at the same time (with no RUNNING in between). And that's why the stream client doesn't change to RUNNING as we expected.

Does that make sense?

@mjsax
Member

mjsax commented Jan 14, 2021

@showuon Sorry for the late response. Pretty busy atm.

We found that the fix for the exception handler contained in this PR is also blocking #9720 (\cc @wcarlson5) -- could you extract this fix into a separate PR that we can merge quickly to unblock 9720? Otherwise we might need to pull your fix into 9720 to be able to move forward with it.

Hope to cycle back to this PR soon.

Thanks.

@showuon
Member Author

showuon commented Jan 15, 2021

OK, working on it now.

@showuon showuon force-pushed the KAFKA-10017-store-error branch from e5cac2d to f2f0967 Compare January 15, 2021 02:30
@showuon
Member Author

showuon commented Jan 15, 2021

@mjsax , I've reverted my state-change fix. Now there's only the exception-handler improvement.

And for the state-change fix: after the holidays, I think, if the test is not flaky now, why should we change it and maybe make it unreliable again? So my thought is: we keep monitoring this test, and if it fails again, we can discuss it again. What do you think?

@showuon
Member Author

showuon commented Jan 15, 2021

There's a Jenkins timeout build error on JDK 8. I made a minor change to trigger the Jenkins build again, to make sure it's an infra issue and not my change.

@showuon
Member Author

showuon commented Jan 15, 2021

All tests passed.

@mjsax mjsax added streams tests Test fixes (including flaky tests) labels Jan 19, 2021
@mjsax
Member

mjsax commented Jan 19, 2021

Thanks for updating the PR!

And for the state change fix, after the holidays, I think, if the test is not flaky now, why should we change it, and maybe make it unreliable again. So, my thought is, we keep monitoring this test, and if tests failed again, we can discuss it again. What do you think?

Agree. We can always do a follow up if needed.

@mjsax mjsax merged commit 130274b into apache:trunk Jan 19, 2021