From a10e11f0bac30d86a740af10e4a73a06406a094e Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 19 Mar 2021 13:14:15 +0000 Subject: [PATCH 01/16] add new abstract processor --- .../processor/api/AbstractProcessor.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/api/AbstractProcessor.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/AbstractProcessor.java new file mode 100644 index 0000000000000..f4c03c7e0ec09 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/AbstractProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.api; + +/** + * An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op + * implementation of {@link #close()}. + * + * @param the type of input keys + * @param the type of input values + * @param the type of output keys + * @param the type of output values + */ +public abstract class AbstractProcessor implements Processor { + + protected ProcessorContext context; + + protected AbstractProcessor() {} + + @Override + public void init(final ProcessorContext context) { + this.context = context; + } + + /** + * Close this processor and clean up any resources. + *

+ * This method does nothing by default; if desired, subclasses should override it with custom functionality. + *

+ */ + @Override + public void close() { + // do nothing + } + + /** + * Get the processor's context set during {@link #init(ProcessorContext) initialization}. + * + * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}. + */ + protected final ProcessorContext context() { + return context; + } +} From 7ddf7ca0aa628a5944964bc29aad605409e1db87 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 23 Mar 2021 11:00:49 +0000 Subject: [PATCH 02/16] migrate kstream branch to new processor --- .../kstream/internals/KStreamBranch.java | 28 +++++++++---------- .../kstream/internals/KStreamBranchTest.java | 4 +-- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java index 21b69f2322492..e07265c2290a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java @@ -16,44 +16,44 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.To; - import java.util.List; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamBranch implements ProcessorSupplier { +class KStreamBranch implements ProcessorSupplier { private final List> predicates; private final List childNodes; KStreamBranch(final List> predicates, - final List childNodes) { + final List childNodes) { this.predicates = predicates; this.childNodes = childNodes; } @Override - public Processor get() { + public Processor get() { return new KStreamBranchProcessor(); } - private class KStreamBranchProcessor extends AbstractProcessor { + private class KStreamBranchProcessor extends AbstractProcessor { + @Override - public void process(final K key, final V value) { + public void process(final Record record) { for (int i = 0; i < predicates.size(); i++) { - if (predicates.get(i).test(key, value)) { + if (predicates.get(i).test(record.key(), record.value())) { // use forward with child here and then break the loop // so that no record is going to be piped to multiple streams - context().forward(key, value, To.child(childNodes.get(i))); + context().forward(record, childNodes.get(i)); return; } } // using default child node if supplied if (childNodes.size() > predicates.size()) { - context().forward(key, value, To.child(childNodes.get(predicates.size()))); + context().forward(record, childNodes.get(predicates.size())); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java index e33186e805b53..1813522bcce2c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java @@ -60,8 +60,8 @@ public void testKStreamBranch() { assertEquals(3, branches.length); final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - for (int i = 0; i < branches.length; i++) { - branches[i].process(supplier); + for (final KStream branch : branches) { + branch.process(supplier); } try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { From 349f524d916619099f42d75204799afa0cf1e3de Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 23 Mar 2021 11:51:42 +0000 Subject: [PATCH 03/16] migrate kstream filter to new processor --- .../kstream/internals/KStreamFilter.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java index ac03c18351bbf..c2f3466455af4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java @@ -16,12 +16,13 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamFilter implements ProcessorSupplier { +class KStreamFilter implements ProcessorSupplier { private final Predicate predicate; private final boolean filterNot; @@ -32,15 +33,15 @@ public KStreamFilter(final Predicate predicate, final boolean filterNot) { } @Override - public Processor get() { + public Processor get() { return new KStreamFilterProcessor(); } - private class KStreamFilterProcessor extends AbstractProcessor { + private class KStreamFilterProcessor extends AbstractProcessor { @Override - public void process(final K key, final V value) { - if (filterNot ^ predicate.test(key, value)) { - context().forward(key, value); + public void process(final Record record) { + if (filterNot ^ predicate.test(record.key(), record.value())) { + context().forward(record); } } } From b7854ec46e7bdd91a4dad6b8a1a9f3f956d3087a Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 23 Mar 2021 11:51:57 +0000 Subject: [PATCH 04/16] migrate kstream flat map to new processor --- .../kstream/internals/KStreamFlatMap.java | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java index e20ec90aa7a5a..013df38f16fba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java @@ -18,28 +18,35 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamFlatMap implements ProcessorSupplier { +class KStreamFlatMap implements ProcessorSupplier { - private final KeyValueMapper>> mapper; + private final KeyValueMapper>> mapper; - KStreamFlatMap(final KeyValueMapper>> mapper) { + KStreamFlatMap(final KeyValueMapper>> mapper) { this.mapper = mapper; } @Override - public Processor get() { + public Processor get() { return new KStreamFlatMapProcessor(); } - private class KStreamFlatMapProcessor extends AbstractProcessor { + private class KStreamFlatMapProcessor extends AbstractProcessor { @Override - public void process(final K key, final V value) { - for (final KeyValue newPair : mapper.apply(key, value)) { - context().forward(newPair.key, newPair.value); + public void process(final Record record) { + for (final KeyValue newPair : + mapper.apply(record.key(), record.value())) { + final Record newRecord = new Record<>( + newPair.key, + newPair.value, + record.timestamp(), + record.headers()); + context().forward(newRecord); } } } From 5d5216288a63692a91f17114774672360631dd03 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 23 Mar 2021 11:58:56 +0000 Subject: [PATCH 05/16] migrate kstream flat map values to new processor --- .../internals/KStreamFlatMapValues.java | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java index fedfe393c63b2..1ecace0a20d39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java @@ -17,29 +17,35 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.ValueMapperWithKey; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamFlatMapValues implements ProcessorSupplier { +class KStreamFlatMapValues implements ProcessorSupplier { - private final ValueMapperWithKey> mapper; + private final ValueMapperWithKey> mapper; - KStreamFlatMapValues(final ValueMapperWithKey> mapper) { + KStreamFlatMapValues(final ValueMapperWithKey> mapper) { this.mapper = mapper; } @Override - public Processor get() { + public Processor get() { return new KStreamFlatMapValuesProcessor(); } - private class KStreamFlatMapValuesProcessor extends AbstractProcessor { + private class KStreamFlatMapValuesProcessor extends AbstractProcessor { @Override - public void process(final K key, final V value) { - final Iterable newValues = mapper.apply(key, value); - for (final V1 v : newValues) { - context().forward(key, v); + public void process(final Record record) { + final Iterable newValues = mapper.apply(record.key(), record.value()); + for (final VOut v : newValues) { + final Record newRecord = new Record<>( + record.key(), + v, + record.timestamp(), + record.headers()); + context().forward(newRecord); } } } From 4f4f970278339337ce9b77fbfd7910c673b2dbde Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 23 Mar 2021 12:06:11 +0000 Subject: [PATCH 06/16] fix flatmap to reuse record with new values --- .../kafka/streams/kstream/internals/KStreamFlatMap.java | 7 +------ .../streams/kstream/internals/KStreamFlatMapValues.java | 7 +------ 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java index 013df38f16fba..76a07094cff60 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java @@ -41,12 +41,7 @@ private class KStreamFlatMapProcessor extends AbstractProcessor record) { for (final KeyValue newPair : mapper.apply(record.key(), record.value())) { - final Record newRecord = new Record<>( - newPair.key, - newPair.value, - record.timestamp(), - record.headers()); - context().forward(newRecord); + context().forward(record.withKey(newPair.key).withValue(newPair.value)); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java index 1ecace0a20d39..0c9a418f0fda3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java @@ -40,12 +40,7 @@ private class KStreamFlatMapValuesProcessor extends AbstractProcessor record) { final Iterable newValues = mapper.apply(record.key(), record.value()); for (final VOut v : newValues) { - final Record newRecord = new Record<>( - record.key(), - v, - record.timestamp(), - record.headers()); - context().forward(newRecord); + context().forward(record.withValue(v)); } } } From 315ed7784e2bc16e71230016fa3b36466f955be7 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 23 Mar 2021 12:21:48 +0000 Subject: [PATCH 07/16] migrate kstream map ops to new processor --- .../streams/kstream/internals/KStreamMap.java | 25 +++++++++++-------- .../kstream/internals/KStreamMapValues.java | 23 +++++++++-------- .../streams/kstream/internals/KTableImpl.java | 2 +- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java index 8179ca8fe64f5..c50b342e0fc9f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java @@ -16,30 +16,33 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamMap implements ProcessorSupplier { +class KStreamMap implements ProcessorSupplier { - private final KeyValueMapper> mapper; + private final KeyValueMapper> mapper; - public KStreamMap(final KeyValueMapper> mapper) { + public KStreamMap(final KeyValueMapper> mapper) { this.mapper = mapper; } @Override - public Processor get() { + public Processor get() { return new KStreamMapProcessor(); } - private class KStreamMapProcessor extends AbstractProcessor { + private class KStreamMapProcessor extends AbstractProcessor { + @Override - public void process(final K key, final V value) { - final KeyValue newPair = mapper.apply(key, value); - context().forward(newPair.key, newPair.value); + public void process(final Record record) { + final KeyValue newPair = + mapper.apply(record.key(), record.value()); + context().forward(record.withKey(newPair.key).withValue(newPair.value)); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java index 28c120e3d6221..1b73622dce0d7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java @@ -17,28 +17,29 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.ValueMapperWithKey; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamMapValues implements ProcessorSupplier { +class KStreamMapValues implements ProcessorSupplier { - private final ValueMapperWithKey mapper; + private final ValueMapperWithKey mapper; - public KStreamMapValues(final ValueMapperWithKey mapper) { + public KStreamMapValues(final ValueMapperWithKey mapper) { this.mapper = mapper; } @Override - public Processor get() { + public Processor get() { return new KStreamMapProcessor(); } - private class KStreamMapProcessor extends AbstractProcessor { + private class KStreamMapProcessor extends AbstractProcessor { @Override - public void process(final K readOnlyKey, final V value) { - final V1 newValue = mapper.apply(readOnlyKey, value); - context().forward(readOnlyKey, newValue); + public void process(final Record record) { + final VOut newValue = mapper.apply(record.key(), record.value()); + context().forward(record.withValue(newValue)); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 7173ac796e16f..95b3c46367379 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -484,7 +484,7 @@ public KStream toStream(final Named named) { Objects.requireNonNull(named, "named can't be null"); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TOSTREAM_NAME); - final ProcessorSupplier> kStreamMapValues = new KStreamMapValues<>((key, change) -> change.newValue); + final KStreamMapValues, V> kStreamMapValues = new KStreamMapValues<>((key, change) -> change.newValue); final ProcessorParameters processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType( new ProcessorParameters<>(kStreamMapValues, name) ); From 430964fd524abd8b59710c29c6d1464f7dabc7e5 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 23 Mar 2021 12:25:29 +0000 Subject: [PATCH 08/16] migrate kstream peek to new processor --- .../kstream/internals/KStreamPeek.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java index 44d1d60ead821..c29f10e5c0844 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java @@ -17,11 +17,12 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.ForeachAction; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class KStreamPeek implements ProcessorSupplier { +class KStreamPeek implements ProcessorSupplier { private final boolean forwardDownStream; private final ForeachAction action; @@ -32,16 +33,16 @@ public KStreamPeek(final ForeachAction action, final boolean forwardDownSt } @Override - public Processor get() { + public Processor get() { return new KStreamPeekProcessor(); } - private class KStreamPeekProcessor extends AbstractProcessor { + private class KStreamPeekProcessor extends AbstractProcessor { @Override - public void process(final K key, final V value) { - action.apply(key, value); + public void process(final Record record) { + action.apply(record.key(), record.value()); if (forwardDownStream) { - context().forward(key, value); + context().forward(record); } } } From 23919e56b7d7b7106e82f318c5b7bad421516afd Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 23 Mar 2021 12:39:49 +0000 Subject: [PATCH 09/16] migrate kstream print to new processor --- .../kstream/internals/KStreamPrint.java | 17 +++++----- .../kstream/internals/PrintedInternal.java | 4 +-- .../kafka/streams/kstream/PrintedTest.java | 34 ++++++++----------- .../kstream/internals/KStreamPrintTest.java | 12 ++++--- 4 files changed, 33 insertions(+), 34 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java index 8ce698ab54631..f5a42e602b367 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java @@ -17,11 +17,12 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.ForeachAction; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -public class KStreamPrint implements ProcessorSupplier { +public class KStreamPrint implements ProcessorSupplier { private final ForeachAction action; @@ -30,15 +31,15 @@ public KStreamPrint(final ForeachAction action) { } @Override - public Processor get() { + public Processor get() { return new KStreamPrintProcessor(); } - private class KStreamPrintProcessor extends AbstractProcessor { + private class KStreamPrintProcessor extends AbstractProcessor { @Override - public void process(final K key, final V value) { - action.apply(key, value); + public void process(final Record record) { + action.apply(record.key(), record.value()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java index 546e35321b2ca..a8c679ef4fa31 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java @@ -17,14 +17,14 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Printed; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; public class PrintedInternal extends Printed { public PrintedInternal(final Printed printed) { super(printed); } - public ProcessorSupplier build(final String processorName) { + public ProcessorSupplier build(final String processorName) { return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? label : processorName)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java index a56dbbabc387f..9b803c31601c6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java @@ -19,8 +19,9 @@ import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.internals.PrintedInternal; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -59,11 +60,11 @@ public void after() { @Test public void shouldCreateProcessorThatPrintsToFile() throws IOException { final File file = TestUtils.tempFile(); - final ProcessorSupplier processorSupplier = new PrintedInternal<>( + final ProcessorSupplier processorSupplier = new PrintedInternal<>( Printed.toFile(file.getPath())) .build("processor"); - final Processor processor = processorSupplier.get(); - processor.process("hi", 1); + final Processor processor = processorSupplier.get(); + processor.process(new Record<>("hi", 1, 0L)); processor.close(); try (final InputStream stream = Files.newInputStream(file.toPath())) { final byte[] data = new byte[stream.available()]; @@ -74,36 +75,31 @@ public void shouldCreateProcessorThatPrintsToFile() throws IOException { @Test public void shouldCreateProcessorThatPrintsToStdOut() throws UnsupportedEncodingException { - final ProcessorSupplier supplier = new PrintedInternal<>(sysOutPrinter).build("processor"); - final Processor processor = supplier.get(); + final ProcessorSupplier supplier = new PrintedInternal<>(sysOutPrinter).build("processor"); + final Processor processor = supplier.get(); - processor.process("good", 2); + processor.process(new Record<>("good", 2, 0L)); processor.close(); assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: good, 2\n")); } @Test public void shouldPrintWithLabel() throws UnsupportedEncodingException { - final Processor processor = new PrintedInternal<>(sysOutPrinter.withLabel("label")) + final Processor processor = new PrintedInternal<>(sysOutPrinter.withLabel("label")) .build("processor") .get(); - processor.process("hello", 3); + processor.process(new Record<>("hello", 3, 0L)); processor.close(); assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[label]: hello, 3\n")); } @Test public void shouldPrintWithKeyValueMapper() throws UnsupportedEncodingException { - final Processor processor = new PrintedInternal<>(sysOutPrinter.withKeyValueMapper( - new KeyValueMapper() { - @Override - public String apply(final String key, final Integer value) { - return String.format("%s -> %d", key, value); - } - })).build("processor") - .get(); - processor.process("hello", 1); + final Processor processor = new PrintedInternal<>( + sysOutPrinter.withKeyValueMapper((key, value) -> String.format("%s -> %d", key, value)) + ).build("processor").get(); + processor.process(new Record<>("hello", 1, 0L)); processor.close(); assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: hello -> 1\n")); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java index 990655612dd37..84195b1b14e48 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java @@ -17,8 +17,9 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; @@ -33,7 +34,7 @@ public class KStreamPrintTest { private ByteArrayOutputStream byteOutStream; - private Processor printProcessor; + private Processor printProcessor; @Before public void setUp() { @@ -45,7 +46,7 @@ public void setUp() { "test-stream")); printProcessor = kStreamPrint.get(); - final ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class); + final ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class); EasyMock.replay(processorContext); printProcessor.init(processorContext); @@ -67,7 +68,8 @@ public void testPrintStreamWithProvidedKeyValueMapper() { "[test-stream]: 3, three"}; for (final KeyValue record: inputRecords) { - printProcessor.process(record.key, record.value); + final Record r = new Record<>(record.key, record.value, 0L); + printProcessor.process(r); } printProcessor.close(); From ba676dd1c52da814a08b02892cec45209e279156 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 23 Mar 2021 15:37:07 +0000 Subject: [PATCH 10/16] migrate kstream passtrhrough to new processor --- .../streams/kstream/internals/PassThrough.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java index b83b3a405f43d..2278f54021282 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java @@ -16,21 +16,22 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; -class PassThrough implements ProcessorSupplier { +class PassThrough implements ProcessorSupplier { @Override - public Processor get() { + public Processor get() { return new PassThroughProcessor<>(); } - private static final class PassThroughProcessor extends AbstractProcessor { + private static final class PassThroughProcessor extends AbstractProcessor { @Override - public void process(final K key, final V value) { - context().forward(key, value); + public void process(Record record) { + context().forward(record); } } } From 727b3e47e0ee83152aceb2c863a2753c1eb775b4 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 23 Mar 2021 16:30:37 +0000 Subject: [PATCH 11/16] fix final var --- .../org/apache/kafka/streams/kstream/internals/PassThrough.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java index 2278f54021282..51d8ea823e3a4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java @@ -30,7 +30,7 @@ public Processor get() { private static final class PassThroughProcessor extends AbstractProcessor { @Override - public void process(Record record) { + public void process(final Record record) { context().forward(record); } } From e174691db2c63072aeda1fea59bce94501c7bbcb Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 30 Mar 2021 00:00:46 +0100 Subject: [PATCH 12/16] apply suggestions --- .../streams/kstream/ForeachProcessor.java | 34 +++++++++++++++++++ .../kstream/internals/KStreamBranch.java | 4 +-- .../kstream/internals/KStreamFilter.java | 4 +-- .../kstream/internals/KStreamFlatMap.java | 9 ++--- .../internals/KStreamFlatMapValues.java | 4 +-- .../kstream/internals/KStreamImpl.java | 5 +-- .../streams/kstream/internals/KStreamMap.java | 4 +-- .../kstream/internals/KStreamMapValues.java | 4 +-- .../kstream/internals/KStreamPeek.java | 12 +++---- .../kstream/internals/KStreamPrint.java | 9 +++-- .../kstream/internals/PassThrough.java | 4 +-- .../kstream/internals/PrintedInternal.java | 2 +- ...rocessor.java => ContextualProcessor.java} | 15 ++------ ...va => ContextualProcessorContextTest.java} | 0 14 files changed, 65 insertions(+), 45 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java rename streams/src/main/java/org/apache/kafka/streams/processor/api/{AbstractProcessor.java => ContextualProcessor.java} (80%) rename streams/src/test/java/org/apache/kafka/streams/processor/internals/{AbstractProcessorContextTest.java => ContextualProcessorContextTest.java} (100%) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java new file mode 100644 index 0000000000000..5ca5271508710 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; + +public class ForeachProcessor implements Processor { + + private final ForeachAction action; + + public ForeachProcessor(ForeachAction action) { + this.action = action; + } + + @Override + public void process(Record record) { + action.apply(record.key(), record.value()); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java index e07265c2290a9..2d3fc76ab49d2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java @@ -18,7 +18,7 @@ import java.util.List; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; @@ -39,7 +39,7 @@ public Processor get() { return new KStreamBranchProcessor(); } - private class KStreamBranchProcessor extends AbstractProcessor { + private class KStreamBranchProcessor extends ContextualProcessor { @Override public void process(final Record record) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java index c2f3466455af4..ffafd10b460c4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; @@ -37,7 +37,7 @@ public Processor get() { return new KStreamFilterProcessor(); } - private class KStreamFilterProcessor extends AbstractProcessor { + private class KStreamFilterProcessor extends ContextualProcessor { @Override public void process(final Record record) { if (filterNot ^ predicate.test(record.key(), record.value())) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java index 76a07094cff60..995c20ef483a1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java @@ -18,7 +18,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; @@ -36,11 +36,12 @@ public Processor get() { return new KStreamFlatMapProcessor(); } - private class KStreamFlatMapProcessor extends AbstractProcessor { + private class KStreamFlatMapProcessor extends ContextualProcessor { @Override public void process(final Record record) { - for (final KeyValue newPair : - mapper.apply(record.key(), record.value())) { + Iterable> newKeyValues = + mapper.apply(record.key(), record.value()); + for (final KeyValue newPair : newKeyValues) { context().forward(record.withKey(newPair.key).withValue(newPair.value)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java index 0c9a418f0fda3..1008b297b3d3c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.ValueMapperWithKey; -import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; @@ -35,7 +35,7 @@ public Processor get() { return new KStreamFlatMapValuesProcessor(); } - private class KStreamFlatMapValuesProcessor extends AbstractProcessor { + private class KStreamFlatMapValuesProcessor extends ContextualProcessor { @Override public void process(final Record record) { final Iterable newValues = mapper.apply(record.key(), record.value()); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 9b5ed15539deb..213bebefe80f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -61,6 +61,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TopicNameExtractor; +import org.apache.kafka.streams.kstream.ForeachProcessor; import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; import org.apache.kafka.streams.state.KeyValueStore; @@ -406,7 +407,7 @@ public void foreach(final ForeachAction action, final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FOREACH_NAME); final ProcessorParameters processorParameters = - new ProcessorParameters<>(new KStreamPeek<>(action, false), name); + new ProcessorParameters<>(() -> new ForeachProcessor<>(action), name); final ProcessorGraphNode foreachNode = new ProcessorGraphNode<>(name, processorParameters); @@ -426,7 +427,7 @@ public KStream peek(final ForeachAction action, final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, PEEK_NAME); final ProcessorParameters processorParameters = - new ProcessorParameters<>(new KStreamPeek<>(action, true), name); + new ProcessorParameters<>(new KStreamPeek<>(action), name); final ProcessorGraphNode peekNode = new ProcessorGraphNode<>(name, processorParameters); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java index c50b342e0fc9f..0fec71630383e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java @@ -18,7 +18,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; @@ -36,7 +36,7 @@ public Processor get() { return new KStreamMapProcessor(); } - private class KStreamMapProcessor extends AbstractProcessor { + private class KStreamMapProcessor extends ContextualProcessor { @Override public void process(final Record record) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java index 1b73622dce0d7..f73bfdd53bc33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.ValueMapperWithKey; -import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; @@ -35,7 +35,7 @@ public Processor get() { return new KStreamMapProcessor(); } - private class KStreamMapProcessor extends AbstractProcessor { + private class KStreamMapProcessor extends ContextualProcessor { @Override public void process(final Record record) { final VOut newValue = mapper.apply(record.key(), record.value()); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java index c29f10e5c0844..69b5e7fc3316f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java @@ -17,19 +17,17 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.ForeachAction; -import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; class KStreamPeek implements ProcessorSupplier { - private final boolean forwardDownStream; private final ForeachAction action; - public KStreamPeek(final ForeachAction action, final boolean forwardDownStream) { + public KStreamPeek(final ForeachAction action) { this.action = action; - this.forwardDownStream = forwardDownStream; } @Override @@ -37,13 +35,11 @@ public Processor get() { return new KStreamPeekProcessor(); } - private class KStreamPeekProcessor extends AbstractProcessor { + private class KStreamPeekProcessor extends ContextualProcessor { @Override public void process(final Record record) { action.apply(record.key(), record.value()); - if (forwardDownStream) { - context().forward(record); - } + context().forward(record); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java index f5a42e602b367..a04662cbb8706 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java @@ -17,25 +17,24 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.ForeachAction; -import org.apache.kafka.streams.processor.api.AbstractProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; -public class KStreamPrint implements ProcessorSupplier { +public class KStreamPrint implements ProcessorSupplier { private final ForeachAction action; - + public KStreamPrint(final ForeachAction action) { this.action = action; } @Override - public Processor get() { + public Processor get() { return new KStreamPrintProcessor(); } - private class KStreamPrintProcessor extends AbstractProcessor { + private class KStreamPrintProcessor implements Processor { @Override public void process(final Record record) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java index 51d8ea823e3a4..f357a464c2b23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.processor.api.AbstractProcessor; +import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; @@ -28,7 +28,7 @@ public Processor get() { return new PassThroughProcessor<>(); } - private static final class PassThroughProcessor extends AbstractProcessor { + private static final class PassThroughProcessor extends ContextualProcessor { @Override public void process(final Record record) { context().forward(record); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java index a8c679ef4fa31..0cd1760e69af8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java @@ -24,7 +24,7 @@ public PrintedInternal(final Printed printed) { super(printed); } - public ProcessorSupplier build(final String processorName) { + public ProcessorSupplier build(final String processorName) { return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? label : processorName)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java similarity index 80% rename from streams/src/main/java/org/apache/kafka/streams/processor/api/AbstractProcessor.java rename to streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java index f4c03c7e0ec09..d2522e3e075df 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/api/AbstractProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java @@ -25,28 +25,17 @@ * @param the type of output keys * @param the type of output values */ -public abstract class AbstractProcessor implements Processor { +public abstract class ContextualProcessor implements Processor { protected ProcessorContext context; - protected AbstractProcessor() {} + protected ContextualProcessor() {} @Override public void init(final ProcessorContext context) { this.context = context; } - /** - * Close this processor and clean up any resources. - *

- * This method does nothing by default; if desired, subclasses should override it with custom functionality. - *

- */ - @Override - public void close() { - // do nothing - } - /** * Get the processor's context set during {@link #init(ProcessorContext) initialization}. * diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ContextualProcessorContextTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/ContextualProcessorContextTest.java From 4bb5299c17b0f13d742d745dea401da9bfe5c919 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 31 Mar 2021 14:13:22 +0100 Subject: [PATCH 13/16] rollback change in test class --- ...rocessorContextTest.java => AbstractProcessorContextTest.java} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename streams/src/test/java/org/apache/kafka/streams/processor/internals/{ContextualProcessorContextTest.java => AbstractProcessorContextTest.java} (100%) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ContextualProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/ContextualProcessorContextTest.java rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java From e5dea3d18f8116b5667bb7cef3098e3fcb2d7049 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 31 Mar 2021 14:21:06 +0100 Subject: [PATCH 14/16] adjust tests to new types --- .../apache/kafka/streams/kstream/PrintedTest.java | 12 ++++++------ .../streams/kstream/internals/KStreamPrintTest.java | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java index 9b803c31601c6..212074fd23614 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java @@ -60,10 +60,10 @@ public void after() { @Test public void shouldCreateProcessorThatPrintsToFile() throws IOException { final File file = TestUtils.tempFile(); - final ProcessorSupplier processorSupplier = new PrintedInternal<>( + final ProcessorSupplier processorSupplier = new PrintedInternal<>( Printed.toFile(file.getPath())) .build("processor"); - final Processor processor = processorSupplier.get(); + final Processor processor = processorSupplier.get(); processor.process(new Record<>("hi", 1, 0L)); processor.close(); try (final InputStream stream = Files.newInputStream(file.toPath())) { @@ -75,8 +75,8 @@ public void shouldCreateProcessorThatPrintsToFile() throws IOException { @Test public void shouldCreateProcessorThatPrintsToStdOut() throws UnsupportedEncodingException { - final ProcessorSupplier supplier = new PrintedInternal<>(sysOutPrinter).build("processor"); - final Processor processor = supplier.get(); + final ProcessorSupplier supplier = new PrintedInternal<>(sysOutPrinter).build("processor"); + final Processor processor = supplier.get(); processor.process(new Record<>("good", 2, 0L)); processor.close(); @@ -85,7 +85,7 @@ public void shouldCreateProcessorThatPrintsToStdOut() throws UnsupportedEncoding @Test public void shouldPrintWithLabel() throws UnsupportedEncodingException { - final Processor processor = new PrintedInternal<>(sysOutPrinter.withLabel("label")) + final Processor processor = new PrintedInternal<>(sysOutPrinter.withLabel("label")) .build("processor") .get(); @@ -96,7 +96,7 @@ public void shouldPrintWithLabel() throws UnsupportedEncodingException { @Test public void shouldPrintWithKeyValueMapper() throws UnsupportedEncodingException { - final Processor processor = new PrintedInternal<>( + final Processor processor = new PrintedInternal<>( sysOutPrinter.withKeyValueMapper((key, value) -> String.format("%s -> %d", key, value)) ).build("processor").get(); processor.process(new Record<>("hello", 1, 0L)); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java index 84195b1b14e48..2df1bf4cd0fe8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java @@ -34,7 +34,7 @@ public class KStreamPrintTest { private ByteArrayOutputStream byteOutStream; - private Processor printProcessor; + private Processor printProcessor; @Before public void setUp() { @@ -46,7 +46,7 @@ public void setUp() { "test-stream")); printProcessor = kStreamPrint.get(); - final ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class); + final ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class); EasyMock.replay(processorContext); printProcessor.init(processorContext); From 3def9830017a8469e324757cf8d6263c21d5cd57 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 31 Mar 2021 14:22:21 +0100 Subject: [PATCH 15/16] adjust tests to new types --- .../apache/kafka/streams/kstream/internals/KStreamPrintTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java index 2df1bf4cd0fe8..2915a118792a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java @@ -53,7 +53,6 @@ public void setUp() { } @Test - @SuppressWarnings("unchecked") public void testPrintStreamWithProvidedKeyValueMapper() { final List> inputRecords = Arrays.asList( new KeyValue<>(0, "zero"), From 42b79e789b9f54f20cdea3ea9d9c41ba2d9ff08f Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 1 Apr 2021 19:23:27 +0100 Subject: [PATCH 16/16] fix checkstyle --- .../kafka/streams/kstream/ForeachProcessor.java | 16 ++++++++-------- .../kstream/internals/KStreamFlatMap.java | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java index 5ca5271508710..cccd298cd5dca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java @@ -21,14 +21,14 @@ public class ForeachProcessor implements Processor { - private final ForeachAction action; + private final ForeachAction action; - public ForeachProcessor(ForeachAction action) { - this.action = action; - } + public ForeachProcessor(final ForeachAction action) { + this.action = action; + } - @Override - public void process(Record record) { - action.apply(record.key(), record.value()); - } + @Override + public void process(final Record record) { + action.apply(record.key(), record.value()); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java index 995c20ef483a1..b731a5c1c4f48 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java @@ -39,7 +39,7 @@ public Processor get() { private class KStreamFlatMapProcessor extends ContextualProcessor { @Override public void process(final Record record) { - Iterable> newKeyValues = + final Iterable> newKeyValues = mapper.apply(record.key(), record.value()); for (final KeyValue newPair : newKeyValues) { context().forward(record.withKey(newPair.key).withValue(newPair.value));