diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index bab5c9f108cf1..4c7602e4f541c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -43,15 +43,15 @@ import java.io.File; import java.io.IOException; import java.time.Duration; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static java.util.Arrays.asList; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; -import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; import static org.junit.Assert.assertTrue; /** @@ -63,7 +63,7 @@ public class StandbyTaskEOSIntegrationTest { @Parameterized.Parameters(name = "{0}") public static Collection data() { - return Arrays.asList(new String[][] { + return asList(new String[][] { {StreamsConfig.EXACTLY_ONCE}, {StreamsConfig.EXACTLY_ONCE_BETA} }); @@ -91,7 +91,7 @@ public void createTopics() throws Exception { } @Test - public void surviveWithOneTaskAsStandby() throws Exception { + public void shouldSurviveWithOneTaskAsStandby() throws Exception { IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( inputTopic, Collections.singletonList( @@ -111,18 +111,11 @@ public void surviveWithOneTaskAsStandby() throws Exception { final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch); final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch); ) { - streamInstanceOne.start(); - - streamInstanceTwo.start(); + startApplicationAndWaitUntilRunning(asList(streamInstanceOne, streamInstanceTwo), Duration.ofSeconds(60)); // Wait for the record to be processed assertTrue(instanceLatch.await(15, TimeUnit.SECONDS)); - waitForCondition(() -> streamInstanceOne.state().equals(KafkaStreams.State.RUNNING), - "Stream instance one should be up and running by now"); - waitForCondition(() -> streamInstanceTwo.state().equals(KafkaStreams.State.RUNNING), - "Stream instance two should be up and running by now"); - streamInstanceOne.close(Duration.ZERO); streamInstanceTwo.close(Duration.ZERO);