diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 2f1fe93a5ac08..750b8a19aba42 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -29,10 +29,15 @@ public class CommonClientConfigs {
*/
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
- public static final String BOOSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form "
+ public static final String BOOTSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form "
+ "host1:port1,host2:port2,.... Since these servers are just used for the initial connection to "
+ "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
+ "servers (you may want more than one, though, in case a server is down).";
+ /**
+ * @deprecated This will be removed in a future release. Please use {@link #BOOTSTRAP_SERVERS_DOC}
+ */
+ @Deprecated
+ public static final String BOOSTRAP_SERVERS_DOC = BOOTSTRAP_SERVERS_DOC;
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 322ae0fbe2315..fd44072c8d5ae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -12,12 +12,6 @@
*/
package org.apache.kafka.clients;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -25,6 +19,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
/**
* A class encapsulating some of the logic around metadata.
*
@@ -254,7 +255,10 @@ private Cluster getClusterForCurrentTopics(Cluster cluster) {
unauthorizedTopics.retainAll(this.topics);
for (String topic : this.topics) {
- partitionInfos.addAll(cluster.partitionsForTopic(topic));
+ List "
- + "This parameter is deprecated and will be removed in a future release. "
+ + " This parameter is deprecated and will be removed in a future release. "
+ "Parameter send(record, null).
- * See {@link #send(ProducerRecord, Callback)} for details.
+ * Implementation of asynchronously send a record to a topic.
*/
private Future" + MAX_BLOCK_MS_CONFIG + " to Long.MAX_VALUE."
+ + "By default this setting is false and the producer will no longer throw a BufferExhaustException but instead will use the " + MAX_BLOCK_MS_CONFIG + " "
+ + "value to block, after which it will throw a TimeoutException. Setting this property to true will set the " + MAX_BLOCK_MS_CONFIG + " to Long.MAX_VALUE. "
+ "Also if this property is set to true, parameter " + METADATA_FETCH_TIMEOUT_CONFIG + " is not longer honored."
- + "" + MAX_BLOCK_MS_CONFIG + " should be used instead.";
/** buffer.memory */
@@ -218,7 +217,7 @@ public class ProducerConfig extends AbstractConfig {
+ "received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";
static {
- CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+ CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(ACKS_CONFIG,
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 5339096efa750..a73d882226564 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -74,7 +74,6 @@ public final class RecordAccumulator {
private final Set
#!/bin/bash
#Step 1
- keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
+ keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
@@ -576,6 +576,12 @@ Command Line Interface
Convenience
+
+ --force
+ Convenience option to assume yes to all queries and do not prompt.
+
+ Convenience
+
Examples
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 34c35b7c2fc2f..1ee6928e98f26 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -81,19 +81,17 @@ public void process(String dummy, String line) {
@Override
public void punctuate(long timestamp) {
- KeyValueIterator iter = this.kvStore.all();
+ try (KeyValueIterator iter = this.kvStore.all()) {
+ System.out.println("----------- " + timestamp + " ----------- ");
- System.out.println("----------- " + timestamp + " ----------- ");
+ while (iter.hasNext()) {
+ KeyValue entry = iter.next();
- while (iter.hasNext()) {
- KeyValue entry = iter.next();
+ System.out.println("[" + entry.key + ", " + entry.value + "]");
- System.out.println("[" + entry.key + ", " + entry.value + "]");
-
- context.forward(entry.key, entry.value.toString());
+ context.forward(entry.key, entry.value.toString());
+ }
}
-
- iter.close();
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index efccd7aa842ad..95e55c9d06c37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -122,7 +122,7 @@ public class StreamsConfig extends AbstractConfig {
.define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value
Type.LIST,
Importance.HIGH,
- CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+ CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(CLIENT_ID_CONFIG,
Type.STRING,
"",
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 9d90ba053ccee..53b2f4eedd9ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
/**
* {@link KStreamBuilder} is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL
@@ -55,6 +56,22 @@ public KStream stream(String... topics) {
return stream(null, null, topics);
}
+
+ /**
+ * Create a {@link KStream} instance from the specified Pattern.
+ * The default deserializers specified in the config are used.
+ *
+ * If multiple topics are matched by the specified pattern, the created stream will read data from all of them,
+ * and there is no ordering guarantee between records from different topics
+ *
+ * @param topicPattern the Pattern to match for topic names
+ * @return a {@link KStream} for topics matching the regex pattern.
+ */
+ public KStream stream(Pattern topicPattern) {
+ return stream(null, null, topicPattern);
+ }
+
+
/**
* Create a {@link KStream} instance from the specified topics.
*
@@ -75,6 +92,28 @@ public KStream stream(Serde keySerde, Serde valSerde, String.
return new KStreamImpl<>(this, name, Collections.singleton(name));
}
+
+ /**
+ * Create a {@link KStream} instance from the specified Pattern.
+ *
+ * If multiple topics are matched by the specified pattern, the created stream will read data from all of them,
+ * and there is no ordering guarantee between records from different topics.
+ *
+ * @param keySerde key serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param valSerde value serde used to read this source {@link KStream},
+ * if not specified the default serde defined in the configs will be used
+ * @param topicPattern the Pattern to match for topic names
+ * @return a {@link KStream} for the specified topics
+ */
+ public KStream stream(Serde keySerde, Serde valSerde, Pattern topicPattern) {
+ String name = newName(KStreamImpl.SOURCE_NAME);
+
+ addSource(name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
+
+ return new KStreamImpl<>(this, name, Collections.singleton(name));
+ }
+
/**
* Create a {@link KTable} instance for the specified topic.
* The default deserializers specified in the config are used.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index d13d11208d913..72029a8c24092 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -25,8 +24,8 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
-import java.util.Iterator;
class KStreamKStreamJoin implements ProcessorSupplier {
@@ -76,15 +75,15 @@ public void process(K key, V1 value) {
long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
- Iterator> iter = otherWindow.fetch(key, timeFrom, timeTo);
- while (iter.hasNext()) {
- needOuterJoin = false;
- context().forward(key, joiner.apply(value, iter.next().value));
- }
+ try (WindowStoreIterator iter = otherWindow.fetch(key, timeFrom, timeTo)) {
+ while (iter.hasNext()) {
+ needOuterJoin = false;
+ context().forward(key, joiner.apply(value, iter.next().value));
+ }
- if (needOuterJoin)
- context().forward(key, joiner.apply(value, null));
+ if (needOuterJoin)
+ context().forward(key, joiner.apply(value, null));
+ }
}
}
-
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index b4272f89a827a..125c7fcc25d04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -29,7 +29,6 @@
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
-import java.util.Iterator;
import java.util.Map;
public class KStreamWindowAggregate implements KStreamAggProcessorSupplier, V, T> {
@@ -90,38 +89,37 @@ public void process(K key, V value) {
timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
}
- WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo);
+ try (WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo)) {
- // for each matching window, try to update the corresponding key and send to the downstream
- while (iter.hasNext()) {
- KeyValue entry = iter.next();
- W window = matchedWindows.get(entry.key);
+ // for each matching window, try to update the corresponding key and send to the downstream
+ while (iter.hasNext()) {
+ KeyValue entry = iter.next();
+ W window = matchedWindows.get(entry.key);
- if (window != null) {
+ if (window != null) {
- T oldAgg = entry.value;
+ T oldAgg = entry.value;
- if (oldAgg == null)
- oldAgg = initializer.apply();
+ if (oldAgg == null)
+ oldAgg = initializer.apply();
- // try to add the new new value (there will never be old value)
- T newAgg = aggregator.apply(key, value, oldAgg);
+ // try to add the new new value (there will never be old value)
+ T newAgg = aggregator.apply(key, value, oldAgg);
- // update the store with the new value
- windowStore.put(key, newAgg, window.start());
+ // update the store with the new value
+ windowStore.put(key, newAgg, window.start());
- // forward the aggregated change pair
- if (sendOldValues)
- context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
- else
- context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+ // forward the aggregated change pair
+ if (sendOldValues)
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+ else
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
- matchedWindows.remove(entry.key);
+ matchedWindows.remove(entry.key);
+ }
}
}
- iter.close();
-
// create the new window for the rest of unmatched window that do not exist yet
for (long windowStartMs : matchedWindows.keySet()) {
T oldAgg = initializer.apply();
@@ -167,10 +165,9 @@ public T get(Windowed windowedKey) {
W window = (W) windowedKey.window();
// this iterator should contain at most one element
- Iterator> iter = windowStore.fetch(key, window.start(), window.start());
-
- return iter.hasNext() ? iter.next().value : null;
+ try (WindowStoreIterator iter = windowStore.fetch(key, window.start(), window.start())) {
+ return iter.hasNext() ? iter.next().value : null;
+ }
}
-
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 3ed1499f658a8..a526506c17931 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -28,7 +28,6 @@
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
-import java.util.Iterator;
import java.util.Map;
public class KStreamWindowReduce implements KStreamAggProcessorSupplier, V, V> {
@@ -88,40 +87,38 @@ public void process(K key, V value) {
timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
}
- WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo);
+ try (WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo)) {
+ // for each matching window, try to update the corresponding key and send to the downstream
+ while (iter.hasNext()) {
+ KeyValue entry = iter.next();
+ W window = matchedWindows.get(entry.key);
- // for each matching window, try to update the corresponding key and send to the downstream
- while (iter.hasNext()) {
- KeyValue entry = iter.next();
- W window = matchedWindows.get(entry.key);
+ if (window != null) {
- if (window != null) {
+ V oldAgg = entry.value;
+ V newAgg = oldAgg;
- V oldAgg = entry.value;
- V newAgg = oldAgg;
+ // try to add the new new value (there will never be old value)
+ if (newAgg == null) {
+ newAgg = value;
+ } else {
+ newAgg = reducer.apply(newAgg, value);
+ }
- // try to add the new new value (there will never be old value)
- if (newAgg == null) {
- newAgg = value;
- } else {
- newAgg = reducer.apply(newAgg, value);
- }
-
- // update the store with the new value
- windowStore.put(key, newAgg, window.start());
+ // update the store with the new value
+ windowStore.put(key, newAgg, window.start());
- // forward the aggregated change pair
- if (sendOldValues)
- context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
- else
- context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+ // forward the aggregated change pair
+ if (sendOldValues)
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+ else
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
- matchedWindows.remove(entry.key);
+ matchedWindows.remove(entry.key);
+ }
}
}
- iter.close();
-
// create the new window for the rest of unmatched window that do not exist yet
for (long windowStartMs : matchedWindows.keySet()) {
windowStore.put(key, value, windowStartMs);
@@ -161,10 +158,9 @@ public V get(Windowed windowedKey) {
W window = (W) windowedKey.window();
// this iterator should only contain one element
- Iterator> iter = windowStore.fetch(key, window.start(), window.start());
-
- return iter.next().value;
+ try (WindowStoreIterator iter = windowStore.fetch(key, window.start(), window.start())) {
+ return iter.next().value;
+ }
}
-
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 5425149528390..29bff6bdb70b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -27,6 +27,7 @@
import org.apache.kafka.streams.processor.internals.QuickUnion;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
import java.util.ArrayList;
import java.util.Arrays;
@@ -39,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
/**
* A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
@@ -62,8 +64,14 @@ public class TopologyBuilder {
private final QuickUnion nodeGrouper = new QuickUnion<>();
private final List> copartitionSourceGroups = new ArrayList<>();
private final HashMap nodeToSourceTopics = new HashMap<>();
+ private final HashMap nodeToSourcePatterns = new LinkedHashMap<>();
+ private final HashMap topicToPatterns = new HashMap<>();
private final HashMap nodeToSinkTopic = new HashMap<>();
+ private final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
private Map> nodeGroups = null;
+ private Pattern topicPattern;
+
+
private static class StateStoreFactory {
public final Set users;
@@ -110,18 +118,41 @@ public ProcessorNode build(String applicationId) {
}
}
- private static class SourceNodeFactory extends NodeFactory {
- public final String[] topics;
+ private class SourceNodeFactory extends NodeFactory {
+ private final String[] topics;
+ public final Pattern pattern;
private Deserializer keyDeserializer;
private Deserializer valDeserializer;
- private SourceNodeFactory(String name, String[] topics, Deserializer keyDeserializer, Deserializer valDeserializer) {
+ private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer keyDeserializer, Deserializer valDeserializer) {
super(name);
- this.topics = topics.clone();
+ this.topics = topics != null ? topics.clone() : null;
+ this.pattern = pattern;
this.keyDeserializer = keyDeserializer;
this.valDeserializer = valDeserializer;
}
+ public String[] getTopics() {
+ return topics;
+ }
+
+ public String[] getTopics(Collection subscribedTopics) {
+ List matchedTopics = new ArrayList<>();
+ for (String update : subscribedTopics) {
+ if (this.pattern.matcher(update).matches()) {
+ if (topicToPatterns.containsKey(update)) {
+ if (topicToPatterns.get(update) != this.pattern) {
+ throw new TopologyBuilderException("Topic " + update + " already matched check for overlapping regex patterns");
+ }
+ } else {
+ topicToPatterns.put(update, this.pattern);
+ }
+ matchedTopics.add(update);
+ }
+ }
+ return matchedTopics.toArray(new String[matchedTopics.size()]);
+ }
+
@SuppressWarnings("unchecked")
@Override
public ProcessorNode build(String applicationId) {
@@ -193,7 +224,7 @@ public int hashCode() {
public TopologyBuilder() {}
/**
- * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+ * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes.
* The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
* {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
@@ -207,8 +238,25 @@ public final TopologyBuilder addSource(String name, String... topics) {
return addSource(name, (Deserializer) null, (Deserializer) null, topics);
}
+
/**
- * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+ * Add a new source that consumes from topics matching the given pattern
+ * and forwards the records to child processor and/or sink nodes.
+ * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
+ * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
+ *
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+ * @return this builder instance so methods can be chained together; never null
+ */
+ public final TopologyBuilder addSource(String name, Pattern topicPattern) {
+ return addSource(name, (Deserializer) null, (Deserializer) null, topicPattern);
+ }
+
+ /**
+ * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes.
* The source will use the specified key and value deserializers.
*
* @param name the unique name of the source used to reference this node when
@@ -231,16 +279,65 @@ public final TopologyBuilder addSource(String name, Deserializer keyDeserializer
if (sourceTopicNames.contains(topic))
throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
+ for (Pattern pattern : nodeToSourcePatterns.values()) {
+ if (pattern.matcher(topic).matches()) {
+ throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
+ }
+ }
+
sourceTopicNames.add(topic);
}
- nodeFactories.put(name, new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
+ nodeFactories.put(name, new SourceNodeFactory(name, topics, null, keyDeserializer, valDeserializer));
nodeToSourceTopics.put(name, topics.clone());
nodeGrouper.add(name);
return this;
}
+ /**
+ * Add a new source that consumes from topics matching the given pattern
+ * and forwards the records to child processor and/or sink nodes.
+ * The source will use the specified key and value deserializers. The provided
+ * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
+ * topics that share the same key-value data format.
+ *
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
+ * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
+ * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+ * @return this builder instance so methods can be chained together; never null
+ * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
+ */
+
+ public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
+
+ if (topicPattern == null) {
+ throw new TopologyBuilderException("Pattern can't be null");
+ }
+
+ if (nodeFactories.containsKey(name)) {
+ throw new TopologyBuilderException("Processor " + name + " is already added.");
+ }
+
+ for (String sourceTopicName : sourceTopicNames) {
+ if (topicPattern.matcher(sourceTopicName).matches()) {
+ throw new TopologyBuilderException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
+ }
+ }
+
+ nodeToSourcePatterns.put(name, topicPattern);
+ nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, keyDeserializer, valDeserializer));
+ nodeGrouper.add(name);
+
+ return this;
+ }
+
/**
* Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and
@@ -504,9 +601,19 @@ private void connectProcessorAndStateStore(String processorName, String stateSto
public Map topicGroups(String applicationId) {
Map topicGroups = new HashMap<>();
+
+ if (subscriptionUpdates.hasUpdates()) {
+ for (Map.Entry stringPatternEntry : nodeToSourcePatterns.entrySet()) {
+ SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
+ //need to update nodeToSourceTopics with topics matched from given regex
+ nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
+ }
+ }
+
if (nodeGroups == null)
nodeGroups = makeNodeGroups();
+
for (Map.Entry> entry : nodeGroups.entrySet()) {
Set sinkTopics = new HashSet<>();
Set sourceTopics = new HashSet<>();
@@ -677,7 +784,9 @@ private ProcessorTopology build(String applicationId, Set nodeGroup) {
}
}
} else if (factory instanceof SourceNodeFactory) {
- for (String topic : ((SourceNodeFactory) factory).topics) {
+ SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory;
+ String[] topics = (sourceNodeFactory.pattern != null) ? sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) : sourceNodeFactory.getTopics();
+ for (String topic : topics) {
if (internalTopicNames.contains(topic)) {
// prefix the internal topic name with the application id
topicSourceMap.put(applicationId + "-" + topic, (SourceNode) node);
@@ -713,4 +822,28 @@ public Set sourceTopics(String applicationId) {
}
return Collections.unmodifiableSet(topics);
}
+
+ public Pattern sourceTopicPattern() {
+ if (this.topicPattern == null && !nodeToSourcePatterns.isEmpty()) {
+ StringBuilder builder = new StringBuilder();
+ for (Pattern pattern : nodeToSourcePatterns.values()) {
+ builder.append(pattern.pattern()).append("|");
+ }
+ if (!nodeToSourceTopics.isEmpty()) {
+ for (String[] topics : nodeToSourceTopics.values()) {
+ for (String topic : topics) {
+ builder.append(topic).append("|");
+ }
+ }
+ }
+
+ builder.setLength(builder.length() - 1);
+ this.topicPattern = Pattern.compile(builder.toString());
+ }
+ return this.topicPattern;
+ }
+
+ public SubscriptionUpdates getSubscriptionUpdates() {
+ return subscriptionUpdates;
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 085ff94aa0ddd..2160d70cace10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -118,7 +118,6 @@ public void configure(Map configs) {
streamThread = (StreamThread) o;
streamThread.partitionAssignor(this);
- this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId);
if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
internalTopicManager = new InternalTopicManager(
@@ -228,12 +227,17 @@ public Map assign(Cluster metadata, Map> consumersByClient = new HashMap<>();
Map> states = new HashMap<>();
-
+ SubscriptionUpdates subscriptionUpdates = streamThread.builder.getSubscriptionUpdates();
// decode subscription info
for (Map.Entry entry : subscriptions.entrySet()) {
String consumerId = entry.getKey();
Subscription subscription = entry.getValue();
+ if (streamThread.builder.sourceTopicPattern() != null) {
+ // update the topic groups with the returned subscription list for regex pattern subscriptions
+ subscriptionUpdates.updateTopics(subscription.topics());
+ }
+
SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
Set consumers = consumersByClient.get(info.processId);
@@ -255,6 +259,8 @@ public Map assign(Cluster metadata, Map();
@@ -486,4 +492,29 @@ public Map> standbyTasks() {
public void setInternalTopicManager(InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;
}
+
+ /**
+ * Used to capture subscribed topic via Patterns discovered during the
+ * partition assignment process.
+ */
+ public static class SubscriptionUpdates {
+
+ private final Set updatedTopicSubscriptions = new HashSet<>();
+
+
+ private void updateTopics(Collection topicNames) {
+ updatedTopicSubscriptions.clear();
+ updatedTopicSubscriptions.addAll(topicNames);
+ }
+
+ public Collection getUpdates() {
+ return Collections.unmodifiableSet(new HashSet<>(updatedTopicSubscriptions));
+ }
+
+ public boolean hasUpdates() {
+ return !updatedTopicSubscriptions.isEmpty();
+ }
+
+ }
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 72eeef54266d5..64127a89afe11 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -62,6 +62,7 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
import static java.util.Collections.singleton;
@@ -78,6 +79,7 @@ public class StreamThread extends Thread {
protected final StreamsConfig config;
protected final TopologyBuilder builder;
protected final Set sourceTopics;
+ protected final Pattern topicPattern;
protected final Producer producer;
protected final Consumer consumer;
protected final Consumer restoreConsumer;
@@ -160,6 +162,7 @@ public StreamThread(TopologyBuilder builder,
this.config = config;
this.builder = builder;
this.sourceTopics = builder.sourceTopics(applicationId);
+ this.topicPattern = builder.sourceTopicPattern();
this.clientId = clientId;
this.processId = processId;
this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -283,7 +286,12 @@ private void runLoop() {
long lastPoll = 0L;
boolean requiresPoll = true;
- consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
+ if (topicPattern != null) {
+ consumer.subscribe(topicPattern, rebalanceListener);
+ } else {
+ consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
+ }
+
while (stillRunning()) {
// try to fetch some records if necessary
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
index cdb3de5f90a2d..ddbc7b333b6b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
@@ -27,6 +27,9 @@
/**
* Iterator interface of {@link KeyValue}.
*
+ * Users need to call its {@code close} method explicitly upon completeness to release resources,
+ * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
+ *
* @param Type of keys
* @param Type of values
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
index 7c474dd60bf8a..b6e6d0c2df381 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
@@ -21,13 +21,19 @@
import org.apache.kafka.streams.KeyValue;
+import java.io.Closeable;
import java.util.Iterator;
/**
* Iterator interface of {@link KeyValue} with key typed {@link Long} used for {@link WindowStore#fetch(Object, long, long)}.
*
+ * Users need to call its {@code close} method explicitly upon completeness to release resources,
+ * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
+ *
* @param Type of values
*/
-public interface WindowStoreIterator extends Iterator> {
+public interface WindowStoreIterator extends Iterator>, Closeable {
+
+ @Override
void close();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 37609a0d28b3a..a00de19926fc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -77,17 +77,18 @@ public class RocksDBStore implements KeyValueStore {
private final String name;
private final String parentDir;
- private final Options options;
- private final WriteOptions wOptions;
- private final FlushOptions fOptions;
-
+ protected File dbDir;
+ private StateSerdes serdes;
private final Serde keySerde;
private final Serde valueSerde;
- private StateSerdes serdes;
- protected File dbDir;
private RocksDB db;
+ // the following option objects will be created at constructor and disposed at close()
+ private Options options;
+ private WriteOptions wOptions;
+ private FlushOptions fOptions;
+
private boolean loggingEnabled = false;
private int cacheSize = DEFAULT_UNENCODED_CACHE_SIZE;
@@ -313,14 +314,16 @@ public void putAll(List> entries) {
private void putAllInternal(List> entries) {
WriteBatch batch = new WriteBatch();
- for (KeyValue entry : entries) {
- batch.put(entry.key, entry.value);
- }
-
try {
+ for (KeyValue entry : entries) {
+ batch.put(entry.key, entry.value);
+ }
+
db.write(wOptions, batch);
} catch (RocksDBException e) {
throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
+ } finally {
+ batch.dispose();
}
}
@@ -425,7 +428,15 @@ public void flushInternal() {
@Override
public void close() {
flush();
+ options.dispose();
+ wOptions.dispose();
+ fOptions.dispose();
db.close();
+
+ options = null;
+ wOptions = null;
+ fOptions = null;
+ db = null;
}
private static class RocksDbIterator implements KeyValueIterator {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 2e11cd23e1e3a..64945338e5726 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -97,6 +97,7 @@ public void shouldFanoutTheInput() throws Exception {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStream stream1 = builder.stream(INPUT_TOPIC_A);
KStream stream2 = stream1.mapValues(
@@ -119,10 +120,6 @@ public String apply(String value) {
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
- // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
- // of the input data we produce below).
- Thread.sleep(5000);
-
//
// Step 2: Produce some input data to the input topic.
//
@@ -134,10 +131,6 @@ public String apply(String value) {
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(INPUT_TOPIC_A, inputValues, producerConfig);
- // Give the stream processing application some time to do its work.
- Thread.sleep(10000);
- streams.close();
-
//
// Step 3: Verify the application's output data.
//
@@ -149,7 +142,8 @@ public String apply(String value) {
consumerConfigB.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfigB.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfigB.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- List actualValuesForB = IntegrationTestUtils.readValues(OUTPUT_TOPIC_B, consumerConfigB, inputValues.size());
+ List actualValuesForB = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigB,
+ OUTPUT_TOPIC_B, inputValues.size());
assertThat(actualValuesForB, equalTo(expectedValuesForB));
// Verify output topic C
@@ -159,7 +153,9 @@ public String apply(String value) {
consumerConfigC.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfigC.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfigC.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- List actualValuesForC = IntegrationTestUtils.readValues(OUTPUT_TOPIC_C, consumerConfigC, inputValues.size());
+ List actualValuesForC = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigC,
+ OUTPUT_TOPIC_C, inputValues.size());
+ streams.close();
assertThat(actualValuesForC, equalTo(expectedValuesForC));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 66111c4279cde..809a238f71f31 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -19,6 +19,7 @@
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -123,7 +124,7 @@ public void shouldCompactTopicsForStateChangelogs() throws Exception {
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
-
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
KStream textLines = builder.stream(DEFAULT_INPUT_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 93e31e22652cd..9e9d366723681 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -141,6 +141,7 @@ public void shouldCountClicksPerRegion() throws Exception {
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Explicitly place the state directory under /tmp so that we can remove it via
// `purgeLocalStreamsState` below. Once Streams is updated to expose the effective
// StreamsConfig configuration (so we can retrieve whatever state directory Streams came up
@@ -217,10 +218,6 @@ public Long apply(Long value1, Long value2) {
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
- // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
- // of the input data we produce below).
- Thread.sleep(5000);
-
//
// Step 2: Publish user-region information.
//
@@ -246,10 +243,6 @@ public Long apply(Long value1, Long value2) {
userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig);
- // Give the stream processing application some time to do its work.
- Thread.sleep(10000);
- streams.close();
-
//
// Step 4: Verify the application's output data.
//
@@ -259,7 +252,9 @@ public Long apply(Long value1, Long value2) {
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
- List> actualClicksPerRegion = IntegrationTestUtils.readKeyValues(OUTPUT_TOPIC, consumerConfig);
+ List> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
+ OUTPUT_TOPIC, expectedClicksPerRegion.size());
+ streams.close();
assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
index 31ac4006aa253..2096d9bb00b7a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
@@ -79,6 +79,7 @@ public void shouldUppercaseTheInput() throws Exception {
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStream input = builder.stream(DEFAULT_INPUT_TOPIC);
KStream uppercased = input.mapValues(new ValueMapper() {
@@ -92,10 +93,6 @@ public String apply(String value) {
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
- // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
- // of the input data we produce below).
- Thread.sleep(5000);
-
//
// Step 2: Produce some input data to the input topic.
//
@@ -107,10 +104,6 @@ public String apply(String value) {
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
- // Give the stream processing application some time to do its work.
- Thread.sleep(10000);
- streams.close();
-
//
// Step 3: Verify the application's output data.
//
@@ -120,7 +113,9 @@ public String apply(String value) {
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- List actualValues = IntegrationTestUtils.readValues(DEFAULT_OUTPUT_TOPIC, consumerConfig, inputValues.size());
+ List actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
+ DEFAULT_OUTPUT_TOPIC, inputValues.size());
+ streams.close();
assertThat(actualValues, equalTo(expectedValues));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
index e126ed8cb92e1..4e6dcb2fed209 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
@@ -72,6 +72,7 @@ public void shouldWriteTheInputDataAsIsToTheOutputTopic() throws Exception {
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Write the input data as-is to the output topic.
builder.stream(DEFAULT_INPUT_TOPIC).to(DEFAULT_OUTPUT_TOPIC);
@@ -79,10 +80,6 @@ public void shouldWriteTheInputDataAsIsToTheOutputTopic() throws Exception {
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
- // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
- // of the input data we produce below).
- Thread.sleep(5000);
-
//
// Step 2: Produce some input data to the input topic.
//
@@ -94,10 +91,6 @@ public void shouldWriteTheInputDataAsIsToTheOutputTopic() throws Exception {
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
- // Give the stream processing application some time to do its work.
- Thread.sleep(10000);
- streams.close();
-
//
// Step 3: Verify the application's output data.
//
@@ -107,7 +100,9 @@ public void shouldWriteTheInputDataAsIsToTheOutputTopic() throws Exception {
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- List actualValues = IntegrationTestUtils.readValues(DEFAULT_OUTPUT_TOPIC, consumerConfig, inputValues.size());
+ List actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
+ DEFAULT_OUTPUT_TOPIC, inputValues.size());
+ streams.close();
assertThat(actualValues, equalTo(inputValues));
}
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
new file mode 100644
index 0000000000000..f8c54f6a7426e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * End-to-end integration test based on using regex and named topics for creating sources, using
+ * an embedded Kafka cluster.
+ */
+
+public class RegexSourceIntegrationTest {
+ @ClassRule
+ public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+
+ private static final String TOPIC_1 = "topic-1";
+ private static final String TOPIC_2 = "topic-2";
+ private static final String TOPIC_A = "topic-A";
+ private static final String TOPIC_C = "topic-C";
+ private static final String TOPIC_Y = "topic-Y";
+ private static final String TOPIC_Z = "topic-Z";
+ private static final String FA_TOPIC = "fa";
+ private static final String FOO_TOPIC = "foo";
+
+ private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+
+ @BeforeClass
+ public static void startKafkaCluster() throws Exception {
+ CLUSTER.createTopic(TOPIC_1);
+ CLUSTER.createTopic(TOPIC_2);
+ CLUSTER.createTopic(TOPIC_A);
+ CLUSTER.createTopic(TOPIC_C);
+ CLUSTER.createTopic(TOPIC_Y);
+ CLUSTER.createTopic(TOPIC_Z);
+ CLUSTER.createTopic(FA_TOPIC);
+ CLUSTER.createTopic(FOO_TOPIC);
+
+ }
+
+
+ @Test
+ public void testShouldReadFromRegexAndNamedTopics() throws Exception {
+
+ String topic1TestMessage = "topic-1 test";
+ String topic2TestMessage = "topic-2 test";
+ String topicATestMessage = "topic-A test";
+ String topicCTestMessage = "topic-C test";
+ String topicYTestMessage = "topic-Y test";
+ String topicZTestMessage = "topic-Z test";
+
+
+ final Serde stringSerde = Serdes.String();
+
+ Properties streamsConfiguration = getStreamsConfig();
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ KStream pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
+ KStream pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
+ KStream namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+
+ pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+ pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+ namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+ // Remove any state from previous test runs
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+ KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ streams.start();
+
+ Properties producerConfig = getProducerConfig();
+
+ produceMessage(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig);
+ produceMessage(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig);
+ produceMessage(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig);
+ produceMessage(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig);
+ produceMessage(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig);
+ produceMessage(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig);
+
+ Properties consumerConfig = getConsumerConfig();
+
+ List expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage);
+ List> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6);
+ List actualValues = new ArrayList<>(6);
+
+ for (KeyValue receivedKeyValue : receivedKeyValues) {
+ actualValues.add(receivedKeyValue.value);
+ }
+
+ streams.close();
+ Collections.sort(actualValues);
+ Collections.sort(expectedReceivedValues);
+ assertThat(actualValues, equalTo(expectedReceivedValues));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
+
+ String fooMessage = "fooMessage";
+ String fMessage = "fMessage";
+
+
+ final Serde stringSerde = Serdes.String();
+
+ Properties streamsConfiguration = getStreamsConfig();
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ /*
+ overlapping patterns here, no messages should be sent as TopologyBuilderException
+ will be thrown when the processor topology is built.
+ */
+ KStream pattern1Stream = builder.stream(Pattern.compile("foo.*"));
+ KStream pattern2Stream = builder.stream(Pattern.compile("f.*"));
+
+
+ pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+ pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+
+ // Remove any state from previous test runs
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+ KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ streams.start();
+
+ Properties producerConfig = getProducerConfig();
+
+ produceMessage(FA_TOPIC, Arrays.asList(fMessage), producerConfig);
+ produceMessage(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig);
+
+ Properties consumerConfig = getConsumerConfig();
+
+ try {
+ IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000);
+ } finally {
+ streams.close();
+ }
+
+ }
+
+ private void produceMessage(String inputTopic, List input, Properties producerConfig) throws Exception {
+ IntegrationTestUtils.produceValuesSynchronously(inputTopic, input, producerConfig);
+ }
+
+
+ private Properties getProducerConfig() {
+ Properties producerConfig = new Properties();
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ return producerConfig;
+ }
+
+ private Properties getStreamsConfig() {
+ Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "regex-source-integration-test");
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+ streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
+ return streamsConfiguration;
+ }
+
+ private Properties getConsumerConfig() {
+ Properties consumerConfig = new Properties();
+ consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "regex-source-integration-consumer");
+ consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ return consumerConfig;
+ }
+
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
index c8583d1da0ed1..e00cd13c370e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
@@ -83,6 +83,7 @@ public void shouldCountWords() throws Exception {
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Explicitly place the state directory under /tmp so that we can remove it via
// `purgeLocalStreamsState` below. Once Streams is updated to expose the effective
// StreamsConfig configuration (so we can retrieve whatever state directory Streams came up
@@ -115,11 +116,7 @@ public KeyValue apply(String key, String value) {
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
-
- // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
- // of the input data we produce below).
- Thread.sleep(5000);
-
+
//
// Step 2: Produce some input data to the input topic.
//
@@ -134,15 +131,15 @@ public KeyValue apply(String key, String value) {
//
// Step 3: Verify the application's output data.
//
- Thread.sleep(10000);
- streams.close();
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-integration-test-standard-consumer");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
- List> actualWordCounts = IntegrationTestUtils.readKeyValues(DEFAULT_OUTPUT_TOPIC, consumerConfig);
+ List> actualWordCounts = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
+ DEFAULT_OUTPUT_TOPIC, expectedWordCounts.size());
+ streams.close();
assertThat(actualWordCounts, equalTo(expectedWordCounts));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 89fe0c4ef943e..c3f90897fcc2c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -44,7 +44,8 @@
*/
public class IntegrationTestUtils {
- private static final int UNLIMITED_MESSAGES = -1;
+ public static final int UNLIMITED_MESSAGES = -1;
+ public static final long DEFAULT_TIMEOUT = 30 * 1000L;
/**
* Returns up to `maxMessages` message-values from the topic.
@@ -54,10 +55,10 @@ public class IntegrationTestUtils {
* @param maxMessages Maximum number of messages to read via the consumer.
* @return The values retrieved via the consumer.
*/
- public static List readValues(String topic, Properties consumerConfig, int maxMessages) {
+ public static List readValues(String topic, Properties consumerConfig, int maxMessages) {
List returnList = new ArrayList<>();
- List> kvs = readKeyValues(topic, consumerConfig, maxMessages);
- for (KeyValue kv : kvs) {
+ List> kvs = readKeyValues(topic, consumerConfig, maxMessages);
+ for (KeyValue, V> kv : kvs) {
returnList.add(kv.value);
}
return returnList;
@@ -154,4 +155,75 @@ public static void produceValuesSynchronously(
produceKeyValuesSynchronously(topic, keyedRecords, producerConfig);
}
+ public static List> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig,
+ String topic,
+ int expectedNumRecords) throws InterruptedException {
+
+ return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
+ }
+
+ /**
+ * Wait until enough data (key-value records) has been consumed.
+ * @param consumerConfig Kafka Consumer configuration
+ * @param topic Topic to consume from
+ * @param expectedNumRecords Minimum number of expected records
+ * @param waitTime Upper bound in waiting time in milliseconds
+ * @return All the records consumed, or null if no records are consumed
+ * @throws InterruptedException
+ * @throws AssertionError if the given wait time elapses
+ */
+ public static List> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig,
+ String topic,
+ int expectedNumRecords,
+ long waitTime) throws InterruptedException {
+ List> accumData = new ArrayList<>();
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ List> readData = readKeyValues(topic, consumerConfig);
+ accumData.addAll(readData);
+ if (accumData.size() >= expectedNumRecords)
+ return accumData;
+ if (System.currentTimeMillis() > startTime + waitTime)
+ throw new AssertionError("Expected " + expectedNumRecords +
+ " but received only " + accumData.size() +
+ " records before timeout " + waitTime + " ms");
+ Thread.sleep(Math.min(waitTime, 100L));
+ }
+ }
+
+ public static List waitUntilMinValuesRecordsReceived(Properties consumerConfig,
+ String topic,
+ int expectedNumRecords) throws InterruptedException {
+
+ return waitUntilMinValuesRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
+ }
+
+ /**
+ * Wait until enough data (value records) has been consumed.
+ * @param consumerConfig Kafka Consumer configuration
+ * @param topic Topic to consume from
+ * @param expectedNumRecords Minimum number of expected records
+ * @param waitTime Upper bound in waiting time in milliseconds
+ * @return All the records consumed, or null if no records are consumed
+ * @throws InterruptedException
+ * @throws AssertionError if the given wait time elapses
+ */
+ public static List waitUntilMinValuesRecordsReceived(Properties consumerConfig,
+ String topic,
+ int expectedNumRecords,
+ long waitTime) throws InterruptedException {
+ List accumData = new ArrayList<>();
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ List readData = readValues(topic, consumerConfig, expectedNumRecords);
+ accumData.addAll(readData);
+ if (accumData.size() >= expectedNumRecords)
+ return accumData;
+ if (System.currentTimeMillis() > startTime + waitTime)
+ throw new AssertionError("Expected " + expectedNumRecords +
+ " but received only " + accumData.size() +
+ " records before timeout " + waitTime + " ms");
+ Thread.sleep(Math.min(waitTime, 100L));
+ }
+ }
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 9af313a958fa8..ad1cb27d44887 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -18,10 +18,10 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.junit.Test;
@@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
import static org.apache.kafka.common.utils.Utils.mkList;
import static org.apache.kafka.common.utils.Utils.mkSet;
@@ -152,6 +153,46 @@ public void testSourceTopics() {
assertEquals(expected, builder.sourceTopics("X"));
}
+ @Test
+ public void testPatternSourceTopic() {
+ final TopologyBuilder builder = new TopologyBuilder();
+ Pattern expectedPattern = Pattern.compile("topic-\\d");
+ builder.addSource("source-1", expectedPattern);
+ assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+ }
+
+ @Test
+ public void testAddMoreThanOnePatternSourceNode() {
+ final TopologyBuilder builder = new TopologyBuilder();
+ Pattern expectedPattern = Pattern.compile("topics[A-Z]|.*-\\d");
+ builder.addSource("source-1", Pattern.compile("topics[A-Z]"));
+ builder.addSource("source-2", Pattern.compile(".*-\\d"));
+ assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+ }
+
+ @Test
+ public void testSubscribeTopicNameAndPattern() {
+ final TopologyBuilder builder = new TopologyBuilder();
+ Pattern expectedPattern = Pattern.compile(".*-\\d|topic-foo|topic-bar");
+ builder.addSource("source-1", "topic-foo", "topic-bar");
+ builder.addSource("source-2", Pattern.compile(".*-\\d"));
+ assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+ }
+
+ @Test(expected = TopologyBuilderException.class)
+ public void testPatternMatchesAlreadyProvidedTopicSource() {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.addSource("source-1", "foo");
+ builder.addSource("source-2", Pattern.compile("foo"));
+ }
+
+ @Test(expected = TopologyBuilderException.class)
+ public void testNamedTopicMatchesAlreadyProvidedPattern() {
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.addSource("source-1", Pattern.compile("foo"));
+ builder.addSource("source-2", "foo");
+ }
+
@Test(expected = TopologyBuilderException.class)
public void testAddStateStoreWithNonExistingProcessor() {
final TopologyBuilder builder = new TopologyBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 1095fcf513ff2..62b283aefd944 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -351,9 +351,11 @@ public void process(String key, String value) {
@Override
public void punctuate(long streamTime) {
int count = 0;
- for (KeyValueIterator iter = store.all(); iter.hasNext();) {
- iter.next();
- ++count;
+ try (KeyValueIterator iter = store.all()) {
+ while (iter.hasNext()) {
+ iter.next();
+ ++count;
+ }
}
context().forward(Long.toString(streamTime), count);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 3a35d7542fcee..be5596d053626 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -362,9 +362,11 @@ public int checkForRestoredEntries(KeyValueStore store) {
*/
public int sizeOf(KeyValueStore store) {
int size = 0;
- for (KeyValueIterator iterator = store.all(); iterator.hasNext();) {
- iterator.next();
- ++size;
+ try (KeyValueIterator iterator = store.all()) {
+ while (iterator.hasNext()) {
+ iterator.next();
+ ++size;
+ }
}
return size;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index e9888ada6be6c..d889e7b323429 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -785,9 +785,10 @@ public void send(ProducerRecord record, Serializer keySeria
segmentDirs(baseDir)
);
- WindowStoreIterator iter = store.fetch(0, 0L, 1000000L);
- while (iter.hasNext()) {
- iter.next();
+ try (WindowStoreIterator iter = store.fetch(0, 0L, 1000000L)) {
+ while (iter.hasNext()) {
+ iter.next();
+ }
}
assertEquals(
diff --git a/tests/README.md b/tests/README.md
index 8950b4682c923..098922f877a7d 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -14,9 +14,11 @@ https://cwiki.apache.org/confluence/display/KAFKA/tutorial+-+set+up+and+run+Kafk
* Install Virtual Box from [https://www.virtualbox.org/](https://www.virtualbox.org/) (run `$ vboxmanage --version` to check if it's installed).
* Install Vagrant >= 1.6.4 from [http://www.vagrantup.com/](http://www.vagrantup.com/) (run `vagrant --version` to check if it's installed).
-* Install system test dependiences, including ducktape, a command-line tool and library for testing distributed systems.
+* Install system test dependencies, including ducktape, a command-line tool and library for testing distributed systems. We recommend to use virtual env for system test development
$ cd kafka/tests
+ $ virtualenv venv
+ $ . ./venv/bin/activate
$ python setup.py develop
$ cd .. # back to base kafka directory