Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,21 @@ public KeyValue<String, Integer> apply(String key, String value) {
}
}).filter(new Predicate<String, Integer>() {
@Override
public boolean apply(String key, Integer value) {
public boolean test(String key, Integer value) {
return true;
}
});

KStream<String, Integer>[] streams = stream2.branch(
new Predicate<String, Integer>() {
@Override
public boolean apply(String key, Integer value) {
public boolean test(String key, Integer value) {
return (value % 2) == 0;
}
},
new Predicate<String, Integer>() {
@Override
public boolean apply(String key, Integer value) {
public boolean test(String key, Integer value) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KafkaStreaming;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorDef;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.StreamingConfig;
Expand All @@ -36,10 +36,10 @@

public class ProcessorJob {

private static class MyProcessorDef implements ProcessorDef {
private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {

@Override
public Processor<String, String> instance() {
public Processor<String, String> get() {
return new Processor<String, String>() {
private ProcessorContext context;
private KeyValueStore<String, Integer> kvStore;
Expand Down Expand Up @@ -102,7 +102,7 @@ public static void main(String[] args) throws Exception {

builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");

builder.addProcessor("PROCESS", new MyProcessorDef(), "SOURCE");
builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");

builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");

Expand Down
52 changes: 28 additions & 24 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,83 +19,86 @@

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorDef;
import org.apache.kafka.streams.processor.ProcessorSupplier;

/**
* KStream is an abstraction of a stream of key-value pairs.
*
* @param <K> the type of keys
* @param <V> the type of values
*/
public interface KStream<K, V> {

/**
     * Creates a new stream that consists of all elements of this stream which satisfy a predicate
*
* @param predicate the instance of Predicate
* @return KStream
* @return the stream with only those elements that satisfy the predicate
*/
KStream<K, V> filter(Predicate<K, V> predicate);

/**
     * Creates a new stream that consists of all elements of this stream which do not satisfy a predicate
*
* @param predicate the instance of Predicate
* @return KStream
* @return the stream with only those elements that do not satisfy the predicate
*/
KStream<K, V> filterOut(Predicate<K, V> predicate);

/**
* Creates a new stream by transforming key-value pairs by a mapper to all elements of this stream
     * Creates a new stream by transforming each element in this stream into a different element in the new stream.
*
* @param mapper the instance of KeyValueMapper
* @param <K1> the key type of the new stream
* @param <V1> the value type of the new stream
* @return KStream
* @return the mapped stream
*/
<K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);

/**
* Creates a new stream by transforming values by a mapper to all values of this stream
     * Creates a new stream by transforming each value in this stream into a different value in the new stream.
*
* @param mapper the instance of ValueMapper
* @param <V1> the value type of the new stream
* @return KStream
* @return the mapped stream
*/
<V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);

/**
* Creates a new stream by applying a mapper to all elements of this stream and using the values in the resulting Iterable
     * Creates a new stream by transforming each element in this stream into zero or more elements in the new stream.
*
* @param mapper the instance of KeyValueMapper
* @param <K1> the key type of the new stream
* @param <V1> the value type of the new stream
* @return KStream
* @return the mapped stream
*/
<K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);

/**
* Creates a new stream by applying a mapper to all values of this stream and using the values in the resulting Iterable
     * Creates a new stream by transforming each value in this stream into zero or more values in the new stream.
*
     * @param processor the instance of ValueMapper
* @param <V1> the value type of the new stream
* @return KStream
* @return the mapped stream
*/
<V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor);

/**
* Creates a new windowed stream using a specified window instance.
*
     * @param windowDef the instance of WindowSupplier
* @return KStream
* @return the windowed stream
*/
KStreamWindowed<K, V> with(WindowDef<K, V> windowDef);
KStreamWindowed<K, V> with(WindowSupplier<K, V> windowDef);

/**
* Creates an array of streams from this stream. Each stream in the array coresponds to a predicate in
* Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in
 * supplied predicates in the same order. Predicates are evaluated in order. An element is routed to
 * the stream corresponding to the first predicate that evaluates to true.
* An element will be dropped if none of the predicates evaluate true.
*
* @param predicates Instances of Predicate
* @return KStream
* @param predicates the ordered list of Predicate instances
* @return the new streams that each contain those elements for which their Predicate evaluated to true.
*/
KStream<K, V>[] branch(Predicate<K, V>... predicates);

Expand All @@ -106,7 +109,7 @@ public interface KStream<K, V> {
* @param topic the topic name
* @param <K1> the key type of the new stream
* @param <V1> the value type of the new stream
* @return KStream
* @return the new stream that consumes the given topic
*/
<K1, V1> KStream<K1, V1> through(String topic);

Expand All @@ -116,16 +119,16 @@ public interface KStream<K, V> {
*
* @param topic the topic name
* @param keySerializer key serializer used to send key-value pairs,
* if not specified the default serializer defined in the configs will be used
* if not specified the default key serializer defined in the configuration will be used
* @param valSerializer value serializer used to send key-value pairs,
* if not specified the default serializer defined in the configs will be used
* if not specified the default value serializer defined in the configuration will be used
* @param keyDeserializer key deserializer used to create the new KStream,
* if not specified the default deserializer defined in the configs will be used
* if not specified the default key deserializer defined in the configuration will be used
* @param valDeserializer value deserializer used to create the new KStream,
* if not specified the default deserializer defined in the configs will be used
* if not specified the default value deserializer defined in the configuration will be used
* @param <K1> the key type of the new stream
* @param <V1> the value type of the new stream
* @return KStream
* @return the new stream that consumes the given topic
*/
<K1, V1> KStream<K1, V1> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K1> keyDeserializer, Deserializer<V1> valDeserializer);

Expand All @@ -150,7 +153,8 @@ public interface KStream<K, V> {
/**
* Processes all elements in this stream by applying a processor.
*
* @param processorDef the class of ProcessorDef
* @param processorSupplier the supplier of the Processor to use
* @return the new stream containing the processed output
*/
<K1, V1> KStream<K1, V1> process(ProcessorDef<K, V> processorDef);
<K1, V1> KStream<K1, V1> process(ProcessorSupplier<K, V> processorSupplier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@

package org.apache.kafka.streams.kstream;

/**
* Represents a predicate (boolean-valued function) of two arguments.
*
* @param <K> the type of key
* @param <V> the type of value
*/
public interface Predicate<K, V> {

boolean apply(K key, V value);
boolean test(K key, V value);

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.LinkedList;
import java.util.Map;

public class SlidingWindowDef<K, V> implements WindowDef<K, V> {
public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> {
private final String name;
private final long duration;
private final int maxCount;
Expand All @@ -46,7 +46,7 @@ public class SlidingWindowDef<K, V> implements WindowDef<K, V> {
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;

public SlidingWindowDef(
public SlidingWindowSupplier(
String name,
long duration,
int maxCount,
Expand All @@ -69,7 +69,7 @@ public String name() {
}

@Override
public Window<K, V> instance() {
public Window<K, V> get() {
return new SlidingWindow();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.kafka.streams.kstream;

public interface WindowDef<K, V> {
public interface WindowSupplier<K, V> {

String name();

Window<K, V> instance();
Window<K, V> get();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,28 @@

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorDef;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.kstream.Predicate;

class KStreamBranch<K, V> implements ProcessorDef<K, V> {
class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {

private final Predicate<K, V>[] predicates;

@SuppressWarnings("unchecked")
public KStreamBranch(Predicate... predicates) {
public KStreamBranch(Predicate<K, V> ... predicates) {
this.predicates = predicates;
}

@Override
public Processor<K, V> instance() {
public Processor<K, V> get() {
return new KStreamBranchProcessor();
}

private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
@Override
public void process(K key, V value) {
for (int i = 0; i < predicates.length; i++) {
if (predicates[i].apply(key, value)) {
if (predicates[i].test(key, value)) {
// use forward with childIndex here and then break the loop
// so that no record is going to be piped to multiple streams
context().forward(key, value, i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.ProcessorDef;
import org.apache.kafka.streams.processor.ProcessorSupplier;

class KStreamFilter<K, V> implements ProcessorDef<K, V> {
class KStreamFilter<K, V> implements ProcessorSupplier<K, V> {

private final Predicate<K, V> predicate;
private final boolean filterOut;
Expand All @@ -33,14 +33,14 @@ public KStreamFilter(Predicate<K, V> predicate, boolean filterOut) {
}

@Override
public Processor<K, V> instance() {
public Processor<K, V> get() {
return new KStreamFilterProcessor();
}

private class KStreamFilterProcessor extends AbstractProcessor<K, V> {
@Override
public void process(K key, V value) {
if (filterOut ^ predicate.apply(key, value)) {
if (filterOut ^ predicate.test(key, value)) {
context().forward(key, value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorDef;
import org.apache.kafka.streams.processor.ProcessorSupplier;

class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorDef<K1, V1> {
class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {

private final KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper;

Expand All @@ -32,7 +32,7 @@ class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorDef<K1, V1> {
}

@Override
public Processor<K1, V1> instance() {
public Processor<K1, V1> get() {
return new KStreamFlatMapProcessor();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorDef;
import org.apache.kafka.streams.processor.ProcessorSupplier;

class KStreamFlatMapValues<K1, V1, V2> implements ProcessorDef<K1, V1> {
class KStreamFlatMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> {

private final ValueMapper<V1, ? extends Iterable<V2>> mapper;

Expand All @@ -31,7 +31,7 @@ class KStreamFlatMapValues<K1, V1, V2> implements ProcessorDef<K1, V1> {
}

@Override
public Processor<K1, V1> instance() {
public Processor<K1, V1> get() {
return new KStreamFlatMapValuesProcessor();
}

Expand Down
Loading