diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
index 4c382b6932ce6..ca84f5bb71801 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
@@ -19,7 +19,8 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Configurable;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
/**
* Interface that specifies how an exception from source node deserialization
@@ -31,14 +32,14 @@ public interface DeserializationExceptionHandler extends Configurable {
* Inspect a record and the exception received.
*
* Note, that the passed in {@link ProcessorContext} only allows to access metadata like the task ID.
- * However, it cannot be used to emit records via {@link ProcessorContext#forward(Object, Object)};
+ * However, it cannot be used to emit records via {@link ProcessorContext#forward(Record)};
* calling {@code forward()} (and some other methods) would result in a runtime exception.
*
* @param context processor context
* @param record record that failed deserialization
* @param exception the actual exception
*/
- DeserializationHandlerResponse handle(final ProcessorContext context,
+ DeserializationHandlerResponse handle(final ProcessorContext<?, ?> context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception);
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
index 4f9a0964405a2..8ac66db59aa17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
@@ -18,7 +18,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +32,7 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionH
private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);
@Override
- public DeserializationHandlerResponse handle(final ProcessorContext context,
+ public DeserializationHandlerResponse handle(final ProcessorContext<?, ?> context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
index 61d210649ba9a..3375149a85496 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +32,7 @@ public class LogAndFailExceptionHandler implements DeserializationExceptionHandl
private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class);
@Override
- public DeserializationHandlerResponse handle(final ProcessorContext context,
+ public DeserializationHandlerResponse handle(final ProcessorContext<?, ?> context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 00b9ee8991477..4b82e54ab00ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -26,29 +26,29 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
/**
- * {@code KStream} is an abstraction of a record stream of {@link KeyValue} pairs, i.e., each record is an
- * independent entity/event in the real world.
- * For example a user X might buy two items I1 and I2, and thus there might be two records {@code <K:X, V:I1>, <K:X, V:I2>}
- * in the stream.
+ * {@code KStream} is an abstraction of a record stream of {@link KeyValue} pairs, i.e., each
+ * record is an independent entity/event in the real world. For example a user X might buy two items
+ * I1 and I2, and thus there might be two records {@code <K:X, V:I1>, <K:X, V:I2>} in the stream.
*
- * A {@code KStream} is either {@link StreamsBuilder#stream(String) defined from one or multiple Kafka topics} that
- * are consumed message by message or the result of a {@code KStream} transformation.
- * A {@link KTable} can also be {@link KTable#toStream() converted} into a {@code KStream}.
+ * A {@code KStream} is either {@link StreamsBuilder#stream(String) defined from one or multiple
+ * Kafka topics} that are consumed message by message or the result of a {@code KStream}
+ * transformation. A {@link KTable} can also be {@link KTable#toStream() converted} into a {@code
+ * KStream}.
*
- * A {@code KStream} can be transformed record by record, joined with another {@code KStream}, {@link KTable},
- * {@link GlobalKTable}, or can be aggregated into a {@link KTable}.
- * Kafka Streams DSL can be mixed-and-matched with Processor API (PAPI) (c.f. {@link Topology}) via
- * {@link #process(ProcessorSupplier, String...) process(...)},
- * {@link #transform(TransformerSupplier, String...) transform(...)}, and
- * {@link #transformValues(ValueTransformerSupplier, String...) transformValues(...)}.
+ * A {@code KStream} can be transformed record by record, joined with another {@code KStream},
+ * {@link KTable}, {@link GlobalKTable}, or can be aggregated into a {@link KTable}. Kafka Streams
+ * DSL can be mixed-and-matched with Processor API (PAPI) (c.f. {@link Topology}) via {@link
+ * #process(org.apache.kafka.streams.processor.ProcessorSupplier, String...) process(...)}, {@link
+ * #transform(TransformerSupplier, String...) transform(...)}, and {@link
+ * #transformValues(ValueTransformerSupplier, String...) transformValues(...)}.
*
* @param <K> Type of keys
* @param <V> Type of values
@@ -59,9 +59,9 @@
public interface KStream<K, V> {
/**
- * Create a new {@code KStream} that consists of all records of this stream which satisfy the given predicate.
- * All records that do not satisfy the predicate are dropped.
- * This is a stateless record-by-record operation.
+ * Create a new {@code KStream} that consists of all records of this stream which satisfy the
+ * given predicate. All records that do not satisfy the predicate are dropped. This is a
+ * stateless record-by-record operation.
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @return a {@code KStream} that contains only those records that satisfy the given predicate
@@ -70,9 +70,9 @@ public interface KStream {
KStream<K, V> filter(final Predicate<? super K, ? super V> predicate);
/**
- * Create a new {@code KStream} that consists of all records of this stream which satisfy the given predicate.
- * All records that do not satisfy the predicate are dropped.
- * This is a stateless record-by-record operation.
+ * Create a new {@code KStream} that consists of all records of this stream which satisfy the
+ * given predicate. All records that do not satisfy the predicate are dropped. This is a
+ * stateless record-by-record operation.
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @param named a {@link Named} config used to name the processor in the topology
@@ -82,39 +82,39 @@ public interface KStream {
KStream<K, V> filter(final Predicate<? super K, ? super V> predicate, final Named named);
/**
- * Create a new {@code KStream} that consists all records of this stream which do not satisfy the given
- * predicate.
- * All records that do satisfy the predicate are dropped.
+ * Create a new {@code KStream} that consists all records of this stream which do not
+ * satisfy the given predicate. All records that do satisfy the predicate are dropped.
* This is a stateless record-by-record operation.
*
* @param predicate a filter {@link Predicate} that is applied to each record
- * @return a {@code KStream} that contains only those records that do not satisfy the given predicate
+ * @return a {@code KStream} that contains only those records that do not satisfy the
+ * given predicate
* @see #filter(Predicate)
*/
KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
/**
- * Create a new {@code KStream} that consists all records of this stream which do not satisfy the given
- * predicate.
- * All records that do satisfy the predicate are dropped.
+ * Create a new {@code KStream} that consists all records of this stream which do not
+ * satisfy the given predicate. All records that do satisfy the predicate are dropped.
* This is a stateless record-by-record operation.
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @param named a {@link Named} config used to name the processor in the topology
- * @return a {@code KStream} that contains only those records that do not satisfy the given predicate
+ * @return a {@code KStream} that contains only those records that do not satisfy the
+ * given predicate
* @see #filter(Predicate)
*/
KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Named named);
/**
- * Set a new key (with possibly new type) for each input record.
- * The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
- * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
- * This is a stateless record-by-record operation.
- *
- * For example, you can use this transformation to set a key for a key-less input record {@code <null,V>} by
- * extracting a key from the value within your {@link KeyValueMapper}. The example below computes the new key as the
- * length of the value string.
+ * Set a new key (with possibly new type) for each input record. The provided {@link
+ * KeyValueMapper} is applied to each input record and computes a new key for it. Thus, an input
+ * record {@code <K,V>} can be transformed into an output record {@code <K':V>}. This is a
+ * stateless record-by-record operation.
+ *
+ * For example, you can use this transformation to set a key for a key-less input record {@code
+ * <null,V>} by extracting a key from the value within your {@link KeyValueMapper}. The example
+ * below computes the new key as the length of the value string.
* {@code
* KStream keyLessStream = builder.stream("key-less-topic");
* KStream keyedStream = keyLessStream.selectKey(new KeyValueMapper {
@@ -123,12 +123,13 @@ public interface KStream {
* }
* });
* }
- * Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
- * join) is applied to the result {@code KStream}.
+ * Setting a new key might result in an internal data redistribution if a key based operator
+ * (like an aggregation or join) is applied to the result {@code KStream}.
*
* @param mapper a {@link KeyValueMapper} that computes a new key for each record
- * @param <KR> the new key type of the result stream
- * @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
+ * @param <KOut> the new key type of the result stream
+ * @return a {@code KStream} that contains records with new key (possibly of different type) and
+ * unmodified value
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
@@ -136,17 +137,18 @@ public interface KStream {
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
*/
- <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
+ <KOut> KStream<KOut, V> selectKey(
+ final KeyValueMapper<? super K, ? super V, ? extends KOut> mapper);
/**
- * Set a new key (with possibly new type) for each input record.
- * The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
- * Thus, an input record {@code } can be transformed into an output record {@code }.
- * This is a stateless record-by-record operation.
- *
- * For example, you can use this transformation to set a key for a key-less input record {@code } by
- * extracting a key from the value within your {@link KeyValueMapper}. The example below computes the new key as the
- * length of the value string.
+ * Set a new key (with possibly new type) for each input record. The provided {@link
+ * KeyValueMapper} is applied to each input record and computes a new key for it. Thus, an input
+ * record {@code } can be transformed into an output record {@code }. This is a
+ * stateless record-by-record operation.
+ *
+ * For example, you can use this transformation to set a key for a key-less input record {@code
+ * } by extracting a key from the value within your {@link KeyValueMapper}. The example
+ * below computes the new key as the length of the value string.
* {@code
* KStream keyLessStream = builder.stream("key-less-topic");
* KStream keyedStream = keyLessStream.selectKey(new KeyValueMapper {
@@ -155,13 +157,14 @@ public interface KStream {
* }
* });
* }
- * Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
- * join) is applied to the result {@code KStream}.
+ * Setting a new key might result in an internal data redistribution if a key based operator
+ * (like an aggregation or join) is applied to the result {@code KStream}.
*
* @param mapper a {@link KeyValueMapper} that computes a new key for each record
* @param named a {@link Named} config used to name the processor in the topology
- * @param <KR> the new key type of the result stream
- * @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
+ * @param <KOut> the new key type of the result stream
+ * @return a {@code KStream} that contains records with new key (possibly of different type) and
+ * unmodified value
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
@@ -169,18 +172,20 @@ public interface KStream {
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
*/
- <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
- final Named named);
+ <KOut> KStream<KOut, V> selectKey(
+ final KeyValueMapper<? super K, ? super V, ? extends KOut> mapper,
+ final Named named);
/**
- * Transform each record of the input stream into a new record in the output stream (both key and value type can be
- * altered arbitrarily).
- * The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
- * Thus, an input record {@code } can be transformed into an output record {@code }.
- * This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)} for
- * stateful record transformation).
- *
- * The example below normalizes the String key to upper-case letters and counts the number of token of the value string.
+ * Transform each record of the input stream into a new record in the output stream (both key
+ * and value type can be altered arbitrarily). The provided {@link KeyValueMapper} is applied to
+ * each input record and computes a new output record. Thus, an input record {@code } can
+ * be transformed into an output record {@code }. This is a stateless record-by-record
+ * operation (cf. {@link #transform(TransformerSupplier, String...)} for stateful record
+ * transformation).
+ *
+ * The example below normalizes the String key to upper-case letters and counts the number of
+ * token of the value string.
*
{@code
* KStream inputStream = builder.stream("topic");
* KStream outputStream = inputStream.map(new KeyValueMapper> {
@@ -189,15 +194,18 @@ KStream selectKey(final KeyValueMapper super K, ? super V, ? exten
* }
* });
* }
- * The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
+ * The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return
+ * {@code null}.
*
- * Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
- * join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)})
+ * Mapping records might result in an internal data redistribution if a key based operator (like
+ * an aggregation or join) is applied to the result {@code KStream}. (cf. {@link
+ * #mapValues(ValueMapper)})
*
* @param mapper a {@link KeyValueMapper} that computes a new output record
- * @param <KR> the key type of the result stream
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with new key and value (possibly both of different type)
+ * @param <KOut> the key type of the result stream
+ * @param <VOut> the value type of the result stream
+ * @return a {@code KStream} that contains records with new key and value (possibly both of
+ * different type)
* @see #selectKey(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
@@ -208,17 +216,19 @@ KStream selectKey(final KeyValueMapper super K, ? super V, ? exten
* @see #transformValues(ValueTransformerSupplier, String...)
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
*/
- <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
+ <KOut, VOut> KStream<KOut, VOut> map(
+ final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper);
/**
- * Transform each record of the input stream into a new record in the output stream (both key and value type can be
- * altered arbitrarily).
- * The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
- * Thus, an input record {@code } can be transformed into an output record {@code }.
- * This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)} for
- * stateful record transformation).
- *
- * The example below normalizes the String key to upper-case letters and counts the number of token of the value string.
+ * Transform each record of the input stream into a new record in the output stream (both key
+ * and value type can be altered arbitrarily). The provided {@link KeyValueMapper} is applied to
+ * each input record and computes a new output record. Thus, an input record {@code } can
+ * be transformed into an output record {@code }. This is a stateless record-by-record
+ * operation (cf. {@link #transform(TransformerSupplier, String...)} for stateful record
+ * transformation).
+ *
+ * The example below normalizes the String key to upper-case letters and counts the number of
+ * token of the value string.
*
{@code
* KStream inputStream = builder.stream("topic");
* KStream outputStream = inputStream.map(new KeyValueMapper> {
@@ -227,16 +237,19 @@ KStream selectKey(final KeyValueMapper super K, ? super V, ? exten
* }
* });
* }
- * The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
+ * The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return
+ * {@code null}.
*
- * Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
- * join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)})
+ * Mapping records might result in an internal data redistribution if a key based operator (like
+ * an aggregation or join) is applied to the result {@code KStream}. (cf. {@link
+ * #mapValues(ValueMapper)})
*
* @param mapper a {@link KeyValueMapper} that computes a new output record
* @param named a {@link Named} config used to name the processor in the topology
- * @param <KR> the key type of the result stream
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with new key and value (possibly both of different type)
+ * @param <KOut> the key type of the result stream
+ * @param <VOut> the value type of the result stream
+ * @return a {@code KStream} that contains records with new key and value (possibly both of
+ * different type)
* @see #selectKey(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
@@ -247,15 +260,16 @@ KStream selectKey(final KeyValueMapper super K, ? super V, ? exten
* @see #transformValues(ValueTransformerSupplier, String...)
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
*/
- <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
- final Named named);
+ <KOut, VOut> KStream<KOut, VOut> map(
+ final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper,
+ final Named named);
/**
- * Transform the value of each input record into a new value (with possible new type) of the output record.
- * The provided {@link ValueMapper} is applied to each input record value and computes a new value for it.
- * Thus, an input record {@code } can be transformed into an output record {@code }.
- * This is a stateless record-by-record operation (cf.
- * {@link #transformValues(ValueTransformerSupplier, String...)} for stateful value transformation).
+ * Transform the value of each input record into a new value (with possible new type) of the
+ * output record. The provided {@link ValueMapper} is applied to each input record value and
+ * computes a new value for it. Thus, an input record {@code } can be transformed into an
+ * output record {@code }. This is a stateless record-by-record operation (cf. {@link
+ * #transformValues(ValueTransformerSupplier, String...)} for stateful value transformation).
*
* The example below counts the number of token of the value string.
*
{@code
@@ -266,13 +280,14 @@ KStream map(final KeyValueMapper super K, ? super V, ? extend
* }
* });
* }
- * Setting a new value preserves data co-location with respect to the key.
- * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
+ * Setting a new value preserves data co-location with respect to the key. Thus, no
+ * internal data redistribution is required if a key based operator (like an aggregation or
+ * join) is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
*
* @param mapper a {@link ValueMapper} that computes a new output value
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @param <VOut> the value type of the result stream
+ * @return a {@code KStream} that contains records with unmodified key and new values (possibly
+ * of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
@@ -282,14 +297,14 @@ KStream map(final KeyValueMapper super K, ? super V, ? extend
* @see #transformValues(ValueTransformerSupplier, String...)
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
*/
- <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
+ <VOut> KStream<K, VOut> mapValues(final ValueMapper<? super V, ? extends VOut> mapper);
/**
- * Transform the value of each input record into a new value (with possible new type) of the output record.
- * The provided {@link ValueMapper} is applied to each input record value and computes a new value for it.
- * Thus, an input record {@code } can be transformed into an output record {@code }.
- * This is a stateless record-by-record operation (cf.
- * {@link #transformValues(ValueTransformerSupplier, String...)} for stateful value transformation).
+ * Transform the value of each input record into a new value (with possible new type) of the
+ * output record. The provided {@link ValueMapper} is applied to each input record value and
+ * computes a new value for it. Thus, an input record {@code } can be transformed into an
+ * output record {@code }. This is a stateless record-by-record operation (cf. {@link
+ * #transformValues(ValueTransformerSupplier, String...)} for stateful value transformation).
*
* The example below counts the number of token of the value string.
*
{@code
@@ -300,14 +315,15 @@ KStream map(final KeyValueMapper super K, ? super V, ? extend
* }
* });
* }
- * Setting a new value preserves data co-location with respect to the key.
- * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
+ * Setting a new value preserves data co-location with respect to the key. Thus, no
+ * internal data redistribution is required if a key based operator (like an aggregation or
+ * join) is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
*
* @param mapper a {@link ValueMapper} that computes a new output value
* @param named a {@link Named} config used to name the processor in the topology
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @param <VOut> the value type of the result stream
+ * @return a {@code KStream} that contains records with unmodified key and new values (possibly
+ * of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
@@ -317,15 +333,16 @@ KStream map(final KeyValueMapper super K, ? super V, ? extend
* @see #transformValues(ValueTransformerSupplier, String...)
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
*/
- <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
- final Named named);
+ <VOut> KStream<K, VOut> mapValues(final ValueMapper<? super V, ? extends VOut> mapper,
+ final Named named);
/**
- * Transform the value of each input record into a new value (with possible new type) of the output record.
- * The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
- * Thus, an input record {@code } can be transformed into an output record {@code }.
- * This is a stateless record-by-record operation (cf.
- * {@link #transformValues(ValueTransformerWithKeySupplier, String...)} for stateful value transformation).
+ * Transform the value of each input record into a new value (with possible new type) of the
+ * output record. The provided {@link ValueMapperWithKey} is applied to each input record value
+ * and computes a new value for it. Thus, an input record {@code } can be transformed into
+ * an output record {@code }. This is a stateless record-by-record operation (cf. {@link
+ * #transformValues(ValueTransformerWithKeySupplier, String...)} for stateful value
+ * transformation).
*
* The example below counts the number of tokens of key and value strings.
*
{@code
@@ -336,14 +353,16 @@ KStream mapValues(final ValueMapper super V, ? extends VR> mapper,
* }
* });
* }
- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
- * So, setting a new value preserves data co-location with respect to the key.
- * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt
+ * partitioning. So, setting a new value preserves data co-location with respect to the key.
+ * Thus, no internal data redistribution is required if a key based operator (like an
+ * aggregation or join) is applied to the result {@code KStream}. (cf. {@link
+ * #map(KeyValueMapper)})
*
* @param mapper a {@link ValueMapperWithKey} that computes a new output value
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @param <VOut> the value type of the result stream
+ * @return a {@code KStream} that contains records with unmodified key and new values (possibly
+ * of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
@@ -353,14 +372,16 @@ KStream mapValues(final ValueMapper super V, ? extends VR> mapper,
* @see #transformValues(ValueTransformerSupplier, String...)
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
*/
- <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
+ <VOut> KStream<K, VOut> mapValues(
+ final ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper);
/**
- * Transform the value of each input record into a new value (with possible new type) of the output record.
- * The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
- * Thus, an input record {@code } can be transformed into an output record {@code }.
- * This is a stateless record-by-record operation (cf.
- * {@link #transformValues(ValueTransformerWithKeySupplier, String...)} for stateful value transformation).
+ * Transform the value of each input record into a new value (with possible new type) of the
+ * output record. The provided {@link ValueMapperWithKey} is applied to each input record value
+ * and computes a new value for it. Thus, an input record {@code } can be transformed into
+ * an output record {@code }. This is a stateless record-by-record operation (cf. {@link
+ * #transformValues(ValueTransformerWithKeySupplier, String...)} for stateful value
+ * transformation).
*
* The example below counts the number of tokens of key and value strings.
*
{@code
@@ -371,15 +392,17 @@ KStream mapValues(final ValueMapper super V, ? extends VR> mapper,
* }
* });
* }
- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
- * So, setting a new value preserves data co-location with respect to the key.
- * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt
+ * partitioning. So, setting a new value preserves data co-location with respect to the key.
+ * Thus, no internal data redistribution is required if a key based operator (like an
+ * aggregation or join) is applied to the result {@code KStream}. (cf. {@link
+ * #map(KeyValueMapper)})
*
* @param mapper a {@link ValueMapperWithKey} that computes a new output value
* @param named a {@link Named} config used to name the processor in the topology
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @param <VOut> the value type of the result stream
+ * @return a {@code KStream} that contains records with unmodified key and new values (possibly
+ * of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
@@ -389,19 +412,20 @@ KStream mapValues(final ValueMapper super V, ? extends VR> mapper,
* @see #transformValues(ValueTransformerSupplier, String...)
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
*/
- <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
- final Named named);
+ <VOut> KStream<K, VOut> mapValues(
+ final ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper,
+ final Named named);
/**
- * Transform each record of the input stream into zero or more records in the output stream (both key and value type
- * can be altered arbitrarily).
- * The provided {@link KeyValueMapper} is applied to each input record and computes zero or more output records.
- * Thus, an input record {@code } can be transformed into output records {@code , , ...}.
- * This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)} for
- * stateful record transformation).
- *
- * The example below splits input records {@code } containing sentences as values into their words
- * and emit a record {@code } for each word.
+ * Transform each record of the input stream into zero or more records in the output stream
+ * (both key and value type can be altered arbitrarily). The provided {@link KeyValueMapper} is
+ * applied to each input record and computes zero or more output records. Thus, an input record
+ * {@code } can be transformed into output records {@code , , ...}. This is
+ * a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier,
+ * String...)} for stateful record transformation).
+ *
+ * The example below splits input records {@code <null:String>} containing sentences as values
+ * into their words and emits a record {@code <word:1>} for each word.
* {@code
* KStream inputStream = builder.stream("topic");
* KStream outputStream = inputStream.flatMap(
@@ -418,16 +442,18 @@ KStream mapValues(final ValueMapperWithKey super K, ? super V, ? e
* }
* });
* }
- * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
- * and the return value must not be {@code null}.
+ * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link
+ * java.util.Collection} type) and the return value must not be {@code null}.
*
- * Flat-mapping records might result in an internal data redistribution if a key based operator (like an aggregation
- * or join) is applied to the result {@code KStream}. (cf. {@link #flatMapValues(ValueMapper)})
+ * Flat-mapping records might result in an internal data redistribution if a key based operator
+ * (like an aggregation or join) is applied to the result {@code KStream}. (cf. {@link
+ * #flatMapValues(ValueMapper)})
*
* @param mapper a {@link KeyValueMapper} that computes the new output records
- * @param <KR> the key type of the result stream
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains more or less records with new key and value (possibly of different type)
+ * @param <KOut> the key type of the result stream
+ * @param <VOut> the value type of the result stream
+ * @return a {@code KStream} that contains more or less records with new key and value (possibly
+ * of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #mapValues(ValueMapper)
@@ -441,18 +467,19 @@ KStream mapValues(final ValueMapperWithKey super K, ? super V, ? e
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
*/
- <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
+ <KOut, VOut> KStream<KOut, VOut> flatMap(
+ final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper);
/**
- * Transform each record of the input stream into zero or more records in the output stream (both key and value type
- * can be altered arbitrarily).
- * The provided {@link KeyValueMapper} is applied to each input record and computes zero or more output records.
- * Thus, an input record {@code } can be transformed into output records {@code , , ...}.
- * This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)} for
- * stateful record transformation).
- *
- * The example below splits input records {@code } containing sentences as values into their words
- * and emit a record {@code } for each word.
+ * Transform each record of the input stream into zero or more records in the output stream
+ * (both key and value type can be altered arbitrarily). The provided {@link KeyValueMapper} is
+ * applied to each input record and computes zero or more output records. Thus, an input record
+ * {@code } can be transformed into output records {@code , , ...}. This is
+ * a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier,
+ * String...)} for stateful record transformation).
+ *
+ * The example below splits input records {@code <null:String>} containing sentences as values
+ * into their words and emits a record {@code <word:1>} for each word.
* {@code
* KStream inputStream = builder.stream("topic");
* KStream outputStream = inputStream.flatMap(
@@ -469,17 +496,19 @@ KStream mapValues(final ValueMapperWithKey super K, ? super V, ? e
* }
* });
* }
- * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
- * and the return value must not be {@code null}.
+ * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link
+ * java.util.Collection} type) and the return value must not be {@code null}.
*
- * Flat-mapping records might result in an internal data redistribution if a key based operator (like an aggregation
- * or join) is applied to the result {@code KStream}. (cf. {@link #flatMapValues(ValueMapper)})
+ * Flat-mapping records might result in an internal data redistribution if a key based operator
+ * (like an aggregation or join) is applied to the result {@code KStream}. (cf. {@link
+ * #flatMapValues(ValueMapper)})
*
* @param mapper a {@link KeyValueMapper} that computes the new output records
* @param named a {@link Named} config used to name the processor in the topology
- * @param <KR> the key type of the result stream
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains more or less records with new key and value (possibly of different type)
+ * @param <KOut> the key type of the result stream
+ * @param <VOut> the value type of the result stream
+ * @return a {@code KStream} that contains more or less records with new key and value (possibly
+ * of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #mapValues(ValueMapper)
@@ -493,20 +522,22 @@ KStream mapValues(final ValueMapperWithKey super K, ? super V, ? e
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
*/
- <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper,
- final Named named);
+ <KOut, VOut> KStream<KOut, VOut> flatMap(
+ final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper,
+ final Named named);
/**
- * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values
- * with the same key in the new stream.
- * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
- * stream (value type can be altered arbitrarily).
- * The provided {@link ValueMapper} is applied to each input record and computes zero or more output values.
- * Thus, an input record {@code } can be transformed into output records {@code , , ...}.
- * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}
+ * Create a new {@code KStream} by transforming the value of each record in this stream into
+ * zero or more values with the same key in the new stream. Transform the value of each input
+ * record into zero or more records with the same (unmodified) key in the output stream (value
+ * type can be altered arbitrarily). The provided {@link ValueMapper} is applied to each input
+ * record and computes zero or more output values. Thus, an input record {@code } can be
+ * transformed into output records {@code , , ...}. This is a stateless
+ * record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}
* for stateful value transformation).
*
- * The example below splits input records {@code } containing sentences as values into their words.
+ * The example below splits input records {@code } containing sentences as values
+ * into their words.
* {@code
* KStream inputStream = builder.stream("topic");
* KStream outputStream = inputStream.flatMapValues(new ValueMapper> {
@@ -515,16 +546,18 @@ KStream flatMap(final KeyValueMapper super K, ? super V, ? ex
* }
* });
* }
- * The provided {@link ValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
- * and the return value must not be {@code null}.
+ * The provided {@link ValueMapper} must return an {@link Iterable} (e.g., any {@link
+ * java.util.Collection} type) and the return value must not be {@code null}.
*
- * Splitting a record into multiple records with the same key preserves data co-location with respect to the key.
- * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)})
+ * Splitting a record into multiple records with the same key preserves data co-location with
+ * respect to the key. Thus, no internal data redistribution is required if a key based
+ * operator (like an aggregation or join) is applied to the result {@code KStream}. (cf. {@link
+ * #flatMap(KeyValueMapper)})
*
* @param mapper a {@link ValueMapper} that computes the new output values
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains more or less records with unmodified keys and new values of different type
+ * @param <VOut> the value type of the result stream
+ * @return a {@code KStream} that contains more or less records with unmodified keys and new
+ * values of different type
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
@@ -537,19 +570,21 @@ KStream flatMap(final KeyValueMapper super K, ? super V, ? ex
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
*/
- <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper);
+ <VOut> KStream<K, VOut> flatMapValues(
+ final ValueMapper<? super V, ? extends Iterable<? extends VOut>> mapper);
/**
- * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values
- * with the same key in the new stream.
- * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
- * stream (value type can be altered arbitrarily).
- * The provided {@link ValueMapper} is applied to each input record and computes zero or more output values.
- * Thus, an input record {@code } can be transformed into output records {@code , , ...}.
- * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}
+ * Create a new {@code KStream} by transforming the value of each record in this stream into
+ * zero or more values with the same key in the new stream. Transform the value of each input
+ * record into zero or more records with the same (unmodified) key in the output stream (value
+ * type can be altered arbitrarily). The provided {@link ValueMapper} is applied to each input
+ * record and computes zero or more output values. Thus, an input record {@code } can be
+ * transformed into output records {@code , , ...}. This is a stateless
+ * record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}
* for stateful value transformation).
*
- * The example below splits input records {@code } containing sentences as values into their words.
+ * The example below splits input records {@code } containing sentences as values
+ * into their words.
* {@code
* KStream inputStream = builder.stream("topic");
* KStream outputStream = inputStream.flatMapValues(new ValueMapper> {
@@ -558,17 +593,19 @@ KStream flatMap(final KeyValueMapper super K, ? super V, ? ex
* }
* });
* }
- * The provided {@link ValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
- * and the return value must not be {@code null}.
+ * The provided {@link ValueMapper} must return an {@link Iterable} (e.g., any {@link
+ * java.util.Collection} type) and the return value must not be {@code null}.
*
- * Splitting a record into multiple records with the same key preserves data co-location with respect to the key.
- * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)})
+ * Splitting a record into multiple records with the same key preserves data co-location with
+ * respect to the key. Thus, no internal data redistribution is required if a key based
+ * operator (like an aggregation or join) is applied to the result {@code KStream}. (cf. {@link
+ * #flatMap(KeyValueMapper)})
*
* @param mapper a {@link ValueMapper} that computes the new output values
* @param named a {@link Named} config used to name the processor in the topology
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains more or less records with unmodified keys and new values of different type
+ * @param <VOut> the value type of the result stream
+ * @return a {@code KStream} that contains more or less records with unmodified keys and new
+ * values of different type
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
@@ -581,20 +618,22 @@ KStream flatMap(final KeyValueMapper super K, ? super V, ? ex
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
*/
- <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper,
- final Named named);
+ <VOut> KStream<K, VOut> flatMapValues(
+ final ValueMapper<? super V, ? extends Iterable<? extends VOut>> mapper,
+ final Named named);
+
/**
- * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values
- * with the same key in the new stream.
- * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
- * stream (value type can be altered arbitrarily).
- * The provided {@link ValueMapperWithKey} is applied to each input record and computes zero or more output values.
- * Thus, an input record {@code } can be transformed into output records {@code , , ...}.
- * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerWithKeySupplier, String...)}
- * for stateful value transformation).
- *
- * The example below splits input records {@code }, with key=1, containing sentences as values
- * into their words.
+ * Create a new {@code KStream} by transforming the value of each record in this stream into
+ * zero or more values with the same key in the new stream. Transform the value of each input
+ * record into zero or more records with the same (unmodified) key in the output stream (value
+ * type can be altered arbitrarily). The provided {@link ValueMapperWithKey} is applied to each
+ * input record and computes zero or more output values. Thus, an input record {@code } can
+ * be transformed into output records {@code , , ...}. This is a stateless
+ * record-by-record operation (cf. {@link #transformValues(ValueTransformerWithKeySupplier,
+ * String...)} for stateful value transformation).
+ *
+ * The example below splits input records {@code }, with key=1, containing
+ * sentences as values into their words.
* {@code
* KStream inputStream = builder.stream("topic");
* KStream outputStream = inputStream.flatMapValues(new ValueMapper> {
@@ -607,17 +646,19 @@ KStream flatMapValues(final ValueMapper super V, ? extends Iterabl
* }
* });
* }
- * The provided {@link ValueMapperWithKey} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
- * and the return value must not be {@code null}.
+ * The provided {@link ValueMapperWithKey} must return an {@link Iterable} (e.g., any {@link
+ * java.util.Collection} type) and the return value must not be {@code null}.
*
- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
- * So, splitting a record into multiple records with the same key preserves data co-location with respect to the key.
- * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)})
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt
+ * partitioning. So, splitting a record into multiple records with the same key preserves data
+ * co-location with respect to the key. Thus, no internal data redistribution is
+ * required if a key based operator (like an aggregation or join) is applied to the result
+ * {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)})
*
* @param mapper a {@link ValueMapperWithKey} that computes the new output values
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains more or less records with unmodified keys and new values of different type
+ * @param <VOut> the value type of the result stream
+ * @return a {@code KStream} that contains more or less records with unmodified keys and new
+ * values of different type
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
@@ -630,20 +671,21 @@ KStream flatMapValues(final ValueMapper super V, ? extends Iterabl
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
*/
- <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper);
+ <VOut> KStream<K, VOut> flatMapValues(
+ final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VOut>> mapper);
/**
- * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values
- * with the same key in the new stream.
- * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
- * stream (value type can be altered arbitrarily).
- * The provided {@link ValueMapperWithKey} is applied to each input record and computes zero or more output values.
- * Thus, an input record {@code } can be transformed into output records {@code , , ...}.
- * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerWithKeySupplier, String...)}
- * for stateful value transformation).
- *
- * The example below splits input records {@code }, with key=1, containing sentences as values
- * into their words.
+ * Create a new {@code KStream} by transforming the value of each record in this stream into
+ * zero or more values with the same key in the new stream. Transform the value of each input
+ * record into zero or more records with the same (unmodified) key in the output stream (value
+ * type can be altered arbitrarily). The provided {@link ValueMapperWithKey} is applied to each
+ * input record and computes zero or more output values. Thus, an input record {@code } can
+ * be transformed into output records {@code , , ...}. This is a stateless
+ * record-by-record operation (cf. {@link #transformValues(ValueTransformerWithKeySupplier,
+ * String...)} for stateful value transformation).
+ *
+ * The example below splits input records {@code }, with key=1, containing
+ * sentences as values into their words.
* {@code
* KStream inputStream = builder.stream("topic");
* KStream outputStream = inputStream.flatMapValues(new ValueMapper> {
@@ -656,18 +698,20 @@ KStream flatMapValues(final ValueMapper super V, ? extends Iterabl
* }
* });
* }
- * The provided {@link ValueMapperWithKey} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
- * and the return value must not be {@code null}.
+ * The provided {@link ValueMapperWithKey} must return an {@link Iterable} (e.g., any {@link
+ * java.util.Collection} type) and the return value must not be {@code null}.
*
- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
- * So, splitting a record into multiple records with the same key preserves data co-location with respect to the key.
- * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)})
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt
+ * partitioning. So, splitting a record into multiple records with the same key preserves data
+ * co-location with respect to the key. Thus, no internal data redistribution is
+ * required if a key based operator (like an aggregation or join) is applied to the result
+ * {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)})
*
* @param mapper a {@link ValueMapperWithKey} that computes the new output values
* @param named a {@link Named} config used to name the processor in the topology
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains more or less records with unmodified keys and new values of different type
+ * @param <VOut> the value type of the result stream
+ * @return a {@code KStream} that contains more or less records with unmodified keys and new
+ * values of different type
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
@@ -680,79 +724,85 @@ KStream flatMapValues(final ValueMapper super V, ? extends Iterabl
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
*/
- <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper,
- final Named named);
+ <VOut> KStream<K, VOut> flatMapValues(
+ final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VOut>> mapper,
+ final Named named);
/**
- * Print the records of this KStream using the options provided by {@link Printed}
- * Note that this is mainly for debugging/testing purposes, and it will try to flush on each record print.
- * It SHOULD NOT be used for production usage if performance requirements are concerned.
+ * Print the records of this KStream using the options provided by {@link Printed}. Note that
+ * this is mainly for debugging/testing purposes, and it will try to flush on each record print.
+ * It SHOULD NOT be used for production usage if performance requirements are
+ * concerned.
*
* @param printed options for printing
*/
void print(final Printed printed);
/**
- * Perform an action on each record of {@code KStream}.
- * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
- * Note that this is a terminal operation that returns void.
+ * Perform an action on each record of {@code KStream}. This is a stateless record-by-record
+ * operation (cf. {@link #process(org.apache.kafka.streams.processor.ProcessorSupplier,
+ * String...)}). Note that this is a terminal operation that returns void.
*
* @param action an action to perform on each record
- * @see #process(ProcessorSupplier, String...)
+ * @see #process(org.apache.kafka.streams.processor.ProcessorSupplier, String...)
*/
void foreach(final ForeachAction super K, ? super V> action);
/**
- * Perform an action on each record of {@code KStream}.
- * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
- * Note that this is a terminal operation that returns void.
+ * Perform an action on each record of {@code KStream}. This is a stateless record-by-record
+ * operation (cf. {@link #process(org.apache.kafka.streams.processor.ProcessorSupplier,
+ * String...)}). Note that this is a terminal operation that returns void.
*
* @param action an action to perform on each record
* @param named a {@link Named} config used to name the processor in the topology
- * @see #process(ProcessorSupplier, String...)
+ * @see #process(org.apache.kafka.streams.processor.ProcessorSupplier, String...)
*/
void foreach(final ForeachAction super K, ? super V> action, final Named named);
/**
- * Perform an action on each record of {@code KStream}.
- * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
+ * Perform an action on each record of {@code KStream}. This is a stateless record-by-record
+ * operation (cf. {@link #process(org.apache.kafka.streams.processor.ProcessorSupplier,
+ * String...)}).
*
- * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection)
- * and returns an unchanged stream.
+ * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics
+ * collection) and returns an unchanged stream.
*
- * Note that since this operation is stateless, it may execute multiple times for a single record in failure cases.
+ * Note that since this operation is stateless, it may execute multiple times for a single
+ * record in failure cases.
*
* @param action an action to perform on each record
- * @see #process(ProcessorSupplier, String...)
* @return itself
+ * @see #process(org.apache.kafka.streams.processor.ProcessorSupplier, String...)
*/
KStream peek(final ForeachAction super K, ? super V> action);
/**
- * Perform an action on each record of {@code KStream}.
- * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
+ * Perform an action on each record of {@code KStream}. This is a stateless record-by-record
+ * operation (cf. {@link #process(org.apache.kafka.streams.processor.ProcessorSupplier,
+ * String...)}).
*
- * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection)
- * and returns an unchanged stream.
+ * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics
+ * collection) and returns an unchanged stream.
*
- * Note that since this operation is stateless, it may execute multiple times for a single record in failure cases.
+ * Note that since this operation is stateless, it may execute multiple times for a single
+ * record in failure cases.
*
* @param action an action to perform on each record
* @param named a {@link Named} config used to name the processor in the topology
- * @see #process(ProcessorSupplier, String...)
* @return itself
+ * @see #process(org.apache.kafka.streams.processor.ProcessorSupplier, String...)
*/
KStream peek(final ForeachAction super K, ? super V> action, final Named named);
/**
- * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on
- * the supplied predicates.
- * Each record is evaluated against the supplied predicates, and predicates are evaluated in order.
- * Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates.
- * The branching happens on first-match: A record in the original stream is assigned to the corresponding result
- * stream for the first predicate that evaluates to true, and is assigned to this stream only.
- * A record will be dropped if none of the predicates evaluate to true.
- * This is a stateless record-by-record operation.
+ * Creates an array of {@code KStream} from this stream by branching the records in the original
+ * stream based on the supplied predicates. Each record is evaluated against the supplied
+ * predicates, and predicates are evaluated in order. Each stream in the result array
+ * corresponds position-wise (index) to the predicate in the supplied predicates. The branching
+ * happens on first-match: A record in the original stream is assigned to the corresponding
+ * result stream for the first predicate that evaluates to true, and is assigned to this stream
+ * only. A record will be dropped if none of the predicates evaluate to true. This is a
+ * stateless record-by-record operation.
*
* @param predicates the ordered list of {@link Predicate} instances
* @return multiple distinct substreams of this {@code KStream}
@@ -763,16 +813,16 @@