Skip to content

KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert#9629

Merged
ableegoldman merged 3 commits intoapache:trunkfrom
showuon:KAFKA-10754
Nov 24, 2020
Merged

KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert#9629
ableegoldman merged 3 commits intoapache:trunkfrom
showuon:KAFKA-10754

Conversation

@showuon
Copy link
Copy Markdown
Member

@showuon showuon commented Nov 20, 2020

The flaky test is because we didn't wait for the streams become RUNNING before verifying the state becoming ERROR state. This fix explicitly wait for the streams become RUNNING state. Also, I found we didn't put the 2nd stream into try resource block, so it won't close the stream after tests. In addition, I also fix some logic errors in the tests.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forgot to put the kafkaStreams1 into the try resource block here

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The start is async, and we didn't wait for it.

@showuon
Copy link
Copy Markdown
Member Author

showuon commented Nov 20, 2020

@ableegoldman @wcarlson5 , could you help review this PR? Thanks.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete unused latch

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should throw a specific kind of exception, not an Exception.

waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, Duration.ofSeconds(15));

TestUtils.waitForCondition(flag::get, "Handler was called");
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should wait for the uncaughtExceptionHandler got called before waiting for the streams state change.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order is not really that important here, either way works

@ableegoldman
Copy link
Copy Markdown
Member

Hey @showuon thanks for the quick fix! I notice that StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownThreadUsingOldHandler still failed on the JDK15 PR build, can you look into that?

@mjsax mjsax added streams tests Test fixes (including flaky tests) labels Nov 20, 2020
@showuon
Copy link
Copy Markdown
Member Author

showuon commented Nov 21, 2020

@ableegoldman , thanks for pointing it out. After investigation, I found the test StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownThreadUsingOldHandler failed is because we set 2 stream threads for this test. So when we got the uncaughtException, we shutdown the thread, and rebalancing to the other thread. And we have to wait for rebalancing completes, and later another exception thrown in the thread, then the stream will turn into ERROR state, which is why it is so flaky.

I default set to 1 stream thread in this test since other tests will set to the expected thread number before testing.

The fix is in this commit: e6d39f6. Thank you.

@showuon
Copy link
Copy Markdown
Member Author

showuon commented Nov 21, 2020

All tests passed, Yeah!


/**
* Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
* Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG} internal thread
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was actually correct as it was (and ditto for the above). One alternative suggestion:

Suggested change
* Set the handler invoked when an {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG} internal thread
* Set the handler invoked when an internal {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG stream thread}

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. Thanks.

mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @wcarlson5 , can you take a look at this? If we change the default number of threads to 1 will we be reducing test coverage or not testing the correct thing anymore?

FWIW I think for tests where the number of threads doesn't matter, we should default to 1. But I'm not sure which tests do/do not rely on using multiple stream threads

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Both the old handler test and the close client should have 2 threads. We need to ensure that after a rebalance the old handler has attempted the process the record twice and the client shutdown only once. We can not be sure of that with only one thread.

waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, Duration.ofSeconds(15));

TestUtils.waitForCondition(flag::get, "Handler was called");
assertThat(processorValueCollector.size(), equalTo(2));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wcarlson5 for example, this test probably should have multiple threads, right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Copy link
Copy Markdown
Contributor

@wcarlson5 wcarlson5 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@showuon Mostly these are good changes, though @ableegoldman is right changing the default to one thread will reduce our coverage. The only reason one shutdown application uses one thread is to ensure the shutdown signal still gets propagated when the last thread is dying.

mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Both the old handler test and the close client should have 2 threads. We need to ensure that after a rebalance the old handler has attempted the process the record twice and the client shutdown only once. We can not be sure of that with only one thread.

waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, Duration.ofSeconds(15));

TestUtils.waitForCondition(flag::get, "Handler was called");
assertThat(processorValueCollector.size(), equalTo(2));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, Duration.ofSeconds(15));

TestUtils.waitForCondition(flag::get, "Handler was called");
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order is not really that important here, either way works

@showuon
Copy link
Copy Markdown
Member Author

showuon commented Nov 24, 2020

@ableegoldman @wcarlson5 , thanks for the comments. Now I know why it sets the default treads to 2. So, to make the test more reliable, I'll do:

  1. wait for the state become running (as before)
  2. wait for the 1st time handler got called in current thread
  3. wait for the 2nd time handler got called after rebalancing
  4. wait for the stream state turned into ERROR state (as before)

This should make this test more reliable. What do you think?
commit: 2b6d0a2

@wcarlson5
Copy link
Copy Markdown
Contributor

These changes LGTM. WDYT @ableegoldman? thanks for the PR!

Copy link
Copy Markdown
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates, this LGTM. Looks like all tests passed this time around

@ableegoldman ableegoldman merged commit 2a40920 into apache:trunk Nov 24, 2020
@ableegoldman
Copy link
Copy Markdown
Member

Merged to trunk

ijuma added a commit to ijuma/kafka that referenced this pull request Dec 2, 2020
…t-for-generated-requests

* apache-github/trunk: (405 commits)
KAFKA-6687: restrict DSL to allow only Streams from the same source
topics (apache#9609)
  MINOR: Small cleanups in `AlterIsr` handling logic (apache#9663)
MINOR: Increase unit test coverage of method
ProcessorTopology#updateSourceTopics() (apache#9654)
  MINOR: fix reading SSH output in Streams system tests (apache#9665)
  KAFKA-10770: Remove duplicate defination of Metrics#getTags (apache#9659)
  KAFKA-10722: Described the types of the used state stores (apache#9607)
  KAFKA-10702; Skip bookkeeping of empty transactions (apache#9632)
  MINOR: Remove erroneous extra <code> in design doc (apache#9657)
KAFKA-10736 Convert transaction coordinator metadata schemas to use g…
(apache#9611)
  MINOR: Update vagrant/tests readme (apache#9650)
  KAFKA-10720: Document prohibition on header mutation by SMTs (apache#9597)
  KAFKA-10713: Stricter protocol parsing in hostnames (apache#9593)
  KAFKA-10565: Only print console producer prompt with a tty (apache#9644)
  MINOR: fix listeners doc to close <code> properly (apache#9655)
  MINOR: Remove unnecessary statement from WorkerConnector#doRun (apache#9653)
KAFKA-10758: ProcessorTopology should only consider its own nodes when
updating regex source topics (apache#9648)
KAFKA-10754: fix flaky tests by waiting kafka streams be in running
state before assert (apache#9629)
  MINOR: Upgrade to Scala 2.13.4 (apache#9643)
  MINOR: Update build and test dependencies (apache#9645)
MINOR: Remove redundant argument from TaskMetricsGroup#recordCommit
(apache#9642)
  ...

clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

streams tests Test fixes (including flaky tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants