Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;

import java.util.Objects;

/**
 * A terminal {@link Processor} that invokes a {@link ForeachAction} for every input
 * record and forwards nothing downstream (output types are {@code Void}).
 *
 * @param <K> key type of the input records
 * @param <V> value type of the input records
 */
public class ForeachProcessor<K, V> implements Processor<K, V, Void, Void> {

    private final ForeachAction<K, V> action;

    /**
     * @param action the side-effecting callback applied to each record's key and value;
     *               must not be {@code null}
     * @throws NullPointerException if {@code action} is {@code null}
     */
    public ForeachProcessor(final ForeachAction<K, V> action) {
        // fail fast at topology-build time instead of NPE-ing on the first record
        this.action = Objects.requireNonNull(action, "action");
    }

    @Override
    public void process(final Record<K, V> record) {
        action.apply(record.key(), record.value());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,44 @@
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;

import java.util.List;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {
class KStreamBranch<K, V> implements ProcessorSupplier<K, V, K, V> {

private final List<Predicate<? super K, ? super V>> predicates;
private final List<String> childNodes;

KStreamBranch(final List<Predicate<? super K, ? super V>> predicates,
final List<String> childNodes) {
final List<String> childNodes) {
this.predicates = predicates;
this.childNodes = childNodes;
}

@Override
public Processor<K, V> get() {
public Processor<K, V, K, V> get() {
return new KStreamBranchProcessor();
}

private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
private class KStreamBranchProcessor extends ContextualProcessor<K, V, K, V> {

@Override
public void process(final K key, final V value) {
public void process(final Record<K, V> record) {
for (int i = 0; i < predicates.size(); i++) {
if (predicates.get(i).test(key, value)) {
if (predicates.get(i).test(record.key(), record.value())) {
// use forward with child here and then break the loop
// so that no record is going to be piped to multiple streams
context().forward(key, value, To.child(childNodes.get(i)));
context().forward(record, childNodes.get(i));
return;
}
}
// using default child node if supplied
if (childNodes.size() > predicates.size()) {
context().forward(key, value, To.child(childNodes.get(predicates.size())));
context().forward(record, childNodes.get(predicates.size()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

class KStreamFilter<K, V> implements ProcessorSupplier<K, V> {
class KStreamFilter<K, V> implements ProcessorSupplier<K, V, K, V> {

private final Predicate<K, V> predicate;
private final boolean filterNot;
Expand All @@ -32,15 +33,15 @@ public KStreamFilter(final Predicate<K, V> predicate, final boolean filterNot) {
}

@Override
public Processor<K, V> get() {
public Processor<K, V, K, V> get() {
return new KStreamFilterProcessor();
}

private class KStreamFilterProcessor extends AbstractProcessor<K, V> {
private class KStreamFilterProcessor extends ContextualProcessor<K, V, K, V> {
@Override
public void process(final K key, final V value) {
if (filterNot ^ predicate.test(key, value)) {
context().forward(key, value);
public void process(final Record<K, V> record) {
if (filterNot ^ predicate.test(record.key(), record.value())) {
context().forward(record);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,31 @@

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

class KStreamFlatMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
class KStreamFlatMap<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {

private final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper;
private final KeyValueMapper<? super KIn, ? super VIn, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper;

KStreamFlatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
KStreamFlatMap(final KeyValueMapper<? super KIn, ? super VIn, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper) {
this.mapper = mapper;
}

@Override
public Processor<K, V> get() {
public Processor<KIn, VIn, KOut, VOut> get() {
return new KStreamFlatMapProcessor();
}

private class KStreamFlatMapProcessor extends AbstractProcessor<K, V> {
private class KStreamFlatMapProcessor extends ContextualProcessor<KIn, VIn, KOut, VOut> {
@Override
public void process(final K key, final V value) {
for (final KeyValue<? extends K1, ? extends V1> newPair : mapper.apply(key, value)) {
context().forward(newPair.key, newPair.value);
public void process(final Record<KIn, VIn> record) {
final Iterable<? extends KeyValue<? extends KOut, ? extends VOut>> newKeyValues =
mapper.apply(record.key(), record.value());
for (final KeyValue<? extends KOut, ? extends VOut> newPair : newKeyValues) {
context().forward(record.withKey(newPair.key).withValue(newPair.value));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,30 @@
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

class KStreamFlatMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
class KStreamFlatMapValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {

private final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends V1>> mapper;
private final ValueMapperWithKey<? super KIn, ? super VIn, ? extends Iterable<? extends VOut>> mapper;

KStreamFlatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends V1>> mapper) {
KStreamFlatMapValues(final ValueMapperWithKey<? super KIn, ? super VIn, ? extends Iterable<? extends VOut>> mapper) {
this.mapper = mapper;
}

@Override
public Processor<K, V> get() {
public Processor<KIn, VIn, KIn, VOut> get() {
return new KStreamFlatMapValuesProcessor();
}

private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K, V> {
private class KStreamFlatMapValuesProcessor extends ContextualProcessor<KIn, VIn, KIn, VOut> {
@Override
public void process(final K key, final V value) {
final Iterable<? extends V1> newValues = mapper.apply(key, value);
for (final V1 v : newValues) {
context().forward(key, v);
public void process(final Record<KIn, VIn> record) {
final Iterable<? extends VOut> newValues = mapper.apply(record.key(), record.value());
for (final VOut v : newValues) {
context().forward(record.withValue(v));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.kstream.ForeachProcessor;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
Expand Down Expand Up @@ -406,7 +407,7 @@ public void foreach(final ForeachAction<? super K, ? super V> action,

final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FOREACH_NAME);
final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
new ProcessorParameters<>(new KStreamPeek<>(action, false), name);
new ProcessorParameters<>(() -> new ForeachProcessor<>(action), name);
final ProcessorGraphNode<? super K, ? super V> foreachNode =
new ProcessorGraphNode<>(name, processorParameters);

Expand All @@ -426,7 +427,7 @@ public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action,

final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, PEEK_NAME);
final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
new ProcessorParameters<>(new KStreamPeek<>(action, true), name);
new ProcessorParameters<>(new KStreamPeek<>(action), name);
final ProcessorGraphNode<? super K, ? super V> peekNode =
new ProcessorGraphNode<>(name, processorParameters);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,33 @@
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

class KStreamMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
class KStreamMap<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {

private final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper;
private final KeyValueMapper<? super KIn, ? super VIn, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper;

public KStreamMap(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
public KStreamMap(final KeyValueMapper<? super KIn, ? super VIn, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper) {
this.mapper = mapper;
}

@Override
public Processor<K, V> get() {
public Processor<KIn, VIn, KOut, VOut> get() {
return new KStreamMapProcessor();
}

private class KStreamMapProcessor extends AbstractProcessor<K, V> {
private class KStreamMapProcessor extends ContextualProcessor<KIn, VIn, KOut, VOut> {

@Override
public void process(final K key, final V value) {
final KeyValue<? extends K1, ? extends V1> newPair = mapper.apply(key, value);
context().forward(newPair.key, newPair.value);
public void process(final Record<KIn, VIn> record) {
final KeyValue<? extends KOut, ? extends VOut> newPair =
mapper.apply(record.key(), record.value());
context().forward(record.withKey(newPair.key).withValue(newPair.value));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,29 @@
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
class KStreamMapValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {

private final ValueMapperWithKey<K, V, V1> mapper;
private final ValueMapperWithKey<KIn, VIn, VOut> mapper;

public KStreamMapValues(final ValueMapperWithKey<K, V, V1> mapper) {
public KStreamMapValues(final ValueMapperWithKey<KIn, VIn, VOut> mapper) {
this.mapper = mapper;
}

@Override
public Processor<K, V> get() {
public Processor<KIn, VIn, KIn, VOut> get() {
return new KStreamMapProcessor();
}

private class KStreamMapProcessor extends AbstractProcessor<K, V> {
private class KStreamMapProcessor extends ContextualProcessor<KIn, VIn, KIn, VOut> {
@Override
public void process(final K readOnlyKey, final V value) {
final V1 newValue = mapper.apply(readOnlyKey, value);
context().forward(readOnlyKey, newValue);
public void process(final Record<KIn, VIn> record) {
final VOut newValue = mapper.apply(record.key(), record.value());
context().forward(record.withValue(newValue));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,29 @@
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

class KStreamPeek<K, V> implements ProcessorSupplier<K, V> {
class KStreamPeek<K, V> implements ProcessorSupplier<K, V, K, V> {

private final boolean forwardDownStream;
private final ForeachAction<K, V> action;

public KStreamPeek(final ForeachAction<K, V> action, final boolean forwardDownStream) {
public KStreamPeek(final ForeachAction<K, V> action) {
this.action = action;
this.forwardDownStream = forwardDownStream;
}

@Override
public Processor<K, V> get() {
public Processor<K, V, K, V> get() {
return new KStreamPeekProcessor();
}

private class KStreamPeekProcessor extends AbstractProcessor<K, V> {
private class KStreamPeekProcessor extends ContextualProcessor<K, V, K, V> {
@Override
public void process(final K key, final V value) {
action.apply(key, value);
if (forwardDownStream) {
context().forward(key, value);
}
public void process(final Record<K, V> record) {
action.apply(record.key(), record.value());
context().forward(record);
}
}

Expand Down
Loading