From 9d8654b9536115c25af7f58658248cde041a4646 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Fri, 9 Oct 2015 14:36:34 -0700 Subject: [PATCH] MINOR: putting back kstream stateful transform methods --- .../apache/kafka/streams/kstream/KStream.java | 18 +++- .../kafka/streams/kstream/Transformer.java | 57 ++++++++++++ .../kafka/streams/kstream/TransformerDef.java | 24 +++++ .../streams/kstream/ValueTransformer.java | 56 +++++++++++ .../streams/kstream/ValueTransformerDef.java | 24 +++++ .../kstream/internals/KStreamImpl.java | 28 +++++- .../kstream/internals/KStreamTransform.java | 71 ++++++++++++++ .../internals/KStreamTransformValues.java | 69 ++++++++++++++ .../processor/internals/StreamThread.java | 10 +- .../kstream/internals/KStreamImplTest.java | 5 +- .../internals/KStreamTransformTest.java | 93 +++++++++++++++++++ .../internals/KStreamTransformValuesTest.java | 92 ++++++++++++++++++ 12 files changed, 536 insertions(+), 11 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/TransformerDef.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerDef.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 6c488cff177c5..ec26124a47fa5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -147,10 +147,26 @@ public interface KStream { */ void to(String topic, Serializer keySerializer, Serializer valSerializer); + /** + * Applies a stateful transformation to all elements in this stream. + * + * @param transformerDef the class of TransformerDef + * @return KStream + */ + KStream transform(TransformerDef> transformerDef); + + /** + * Applies a stateful transformation to all values in this stream. + * + * @param valueTransformerDef the class of TransformerDef + * @return KStream + */ + KStream transformValues(ValueTransformerDef valueTransformerDef); + /** * Processes all elements in this stream by applying a processor. * * @param processorDef the class of ProcessorDef */ - KStream process(ProcessorDef processorDef); + void process(ProcessorDef processorDef); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java new file mode 100644 index 0000000000000..b67f619b9ac33 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.processor.ProcessorContext; + +public interface Transformer { + + /** + * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology + * that contains it is initialized. + *

+ * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should + * {@link ProcessorContext#schedule(long) schedule itself} with the provided context. + * + * @param context the context; may not be null + */ + void init(ProcessorContext context); + + /** + * Transform the message with the given key and value. + * + * @param key the key for the message + * @param value the value for the message + * @return new value + */ + R transform(K key, V value); + + /** + * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context + * during {@link #init(ProcessorContext) initialization}. + * + * @param timestamp the stream time when this method is being called + */ + void punctuate(long timestamp); + + /** + * Close this processor and clean up any resources. + */ + void close(); + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerDef.java new file mode 100644 index 0000000000000..07359faf131d2 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerDef.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +public interface TransformerDef { + + Transformer instance(); + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java new file mode 100644 index 0000000000000..5b9e2ff2d1e4a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.processor.ProcessorContext; + +public interface ValueTransformer { + + /** + * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology + * that contains it is initialized. + *

+ * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should + * {@link ProcessorContext#schedule(long) schedule itself} with the provided context. + * + * @param context the context; may not be null + */ + void init(ProcessorContext context); + + /** + * Transform the message with the given key and value. + * + * @param value the value for the message + * @return new value + */ + R transform(V value); + + /** + * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context + * during {@link #init(ProcessorContext) initialization}. + * + * @param timestamp the stream time when this method is being called + */ + void punctuate(long timestamp); + + /** + * Close this processor and clean up any resources. + */ + void close(); + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerDef.java new file mode 100644 index 0000000000000..e98cf2d0a1232 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerDef.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +public interface ValueTransformerDef { + + ValueTransformer instance(); + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 52c717feee7f2..8133693dec5cd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -20,6 +20,8 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.TransformerDef; +import org.apache.kafka.streams.kstream.ValueTransformerDef; import org.apache.kafka.streams.processor.ProcessorDef; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.kstream.KStreamWindowed; @@ -44,6 +46,10 @@ public class KStreamImpl implements KStream { private static final String FLATMAPVALUES_NAME = "KAFKA-FLATMAPVALUES-"; + private static final String TRANSFORM_NAME = "KAFKA-TRANSFORM-"; + + private static final String TRANSFORMVALUES_NAME = "KAFKA-TRANSFORMVALUES-"; + private static final String PROCESSOR_NAME = "KAFKA-PROCESSOR-"; private static final String BRANCH_NAME = "KAFKA-BRANCH-"; @@ -191,11 +197,27 @@ public void to(String topic, Serializer keySerializer, Serializer valSeria } @Override - public KStream process(final ProcessorDef processorDef) { - String name = PROCESSOR_NAME + INDEX.getAndIncrement(); + public KStream transform(TransformerDef> transformerDef) { + String name = TRANSFORM_NAME + INDEX.getAndIncrement(); - topology.addProcessor(name, processorDef, this.name); + topology.addProcessor(name, new KStreamTransform<>(transformerDef), this.name); + + return new KStreamImpl<>(topology, name); + } + + @Override + public KStream transformValues(ValueTransformerDef valueTransformerDef) { + String name = TRANSFORMVALUES_NAME + INDEX.getAndIncrement(); + + topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerDef), this.name); return new KStreamImpl<>(topology, name); } + + @Override + public void process(final ProcessorDef processorDef) { + String name = PROCESSOR_NAME + INDEX.getAndIncrement(); + + topology.addProcessor(name, processorDef, this.name); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java new file mode 100644 index 0000000000000..9eab67231fc49 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.TransformerDef; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorDef; + +public class KStreamTransform implements ProcessorDef { + + private final TransformerDef> transformerDef; + + public KStreamTransform(TransformerDef> transformerDef) { + this.transformerDef = transformerDef; + } + + @Override + public Processor instance() { + return new KStreamTransformProcessor(transformerDef.instance()); + } + + public static class KStreamTransformProcessor implements Processor { + + private final Transformer> transformer; + private ProcessorContext context; + + public KStreamTransformProcessor(Transformer> transformer) { + this.transformer = transformer; + } + + @Override + public void init(ProcessorContext context) { + transformer.init(context); + this.context = context; + } + + @Override + public void process(K1 key, V1 value) { + KeyValue pair = transformer.transform(key, value); + context.forward(pair.key, pair.value); + } + + @Override + public void punctuate(long timestamp) { + transformer.punctuate(timestamp); + } + + @Override + public void close() { + transformer.close(); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java new file mode 100644 index 0000000000000..02825cad02c9b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ValueTransformer; +import org.apache.kafka.streams.kstream.ValueTransformerDef; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorDef; + +public class KStreamTransformValues implements ProcessorDef { + + private final ValueTransformerDef valueTransformerDef; + + public KStreamTransformValues(ValueTransformerDef valueTransformerDef) { + this.valueTransformerDef = valueTransformerDef; + } + + @Override + public Processor instance() { + return new KStreamTransformValuesProcessor(valueTransformerDef.instance()); + } + + public static class KStreamTransformValuesProcessor implements Processor { + + private final ValueTransformer valueTransformer; + private ProcessorContext context; + + public KStreamTransformValuesProcessor(ValueTransformer valueTransformer) { + this.valueTransformer = valueTransformer; + } + + @Override + public void init(ProcessorContext context) { + valueTransformer.init(context); + this.context = context; + } + + @Override + public void process(K key, V value) { + context.forward(key, valueTransformer.transform(value)); + } + + @Override + public void punctuate(long timestamp) { + valueTransformer.punctuate(timestamp); + } + + @Override + public void close() { + valueTransformer.close(); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 95a923d11e194..4a6833254d45d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -159,7 +159,7 @@ private Consumer createConsumer() { new ByteArrayDeserializer(), new ByteArrayDeserializer()); } - + private Consumer createRestoreConsumer() { log.info("Creating restore consumer client for stream thread [" + this.getName() + "]"); return new KafkaConsumer<>(config.getConsumerConfigs(), @@ -240,9 +240,11 @@ private void runLoop() { // try to fetch some records if necessary ConsumerRecords records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0); - for (StreamTask task : tasks.values()) { - for (TopicPartition partition : task.partitions()) { - task.addRecords(partition, records.records(partition)); + if (!records.isEmpty()) { + for (StreamTask task : tasks.values()) { + for (TopicPartition partition : task.partitions()) { + task.addRecords(partition, records.records(partition)); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 0660ddd87a54f..0b292c3217159 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -119,7 +119,7 @@ public Integer apply(Integer value1, Integer value2) { stream4.to("topic-5"); - stream5.through("topic-6").process(new MockProcessorDef<>()).to("topic-7"); + stream5.through("topic-6").process(new MockProcessorDef<>()); assertEquals(2 + // sources 2 + // stream1 @@ -131,8 +131,7 @@ public Integer apply(Integer value1, Integer value2) { 2 + 3 + // stream5 1 + // to 2 + // through - 1 + // process - 1, // to + 1, // process builder.build().processors().size()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java new file mode 100644 index 0000000000000..4c3324bf9214e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.TransformerDef; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorDef; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class KStreamTransformTest { + + private String topicName = "topic"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private IntegerDeserializer valDeserializer = new IntegerDeserializer(); + + @Test + public void testTransform() { + KStreamBuilder builder = new KStreamBuilder(); + + TransformerDef> transformerDef = + new TransformerDef>() { + public Transformer> instance() { + return new Transformer>() { + + private int total = 0; + + @Override + public void init(ProcessorContext context) { + } + + @Override + public KeyValue transform(Integer key, Integer value) { + total += value; + return KeyValue.pair(key * 2, total); + } + + @Override + public void punctuate(long timestamp) { + } + + @Override + public void close() { + } + }; + } + }; + + final int[] expectedKeys = {1, 10, 100, 1000}; + + KStream stream; + MockProcessorDef processor = new MockProcessorDef<>(); + stream = builder.from(keyDeserializer, valDeserializer, topicName); + stream.transform(transformerDef).process(processor); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10); + } + + assertEquals(4, processor.processed.size()); + + String[] expected = {"2:10", "20:110", "200:1110", "2000:11110"}; + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java new file mode 100644 index 0000000000000..f73232ba6e1b2 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.ValueTransformer; +import org.apache.kafka.streams.kstream.ValueTransformerDef; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorDef; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class KStreamTransformValuesTest { + + private String topicName = "topic"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private IntegerDeserializer valDeserializer = new IntegerDeserializer(); + + @Test + public void testTransform() { + KStreamBuilder builder = new KStreamBuilder(); + + ValueTransformerDef valueTransformerDef = + new ValueTransformerDef() { + public ValueTransformer instance() { + return new ValueTransformer() { + + private int total = 0; + + @Override + public void init(ProcessorContext context) { + } + + @Override + public Integer transform(Integer value) { + total += value; + return total; + } + + @Override + public void punctuate(long timestamp) { + } + + @Override + public void close() { + } + }; + } + }; + + final int[] expectedKeys = {1, 10, 100, 1000}; + + KStream stream; + MockProcessorDef processor = new MockProcessorDef<>(); + stream = builder.from(keyDeserializer, valDeserializer, topicName); + stream.transformValues(valueTransformerDef).process(processor); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10); + } + + assertEquals(4, processor.processed.size()); + + String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"}; + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } + +}