emptyList() : other;
+ }
}
diff --git a/settings.gradle b/settings.gradle
index 9c7fea5dce257..357305ba23e8c 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,5 +14,5 @@
// limitations under the License.
apply from: file('scala.gradle')
-include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'log4j-appender',
- 'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file'
+include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
+ 'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file'
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
new file mode 100644
index 0000000000000..f3a99e0168caa
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Streaming allows for performing continuous computation on input coming from one or more input topics and
+ * sends output to zero or more output topics.
+ *
+ * This processing is defined by using the {@link TopologyBuilder} class or its subclass KStreamBuilder to specify
+ * the transformation.
+ * The {@link KafkaStreaming} instance will be responsible for the lifecycle of these processors. It will instantiate and
+ * start one or more of these processors to process the Kafka partitions assigned to this particular instance.
+ *
+ * This streaming instance will co-ordinate with any other instances (whether in this same process, on other processes
+ * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being
+ * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shutdown or
+ * started in the appropriate processes to balance processing load.
+ *
+ * Internally the {@link KafkaStreaming} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
+ * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output.
+ *
+ * A simple example might look like this:
+ *
+ * Map<String, Object> props = new HashMap<>();
+ * props.put("bootstrap.servers", "localhost:4242");
+ * props.put("key.deserializer", StringDeserializer.class);
+ * props.put("value.deserializer", StringDeserializer.class);
+ * props.put("key.serializer", StringSerializer.class);
+ * props.put("value.serializer", IntegerSerializer.class);
+ * props.put("timestamp.extractor", MyTimestampExtractor.class);
+ * StreamingConfig config = new StreamingConfig(props);
+ *
+ * KStreamBuilder builder = new KStreamBuilder();
+ * builder.from("topic1").mapValues(value -> value.length()).to("topic2");
+ *
+ * KafkaStreaming streaming = new KafkaStreaming(builder, config);
+ * streaming.start();
+ *
+ *
+ */
+public class KafkaStreaming {
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
+
+ // Container States
+ private static final int CREATED = 0;
+ private static final int RUNNING = 1;
+ private static final int STOPPED = 2;
+ private int state = CREATED;
+
+ private final StreamThread[] threads;
+
+ public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
+ this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
+ for (int i = 0; i < this.threads.length; i++) {
+ this.threads[i] = new StreamThread(builder, config);
+ }
+ }
+
+ /**
+ * Start the stream process by starting all its threads
+ */
+ public synchronized void start() {
+ log.debug("Starting Kafka Stream process");
+
+ if (state == CREATED) {
+ for (StreamThread thread : threads)
+ thread.start();
+
+ state = RUNNING;
+
+ log.info("Started Kafka Stream process");
+ } else {
+ throw new IllegalStateException("This process was already started.");
+ }
+ }
+
+ /**
+ * Shutdown this stream process by signaling the threads to stop,
+ * wait for them to join and clean up the process instance.
+ */
+ public synchronized void close() {
+ log.debug("Stopping Kafka Stream process");
+
+ if (state == RUNNING) {
+ // signal the threads to stop and wait
+ for (StreamThread thread : threads)
+ thread.close();
+
+ for (StreamThread thread : threads) {
+ try {
+ thread.join();
+ } catch (InterruptedException ex) {
+ Thread.interrupted();
+ }
+ }
+
+ state = STOPPED;
+
+ log.info("Stopped Kafka Stream process");
+ } else {
+ throw new IllegalStateException("This process has not started yet.");
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
new file mode 100644
index 0000000000000..dce69b6000cfd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.Map;
+
+public class StreamingConfig extends AbstractConfig {
+
+ private static final ConfigDef CONFIG;
+
+ /** state.dir */
+ public static final String STATE_DIR_CONFIG = "state.dir";
+ private static final String STATE_DIR_DOC = "Directory location for state store.";
+
+ /** commit.interval.ms */
+ public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
+ private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
+
+ /** poll.ms */
+ public static final String POLL_MS_CONFIG = "poll.ms";
+ private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
+
+ /** num.stream.threads */
+ public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
+ private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
+
+ /** buffered.records.per.partition */
+ public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
+ private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
+
+ /** state.cleanup.delay */
+ public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
+ private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated.";
+
+ /** total.records.to.process */
+ public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process";
+ private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records.";
+
+ /** window.time.ms */
+ public static final String WINDOW_TIME_MS_CONFIG = "window.time.ms";
+ private static final String WINDOW_TIME_MS_DOC = "Setting this to a non-negative value will cause the processor to get called "
+ + "with this frequency even if there is no message.";
+
+ /** timestamp.extractor */
+ public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
+ private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the TimestampExtractor interface.";
+
+ /** client.id */
+ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+
+ /** key.serializer */
+ public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+
+ /** value.serializer */
+ public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+ /** key.deserializer */
+ public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+
+ /** value.deserializer */
+ public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+ /**
+ * bootstrap.servers
+ */
+ public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+
+ private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
+
+ static {
+ CONFIG = new ConfigDef().define(CLIENT_ID_CONFIG,
+ Type.STRING,
+ "",
+ Importance.MEDIUM,
+ CommonClientConfigs.CLIENT_ID_DOC)
+ .define(STATE_DIR_CONFIG,
+ Type.STRING,
+ SYSTEM_TEMP_DIRECTORY,
+ Importance.MEDIUM,
+ STATE_DIR_DOC)
+ .define(COMMIT_INTERVAL_MS_CONFIG,
+ Type.LONG,
+ 30000,
+ Importance.HIGH,
+ COMMIT_INTERVAL_MS_DOC)
+ .define(POLL_MS_CONFIG,
+ Type.LONG,
+ 100,
+ Importance.LOW,
+ POLL_MS_DOC)
+ .define(NUM_STREAM_THREADS_CONFIG,
+ Type.INT,
+ 1,
+ Importance.LOW,
+ NUM_STREAM_THREADS_DOC)
+ .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+ Type.INT,
+ 1000,
+ Importance.LOW,
+ BUFFERED_RECORDS_PER_PARTITION_DOC)
+ .define(STATE_CLEANUP_DELAY_MS_CONFIG,
+ Type.LONG,
+ 60000,
+ Importance.LOW,
+ STATE_CLEANUP_DELAY_MS_DOC)
+ .define(TOTAL_RECORDS_TO_PROCESS,
+ Type.LONG,
+ -1L,
+ Importance.LOW,
+ TOTAL_RECORDS_TO_DOC)
+ .define(WINDOW_TIME_MS_CONFIG,
+ Type.LONG,
+ -1L,
+ Importance.MEDIUM,
+ WINDOW_TIME_MS_DOC)
+ .define(KEY_SERIALIZER_CLASS_CONFIG,
+ Type.CLASS,
+ Importance.HIGH,
+ ProducerConfig.KEY_SERIALIZER_CLASS_DOC)
+ .define(VALUE_SERIALIZER_CLASS_CONFIG,
+ Type.CLASS,
+ Importance.HIGH,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_DOC)
+ .define(KEY_DESERIALIZER_CLASS_CONFIG,
+ Type.CLASS,
+ Importance.HIGH,
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC)
+ .define(VALUE_DESERIALIZER_CLASS_CONFIG,
+ Type.CLASS,
+ Importance.HIGH,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
+ .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+ Type.CLASS,
+ Importance.HIGH,
+ TIMESTAMP_EXTRACTOR_CLASS_DOC)
+ .define(BOOTSTRAP_SERVERS_CONFIG,
+ Type.STRING,
+ Importance.HIGH,
+ CommonClientConfigs.BOOSTRAP_SERVERS_DOC);
+ }
+
+ public StreamingConfig(Map<?, ?> props) {
+ super(CONFIG, props);
+ }
+
+ public Map<String, Object> getConsumerConfigs() {
+ Map<String, Object> props = this.originals();
+
+ // set consumer default property values
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range");
+
+ // remove properties that are not required for consumers
+ props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+
+ return props;
+ }
+
+ public Map<String, Object> getProducerConfigs() {
+ Map<String, Object> props = this.originals();
+
+ // set producer default property values
+ props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+
+ // remove properties that are not required for producers
+ props.remove(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+ props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+
+ return props;
+ }
+
+ public static void main(String[] args) {
+ System.out.println(CONFIG.toHtmlTable());
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
new file mode 100644
index 0000000000000..feb4ee7d41f2e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.KafkaStreaming;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Predicate;
+
+import java.util.Properties;
+
+public class KStreamJob {
+
+ public static void main(String[] args) throws Exception {
+ Properties props = new Properties();
+ props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-KStream-Job");
+ props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+ StreamingConfig config = new StreamingConfig(props);
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ KStream<String, String> stream1 = builder.from("topic1");
+
+ KStream<String, Integer> stream2 =
+ stream1.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
+ @Override
+ public KeyValue<String, Integer> apply(String key, String value) {
+ return new KeyValue<>(key, new Integer(value));
+ }
+ }).filter(new Predicate<String, Integer>() {
+ @Override
+ public boolean apply(String key, Integer value) {
+ return true;
+ }
+ });
+
+ KStream<String, Integer>[] streams = stream2.branch(
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean apply(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ },
+ new Predicate<String, Integer>() {
+ @Override
+ public boolean apply(String key, Integer value) {
+ return true;
+ }
+ }
+ );
+
+ streams[0].to("topic2");
+ streams[1].to("topic3");
+
+ KafkaStreaming kstream = new KafkaStreaming(builder, config);
+ kstream.start();
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
new file mode 100644
index 0000000000000..0b3aba8716aed
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreaming;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.Properties;
+
+public class ProcessorJob {
+
+ private static class MyProcessorDef implements ProcessorDef {
+
+ @Override
+ public Processor<String, String> instance() {
+ return new Processor<String, String>() {
+ private ProcessorContext context;
+ private KeyValueStore<String, Integer> kvStore;
+
+ @Override
+ public void init(ProcessorContext context) {
+ this.context = context;
+ this.context.schedule(1000);
+ this.kvStore = new InMemoryKeyValueStore<>("local-state", context);
+ }
+
+ @Override
+ public void process(String key, String value) {
+ Integer oldValue = this.kvStore.get(key);
+ Integer newValue = Integer.parseInt(value);
+ if (oldValue == null) {
+ this.kvStore.put(key, newValue);
+ } else {
+ this.kvStore.put(key, oldValue + newValue);
+ }
+
+ context.commit();
+ }
+
+ @Override
+ public void punctuate(long timestamp) {
+ KeyValueIterator<String, Integer> iter = this.kvStore.all();
+
+ while (iter.hasNext()) {
+ Entry<String, Integer> entry = iter.next();
+
+ System.out.println("[" + entry.key() + ", " + entry.value() + "]");
+
+ context.forward(entry.key(), entry.value());
+ }
+
+ iter.close();
+ }
+
+ @Override
+ public void close() {
+ this.kvStore.close();
+ }
+ };
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Properties props = new Properties();
+ props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job");
+ props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+ StreamingConfig config = new StreamingConfig(props);
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
+
+ builder.addProcessor("PROCESS", new MyProcessorDef(), "SOURCE");
+
+ builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
+
+ KafkaStreaming streaming = new KafkaStreaming(builder, config);
+ streaming.start();
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
new file mode 100644
index 0000000000000..26281d69d0293
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+public class WallclockTimestampExtractor implements TimestampExtractor {
+ @Override
+ public long extract(ConsumerRecord<Object, Object> record) {
+ return System.currentTimeMillis();
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
new file mode 100644
index 0000000000000..7f101ab48af91
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+/**
+ * KStream is an abstraction of a stream of key-value pairs.
+ */
+public interface KStream<K, V> {
+
+ /**
+ * Creates a new stream consists of all elements of this stream which satisfy a predicate
+ *
+ * @param predicate the instance of Predicate
+ * @return KStream
+ */
+ KStream<K, V> filter(Predicate<K, V> predicate);
+
+ /**
+ * Creates a new stream consists all elements of this stream which do not satisfy a predicate
+ *
+ * @param predicate the instance of Predicate
+ * @return KStream
+ */
+ KStream<K, V> filterOut(Predicate<K, V> predicate);
+
+ /**
+ * Creates a new stream by transforming key-value pairs by a mapper to all elements of this stream
+ *
+ * @param mapper the instance of KeyValueMapper
+ * @param <K1> the key type of the new stream
+ * @param <V1> the value type of the new stream
+ * @return KStream
+ */
+ <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
+
+ /**
+ * Creates a new stream by transforming values by a mapper to all values of this stream
+ *
+ * @param mapper the instance of ValueMapper
+ * @param <V1> the value type of the new stream
+ * @return KStream
+ */
+ <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
+
+ /**
+ * Creates a new stream by applying a mapper to all elements of this stream and using the values in the resulting Iterable
+ *
+ * @param mapper the instance of KeyValueMapper
+ * @param <K1> the key type of the new stream
+ * @param <V1> the value type of the new stream
+ * @return KStream
+ */
+ <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
+
+ /**
+ * Creates a new stream by applying a mapper to all values of this stream and using the values in the resulting Iterable
+ *
+ * @param processor the instance of Processor
+ * @param <V1> the value type of the new stream
+ * @return KStream
+ */
+ <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor);
+
+ /**
+ * Creates a new windowed stream using a specified window instance.
+ *
+ * @param windowDef the instance of Window
+ * @return KStream
+ */
+ KStreamWindowed<K, V> with(WindowDef<K, V> windowDef);
+
+ /**
+ * Creates an array of streams from this stream. Each stream in the array coresponds to a predicate in
+ * supplied predicates in the same order. Predicates are evaluated in order. An element is streamed to
+ * a corresponding stream for the first predicate is evaluated true.
+ * An element will be dropped if none of the predicates evaluate true.
+ *
+ * @param predicates Instances of Predicate
+ * @return KStream
+ */
+ KStream<K, V>[] branch(Predicate<K, V>... predicates);
+
+ /**
+ * Sends key-value to a topic, also creates a new stream from the topic.
+ * This is equivalent to calling to(topic) and from(topic).
+ *
+ * @param topic the topic name
+ * @param <K1> the key type of the new stream
+ * @param <V1> the value type of the new stream
+ * @return KStream
+ */
+ <K1, V1> KStream<K1, V1> through(String topic);
+
+ /**
+ * Sends key-value to a topic, also creates a new stream from the topic.
+ * This is equivalent to calling to(topic) and from(topic).
+ *
+ * @param topic the topic name
+ * @param keySerializer key serializer used to send key-value pairs,
+ * if not specified the default serializer defined in the configs will be used
+ * @param valSerializer value serializer used to send key-value pairs,
+ * if not specified the default serializer defined in the configs will be used
+ * @param keyDeserializer key deserializer used to create the new KStream,
+ * if not specified the default deserializer defined in the configs will be used
+ * @param valDeserializer value deserializer used to create the new KStream,
+ * if not specified the default deserializer defined in the configs will be used
+ * @param <K1> the key type of the new stream
+ * @param <V1> the value type of the new stream
+ * @return KStream
+ */
+ <K1, V1> KStream<K1, V1> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K1> keyDeserializer, Deserializer<V1> valDeserializer);
+
+ /**
+ * Sends key-value to a topic using default serializers specified in the config.
+ *
+ * @param topic the topic name
+ */
+ void to(String topic);
+
+ /**
+ * Sends key-value to a topic.
+ *
+ * @param topic the topic name
+ * @param keySerializer key serializer used to send key-value pairs,
+ * if not specified the default serializer defined in the configs will be used
+ * @param valSerializer value serializer used to send key-value pairs,
+ * if not specified the default serializer defined in the configs will be used
+ */
+ void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer);
+
+ /**
+ * Processes all elements in this stream by applying a processor.
+ *
+ * @param processorDef the class of ProcessorDef
+ */
+ <K1, V1> KStream<K1, V1> process(ProcessorDef processorDef);
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
new file mode 100644
index 0000000000000..2d4dcc72f8057
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.kstream.internals.KStreamImpl;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+
+/**
+ * KStreamBuilder is the class to create KStream instances.
+ */
+public class KStreamBuilder extends TopologyBuilder {
+
+ public KStreamBuilder() {
+ super();
+ }
+
+ /**
+ * Creates a KStream instance for the specified topic. The stream is added to the default synchronization group.
+ * The default deserializers specified in the config are used.
+ *
+ * @param topics the topic names, if empty default to all the topics in the config
+ * @return KStream
+ */
+ public <K, V> KStream<K, V> from(String... topics) {
+ String name = KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.getAndIncrement();
+
+ addSource(name, topics);
+
+ return new KStreamImpl<>(this, name);
+ }
+
+ /**
+ * Creates a KStream instance for the specified topic. The stream is added to the default synchronization group.
+ *
+ * @param keyDeserializer key deserializer used to read this source KStream,
+ * if not specified the default deserializer defined in the configs will be used
+ * @param valDeserializer value deserializer used to read this source KStream,
+ * if not specified the default deserializer defined in the configs will be used
+ * @param topics the topic names, if empty default to all the topics in the config
+ * @return KStream
+ */
+ public <K, V> KStream<K, V> from(Deserializer<? extends K> keyDeserializer, Deserializer<? extends V> valDeserializer, String... topics) {
+ String name = KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.getAndIncrement();
+
+ addSource(name, keyDeserializer, valDeserializer, topics);
+
+ return new KStreamImpl<>(this, name);
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
new file mode 100644
index 0000000000000..4d73128839f21
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+/**
+ * KStreamWindowed is an abstraction of a stream of key-value pairs with a window.
+ */
+public interface KStreamWindowed<K, V> extends KStream<K, V> {
+
+    /**
+     * Creates a new stream by joining this windowed stream with the other windowed stream.
+     * Each element arrived from either of the streams is joined with elements in a window of each other.
+     * The resulting values are computed by applying a joiner.
+     *
+     * @param other  the other windowed stream
+     * @param joiner ValueJoiner
+     * @param <V1>   the value type of the other stream
+     * @param <V2>   the value type of the new stream
+     * @return KStream
+     */
+    <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
new file mode 100644
index 0000000000000..f633f6e337532
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+/**
+ * An immutable key-value pair.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class KeyValue<K, V> {
+
+    public final K key;
+    public final V value;
+
+    public KeyValue(K key, V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    /** Static factory avoiding explicit type arguments at the call site. */
+    public static <K, V> KeyValue<K, V> pair(K key, V value) {
+        return new KeyValue<>(key, value);
+    }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
new file mode 100644
index 0000000000000..62b07f646c812
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+/**
+ * Maps a key-value pair to a result of type {@code R}.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ * @param <R> the mapped result type
+ */
+public interface KeyValueMapper<K, V, R> {
+
+    R apply(K key, V value);
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
new file mode 100644
index 0000000000000..9cdb3bc6caa56
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+/**
+ * Tests a key-value pair; used by filter and branch operations.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface Predicate<K, V> {
+
+    boolean apply(K key, V value);
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
new file mode 100644
index 0000000000000..cc03541796cd0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.FilteredIterator;
+import org.apache.kafka.streams.kstream.internals.WindowSupport;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.internals.Stamped;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * A {@link WindowDef} whose instances keep, per key, the values seen within a sliding
+ * time window of {@code duration} milliseconds, bounded by {@code maxCount} entries overall.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class SlidingWindowDef<K, V> implements WindowDef<K, V> {
+    private final String name;
+    private final long duration;
+    private final int maxCount;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+
+    public SlidingWindowDef(
+            String name,
+            long duration,
+            int maxCount,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
+            Deserializer<K> keyDeserializer,
+            Deserializer<V> valueDeserializer) {
+        this.name = name;
+        this.duration = duration;
+        this.maxCount = maxCount;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public Window<K, V> instance() {
+        return new SlidingWindow();
+    }
+
+    public class SlidingWindow extends WindowSupport implements Window<K, V> {
+        private final Object lock = new Object();
+        private ProcessorContext context;
+        private int slotNum; // used as a key for Kafka log compaction
+        private LinkedList<K> list = new LinkedList<>();
+        private HashMap<K, ValueList<V>> map = new HashMap<>();
+
+        @Override
+        public void init(ProcessorContext context) {
+            this.context = context;
+            RestoreFuncImpl restoreFunc = new RestoreFuncImpl();
+            // replays the change log through restoreFunc, repopulating map/list via put()
+            context.register(this, restoreFunc);
+
+            // restored entries are not dirty: they are already in the change log
+            for (ValueList<V> valueList : map.values()) {
+                valueList.clearDirtyValues();
+            }
+            this.slotNum = restoreFunc.slotNum;
+        }
+
+        @Override
+        public Iterator<V> findAfter(K key, final long timestamp) {
+            return find(key, timestamp, timestamp + duration);
+        }
+
+        @Override
+        public Iterator<V> findBefore(K key, final long timestamp) {
+            return find(key, timestamp - duration, timestamp);
+        }
+
+        @Override
+        public Iterator<V> find(K key, final long timestamp) {
+            return find(key, timestamp - duration, timestamp + duration);
+        }
+
+        /*
+         * finds items in the window between startTime and endTime (both inclusive)
+         */
+        private Iterator<V> find(K key, final long startTime, final long endTime) {
+            final ValueList<V> values = map.get(key);
+
+            if (values == null) {
+                return Collections.emptyIterator();
+            } else {
+                return new FilteredIterator<V, Value<V>>(values.iterator()) {
+                    @Override
+                    protected V filter(Value<V> item) {
+                        if (startTime <= item.timestamp && item.timestamp <= endTime)
+                            return item.value;
+                        else
+                            return null;
+                    }
+                };
+            }
+        }
+
+        @Override
+        public void put(K key, V value, long timestamp) {
+            synchronized (lock) {
+                slotNum++;
+
+                list.offerLast(key);
+
+                ValueList<V> values = map.get(key);
+                if (values == null) {
+                    values = new ValueList<>();
+                    map.put(key, values);
+                }
+
+                values.add(slotNum, value, timestamp);
+            }
+            evictExcess();
+            evictExpired(timestamp - duration);
+        }
+
+        // drops oldest entries until the overall entry count is within maxCount
+        private void evictExcess() {
+            while (list.size() > maxCount) {
+                K oldestKey = list.pollFirst();
+
+                ValueList<V> values = map.get(oldestKey);
+                values.removeFirst();
+
+                if (values.isEmpty()) map.remove(oldestKey);
+            }
+        }
+
+        // drops entries with timestamp strictly before cutoffTime
+        private void evictExpired(long cutoffTime) {
+            while (true) {
+                K oldestKey = list.peekFirst();
+                if (oldestKey == null) break; // window emptied
+
+                ValueList<V> values = map.get(oldestKey);
+                Stamped<V> oldestValue = values.first();
+
+                if (oldestValue.timestamp < cutoffTime) {
+                    list.pollFirst();
+                    values.removeFirst();
+
+                    if (values.isEmpty()) map.remove(oldestKey);
+                } else {
+                    break;
+                }
+            }
+        }
+
+        @Override
+        public String name() {
+            return name;
+        }
+
+        @Override
+        public void flush() {
+            IntegerSerializer intSerializer = new IntegerSerializer();
+            ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
+
+            RecordCollector collector = ((ProcessorContextImpl) context).recordCollector();
+
+            // writes every dirty (not yet logged) value to the change log topic,
+            // keyed by slot so log compaction retains the latest entry per slot
+            for (Map.Entry<K, ValueList<V>> entry : map.entrySet()) {
+                ValueList<V> values = entry.getValue();
+                if (values.hasDirtyValues()) {
+                    K key = entry.getKey();
+
+                    byte[] keyBytes = keySerializer.serialize(name, key);
+
+                    Iterator<Value<V>> iterator = values.dirtyValueIterator();
+                    while (iterator.hasNext()) {
+                        Value<V> dirtyValue = iterator.next();
+                        byte[] slot = intSerializer.serialize("", dirtyValue.slotNum);
+                        byte[] valBytes = valueSerializer.serialize(name, dirtyValue.value);
+
+                        // layout: 8-byte timestamp | 4-byte key length + key | 4-byte value length + value
+                        byte[] combined = new byte[8 + 4 + keyBytes.length + 4 + valBytes.length];
+
+                        int offset = 0;
+                        offset += putLong(combined, offset, dirtyValue.timestamp);
+                        offset += puts(combined, offset, keyBytes);
+                        offset += puts(combined, offset, valBytes);
+
+                        if (offset != combined.length)
+                            throw new IllegalStateException("serialized length does not match");
+
+                        collector.send(new ProducerRecord<>(name, context.id(), slot, combined), byteArraySerializer, byteArraySerializer);
+                    }
+                    values.clearDirtyValues();
+                }
+            }
+        }
+
+        @Override
+        public void close() {
+            // TODO
+        }
+
+        @Override
+        public boolean persistent() {
+            // TODO: should not be persistent, right?
+            return false;
+        }
+
+        private class RestoreFuncImpl implements RestoreFunc {
+
+            final IntegerDeserializer intDeserializer;
+            int slotNum = 0;
+
+            RestoreFuncImpl() {
+                intDeserializer = new IntegerDeserializer();
+            }
+
+            @Override
+            public void apply(byte[] slot, byte[] bytes) {
+
+                slotNum = intDeserializer.deserialize("", slot);
+
+                // inverse of the layout written in flush()
+                int offset = 0;
+                // timestamp
+                long timestamp = getLong(bytes, offset);
+                offset += 8;
+                // key
+                int length = getInt(bytes, offset);
+                offset += 4;
+                K key = deserialize(bytes, offset, length, name, keyDeserializer);
+                offset += length;
+                // value
+                length = getInt(bytes, offset);
+                offset += 4;
+                V value = deserialize(bytes, offset, length, name, valueDeserializer);
+
+                put(key, value, timestamp);
+            }
+        }
+    }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
new file mode 100644
index 0000000000000..93fc359bcaa35
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+/**
+ * Joins two values into a result of type {@code R}; used by join operations.
+ *
+ * @param <V1> the first value type
+ * @param <V2> the second value type
+ * @param <R>  the joined result type
+ */
+public interface ValueJoiner<V1, V2, R> {
+
+    R apply(V1 value1, V2 value2);
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
new file mode 100644
index 0000000000000..a32423d04926c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+/**
+ * Maps a value of type {@code V1} to a value of type {@code V2}, leaving the key untouched.
+ *
+ * @param <V1> the input value type
+ * @param <V2> the output value type
+ */
+public interface ValueMapper<V1, V2> {
+
+    V2 apply(V1 value);
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
new file mode 100644
index 0000000000000..a1456f6c421b6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.Iterator;
+
+/**
+ * A time-windowed state store of key-value pairs.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface Window<K, V> extends StateStore {
+
+    void init(ProcessorContext context);
+
+    /** Returns values for {@code key} within the window centered on {@code timestamp}. */
+    Iterator<V> find(K key, long timestamp);
+
+    /** Returns values for {@code key} at or after {@code timestamp}, within the window. */
+    Iterator<V> findAfter(K key, long timestamp);
+
+    /** Returns values for {@code key} at or before {@code timestamp}, within the window. */
+    Iterator<V> findBefore(K key, long timestamp);
+
+    void put(K key, V value, long timestamp);
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java
new file mode 100644
index 0000000000000..bbc5979300b79
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowDef.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+/**
+ * Factory definition for {@link Window} instances.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface WindowDef<K, V> {
+
+    String name();
+
+    Window<K, V> instance();
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java
new file mode 100644
index 0000000000000..54d44f01b8723
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.Iterator;
+
+/**
+ * An iterator over {@code S} items that maps each item through {@link #filter(Object)}
+ * and yields only the non-null results, as {@code T} values.
+ *
+ * @param <T> the type of the filtered (yielded) elements
+ * @param <S> the type of the underlying elements
+ */
+public abstract class FilteredIterator<T, S> implements Iterator<T> {
+
+    private final Iterator<S> inner;
+    private T nextValue = null; // next item to yield; null means exhausted
+
+    public FilteredIterator(Iterator<S> inner) {
+        this.inner = inner;
+
+        findNext();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return nextValue != null;
+    }
+
+    @Override
+    public T next() {
+        // honor the Iterator contract rather than returning null when exhausted
+        if (nextValue == null)
+            throw new java.util.NoSuchElementException();
+
+        T value = nextValue;
+        findNext();
+
+        return value;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    // advances nextValue to the next item that filter() accepts, or null if none remain
+    private void findNext() {
+        while (inner.hasNext()) {
+            S item = inner.next();
+            nextValue = filter(item);
+            if (nextValue != null) {
+                return;
+            }
+        }
+        nextValue = null;
+    }
+
+    /** Returns the mapped value to yield for {@code item}, or null to skip it. */
+    protected abstract T filter(S item);
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
new file mode 100644
index 0000000000000..6b661b423e60a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.kstream.Predicate;
+
+/**
+ * Routes each record to the child stream of the first predicate it satisfies;
+ * records matching no predicate are dropped.
+ */
+class KStreamBranch<K, V> implements ProcessorDef {
+
+    private final Predicate<K, V>[] predicates;
+
+    @SuppressWarnings("unchecked")
+    public KStreamBranch(Predicate<K, V>... predicates) {
+        this.predicates = predicates;
+    }
+
+    @Override
+    public Processor<K, V> instance() {
+        return new KStreamBranchProcessor();
+    }
+
+    private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(K key, V value) {
+            for (int i = 0; i < predicates.length; i++) {
+                if (predicates[i].apply(key, value)) {
+                    // use forward with childIndex here and then break the loop
+                    // so that no record is going to be piped to multiple streams
+                    context().forward(key, value, i);
+                    break;
+                }
+            }
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
new file mode 100644
index 0000000000000..5444e70b8395e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+/**
+ * Forwards records that satisfy the predicate; with {@code filterOut} set,
+ * the test is inverted (filterNot semantics).
+ */
+class KStreamFilter<K, V> implements ProcessorDef {
+
+    private final Predicate<K, V> predicate;
+    private final boolean filterOut;
+
+    public KStreamFilter(Predicate<K, V> predicate, boolean filterOut) {
+        this.predicate = predicate;
+        this.filterOut = filterOut;
+    }
+
+    @Override
+    public Processor<K, V> instance() {
+        return new KStreamFilterProcessor();
+    }
+
+    private class KStreamFilterProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(K key, V value) {
+            // XOR flips the predicate result when filterOut is true
+            if (filterOut ^ predicate.apply(key, value)) {
+                context().forward(key, value);
+            }
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
new file mode 100644
index 0000000000000..410cfda6366f7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+/**
+ * Maps each record to zero or more key-value pairs and forwards each of them downstream.
+ */
+class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorDef {
+
+    private final KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper;
+
+    KStreamFlatMap(KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper) {
+        this.mapper = mapper;
+    }
+
+    @Override
+    public Processor<K1, V1> instance() {
+        return new KStreamFlatMapProcessor();
+    }
+
+    private class KStreamFlatMapProcessor extends AbstractProcessor<K1, V1> {
+        @Override
+        public void process(K1 key, V1 value) {
+            for (KeyValue<K2, V2> newPair : mapper.apply(key, value)) {
+                context().forward(newPair.key, newPair.value);
+            }
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
new file mode 100644
index 0000000000000..edca42142ed2b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+/**
+ * Maps each record's value to zero or more new values and forwards each with the original key.
+ */
+class KStreamFlatMapValues<K1, V1, V2> implements ProcessorDef {
+
+    private final ValueMapper<V1, ? extends Iterable<V2>> mapper;
+
+    KStreamFlatMapValues(ValueMapper<V1, ? extends Iterable<V2>> mapper) {
+        this.mapper = mapper;
+    }
+
+    @Override
+    public Processor<K1, V1> instance() {
+        return new KStreamFlatMapValuesProcessor();
+    }
+
+    private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K1, V1> {
+        @Override
+        public void process(K1 key, V1 value) {
+            Iterable<V2> newValues = mapper.apply(value);
+            for (V2 v : newValues) {
+                context().forward(key, v);
+            }
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
new file mode 100644
index 0000000000000..69366481cd7b2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.kstream.KStreamWindowed;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.WindowDef;
+
+import java.lang.reflect.Array;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class KStreamImpl<K, V> implements KStream<K, V> {
+
+    private static final String FILTER_NAME = "KAFKA-FILTER-";
+
+    private static final String MAP_NAME = "KAFKA-MAP-";
+
+    private static final String MAPVALUES_NAME = "KAFKA-MAPVALUES-";
+
+    private static final String FLATMAP_NAME = "KAFKA-FLATMAP-";
+
+    private static final String FLATMAPVALUES_NAME = "KAFKA-FLATMAPVALUES-";
+
+    private static final String PROCESSOR_NAME = "KAFKA-PROCESSOR-";
+
+    private static final String BRANCH_NAME = "KAFKA-BRANCH-";
+
+    private static final String BRANCHCHILD_NAME = "KAFKA-BRANCHCHILD-";
+
+    private static final String WINDOWED_NAME = "KAFKA-WINDOWED-";
+
+    private static final String SINK_NAME = "KAFKA-SINK-";
+
+    public static final String JOINTHIS_NAME = "KAFKA-JOINTHIS-";
+
+    public static final String JOINOTHER_NAME = "KAFKA-JOINOTHER-";
+
+    public static final String JOINMERGE_NAME = "KAFKA-JOINMERGE-";
+
+    public static final String SOURCE_NAME = "KAFKA-SOURCE-";
+
+    public static final AtomicInteger INDEX = new AtomicInteger(1); // global counter so generated processor names are unique across the topology
+
+    protected final TopologyBuilder topology;
+    protected final String name;
+
+    public KStreamImpl(TopologyBuilder topology, String name) {
+        this.topology = topology;
+        this.name = name;
+    }
+
+    @Override
+    public KStream<K, V> filter(Predicate<K, V> predicate) {
+        String name = FILTER_NAME + INDEX.getAndIncrement();
+
+        topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
+
+        return new KStreamImpl<>(topology, name);
+    }
+
+    @Override
+    public KStream<K, V> filterOut(final Predicate<K, V> predicate) {
+        String name = FILTER_NAME + INDEX.getAndIncrement();
+
+        topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
+
+        return new KStreamImpl<>(topology, name);
+    }
+
+    @Override
+    public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
+        String name = MAP_NAME + INDEX.getAndIncrement();
+
+        topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
+
+        return new KStreamImpl<>(topology, name);
+    }
+
+    @Override
+    public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
+        String name = MAPVALUES_NAME + INDEX.getAndIncrement();
+
+        topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
+
+        return new KStreamImpl<>(topology, name);
+    }
+
+    @Override
+    public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
+        String name = FLATMAP_NAME + INDEX.getAndIncrement();
+
+        topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
+
+        return new KStreamImpl<>(topology, name);
+    }
+
+    @Override
+    public <V1> KStream<K, V1> flatMapValues(ValueMapper<V, ? extends Iterable<V1>> mapper) {
+        String name = FLATMAPVALUES_NAME + INDEX.getAndIncrement();
+
+        topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
+
+        return new KStreamImpl<>(topology, name);
+    }
+
+    @Override
+    public KStreamWindowed<K, V> with(WindowDef<K, V> window) {
+        String name = WINDOWED_NAME + INDEX.getAndIncrement();
+
+        topology.addProcessor(name, new KStreamWindow<>(window), this.name);
+
+        return new KStreamWindowedImpl<>(topology, name, window);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public KStream<K, V>[] branch(Predicate<K, V>... predicates) {
+        String branchName = BRANCH_NAME + INDEX.getAndIncrement();
+
+        topology.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
+
+        KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
+        for (int i = 0; i < predicates.length; i++) {
+            String childName = BRANCHCHILD_NAME + INDEX.getAndIncrement();
+
+            topology.addProcessor(childName, new KStreamPassThrough(), branchName);
+
+            branchChildren[i] = new KStreamImpl<>(topology, childName);
+        }
+
+        return branchChildren;
+    }
+
+    @Override
+    public KStream<K, V> through(String topic,
+                                 Serializer<K> keySerializer,
+                                 Serializer<V> valSerializer,
+                                 Deserializer<K> keyDeserializer,
+                                 Deserializer<V> valDeserializer) {
+        String sendName = SINK_NAME + INDEX.getAndIncrement();
+
+        topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
+
+        String sourceName = SOURCE_NAME + INDEX.getAndIncrement();
+
+        topology.addSource(sourceName, keyDeserializer, valDeserializer, topic);
+
+        return new KStreamImpl<>(topology, sourceName);
+    }
+
+    @Override
+    public KStream<K, V> through(String topic) {
+        return through(topic, (Serializer<K>) null, (Serializer<V>) null, (Deserializer<K>) null, (Deserializer<V>) null);
+    }
+
+    @Override
+    public void to(String topic) {
+        String name = SINK_NAME + INDEX.getAndIncrement();
+
+        topology.addSink(name, topic, this.name);
+    }
+
+    @Override
+    public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
+        String name = SINK_NAME + INDEX.getAndIncrement();
+
+        topology.addSink(name, topic, keySerializer, valSerializer, this.name);
+    }
+
+    @Override
+    public <K1, V1> KStream<K1, V1> process(final ProcessorDef processorDef) {
+        String name = PROCESSOR_NAME + INDEX.getAndIncrement();
+
+        topology.addProcessor(name, processorDef, this.name);
+
+        return new KStreamImpl<>(topology, name);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
new file mode 100644
index 0000000000000..4003d29f8a874
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+import java.util.Iterator;
+
+class KStreamJoin<K, V, V1, V2> implements ProcessorDef {
+
+    private static abstract class Finder<K, T> {
+        abstract Iterator<T> find(K key, long timestamp);
+    }
+
+    private final String windowName;
+    private final ValueJoiner<V1, V2, V> joiner;
+
+    KStreamJoin(String windowName, ValueJoiner<V1, V2, V> joiner) {
+        this.windowName = windowName;
+        this.joiner = joiner;
+    }
+
+    @Override
+    public Processor instance() {
+        return new KStreamJoinProcessor(windowName);
+    }
+
+    private class KStreamJoinProcessor extends AbstractProcessor<K, V1> {
+
+        private final String windowName;
+        protected Finder<K, V2> finder;
+
+        public KStreamJoinProcessor(String windowName) {
+            this.windowName = windowName;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+
+            // check if these two streams are joinable
+            if (!context.joinable())
+                throw new IllegalStateException("Streams are not joinable.");
+
+            final Window<K, V2> window = (Window<K, V2>) context.getStateStore(windowName);
+
+            this.finder = new Finder<K, V2>() {
+                Iterator<V2> find(K key, long timestamp) {
+                    return window.find(key, timestamp);
+                }
+            };
+        }
+
+        @Override
+        public void process(K key, V1 value) {
+            long timestamp = context().timestamp();
+            Iterator<V2> iter = finder.find(key, timestamp); // look up matching records from the other stream's window
+            if (iter != null) {
+                while (iter.hasNext()) {
+                    context().forward(key, joiner.apply(value, iter.next()));
+                }
+            }
+        }
+    }
+
+    public static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
+        return new ValueJoiner<T2, T1, R>() { // swaps argument order so one joiner serves both join directions
+            @Override
+            public R apply(T2 value2, T1 value1) {
+                return joiner.apply(value1, value2);
+            }
+        };
+    }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
new file mode 100644
index 0000000000000..4d3134874f3e2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+class KStreamMap<K1, V1, K2, V2> implements ProcessorDef {
+
+    private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
+
+    public KStreamMap(KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) {
+        this.mapper = mapper;
+    }
+
+    @Override
+    public Processor instance() {
+        return new KStreamMapProcessor();
+    }
+
+    private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
+        @Override
+        public void process(K1 key, V1 value) {
+            KeyValue<K2, V2> newPair = mapper.apply(key, value); // mapper may change both key and value
+            context().forward(newPair.key, newPair.value);
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
new file mode 100644
index 0000000000000..dac655096fcc6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+class KStreamMapValues<K1, V1, V2> implements ProcessorDef {
+
+    private final ValueMapper<V1, V2> mapper;
+
+    public KStreamMapValues(ValueMapper<V1, V2> mapper) {
+        this.mapper = mapper;
+    }
+
+    @Override
+    public Processor instance() {
+        return new KStreamMapProcessor();
+    }
+
+    private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
+        @Override
+        public void process(K1 key, V1 value) {
+            V2 newValue = mapper.apply(value); // key is passed through unchanged
+            context().forward(key, newValue);
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
new file mode 100644
index 0000000000000..ea395503534e4
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorDef;
+
+class KStreamPassThrough<K, V> implements ProcessorDef {
+
+    @Override
+    public Processor instance() {
+        return new KStreamPassThroughProcessor();
+    }
+
+    public class KStreamPassThroughProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(K key, V value) {
+            context().forward(key, value); // identity processor: forwards records untouched
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
new file mode 100644
index 0000000000000..6ebcbe16e4d1b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.kstream.WindowDef;
+
+public class KStreamWindow<K, V> implements ProcessorDef {
+
+    private final WindowDef<K, V> windowDef;
+
+    KStreamWindow(WindowDef<K, V> windowDef) {
+        this.windowDef = windowDef;
+    }
+
+    public WindowDef<K, V> window() {
+        return windowDef;
+    }
+
+    @Override
+    public Processor instance() {
+        return new KStreamWindowProcessor();
+    }
+
+    private class KStreamWindowProcessor extends AbstractProcessor<K, V> {
+
+        private Window<K, V> window;
+
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            this.window = windowDef.instance();
+            this.window.init(context);
+        }
+
+        @Override
+        public void process(K key, V value) {
+            synchronized (this) { // NOTE(review): guards put+forward as a unit — confirm which other thread accesses this window
+                window.put(key, value, context().timestamp());
+                context().forward(key, value);
+            }
+        }
+
+        @Override
+        public void close() {
+            window.close();
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
new file mode 100644
index 0000000000000..a208af6cca137
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamWindowed;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.WindowDef;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+
+public final class KStreamWindowedImpl extends KStreamImpl implements KStreamWindowed {
+
+ private final WindowDef windowDef;
+
+ public KStreamWindowedImpl(TopologyBuilder topology, String name, WindowDef windowDef) {
+ super(topology, name);
+ this.windowDef = windowDef;
+ }
+
+ @Override
+ public KStream join(KStreamWindowed other, ValueJoiner valueJoiner) {
+ String thisWindowName = this.windowDef.name();
+ String otherWindowName = ((KStreamWindowedImpl) other).windowDef.name();
+
+ KStreamJoin joinThis = new KStreamJoin<>(otherWindowName, valueJoiner);
+ KStreamJoin