diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java new file mode 100644 index 0000000000000..cccd298cd5dca --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; + +public class ForeachProcessor implements Processor { + + private final ForeachAction action; + + public ForeachProcessor(final ForeachAction action) { + this.action = action; + } + + @Override + public void process(final Record record) { + action.apply(record.key(), record.value()); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java index 21b69f2322492..2d3fc76ab49d2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java @@ -16,44 +16,44 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.To; - import java.util.List; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamBranch implements ProcessorSupplier { +class KStreamBranch implements ProcessorSupplier { private final List> predicates; private final List childNodes; KStreamBranch(final List> predicates, - final List childNodes) { + final List childNodes) { this.predicates = predicates; this.childNodes = childNodes; } @Override - public Processor get() { + public Processor get() { return new KStreamBranchProcessor(); } - private class KStreamBranchProcessor extends AbstractProcessor { + private class KStreamBranchProcessor extends ContextualProcessor { + @Override - public void process(final K key, final V value) { + public void process(final Record record) { for (int i = 0; i < predicates.size(); i++) { - if (predicates.get(i).test(key, value)) { + if (predicates.get(i).test(record.key(), record.value())) { // use forward with child here and then break the loop // so that no record is going to be piped to multiple streams - context().forward(key, value, To.child(childNodes.get(i))); + context().forward(record, childNodes.get(i)); return; } } // using default child node if supplied if (childNodes.size() > predicates.size()) { - context().forward(key, value, To.child(childNodes.get(predicates.size()))); + context().forward(record, childNodes.get(predicates.size())); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java index ac03c18351bbf..ffafd10b460c4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java @@ -16,12 +16,13 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamFilter implements ProcessorSupplier { +class KStreamFilter implements ProcessorSupplier { private final Predicate predicate; private final boolean filterNot; @@ -32,15 +33,15 @@ public KStreamFilter(final Predicate predicate, final boolean filterNot) { } @Override - public Processor get() { + public Processor get() { return new KStreamFilterProcessor(); } - private class KStreamFilterProcessor extends AbstractProcessor { + private class KStreamFilterProcessor extends ContextualProcessor { @Override - public void process(final K key, final V value) { - if (filterNot ^ predicate.test(key, value)) { - context().forward(key, value); + public void process(final Record record) { + if (filterNot ^ predicate.test(record.key(), record.value())) { + context().forward(record); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java index e20ec90aa7a5a..b731a5c1c4f48 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java @@ -18,28 +18,31 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamFlatMap implements ProcessorSupplier { +class KStreamFlatMap implements ProcessorSupplier { - private final KeyValueMapper>> mapper; + private final KeyValueMapper>> mapper; - KStreamFlatMap(final KeyValueMapper>> mapper) { + KStreamFlatMap(final KeyValueMapper>> mapper) { this.mapper = mapper; } @Override - public Processor get() { + public Processor get() { return new KStreamFlatMapProcessor(); } - private class KStreamFlatMapProcessor extends AbstractProcessor { + private class KStreamFlatMapProcessor extends ContextualProcessor { @Override - public void process(final K key, final V value) { - for (final KeyValue newPair : mapper.apply(key, value)) { - context().forward(newPair.key, newPair.value); + public void process(final Record record) { + final Iterable> newKeyValues = + mapper.apply(record.key(), record.value()); + for (final KeyValue newPair : newKeyValues) { + context().forward(record.withKey(newPair.key).withValue(newPair.value)); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java index fedfe393c63b2..1008b297b3d3c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java @@ -17,29 +17,30 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.ValueMapperWithKey; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamFlatMapValues implements ProcessorSupplier { +class KStreamFlatMapValues implements ProcessorSupplier { - private final ValueMapperWithKey> mapper; + private final ValueMapperWithKey> mapper; - KStreamFlatMapValues(final ValueMapperWithKey> mapper) { + KStreamFlatMapValues(final ValueMapperWithKey> mapper) { this.mapper = mapper; } @Override - public Processor get() { + public Processor get() { return new KStreamFlatMapValuesProcessor(); } - private class KStreamFlatMapValuesProcessor extends AbstractProcessor { + private class KStreamFlatMapValuesProcessor extends ContextualProcessor { @Override - public void process(final K key, final V value) { - final Iterable newValues = mapper.apply(key, value); - for (final V1 v : newValues) { - context().forward(key, v); + public void process(final Record record) { + final Iterable newValues = mapper.apply(record.key(), record.value()); + for (final VOut v : newValues) { + context().forward(record.withValue(v)); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 9b5ed15539deb..213bebefe80f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -61,6 +61,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TopicNameExtractor; +import org.apache.kafka.streams.kstream.ForeachProcessor; import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; import org.apache.kafka.streams.state.KeyValueStore; @@ -406,7 +407,7 @@ public void foreach(final ForeachAction action, final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FOREACH_NAME); final ProcessorParameters processorParameters = - new ProcessorParameters<>(new KStreamPeek<>(action, false), name); + new ProcessorParameters<>(() -> new ForeachProcessor<>(action), name); final ProcessorGraphNode foreachNode = new ProcessorGraphNode<>(name, processorParameters); @@ -426,7 +427,7 @@ public KStream peek(final ForeachAction action, final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, PEEK_NAME); final ProcessorParameters processorParameters = - new ProcessorParameters<>(new KStreamPeek<>(action, true), name); + new ProcessorParameters<>(new KStreamPeek<>(action), name); final ProcessorGraphNode peekNode = new ProcessorGraphNode<>(name, processorParameters); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java index 8179ca8fe64f5..0fec71630383e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java @@ -16,30 +16,33 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamMap implements ProcessorSupplier { +class KStreamMap implements ProcessorSupplier { - private final KeyValueMapper> mapper; + private final KeyValueMapper> mapper; - public KStreamMap(final KeyValueMapper> mapper) { + public KStreamMap(final KeyValueMapper> mapper) { this.mapper = mapper; } @Override - public Processor get() { + public Processor get() { return new KStreamMapProcessor(); } - private class KStreamMapProcessor extends AbstractProcessor { + private class KStreamMapProcessor extends ContextualProcessor { + @Override - public void process(final K key, final V value) { - final KeyValue newPair = mapper.apply(key, value); - context().forward(newPair.key, newPair.value); + public void process(final Record record) { + final KeyValue newPair = + mapper.apply(record.key(), record.value()); + context().forward(record.withKey(newPair.key).withValue(newPair.value)); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java index 28c120e3d6221..f73bfdd53bc33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java @@ -17,28 +17,29 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.ValueMapperWithKey; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamMapValues implements ProcessorSupplier { +class KStreamMapValues implements ProcessorSupplier { - private final ValueMapperWithKey mapper; + private final ValueMapperWithKey mapper; - public KStreamMapValues(final ValueMapperWithKey mapper) { + public KStreamMapValues(final ValueMapperWithKey mapper) { this.mapper = mapper; } @Override - public Processor get() { + public Processor get() { return new KStreamMapProcessor(); } - private class KStreamMapProcessor extends AbstractProcessor { + private class KStreamMapProcessor extends ContextualProcessor { @Override - public void process(final K readOnlyKey, final V value) { - final V1 newValue = mapper.apply(readOnlyKey, value); - context().forward(readOnlyKey, newValue); + public void process(final Record record) { + final VOut newValue = mapper.apply(record.key(), record.value()); + context().forward(record.withValue(newValue)); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java index 44d1d60ead821..69b5e7fc3316f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java @@ -17,32 +17,29 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.ForeachAction; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamPeek implements ProcessorSupplier { +class KStreamPeek implements ProcessorSupplier { - private final boolean forwardDownStream; private final ForeachAction action; - public KStreamPeek(final ForeachAction action, final boolean forwardDownStream) { + public KStreamPeek(final ForeachAction action) { this.action = action; - this.forwardDownStream = forwardDownStream; } @Override - public Processor get() { + public Processor get() { return new KStreamPeekProcessor(); } - private class KStreamPeekProcessor extends AbstractProcessor { + private class KStreamPeekProcessor extends ContextualProcessor { @Override - public void process(final K key, final V value) { - action.apply(key, value); - if (forwardDownStream) { - context().forward(key, value); - } + public void process(final Record record) { + action.apply(record.key(), record.value()); + context().forward(record); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java index 8ce698ab54631..a04662cbb8706 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java @@ -17,28 +17,28 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.ForeachAction; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -public class KStreamPrint implements ProcessorSupplier { +public class KStreamPrint implements ProcessorSupplier { private final ForeachAction action; - + public KStreamPrint(final ForeachAction action) { this.action = action; } @Override - public Processor get() { + public Processor get() { return new KStreamPrintProcessor(); } - private class KStreamPrintProcessor extends AbstractProcessor { + private class KStreamPrintProcessor implements Processor { @Override - public void process(final K key, final V value) { - action.apply(key, value); + public void process(final Record record) { + action.apply(record.key(), record.value()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 7173ac796e16f..95b3c46367379 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -484,7 +484,7 @@ public KStream toStream(final Named named) { Objects.requireNonNull(named, "named can't be null"); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TOSTREAM_NAME); - final ProcessorSupplier> kStreamMapValues = new KStreamMapValues<>((key, change) -> change.newValue); + final KStreamMapValues, V> kStreamMapValues = new KStreamMapValues<>((key, change) -> change.newValue); final ProcessorParameters processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType( new ProcessorParameters<>(kStreamMapValues, name) ); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java index b83b3a405f43d..f357a464c2b23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java @@ -16,21 +16,22 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class PassThrough implements ProcessorSupplier { +class PassThrough implements ProcessorSupplier { @Override - public Processor get() { + public Processor get() { return new PassThroughProcessor<>(); } - private static final class PassThroughProcessor extends AbstractProcessor { + private static final class PassThroughProcessor extends ContextualProcessor { @Override - public void process(final K key, final V value) { - context().forward(key, value); + public void process(final Record record) { + context().forward(record); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java index 546e35321b2ca..0cd1760e69af8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java @@ -17,14 +17,14 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Printed; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; public class PrintedInternal extends Printed { public PrintedInternal(final Printed printed) { super(printed); } - public ProcessorSupplier build(final String processorName) { + public ProcessorSupplier build(final String processorName) { return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? label : processorName)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java new file mode 100644 index 0000000000000..d2522e3e075df --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.api; + +/** + * An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op + * implementation of {@link #close()}. + * + * @param the type of input keys + * @param the type of input values + * @param the type of output keys + * @param the type of output values + */ +public abstract class ContextualProcessor implements Processor { + + protected ProcessorContext context; + + protected ContextualProcessor() {} + + @Override + public void init(final ProcessorContext context) { + this.context = context; + } + + /** + * Get the processor's context set during {@link #init(ProcessorContext) initialization}. + * + * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}. + */ + protected final ProcessorContext context() { + return context; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java index a56dbbabc387f..212074fd23614 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java @@ -19,8 +19,9 @@ import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.internals.PrintedInternal; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -59,11 +60,11 @@ public void after() { @Test public void shouldCreateProcessorThatPrintsToFile() throws IOException { final File file = TestUtils.tempFile(); - final ProcessorSupplier processorSupplier = new PrintedInternal<>( + final ProcessorSupplier processorSupplier = new PrintedInternal<>( Printed.toFile(file.getPath())) .build("processor"); - final Processor processor = processorSupplier.get(); - processor.process("hi", 1); + final Processor processor = processorSupplier.get(); + processor.process(new Record<>("hi", 1, 0L)); processor.close(); try (final InputStream stream = Files.newInputStream(file.toPath())) { final byte[] data = new byte[stream.available()]; @@ -74,36 +75,31 @@ public void shouldCreateProcessorThatPrintsToFile() throws IOException { @Test public void shouldCreateProcessorThatPrintsToStdOut() throws UnsupportedEncodingException { - final ProcessorSupplier supplier = new PrintedInternal<>(sysOutPrinter).build("processor"); - final Processor processor = supplier.get(); + final ProcessorSupplier supplier = new PrintedInternal<>(sysOutPrinter).build("processor"); + final Processor processor = supplier.get(); - processor.process("good", 2); + processor.process(new Record<>("good", 2, 0L)); processor.close(); assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: good, 2\n")); } @Test public void shouldPrintWithLabel() throws UnsupportedEncodingException { - final Processor processor = new PrintedInternal<>(sysOutPrinter.withLabel("label")) + final Processor processor = new PrintedInternal<>(sysOutPrinter.withLabel("label")) .build("processor") .get(); - processor.process("hello", 3); + processor.process(new Record<>("hello", 3, 0L)); processor.close(); assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[label]: hello, 3\n")); } @Test public void shouldPrintWithKeyValueMapper() throws UnsupportedEncodingException { - final Processor processor = new PrintedInternal<>(sysOutPrinter.withKeyValueMapper( - new KeyValueMapper() { - @Override - public String apply(final String key, final Integer value) { - return String.format("%s -> %d", key, value); - } - })).build("processor") - .get(); - processor.process("hello", 1); + final Processor processor = new PrintedInternal<>( + sysOutPrinter.withKeyValueMapper((key, value) -> String.format("%s -> %d", key, value)) + ).build("processor").get(); + processor.process(new Record<>("hello", 1, 0L)); processor.close(); assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: hello -> 1\n")); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java index e33186e805b53..1813522bcce2c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java @@ -60,8 +60,8 @@ public void testKStreamBranch() { assertEquals(3, branches.length); final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - for (int i = 0; i < branches.length; i++) { - branches[i].process(supplier); + for (final KStream branch : branches) { + branch.process(supplier); } try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java index 990655612dd37..2915a118792a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java @@ -17,8 +17,9 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; @@ -33,7 +34,7 @@ public class KStreamPrintTest { private ByteArrayOutputStream byteOutStream; - private Processor printProcessor; + private Processor printProcessor; @Before public void setUp() { @@ -45,14 +46,13 @@ public void setUp() { "test-stream")); printProcessor = kStreamPrint.get(); - final ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class); + final ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class); EasyMock.replay(processorContext); printProcessor.init(processorContext); } @Test - @SuppressWarnings("unchecked") public void testPrintStreamWithProvidedKeyValueMapper() { final List> inputRecords = Arrays.asList( new KeyValue<>(0, "zero"), @@ -67,7 +67,8 @@ public void testPrintStreamWithProvidedKeyValueMapper() { "[test-stream]: 3, three"}; for (final KeyValue record: inputRecords) { - printProcessor.process(record.key, record.value); + final Record r = new Record<>(record.key, record.value, 0L); + printProcessor.process(r); } printProcessor.close();