diff --git a/build.gradle b/build.gradle
index da0152b990f97..65bf3b63bd86e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -669,6 +669,8 @@ project(':streams') {
compile libs.jacksonDatabind // this dependency should be removed after KIP-4
testCompile project(':clients').sourceSets.test.output
+ testCompile project(':core')
+ testCompile project(':core').sourceSets.test.output
testCompile libs.junit
}
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 051c8d13ef67c..0ec8232fd6de9 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -131,6 +131,17 @@
+
+
+
+
+
+
+
+
+
+
+
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
new file mode 100644
index 0000000000000..2a3e7670e2878
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import kafka.admin.AdminUtils;
+import kafka.log.LogConfig;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.Map;
+
+/**
+ * Tests related to internal topics in streams
+ */
+public class InternalTopicIntegrationTest {
+ @ClassRule
+ public static EmbeddedSingleNodeKafkaCluster cluster = new EmbeddedSingleNodeKafkaCluster();
+ private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
+ private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+ private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
+ private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
+
+ @BeforeClass
+ public static void startKafkaCluster() throws Exception {
+ cluster.createTopic(DEFAULT_INPUT_TOPIC);
+ cluster.createTopic(DEFAULT_OUTPUT_TOPIC);
+ }
+
+ /**
+ * Validates that any state changelog topics are compacted
+ * @return true if topics have a valid config, false otherwise
+ */
+ private boolean isUsingCompactionForStateChangelogTopics() {
+ boolean valid = true;
+
+ // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
+ // createTopic() will only seem to work (it will return without error). The topic will exist in
+ // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
+ // topic.
+ ZkClient zkClient = new ZkClient(
+ cluster.zKConnectString(),
+ DEFAULT_ZK_SESSION_TIMEOUT_MS,
+ DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
+ ZKStringSerializer$.MODULE$);
+ boolean isSecure = false;
+ ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(cluster.zKConnectString()), isSecure);
+
+ Map topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils);
+ Iterator it = topicConfigs.iterator();
+ while (it.hasNext()) {
+ Tuple2 topicConfig = (Tuple2) it.next();
+ String topic = topicConfig._1;
+ Properties prop = topicConfig._2;
+
+ // state changelogs should be compacted
+ if (topic.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
+ if (!prop.containsKey(LogConfig.CleanupPolicyProp()) ||
+ !prop.getProperty(LogConfig.CleanupPolicyProp()).equals(LogConfig.Compact())) {
+ valid = false;
+ break;
+ }
+ }
+ }
+ zkClient.close();
+ return valid;
+ }
+
+ @Test
+ public void shouldCompactTopicsForStateChangelogs() throws Exception {
+ List inputValues = Arrays.asList("hello", "world", "world", "hello world");
+
+ //
+ // Step 1: Configure and start a simple word count topology
+ //
+ final Serde stringSerde = Serdes.String();
+ final Serde longSerde = Serdes.Long();
+
+ Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test");
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, cluster.zKConnectString());
+ streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ KStream textLines = builder.stream(DEFAULT_INPUT_TOPIC);
+
+ KStream wordCounts = textLines
+ .flatMapValues(new ValueMapper>() {
+ @Override
+ public Iterable apply(String value) {
+ return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+ }
+ }).map(new KeyValueMapper>() {
+ @Override
+ public KeyValue apply(String key, String value) {
+ return new KeyValue(value, value);
+ }
+ }).countByKey("Counts").toStream();
+
+ wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
+
+ // Remove any state from previous test runs
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+ KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ streams.start();
+
+ //
+ // Step 2: Produce some input data to the input topic.
+ //
+ Properties producerConfig = new Properties();
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+ producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
+
+ //
+ // Step 3: Verify the state changelog topics are compact
+ //
+ streams.close();
+ assertEquals(isUsingCompactionForStateChangelogTopics(), true);
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
new file mode 100644
index 0000000000000..34753ae2e672f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration.utils;
+
+import kafka.server.KafkaConfig$;
+import kafka.zk.EmbeddedZookeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.junit.rules.ExternalResource;
+
+/**
+ * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker.
+ */
+public class EmbeddedSingleNodeKafkaCluster extends ExternalResource {
+
+ private static final Logger log = LoggerFactory.getLogger(EmbeddedSingleNodeKafkaCluster.class);
+ private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
+ private EmbeddedZookeeper zookeeper = null;
+ private KafkaEmbedded broker = null;
+
+ /**
+ * Creates and starts a Kafka cluster.
+ */
+ public void start() throws IOException, InterruptedException {
+ Properties brokerConfig = new Properties();
+
+ log.debug("Initiating embedded Kafka cluster startup");
+ log.debug("Starting a ZooKeeper instance");
+ zookeeper = new EmbeddedZookeeper();
+ log.debug("ZooKeeper instance is running at {}", zKConnectString());
+ brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
+ brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
+
+ log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
+ broker = new KafkaEmbedded(brokerConfig);
+
+ log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
+ broker.brokerList(), broker.zookeeperConnect());
+ }
+
+ /**
+ * Stop the Kafka cluster.
+ */
+ public void stop() {
+ broker.stop();
+ zookeeper.shutdown();
+ }
+
+ /**
+ * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format.
+ * Example: `127.0.0.1:2181`.
+ *
+ * You can use this to e.g. tell Kafka brokers how to connect to this instance.
+ */
+ public String zKConnectString() {
+ return "localhost:" + zookeeper.port();
+ }
+
+ /**
+ * This cluster's `bootstrap.servers` value. Example: `127.0.0.1:9092`.
+ *
+ * You can use this to tell Kafka producers how to connect to this cluster.
+ */
+ public String bootstrapServers() {
+ return broker.brokerList();
+ }
+
+ protected void before() throws Throwable {
+ start();
+ }
+
+ protected void after() {
+ stop();
+ }
+
+ /**
+ * Create a Kafka topic with 1 partition and a replication factor of 1.
+ *
+ * @param topic The name of the topic.
+ */
+ public void createTopic(String topic) {
+ createTopic(topic, 1, 1, new Properties());
+ }
+
+ /**
+ * Create a Kafka topic with the given parameters.
+ *
+ * @param topic The name of the topic.
+ * @param partitions The number of partitions for this topic.
+ * @param replication The replication factor for (the partitions of) this topic.
+ */
+ public void createTopic(String topic, int partitions, int replication) {
+ createTopic(topic, partitions, replication, new Properties());
+ }
+
+ /**
+ * Create a Kafka topic with the given parameters.
+ *
+ * @param topic The name of the topic.
+ * @param partitions The number of partitions for this topic.
+ * @param replication The replication factor for (partitions of) this topic.
+ * @param topicConfig Additional topic-level configuration settings.
+ */
+ public void createTopic(String topic,
+ int partitions,
+ int replication,
+ Properties topicConfig) {
+ broker.createTopic(topic, partitions, replication, topicConfig);
+ }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
new file mode 100644
index 0000000000000..89fe0c4ef943e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration.utils;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Utility functions to make integration testing more convenient.
+ */
+public class IntegrationTestUtils {
+
+ private static final int UNLIMITED_MESSAGES = -1;
+
+ /**
+ * Returns up to `maxMessages` message-values from the topic.
+ *
+ * @param topic Kafka topic to read messages from
+ * @param consumerConfig Kafka consumer configuration
+ * @param maxMessages Maximum number of messages to read via the consumer.
+ * @return The values retrieved via the consumer.
+ */
+ public static List readValues(String topic, Properties consumerConfig, int maxMessages) {
+ List returnList = new ArrayList<>();
+ List> kvs = readKeyValues(topic, consumerConfig, maxMessages);
+ for (KeyValue kv : kvs) {
+ returnList.add(kv.value);
+ }
+ return returnList;
+ }
+
+ /**
+ * Returns as many messages as possible from the topic until a (currently hardcoded) timeout is
+ * reached.
+ *
+ * @param topic Kafka topic to read messages from
+ * @param consumerConfig Kafka consumer configuration
+ * @return The KeyValue elements retrieved via the consumer.
+ */
+ public static List> readKeyValues(String topic, Properties consumerConfig) {
+ return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES);
+ }
+
+ /**
+ * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
+ * are already configured in the consumer).
+ *
+ * @param topic Kafka topic to read messages from
+ * @param consumerConfig Kafka consumer configuration
+ * @param maxMessages Maximum number of messages to read via the consumer
+ * @return The KeyValue elements retrieved via the consumer
+ */
+ public static List> readKeyValues(String topic, Properties consumerConfig, int maxMessages) {
+ KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
+ consumer.subscribe(Collections.singletonList(topic));
+ int pollIntervalMs = 100;
+ int maxTotalPollTimeMs = 2000;
+ int totalPollTimeMs = 0;
+ List> consumedValues = new ArrayList<>();
+ while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) {
+ totalPollTimeMs += pollIntervalMs;
+ ConsumerRecords records = consumer.poll(pollIntervalMs);
+ for (ConsumerRecord record : records) {
+ consumedValues.add(new KeyValue<>(record.key(), record.value()));
+ }
+ }
+ consumer.close();
+ return consumedValues;
+ }
+
+ private static boolean continueConsuming(int messagesConsumed, int maxMessages) {
+ return maxMessages <= 0 || messagesConsumed < maxMessages;
+ }
+
+ /**
+ * Removes local state stores. Useful to reset state in-between integration test runs.
+ *
+ * @param streamsConfiguration Streams configuration settings
+ */
+ public static void purgeLocalStreamsState(Properties streamsConfiguration) throws IOException {
+ String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
+ if (path != null) {
+ File node = Paths.get(path).normalize().toFile();
+ // Only purge state when it's under /tmp. This is a safety net to prevent accidentally
+ // deleting important local directory trees.
+ if (node.getAbsolutePath().startsWith("/tmp")) {
+ Utils.delete(new File(node.getAbsolutePath()));
+ }
+ }
+ }
+
+ /**
+ * @param topic Kafka topic to write the data records to
+ * @param records Data records to write to Kafka
+ * @param producerConfig Kafka producer configuration
+ * @param Key type of the data records
+ * @param Value type of the data records
+ */
+ public static void produceKeyValuesSynchronously(
+ String topic, Collection> records, Properties producerConfig)
+ throws ExecutionException, InterruptedException {
+ Producer producer = new KafkaProducer<>(producerConfig);
+ for (KeyValue record : records) {
+ Future f = producer.send(
+ new ProducerRecord<>(topic, record.key, record.value));
+ f.get();
+ }
+ producer.flush();
+ producer.close();
+ }
+
+ public static void produceValuesSynchronously(
+ String topic, Collection records, Properties producerConfig)
+ throws ExecutionException, InterruptedException {
+ Collection> keyedRecords = new ArrayList<>();
+ for (V value : records) {
+ KeyValue