diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 942dbdde0ae78..ec9f2cf3c4c1c 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -150,6 +150,7 @@ + diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 85b851d12964f..6ac87ae1fe64a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -81,7 +81,7 @@ public String apply(final Long value1, final String value2) { private ForeachAction foreachAction; @Before - public void before() { + public void before() throws InterruptedException { testNo++; builder = new KStreamBuilder(); createTopics(); @@ -212,7 +212,7 @@ public boolean conditionMet() { } - private void createTopics() { + private void createTopics() throws InterruptedException { inputStream = "input-stream-" + testNo; inputTable = "input-table-" + testNo; globalOne = "globalOne-" + testNo; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 9397e0331f474..f2a767ceb514e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -72,7 +72,7 @@ public class KStreamAggregationDedupIntegrationTest { @Before - public void before() { + public void before() throws InterruptedException { testNo++; builder = new KStreamBuilder(); createTopics(); @@ -267,7 +267,7 @@ private void produceMessages(long timestamp) } - private void createTopics() { + private void createTopics() throws InterruptedException { streamOneInput = "stream-one-" + testNo; outputTopic = "output-" + testNo; CLUSTER.createTopic(streamOneInput, 3, 1); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 0833f3c8d1fe4..beb41ce68c610 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -89,7 +89,7 @@ public class KStreamAggregationIntegrationTest { private KStream stream; @Before - public void before() { + public void before() throws InterruptedException { testNo++; builder = new KStreamBuilder(); createTopics(); @@ -637,7 +637,7 @@ private void produceMessages(final long timestamp) } - private void createTopics() { + private void createTopics() throws InterruptedException { streamOneInput = "stream-one-" + testNo; outputTopic = "output-" + testNo; userSessionsStream = userSessionsStream + "-" + testNo; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java index 3618f1542c271..0a16494397c71 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java @@ -74,7 +74,7 @@ public class KStreamKTableJoinIntegrationTest { private Properties streamsConfiguration; @Before - public void before() { + public void before() throws InterruptedException { testNo++; userClicksTopic = "user-clicks-" + testNo; userRegionsTopic = "user-regions-" + testNo; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java index e8a042ad7c670..6f3c95a3ab536 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -93,7 +93,7 @@ public static Object[] data() { } @Before - public void before() { + public void before() throws InterruptedException { testNo++; String applicationId = "kstream-repartition-join-test-" + testNo; builder = new KStreamBuilder(); @@ -146,7 +146,7 @@ public void shouldCorrectlyRepartitionOnJoinOperations() throws Exception { verifyLeftJoin(leftJoin); } - private ExpectedOutputOnTopic mapStreamOneAndJoin() { + private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException { String mapOneStreamAndJoinOutput = "map-one-join-output-" + testNo; doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput); return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput); @@ -350,7 +350,7 @@ private void produceToStreamOne() mockTime); } - private void createTopics() { + private void createTopics() throws InterruptedException { streamOneInput = "stream-one-" + testNo; streamTwoInput = "stream-two-" + testNo; streamFourInput = "stream-four-" + testNo; @@ -395,7 +395,7 @@ private void verifyCorrectOutput(final List expectedMessages, private void doJoin(final KStream lhs, final KStream rhs, - final String outputTopic) { + final String outputTopic) throws InterruptedException { CLUSTER.createTopic(outputTopic); lhs.join(rhs, TOSTRING_JOINER, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index b8f91faed0341..911c6a89cfe17 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -81,6 +81,7 @@ public class QueryableStateIntegrationTest { @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + public static final int STREAM_THREE_PARTITIONS = 4; private final MockTime mockTime = CLUSTER.time; private String streamOne = "stream-one"; private String streamTwo = "stream-two"; @@ -91,7 +92,7 @@ public class QueryableStateIntegrationTest { private String outputTopicThree = "output-three"; // sufficiently large window size such that everything falls into 1 window private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS); - private static final int NUM_PARTITIONS = 2; + private static final int STREAM_TWO_PARTITIONS = 2; private static final int NUM_REPLICAS = NUM_BROKERS; private Properties streamsConfiguration; private List inputValues; @@ -101,7 +102,7 @@ public class QueryableStateIntegrationTest { private Comparator> stringLongComparator; private static int testNo = 0; - public void createTopics() { + public void createTopics() throws InterruptedException { streamOne = streamOne + "-" + testNo; streamConcurrent = streamConcurrent + "-" + testNo; streamThree = streamThree + "-" + testNo; @@ -111,8 +112,8 @@ public void createTopics() { streamTwo = streamTwo + "-" + testNo; CLUSTER.createTopic(streamOne); CLUSTER.createTopic(streamConcurrent); - CLUSTER.createTopic(streamTwo, NUM_PARTITIONS, NUM_REPLICAS); - CLUSTER.createTopic(streamThree, 4, 1); + CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS); + CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1); CLUSTER.createTopic(outputTopic); CLUSTER.createTopic(outputTopicConcurrent); CLUSTER.createTopic(outputTopicThree); @@ -128,7 +129,7 @@ public static Object[] data() { } @Before - public void before() throws IOException { + public void before() throws IOException, InterruptedException { testNo++; createTopics(); streamsConfiguration = new Properties(); @@ -145,7 +146,6 @@ public void before() throws IOException { streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); - stringComparator = new Comparator>() { @Override @@ -328,7 +328,7 @@ public boolean conditionMet() { @Test public void queryOnRebalance() throws Exception { - final int numThreads = NUM_PARTITIONS; + final int numThreads = STREAM_TWO_PARTITIONS; final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads]; final Thread[] streamThreads = new Thread[numThreads]; final int numIterations = 500000; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 519b1f5cb14b6..619b6b5ed87e3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -18,13 +18,17 @@ package org.apache.kafka.streams.integration.utils; import kafka.server.KafkaConfig$; +import kafka.server.KafkaServer; import kafka.utils.MockTime; import kafka.zk.EmbeddedZookeeper; +import org.apache.kafka.common.TopicPartition; import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; /** @@ -34,6 +38,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected + public static final int TOPIC_CREATION_TIMEOUT = 30000; private EmbeddedZookeeper zookeeper = null; private final KafkaEmbedded[] brokers; private final Properties brokerConfig; @@ -122,7 +127,7 @@ protected void after() { * * @param topic The name of the topic. */ - public void createTopic(final String topic) { + public void createTopic(final String topic) throws InterruptedException { createTopic(topic, 1, 1, new Properties()); } @@ -133,7 +138,7 @@ public void createTopic(final String topic) { * @param partitions The number of partitions for this topic. * @param replication The replication factor for (the partitions of) this topic. */ - public void createTopic(final String topic, final int partitions, final int replication) { + public void createTopic(final String topic, final int partitions, final int replication) throws InterruptedException { createTopic(topic, partitions, replication, new Properties()); } @@ -148,11 +153,24 @@ public void createTopic(final String topic, final int partitions, final int repl public void createTopic(final String topic, final int partitions, final int replication, - final Properties topicConfig) { + final Properties topicConfig) throws InterruptedException { brokers[0].createTopic(topic, partitions, replication, topicConfig); + final List topicPartitions = new ArrayList<>(); + for (int partition = 0; partition < partitions; partition++) { + topicPartitions.add(new TopicPartition(topic, partition)); + } + IntegrationTestUtils.waitForTopicPartitions(brokers(), topicPartitions, TOPIC_CREATION_TIMEOUT); } public void deleteTopic(final String topic) { brokers[0].deleteTopic(topic); } + + public List brokers() { + final List servers = new ArrayList<>(); + for (final KafkaEmbedded broker : brokers) { + servers.add(broker.kafkaServer()); + } + return servers; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index aa358ab3322e6..875c3591b881b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -17,6 +17,10 @@ package org.apache.kafka.streams.integration.utils; +import kafka.api.PartitionStateInfo; +import kafka.api.Request; +import kafka.server.KafkaServer; +import kafka.server.MetadataCache; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -24,12 +28,14 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; +import scala.Option; import java.io.File; import java.io.IOException; @@ -255,4 +261,41 @@ public boolean conditionMet() { return accumData; } + public static void waitForTopicPartitions(final List servers, + final List partitions, + final long timeout) throws InterruptedException { + final long end = System.currentTimeMillis() + timeout; + for (final TopicPartition partition : partitions) { + final long remaining = end - System.currentTimeMillis(); + if (remaining <= 0) { + throw new AssertionError("timed out while waiting for partitions to become available. Timeout=" + timeout); + } + waitUntilMetadataIsPropagated(servers, partition.topic(), partition.partition(), remaining); + } + } + + public static void waitUntilMetadataIsPropagated(final List servers, + final String topic, + final int partition, + final long timeout) throws InterruptedException { + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + for (final KafkaServer server : servers) { + final MetadataCache metadataCache = server.apis().metadataCache(); + final Option partitionInfo = + metadataCache.getPartitionInfo(topic, partition); + if (partitionInfo.isEmpty()) { + return false; + } + final PartitionStateInfo partitionStateInfo = partitionInfo.get(); + if (!Request.isValidBrokerId(partitionStateInfo.leaderIsrAndControllerEpoch().leaderAndIsr().leader())) { + return false; + } + } + return true; + } + }, timeout, "metatadata for topic=" + topic + " partition=" + partition + " not propogated to all brokers"); + + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index ac9b670247fa0..0af78b936d7a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -201,4 +201,7 @@ public void deleteTopic(final String topic) { zkClient.close(); } + public KafkaServer kafkaServer() { + return kafka; + } } diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java index 555e622237535..73c1b63a2a47b 100644 --- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java @@ -66,4 +66,5 @@ public static List> toList(final Iterator> } return results; } + }