KAFKA-7195: Fix StreamStreamJoinIntegrationTest test failures#5418
KAFKA-7195: Fix StreamStreamJoinIntegrationTest test failures#5418rajinisivaram merged 1 commit intoapache:2.0from
Conversation
|
@mjsax @guozhangwang fixed similar issue on trunk during adminclient changes. looks like error is due to intermediate topics. |
| for (final String topic : topics) { | ||
| try { | ||
| brokers[0].deleteTopic(topic); | ||
| } catch (final UnknownTopicOrPartitionException e) { } |
There was a problem hiding this comment.
Should the exception be logged ?
| } catch (final UnknownTopicOrPartitionException e) { } | ||
| } | ||
|
|
||
| if (timeoutMs > 0) { |
There was a problem hiding this comment.
Should timeout be re-calculated considering the for loop above ?
mjsax
left a comment
There was a problem hiding this comment.
Thanks for the PR.
Please assign Jiras to yourself before you start working on them.
| @After | ||
| public void cleanup() throws InterruptedException { | ||
| CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC); | ||
| CLUSTER.deleteAllTopicsAndWait(120000); |
There was a problem hiding this comment.
Can you elaborate why this fixed the test?
There was a problem hiding this comment.
I'm not 100 percent sure what's the race condition here, and why it fixes the test.
There was a problem hiding this comment.
To be frank, I am not sure about the root-cause. Parameterized StreamStreamJoinIntegrationTest tests are passing for cacheEnabled=true and intermittently failing for parameter cacheEnabled=false.
I thought we may not be cleaning some intermediate topics/change-log/? topics between parameterized runs. So I tried deleting all topics including offsets topics. With this I am not able to reproduce the failures.
I may be totally wrong. I'll spend some more time to understand the tests and debug the issue.
There was a problem hiding this comment.
@omkreddy Were you be able to reproduce the issue locally before this PR?
There was a problem hiding this comment.
Yes, I am able to reproduce the failures on my machine.
There was a problem hiding this comment.
It seems we are using the same application id twice in StreamStreamJoinIntegartionTest
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
This might be the root case -- deleting all topics would solve the issue, too, as it prevent to start with a corrupted state.
There was a problem hiding this comment.
I seems that TableTableJoinIntegrationTest has a similar issue...
There was a problem hiding this comment.
Ok.. Yes, I have observed failures with TableTableJoinIntegrationTest too. Shall I update the appId?
There was a problem hiding this comment.
One thing I can think of is that the changelog topic of the joining streams's materialized stores were not deleted, and hence upon starting on the next test case, the store would be pre-populated and hence cause join result to be wrong (note that the state dir is a @Rule which means it will get cleaned up on each test case, and in the next run it will not be the same directory path).
And the reason that it does not always fail is that we have to hit it that the two tests (with and without caching) consecutively.
So think with your fix in 2.0 and trunk, it is Okay to leave the appID as is for now.
There was a problem hiding this comment.
Just ran some more tests locally to confirm my suspicion.
rajinisivaram
left a comment
There was a problem hiding this comment.
@omkreddy Thanks for the PR, LGTM. Since this code is already in trunk and we are seeing issues with 2.0 builds, I am going to merge it to 2.0 before creating the RC. We should address all review comments in a follow-on PR.
Committer Checklist (excluded from commit message)