diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index ecec882493382..915cf1c19ceaf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -23,7 +23,7 @@ /** * KStream is an abstraction of a stream of key-value pairs. - * + * * @param the type of keys * @param the type of values */ @@ -150,11 +150,28 @@ public interface KStream { */ void to(String topic, Serializer keySerializer, Serializer valSerializer); + /** + * Applies a stateful transformation to all elements in this stream. + * + * @param transformerSupplier the class of TransformerDef + * @return KStream + */ + KStream transform(TransformerSupplier> transformerSupplier); + + /** + * Applies a stateful transformation to all values in this stream. + * + * @param valueTransformerSupplier the class of TransformerDef + * @return KStream + */ + KStream transformValues(ValueTransformerSupplier valueTransformerSupplier); + /** * Processes all elements in this stream by applying a processor. * * @param processorSupplier the supplier of the Processor to use * @return the new stream containing the processed output */ - KStream process(ProcessorSupplier processorSupplier); + void process(ProcessorSupplier processorSupplier); + } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java new file mode 100644 index 0000000000000..b67f619b9ac33 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.processor.ProcessorContext; + +public interface Transformer { + + /** + * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology + * that contains it is initialized. + *

+ * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should + * {@link ProcessorContext#schedule(long) schedule itself} with the provided context. + * + * @param context the context; may not be null + */ + void init(ProcessorContext context); + + /** + * Transform the message with the given key and value. + * + * @param key the key for the message + * @param value the value for the message + * @return new value + */ + R transform(K key, V value); + + /** + * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context + * during {@link #init(ProcessorContext) initialization}. + * + * @param timestamp the stream time when this method is being called + */ + void punctuate(long timestamp); + + /** + * Close this processor and clean up any resources. + */ + void close(); + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java new file mode 100644 index 0000000000000..2c2d8dd28c924 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +public interface TransformerSupplier { + + Transformer get(); + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java new file mode 100644 index 0000000000000..5b9e2ff2d1e4a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.processor.ProcessorContext; + +public interface ValueTransformer { + + /** + * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology + * that contains it is initialized. + *

+ * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should + * {@link ProcessorContext#schedule(long) schedule itself} with the provided context. + * + * @param context the context; may not be null + */ + void init(ProcessorContext context); + + /** + * Transform the message with the given key and value. + * + * @param value the value for the message + * @return new value + */ + R transform(V value); + + /** + * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context + * during {@link #init(ProcessorContext) initialization}. + * + * @param timestamp the stream time when this method is being called + */ + void punctuate(long timestamp); + + /** + * Close this processor and clean up any resources. + */ + void close(); + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java new file mode 100644 index 0000000000000..5c053c714d465 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +public interface ValueTransformerSupplier { + + ValueTransformer get(); + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index cff97d6c31f9e..8f56e0968b9d7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -19,9 +19,11 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.TransformerSupplier; +import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamWindowed; -import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueMapper; @@ -44,6 +46,10 @@ public class KStreamImpl implements KStream { private static final String FLATMAPVALUES_NAME = "KAFKA-FLATMAPVALUES-"; + private static final String TRANSFORM_NAME = "KAFKA-TRANSFORM-"; + + private static final String TRANSFORMVALUES_NAME = "KAFKA-TRANSFORMVALUES-"; + private static final String PROCESSOR_NAME = "KAFKA-PROCESSOR-"; private static final String BRANCH_NAME = "KAFKA-BRANCH-"; @@ -191,11 +197,27 @@ public void to(String topic, Serializer keySerializer, Serializer valSeria } @Override - public KStream process(final ProcessorSupplier processorSupplier) { - String name = PROCESSOR_NAME + INDEX.getAndIncrement(); + public KStream transform(TransformerSupplier> transformerSupplier) { + String name = TRANSFORM_NAME + INDEX.getAndIncrement(); - topology.addProcessor(name, processorSupplier, this.name); + topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name); + + return new KStreamImpl<>(topology, name); + } + + @Override + public KStream transformValues(ValueTransformerSupplier valueTransformerSupplier) { + String name = TRANSFORMVALUES_NAME + INDEX.getAndIncrement(); + + topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name); return new KStreamImpl<>(topology, name); } + + @Override + public void process(final ProcessorSupplier processorSupplier) { + String name = PROCESSOR_NAME + INDEX.getAndIncrement(); + + topology.addProcessor(name, processorSupplier, this.name); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java new file mode 100644 index 0000000000000..7ebab0eae4419 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.TransformerSupplier; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +public class KStreamTransform implements ProcessorSupplier { + + private final TransformerSupplier> transformerSupplier; + + public KStreamTransform(TransformerSupplier> transformerSupplier) { + this.transformerSupplier = transformerSupplier; + } + + @Override + public Processor get() { + return new KStreamTransformProcessor(transformerSupplier.get()); + } + + public static class KStreamTransformProcessor implements Processor { + + private final Transformer> transformer; + private ProcessorContext context; + + public KStreamTransformProcessor(Transformer> transformer) { + this.transformer = transformer; + } + + @Override + public void init(ProcessorContext context) { + transformer.init(context); + this.context = context; + } + + @Override + public void process(K1 key, V1 value) { + KeyValue pair = transformer.transform(key, value); + context.forward(pair.key, pair.value); + } + + @Override + public void punctuate(long timestamp) { + transformer.punctuate(timestamp); + } + + @Override + public void close() { + transformer.close(); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java new file mode 100644 index 0000000000000..6f989e6833cfc --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ValueTransformer; +import org.apache.kafka.streams.kstream.ValueTransformerSupplier; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +public class KStreamTransformValues implements ProcessorSupplier { + + private final ValueTransformerSupplier valueTransformerSupplier; + + public KStreamTransformValues(ValueTransformerSupplier valueTransformerSupplier) { + this.valueTransformerSupplier = valueTransformerSupplier; + } + + @Override + public Processor get() { + return new KStreamTransformValuesProcessor(valueTransformerSupplier.get()); + } + + public static class KStreamTransformValuesProcessor implements Processor { + + private final ValueTransformer valueTransformer; + private ProcessorContext context; + + public KStreamTransformValuesProcessor(ValueTransformer valueTransformer) { + this.valueTransformer = valueTransformer; + } + + @Override + public void init(ProcessorContext context) { + valueTransformer.init(context); + this.context = context; + } + + @Override + public void process(K key, V value) { + context.forward(key, valueTransformer.transform(value)); + } + + @Override + public void punctuate(long timestamp) { + valueTransformer.punctuate(timestamp); + } + + @Override + public void close() { + valueTransformer.close(); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 95a923d11e194..4a6833254d45d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -159,7 +159,7 @@ private Consumer createConsumer() { new ByteArrayDeserializer(), new ByteArrayDeserializer()); } - + private Consumer createRestoreConsumer() { log.info("Creating restore consumer client for stream thread [" + this.getName() + "]"); return new KafkaConsumer<>(config.getConsumerConfigs(), @@ -240,9 +240,11 @@ private void runLoop() { // try to fetch some records if necessary ConsumerRecords records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0); - for (StreamTask task : tasks.values()) { - for (TopicPartition partition : task.partitions()) { - task.addRecords(partition, records.records(partition)); + if (!records.isEmpty()) { + for (StreamTask task : tasks.values()) { + for (TopicPartition partition : task.partitions()) { + task.addRecords(partition, records.records(partition)); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 875712a04ef6b..2db488c790b0c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -119,7 +119,7 @@ public Integer apply(Integer value1, Integer value2) { stream4.to("topic-5"); - stream5.through("topic-6").process(new MockProcessorSupplier<>()).to("topic-7"); + stream5.through("topic-6").process(new MockProcessorSupplier<>()); assertEquals(2 + // sources 2 + // stream1 @@ -131,8 +131,7 @@ public Integer apply(Integer value1, Integer value2) { 2 + 3 + // stream5 1 + // to 2 + // through - 1 + // process - 1, // to + 1, // process builder.build().processors().size()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java new file mode 100644 index 0000000000000..e397dd1fe20dd --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.TransformerSupplier; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class KStreamTransformTest { + + private String topicName = "topic"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private IntegerDeserializer valDeserializer = new IntegerDeserializer(); + + @Test + public void testTransform() { + KStreamBuilder builder = new KStreamBuilder(); + + TransformerSupplier> transformerSupplier = + new TransformerSupplier>() { + public Transformer> get() { + return new Transformer>() { + + private int total = 0; + + @Override + public void init(ProcessorContext context) { + } + + @Override + public KeyValue transform(Integer key, Integer value) { + total += value; + return KeyValue.pair(key * 2, total); + } + + @Override + public void punctuate(long timestamp) { + } + + @Override + public void close() { + } + }; + } + }; + + final int[] expectedKeys = {1, 10, 100, 1000}; + + KStream stream; + MockProcessorSupplier processor = new MockProcessorSupplier<>(); + stream = builder.from(keyDeserializer, valDeserializer, topicName); + stream.transform(transformerSupplier).process(processor); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10); + } + + assertEquals(4, processor.processed.size()); + + String[] expected = {"2:10", "20:110", "200:1110", "2000:11110"}; + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java new file mode 100644 index 0000000000000..c5c9b39893c70 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.ValueTransformer; +import org.apache.kafka.streams.kstream.ValueTransformerSupplier; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class KStreamTransformValuesTest { + + private String topicName = "topic"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private IntegerDeserializer valDeserializer = new IntegerDeserializer(); + + @Test + public void testTransform() { + KStreamBuilder builder = new KStreamBuilder(); + + ValueTransformerSupplier valueTransformerSupplier = + new ValueTransformerSupplier() { + public ValueTransformer get() { + return new ValueTransformer() { + + private int total = 0; + + @Override + public void init(ProcessorContext context) { + } + + @Override + public Integer transform(Integer value) { + total += value; + return total; + } + + @Override + public void punctuate(long timestamp) { + } + + @Override + public void close() { + } + }; + } + }; + + final int[] expectedKeys = {1, 10, 100, 1000}; + + KStream stream; + MockProcessorSupplier processor = new MockProcessorSupplier<>(); + stream = builder.from(keyDeserializer, valDeserializer, topicName); + stream.transformValues(valueTransformerSupplier).process(processor); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10); + } + + assertEquals(4, processor.processed.size()); + + String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"}; + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } + +}