Skip to content

KAFKA-8319: Make KafkaStreamsTest a non-integration test class#7382

Merged
guozhangwang merged 7 commits intoapache:trunkfrom
guozhangwang:KMinor-KafkaStreamsTest-flaky
Sep 27, 2019
Merged

KAFKA-8319: Make KafkaStreamsTest a non-integration test class#7382
guozhangwang merged 7 commits intoapache:trunkfrom
guozhangwang:KMinor-KafkaStreamsTest-flaky

Conversation

@guozhangwang
Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang commented Sep 24, 2019

Previous KafkaStreamsTest takes 2min20s on my local laptop, because lots of its integration test which is producing / consuming records, and checking state directory file system takes lots of time. On the other hand, these tests should be well simplified with mocks.

This test reduces the test from a clumsy integration test class into a unit tests with mocks of its internal modules. And some other test functions should not be in KafkaStreamsTest actually and have been moved to other modular test classes. Now it takes 2s.

Also it helps removing the potential flakiness of the following (some of them are claimed resolved only because we have not seen them recently, but after looking at the test code I can verify they are still flaky):

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang guozhangwang changed the title MINOR [WIP]: Fix KafkaStreamsTest MINOR: Fix KafkaStreamsTest Sep 24, 2019
@guozhangwang guozhangwang changed the title MINOR: Fix KafkaStreamsTest KAFKA-8319: Make KafkaStreamsTest a non-integration test class Sep 24, 2019
@SuppressWarnings("deprecation")
@Override
public synchronized void close(long timeout, TimeUnit unit) {
ensureNotClosed();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a piggy-backed fix of MockConsumer: in the actual KafkaConsumer we do allow the close() call to be triggered multiple times, so there's no point checking this.

private final Admin adminClient;

private GlobalStreamThread globalStreamThread;
protected GlobalStreamThread globalStreamThread;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to stream threads, making it accessible directly from KafkaStreamsTest.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

package-private should also work, right?

}

@Test
public void testOsDefaultSocketBufferSizes() {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are moved to StreamsConfigTest.

@Override
public Admin getAdmin(final Map<String, Object> config) {
return new MockAdminClient(cluster.nodes(), cluster.nodeById(0));
return new MockAdminClient(cluster.nodes(), cluster.nodeById(-1));
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a piggy-backed fix: the controller's id is defined as -1, so with a single node cluster we should set it's controller as -1 not as 0.


private Cluster createCluster() {
final Node node = new Node(0, "localhost", 8121);
final Node node = new Node(-1, "localhost", 8121);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the other comment below in MockClientSupplier.java

}

@Test
public void shouldCleanupOldStateDirs() throws InterruptedException {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are examples where we should really not rely on checking the underlying state directory files, but just check the mocked state-directory as the expected construction call, etc.

final Collection<String> inputTopics,
final String outputTopic,
final boolean shouldFilesExist) throws Exception {
final File baseDir = new File(TestUtils.IO_TMP_DIR + File.separator + "kafka-" + TestUtils.randomString(5));
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the other comment above: instead of checking the file system which is costly and flaky, we should just use a mock with expected constructors / function calls.

}
PowerMock.replayAll();

public static class StateListenerStub implements KafkaStreams.StateListener {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved up in the file, no logical changes.

}

@Test
public void testCannotStartTwice() {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a flaky test that should be removed: the streams.start() would only throw if we are still in the REBALANCING state, which is not really a good test anyways.

}

@Test
public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two tests are not really for KafkaStreams but for StreamThreads / GlobalStreamThread.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these cases already covered by existing tests over there? I didn't see them moved as part of this PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first test is covered in GlobalStreamThreadTest#shouldThrowStreamsExceptionOnStartupIfExceptionOccurred: we call partitionsFor during the initialization of the global task, which would throw if the broker is not connected.

The second test is covered in shouldNotThrowWhenPendingShutdownInRunOnce and shouldNotThrowWithoutPendingShutdownInRunOnce of StreamThreadTest to me.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good! Thanks for the confirmation.

@mjsax mjsax added the streams label Sep 24, 2019
Copy link
Copy Markdown
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @guozhangwang ,

Thanks so much for this. I made a quick pass (definitely didn't go over all the tests to make sure they are effectively the same), and everything looks reasonable. I left a few comments.

Thanks,
-John

private final Admin adminClient;

private GlobalStreamThread globalStreamThread;
protected GlobalStreamThread globalStreamThread;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

package-private should also work, right?

} finally {
streams.close();
}
streams.cleanUp();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's up with this last one? Should it be in the finally block?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes!

}

@Test
public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these cases already covered by existing tests over there? I didn't see them moved as part of this PR.

props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);

// make sure we have the global state thread running too
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is incorrect too :) it should be make sure we do NOT have global state thread.

Copy link
Copy Markdown
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @guozhangwang ,

I made another, closer, pass. Everything looks good to me. I have a concern about one test, but my concern pre-dates this PR. I'll leave it to you whether you want to take the feedback or not. Regardless, your PR should already be a huge help.

Thanks,
-John


