From 6d577f3716647c3046d40d4e41743a9777d81113 Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Thu, 1 Oct 2015 11:07:51 -0500 Subject: [PATCH 1/2] KAFKA-2600 Renamed the method of KStream's Predicate to match Java 8's BiPredicate The 'apply' method of KStream's Predicate functional interface was renamed to 'test' to match the method name on java.util.function.BiPredicate. This will allow KStream's Predicate to extend BiPredicate when Kafka moves to Java 8, and for the KStream.filter and filterOut methods to accept BiPredicate, making it a bit easier for developers to use KStream. --- .../kafka/streams/examples/KStreamJob.java | 6 +-- .../apache/kafka/streams/kstream/KStream.java | 44 ++++++++++--------- .../kafka/streams/kstream/Predicate.java | 2 +- .../kstream/internals/KStreamBranch.java | 4 +- .../kstream/internals/KStreamFilter.java | 2 +- .../kstream/internals/KStreamBranchTest.java | 6 +-- .../kstream/internals/KStreamFilterTest.java | 2 +- .../kstream/internals/KStreamImplTest.java | 12 ++--- 8 files changed, 41 insertions(+), 37 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java index feb4ee7d41f2e..87368c1c46520 100644 --- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java +++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java @@ -55,7 +55,7 @@ public KeyValue apply(String key, String value) { } }).filter(new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return true; } }); @@ -63,13 +63,13 @@ public boolean apply(String key, Integer value) { KStream[] streams = stream2.branch( new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return (value % 2) == 0; } }, new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return true; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 6c488cff177c5..21cf3169e3076 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -23,6 +23,9 @@ /** * KStream is an abstraction of a stream of key-value pairs. + * + * @param the type of keys + * @param the type of values */ public interface KStream { @@ -30,7 +33,7 @@ public interface KStream { * Creates a new stream consists of all elements of this stream which satisfy a predicate * * @param predicate the instance of Predicate - * @return KStream + * @return the stream with only those elements that satisfy the predicate */ KStream filter(Predicate predicate); @@ -38,45 +41,45 @@ public interface KStream { * Creates a new stream consists all elements of this stream which do not satisfy a predicate * * @param predicate the instance of Predicate - * @return KStream + * @return the stream with only those elements that do not satisfy the predicate */ KStream filterOut(Predicate predicate); /** - * Creates a new stream by transforming key-value pairs by a mapper to all elements of this stream + * Creates a new stream by applying transforming each element in this stream into a different element in the new stream. * * @param mapper the instance of KeyValueMapper * @param the key type of the new stream * @param the value type of the new stream - * @return KStream + * @return the mapped stream */ KStream map(KeyValueMapper> mapper); /** - * Creates a new stream by transforming values by a mapper to all values of this stream + * Creates a new stream by applying transforming each value in this stream into a different value in the new stream. * * @param mapper the instance of ValueMapper * @param the value type of the new stream - * @return KStream + * @return the mapped stream */ KStream mapValues(ValueMapper mapper); /** - * Creates a new stream by applying a mapper to all elements of this stream and using the values in the resulting Iterable + * Creates a new stream by applying transforming each element in this stream into zero or more elements in the new stream. * * @param mapper the instance of KeyValueMapper * @param the key type of the new stream * @param the value type of the new stream - * @return KStream + * @return the mapped stream */ KStream flatMap(KeyValueMapper>> mapper); /** - * Creates a new stream by applying a mapper to all values of this stream and using the values in the resulting Iterable + * Creates a new stream by applying transforming each value in this stream into zero or more values in the new stream. * * @param processor the instance of Processor * @param the value type of the new stream - * @return KStream + * @return the mapped stream */ KStream flatMapValues(ValueMapper> processor); @@ -84,18 +87,18 @@ public interface KStream { * Creates a new windowed stream using a specified window instance. * * @param windowDef the instance of Window - * @return KStream + * @return the windowed stream */ KStreamWindowed with(WindowDef windowDef); /** - * Creates an array of streams from this stream. Each stream in the array coresponds to a predicate in + * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in * supplied predicates in the same order. Predicates are evaluated in order. An element is streamed to * a corresponding stream for the first predicate is evaluated true. * An element will be dropped if none of the predicates evaluate true. * - * @param predicates Instances of Predicate - * @return KStream + * @param predicates the ordered list of Predicate instances + * @return the new streams that each contain those elements for which their Predicate evaluated to true. */ KStream[] branch(Predicate... predicates); @@ -106,7 +109,7 @@ public interface KStream { * @param topic the topic name * @param the key type of the new stream * @param the value type of the new stream - * @return KStream + * @return the new stream that consumes the given topic */ KStream through(String topic); @@ -116,16 +119,16 @@ public interface KStream { * * @param topic the topic name * @param keySerializer key serializer used to send key-value pairs, - * if not specified the default serializer defined in the configs will be used + * if not specified the default key serializer defined in the configuration will be used * @param valSerializer value serializer used to send key-value pairs, - * if not specified the default serializer defined in the configs will be used + * if not specified the default value serializer defined in the configuration will be used * @param keyDeserializer key deserializer used to create the new KStream, - * if not specified the default deserializer defined in the configs will be used + * if not specified the default key deserializer defined in the configuration will be used * @param valDeserializer value deserializer used to create the new KStream, - * if not specified the default deserializer defined in the configs will be used + * if not specified the default value deserializer defined in the configuration will be used * @param the key type of the new stream * @param the value type of the new stream - * @return KStream + * @return the new stream that consumes the given topic */ KStream through(String topic, Serializer keySerializer, Serializer valSerializer, Deserializer keyDeserializer, Deserializer valDeserializer); @@ -151,6 +154,7 @@ public interface KStream { * Processes all elements in this stream by applying a processor. * * @param processorDef the class of ProcessorDef + * @return the new stream containing the processed output */ KStream process(ProcessorDef processorDef); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java index 9cdb3bc6caa56..b2c48f8edaf58 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java @@ -19,6 +19,6 @@ public interface Predicate { - boolean apply(K key, V value); + boolean test(K key, V value); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java index c8061479cff02..83cc2bbc26124 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java @@ -27,7 +27,7 @@ class KStreamBranch implements ProcessorDef { private final Predicate[] predicates; @SuppressWarnings("unchecked") - public KStreamBranch(Predicate... predicates) { + public KStreamBranch(Predicate ... predicates) { this.predicates = predicates; } @@ -40,7 +40,7 @@ private class KStreamBranchProcessor extends AbstractProcessor { @Override public void process(K key, V value) { for (int i = 0; i < predicates.length; i++) { - if (predicates[i].apply(key, value)) { + if (predicates[i].test(key, value)) { // use forward with childIndex here and then break the loop // so that no record is going to be piped to multiple streams context().forward(key, value, i); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java index 22800f3cd2d63..50d56b7813167 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java @@ -40,7 +40,7 @@ public Processor instance() { private class KStreamFilterProcessor extends AbstractProcessor { @Override public void process(K key, V value) { - if (filterOut ^ predicate.apply(key, value)) { + if (filterOut ^ predicate.test(key, value)) { context().forward(key, value); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java index c18ddfe3ca041..8a9319b68dea2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java @@ -44,19 +44,19 @@ public void testKStreamBranch() { Predicate isEven = new Predicate() { @Override - public boolean apply(Integer key, String value) { + public boolean test(Integer key, String value) { return (key % 2) == 0; } }; Predicate isMultipleOfThree = new Predicate() { @Override - public boolean apply(Integer key, String value) { + public boolean test(Integer key, String value) { return (key % 3) == 0; } }; Predicate isOdd = new Predicate() { @Override - public boolean apply(Integer key, String value) { + public boolean test(Integer key, String value) { return (key % 2) != 0; } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java index b80e1e2028ef7..264039e0c9f65 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java @@ -38,7 +38,7 @@ public class KStreamFilterTest { private Predicate isMultipleOfThree = new Predicate() { @Override - public boolean apply(Integer key, String value) { + public boolean test(Integer key, String value) { return (key % 3) == 0; } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 0660ddd87a54f..23f88010d57c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -47,12 +47,12 @@ public void testNumProcesses() { KStream stream1 = source1.filter(new Predicate() { @Override - public boolean apply(String key, String value) { + public boolean test(String key, String value) { return true; } }).filterOut(new Predicate() { @Override - public boolean apply(String key, String value) { + public boolean test(String key, String value) { return false; } }); @@ -74,13 +74,13 @@ public Iterable apply(String value) { KStream[] streams2 = stream2.branch( new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return (value % 2) == 0; } }, new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return true; } } @@ -89,13 +89,13 @@ public boolean apply(String key, Integer value) { KStream[] streams3 = stream3.branch( new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return (value % 2) == 0; } }, new Predicate() { @Override - public boolean apply(String key, Integer value) { + public boolean test(String key, Integer value) { return true; } } From 04ee56d2a130478d06219b67533a73e8454f39c7 Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Thu, 1 Oct 2015 11:25:12 -0500 Subject: [PATCH 2/2] KAFKA-2600 Renamed the *Def interfaces to *Supplier To better align with Java 8's Supplier interface, the ProcessorDef, WindowDef, and SlidingWindowDef interfaces/classes were renamed to ProcessorSupplier, WindowSupplier, and SlidingWindowSupplier, and their 'instance' methods were renamed to 'get'. When Kafka moves to Java 8, these interfaces/classes can extend/implement Java 8's Supplier, and these interfaces can be eventually removed. --- .../kafka/streams/examples/ProcessorJob.java | 8 ++++---- .../apache/kafka/streams/kstream/KStream.java | 8 ++++---- .../kafka/streams/kstream/Predicate.java | 6 ++++++ ...dowDef.java => SlidingWindowSupplier.java} | 6 +++--- .../{WindowDef.java => WindowSupplier.java} | 4 ++-- .../kstream/internals/KStreamBranch.java | 6 +++--- .../kstream/internals/KStreamFilter.java | 6 +++--- .../kstream/internals/KStreamFlatMap.java | 6 +++--- .../internals/KStreamFlatMapValues.java | 6 +++--- .../kstream/internals/KStreamImpl.java | 20 +++++++++---------- .../kstream/internals/KStreamJoin.java | 7 ++++--- .../streams/kstream/internals/KStreamMap.java | 6 +++--- .../kstream/internals/KStreamMapValues.java | 8 ++++---- .../kstream/internals/KStreamPassThrough.java | 8 ++++---- .../kstream/internals/KStreamWindow.java | 20 +++++++++---------- .../internals/KStreamWindowedImpl.java | 12 +++++------ ...ocessorDef.java => ProcessorSupplier.java} | 4 ++-- .../streams/processor/TopologyBuilder.java | 19 +++++++++--------- .../kstream/internals/KStreamBranchTest.java | 8 ++++---- .../kstream/internals/KStreamFilterTest.java | 10 +++++----- .../kstream/internals/KStreamFlatMapTest.java | 6 +++--- .../internals/KStreamFlatMapValuesTest.java | 6 +++--- .../kstream/internals/KStreamImplTest.java | 4 ++-- .../kstream/internals/KStreamJoinTest.java | 6 +++--- .../kstream/internals/KStreamMapTest.java | 6 +++--- .../internals/KStreamMapValuesTest.java | 4 ++-- .../internals/KStreamWindowedTest.java | 8 ++++---- .../processor/TopologyBuilderTest.java | 10 +++++----- .../internals/ProcessorTopologyTest.java | 14 ++++++------- ...sorDef.java => MockProcessorSupplier.java} | 7 ++++--- .../apache/kafka/test/UnlimitedWindowDef.java | 6 +++--- 31 files changed, 131 insertions(+), 124 deletions(-) rename streams/src/main/java/org/apache/kafka/streams/kstream/{SlidingWindowDef.java => SlidingWindowSupplier.java} (98%) rename streams/src/main/java/org/apache/kafka/streams/kstream/{WindowDef.java => WindowSupplier.java} (93%) rename streams/src/main/java/org/apache/kafka/streams/processor/{ProcessorDef.java => ProcessorSupplier.java} (92%) rename streams/src/test/java/org/apache/kafka/test/{MockProcessorDef.java => MockProcessorSupplier.java} (89%) diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java index 0b3aba8716aed..92e6284e73cfe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java +++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.streams.KafkaStreaming; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.StreamingConfig; @@ -36,10 +36,10 @@ public class ProcessorJob { - private static class MyProcessorDef implements ProcessorDef { + private static class MyProcessorSupplier implements ProcessorSupplier { @Override - public Processor instance() { + public Processor get() { return new Processor() { private ProcessorContext context; private KeyValueStore kvStore; @@ -102,7 +102,7 @@ public static void main(String[] args) throws Exception { builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source"); - builder.addProcessor("PROCESS", new MyProcessorDef(), "SOURCE"); + builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE"); builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS"); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 21cf3169e3076..ecec882493382 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; /** * KStream is an abstraction of a stream of key-value pairs. @@ -89,7 +89,7 @@ public interface KStream { * @param windowDef the instance of Window * @return the windowed stream */ - KStreamWindowed with(WindowDef windowDef); + KStreamWindowed with(WindowSupplier windowDef); /** * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in @@ -153,8 +153,8 @@ public interface KStream { /** * Processes all elements in this stream by applying a processor. * - * @param processorDef the class of ProcessorDef + * @param processorSupplier the supplier of the Processor to use * @return the new stream containing the processed output */ - KStream process(ProcessorDef processorDef); + KStream process(ProcessorSupplier processorSupplier); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java index b2c48f8edaf58..c73622e460eca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java @@ -17,6 +17,12 @@ package org.apache.kafka.streams.kstream; +/** + * Represents a predicate (boolean-valued function) of two arguments. + * + * @param the type of key + * @param the type of value + */ public interface Predicate { boolean test(K key, V value); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java similarity index 98% rename from streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java index 5927db691db47..0110c875a1419 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java @@ -37,7 +37,7 @@ import java.util.LinkedList; import java.util.Map; -public class SlidingWindowDef implements WindowDef { +public class SlidingWindowSupplier implements WindowSupplier { private final String name; private final long duration; private final int maxCount; @@ -46,7 +46,7 @@ public class SlidingWindowDef implements WindowDef { private final Deserializer keyDeserializer; private final Deserializer valueDeserializer; - public SlidingWindowDef( + public SlidingWindowSupplier( String name, long duration, int maxCount, @@ -69,7 +69,7 @@ public String name() { } @Override - public Window instance() { + public Window get() { return new SlidingWindow(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java similarity index 93% rename from streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java index bbc5979300b79..46a2b9ee91622 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java @@ -17,9 +17,9 @@ package org.apache.kafka.streams.kstream; -public interface WindowDef { +public interface WindowSupplier { String name(); - Window instance(); + Window get(); } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java index 83cc2bbc26124..06083b30b7d0a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java @@ -19,10 +19,10 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.kstream.Predicate; -class KStreamBranch implements ProcessorDef { +class KStreamBranch implements ProcessorSupplier { private final Predicate[] predicates; @@ -32,7 +32,7 @@ public KStreamBranch(Predicate ... predicates) { } @Override - public Processor instance() { + public Processor get() { return new KStreamBranchProcessor(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java index 50d56b7813167..0b1f1e05a4a47 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java @@ -20,9 +20,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamFilter implements ProcessorDef { +class KStreamFilter implements ProcessorSupplier { private final Predicate predicate; private final boolean filterOut; @@ -33,7 +33,7 @@ public KStreamFilter(Predicate predicate, boolean filterOut) { } @Override - public Processor instance() { + public Processor get() { return new KStreamFilterProcessor(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java index 6c7f4eac09ae3..175a002d17e85 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java @@ -21,9 +21,9 @@ import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamFlatMap implements ProcessorDef { +class KStreamFlatMap implements ProcessorSupplier { private final KeyValueMapper>> mapper; @@ -32,7 +32,7 @@ class KStreamFlatMap implements ProcessorDef { } @Override - public Processor instance() { + public Processor get() { return new KStreamFlatMapProcessor(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java index 9cdcdf5b8af28..9b4559bd769c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java @@ -20,9 +20,9 @@ import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamFlatMapValues implements ProcessorDef { +class KStreamFlatMapValues implements ProcessorSupplier { private final ValueMapper> mapper; @@ -31,7 +31,7 @@ class KStreamFlatMapValues implements ProcessorDef { } @Override - public Processor instance() { + public Processor get() { return new KStreamFlatMapValuesProcessor(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 52c717feee7f2..cff97d6c31f9e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -19,15 +19,15 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.processor.ProcessorDef; -import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamWindowed; +import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.streams.kstream.WindowDef; +import org.apache.kafka.streams.kstream.WindowSupplier; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.TopologyBuilder; import java.lang.reflect.Array; import java.util.concurrent.atomic.AtomicInteger; @@ -127,12 +127,12 @@ public KStream flatMapValues(ValueMapper> mapper) { } @Override - public KStreamWindowed with(WindowDef window) { + public KStreamWindowed with(WindowSupplier windowSupplier) { String name = WINDOWED_NAME + INDEX.getAndIncrement(); - topology.addProcessor(name, new KStreamWindow<>(window), this.name); + topology.addProcessor(name, new KStreamWindow<>(windowSupplier), this.name); - return new KStreamWindowedImpl<>(topology, name, window); + return new KStreamWindowedImpl<>(topology, name, windowSupplier); } @Override @@ -191,10 +191,10 @@ public void to(String topic, Serializer keySerializer, Serializer valSeria } @Override - public KStream process(final ProcessorDef processorDef) { + public KStream process(final ProcessorSupplier processorSupplier) { String name = PROCESSOR_NAME + INDEX.getAndIncrement(); - topology.addProcessor(name, processorDef, this.name); + topology.addProcessor(name, processorSupplier, this.name); return new KStreamImpl<>(topology, name); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java index 311efef4c26ac..997953f579d55 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java @@ -22,11 +22,11 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; import java.util.Iterator; -class KStreamJoin implements ProcessorDef { +class KStreamJoin implements ProcessorSupplier { private static abstract class Finder { abstract Iterator find(K key, long timestamp); @@ -41,7 +41,7 @@ private static abstract class Finder { } @Override - public Processor instance() { + public Processor get() { return new KStreamJoinProcessor(windowName); } @@ -66,6 +66,7 @@ public void init(ProcessorContext context) { final Window window = (Window) context.getStateStore(windowName); this.finder = new Finder() { + @Override Iterator find(K key, long timestamp) { return window.find(key, timestamp); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java index a9a7b24c9382b..3868318f5d136 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java @@ -21,9 +21,9 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamMap implements ProcessorDef { +class KStreamMap implements ProcessorSupplier { private final KeyValueMapper> mapper; @@ -32,7 +32,7 @@ public KStreamMap(KeyValueMapper> mapper) { } @Override - public Processor instance() { + public Processor get() { return new KStreamMapProcessor(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java index ac39f37184a0b..692b421599c06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java @@ -17,12 +17,12 @@ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamMapValues implements ProcessorDef { +class KStreamMapValues implements ProcessorSupplier { private final ValueMapper mapper; @@ -31,7 +31,7 @@ public KStreamMapValues(ValueMapper mapper) { } @Override - public Processor instance() { + public Processor get() { return new KStreamMapProcessor(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java index 0f4638d52b95a..59a815ba2b9b8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java @@ -19,13 +19,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -class KStreamPassThrough implements ProcessorDef { +class KStreamPassThrough implements ProcessorSupplier { @Override - public Processor instance() { - return new KStreamPassThroughProcessor(); + public Processor get() { + return new KStreamPassThroughProcessor(); } public class KStreamPassThroughProcessor extends AbstractProcessor { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java index bdd132304b600..29239364a8093 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java @@ -18,26 +18,26 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.WindowSupplier; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorDef; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.kstream.WindowDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; -public class KStreamWindow implements ProcessorDef { +public class KStreamWindow implements ProcessorSupplier { - private final WindowDef windowDef; + private final WindowSupplier windowSupplier; - KStreamWindow(WindowDef windowDef) { - this.windowDef = windowDef; + KStreamWindow(WindowSupplier windowSupplier) { + this.windowSupplier = windowSupplier; } - public WindowDef window() { - return windowDef; + public WindowSupplier window() { + return windowSupplier; } @Override - public Processor instance() { + public Processor get() { return new KStreamWindowProcessor(); } @@ -48,7 +48,7 @@ private class KStreamWindowProcessor extends AbstractProcessor { @Override public void init(ProcessorContext context) { super.init(context); - this.window = windowDef.instance(); + this.window = windowSupplier.get(); this.window.init(context); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java index a208af6cca137..93160122e0ce7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java @@ -20,22 +20,22 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamWindowed; import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.WindowDef; +import org.apache.kafka.streams.kstream.WindowSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; public final class KStreamWindowedImpl extends KStreamImpl implements KStreamWindowed { - private final WindowDef windowDef; + private final WindowSupplier windowSupplier; - public KStreamWindowedImpl(TopologyBuilder topology, String name, WindowDef windowDef) { + public KStreamWindowedImpl(TopologyBuilder topology, String name, WindowSupplier windowSupplier) { super(topology, name); - this.windowDef = windowDef; + this.windowSupplier = windowSupplier; } @Override public KStream join(KStreamWindowed other, ValueJoiner valueJoiner) { - String thisWindowName = this.windowDef.name(); - String otherWindowName = ((KStreamWindowedImpl) other).windowDef.name(); + String thisWindowName = this.windowSupplier.name(); + String otherWindowName = ((KStreamWindowedImpl) other).windowSupplier.name(); KStreamJoin joinThis = new KStreamJoin<>(otherWindowName, valueJoiner); KStreamJoin joinOther = new KStreamJoin<>(thisWindowName, KStreamJoin.reverseJoiner(valueJoiner)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java similarity index 92% rename from streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java rename to streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java index a32a899ec2744..719d3ac2a34ff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorDef.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.processor; -public interface ProcessorDef { +public interface ProcessorSupplier { - Processor instance(); + Processor get(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index a254c131c1a47..833e29b9d9eef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -57,18 +57,17 @@ private interface NodeFactory { private class ProcessorNodeFactory implements NodeFactory { public final String[] parents; private final String name; - private final ProcessorDef definition; + private final ProcessorSupplier supplier; - public ProcessorNodeFactory(String name, String[] parents, ProcessorDef definition) { + public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) { this.name = name; this.parents = parents.clone(); - this.definition = definition; + this.supplier = supplier; } @Override public ProcessorNode build() { - Processor processor = definition.instance(); - return new ProcessorNode(name, processor); + return new ProcessorNode(name, supplier.get()); } } @@ -123,7 +122,7 @@ public TopologyBuilder() {} * {@link StreamingConfig streaming configuration}. * * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorDef, String...) adding processor children}. + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. * @param topics the name of one or more Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null */ @@ -136,7 +135,7 @@ public final TopologyBuilder addSource(String name, String... topics) { * The sink will use the specified key and value deserializers. * * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorDef, String...) adding processor children}. + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source * should use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the * {@link StreamingConfig streaming configuration} @@ -216,12 +215,12 @@ public final TopologyBuilder addSink(String name, String topic, Serializer keySe * Add a new processor node that receives and processes messages output by one or more parent source or processor node. * Any new messages output by this processor will be forwarded to its child processor or sink nodes. * @param name the unique name of the processor node - * @param definition the supplier used to obtain this node's {@link Processor} instance + * @param supplier the supplier used to obtain this node's {@link Processor} instance * @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive * and process * @return this builder instance so methods can be chained together; never null */ - public final TopologyBuilder addProcessor(String name, ProcessorDef definition, String... parentNames) { + public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) { if (nodeNames.contains(name)) throw new TopologyException("Processor " + name + " is already added."); @@ -237,7 +236,7 @@ public final TopologyBuilder addProcessor(String name, ProcessorDef definition, } nodeNames.add(name); - nodeFactories.add(new ProcessorNodeFactory(name, parentNames, definition)); + nodeFactories.add(new ProcessorNodeFactory(name, parentNames, supplier)); return this; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java index 8a9319b68dea2..40eba2f103a5f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; import java.lang.reflect.Array; @@ -65,16 +65,16 @@ public boolean test(Integer key, String value) { KStream stream; KStream[] branches; - MockProcessorDef[] processors; + MockProcessorSupplier[] processors; stream = builder.from(keyDeserializer, valDeserializer, topicName); branches = stream.branch(isEven, isMultipleOfThree, isOdd); assertEquals(3, branches.length); - processors = (MockProcessorDef[]) Array.newInstance(MockProcessorDef.class, branches.length); + processors = (MockProcessorSupplier[]) Array.newInstance(MockProcessorSupplier.class, branches.length); for (int i = 0; i < branches.length; i++) { - processors[i] = new MockProcessorDef<>(); + processors[i] = new MockProcessorSupplier<>(); branches[i].process(processors[i]); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java index 264039e0c9f65..d1e5d38a987f7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; @@ -49,9 +49,9 @@ public void testFilter() { final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7}; KStream stream; - MockProcessorDef processor; + MockProcessorSupplier processor; - processor = new MockProcessorDef<>(); + processor = new MockProcessorSupplier<>(); stream = builder.from(keyDeserializer, valDeserializer, topicName); stream.filter(isMultipleOfThree).process(processor); @@ -69,9 +69,9 @@ public void testFilterOut() { final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7}; KStream stream; - MockProcessorDef processor; + MockProcessorSupplier processor; - processor = new MockProcessorDef<>(); + processor = new MockProcessorSupplier<>(); stream = builder.from(keyDeserializer, valDeserializer, topicName); stream.filterOut(isMultipleOfThree).process(processor); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java index e87223e73d886..61b5ccd5a1dc4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -57,9 +57,9 @@ public Iterable> apply(Integer key, String value) { final int[] expectedKeys = {0, 1, 2, 3}; KStream stream; - MockProcessorDef processor; + MockProcessorSupplier processor; - processor = new MockProcessorDef<>(); + processor = new MockProcessorSupplier<>(); stream = builder.from(keyDeserializer, valDeserializer, topicName); stream.flatMap(mapper).process(processor); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java index 09dda65fc3cb6..66faf077885c7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; import java.util.ArrayList; @@ -55,9 +55,9 @@ public Iterable apply(String value) { final int[] expectedKeys = {0, 1, 2, 3}; KStream stream; - MockProcessorDef processor; + MockProcessorSupplier processor; - processor = new MockProcessorDef<>(); + processor = new MockProcessorSupplier<>(); stream = builder.from(keyDeserializer, valDeserializer, topicName); stream.flatMapValues(mapper).process(processor); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 23f88010d57c2..875712a04ef6b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.UnlimitedWindowDef; import org.junit.Test; @@ -119,7 +119,7 @@ public Integer apply(Integer value1, Integer value2) { stream4.to("topic-5"); - stream5.through("topic-6").process(new MockProcessorDef<>()).to("topic-7"); + stream5.through("topic-6").process(new MockProcessorSupplier<>()).to("topic-7"); assertEquals(2 + // sources 2 + // stream1 diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java index 7dea8e08863d2..58899faeda094 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java @@ -28,7 +28,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.UnlimitedWindowDef; import org.junit.Test; @@ -90,10 +90,10 @@ public void testJoin() { KStream stream2; KStreamWindowed windowed1; KStreamWindowed windowed2; - MockProcessorDef processor; + MockProcessorSupplier processor; String[] expected; - processor = new MockProcessorDef<>(); + processor = new MockProcessorSupplier<>(); stream1 = builder.from(keyDeserializer, valDeserializer, topic1); stream2 = builder.from(keyDeserializer, valDeserializer, topic2); windowed1 = stream1.with(new UnlimitedWindowDef("window1")); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java index bec524f37397a..2ae8a97515dcb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -51,9 +51,9 @@ public KeyValue apply(Integer key, String value) { final int[] expectedKeys = new int[]{0, 1, 2, 3}; KStream stream; - MockProcessorDef processor; + MockProcessorSupplier processor; - processor = new MockProcessorDef<>(); + processor = new MockProcessorSupplier<>(); stream = builder.from(keyDeserializer, valDeserializer, topicName); stream.map(mapper).process(processor); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java index b6507fe2f4dd6..f830c0010b338 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -50,7 +50,7 @@ public Integer apply(String value) { final int[] expectedKeys = {1, 10, 100, 1000}; KStream stream; - MockProcessorDef processor = new MockProcessorDef<>(); + MockProcessorSupplier processor = new MockProcessorSupplier<>(); stream = builder.from(keyDeserializer, valDeserializer, topicName); stream.mapValues(mapper).process(processor); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java index 48a9fc3d279e1..c3dc7e08129ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.WindowDef; +import org.apache.kafka.streams.kstream.WindowSupplier; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.UnlimitedWindowDef; import org.junit.Test; @@ -46,11 +46,11 @@ public void testWindowedStream() { final int[] expectedKeys = new int[]{0, 1, 2, 3}; KStream stream; - WindowDef windowDef; + WindowSupplier windowSupplier; - windowDef = new UnlimitedWindowDef<>(windowName); + windowSupplier = new UnlimitedWindowDef<>(windowName); stream = builder.from(keyDeserializer, valDeserializer, topicName); - stream.with(windowDef); + stream.with(windowSupplier); KStreamTestDriver driver = new KStreamTestDriver(builder); Window window = (Window) driver.getStateStore(windowName); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 57a78ff87a896..00522d53f6167 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -19,7 +19,7 @@ import static org.junit.Assert.assertEquals; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; public class TopologyBuilderTest { @@ -45,22 +45,22 @@ public void testAddProcessorWithSameName() { final TopologyBuilder builder = new TopologyBuilder(); builder.addSource("source", "topic-1"); - builder.addProcessor("processor", new MockProcessorDef(), "source"); - builder.addProcessor("processor", new MockProcessorDef(), "source"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); } @Test(expected = TopologyException.class) public void testAddProcessorWithWrongParent() { final TopologyBuilder builder = new TopologyBuilder(); - builder.addProcessor("processor", new MockProcessorDef(), "source"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); } @Test(expected = TopologyException.class) public void testAddProcessorWithSelfParent() { final TopologyBuilder builder = new TopologyBuilder(); - builder.addProcessor("processor", new MockProcessorDef(), "processor"); + builder.addProcessor("processor", new MockProcessorSupplier(), "processor"); } @Test(expected = TopologyException.class) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 1abb989e30de2..50a23ecbbe750 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -31,13 +31,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.InMemoryKeyValueStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.ProcessorTopologyTestDriver; import org.junit.After; import org.junit.Before; @@ -115,8 +115,8 @@ public void testTopologyMetadata() { builder.addSource("source-1", "topic-1"); builder.addSource("source-2", "topic-2", "topic-3"); - builder.addProcessor("processor-1", new MockProcessorDef(), "source-1"); - builder.addProcessor("processor-2", new MockProcessorDef(), "source-1", "source-2"); + builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1"); + builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-1", "source-2"); builder.addSink("sink-1", "topic-3", "processor-1"); builder.addSink("sink-2", "topic-4", "processor-1", "processor-2"); @@ -308,10 +308,10 @@ public void punctuate(long streamTime) { } } - protected ProcessorDef define(final Processor processor) { - return new ProcessorDef() { + protected ProcessorSupplier define(final Processor processor) { + return new ProcessorSupplier() { @Override - public Processor instance() { + public Processor get() { return processor; } }; diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorDef.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java similarity index 89% rename from streams/src/test/java/org/apache/kafka/test/MockProcessorDef.java rename to streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java index 918b4683317f9..f1aa16782d5f4 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorDef.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java @@ -19,16 +19,17 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.ProcessorSupplier; import java.util.ArrayList; -public class MockProcessorDef implements ProcessorDef { +public class MockProcessorSupplier implements ProcessorSupplier { public final ArrayList processed = new ArrayList<>(); public final ArrayList punctuated = new ArrayList<>(); - public Processor instance() { + @Override + public Processor get() { return new MockProcessor(); } diff --git a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java b/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java index b5a3b3cb9915d..04c8f61daca00 100644 --- a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java +++ b/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java @@ -20,14 +20,14 @@ import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.kstream.WindowDef; +import org.apache.kafka.streams.kstream.WindowSupplier; import org.apache.kafka.streams.kstream.internals.FilteredIterator; import org.apache.kafka.streams.processor.internals.Stamped; import java.util.Iterator; import java.util.LinkedList; -public class UnlimitedWindowDef implements WindowDef { +public class UnlimitedWindowDef implements WindowSupplier { private final String name; @@ -39,7 +39,7 @@ public String name() { return name; } - public Window instance() { + public Window get() { return new UnlimitedWindow(); }