Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@

<subpackage name="integration">
<allow pkg="kafka.admin" />
<allow pkg="kafka.api" />
<allow pkg="kafka.server" />
<allow pkg="kafka.tools" />
<allow pkg="kafka.utils" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public String apply(final Long value1, final String value2) {
private ForeachAction<String, String> foreachAction;

@Before
public void before() {
public void before() throws InterruptedException {
testNo++;
builder = new KStreamBuilder();
createTopics();
Expand Down Expand Up @@ -212,7 +212,7 @@ public boolean conditionMet() {
}


private void createTopics() {
private void createTopics() throws InterruptedException {
inputStream = "input-stream-" + testNo;
inputTable = "input-table-" + testNo;
globalOne = "globalOne-" + testNo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class KStreamAggregationDedupIntegrationTest {


@Before
public void before() {
public void before() throws InterruptedException {
testNo++;
builder = new KStreamBuilder();
createTopics();
Expand Down Expand Up @@ -267,7 +267,7 @@ private void produceMessages(long timestamp)
}


private void createTopics() {
private void createTopics() throws InterruptedException {
streamOneInput = "stream-one-" + testNo;
outputTopic = "output-" + testNo;
CLUSTER.createTopic(streamOneInput, 3, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class KStreamAggregationIntegrationTest {
private KStream<Integer, String> stream;

@Before
public void before() {
public void before() throws InterruptedException {
testNo++;
builder = new KStreamBuilder();
createTopics();
Expand Down Expand Up @@ -637,7 +637,7 @@ private void produceMessages(final long timestamp)
}


private void createTopics() {
private void createTopics() throws InterruptedException {
streamOneInput = "stream-one-" + testNo;
outputTopic = "output-" + testNo;
userSessionsStream = userSessionsStream + "-" + testNo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class KStreamKTableJoinIntegrationTest {
private Properties streamsConfiguration;

@Before
public void before() {
public void before() throws InterruptedException {
testNo++;
userClicksTopic = "user-clicks-" + testNo;
userRegionsTopic = "user-regions-" + testNo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static Object[] data() {
}

@Before
public void before() {
public void before() throws InterruptedException {
testNo++;
String applicationId = "kstream-repartition-join-test-" + testNo;
builder = new KStreamBuilder();
Expand Down Expand Up @@ -146,7 +146,7 @@ public void shouldCorrectlyRepartitionOnJoinOperations() throws Exception {
verifyLeftJoin(leftJoin);
}

private ExpectedOutputOnTopic mapStreamOneAndJoin() {
private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException {
String mapOneStreamAndJoinOutput = "map-one-join-output-" + testNo;
doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput);
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput);
Expand Down Expand Up @@ -350,7 +350,7 @@ private void produceToStreamOne()
mockTime);
}

private void createTopics() {
private void createTopics() throws InterruptedException {
streamOneInput = "stream-one-" + testNo;
streamTwoInput = "stream-two-" + testNo;
streamFourInput = "stream-four-" + testNo;
Expand Down Expand Up @@ -395,7 +395,7 @@ private void verifyCorrectOutput(final List<String> expectedMessages,

private void doJoin(final KStream<Integer, Integer> lhs,
final KStream<Integer, String> rhs,
final String outputTopic) {
final String outputTopic) throws InterruptedException {
CLUSTER.createTopic(outputTopic);
lhs.join(rhs,
TOSTRING_JOINER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class QueryableStateIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS);
public static final int STREAM_THREE_PARTITIONS = 4;
private final MockTime mockTime = CLUSTER.time;
private String streamOne = "stream-one";
private String streamTwo = "stream-two";
Expand All @@ -91,7 +92,7 @@ public class QueryableStateIntegrationTest {
private String outputTopicThree = "output-three";
// sufficiently large window size such that everything falls into 1 window
private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS);
private static final int NUM_PARTITIONS = 2;
private static final int STREAM_TWO_PARTITIONS = 2;
private static final int NUM_REPLICAS = NUM_BROKERS;
private Properties streamsConfiguration;
private List<String> inputValues;
Expand All @@ -101,7 +102,7 @@ public class QueryableStateIntegrationTest {
private Comparator<KeyValue<String, Long>> stringLongComparator;
private static int testNo = 0;

public void createTopics() {
public void createTopics() throws InterruptedException {
streamOne = streamOne + "-" + testNo;
streamConcurrent = streamConcurrent + "-" + testNo;
streamThree = streamThree + "-" + testNo;
Expand All @@ -111,8 +112,8 @@ public void createTopics() {
streamTwo = streamTwo + "-" + testNo;
CLUSTER.createTopic(streamOne);
CLUSTER.createTopic(streamConcurrent);
CLUSTER.createTopic(streamTwo, NUM_PARTITIONS, NUM_REPLICAS);
CLUSTER.createTopic(streamThree, 4, 1);
CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1);
CLUSTER.createTopic(outputTopic);
CLUSTER.createTopic(outputTopicConcurrent);
CLUSTER.createTopic(outputTopicThree);
Expand All @@ -128,7 +129,7 @@ public static Object[] data() {
}

@Before
public void before() throws IOException {
public void before() throws IOException, InterruptedException {
testNo++;
createTopics();
streamsConfiguration = new Properties();
Expand All @@ -145,7 +146,6 @@ public void before() throws IOException {
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);


stringComparator = new Comparator<KeyValue<String, String>>() {

@Override
Expand Down Expand Up @@ -328,7 +328,7 @@ public boolean conditionMet() {

@Test
public void queryOnRebalance() throws Exception {
final int numThreads = NUM_PARTITIONS;
final int numThreads = STREAM_TWO_PARTITIONS;
final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads];
final Thread[] streamThreads = new Thread[numThreads];
final int numIterations = 500000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
package org.apache.kafka.streams.integration.utils;

import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.common.TopicPartition;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
Expand All @@ -34,6 +38,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {

private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
public static final int TOPIC_CREATION_TIMEOUT = 30000;
private EmbeddedZookeeper zookeeper = null;
private final KafkaEmbedded[] brokers;
private final Properties brokerConfig;
Expand Down Expand Up @@ -122,7 +127,7 @@ protected void after() {
*
* @param topic The name of the topic.
*/
public void createTopic(final String topic) {
public void createTopic(final String topic) throws InterruptedException {
createTopic(topic, 1, 1, new Properties());
}

Expand All @@ -133,7 +138,7 @@ public void createTopic(final String topic) {
* @param partitions The number of partitions for this topic.
* @param replication The replication factor for (the partitions of) this topic.
*/
public void createTopic(final String topic, final int partitions, final int replication) {
public void createTopic(final String topic, final int partitions, final int replication) throws InterruptedException {
createTopic(topic, partitions, replication, new Properties());
}

Expand All @@ -148,11 +153,24 @@ public void createTopic(final String topic, final int partitions, final int repl
public void createTopic(final String topic,
final int partitions,
final int replication,
final Properties topicConfig) {
final Properties topicConfig) throws InterruptedException {
brokers[0].createTopic(topic, partitions, replication, topicConfig);
final List<TopicPartition> topicPartitions = new ArrayList<>();
for (int partition = 0; partition < partitions; partition++) {
topicPartitions.add(new TopicPartition(topic, partition));
}
IntegrationTestUtils.waitForTopicPartitions(brokers(), topicPartitions, TOPIC_CREATION_TIMEOUT);
}

/**
 * Deletes the given topic.
 *
 * <p>The request is issued through the first broker of the cluster.
 *
 * @param topic The name of the topic to delete.
 */
public void deleteTopic(final String topic) {
brokers[0].deleteTopic(topic);
}

/**
 * Returns the {@link KafkaServer} instances backing the brokers of this
 * embedded cluster, in broker order.
 *
 * @return a freshly allocated, mutable list of the underlying Kafka servers
 */
public List<KafkaServer> brokers() {
    final List<KafkaServer> kafkaServers = new ArrayList<>(brokers.length);
    for (final KafkaEmbedded embeddedBroker : brokers) {
        kafkaServers.add(embeddedBroker.kafkaServer());
    }
    return kafkaServers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,25 @@

package org.apache.kafka.streams.integration.utils;

import kafka.api.PartitionStateInfo;
import kafka.api.Request;
import kafka.server.KafkaServer;
import kafka.server.MetadataCache;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import scala.Option;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -255,4 +261,41 @@ public boolean conditionMet() {
return accumData;
}

/**
 * Waits until every given topic-partition is known to all of the given
 * brokers, sharing a single overall deadline across all partitions.
 *
 * @param servers    brokers that must all learn about each partition
 * @param partitions topic-partitions to wait for
 * @param timeout    total maximum time to wait, in milliseconds
 * @throws InterruptedException if interrupted while waiting
 * @throws AssertionError if the deadline passes before all partitions are available
 */
public static void waitForTopicPartitions(final List<KafkaServer> servers,
                                          final List<TopicPartition> partitions,
                                          final long timeout) throws InterruptedException {
    // One shared deadline: each partition only gets whatever time is left over.
    final long deadline = System.currentTimeMillis() + timeout;
    for (final TopicPartition topicPartition : partitions) {
        final long millisLeft = deadline - System.currentTimeMillis();
        if (millisLeft <= 0) {
            throw new AssertionError("timed out while waiting for partitions to become available. Timeout=" + timeout);
        }
        waitUntilMetadataIsPropagated(servers, topicPartition.topic(), topicPartition.partition(), millisLeft);
    }
}

/**
 * Blocks until every broker in {@code servers} has metadata for the given
 * topic-partition and reports a valid leader for it, or fails after
 * {@code timeout} milliseconds.
 *
 * @param servers   brokers whose metadata caches must contain the partition
 * @param topic     the topic name
 * @param partition the partition id
 * @param timeout   maximum time to wait, in milliseconds
 * @throws InterruptedException if interrupted while waiting
 */
public static void waitUntilMetadataIsPropagated(final List<KafkaServer> servers,
                                                 final String topic,
                                                 final int partition,
                                                 final long timeout) throws InterruptedException {
    TestUtils.waitForCondition(new TestCondition() {
        @Override
        public boolean conditionMet() {
            for (final KafkaServer server : servers) {
                final MetadataCache metadataCache = server.apis().metadataCache();
                final Option<PartitionStateInfo> partitionInfo =
                        metadataCache.getPartitionInfo(topic, partition);
                if (partitionInfo.isEmpty()) {
                    // This broker has not yet heard about the partition at all.
                    return false;
                }
                final PartitionStateInfo partitionStateInfo = partitionInfo.get();
                // A valid broker id as leader means leader election has completed
                // on this broker's view of the partition.
                if (!Request.isValidBrokerId(partitionStateInfo.leaderIsrAndControllerEpoch().leaderAndIsr().leader())) {
                    return false;
                }
            }
            return true;
        }
        // Fixed typos in the failure message: "metatadata" -> "metadata",
        // "propogated" -> "propagated".
    }, timeout, "metadata for topic=" + topic + " partition=" + partition + " not propagated to all brokers");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,7 @@ public void deleteTopic(final String topic) {
zkClient.close();
}

/**
 * Exposes the underlying {@link KafkaServer} instance of this embedded broker.
 *
 * @return the wrapped {@code KafkaServer}
 */
public KafkaServer kafkaServer() {
return kafka;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,5 @@ public static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K, V>>
}
return results;
}

}