From 52f5792a41878a5decf26ad011178737096e0933 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 13 Jan 2017 16:51:49 +0000 Subject: [PATCH 1/5] metadata hack --- .../java/org/apache/kafka/clients/Metadata.java | 9 ++++++++- .../org/apache/kafka/clients/MetadataTest.java | 17 +++++++++++------ .../internals/ConsumerCoordinatorTest.java | 3 ++- .../internals/StreamPartitionAssignor.java | 1 + 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 75d48abf247ac..5cae0a5d30b64 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -168,7 +168,11 @@ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs * @param topics */ public synchronized void setTopics(Collection topics) { - if (!this.topics.keySet().containsAll(topics)) { + if (Thread.currentThread().getName().startsWith("StreamThread")) { + System.out.println("setting topics to:" + topics + " was:" + this.topics()); + } + + if (!cluster.topics().containsAll(topics)) { requestUpdateForNewTopics(); } this.topics.clear(); @@ -229,6 +233,9 @@ else if (expireMs <= now) { this.needUpdate = false; this.cluster = getClusterForCurrentTopics(cluster); } else { + if (Thread.currentThread().getName().startsWith("StreamThread")) { + System.out.println("metadata update received " + cluster.topics()); + } this.cluster = cluster; } diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index cfd2a941eb9f5..69ba468eed910 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -156,12 +156,6 @@ public void testTimeToNextUpdate_OverwriteBackoff() { metadata.add("new-topic"); assertEquals(0, metadata.timeToNextUpdate(now)); - // Even though setTopics called, immediate update isn't necessary if the new topic set isn't - // containing a new topic, - metadata.update(Cluster.empty(), now); - metadata.setTopics(metadata.topics()); - assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now)); - // If the new set of topics containing a new topic then it should allow immediate update. metadata.setTopics(Collections.singletonList("another-new-topic")); assertEquals(0, metadata.timeToNextUpdate(now)); @@ -405,6 +399,17 @@ public void testNonExpiringMetadata() throws Exception { assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4")); } + @Test + public void shouldRequestMetadataUpdateWhenClusterDoesntContainAllTopics() throws Exception { + final Set topics = Collections.singleton("test-topic"); + metadata.setTopics(topics); + final long now = 10000; + metadata.update(Cluster.empty(), now); + metadata.setTopics(topics); + assertTrue("metadata update should have been requested", metadata.updateRequested()); + assertEquals("time till next update should be 0", 0, metadata.timeToNextUpdate(now)); + } + private Thread asyncFetch(final String topic, final long maxWaitMs) { Thread thread = new Thread() { public void run() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 0637ea4c4e667..5e9f5ecd633d6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -112,9 +112,9 @@ public class ConsumerCoordinatorTest { @Before public void setup() { this.time = new MockTime(); - this.client = new MockClient(time); this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); this.metadata = new Metadata(0, Long.MAX_VALUE); + this.client = new MockClient(time, metadata); this.metadata.update(cluster, time.milliseconds()); this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); this.metrics = new Metrics(time); @@ -695,6 +695,7 @@ public boolean matches(AbstractRequest body) { client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE.code())); client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE.code())); + client.prepareMetadataUpdate(TestUtils.singletonCluster(topic1, 1)); coordinator.poll(time.milliseconds()); assertFalse(coordinator.needRejoin()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 7b48a6fdb784c..d25c6d1e8fb9d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -263,6 +263,7 @@ public Subscription subscription(Set topics) { */ @Override public Map assign(Cluster metadata, Map subscriptions) { + System.out.println("assign topics:" + metadata.topics()); // construct the client metadata from the decoded subscription info Map clientsMetadata = new HashMap<>(); From c85facef85cfcfd25c7cae3560f4a098f0a97b92 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 13 Jan 2017 18:07:48 +0000 Subject: [PATCH 2/5] wait for topics to be created --- .../org/apache/kafka/clients/Metadata.java | 9 +------ .../apache/kafka/clients/MetadataTest.java | 17 +++++-------- .../internals/ConsumerCoordinatorTest.java | 3 +-- .../internals/StreamPartitionAssignor.java | 2 -- .../QueryableStateIntegrationTest.java | 9 ++++++- .../utils/EmbeddedKafkaCluster.java | 24 +++++++++++++++++++ 6 files changed, 40 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 5cae0a5d30b64..75d48abf247ac 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -168,11 +168,7 @@ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs * @param topics */ public synchronized void setTopics(Collection topics) { - if (Thread.currentThread().getName().startsWith("StreamThread")) { - System.out.println("setting topics to:" + topics + " was:" + this.topics()); - } - - if (!cluster.topics().containsAll(topics)) { + if (!this.topics.keySet().containsAll(topics)) { requestUpdateForNewTopics(); } this.topics.clear(); @@ -233,9 +229,6 @@ else if (expireMs <= now) { this.needUpdate = false; this.cluster = getClusterForCurrentTopics(cluster); } else { - if (Thread.currentThread().getName().startsWith("StreamThread")) { - System.out.println("metadata update received " + cluster.topics()); - } this.cluster = cluster; } diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 69ba468eed910..cfd2a941eb9f5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -156,6 +156,12 @@ public void testTimeToNextUpdate_OverwriteBackoff() { metadata.add("new-topic"); assertEquals(0, metadata.timeToNextUpdate(now)); + // Even though setTopics called, immediate update isn't necessary if the new topic set isn't + // containing a new topic, + metadata.update(Cluster.empty(), now); + metadata.setTopics(metadata.topics()); + assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now)); + // If the new set of topics containing a new topic then it should allow immediate update. metadata.setTopics(Collections.singletonList("another-new-topic")); assertEquals(0, metadata.timeToNextUpdate(now)); @@ -399,17 +405,6 @@ public void testNonExpiringMetadata() throws Exception { assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4")); } - @Test - public void shouldRequestMetadataUpdateWhenClusterDoesntContainAllTopics() throws Exception { - final Set topics = Collections.singleton("test-topic"); - metadata.setTopics(topics); - final long now = 10000; - metadata.update(Cluster.empty(), now); - metadata.setTopics(topics); - assertTrue("metadata update should have been requested", metadata.updateRequested()); - assertEquals("time till next update should be 0", 0, metadata.timeToNextUpdate(now)); - } - private Thread asyncFetch(final String topic, final long maxWaitMs) { Thread thread = new Thread() { public void run() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 5e9f5ecd633d6..0637ea4c4e667 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -112,9 +112,9 @@ public class ConsumerCoordinatorTest { @Before public void setup() { this.time = new MockTime(); + this.client = new MockClient(time); this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); this.metadata = new Metadata(0, Long.MAX_VALUE); - this.client = new MockClient(time, metadata); this.metadata.update(cluster, time.milliseconds()); this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); this.metrics = new Metrics(time); @@ -695,7 +695,6 @@ public boolean matches(AbstractRequest body) { client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE.code())); client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE.code())); - client.prepareMetadataUpdate(TestUtils.singletonCluster(topic1, 1)); coordinator.poll(time.milliseconds()); assertFalse(coordinator.needRejoin()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index d25c6d1e8fb9d..95f9adb135181 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -263,8 +263,6 @@ public Subscription subscription(Set topics) { */ @Override public Map assign(Cluster metadata, Map subscriptions) { - System.out.println("assign topics:" + metadata.topics()); - // construct the client metadata from the decoded subscription info Map clientsMetadata = new HashMap<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index b8f91faed0341..790301b707253 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -128,7 +128,7 @@ public static Object[] data() { } @Before - public void before() throws IOException { + public void before() throws IOException, InterruptedException { testNo++; createTopics(); streamsConfiguration = new Properties(); @@ -145,6 +145,13 @@ public void before() throws IOException { streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + CLUSTER.waitForTopics(new StreamsConfig(streamsConfiguration), streamOne, + streamTwo, + streamThree, + streamConcurrent, + outputTopic, + outputTopicConcurrent, + outputTopicThree); stringComparator = new Comparator>() { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 519b1f5cb14b6..84a752c703bc2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -20,12 +20,19 @@ import kafka.server.KafkaConfig$; import kafka.utils.MockTime; import kafka.zk.EmbeddedZookeeper; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.internals.StreamsKafkaClient; import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; import java.util.Properties; +import java.util.Set; /** * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker. @@ -34,6 +41,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected + public static final int MAX_TOPIC_CREATION_WAIT_TIME = 30000; private EmbeddedZookeeper zookeeper = null; private final KafkaEmbedded[] brokers; private final Properties brokerConfig; @@ -155,4 +163,20 @@ public void createTopic(final String topic, public void deleteTopic(final String topic) { brokers[0].deleteTopic(topic); } + + public void waitForTopics(final StreamsConfig config, final String...topics) throws InterruptedException, IOException { + final StreamsKafkaClient streamsKafkaClient = new StreamsKafkaClient(config); + final Set expectedTopics = new HashSet<>(Arrays.asList(topics)); + final long start = System.currentTimeMillis(); + while (!expectedTopics.isEmpty() && System.currentTimeMillis() - start < MAX_TOPIC_CREATION_WAIT_TIME) { + final Collection topicMetadatas = + streamsKafkaClient.fetchTopicMetadata(); + for (MetadataResponse.TopicMetadata topicMetadata : topicMetadatas) { + expectedTopics.remove(topicMetadata.topic()); + } + Thread.sleep(10); + } + streamsKafkaClient.close(); + + } } From d392046e1d8d900e2fb25e7445f13205b872c0e1 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 13 Jan 2017 18:11:50 +0000 Subject: [PATCH 3/5] revert StreamPartitionAssignor --- .../streams/processor/internals/StreamPartitionAssignor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 95f9adb135181..7b48a6fdb784c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -263,6 +263,7 @@ public Subscription subscription(Set topics) { */ @Override public Map assign(Cluster metadata, Map subscriptions) { + // construct the client metadata from the decoded subscription info Map clientsMetadata = new HashMap<>(); From ef7a75ff7a1e35e3171e3087e4b73d1fab105881 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Mon, 16 Jan 2017 10:17:54 +0000 Subject: [PATCH 4/5] look in broker metadata cache to see if topic/partitions have been created --- checkstyle/import-control.xml | 1 + .../QueryableStateIntegrationTest.java | 33 +++++++++----- .../utils/EmbeddedKafkaCluster.java | 29 ++++--------- .../utils/IntegrationTestUtils.java | 43 +++++++++++++++++++ .../integration/utils/KafkaEmbedded.java | 3 ++ .../apache/kafka/test/StreamsTestUtils.java | 1 + 6 files changed, 78 insertions(+), 32 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 942dbdde0ae78..ec9f2cf3c4c1c 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -150,6 +150,7 @@ + diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 790301b707253..b4921b6623e7c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -81,6 +82,7 @@ public class QueryableStateIntegrationTest { @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + public static final int STREAM_THREE_PARTITIONS = 4; private final MockTime mockTime = CLUSTER.time; private String streamOne = "stream-one"; private String streamTwo = "stream-two"; @@ -91,7 +93,7 @@ public class QueryableStateIntegrationTest { private String outputTopicThree = "output-three"; // sufficiently large window size such that everything falls into 1 window private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS); - private static final int NUM_PARTITIONS = 2; + private static final int STREAM_TWO_PARTITIONS = 2; private static final int NUM_REPLICAS = NUM_BROKERS; private Properties streamsConfiguration; private List inputValues; @@ -111,8 +113,8 @@ public void createTopics() { streamTwo = streamTwo + "-" + testNo; CLUSTER.createTopic(streamOne); CLUSTER.createTopic(streamConcurrent); - CLUSTER.createTopic(streamTwo, NUM_PARTITIONS, NUM_REPLICAS); - CLUSTER.createTopic(streamThree, 4, 1); + CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS); + CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1); CLUSTER.createTopic(outputTopic); CLUSTER.createTopic(outputTopicConcurrent); CLUSTER.createTopic(outputTopicThree); @@ -145,13 +147,22 @@ public void before() throws IOException, InterruptedException { streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); - CLUSTER.waitForTopics(new StreamsConfig(streamsConfiguration), streamOne, - streamTwo, - streamThree, - streamConcurrent, - outputTopic, - outputTopicConcurrent, - outputTopicThree); + IntegrationTestUtils.waitForTopicPartitions(CLUSTER.brokers(), + Arrays.asList(new TopicPartition(streamOne, 0), + new TopicPartition(streamTwo, 0), + new TopicPartition(streamTwo, 1), + new TopicPartition(streamThree, 0), + new TopicPartition(streamThree, 1), + new TopicPartition(streamThree, 2), + new TopicPartition(streamThree, 3), + new TopicPartition(streamConcurrent, 0), + new TopicPartition(outputTopic, 0), + new TopicPartition(outputTopicConcurrent, 0), + new TopicPartition(outputTopicThree, 0)), + 30000); + + + stringComparator = new Comparator>() { @@ -335,7 +346,7 @@ public boolean conditionMet() { @Test public void queryOnRebalance() throws Exception { - final int numThreads = NUM_PARTITIONS; + final int numThreads = STREAM_TWO_PARTITIONS; final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads]; final Thread[] streamThreads = new Thread[numThreads]; final int numIterations = 500000; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 84a752c703bc2..9e3cf9b7a7623 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -18,21 +18,17 @@ package org.apache.kafka.streams.integration.utils; import kafka.server.KafkaConfig$; +import kafka.server.KafkaServer; import kafka.utils.MockTime; import kafka.zk.EmbeddedZookeeper; -import org.apache.kafka.common.requests.MetadataResponse; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.internals.StreamsKafkaClient; import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; -import java.util.Set; /** * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker. @@ -41,7 +37,6 @@ public class EmbeddedKafkaCluster extends ExternalResource { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected - public static final int MAX_TOPIC_CREATION_WAIT_TIME = 30000; private EmbeddedZookeeper zookeeper = null; private final KafkaEmbedded[] brokers; private final Properties brokerConfig; @@ -164,19 +159,11 @@ public void deleteTopic(final String topic) { brokers[0].deleteTopic(topic); } - public void waitForTopics(final StreamsConfig config, final String...topics) throws InterruptedException, IOException { - final StreamsKafkaClient streamsKafkaClient = new StreamsKafkaClient(config); - final Set expectedTopics = new HashSet<>(Arrays.asList(topics)); - final long start = System.currentTimeMillis(); - while (!expectedTopics.isEmpty() && System.currentTimeMillis() - start < MAX_TOPIC_CREATION_WAIT_TIME) { - final Collection topicMetadatas = - streamsKafkaClient.fetchTopicMetadata(); - for (MetadataResponse.TopicMetadata topicMetadata : topicMetadatas) { - expectedTopics.remove(topicMetadata.topic()); - } - Thread.sleep(10); + public List brokers() { + final List servers = new ArrayList<>(); + for (final KafkaEmbedded broker : brokers) { + servers.add(broker.kafkaServer()); } - streamsKafkaClient.close(); - + return servers; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index aa358ab3322e6..875c3591b881b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -17,6 +17,10 @@ package org.apache.kafka.streams.integration.utils; +import kafka.api.PartitionStateInfo; +import kafka.api.Request; +import kafka.server.KafkaServer; +import kafka.server.MetadataCache; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -24,12 +28,14 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; +import scala.Option; import java.io.File; import java.io.IOException; @@ -255,4 +261,41 @@ public boolean conditionMet() { return accumData; } + public static void waitForTopicPartitions(final List servers, + final List partitions, + final long timeout) throws InterruptedException { + final long end = System.currentTimeMillis() + timeout; + for (final TopicPartition partition : partitions) { + final long remaining = end - System.currentTimeMillis(); + if (remaining <= 0) { + throw new AssertionError("timed out while waiting for partitions to become available. Timeout=" + timeout); + } + waitUntilMetadataIsPropagated(servers, partition.topic(), partition.partition(), remaining); + } + } + + public static void waitUntilMetadataIsPropagated(final List servers, + final String topic, + final int partition, + final long timeout) throws InterruptedException { + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + for (final KafkaServer server : servers) { + final MetadataCache metadataCache = server.apis().metadataCache(); + final Option partitionInfo = + metadataCache.getPartitionInfo(topic, partition); + if (partitionInfo.isEmpty()) { + return false; + } + final PartitionStateInfo partitionStateInfo = partitionInfo.get(); + if (!Request.isValidBrokerId(partitionStateInfo.leaderIsrAndControllerEpoch().leaderAndIsr().leader())) { + return false; + } + } + return true; + } + }, timeout, "metatadata for topic=" + topic + " partition=" + partition + " not propogated to all brokers"); + + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index ac9b670247fa0..0af78b936d7a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -201,4 +201,7 @@ public void deleteTopic(final String topic) { zkClient.close(); } + public KafkaServer kafkaServer() { + return kafka; + } } diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java index 555e622237535..73c1b63a2a47b 100644 --- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java @@ -66,4 +66,5 @@ public static List> toList(final Iterator> } return results; } + } From 83ed16cb9fef27d4a3e26b6616daa059bab33be5 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Tue, 17 Jan 2017 10:15:49 +0000 Subject: [PATCH 5/5] wait for topic to be created in createTopic --- .../GlobalKTableIntegrationTest.java | 4 ++-- ...StreamAggregationDedupIntegrationTest.java | 4 ++-- .../KStreamAggregationIntegrationTest.java | 4 ++-- .../KStreamKTableJoinIntegrationTest.java | 2 +- .../KStreamRepartitionJoinTest.java | 8 ++++---- .../QueryableStateIntegrationTest.java | 20 +------------------ .../utils/EmbeddedKafkaCluster.java | 13 +++++++++--- 7 files changed, 22 insertions(+), 33 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 85b851d12964f..6ac87ae1fe64a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -81,7 +81,7 @@ public String apply(final Long value1, final String value2) { private ForeachAction foreachAction; @Before - public void before() { + public void before() throws InterruptedException { testNo++; builder = new KStreamBuilder(); createTopics(); @@ -212,7 +212,7 @@ public boolean conditionMet() { } - private void createTopics() { + private void createTopics() throws InterruptedException { inputStream = "input-stream-" + testNo; inputTable = "input-table-" + testNo; globalOne = "globalOne-" + testNo; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 9397e0331f474..f2a767ceb514e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -72,7 +72,7 @@ public class KStreamAggregationDedupIntegrationTest { @Before - public void before() { + public void before() throws InterruptedException { testNo++; builder = new KStreamBuilder(); createTopics(); @@ -267,7 +267,7 @@ private void produceMessages(long timestamp) } - private void createTopics() { + private void createTopics() throws InterruptedException { streamOneInput = "stream-one-" + testNo; outputTopic = "output-" + testNo; CLUSTER.createTopic(streamOneInput, 3, 1); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 0833f3c8d1fe4..beb41ce68c610 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -89,7 +89,7 @@ public class KStreamAggregationIntegrationTest { private KStream stream; @Before - public void before() { + public void before() throws InterruptedException { testNo++; builder = new KStreamBuilder(); createTopics(); @@ -637,7 +637,7 @@ private void produceMessages(final long timestamp) } - private void createTopics() { + private void createTopics() throws InterruptedException { streamOneInput = "stream-one-" + testNo; outputTopic = "output-" + testNo; userSessionsStream = userSessionsStream + "-" + testNo; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java index 3618f1542c271..0a16494397c71 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java @@ -74,7 +74,7 @@ public class KStreamKTableJoinIntegrationTest { private Properties streamsConfiguration; @Before - public void before() { + public void before() throws InterruptedException { testNo++; userClicksTopic = "user-clicks-" + testNo; userRegionsTopic = "user-regions-" + testNo; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java index e8a042ad7c670..6f3c95a3ab536 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -93,7 +93,7 @@ public static Object[] data() { } @Before - public void before() { + public void before() throws InterruptedException { testNo++; String applicationId = "kstream-repartition-join-test-" + testNo; builder = new KStreamBuilder(); @@ -146,7 +146,7 @@ public void shouldCorrectlyRepartitionOnJoinOperations() throws Exception { verifyLeftJoin(leftJoin); } - private ExpectedOutputOnTopic mapStreamOneAndJoin() { + private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException { String mapOneStreamAndJoinOutput = "map-one-join-output-" + testNo; doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput); return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput); @@ -350,7 +350,7 @@ private void produceToStreamOne() mockTime); } - private void createTopics() { + private void createTopics() throws InterruptedException { streamOneInput = "stream-one-" + testNo; streamTwoInput = "stream-two-" + testNo; streamFourInput = "stream-four-" + testNo; @@ -395,7 +395,7 @@ private void verifyCorrectOutput(final List expectedMessages, private void doJoin(final KStream lhs, final KStream rhs, - final String outputTopic) { + final String outputTopic) throws InterruptedException { CLUSTER.createTopic(outputTopic); lhs.join(rhs, TOSTRING_JOINER, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index b4921b6623e7c..911c6a89cfe17 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -19,7 +19,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -103,7 +102,7 @@ public class QueryableStateIntegrationTest { private Comparator> stringLongComparator; private static int testNo = 0; - public void createTopics() { + public void createTopics() throws InterruptedException { streamOne = streamOne + "-" + testNo; streamConcurrent = streamConcurrent + "-" + testNo; streamThree = streamThree + "-" + testNo; @@ -147,23 +146,6 @@ public void before() throws IOException, InterruptedException { streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); - IntegrationTestUtils.waitForTopicPartitions(CLUSTER.brokers(), - Arrays.asList(new TopicPartition(streamOne, 0), - new TopicPartition(streamTwo, 0), - new TopicPartition(streamTwo, 1), - new TopicPartition(streamThree, 0), - new TopicPartition(streamThree, 1), - new TopicPartition(streamThree, 2), - new TopicPartition(streamThree, 3), - new TopicPartition(streamConcurrent, 0), - new TopicPartition(outputTopic, 0), - new TopicPartition(outputTopicConcurrent, 0), - new TopicPartition(outputTopicThree, 0)), - 30000); - - - - stringComparator = new Comparator>() { @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 9e3cf9b7a7623..619b6b5ed87e3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -21,6 +21,7 @@ import kafka.server.KafkaServer; import kafka.utils.MockTime; import kafka.zk.EmbeddedZookeeper; +import org.apache.kafka.common.TopicPartition; import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +38,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected + public static final int TOPIC_CREATION_TIMEOUT = 30000; private EmbeddedZookeeper zookeeper = null; private final KafkaEmbedded[] brokers; private final Properties brokerConfig; @@ -125,7 +127,7 @@ protected void after() { * * @param topic The name of the topic. */ - public void createTopic(final String topic) { + public void createTopic(final String topic) throws InterruptedException { createTopic(topic, 1, 1, new Properties()); } @@ -136,7 +138,7 @@ public void createTopic(final String topic) { * @param partitions The number of partitions for this topic. * @param replication The replication factor for (the partitions of) this topic. */ - public void createTopic(final String topic, final int partitions, final int replication) { + public void createTopic(final String topic, final int partitions, final int replication) throws InterruptedException { createTopic(topic, partitions, replication, new Properties()); } @@ -151,8 +153,13 @@ public void createTopic(final String topic, final int partitions, final int repl public void createTopic(final String topic, final int partitions, final int replication, - final Properties topicConfig) { + final Properties topicConfig) throws InterruptedException { brokers[0].createTopic(topic, partitions, replication, topicConfig); + final List topicPartitions = new ArrayList<>(); + for (int partition = 0; partition < partitions; partition++) { + topicPartitions.add(new TopicPartition(topic, partition)); + } + IntegrationTestUtils.waitForTopicPartitions(brokers(), topicPartitions, TOPIC_CREATION_TIMEOUT); } public void deleteTopic(final String topic) {