try {
th.start();
th.join(30_000L);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I follow the logic of this test... We want to show that it doesn't block, but then we just assert that the call doesn't take longer than 30 seconds.

It seems like this test is sensitive to timing, which is not ideal. I guess that what we really wanted to test is that the close method never calls Thread.sleep()/Thread.join() or maybe just never calls a specific list of blocking methods on the clients. Now that we have all the mocks above, it seems like we can make this assertion directly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, I've actually simplify the logic to just check that close() call returns false, since with mock-time it would never elapse any time, and if the call ever waits on the state transition it would not return ever.

Copy link
Copy Markdown
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this PR @guozhangwang, this will really help with the length of time tests take. I just have one minor comment otherwise this LGTM.

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@Category({IntegrationTest.class})
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still want this labeled as an integration test?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack.

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@Category({IntegrationTest.class})
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is no more an integration test, this should be removed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack.


@Category({IntegrationTest.class})
@RunWith(PowerMockRunner.class)
@PrepareForTest({KafkaStreams.class, StreamThread.class})
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to prepare KafkaStreams for testing? Only classes that are mocked and that are either final or have static methods need to be prepared.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually need to prepare KafkaStreams for cases like metrics reporter registration.

@guozhangwang guozhangwang merged commit 22434e6 into apache:trunk Sep 27, 2019
ijuma added a commit to confluentinc/kafka that referenced this pull request Sep 29, 2019
Conflicts:
* .gitignore: addition of clients/src/generated-test was near
local additions for support-metrics.
* checkstyle/suppressions.xml: upstream refactoring of exclusions
for generator were near the local changes for support-metrics.
* gradle.properties: scala version bump caused a minor conflict
due to the kafka version change locally.
gradle/dependencies.gradle: bcpkix version bump was near avro
additions in the local version.

* apache-github/trunk: (49 commits)
  KAFKA-8471: Replace control requests/responses with automated protocol (apache#7353)
  MINOR: Don't generate unnecessary strings for debug logging in FetchSessionHandler (apache#7394)
  MINOR:fixed typo and removed outdated varilable name (apache#7402)
  KAFKA-8934: Create version file during build for Streams (apache#7397)
  KAFKA-8319: Make KafkaStreamsTest a non-integration test class (apache#7382)
  KAFKA-6883: Add toUpperCase support to sasl.kerberos.principal.to.local rule (KIP-309)
  KAFKA-8907; Return topic configs in CreateTopics response (KIP-525) (apache#7380)
  MINOR: Address review comments for KIP-504 authorizer changes (apache#7379)
  MINOR: add versioning to request and response headers (apache#7372)
  KAFKA-7273: Extend Connect Converter to support headers (apache#6362)
  MINOR: improve the Kafka RPC code generator (apache#7340)
  MINOR: Improve the org.apache.kafka.common.protocol code (apache#7344)
  KAFKA-8880: Docs on upgrade-guide (apache#7385)
  KAFKA-8179: do not suspend standby tasks during rebalance (apache#7321)
  KAFKA-8580: Compute RocksDB metrics (apache#7263)
  KAFKA-8880: Add overloaded function of Consumer.committed (apache#7304)
  HOTFIX: fix Kafka Streams upgrade note for broker backward compatibility (apache#7363)
  KAFKA-8848; Update system tests to use new AclAuthorizer (apache#7374)
  MINOR: remove unnecessary null check (apache#7299)
  KAFKA-6958: Overload methods for group and windowed stream to allow to name operation name using the new Named class (apache#6413)
  ...
cadonna pushed a commit to cadonna/kafka that referenced this pull request Mar 25, 2020
…e#7382)

Previous KafkaStreamsTest takes 2min20s on my local laptop, because lots of its integration test which is producing / consuming records, and checking state directory file system takes lots of time. On the other hand, these tests should be well simplified with mocks.

This test reduces the test from a clumsy integration test class into a unit tests with mocks of its internal modules. And some other test functions should not be in KafkaStreamsTest actually and have been moved to other modular test classes. Now it takes 2s.

Also it helps removing the potential flakiness of the following (some of them are claimed resolved only because we have not seen them recently, but after looking at the test code I can verify they are still flaky):

* KAFKA-5818 (the original JIRA ticket indeed exposed a real issue that has been fixed, but the test itself remains flaky)
* KAFKA-6215
* KAFKA-7921
* KAFKA-7990
* KAFKA-8319
* KAFKA-8427

Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Bruno Cadonna <bruno@confluent.io>

This commit was cherry-picked from trunk and adapted.
guozhangwang pushed a commit that referenced this pull request Mar 25, 2020
#8352)

Previous KafkaStreamsTest takes 2min20s on my local laptop, because lots of its integration test which is producing / consuming records, and checking state directory file system takes lots of time. On the other hand, these tests should be well simplified with mocks.

This test reduces the test from a clumsy integration test class into a unit tests with mocks of its internal modules. And some other test functions should not be in KafkaStreamsTest actually and have been moved to other modular test classes. Now it takes 2s.

Also it helps removing the potential flakiness of the following (some of them are claimed resolved only because we have not seen them recently, but after looking at the test code I can verify they are still flaky):

* KAFKA-5818 (the original JIRA ticket indeed exposed a real issue that has been fixed, but the test itself remains flaky)
* KAFKA-6215
* KAFKA-7921
* KAFKA-7990
* KAFKA-8319
* KAFKA-8427

Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Bruno Cadonna <bruno@confluent.io>

This commit was cherry-picked from trunk and adapted.

Co-authored-by: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>
@guozhangwang guozhangwang deleted the KMinor-KafkaStreamsTest-flaky branch April 24, 2020 23:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants