diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java index feb4ee7d41f2e..87368c1c46520 100644 --- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java +++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java @@ -55,7 +55,7 @@ public KeyValue apply(String key, String value) { } }).filter(new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return true; } }); @@ -63,13 +63,13 @@ public boolean apply(String key, Integer value) { KStream[] streams = stream2.branch( new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return (value % 2) == 0; } }, new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return true; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java index 0b3aba8716aed..92e6284e73cfe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java +++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.streams.KafkaStreaming; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.StreamingConfig; @@ -36,10 +36,10 @@ public class ProcessorJob { - private static class MyProcessorDef implements ProcessorDef { + private static class MyProcessorSupplier implements ProcessorSupplier { @Override - public Processor instance() { + public Processor get() { return new Processor() { private ProcessorContext context; private KeyValueStore kvStore; @@ -102,7 +102,7 @@ public static void main(String[] args) throws Exception { builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source"); - builder.addProcessor("PROCESS", new MyProcessorDef(), "SOURCE"); + builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE"); builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS"); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 6c488cff177c5..ecec882493382 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -19,10 +19,13 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; /** * KStream is an abstraction of a stream of key-value pairs. + * + * @param the type of keys + * @param the type of values */ public interface KStream { @@ -30,7 +33,7 @@ public interface KStream { * Creates a new stream consists of all elements of this stream which satisfy a predicate * * @param predicate the instance of Predicate - * @return KStream + * @return the stream with only those elements that satisfy the predicate */ KStream filter(Predicate predicate); @@ -38,45 +41,45 @@ public interface KStream { * Creates a new stream consists all elements of this stream which do not satisfy a predicate * * @param predicate the instance of Predicate - * @return KStream + * @return the stream with only those elements that do not satisfy the predicate */ KStream filterOut(Predicate predicate); /** - * Creates a new stream by transforming key-value pairs by a mapper to all elements of this stream + * Creates a new stream by applying transforming each element in this stream into a different element in the new stream. * * @param mapper the instance of KeyValueMapper * @param the key type of the new stream * @param the value type of the new stream - * @return KStream + * @return the mapped stream */ KStream map(KeyValueMapper> mapper); /** - * Creates a new stream by transforming values by a mapper to all values of this stream + * Creates a new stream by applying transforming each value in this stream into a different value in the new stream. * * @param mapper the instance of ValueMapper * @param the value type of the new stream - * @return KStream + * @return the mapped stream */ KStream mapValues(ValueMapper mapper); /** - * Creates a new stream by applying a mapper to all elements of this stream and using the values in the resulting Iterable + * Creates a new stream by applying transforming each element in this stream into zero or more elements in the new stream. * * @param mapper the instance of KeyValueMapper * @param the key type of the new stream * @param the value type of the new stream - * @return KStream + * @return the mapped stream */ KStream flatMap(KeyValueMapper>> mapper); /** - * Creates a new stream by applying a mapper to all values of this stream and using the values in the resulting Iterable + * Creates a new stream by applying transforming each value in this stream into zero or more values in the new stream. * * @param processor the instance of Processor * @param the value type of the new stream - * @return KStream + * @return the mapped stream */ KStream flatMapValues(ValueMapper> processor); @@ -84,18 +87,18 @@ public interface KStream { * Creates a new windowed stream using a specified window instance. * * @param windowDef the instance of Window - * @return KStream + * @return the windowed stream */ - KStreamWindowed with(WindowDef windowDef); + KStreamWindowed with(WindowSupplier windowDef); /** - * Creates an array of streams from this stream. Each stream in the array coresponds to a predicate in + * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in * supplied predicates in the same order. Predicates are evaluated in order. An element is streamed to * a corresponding stream for the first predicate is evaluated true. * An element will be dropped if none of the predicates evaluate true. * - * @param predicates Instances of Predicate - * @return KStream + * @param predicates the ordered list of Predicate instances + * @return the new streams that each contain those elements for which their Predicate evaluated to true. */ KStream[] branch(Predicate... predicates); @@ -106,7 +109,7 @@ public interface KStream { * @param topic the topic name * @param the key type of the new stream * @param the value type of the new stream - * @return KStream + * @return the new stream that consumes the given topic */ KStream through(String topic); @@ -116,16 +119,16 @@ public interface KStream { * * @param topic the topic name * @param keySerializer key serializer used to send key-value pairs, - * if not specified the default serializer defined in the configs will be used + * if not specified the default key serializer defined in the configuration will be used * @param valSerializer value serializer used to send key-value pairs, - * if not specified the default serializer defined in the configs will be used + * if not specified the default value serializer defined in the configuration will be used * @param keyDeserializer key deserializer used to create the new KStream, - * if not specified the default deserializer defined in the configs will be used + * if not specified the default key deserializer defined in the configuration will be used * @param valDeserializer value deserializer used to create the new KStream, - * if not specified the default deserializer defined in the configs will be used + * if not specified the default value deserializer defined in the configuration will be used * @param the key type of the new stream * @param the value type of the new stream - * @return KStream + * @return the new stream that consumes the given topic */ KStream through(String topic, Serializer keySerializer, Serializer valSerializer, Deserializer keyDeserializer, Deserializer valDeserializer); @@ -150,7 +153,8 @@ public interface KStream { /** * Processes all elements in this stream by applying a processor. * - * @param processorDef the class of ProcessorDef + * @param processorSupplier the supplier of the Processor to use + * @return the new stream containing the processed output */ - KStream process(ProcessorDef processorDef); + KStream process(ProcessorSupplier processorSupplier); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java index 9cdb3bc6caa56..c73622e460eca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java @@ -17,8 +17,14 @@ package org.apache.kafka.streams.kstream; +/** + * Represents a predicate (boolean-valued function) of two arguments. + * + * @param the type of key + * @param the type of value + */ public interface Predicate { - boolean apply(K key, V value); + boolean test(K key, V value); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java similarity index 98% rename from streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java index 5927db691db47..0110c875a1419 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java @@ -37,7 +37,7 @@ import java.util.LinkedList; import java.util.Map; -public class SlidingWindowDef implements WindowDef { +public class SlidingWindowSupplier implements WindowSupplier { private final String name; private final long duration; private final int maxCount; @@ -46,7 +46,7 @@ public class SlidingWindowDef implements WindowDef { private final Deserializer keyDeserializer; private final Deserializer valueDeserializer; - public SlidingWindowDef( + public SlidingWindowSupplier( String name, long duration, int maxCount, @@ -69,7 +69,7 @@ public String name() { } @Override - public Window instance() { + public Window get() { return new SlidingWindow(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java similarity index 93% rename from streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java index bbc5979300b79..46a2b9ee91622 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java @@ -17,9 +17,9 @@ package org.apache.kafka.streams.kstream; -public interface WindowDef { +public interface WindowSupplier { String name(); - Window instance(); + Window get(); } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java index c8061479cff02..06083b30b7d0a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java @@ -19,20 +19,20 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.kstream.Predicate; -class KStreamBranch implements ProcessorDef { +class KStreamBranch implements ProcessorSupplier { private final Predicate[] predicates; @SuppressWarnings("unchecked") - public KStreamBranch(Predicate... predicates) { + public KStreamBranch(Predicate ... predicates) { this.predicates = predicates; } @Override - public Processor instance() { + public Processor get() { return new KStreamBranchProcessor(); } @@ -40,7 +40,7 @@ private class KStreamBranchProcessor extends AbstractProcessor { @Override public void process(K key, V value) { for (int i = 0; i < predicates.length; i++) { - if (predicates[i].apply(key, value)) { + if (predicates[i].test(key, value)) { // use forward with childIndex here and then break the loop // so that no record is going to be piped to multiple streams context().forward(key, value, i); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java index 22800f3cd2d63..0b1f1e05a4a47 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java @@ -20,9 +20,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamFilter implements ProcessorDef { +class KStreamFilter implements ProcessorSupplier { private final Predicate predicate; private final boolean filterOut; @@ -33,14 +33,14 @@ public KStreamFilter(Predicate predicate, boolean filterOut) { } @Override - public Processor instance() { + public Processor get() { return new KStreamFilterProcessor(); } private class KStreamFilterProcessor extends AbstractProcessor { @Override public void process(K key, V value) { - if (filterOut ^ predicate.apply(key, value)) { + if (filterOut ^ predicate.test(key, value)) { context().forward(key, value); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java index 6c7f4eac09ae3..175a002d17e85 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java @@ -21,9 +21,9 @@ import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamFlatMap implements ProcessorDef { +class KStreamFlatMap implements ProcessorSupplier { private final KeyValueMapper>> mapper; @@ -32,7 +32,7 @@ class KStreamFlatMap implements ProcessorDef { } @Override - public Processor instance() { + public Processor get() { return new KStreamFlatMapProcessor(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java index 9cdcdf5b8af28..9b4559bd769c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java @@ -20,9 +20,9 @@ import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamFlatMapValues implements ProcessorDef { +class KStreamFlatMapValues implements ProcessorSupplier { private final ValueMapper> mapper; @@ -31,7 +31,7 @@ class KStreamFlatMapValues implements ProcessorDef { } @Override - public Processor instance() { + public Processor get() { return new KStreamFlatMapValuesProcessor(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 52c717feee7f2..cff97d6c31f9e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -19,15 +19,15 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.processor.ProcessorDef; -import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamWindowed; +import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.streams.kstream.WindowDef; +import org.apache.kafka.streams.kstream.WindowSupplier; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.TopologyBuilder; import java.lang.reflect.Array; import java.util.concurrent.atomic.AtomicInteger; @@ -127,12 +127,12 @@ public KStream flatMapValues(ValueMapper> mapper) { } @Override - public KStreamWindowed with(WindowDef window) { + public KStreamWindowed with(WindowSupplier windowSupplier) { String name = WINDOWED_NAME + INDEX.getAndIncrement(); - topology.addProcessor(name, new KStreamWindow<>(window), this.name); + topology.addProcessor(name, new KStreamWindow<>(windowSupplier), this.name); - return new KStreamWindowedImpl<>(topology, name, window); + return new KStreamWindowedImpl<>(topology, name, windowSupplier); } @Override @@ -191,10 +191,10 @@ public void to(String topic, Serializer keySerializer, Serializer valSeria } @Override - public KStream process(final ProcessorDef processorDef) { + public KStream process(final ProcessorSupplier processorSupplier) { String name = PROCESSOR_NAME + INDEX.getAndIncrement(); - topology.addProcessor(name, processorDef, this.name); + topology.addProcessor(name, processorSupplier, this.name); return new KStreamImpl<>(topology, name); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java index 311efef4c26ac..997953f579d55 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java @@ -22,11 +22,11 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; import java.util.Iterator; -class KStreamJoin implements ProcessorDef { +class KStreamJoin implements ProcessorSupplier { private static abstract class Finder { abstract Iterator find(K key, long timestamp); @@ -41,7 +41,7 @@ private static abstract class Finder { } @Override - public Processor instance() { + public Processor get() { return new KStreamJoinProcessor(windowName); } @@ -66,6 +66,7 @@ public void init(ProcessorContext context) { final Window window = (Window) context.getStateStore(windowName); this.finder = new Finder() { + @Override Iterator find(K key, long timestamp) { return window.find(key, timestamp); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java index a9a7b24c9382b..3868318f5d136 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java @@ -21,9 +21,9 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamMap implements ProcessorDef { +class KStreamMap implements ProcessorSupplier { private final KeyValueMapper> mapper; @@ -32,7 +32,7 @@ public KStreamMap(KeyValueMapper> mapper) { } @Override - public Processor instance() { + public Processor get() { return new KStreamMapProcessor(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java index ac39f37184a0b..692b421599c06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java @@ -17,12 +17,12 @@ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamMapValues implements ProcessorDef { +class KStreamMapValues implements ProcessorSupplier { private final ValueMapper mapper; @@ -31,7 +31,7 @@ public KStreamMapValues(ValueMapper mapper) { } @Override - public Processor instance() { + public Processor get() { return new KStreamMapProcessor(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java index 0f4638d52b95a..59a815ba2b9b8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java @@ -19,13 +19,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamPassThrough implements ProcessorDef { +class KStreamPassThrough implements ProcessorSupplier { @Override - public Processor instance() { - return new KStreamPassThroughProcessor(); + public Processor get() { + return new KStreamPassThroughProcessor(); } public class KStreamPassThroughProcessor extends AbstractProcessor { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java index bdd132304b600..29239364a8093 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java @@ -18,26 +18,26 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.WindowSupplier; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorDef; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.kstream.WindowDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -public class KStreamWindow implements ProcessorDef { +public class KStreamWindow implements ProcessorSupplier { - private final WindowDef windowDef; + private final WindowSupplier windowSupplier; - KStreamWindow(WindowDef windowDef) { - this.windowDef = windowDef; + KStreamWindow(WindowSupplier windowSupplier) { + this.windowSupplier = windowSupplier; } - public WindowDef window() { - return windowDef; + public WindowSupplier window() { + return windowSupplier; } @Override - public Processor instance() { + public Processor get() { return new KStreamWindowProcessor(); } @@ -48,7 +48,7 @@ private class KStreamWindowProcessor extends AbstractProcessor { @Override public void init(ProcessorContext context) { super.init(context); - this.window = windowDef.instance(); + this.window = windowSupplier.get(); this.window.init(context); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java index a208af6cca137..93160122e0ce7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java @@ -20,22 +20,22 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamWindowed; import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.WindowDef; +import org.apache.kafka.streams.kstream.WindowSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; public final class KStreamWindowedImpl extends KStreamImpl implements KStreamWindowed { - private final WindowDef windowDef; + private final WindowSupplier windowSupplier; - public KStreamWindowedImpl(TopologyBuilder topology, String name, WindowDef windowDef) { + public KStreamWindowedImpl(TopologyBuilder topology, String name, WindowSupplier windowSupplier) { super(topology, name); - this.windowDef = windowDef; + this.windowSupplier = windowSupplier; } @Override public KStream join(KStreamWindowed other, ValueJoiner valueJoiner) { - String thisWindowName = this.windowDef.name(); - String otherWindowName = ((KStreamWindowedImpl) other).windowDef.name(); + String thisWindowName = this.windowSupplier.name(); + String otherWindowName = ((KStreamWindowedImpl) other).windowSupplier.name(); KStreamJoin joinThis = new KStreamJoin<>(otherWindowName, valueJoiner); KStreamJoin joinOther = new KStreamJoin<>(thisWindowName, KStreamJoin.reverseJoiner(valueJoiner)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java similarity index 92% rename from streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java rename to streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java index a32a899ec2744..719d3ac2a34ff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.processor; -public interface ProcessorDef { +public interface ProcessorSupplier { - Processor instance(); + Processor get(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index a254c131c1a47..833e29b9d9eef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -57,18 +57,17 @@ private interface NodeFactory { private class ProcessorNodeFactory implements NodeFactory { public final String[] parents; private final String name; - private final ProcessorDef definition; + private final ProcessorSupplier supplier; - public ProcessorNodeFactory(String name, String[] parents, ProcessorDef definition) { + public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) { this.name = name; this.parents = parents.clone(); - this.definition = definition; + this.supplier = supplier; } @Override public ProcessorNode build() { - Processor processor = definition.instance(); - return new ProcessorNode(name, processor); + return new ProcessorNode(name, supplier.get()); } } @@ -123,7 +122,7 @@ public TopologyBuilder() {} * {@link StreamingConfig streaming configuration}. * * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorDef, String...) adding processor children}. + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. * @param topics the name of one or more Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null */ @@ -136,7 +135,7 @@ public final TopologyBuilder addSource(String name, String... topics) { * The sink will use the specified key and value deserializers. * * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorDef, String...) adding processor children}. + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source * should use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the * {@link StreamingConfig streaming configuration} @@ -216,12 +215,12 @@ public final TopologyBuilder addSink(String name, String topic, Serializer keySe * Add a new processor node that receives and processes messages output by one or more parent source or processor node. * Any new messages output by this processor will be forwarded to its child processor or sink nodes. * @param name the unique name of the processor node - * @param definition the supplier used to obtain this node's {@link Processor} instance + * @param supplier the supplier used to obtain this node's {@link Processor} instance * @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive * and process * @return this builder instance so methods can be chained together; never null */ - public final TopologyBuilder addProcessor(String name, ProcessorDef definition, String... parentNames) { + public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) { if (nodeNames.contains(name)) throw new TopologyException("Processor " + name + " is already added."); @@ -237,7 +236,7 @@ public final TopologyBuilder addProcessor(String name, ProcessorDef definition, } nodeNames.add(name); - nodeFactories.add(new ProcessorNodeFactory(name, parentNames, definition)); + nodeFactories.add(new ProcessorNodeFactory(name, parentNames, supplier)); return this; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java index c18ddfe3ca041..40eba2f103a5f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; import java.lang.reflect.Array; @@ -44,19 +44,19 @@ public void testKStreamBranch() { Predicate isEven = new Predicate() { @Override - public boolean apply(Integer key, String value) { + public boolean test(Integer key, String value) { return (key % 2) == 0; } }; Predicate isMultipleOfThree = new Predicate() { @Override - public boolean apply(Integer key, String value) { + public boolean test(Integer key, String value) { return (key % 3) == 0; } }; Predicate isOdd = new Predicate() { @Override - public boolean apply(Integer key, String value) { + public boolean test(Integer key, String value) { return (key % 2) != 0; } }; @@ -65,16 +65,16 @@ public boolean apply(Integer key, String value) { KStream stream; KStream[] branches; - MockProcessorDef[] processors; + MockProcessorSupplier[] processors; stream = builder.from(keyDeserializer, valDeserializer, topicName); branches = stream.branch(isEven, isMultipleOfThree, isOdd); assertEquals(3, branches.length); - processors = (MockProcessorDef[]) Array.newInstance(MockProcessorDef.class, branches.length); + processors = (MockProcessorSupplier[]) Array.newInstance(MockProcessorSupplier.class, branches.length); for (int i = 0; i < branches.length; i++) { - processors[i] = new MockProcessorDef<>(); + processors[i] = new MockProcessorSupplier<>(); branches[i].process(processors[i]); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java index b80e1e2028ef7..d1e5d38a987f7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; @@ -38,7 +38,7 @@ public class KStreamFilterTest { private Predicate isMultipleOfThree = new Predicate() { @Override - public boolean apply(Integer key, String value) { + public boolean test(Integer key, String value) { return (key % 3) == 0; } }; @@ -49,9 +49,9 @@ public void testFilter() { final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7}; KStream stream; - MockProcessorDef processor; + MockProcessorSupplier processor; - processor = new MockProcessorDef<>(); + processor = new MockProcessorSupplier<>(); stream = builder.from(keyDeserializer, valDeserializer, topicName); stream.filter(isMultipleOfThree).process(processor); @@ -69,9 +69,9 @@ public void testFilterOut() { final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7}; KStream stream; - MockProcessorDef processor; + MockProcessorSupplier processor; - processor = new MockProcessorDef<>(); + processor = new MockProcessorSupplier<>(); stream = builder.from(keyDeserializer, valDeserializer, topicName); stream.filterOut(isMultipleOfThree).process(processor); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java index e87223e73d886..61b5ccd5a1dc4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -57,9 +57,9 @@ public Iterable> apply(Integer key, String value) { final int[] expectedKeys = {0, 1, 2, 3}; KStream stream; - MockProcessorDef processor; + MockProcessorSupplier processor; - processor = new MockProcessorDef<>(); + processor = new MockProcessorSupplier<>(); stream = builder.from(keyDeserializer, valDeserializer, topicName); stream.flatMap(mapper).process(processor); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java index 09dda65fc3cb6..66faf077885c7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; import java.util.ArrayList; @@ -55,9 +55,9 @@ public Iterable apply(String value) { final int[] expectedKeys = {0, 1, 2, 3}; KStream stream; - MockProcessorDef processor; + MockProcessorSupplier processor; - processor = new MockProcessorDef<>(); + processor = new MockProcessorSupplier<>(); stream = builder.from(keyDeserializer, valDeserializer, topicName); stream.flatMapValues(mapper).process(processor); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 0660ddd87a54f..875712a04ef6b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.UnlimitedWindowDef; import org.junit.Test; @@ -47,12 +47,12 @@ public void testNumProcesses() { KStream stream1 = source1.filter(new Predicate() { @Override - public boolean apply(String key, String value) { + public boolean test(String key, String value) { return true; } }).filterOut(new Predicate() { @Override - public boolean apply(String key, String value) { + public boolean test(String key, String value) { return false; } }); @@ -74,13 +74,13 @@ public Iterable apply(String value) { KStream[] streams2 = stream2.branch( new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return (value % 2) == 0; } }, new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return true; } } @@ -89,13 +89,13 @@ public boolean apply(String key, Integer value) { KStream[] streams3 = stream3.branch( new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return (value % 2) == 0; } }, new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return true; } } @@ -119,7 +119,7 @@ public Integer apply(Integer value1, Integer value2) { stream4.to("topic-5"); - stream5.through("topic-6").process(new MockProcessorDef<>()).to("topic-7"); + stream5.through("topic-6").process(new MockProcessorSupplier<>()).to("topic-7"); assertEquals(2 + // sources 2 + // stream1 diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java index 7dea8e08863d2..58899faeda094 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java @@ -28,7 +28,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.UnlimitedWindowDef; import org.junit.Test; @@ -90,10 +90,10 @@ public void testJoin() { KStream stream2; KStreamWindowed windowed1; KStreamWindowed windowed2; - MockProcessorDef processor; + MockProcessorSupplier processor; String[] expected; - processor = new MockProcessorDef<>(); + processor = new MockProcessorSupplier<>(); stream1 = builder.from(keyDeserializer, valDeserializer, topic1); stream2 = builder.from(keyDeserializer, valDeserializer, topic2); windowed1 = stream1.with(new UnlimitedWindowDef("window1")); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java index bec524f37397a..2ae8a97515dcb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -51,9 +51,9 @@ public KeyValue apply(Integer key, String value) { final int[] expectedKeys = new int[]{0, 1, 2, 3}; KStream stream; - MockProcessorDef processor; + MockProcessorSupplier processor; - processor = new MockProcessorDef<>(); + processor = new MockProcessorSupplier<>(); stream = builder.from(keyDeserializer, valDeserializer, topicName); stream.map(mapper).process(processor); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java index b6507fe2f4dd6..f830c0010b338 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -50,7 +50,7 @@ public Integer apply(String value) { final int[] expectedKeys = {1, 10, 100, 1000}; KStream stream; - MockProcessorDef processor = new MockProcessorDef<>(); + MockProcessorSupplier processor = new MockProcessorSupplier<>(); stream = builder.from(keyDeserializer, valDeserializer, topicName); stream.mapValues(mapper).process(processor); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java index 48a9fc3d279e1..c3dc7e08129ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.WindowDef; +import org.apache.kafka.streams.kstream.WindowSupplier; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.UnlimitedWindowDef; import org.junit.Test; @@ -46,11 +46,11 @@ public void testWindowedStream() { final int[] expectedKeys = new int[]{0, 1, 2, 3}; KStream stream; - WindowDef windowDef; + WindowSupplier windowSupplier; - windowDef = new UnlimitedWindowDef<>(windowName); + windowSupplier = new UnlimitedWindowDef<>(windowName); stream = builder.from(keyDeserializer, valDeserializer, topicName); - stream.with(windowDef); + stream.with(windowSupplier); KStreamTestDriver driver = new KStreamTestDriver(builder); Window window = (Window) driver.getStateStore(windowName); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 57a78ff87a896..00522d53f6167 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -19,7 +19,7 @@ import static org.junit.Assert.assertEquals; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; public class TopologyBuilderTest { @@ -45,22 +45,22 @@ public void testAddProcessorWithSameName() { final TopologyBuilder builder = new TopologyBuilder(); builder.addSource("source", "topic-1"); - builder.addProcessor("processor", new MockProcessorDef(), "source"); - builder.addProcessor("processor", new MockProcessorDef(), "source"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); } @Test(expected = TopologyException.class) public void testAddProcessorWithWrongParent() { final TopologyBuilder builder = new TopologyBuilder(); - builder.addProcessor("processor", new MockProcessorDef(), "source"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); } @Test(expected = TopologyException.class) public void testAddProcessorWithSelfParent() { final TopologyBuilder builder = new TopologyBuilder(); - builder.addProcessor("processor", new MockProcessorDef(), "processor"); + builder.addProcessor("processor", new MockProcessorSupplier(), "processor"); } @Test(expected = TopologyException.class) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 1abb989e30de2..50a23ecbbe750 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -31,13 +31,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.InMemoryKeyValueStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.ProcessorTopologyTestDriver; import org.junit.After; import org.junit.Before; @@ -115,8 +115,8 @@ public void testTopologyMetadata() { builder.addSource("source-1", "topic-1"); builder.addSource("source-2", "topic-2", "topic-3"); - builder.addProcessor("processor-1", new MockProcessorDef(), "source-1"); - builder.addProcessor("processor-2", new MockProcessorDef(), "source-1", "source-2"); + builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1"); + builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-1", "source-2"); builder.addSink("sink-1", "topic-3", "processor-1"); builder.addSink("sink-2", "topic-4", "processor-1", "processor-2"); @@ -308,10 +308,10 @@ public void punctuate(long streamTime) { } } - protected ProcessorDef define(final Processor processor) { - return new ProcessorDef() { + protected ProcessorSupplier define(final Processor processor) { + return new ProcessorSupplier() { @Override - public Processor instance() { + public Processor get() { return processor; } }; diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorDef.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java similarity index 89% rename from streams/src/test/java/org/apache/kafka/test/MockProcessorDef.java rename to streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java index 918b4683317f9..f1aa16782d5f4 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorDef.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java @@ -19,16 +19,17 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; import java.util.ArrayList; -public class MockProcessorDef implements ProcessorDef { +public class MockProcessorSupplier implements ProcessorSupplier { public final ArrayList processed = new ArrayList<>(); public final ArrayList punctuated = new ArrayList<>(); - public Processor instance() { + @Override + public Processor get() { return new MockProcessor(); } diff --git a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java b/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java index b5a3b3cb9915d..04c8f61daca00 100644 --- a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java +++ b/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java @@ -20,14 +20,14 @@ import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.kstream.WindowDef; +import org.apache.kafka.streams.kstream.WindowSupplier; import org.apache.kafka.streams.kstream.internals.FilteredIterator; import org.apache.kafka.streams.processor.internals.Stamped; import java.util.Iterator; import java.util.LinkedList; -public class UnlimitedWindowDef implements WindowDef { +public class UnlimitedWindowDef implements WindowSupplier { private final String name; @@ -39,7 +39,7 @@ public String name() { return name; } - public Window instance() { + public Window get() { return new UnlimitedWindow(); }