From ede2823b3e588b2311830ed3ccb481d7c6ef772b Mon Sep 17 00:00:00 2001 From: asutosh936 Date: Mon, 18 Feb 2019 13:01:46 -0600 Subject: [PATCH 1/2] KAFKA-7492 : Updated javadocs for aggregate and reduce methods returning null behavior. --- .../apache/kafka/streams/kstream/KGroupedStream.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index 05e4ac967bcd2..0ac25b35046da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -146,7 +146,8 @@ public interface KGroupedStream { * * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key + * latest (rolling) aggregate for each key. Incase {@link KTable} returns null the following message will be + * processed as if it was the first message. */ KTable reduce(final Reducer reducer); @@ -208,7 +209,8 @@ public interface KGroupedStream { * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key + * latest (rolling) aggregate for each key. Incase {@link KTable} returns null the following message will be + * processed as if it was the first message. */ KTable reduce(final Reducer reducer, final Materialized> materialized); @@ -251,7 +253,8 @@ KTable reduce(final Reducer reducer, * @param aggregator an {@link Aggregator} that computes a new aggregate result * @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key + * latest (rolling) aggregate for each key. Incase {@link KTable} returns null the following message will be + * processed as if it was the first message. */ KTable aggregate(final Initializer initializer, final Aggregator aggregator); @@ -308,7 +311,8 @@ KTable aggregate(final Initializer initializer, * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. * @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key + * latest (rolling) aggregate for each key. Incase {@link KTable} returns null the following message will be + * processed as if it was the first message. */ KTable aggregate(final Initializer initializer, final Aggregator aggregator, From fcb364bbd34c625b7208b80d1d37c93db55bd535 Mon Sep 17 00:00:00 2001 From: asutosh936 Date: Wed, 20 Feb 2019 20:15:52 -0600 Subject: [PATCH 2/2] KAFKA-7492 : Updated javadocs for aggregate and reduce methods returning null behavior. Implemented review comments --- .../kafka/streams/kstream/KGroupedStream.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index 0ac25b35046da..121d0a4cb9f98 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -146,8 +146,9 @@ public interface KGroupedStream { * * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key. Incase {@link KTable} returns null the following message will be - * processed as if it was the first message. + * latest (rolling) aggregate for each key. If the reduce function returns {@code null}, it is then interpreted as + * deletion for the key, and future messages of the same key coming from upstream operators + * will be handled as newly initialized value. */ KTable reduce(final Reducer reducer); @@ -209,8 +210,9 @@ public interface KGroupedStream { * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key. Incase {@link KTable} returns null the following message will be - * processed as if it was the first message. + * latest (rolling) aggregate for each key. If the reduce function returns {@code null}, it is then interpreted as + * deletion for the key, and future messages of the same key coming from upstream operators + * will be handled as newly initialized value. */ KTable reduce(final Reducer reducer, final Materialized> materialized); @@ -253,8 +255,9 @@ KTable reduce(final Reducer reducer, * @param aggregator an {@link Aggregator} that computes a new aggregate result * @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key. Incase {@link KTable} returns null the following message will be - * processed as if it was the first message. + * latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as + * deletion for the key, and future messages of the same key coming from upstream operators + * will be handled as newly initialized value. */ KTable aggregate(final Initializer initializer, final Aggregator aggregator); @@ -311,8 +314,9 @@ KTable aggregate(final Initializer initializer, * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. * @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key. Incase {@link KTable} returns null the following message will be - * processed as if it was the first message. + * latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as + * deletion for the key, and future messages of the same key coming from upstream operators + * will be handled as newly initialized value. */ KTable aggregate(final Initializer initializer, final Aggregator aggregator,