From 7386a3b6be7533e5241b1b08ac498446cbbe397f Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Sat, 23 Apr 2016 10:41:52 -0700 Subject: [PATCH 1/9] Added structure for integration tests --- build.gradle | 2 + checkstyle/import-control.xml | 8 + gradle/dependencies.gradle | 5 +- .../integration/StoresIntegrationTest.java | 137 +++++++++++++ .../utils/EmbeddedSingleNodeKafkaCluster.java | 135 +++++++++++++ .../integration/utils/KafkaEmbedded.java | 183 ++++++++++++++++++ .../integration/utils/ZooKeeperEmbedded.java | 81 ++++++++ .../test/resources/broker-defaults.properties | 34 ++++ 8 files changed, 584 insertions(+), 1 deletion(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/utils/ZooKeeperEmbedded.java create mode 100644 streams/src/test/resources/broker-defaults.properties diff --git a/build.gradle b/build.gradle index da0152b990f97..6523cdfb12d64 100644 --- a/build.gradle +++ b/build.gradle @@ -669,6 +669,8 @@ project(':streams') { compile libs.jacksonDatabind // this dependency should be removed after KIP-4 testCompile project(':clients').sourceSets.test.output + testCompile project(':core') + testCompile libs.curator testCompile libs.junit } diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 051c8d13ef67c..6e4acdb34cf35 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -131,6 +131,14 @@ + + + + + + + + diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 8b292e8f9c749..16f6955649878 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -45,6 +45,8 @@ versions += [ snappy: "1.1.2.4", zkclient: "0.8", 
zookeeper: "3.4.6", + curator: "2.9.0", + assertj: "3.3.0", ] // Add Scala version @@ -102,5 +104,6 @@ libs += [ slf4jlog4j: "org.slf4j:slf4j-log4j12:$versions.slf4j", snappy: "org.xerial.snappy:snappy-java:$versions.snappy", zkclient: "com.101tec:zkclient:$versions.zkclient", - zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper" + zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper", + curator: "org.apache.curator:curator-test:$versions.curator" ] diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java new file mode 100644 index 0000000000000..a027e09112a8d --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.state.Stores; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; + + +public class StoresIntegrationTest { + private static EmbeddedSingleNodeKafkaCluster cluster = null; + private static final String INPUT_TOPIC = "inputTopic"; + private static final String OUTPUT_TOPIC = "outputTopic"; + private static int foundStores = 0; + + @BeforeClass + public static void startKafkaCluster() throws Exception { + cluster = new EmbeddedSingleNodeKafkaCluster(); + cluster.createTopic(INPUT_TOPIC); + cluster.createTopic(OUTPUT_TOPIC); + } + + @AfterClass + public static void stopKafkaCluster() throws IOException { + if (cluster != null) { + cluster.stop(); + } + } + + private static class MyProcessorSupplier implements ProcessorSupplier { + private final int numStores; + MyProcessorSupplier(int numStores) { + this.numStores = numStores; + } + @Override + public Processor get() { + return new Processor() { + private ProcessorContext context; + + @Override + @SuppressWarnings("unchecked") + public void init(ProcessorContext context) { + this.context = context; + this.context.schedule(1000); + for (int i = 0; i < numStores; i++) { + String storeName = "Counts" + i; + if (context.getStateStore(storeName) != null) { + foundStores++; + } + } + 
} + + @Override + public void process(String dummy, String line) { + // do nothing + } + + @Override + public void punctuate(long timestamp) { + // do noting + } + + @Override + public void close() { + // do nothing + } + }; + } + } + + + /** + * This tests the problem described in https://issues.apache.org/jira/browse/KAFKA-3559 + */ + @Test + public void testCreateLargeNumberOfStores() throws Exception { + int numStores = 100; + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stores-integration-test-processor"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, cluster.zookeeperConnect()); + props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("Source", "streams-input"); + builder.addProcessor("Process", new MyProcessorSupplier(numStores), "Source"); + + for (int i = 0; i < numStores; i++) { + String storeName = "Counts" + i; + builder.addStateStore(Stores.create(storeName).withStringKeys().withIntegerValues().inMemory().build(), "Process"); + } + builder.addSink("Sink", "streams-output", "Process"); + + KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + + // usually the stream application would be running forever, + // in this example we just let it run for some time and stop since the input data is finite. 
+ Thread.sleep(5000L); + + streams.close(); + assertEquals(foundStores, numStores); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java new file mode 100644 index 0000000000000..6e20361b16855 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration.utils; + +import org.apache.curator.test.InstanceSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Properties; + + +/** + * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker. + */ +public class EmbeddedSingleNodeKafkaCluster { + + private static final Logger log = LoggerFactory.getLogger(EmbeddedSingleNodeKafkaCluster.class); + + private final ZooKeeperEmbedded zookeeper; + private final KafkaEmbedded broker; + + /** + * Creates and starts a Kafka cluster. + */ + public EmbeddedSingleNodeKafkaCluster() throws Exception { + this(new Properties()); + } + + /** + * Creates and starts a Kafka cluster. + * + * @param brokerConfig Additional broker configuration settings. 
+ */ + public EmbeddedSingleNodeKafkaCluster(Properties brokerConfig) throws Exception { + log.debug("Initiating embedded Kafka cluster startup"); + int zkPort = InstanceSpec.getRandomPort(); + log.debug("Starting a ZooKeeper instance on port {} ...", zkPort); + zookeeper = new ZooKeeperEmbedded(zkPort); + log.debug("ZooKeeper instance is running at {}", zookeeper.connectString()); + + Properties effectiveBrokerConfig = effectiveBrokerConfigFrom(brokerConfig, zookeeper); + log.debug("Starting a Kafka instance on port {} ...", effectiveBrokerConfig.getProperty("port")); + broker = new KafkaEmbedded(effectiveBrokerConfig); + broker.start(); + log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", + broker.brokerList(), broker.zookeeperConnect()); + } + + private Properties effectiveBrokerConfigFrom(Properties brokerConfig, ZooKeeperEmbedded zookeeper) { + Properties effectiveConfig = new Properties(); + effectiveConfig.put("zookeeper.connect", zookeeper.connectString()); + int brokerPort = InstanceSpec.getRandomPort(); + effectiveConfig.put("port", String.valueOf(brokerPort)); + effectiveConfig.putAll(brokerConfig); + return effectiveConfig; + } + + /** + * Stop the Kafka cluster. + */ + public void stop() throws IOException { + broker.stop(); + zookeeper.stop(); + } + + /** + * This cluster's `bootstrap.servers` value. Example: `127.0.0.1:9092`. + * + * You can use this to tell Kafka producers how to connect to this cluster. + */ + public String bootstrapServers() { + return broker.brokerList(); + } + + /** + * This cluster's ZK connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. + * Example: `127.0.0.1:2181`. + * + * You can use this to e.g. tell Kafka consumers how to connect to this cluster. + */ + public String zookeeperConnect() { + return zookeeper.connectString(); + } + + /** + * Create a Kafka topic with 1 partition and a replication factor of 1. + * + * @param topic The name of the topic. 
+ */ + public void createTopic(String topic) { + createTopic(topic, 1, 1, new Properties()); + } + + /** + * Create a Kafka topic with the given parameters. + * + * @param topic The name of the topic. + * @param partitions The number of partitions for this topic. + * @param replication The replication factor for (the partitions of) this topic. + */ + public void createTopic(String topic, int partitions, int replication) { + createTopic(topic, partitions, replication, new Properties()); + } + + /** + * Create a Kafka topic with the given parameters. + * + * @param topic The name of the topic. + * @param partitions The number of partitions for this topic. + * @param replication The replication factor for (partitions of) this topic. + * @param topicConfig Additional topic-level configuration settings. + */ + public void createTopic(String topic, + int partitions, + int replication, + Properties topicConfig) { + broker.createTopic(topic, partitions, replication, topicConfig); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java new file mode 100644 index 0000000000000..0bb77c967aa7d --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration.utils; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Properties; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; + +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import kafka.utils.CoreUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; + +/** + * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by + * default. + * + * Requires a running ZooKeeper instance to connect to. By default, it expects a ZooKeeper instance + * running at `127.0.0.1:2181`. You can specify a different ZooKeeper instance by setting the + * `zookeeper.connect` parameter in the broker's configuration. + */ +public class KafkaEmbedded { + + private static final Logger log = LoggerFactory.getLogger(KafkaEmbedded.class); + + private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181"; + private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000; + private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000; + private final Properties effectiveConfig; + private final File logDir; + private final KafkaServerStartable kafka; + + /** + * @param config Broker configuration settings. Used to modify, for example, on which port the + * broker should listen to. Note that you cannot change the `log.dirs` setting + * currently. 
+ */ + public KafkaEmbedded(Properties config) throws IOException { + logDir = randomTempDirectory(); + effectiveConfig = effectiveConfigFrom(config); + boolean loggingEnabled = true; + KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled); + kafka = new KafkaServerStartable(kafkaConfig); + } + + private File randomTempDirectory() { + int randomNumber = Math.abs(new Random().nextInt()); + String path = System.getProperty("java.io.tmpdir") + + File.separator + + "kafka-embedded-logs-dir-" + randomNumber; + return new File(path); + } + + private Properties effectiveConfigFrom(Properties initialConfig) throws IOException { + Properties effectiveConfig = new Properties(); + effectiveConfig.load(this.getClass().getResourceAsStream("/broker-defaults.properties")); + effectiveConfig.putAll(initialConfig); + effectiveConfig.setProperty("log.dirs", logDir.getAbsolutePath()); + return effectiveConfig; + } + + /** + * This broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`. + * + * You can use this to tell Kafka producers and consumers how to connect to this instance. + */ + public String brokerList() { + return kafka.serverConfig().hostName() + ":" + kafka.serverConfig().port().toString(); + } + + + /** + * The ZooKeeper connection string aka `zookeeper.connect`. + */ + public String zookeeperConnect() { + return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT); + } + + /** + * Start the broker. + */ + public void start() { + log.debug("Starting embedded Kafka broker at {} (with log.dirs={} and ZK ensemble at {}) ...", + brokerList(), logDir, zookeeperConnect()); + kafka.startup(); + log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", + brokerList(), zookeeperConnect()); + } + + /** + * Stop the broker. 
+ */ + public void stop() { + log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...", + brokerList(), zookeeperConnect()); + kafka.shutdown(); + kafka.awaitShutdown(); + log.debug("Removing logs.dir at {} ...", logDir); + List logDirs = Collections.singletonList(logDir.getAbsolutePath()); + CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(logDirs).seq()); + log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", + brokerList(), zookeeperConnect()); + } + + /** + * Create a Kafka topic with 1 partition and a replication factor of 1. + * + * @param topic The name of the topic. + */ + public void createTopic(String topic) { + createTopic(topic, 1, 1, new Properties()); + } + + /** + * Create a Kafka topic with the given parameters. + * + * @param topic The name of the topic. + * @param partitions The number of partitions for this topic. + * @param replication The replication factor for (the partitions of) this topic. + */ + public void createTopic(String topic, int partitions, int replication) { + createTopic(topic, partitions, replication, new Properties()); + } + + /** + * Create a Kafka topic with the given parameters. + * + * @param topic The name of the topic. + * @param partitions The number of partitions for this topic. + * @param replication The replication factor for (partitions of) this topic. + * @param topicConfig Additional topic-level configuration settings. + */ + public void createTopic(String topic, + int partitions, + int replication, + Properties topicConfig) { + log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", + topic, partitions, replication, topicConfig); + + // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then + // createTopic() will only seem to work (it will return without error). 
The topic will exist in + // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the + // topic. + ZkClient zkClient = new ZkClient( + zookeeperConnect(), + DEFAULT_ZK_SESSION_TIMEOUT_MS, + DEFAULT_ZK_CONNECTION_TIMEOUT_MS, + ZKStringSerializer$.MODULE$); + boolean isSecure = false; + ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure); + AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$); + zkClient.close(); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/ZooKeeperEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/ZooKeeperEmbedded.java new file mode 100644 index 0000000000000..a75bbc39a2589 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/ZooKeeperEmbedded.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration.utils; + +import org.apache.curator.test.TestingServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Runs an in-memory, "embedded" instance of a ZooKeeper server. + * + * The ZooKeeper server instance is automatically started when you create a new instance of this + * class. + */ +public class ZooKeeperEmbedded { + + private static final Logger log = LoggerFactory.getLogger(ZooKeeperEmbedded.class); + + private static final int DEFAULT_PORT = 2181; + + private final TestingServer server; + + /** + * Starts a ZooKeeper instance that listens on port 2181. + */ + public ZooKeeperEmbedded() throws Exception { + this(DEFAULT_PORT); + } + + /** + * Starts a ZooKeeper instance that listens at the defined port. + * + * @param port The port (aka `clientPort`) to listen to. Default: 2181. + */ + public ZooKeeperEmbedded(int port) throws Exception { + log.debug("Starting embedded ZooKeeper server on port {} ...", port); + this.server = new TestingServer(port); + } + + public void stop() throws IOException { + log.debug("Shutting down embedded ZooKeeper server at {} ...", server.getConnectString()); + server.close(); + log.debug("Shutdown of embedded ZooKeeper server at {} completed", server.getConnectString()); + } + + /** + * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. + * Example: `127.0.0.1:2181`. + * + * You can use this to e.g. tell Kafka brokers how to connect to this instance. 
+ */ + public String connectString() { + return server.getConnectString(); + } + + /** + * The hostname of the ZooKeeper instance. Example: `127.0.0.1` + */ + public String hostname() { + // "server:1:2:3" -> "server:1:2" + return connectString().substring(0, connectString().lastIndexOf(':')); + } + +} \ No newline at end of file diff --git a/streams/src/test/resources/broker-defaults.properties b/streams/src/test/resources/broker-defaults.properties new file mode 100644 index 0000000000000..bf84b3d29ef96 --- /dev/null +++ b/streams/src/test/resources/broker-defaults.properties @@ -0,0 +1,34 @@ +# See http://kafka.apache.org/documentation.html#brokerconfigs for default values. + +# Each broker is uniquely identified by a non-negative integer id. This id serves as the brokers "name", and allows +# the broker to be moved to a different host/port without confusing consumers. You can choose any number you like so +# long as it is unique. +broker.id=0 + +# Hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all +# interfaces, and publish one to ZK. +host.name=127.0.0.1 + +# The port on which the server accepts client connections. +port=9092 + +# The default number of partitions per topic. +# +num.partitions=1 + +# Enable auto creation of topic on the server. If this is set to true then attempts to produce, consume, or fetch +# metadata for a non-existent topic will automatically create it with the default replication factor and number of +# partitions. +auto.create.topics.enable=true + +# The maximum size of a message that the server can receive. It is important that this property be in sync with the +# maximum fetch size your consumers use or else an unruly consumer will be able to publish messages too large for +# consumers to consume. +# +# Be careful with this setting when producing messages in batches with compression enabled. 
In such a scenario the +# batch of messages is treated as a single message, and its total size must be smaller than this setting. +# +message.max.bytes=1000000 + +# Enable controlled broker shutdown. +controlled.shutdown.enable=true \ No newline at end of file From 99255fe81be9aa348e23c27ce72d1ad911d1c57a Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Sun, 24 Apr 2016 11:20:16 -0700 Subject: [PATCH 2/9] Removed curator dependencies. Rewrote parts of zookeeper and broker embedded classes. --- build.gradle | 2 +- checkstyle/import-control.xml | 2 +- gradle/dependencies.gradle | 5 +--- .../utils/EmbeddedSingleNodeKafkaCluster.java | 11 ++++--- .../integration/utils/KafkaEmbedded.java | 17 ++++++----- .../integration/utils/ZooKeeperEmbedded.java | 29 +++++-------------- 6 files changed, 25 insertions(+), 41 deletions(-) diff --git a/build.gradle b/build.gradle index 6523cdfb12d64..65bf3b63bd86e 100644 --- a/build.gradle +++ b/build.gradle @@ -670,7 +670,7 @@ project(':streams') { testCompile project(':clients').sourceSets.test.output testCompile project(':core') - testCompile libs.curator + testCompile project(':core').sourceSets.test.output testCompile libs.junit } diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 6e4acdb34cf35..a528ea22b4b62 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -136,7 +136,7 @@ - + diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 16f6955649878..8b292e8f9c749 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -45,8 +45,6 @@ versions += [ snappy: "1.1.2.4", zkclient: "0.8", zookeeper: "3.4.6", - curator: "2.9.0", - assertj: "3.3.0", ] // Add Scala version @@ -104,6 +102,5 @@ libs += [ slf4jlog4j: "org.slf4j:slf4j-log4j12:$versions.slf4j", snappy: "org.xerial.snappy:snappy-java:$versions.snappy", zkclient: "com.101tec:zkclient:$versions.zkclient", - zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper", - curator: 
"org.apache.curator:curator-test:$versions.curator" + zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper" ] diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java index 6e20361b16855..14f4fa71e2d4a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.integration.utils; -import org.apache.curator.test.InstanceSpec; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +31,7 @@ public class EmbeddedSingleNodeKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedSingleNodeKafkaCluster.class); - + private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected private final ZooKeeperEmbedded zookeeper; private final KafkaEmbedded broker; @@ -49,9 +49,8 @@ public EmbeddedSingleNodeKafkaCluster() throws Exception { */ public EmbeddedSingleNodeKafkaCluster(Properties brokerConfig) throws Exception { log.debug("Initiating embedded Kafka cluster startup"); - int zkPort = InstanceSpec.getRandomPort(); - log.debug("Starting a ZooKeeper instance on port {} ...", zkPort); - zookeeper = new ZooKeeperEmbedded(zkPort); + log.debug("Starting a ZooKeeper instance"); + zookeeper = new ZooKeeperEmbedded(); log.debug("ZooKeeper instance is running at {}", zookeeper.connectString()); Properties effectiveBrokerConfig = effectiveBrokerConfigFrom(brokerConfig, zookeeper); @@ -65,7 +64,7 @@ public EmbeddedSingleNodeKafkaCluster(Properties brokerConfig) throws Exception private Properties effectiveBrokerConfigFrom(Properties brokerConfig, ZooKeeperEmbedded zookeeper) { Properties effectiveConfig = new Properties(); 
effectiveConfig.put("zookeeper.connect", zookeeper.connectString()); - int brokerPort = InstanceSpec.getRandomPort(); + int brokerPort = DEFAULT_BROKER_PORT; effectiveConfig.put("port", String.valueOf(brokerPort)); effectiveConfig.putAll(brokerConfig); return effectiveConfig; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index 0bb77c967aa7d..53f51ab491948 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration.utils; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,8 +36,10 @@ import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; +import kafka.server.KafkaServer; import kafka.utils.CoreUtils; +import kafka.utils.SystemTime$; +import kafka.utils.TestUtils; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; @@ -44,9 +47,7 @@ * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by * default. * - * Requires a running ZooKeeper instance to connect to. By default, it expects a ZooKeeper instance - * running at `127.0.0.1:2181`. You can specify a different ZooKeeper instance by setting the - * `zookeeper.connect` parameter in the broker's configuration. + * Requires a running ZooKeeper instance to connect to. 
*/ public class KafkaEmbedded { @@ -57,7 +58,7 @@ public class KafkaEmbedded { private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000; private final Properties effectiveConfig; private final File logDir; - private final KafkaServerStartable kafka; + private final KafkaServer kafka; /** * @param config Broker configuration settings. Used to modify, for example, on which port the @@ -69,7 +70,7 @@ public KafkaEmbedded(Properties config) throws IOException { effectiveConfig = effectiveConfigFrom(config); boolean loggingEnabled = true; KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled); - kafka = new KafkaServerStartable(kafkaConfig); + kafka = TestUtils.createServer(kafkaConfig, SystemTime$.MODULE$); } private File randomTempDirectory() { @@ -94,7 +95,7 @@ private Properties effectiveConfigFrom(Properties initialConfig) throws IOExcept * You can use this to tell Kafka producers and consumers how to connect to this instance. */ public String brokerList() { - return kafka.serverConfig().hostName() + ":" + kafka.serverConfig().port().toString(); + return kafka.config().hostName() + ":" + kafka.boundPort(SecurityProtocol.PLAINTEXT); } @@ -111,7 +112,7 @@ public String zookeeperConnect() { public void start() { log.debug("Starting embedded Kafka broker at {} (with log.dirs={} and ZK ensemble at {}) ...", brokerList(), logDir, zookeeperConnect()); - kafka.startup(); + // already started in constructore log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", brokerList(), zookeeperConnect()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/ZooKeeperEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/ZooKeeperEmbedded.java index a75bbc39a2589..36676f2f4a9a3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/ZooKeeperEmbedded.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/ZooKeeperEmbedded.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.integration.utils; -import org.apache.curator.test.TestingServer; +import kafka.zk.EmbeddedZookeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,32 +32,19 @@ public class ZooKeeperEmbedded { private static final Logger log = LoggerFactory.getLogger(ZooKeeperEmbedded.class); - - private static final int DEFAULT_PORT = 2181; - - private final TestingServer server; + private EmbeddedZookeeper zookeeper = null; /** * Starts a ZooKeeper instance that listens on port 2181. */ public ZooKeeperEmbedded() throws Exception { - this(DEFAULT_PORT); - } - - /** - * Starts a ZooKeeper instance that listens at the defined port. - * - * @param port The port (aka `clientPort`) to listen to. Default: 2181. - */ - public ZooKeeperEmbedded(int port) throws Exception { - log.debug("Starting embedded ZooKeeper server on port {} ...", port); - this.server = new TestingServer(port); + zookeeper = new EmbeddedZookeeper(); } public void stop() throws IOException { - log.debug("Shutting down embedded ZooKeeper server at {} ...", server.getConnectString()); - server.close(); - log.debug("Shutdown of embedded ZooKeeper server at {} completed", server.getConnectString()); + log.debug("Shutting down embedded ZooKeeper server at {} ...", connectString()); + zookeeper.shutdown(); + log.debug("Shutdown of embedded ZooKeeper server at {} completed", connectString()); } /** @@ -67,7 +54,7 @@ public void stop() throws IOException { * You can use this to e.g. tell Kafka brokers how to connect to this instance. 
*/ public String connectString() { - return server.getConnectString(); + return "localhost:" + zookeeper.port(); } /** @@ -75,7 +62,7 @@ public String connectString() { */ public String hostname() { // "server:1:2:3" -> "server:1:2" - return connectString().substring(0, connectString().lastIndexOf(':')); + return "localhost"; } } \ No newline at end of file From dfbbac40900d68c0660a1b6471cf792a86e73172 Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Sun, 24 Apr 2016 11:29:17 -0700 Subject: [PATCH 3/9] Removed unneeded Zookeeper class in favor of the one included with Kafka --- .../integration/StoresIntegrationTest.java | 4 +- .../utils/EmbeddedSingleNodeKafkaCluster.java | 35 +++++----- .../integration/utils/ZooKeeperEmbedded.java | 68 ------------------- 3 files changed, 20 insertions(+), 87 deletions(-) delete mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/utils/ZooKeeperEmbedded.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java index a027e09112a8d..1fff281fea3e3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java @@ -102,11 +102,11 @@ public void close() { */ @Test public void testCreateLargeNumberOfStores() throws Exception { - int numStores = 100; + int numStores = 10; Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stores-integration-test-processor"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, cluster.zookeeperConnect()); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, cluster.zKConnectString()); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java index 14f4fa71e2d4a..dd3cb2f1fd23b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.integration.utils; - +import kafka.zk.EmbeddedZookeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +32,7 @@ public class EmbeddedSingleNodeKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedSingleNodeKafkaCluster.class); private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected - private final ZooKeeperEmbedded zookeeper; + private EmbeddedZookeeper zookeeper = null; private final KafkaEmbedded broker; /** @@ -50,10 +50,10 @@ public EmbeddedSingleNodeKafkaCluster() throws Exception { public EmbeddedSingleNodeKafkaCluster(Properties brokerConfig) throws Exception { log.debug("Initiating embedded Kafka cluster startup"); log.debug("Starting a ZooKeeper instance"); - zookeeper = new ZooKeeperEmbedded(); - log.debug("ZooKeeper instance is running at {}", zookeeper.connectString()); + zookeeper = new EmbeddedZookeeper(); + log.debug("ZooKeeper instance is running at {}", zKConnectString()); - Properties effectiveBrokerConfig = effectiveBrokerConfigFrom(brokerConfig, zookeeper); + Properties effectiveBrokerConfig = effectiveBrokerConfigFrom(brokerConfig); log.debug("Starting a Kafka instance on port {} ...", effectiveBrokerConfig.getProperty("port")); broker = new KafkaEmbedded(effectiveBrokerConfig); broker.start(); @@ -61,9 +61,9 @@ public EmbeddedSingleNodeKafkaCluster(Properties brokerConfig) throws Exception broker.brokerList(), 
broker.zookeeperConnect()); } - private Properties effectiveBrokerConfigFrom(Properties brokerConfig, ZooKeeperEmbedded zookeeper) { + private Properties effectiveBrokerConfigFrom(Properties brokerConfig) { Properties effectiveConfig = new Properties(); - effectiveConfig.put("zookeeper.connect", zookeeper.connectString()); + effectiveConfig.put("zookeeper.connect", zKConnectString()); int brokerPort = DEFAULT_BROKER_PORT; effectiveConfig.put("port", String.valueOf(brokerPort)); effectiveConfig.putAll(brokerConfig); @@ -75,28 +75,29 @@ private Properties effectiveBrokerConfigFrom(Properties brokerConfig, ZooKeeperE */ public void stop() throws IOException { broker.stop(); - zookeeper.stop(); + zookeeper.shutdown(); } /** - * This cluster's `bootstrap.servers` value. Example: `127.0.0.1:9092`. + * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. + * Example: `127.0.0.1:2181`. * - * You can use this to tell Kafka producers how to connect to this cluster. + * You can use this to e.g. tell Kafka brokers how to connect to this instance. */ - public String bootstrapServers() { - return broker.brokerList(); + public String zKConnectString() { + return "localhost:" + zookeeper.port(); } /** - * This cluster's ZK connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. - * Example: `127.0.0.1:2181`. + * This cluster's `bootstrap.servers` value. Example: `127.0.0.1:9092`. * - * You can use this to e.g. tell Kafka consumers how to connect to this cluster. + * You can use this to tell Kafka producers how to connect to this cluster. */ - public String zookeeperConnect() { - return zookeeper.connectString(); + public String bootstrapServers() { + return broker.brokerList(); } + /** * Create a Kafka topic with 1 partition and a replication factor of 1. 
* diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/ZooKeeperEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/ZooKeeperEmbedded.java deleted file mode 100644 index 36676f2f4a9a3..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/ZooKeeperEmbedded.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.integration.utils; - -import kafka.zk.EmbeddedZookeeper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Runs an in-memory, "embedded" instance of a ZooKeeper server. - * - * The ZooKeeper server instance is automatically started when you create a new instance of this - * class. - */ -public class ZooKeeperEmbedded { - - private static final Logger log = LoggerFactory.getLogger(ZooKeeperEmbedded.class); - private EmbeddedZookeeper zookeeper = null; - - /** - * Starts a ZooKeeper instance that listens on port 2181. - */ - public ZooKeeperEmbedded() throws Exception { - zookeeper = new EmbeddedZookeeper(); - } - - public void stop() throws IOException { - log.debug("Shutting down embedded ZooKeeper server at {} ...", connectString()); - zookeeper.shutdown(); - log.debug("Shutdown of embedded ZooKeeper server at {} completed", connectString()); - } - - /** - * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. - * Example: `127.0.0.1:2181`. - * - * You can use this to e.g. tell Kafka brokers how to connect to this instance. - */ - public String connectString() { - return "localhost:" + zookeeper.port(); - } - - /** - * The hostname of the ZooKeeper instance. Example: `127.0.0.1` - */ - public String hostname() { - // "server:1:2:3" -> "server:1:2" - return "localhost"; - } - -} \ No newline at end of file From dcdb8e00c9a219549d341df8ba964f887412d4b9 Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Sun, 24 Apr 2016 14:41:24 -0700 Subject: [PATCH 4/9] Further trimming of unnecessary code. 
Using Rules as per Damian's suggestion --- .../integration/StoresIntegrationTest.java | 17 ++++---------- .../utils/EmbeddedSingleNodeKafkaCluster.java | 22 +++++++++++++------ .../integration/utils/KafkaEmbedded.java | 16 +++++--------- 3 files changed, 25 insertions(+), 30 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java index 1fff281fea3e3..4e59e124a5e56 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java @@ -24,37 +24,28 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.Stores; -import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.ClassRule; import static org.junit.Assert.assertEquals; - -import java.io.IOException; import java.util.Properties; import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; public class StoresIntegrationTest { - private static EmbeddedSingleNodeKafkaCluster cluster = null; + @ClassRule + public static EmbeddedSingleNodeKafkaCluster cluster = new EmbeddedSingleNodeKafkaCluster(); private static final String INPUT_TOPIC = "inputTopic"; private static final String OUTPUT_TOPIC = "outputTopic"; private static int foundStores = 0; @BeforeClass public static void startKafkaCluster() throws Exception { - cluster = new EmbeddedSingleNodeKafkaCluster(); cluster.createTopic(INPUT_TOPIC); cluster.createTopic(OUTPUT_TOPIC); } - @AfterClass - public static void stopKafkaCluster() throws IOException { - if (cluster != null) { - cluster.stop(); - } - } - private static class MyProcessorSupplier implements ProcessorSupplier { private final int numStores; MyProcessorSupplier(int 
numStores) { @@ -102,7 +93,7 @@ public void close() { */ @Test public void testCreateLargeNumberOfStores() throws Exception { - int numStores = 10; + int numStores = 100; Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stores-integration-test-processor"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java index dd3cb2f1fd23b..af078616f7706 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java @@ -23,31 +23,32 @@ import java.io.IOException; import java.util.Properties; - +import org.junit.rules.ExternalResource; /** * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker. */ -public class EmbeddedSingleNodeKafkaCluster { +public class EmbeddedSingleNodeKafkaCluster extends ExternalResource { private static final Logger log = LoggerFactory.getLogger(EmbeddedSingleNodeKafkaCluster.class); private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected private EmbeddedZookeeper zookeeper = null; - private final KafkaEmbedded broker; + private KafkaEmbedded broker = null; /** * Creates and starts a Kafka cluster. */ - public EmbeddedSingleNodeKafkaCluster() throws Exception { - this(new Properties()); + public void start() throws IOException, InterruptedException { + start(new Properties()); } + /** * Creates and starts a Kafka cluster. * * @param brokerConfig Additional broker configuration settings. 
*/ - public EmbeddedSingleNodeKafkaCluster(Properties brokerConfig) throws Exception { + public void start(Properties brokerConfig) throws IOException, InterruptedException { log.debug("Initiating embedded Kafka cluster startup"); log.debug("Starting a ZooKeeper instance"); zookeeper = new EmbeddedZookeeper(); @@ -73,7 +74,7 @@ private Properties effectiveBrokerConfigFrom(Properties brokerConfig) { /** * Stop the Kafka cluster. */ - public void stop() throws IOException { + public void stop() { broker.stop(); zookeeper.shutdown(); } @@ -97,6 +98,13 @@ public String bootstrapServers() { return broker.brokerList(); } + protected void before() throws Throwable { + start(); + } + + protected void after() { + stop(); + } /** * Create a Kafka topic with 1 partition and a replication factor of 1. diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index 53f51ab491948..c09e9f1c6eecd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -31,7 +31,6 @@ import java.io.File; import java.util.Collections; import java.util.List; -import java.util.Random; import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; @@ -42,7 +41,7 @@ import kafka.utils.TestUtils; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; - +import org.junit.rules.TemporaryFolder; /** * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by * default. @@ -58,6 +57,7 @@ public class KafkaEmbedded { private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000; private final Properties effectiveConfig; private final File logDir; + public final TemporaryFolder tmpFolder; private final KafkaServer kafka; /** @@ -66,20 +66,15 @@ public class KafkaEmbedded { * currently. 
*/ public KafkaEmbedded(Properties config) throws IOException { - logDir = randomTempDirectory(); + tmpFolder = new TemporaryFolder(); + tmpFolder.create(); + logDir = tmpFolder.newFolder(); effectiveConfig = effectiveConfigFrom(config); boolean loggingEnabled = true; KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled); kafka = TestUtils.createServer(kafkaConfig, SystemTime$.MODULE$); } - private File randomTempDirectory() { - int randomNumber = Math.abs(new Random().nextInt()); - String path = System.getProperty("java.io.tmpdir") + - File.separator + - "kafka-embedded-logs-dir-" + randomNumber; - return new File(path); - } private Properties effectiveConfigFrom(Properties initialConfig) throws IOException { Properties effectiveConfig = new Properties(); @@ -128,6 +123,7 @@ public void stop() { log.debug("Removing logs.dir at {} ...", logDir); List logDirs = Collections.singletonList(logDir.getAbsolutePath()); CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(logDirs).seq()); + tmpFolder.delete(); log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", brokerList(), zookeeperConnect()); } From 5fcae7e292a55e7e72b291ce590ee76d8ea228b6 Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Mon, 25 Apr 2016 11:43:26 -0700 Subject: [PATCH 5/9] Swapped stores test for the compact topic test, addressed other comments --- checkstyle/import-control.xml | 3 + .../InternalTopicIntegrationTest.java | 190 ++++++++++++++++++ .../integration/StoresIntegrationTest.java | 128 ------------ .../utils/EmbeddedSingleNodeKafkaCluster.java | 2 +- .../utils/IntegrationTestUtils.java | 159 +++++++++++++++ .../integration/utils/KafkaEmbedded.java | 23 ++- 6 files changed, 365 insertions(+), 140 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java delete mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java
create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index a528ea22b4b62..0ec8232fd6de9 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -137,6 +137,9 @@ + + + diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java new file mode 100644 index 0000000000000..ecac4bb2b572c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Properties; + +import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import kafka.admin.AdminUtils; +import kafka.log.LogConfig; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import scala.Tuple2; +import scala.collection.Iterator; +import scala.collection.Map; + +/** + * Tests related to internal topics in streams + * + * Note: This example uses lambda expressions and thus works with Java 8+ only. 
+ */ +public class InternalTopicIntegrationTest { + @ClassRule + public static EmbeddedSingleNodeKafkaCluster cluster = new EmbeddedSingleNodeKafkaCluster(); + private static final String DEFAULT_INPUT_TOPIC = "inputTopic"; + private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic"; + private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000; + private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000; + + @BeforeClass + public static void startKafkaCluster() throws Exception { + cluster.createTopic(DEFAULT_INPUT_TOPIC); + cluster.createTopic(DEFAULT_OUTPUT_TOPIC); + } + + @AfterClass + public static void stopKafkaCluster() throws IOException { + if (cluster != null) { + cluster.stop(); + } + } + + + /** + * Validates that any state changelog topics are compacted + * @return true if topics have a valid config, false otherwise + */ + private boolean isUsingCompactionForStateChangelogTopics() { + boolean valid = true; + + // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then + // createTopic() will only seem to work (it will return without error). The topic will exist in + // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the + // topic. 
+ ZkClient zkClient = new ZkClient( + cluster.zKConnectString(), + DEFAULT_ZK_SESSION_TIMEOUT_MS, + DEFAULT_ZK_CONNECTION_TIMEOUT_MS, + ZKStringSerializer$.MODULE$); + boolean isSecure = false; + ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(cluster.zKConnectString()), isSecure); + + Map topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils); + Iterator it = topicConfigs.iterator(); + while (it.hasNext()) { + Tuple2 topicConfig = (Tuple2) it.next(); + String topic = topicConfig._1; + Properties prop = topicConfig._2; + + // state changelogs should be compacted + if (topic.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) { + if (!prop.containsKey(LogConfig.CleanupPolicyProp()) || + !prop.getProperty(LogConfig.CleanupPolicyProp()).equals(LogConfig.Compact())) { + valid = false; + break; + } + } + } + zkClient.close(); + return valid; + } + + @Test + public void shouldValidateCompactTopics() throws Exception { + List inputValues = Arrays.asList("hello", "world", "world", "hello world"); + + // + // Step 1: Configure and start a simple word count topology + // + final Serde stringSerde = Serdes.String(); + final Serde longSerde = Serdes.Long(); + + Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test"); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, cluster.zKConnectString()); + streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams"); + + KStreamBuilder builder = new KStreamBuilder(); + + KStream textLines = builder.stream(DEFAULT_INPUT_TOPIC); + + KStream wordCounts = textLines + .flatMapValues(new 
ValueMapper>() { + @Override + public Iterable apply(String value) { + return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); + } + }).map(new KeyValueMapper>() { + @Override + public KeyValue apply(String key, String value) { + return new KeyValue(value, value); + } + }).countByKey("Counts").toStream(); + + wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC); + + // Remove any state from previous test runs + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + + KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + streams.start(); + + // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all + // of the input data we produce below). + // Note: The sleep times are relatively high to support running the build on Travis CI. + Thread.sleep(5000); + + // + // Step 2: Produce some input data to the input topic. + // + Properties producerConfig = new Properties(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); + producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig); + + // Give the stream processing application some time to do its work. + // Note: The sleep times are relatively high to support running the build on Travis CI. 
+ Thread.sleep(10000); + streams.close(); + + // + // Step 3: Verify the state changelog topics are compact + // + assertEquals(isUsingCompactionForStateChangelogTopics(), true); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java deleted file mode 100644 index 4e59e124a5e56..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoresIntegrationTest.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.integration; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.kafka.streams.state.Stores; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.ClassRule; -import static org.junit.Assert.assertEquals; -import java.util.Properties; - -import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; - - -public class StoresIntegrationTest { - @ClassRule - public static EmbeddedSingleNodeKafkaCluster cluster = new EmbeddedSingleNodeKafkaCluster(); - private static final String INPUT_TOPIC = "inputTopic"; - private static final String OUTPUT_TOPIC = "outputTopic"; - private static int foundStores = 0; - - @BeforeClass - public static void startKafkaCluster() throws Exception { - cluster.createTopic(INPUT_TOPIC); - cluster.createTopic(OUTPUT_TOPIC); - } - - private static class MyProcessorSupplier implements ProcessorSupplier { - private final int numStores; - MyProcessorSupplier(int numStores) { - this.numStores = numStores; - } - @Override - public Processor get() { - return new Processor() { - private ProcessorContext context; - - @Override - @SuppressWarnings("unchecked") - public void init(ProcessorContext context) { - this.context = context; - this.context.schedule(1000); - for (int i = 0; i < numStores; i++) { - String storeName = "Counts" + i; - if (context.getStateStore(storeName) != null) { - foundStores++; - } - } - } - - @Override - public void process(String dummy, String line) { - // do nothing - } - - @Override - public void punctuate(long timestamp) { - // do noting - } - - 
@Override - public void close() { - // do nothing - } - }; - } - } - - - /** - * This tests the problem described in https://issues.apache.org/jira/browse/KAFKA-3559 - */ - @Test - public void testCreateLargeNumberOfStores() throws Exception { - int numStores = 100; - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stores-integration-test-processor"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, cluster.zKConnectString()); - props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - - // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - TopologyBuilder builder = new TopologyBuilder(); - - builder.addSource("Source", "streams-input"); - builder.addProcessor("Process", new MyProcessorSupplier(numStores), "Source"); - - for (int i = 0; i < numStores; i++) { - String storeName = "Counts" + i; - builder.addStateStore(Stores.create(storeName).withStringKeys().withIntegerValues().inMemory().build(), "Process"); - } - builder.addSink("Sink", "streams-output", "Process"); - - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); - - // usually the stream application would be running forever, - // in this example we just let it run for some time and stop since the input data is finite. 
- Thread.sleep(5000L); - - streams.close(); - assertEquals(foundStores, numStores); - } -} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java index af078616f7706..5e2c2618c350d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java @@ -57,7 +57,7 @@ public void start(Properties brokerConfig) throws IOException, InterruptedExcept Properties effectiveBrokerConfig = effectiveBrokerConfigFrom(brokerConfig); log.debug("Starting a Kafka instance on port {} ...", effectiveBrokerConfig.getProperty("port")); broker = new KafkaEmbedded(effectiveBrokerConfig); - broker.start(); + log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", broker.brokerList(), broker.zookeeperConnect()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java new file mode 100644 index 0000000000000..192a5efa937a1 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration.utils; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import kafka.utils.CoreUtils; + +/** + * Utility functions to make integration testing more convenient. + */ +public class IntegrationTestUtils { + + private static final int UNLIMITED_MESSAGES = -1; + + /** + * Returns up to `maxMessages` message-values from the topic. + * + * @param topic Kafka topic to read messages from + * @param consumerConfig Kafka consumer configuration + * @param maxMessages Maximum number of messages to read via the consumer. + * @return The values retrieved via the consumer. 
+ */ + public static List readValues(String topic, Properties consumerConfig, int maxMessages) { + List returnList = new ArrayList<>(); + List> kvs = readKeyValues(topic, consumerConfig, maxMessages); + for (KeyValue kv : kvs) { + returnList.add(kv.value); + } + return returnList; + } + + /** + * Returns as many messages as possible from the topic until a (currently hardcoded) timeout is + * reached. + * + * @param topic Kafka topic to read messages from + * @param consumerConfig Kafka consumer configuration + * @return The KeyValue elements retrieved via the consumer. + */ + public static List> readKeyValues(String topic, Properties consumerConfig) { + return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES); + } + + /** + * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from + * are already configured in the consumer). + * + * @param topic Kafka topic to read messages from + * @param consumerConfig Kafka consumer configuration + * @param maxMessages Maximum number of messages to read via the consumer + * @return The KeyValue elements retrieved via the consumer + */ + public static List> readKeyValues(String topic, Properties consumerConfig, int maxMessages) { + KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig); + consumer.subscribe(Collections.singletonList(topic)); + int pollIntervalMs = 100; + int maxTotalPollTimeMs = 2000; + int totalPollTimeMs = 0; + List> consumedValues = new ArrayList<>(); + while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) { + totalPollTimeMs += pollIntervalMs; + ConsumerRecords records = consumer.poll(pollIntervalMs); + for (ConsumerRecord record : records) { + consumedValues.add(new KeyValue<>(record.key(), record.value())); + } + } + consumer.close(); + return consumedValues; + } + + private static boolean continueConsuming(int messagesConsumed, int maxMessages) { + return maxMessages <= 0 || messagesConsumed < maxMessages; + } 
+ + /** + * Removes local state stores. Useful to reset state in-between integration test runs. + * + * @param streamsConfiguration Streams configuration settings + */ + public static void purgeLocalStreamsState(Properties streamsConfiguration) throws IOException { + String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG); + if (path != null) { + File node = Paths.get(path).normalize().toFile(); + // Only purge state when it's under /tmp. This is a safety net to prevent accidentally + // deleting important local directory trees. + if (node.getAbsolutePath().startsWith("/tmp")) { + List nodes = Collections.singletonList(node.getAbsolutePath()); + CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(nodes).seq()); + } + } + } + + /** + * @param topic Kafka topic to write the data records to + * @param records Data records to write to Kafka + * @param producerConfig Kafka producer configuration + * @param Key type of the data records + * @param Value type of the data records + */ + public static void produceKeyValuesSynchronously( + String topic, Collection> records, Properties producerConfig) + throws ExecutionException, InterruptedException { + Producer producer = new KafkaProducer<>(producerConfig); + for (KeyValue record : records) { + Future f = producer.send( + new ProducerRecord<>(topic, record.key, record.value)); + f.get(); + } + producer.flush(); + producer.close(); + } + + public static void produceValuesSynchronously( + String topic, Collection records, Properties producerConfig) + throws ExecutionException, InterruptedException { + Collection> keyedRecords = new ArrayList<>(); + for (V value : records) { + KeyValue kv = new KeyValue<>(null, value); + keyedRecords.add(kv); + } + produceKeyValuesSynchronously(topic, keyedRecords, producerConfig); + } + +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index c09e9f1c6eecd..daf6fa5e44d63 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -61,6 +61,7 @@ public class KafkaEmbedded { private final KafkaServer kafka; /** + * Creates and starts an embedded Kafka broker. * @param config Broker configuration settings. Used to modify, for example, on which port the * broker should listen to. Note that you cannot change the `log.dirs` setting * currently. @@ -72,10 +73,21 @@ public KafkaEmbedded(Properties config) throws IOException { effectiveConfig = effectiveConfigFrom(config); boolean loggingEnabled = true; KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled); + log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", + logDir, zookeeperConnect()); kafka = TestUtils.createServer(kafkaConfig, SystemTime$.MODULE$); + log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", + brokerList(), zookeeperConnect()); } + /** + * Creates the configuration for starting the Kafka broker by merging default values with + * overwrites. + * @param initialConfig Broker configuration settings that override the default config. + * @return + * @throws IOException + */ private Properties effectiveConfigFrom(Properties initialConfig) throws IOException { Properties effectiveConfig = new Properties(); effectiveConfig.load(this.getClass().getResourceAsStream("/broker-defaults.properties")); @@ -101,17 +113,6 @@ public String zookeeperConnect() { return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT); } - /** - * Start the broker. 
- */ - public void start() { - log.debug("Starting embedded Kafka broker at {} (with log.dirs={} and ZK ensemble at {}) ...", - brokerList(), logDir, zookeeperConnect()); - // already started in constructore - log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", - brokerList(), zookeeperConnect()); - } - /** * Stop the broker. */ From 886b0b6099aa213c4c70b3587e53f986bf3fbc3c Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Mon, 25 Apr 2016 16:42:06 -0700 Subject: [PATCH 6/9] Addressed comments --- .../integration/InternalTopicIntegrationTest.java | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index ecac4bb2b572c..2e6a715caf943 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -54,8 +54,6 @@ /** * Tests related to internal topics in streams - * - * Note: This example uses lambda expressions and thus works with Java 8+ only. */ public class InternalTopicIntegrationTest { @ClassRule @@ -119,7 +117,7 @@ private boolean isUsingCompactionForStateChangelogTopics() { } @Test - public void shouldValidateCompactTopics() throws Exception { + public void shouldCompactTopicsForStateChangelogs() throws Exception { List inputValues = Arrays.asList("hello", "world", "world", "hello world"); // @@ -161,11 +159,6 @@ public KeyValue apply(String key, String value) { KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); - // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all - // of the input data we produce below). 
- // Note: The sleep times are relatively high to support running the build on Travis CI. - Thread.sleep(5000); - // // Step 2: Produce some input data to the input topic. // @@ -177,14 +170,10 @@ public KeyValue apply(String key, String value) { producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig); - // Give the stream processing application some time to do its work. - // Note: The sleep times are relatively high to support running the build on Travis CI. - Thread.sleep(10000); - streams.close(); - // // Step 3: Verify the state changelog topics are compact // + streams.close(); assertEquals(isUsingCompactionForStateChangelogTopics(), true); } } From 421f26c8e63f955eae00d2f639c31a4a6adb3eea Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Tue, 26 Apr 2016 06:05:18 -0700 Subject: [PATCH 7/9] Removed unneeded AfterClass method as per dguy's comments --- .../integration/InternalTopicIntegrationTest.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 2e6a715caf943..2a3e7670e2878 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -31,12 +31,10 @@ import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; -import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import static org.junit.Assert.assertEquals; -import java.io.IOException; import java.util.Arrays; import java.util.List; import 
java.util.Locale; @@ -69,14 +67,6 @@ public static void startKafkaCluster() throws Exception { cluster.createTopic(DEFAULT_OUTPUT_TOPIC); } - @AfterClass - public static void stopKafkaCluster() throws IOException { - if (cluster != null) { - cluster.stop(); - } - } - - /** * Validates that any state changelog topics are compacted * @return true if topics have a valid config, false otherwise From dabdcf72ea80e311cb29340ac72c11df4337e1cc Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Tue, 26 Apr 2016 06:19:43 -0700 Subject: [PATCH 8/9] Simplify delete as per Ismael's suggestion --- .../streams/integration/utils/IntegrationTestUtils.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 192a5efa937a1..89fe0c4ef943e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; @@ -38,8 +39,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import kafka.utils.CoreUtils; - /** * Utility functions to make integration testing more convenient. */ @@ -119,8 +118,7 @@ public static void purgeLocalStreamsState(Properties streamsConfiguration) throw // Only purge state when it's under /tmp. This is a safety net to prevent accidentally // deleting important local directory trees. 
if (node.getAbsolutePath().startsWith("/tmp")) { - List nodes = Collections.singletonList(node.getAbsolutePath()); - CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(nodes).seq()); + Utils.delete(new File(node.getAbsolutePath())); } } } From b6828960dd78805c753249fde5cb42158b2d2ef8 Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Tue, 26 Apr 2016 09:54:12 -0700 Subject: [PATCH 9/9] Removed config file --- .../utils/EmbeddedSingleNodeKafkaCluster.java | 27 ++++----------- .../integration/utils/KafkaEmbedded.java | 12 +++++-- .../test/resources/broker-defaults.properties | 34 ------------------- 3 files changed, 16 insertions(+), 57 deletions(-) delete mode 100644 streams/src/test/resources/broker-defaults.properties diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java index 5e2c2618c350d..34753ae2e672f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.integration.utils; +import kafka.server.KafkaConfig$; import kafka.zk.EmbeddedZookeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,38 +40,22 @@ public class EmbeddedSingleNodeKafkaCluster extends ExternalResource { * Creates and starts a Kafka cluster. */ public void start() throws IOException, InterruptedException { - start(new Properties()); - } - + Properties brokerConfig = new Properties(); - /** - * Creates and starts a Kafka cluster. - * - * @param brokerConfig Additional broker configuration settings. 
- */ - public void start(Properties brokerConfig) throws IOException, InterruptedException { log.debug("Initiating embedded Kafka cluster startup"); log.debug("Starting a ZooKeeper instance"); zookeeper = new EmbeddedZookeeper(); log.debug("ZooKeeper instance is running at {}", zKConnectString()); + brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString()); + brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT); - Properties effectiveBrokerConfig = effectiveBrokerConfigFrom(brokerConfig); - log.debug("Starting a Kafka instance on port {} ...", effectiveBrokerConfig.getProperty("port")); - broker = new KafkaEmbedded(effectiveBrokerConfig); + log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp())); + broker = new KafkaEmbedded(brokerConfig); log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", broker.brokerList(), broker.zookeeperConnect()); } - private Properties effectiveBrokerConfigFrom(Properties brokerConfig) { - Properties effectiveConfig = new Properties(); - effectiveConfig.put("zookeeper.connect", zKConnectString()); - int brokerPort = DEFAULT_BROKER_PORT; - effectiveConfig.put("port", String.valueOf(brokerPort)); - effectiveConfig.putAll(brokerConfig); - return effectiveConfig; - } - /** * Stop the Kafka cluster. 
*/ diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index daf6fa5e44d63..348b46b5c7112 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -35,6 +35,7 @@ import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.server.KafkaConfig; +import kafka.server.KafkaConfig$; import kafka.server.KafkaServer; import kafka.utils.CoreUtils; import kafka.utils.SystemTime$; @@ -90,9 +91,16 @@ public KafkaEmbedded(Properties config) throws IOException { */ private Properties effectiveConfigFrom(Properties initialConfig) throws IOException { Properties effectiveConfig = new Properties(); - effectiveConfig.load(this.getClass().getResourceAsStream("/broker-defaults.properties")); + effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0); + effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1"); + effectiveConfig.put(KafkaConfig$.MODULE$.PortProp(), "9092"); + effectiveConfig.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1); + effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true); + effectiveConfig.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000); + effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true); + effectiveConfig.putAll(initialConfig); - effectiveConfig.setProperty("log.dirs", logDir.getAbsolutePath()); + effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), logDir.getAbsolutePath()); return effectiveConfig; } diff --git a/streams/src/test/resources/broker-defaults.properties b/streams/src/test/resources/broker-defaults.properties deleted file mode 100644 index bf84b3d29ef96..0000000000000 --- a/streams/src/test/resources/broker-defaults.properties +++ /dev/null @@ -1,34 +0,0 @@ -# See 
http://kafka.apache.org/documentation.html#brokerconfigs for default values. - -# Each broker is uniquely identified by a non-negative integer id. This id serves as the brokers "name", and allows -# the broker to be moved to a different host/port without confusing consumers. You can choose any number you like so -# long as it is unique. -broker.id=0 - -# Hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all -# interfaces, and publish one to ZK. -host.name=127.0.0.1 - -# The port on which the server accepts client connections. -port=9092 - -# The default number of partitions per topic. -# -num.partitions=1 - -# Enable auto creation of topic on the server. If this is set to true then attempts to produce, consume, or fetch -# metadata for a non-existent topic will automatically create it with the default replication factor and number of -# partitions. -auto.create.topics.enable=true - -# The maximum size of a message that the server can receive. It is important that this property be in sync with the -# maximum fetch size your consumers use or else an unruly consumer will be able to publish messages too large for -# consumers to consume. -# -# Be careful with this setting when producing messages in batches with compression enabled. In such a scenario the -# batch of messages is treated as a single message, and its total size must be smaller than this setting. -# -message.max.bytes=1000000 - -# Enable controlled broker shutdown. -controlled.shutdown.enable=true \ No newline at end of file