From a530dad67a5f519b61c9ecba2343da4cda7f5968 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Tue, 16 Jun 2020 13:47:46 -0700 Subject: [PATCH 1/2] use TestUtil, first wait for RUNNING --- .../StandbyTaskEOSIntegrationTest.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index bab5c9f108cf1..ef354d60245da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -50,7 +50,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static java.util.Arrays.asList; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.Assert.assertTrue; @@ -63,7 +65,7 @@ public class StandbyTaskEOSIntegrationTest { @Parameterized.Parameters(name = "{0}") public static Collection data() { - return Arrays.asList(new String[][] { + return asList(new String[][] { {StreamsConfig.EXACTLY_ONCE}, {StreamsConfig.EXACTLY_ONCE_BETA} }); @@ -91,7 +93,7 @@ public void createTopics() throws Exception { } @Test - public void surviveWithOneTaskAsStandby() throws Exception { + public void shouldSurviveWithOneTaskAsStandby() throws Exception { IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( inputTopic, Collections.singletonList( @@ -111,18 +113,11 @@ public void surviveWithOneTaskAsStandby() throws Exception { final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch); final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch); ) { - streamInstanceOne.start(); - - streamInstanceTwo.start(); + startApplicationAndWaitUntilRunning(asList(streamInstanceOne, streamInstanceTwo), Duration.ofSeconds(60)); // Wait for the record to be processed assertTrue(instanceLatch.await(15, TimeUnit.SECONDS)); - waitForCondition(() -> streamInstanceOne.state().equals(KafkaStreams.State.RUNNING), - "Stream instance one should be up and running by now"); - waitForCondition(() -> streamInstanceTwo.state().equals(KafkaStreams.State.RUNNING), - "Stream instance two should be up and running by now"); - streamInstanceOne.close(Duration.ZERO); streamInstanceTwo.close(Duration.ZERO); From 46a50f39faaff31f9b90c887d8e692c5c7c81d38 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Tue, 16 Jun 2020 18:35:12 -0700 Subject: [PATCH 2/2] checkstyle --- .../streams/integration/StandbyTaskEOSIntegrationTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index ef354d60245da..4c7602e4f541c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -43,7 +43,6 @@ import java.io.File; import java.io.IOException; import java.time.Duration; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Properties; @@ -53,7 +52,6 @@ import static java.util.Arrays.asList; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; -import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.Assert.assertTrue; /**