From ccdf00c957c81272ad8c12040efb8adcd9302f0b Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Fri, 22 Feb 2019 22:41:54 +0100 Subject: [PATCH] KAFKA-6958: Overload methods for group and windowed stream to allow to name operation name using the new Named class Sub-task required to allow to define custom processor names with KStreams DSL(KIP-307) : - overload method for KGroupedStream to accept a Named parameter - overload method for KGroupedTable to accept a Named parameter - overload method for TimeWindowedKStream to accept a Named parameter - overload method for SessionWindowedKStream to accept a Named parameter --- .../kafka/streams/kstream/KGroupedStream.java | 200 ++++++++++++ .../kafka/streams/kstream/KGroupedTable.java | 307 ++++++++++++++++++ .../kstream/SessionWindowedKStream.java | 259 +++++++++++++++ .../streams/kstream/TimeWindowedKStream.java | 278 ++++++++++++++++ .../GroupedStreamAggregateBuilder.java | 4 +- .../internals/InternalStreamsBuilder.java | 26 +- .../kstream/internals/KGroupedStreamImpl.java | 46 ++- .../kstream/internals/KGroupedTableImpl.java | 77 ++++- .../internals/SessionWindowedKStreamImpl.java | 63 +++- .../internals/TimeWindowedKStreamImpl.java | 63 +++- .../kafka/streams/StreamsBuilderTest.java | 24 +- .../SessionWindowedKStreamImplTest.java | 13 +- .../TimeWindowedKStreamImplTest.java | 14 +- 13 files changed, 1320 insertions(+), 54 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index 121d0a4cb9f98..f23ed05331f4c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -70,6 +70,35 @@ public interface KGroupedStream { */ KTable count(); + /** + * Count the number of records in this stream by the grouped key. + * Records with {@code null} key or value are ignored. 
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view). + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * Note that the internal store name may not be queriable through Interactive Queries. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param named a {@link Named} config used to name the processor in the topology + * + * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that + * represent the latest (rolling) count (i.e., number of records) for each key + */ + KTable count(final Named named); + /** * Count the number of records in this stream by the grouped key. * Records with {@code null} key or value are ignored. @@ -115,6 +144,53 @@ public interface KGroupedStream { */ KTable count(final Materialized> materialized); + /** + * Count the number of records in this stream by the grouped key. + * Records with {@code null} key or value are ignored. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * provided by the given store name in {@code materialized}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. + *

{@code
+     * KafkaStreams streams = ... // counting words
+     * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
+     * ReadOnlyKeyValueStore localStore = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII + * alphanumerics, '.', '_' and '-'. + * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the + * provide store name defined in {@code Materialized}, and "-changelog" is a fixed suffix. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param named a {@link Named} config used to name the processor in the topology + * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. + * Note: the valueSerde will be automatically set to {@link org.apache.kafka.common.serialization.Serdes#Long() Serdes#Long()} + * if there is no valueSerde provided + * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that + * represent the latest (rolling) count (i.e., number of records) for each key + */ + KTable count(final Named named, + final Materialized> materialized); + /** * Combine the values of records in this stream by the grouped key. * Records with {@code null} key or value are ignored. @@ -152,6 +228,68 @@ public interface KGroupedStream { */ KTable reduce(final Reducer reducer); + /** + * Combine the value of records in this stream by the grouped key. + * Records with {@code null} key or value are ignored. + * Combining implies that the type of the aggregate result is the same as the type of the input value + * (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}). 
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * provided by the given store name in {@code materialized}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current + * aggregate (first argument) and the record's value (second argument): + *

{@code
+     * // At the example of a Reducer
+     * new Reducer() {
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }
+     * }
+ *

+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's + * value as-is. + * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like sum, min, or + * max. + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. + *

{@code
+     * KafkaStreams streams = ... // compute sum
+     * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
+     * ReadOnlyKeyValueStore localStore = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+     * String key = "some-key";
+     * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * Note that the internal store name may not be queriable through Interactive Queries. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. + * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + KTable reduce(final Reducer reducer, + final Materialized> materialized); + /** * Combine the value of records in this stream by the grouped key. @@ -208,6 +346,7 @@ public interface KGroupedStream { * You can retrieve all generated internal topic names via {@link Topology#describe()}. * * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. + * @param named a {@link Named} config used to name the processor in the topology. * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key. If the reduce function returns {@code null}, it is then interpreted as @@ -215,6 +354,7 @@ public interface KGroupedStream { * will be handled as newly initialized value. 
*/ KTable reduce(final Reducer reducer, + final Named named, final Materialized> materialized); /** @@ -314,12 +454,72 @@ KTable aggregate(final Initializer initializer, * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. * @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + KTable aggregate(final Initializer initializer, + final Aggregator aggregator, + final Materialized> materialized); + + /** + * Aggregate the values of records in this stream by the grouped key. + * Records with {@code null} key or value are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example, + * allows the result to have a different type than the input values. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried by the given store name in {@code materialized}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * The specified {@link Initializer} is applied once directly before the first input record is processed to + * provide an initial intermediate aggregation result that is used to process the first record. + * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current + * aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value. + * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like + * count (c.f. {@link #count()}). + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + *

{@code
+     * KafkaStreams streams = ... // some aggregation on value type double
+     * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
+     * ReadOnlyKeyValueStore localStore = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+     * String key = "some-key";
+     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII + * alphanumerics, '.', '_' and '-'. + * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the + * provide store name defined in {@code Materialized}, and "-changelog" is a fixed suffix. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result + * @param aggregator an {@link Aggregator} that computes a new aggregate result + * @param named a {@link Named} config used to name the processor in the topology + * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. + * @param the value type of the resulting {@link KTable} + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as * deletion for the key, and future messages of the same key coming from upstream operators * will be handled as newly initialized value. 
*/ KTable aggregate(final Initializer initializer, final Aggregator aggregator, + final Named named, final Materialized> materialized); /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java index 30f348c2eaf34..eed6bbfae32d4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java @@ -84,6 +84,50 @@ public interface KGroupedTable { */ KTable count(final Materialized> materialized); + /** + * Count number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to + * the same key into a new instance of {@link KTable}. + * Records with {@code null} key are ignored. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the provided {@code queryableStoreName}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + *

{@code
+     * KafkaStreams streams = ... // counting words
+     * ReadOnlyKeyValueStore localStore = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII + * alphanumerics, '.', '_' and '-'. + * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the + * provide store name defined in {@code Materialized}, and "-changelog" is a fixed suffix. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param named the {@link Named} config used to name the processor in the topology + * @param materialized the instance of {@link Materialized} used to materialize the state store. Cannot be {@code null} + * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that + * represent the latest (rolling) count (i.e., number of records) for each key + */ + KTable count(final Named named, final Materialized> materialized); + /** * Count number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to * the same key into a new instance of {@link KTable}. @@ -112,6 +156,36 @@ public interface KGroupedTable { */ KTable count(); + + /** + * Count number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to + * the same key into a new instance of {@link KTable}. + * Records with {@code null} key are ignored. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * Note that the internal store name may not be queriable through Interactive Queries. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param named the {@link Named} config used to name the processor in the topology + * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that + * represent the latest (rolling) count (i.e., number of records) for each key + */ + KTable count(final Named named); + /** * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) * mapped} to the same key into a new instance of {@link KTable}. @@ -183,6 +257,82 @@ public interface KGroupedTable { KTable reduce(final Reducer adder, final Reducer subtractor, final Materialized> materialized); + + + /** + * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) + * mapped} to the same key into a new instance of {@link KTable}. + * Records with {@code null} key are ignored. + * Combining implies that the type of the aggregate result is the same as the type of the input value + * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}). + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the provided {@code queryableStoreName}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}. + * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the + * current aggregate (first argument) and the record's value (second argument) by adding the new record to the + * aggregate. + * The specified {@link Reducer subtractor} is applied for each "replaced" record of the original {@link KTable} + * and computes a new aggregate using the current aggregate (first argument) and the record's value (second + * argument) by "removing" the "replaced" record from the aggregate. + * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's + * value as-is. + * Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum. + * For sum, the adder and subtractor would work as follows: + *

{@code
+     * public class SumAdder implements Reducer {
+     *   public Integer apply(Integer currentAgg, Integer newValue) {
+     *     return currentAgg + newValue;
+     *   }
+     * }
+     *
+     * public class SumSubtractor implements Reducer {
+     *   public Integer apply(Integer currentAgg, Integer oldValue) {
+     *     return currentAgg - oldValue;
+     *   }
+     * }
+     * }
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + *

{@code
+     * KafkaStreams streams = ... // counting words
+     * ReadOnlyKeyValueStore localStore = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII + * alphanumerics, '.', '_' and '-'. + * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the + * provide store name defined in {@code Materialized}, and "-changelog" is a fixed suffix. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param adder a {@link Reducer} that adds a new value to the aggregate result + * @param subtractor a {@link Reducer} that removed an old value from the aggregate result + * @param named a {@link Named} config used to name the processor in the topology + * @param materialized the instance of {@link Materialized} used to materialize the state store. Cannot be {@code null} + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + KTable reduce(final Reducer adder, + final Reducer subtractor, + final Named named, + final Materialized> materialized); + /** * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) * mapped} to the same key into a new instance of {@link KTable}. @@ -322,6 +472,92 @@ KTable aggregate(final Initializer initializer, final Aggregator subtractor, final Materialized> materialized); + + /** + * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) + * mapped} to the same key into a new instance of {@link KTable}. + * Records with {@code null} key are ignored. 
+ * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, Materialized) combining via reduce(...)} as it, + * for example, allows the result to have a different type than the input values. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the provided {@code queryableStoreName}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * The specified {@link Initializer} is applied once directly before the first input record is processed to + * provide an initial intermediate aggregation result that is used to process the first record. + * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}. + * The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the + * current aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value by adding the new record to the aggregate. + * The specified {@link Aggregator subtractor} is applied for each "replaced" record of the original {@link KTable} + * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" + * record from the aggregate. + * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, Materialized)} can be used to compute aggregate functions + * like sum. + * For sum, the initializer, adder, and subtractor would work as follows: + *

{@code
+     * // in this example, LongSerde.class must be set as value serde in Materialized#withValueSerde
+     * public class SumInitializer implements Initializer {
+     *   public Long apply() {
+     *     return 0L;
+     *   }
+     * }
+     *
+     * public class SumAdder implements Aggregator {
+     *   public Long apply(String key, Integer newValue, Long aggregate) {
+     *     return aggregate + newValue;
+     *   }
+     * }
+     *
+     * public class SumSubtractor implements Aggregator {
+     *   public Long apply(String key, Integer oldValue, Long aggregate) {
+     *     return aggregate - oldValue;
+     *   }
+     * }
+     * }
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + *

{@code
+     * KafkaStreams streams = ... // counting words
+     * ReadOnlyKeyValueStore localStore = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII + * alphanumerics, '.', '_' and '-'. + * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the + * provide store name defined in {@code Materialized}, and "-changelog" is a fixed suffix. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param initializer an {@link Initializer} that provides an initial aggregate result value + * @param adder an {@link Aggregator} that adds a new record to the aggregate result + * @param subtractor an {@link Aggregator} that removed an old record from the aggregate result + * @param named a {@link Named} config used to name the processor in the topology + * @param materialized the instance of {@link Materialized} used to materialize the state store. Cannot be {@code null} + * @param the value type of the aggregated {@link KTable} + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + KTable aggregate(final Initializer initializer, + final Aggregator adder, + final Aggregator subtractor, + final Named named, + final Materialized> materialized); + /** * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers. 
@@ -391,4 +627,75 @@ KTable aggregate(final Initializer initializer, final Aggregator adder, final Aggregator subtractor); + + /** + * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) + * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers. + * Records with {@code null} key are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer, Reducer) combining via reduce(...)} as it, + * for example, allows the result to have a different type than the input values. + * If the result value type does not match the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value + * serde} you should use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * The specified {@link Initializer} is applied once directly before the first input record is processed to + * provide an initial intermediate aggregation result that is used to process the first record. + * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}. + * The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the + * current aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value by adding the new record to the aggregate. + * The specified {@link Aggregator subtractor} is applied for each "replaced" record of the original {@link KTable} + * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" + * record from the aggregate. + * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, String)} can be used to compute aggregate functions + * like sum. + * For sum, the initializer, adder, and subtractor would work as follows: + *

{@code
+     * // in this example, LongSerde.class must be set as default value serde in StreamsConfig
+     * public class SumInitializer implements Initializer {
+     *   public Long apply() {
+     *     return 0L;
+     *   }
+     * }
+     *
+     * public class SumAdder implements Aggregator {
+     *   public Long apply(String key, Integer newValue, Long aggregate) {
+     *     return aggregate + newValue;
+     *   }
+     * }
+     *
+     * public class SumSubtractor implements Aggregator {
+     *   public Long apply(String key, Integer oldValue, Long aggregate) {
+     *     return aggregate - oldValue;
+     *   }
+     * }
+     * }
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. + * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * Note that the internal store name may not be queriable through Interactive Queries. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. 
+ * + * @param initializer a {@link Initializer} that provides an initial aggregate result value + * @param adder a {@link Aggregator} that adds a new record to the aggregate result + * @param subtractor a {@link Aggregator} that removed an old record from the aggregate result + * @param named a {@link Named} config used to name the processor in the topology + * @param the value type of the aggregated {@link KTable} + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + KTable aggregate(final Initializer initializer, + final Aggregator adder, + final Aggregator subtractor, + final Named named); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java index c5c44c6ff983c..67df8059356fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java @@ -69,6 +69,24 @@ public interface SessionWindowedKStream { */ KTable, Long> count(); + /** + * Count the number of records in this stream by the grouped key into {@link SessionWindows}. + * Records with {@code null} key or value are ignored. + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same window and key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. + * + * @param named a {@link Named} config used for the filter processor + * + * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values + * that represent the latest (rolling) count (i.e., number of records) for each key within a window + */ + KTable, Long> count(final Named named); + /** * Count the number of records in this stream by the grouped key into {@link SessionWindows}. * Records with {@code null} key or value are ignored. @@ -112,6 +130,51 @@ public interface SessionWindowedKStream { */ KTable, Long> count(final Materialized> materialized); + /** + * Count the number of records in this stream by the grouped key into {@link SessionWindows}. + * Records with {@code null} key or value are ignored. + * The result is written into a local {@link SessionStore} (which is basically an ever-updating + * materialized view) that can be queried using the name provided with {@link Materialized}. + *

+ * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to + * the same window and key if caching is enabled on the {@link Materialized} instance. + * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval} + *

+ * To query the local windowed {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. + *

{@code
+     * KafkaStreams streams = ... // compute sum
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlySessionStore localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII + * alphanumerics, '.', '_' and '-'. + * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the + * provide store name defined in {@code Materialized}, and "-changelog" is a fixed suffix. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param named a {@link Named} config used to name the processor in the topology + * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. + * Note: the valueSerde will be automatically set to {@link Serdes#Long()} if there is no valueSerde provided + * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values + * that represent the latest (rolling) count (i.e., number of records) for each key within a window + */ + KTable, Long> count(final Named named, + final Materialized> materialized); + /** * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}. * Records with {@code null} key or value are ignored. @@ -149,6 +212,46 @@ KTable, VR> aggregate(final Initializer initializer, final Aggregator aggregator, final Merger sessionMerger); + + /** + * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}. + * Records with {@code null} key or value are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer) combining via + * reduce(...)} as it, for example, allows the result to have a different type than the input values. 
+ *

+ * The specified {@link Initializer} is applied once per session directly before the first input record is + * processed to provide an initial intermediate aggregation result that is used to process the first record. + * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current + * aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value. + * The specified {@link Merger} is used to merge 2 existing sessions into one, i.e., when the windows overlap, + * they are merged into a single session and the old sessions are discarded. + * Thus, {@code aggregate(Initializer, Aggregator, Merger)} can be used to compute + * aggregate functions like count (c.f. {@link #count()}) + *

+ * The default value serde from config will be used for serializing the result. + * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Merger, Materialized)}. + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same window and key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * @param initializer the instance of {@link Initializer} + * @param aggregator the instance of {@link Aggregator} + * @param sessionMerger the instance of {@link Merger} + * @param named a {@link Named} config used to name the processor in the topology + * @param the value type of the resulting {@link KTable} + * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent + * the latest (rolling) aggregate for each key within a window + */ + KTable, VR> aggregate(final Initializer initializer, + final Aggregator aggregator, + final Merger sessionMerger, + final Named named); + /** * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}. * Records with {@code null} key or value are ignored. @@ -207,6 +310,67 @@ KTable, VR> aggregate(final Initializer initializer, final Merger sessionMerger, final Materialized> materialized); + + /** + * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}. + * Records with {@code null} key or value are ignored. + * The result is written into a local {@link SessionStore} (which is basically an ever-updating + * materialized view) that can be queried using the name provided with {@link Materialized}. + * Aggregating is a generalization of {@link #reduce(Reducer) combining via + * reduce(...)} as it, for example, allows the result to have a different type than the input values. + *

+ * The specified {@link Initializer} is applied once per session directly before the first input record is + * processed to provide an initial intermediate aggregation result that is used to process the first record. + * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current + * aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value. + * The specified {@link Merger} is used to merge 2 existing sessions into one, i.e., when the windows overlap, + * they are merged into a single session and the old sessions are discarded. + * Thus, {@code aggregate(Initializer, Aggregator, Merger)} can be used to compute + * aggregate functions like count (c.f. {@link #count()}) + *

+ * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to + * the same window and key if caching is enabled on the {@link Materialized} instance. + * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval} + *

+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. + *

{@code
+     * KafkaStreams streams = ... // some windowed aggregation on value type double
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlySessionStore sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII + * alphanumerics, '.', '_' and '-'. + * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the + * provide store name defined in {@code Materialized}, and "-changelog" is a fixed suffix. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param initializer the instance of {@link Initializer} + * @param aggregator the instance of {@link Aggregator} + * @param sessionMerger the instance of {@link Merger} + * @param named a {@link Named} config used to name the processor in the topology + * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null} + * @param the value type of the resulting {@link KTable} + * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent + * the latest (rolling) aggregate for each key within a window + */ + KTable, VR> aggregate(final Initializer initializer, + final Aggregator aggregator, + final Merger sessionMerger, + final Named named, + final Materialized> materialized); + /** * Combine values of this stream by the grouped key into {@link SessionWindows}. * Records with {@code null} key or value are ignored. @@ -235,6 +399,99 @@ KTable, VR> aggregate(final Initializer initializer, */ KTable, V> reduce(final Reducer reducer); + + /** + * Combine values of this stream by the grouped key into {@link SessionWindows}. + * Records with {@code null} key or value are ignored. 
+ * Combining implies that the type of the aggregate result is the same as the type of the input value + * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}). + * The result is written into a local {@link SessionStore} (which is basically an ever-updating + * materialized view). + *

+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current + * aggregate and the record's value. + * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's + * value as-is. + * Thus, {@code reduce(Reducer)} can be used to compute aggregate functions like sum, min, + * or max. + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same window and key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. + * + * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. + * @param named a {@link Named} config used to name the processor in the topology + * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent + * the latest (rolling) aggregate for each key within a window + */ + KTable, V> reduce(final Reducer reducer, final Named named); + + /** + * Combine values of this stream by the grouped key into {@link SessionWindows}. + * Records with {@code null} key or value are ignored. + * Combining implies that the type of the aggregate result is the same as the type of the input value + * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}). + * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view) + * provided by the given {@link Materialized} instance. + *

+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current + * aggregate (first argument) and the record's value (second argument): + *

{@code
+     * // At the example of a Reducer
+     * new Reducer() {
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }
+     * }
+ *

+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's + * value as-is. + * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like + * sum, min, or max. + *

+ * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to + * the same window and key if caching is enabled on the {@link Materialized} instance. + * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval} + *

+ * To query the local windowed {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. + *

{@code
+     * KafkaStreams streams = ... // compute sum
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlySessionStore localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII + * alphanumerics, '.', '_' and '-'. + * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the + * provide store name defined in {@code Materialized}, and "-changelog" is a fixed suffix. + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * + * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. + * @param materializedAs an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null} + * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent + * the latest (rolling) aggregate for each key within a window + */ + KTable, V> reduce(final Reducer reducer, + final Materialized> materializedAs); + + /** * Combine values of this stream by the grouped key into {@link SessionWindows}. * Records with {@code null} key or value are ignored. @@ -290,10 +547,12 @@ KTable, VR> aggregate(final Initializer initializer, * * * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. + * @param named a {@link Named} config used to name the processor in the topology * @param materializedAs an instance of {@link Materialized} used to materialize a state store. 
Cannot be {@code null} * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent * the latest (rolling) aggregate for each key within a window */ KTable, V> reduce(final Reducer reducer, + final Named named, final Materialized> materializedAs); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java index d209b6dea6c75..57f5e2b9dc343 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java @@ -80,6 +80,32 @@ public interface TimeWindowedKStream { */ KTable, Long> count(); + /** + * Count the number of records in this stream by the grouped key and the defined windows. + * Records with {@code null} key or value are ignored. + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same window and key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * Note that the internal store name may not be queriable through Interactive Queries. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param named a {@link Named} config used to name the processor in the topology + * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that + * represent the latest (rolling) count (i.e., number of records) for each key + */ + KTable, Long> count(final Named named); + /** * Count the number of records in this stream by the grouped key and the defined windows. * Records with {@code null} key or value are ignored. @@ -126,6 +152,54 @@ public interface TimeWindowedKStream { */ KTable, Long> count(final Materialized> materialized); + + /** + * Count the number of records in this stream by the grouped key and the defined windows. + * Records with {@code null} key or value are ignored. + *

+ * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to + * the same window and key if caching is enabled on the {@link Materialized} instance. + * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval} + * + *

+ * To query the local windowed {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + *

{@code
+     * KafkaStreams streams = ... // counting words
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlyWindowStore localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.windowStore());
+     *
+     * String key = "some-word";
+     * long fromTime = ...;
+     * long toTime = ...;
+     * WindowStoreIterator countForWordsForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII + * alphanumerics, '.', '_' and '-'. + * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the + * provide store name defined in {@code Materialized}, and "-changelog" is a fixed suffix. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param named a {@link Named} config used to name the processor in the topology + * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. + * Note: the valueSerde will be automatically set to {@link org.apache.kafka.common.serialization.Serdes#Long() Serdes#Long()} + * if there is no valueSerde provided + * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that + * represent the latest (rolling) count (i.e., number of records) for each key + */ + KTable, Long> count(final Named named, final Materialized> materialized); + /** * Aggregate the values of records in this stream by the grouped key. * Records with {@code null} key or value are ignored. @@ -172,6 +246,115 @@ public interface TimeWindowedKStream { KTable, VR> aggregate(final Initializer initializer, final Aggregator aggregator); + + /** + * Aggregate the values of records in this stream by the grouped key. + * Records with {@code null} key or value are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example, + * allows the result to have a different type than the input values. 
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the provided {@code queryableStoreName}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * The specified {@link Initializer} is applied once directly before the first input record is processed to + * provide an initial intermediate aggregation result that is used to process the first record. + * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current + * aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value. + * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like + * count (c.f. {@link #count()}). + *

+ * The default value serde from config will be used for serializing the result. + * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}. + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * Note that the internal store name may not be queriable through Interactive Queries. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * + * @param the value type of the resulting {@link KTable} + * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result + * @param aggregator an {@link Aggregator} that computes a new aggregate result + * @param named a {@link Named} config used to name the processor in the topology + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + KTable, VR> aggregate(final Initializer initializer, + final Aggregator aggregator, + final Named named); + + /** + * Aggregate the values of records in this stream by the grouped key. + * Records with {@code null} key or value are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example, + * allows the result to have a different type than the input values. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the store name as provided with {@link Materialized}. + *

+ * The specified {@link Initializer} is applied once directly before the first input record is processed to + * provide an initial intermediate aggregation result that is used to process the first record. + * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current + * aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value. + * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like + * count (c.f. {@link #count()}). + *

+ * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to + * the same window and key if caching is enabled on the {@link Materialized} instance. + * When caching is enabled, the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval} + * + *

+ * To query the local windowed {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + *

{@code
+     * KafkaStreams streams = ... // counting words
+     * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlyWindowStore localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.windowStore());
+     *
+     * String key = "some-word";
+     * long fromTime = ...;
+     * long toTime = ...;
+     * WindowStoreIterator aggregateStore = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII + * alphanumerics, '.', '_' and '-'. + * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the + * provide store name defined in {@code Materialized}, and "-changelog" is a fixed suffix. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result + * @param aggregator an {@link Aggregator} that computes a new aggregate result + * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. + * @param the value type of the resulting {@link KTable} + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + KTable, VR> aggregate(final Initializer initializer, + final Aggregator aggregator, + final Materialized> materialized); + + /** * Aggregate the values of records in this stream by the grouped key. * Records with {@code null} key or value are ignored. @@ -222,6 +405,7 @@ KTable, VR> aggregate(final Initializer initializer, * * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result * @param aggregator an {@link Aggregator} that computes a new aggregate result + * @param named a {@link Named} config used to name the processor in the topology * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. 
* @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the @@ -229,6 +413,7 @@ KTable, VR> aggregate(final Initializer initializer, */ KTable, VR> aggregate(final Initializer initializer, final Aggregator aggregator, + final Named named, final Materialized> materialized); /** @@ -266,6 +451,97 @@ KTable, VR> aggregate(final Initializer initializer, */ KTable, V> reduce(final Reducer reducer); + + /** + * Combine the values of records in this stream by the grouped key. + * Records with {@code null} key or value are ignored. + * Combining implies that the type of the aggregate result is the same as the type of the input value. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the provided {@code queryableStoreName}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current + * aggregate and the record's value. + * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's + * value as-is. + * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max. + *

+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param named a {@link Named} config used to name the processor in the topology + * @param reducer a {@link Reducer} that computes a new aggregate result + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + KTable, V> reduce(final Reducer reducer, final Named named); + + /** + * Combine the values of records in this stream by the grouped key. + * Records with {@code null} key or value are ignored. + * Combining implies that the type of the aggregate result is the same as the type of the input value. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the store name as provided with {@link Materialized}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + *

+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current + * aggregate and the record's value. + * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's + * value as-is. + * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max. + *

+ * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to + * the same window and key if caching is enabled on the {@link Materialized} instance. + * When caching is enabled, the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval} + *

+ * To query the local windowed {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + *

{@code
+     * KafkaStreams streams = ... // counting words
+     * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlyWindowStore localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.windowStore());
+     *
+     * String key = "some-word";
+     * long fromTime = ...;
+     * long toTime = ...;
+     * WindowStoreIterator reduceStore = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * + *

+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII + * alphanumerics, '.', '_' and '-'. + * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the + * provide store name defined in {@code Materialized}, and "-changelog" is a fixed suffix. + * + * You can retrieve all generated internal topic names via {@link Topology#describe()}. + * + * @param reducer a {@link Reducer} that computes a new aggregate result + * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + KTable, V> reduce(final Reducer reducer, + final Materialized> materialized); + + /** * Combine the values of records in this stream by the grouped key. * Records with {@code null} key or value are ignored. @@ -312,10 +588,12 @@ KTable, VR> aggregate(final Initializer initializer, * You can retrieve all generated internal topic names via {@link Topology#describe()}. * * @param reducer a {@link Reducer} that computes a new aggregate result + * @param named a {@link Named} config used to name the processor in the topology * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. 
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key */ KTable, V> reduce(final Reducer reducer, + final Named named, final Materialized> materialized); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index cd5155f4149c6..b77a2303d9d29 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -68,7 +68,7 @@ class GroupedStreamAggregateBuilder { this.userProvidedRepartitionTopicName = groupedInternal.name(); } - KTable build(final String functionName, + KTable build(final NamedInternal functionName, final StoreBuilder storeBuilder, final KStreamAggProcessorSupplier aggregateSupplier, final String queryableStoreName, @@ -76,7 +76,7 @@ KTable build(final String functionName, final Serde valSerde) { assert queryableStoreName == null || queryableStoreName.equals(storeBuilder.name()); - final String aggFunctionName = builder.newProcessorName(functionName); + final String aggFunctionName = functionName.name(); String sourceName = this.name; StreamsGraphNode parentNode = streamsGraphNode; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 3a90fd222d484..ca43c566c1a14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -57,6 +57,8 @@ public class InternalStreamsBuilder implements InternalNameProvider { + private static final String 
TABLE_SOURCE_SUFFIX = "-source"; + final InternalTopologyBuilder internalTopologyBuilder; private final AtomicInteger index = new AtomicInteger(0); @@ -115,10 +117,15 @@ public KStream stream(final Pattern topicPattern, public KTable table(final String topic, final ConsumedInternal consumed, final MaterializedInternal> materialized) { - final String sourceName = new NamedInternal(consumed.name()) - .orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME); - final String tableSourceName = new NamedInternal(consumed.name()) - .suffixWithOrElseGet("-table-source", this, KTableImpl.SOURCE_NAME); + + final NamedInternal named = new NamedInternal(consumed.name()); + + final String sourceName = named + .suffixWithOrElseGet(TABLE_SOURCE_SUFFIX, this, KStreamImpl.SOURCE_NAME); + + final String tableSourceName = named + .orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME); + final KTableSource tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName()); final ProcessorParameters processorParameters = new ProcessorParameters<>(tableSource, tableSourceName); @@ -150,8 +157,15 @@ public GlobalKTable globalTable(final String topic, Objects.requireNonNull(materialized, "materialized can't be null"); // explicitly disable logging for global stores materialized.withLoggingDisabled(); - final String sourceName = newProcessorName(KTableImpl.SOURCE_NAME); - final String processorName = newProcessorName(KTableImpl.SOURCE_NAME); + + final NamedInternal named = new NamedInternal(consumed.name()); + + final String sourceName = named + .suffixWithOrElseGet(TABLE_SOURCE_SUFFIX, this, KStreamImpl.SOURCE_NAME); + + final String processorName = named + .orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME); + // enforce store name as queryable name to always materialize global table stores final String storeName = materialized.storeName(); final KTableSource tableSource = new KTableSource<>(storeName, storeName); diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 6f6521cceac54..57511056a9032 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.SessionWindowedKStream; import org.apache.kafka.streams.kstream.SessionWindows; @@ -67,8 +68,16 @@ public KTable reduce(final Reducer reducer) { @Override public KTable reduce(final Reducer reducer, final Materialized> materialized) { + return reduce(reducer, NamedInternal.empty(), materialized); + } + + @Override + public KTable reduce(final Reducer reducer, + final Named named, + final Materialized> materialized) { Objects.requireNonNull(reducer, "reducer can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); + Objects.requireNonNull(named, "name can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, builder, REDUCE_NAME); @@ -80,9 +89,10 @@ public KTable reduce(final Reducer reducer, materializedInternal.withValueSerde(valSerde); } + final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); return doAggregate( new KStreamReduce<>(materializedInternal.storeName(), reducer), - REDUCE_NAME, + name, materializedInternal ); } @@ -91,9 +101,18 @@ public KTable reduce(final Reducer reducer, public KTable aggregate(final Initializer initializer, final Aggregator aggregator, final Materialized> materialized) { + return aggregate(initializer, 
aggregator, NamedInternal.empty(), materialized); + } + + @Override + public KTable aggregate(final Initializer initializer, + final Aggregator aggregator, + final Named named, + final Materialized> materialized) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); + Objects.requireNonNull(named, "named can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); @@ -102,9 +121,10 @@ public KTable aggregate(final Initializer initializer, materializedInternal.withKeySerde(keySerde); } + final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); return doAggregate( new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator), - AGGREGATE_NAME, + name, materializedInternal ); } @@ -117,11 +137,22 @@ public KTable aggregate(final Initializer initializer, @Override public KTable count() { - return doCount(Materialized.with(keySerde, Serdes.Long())); + return doCount(NamedInternal.empty(), Materialized.with(keySerde, Serdes.Long())); + } + + @Override + public KTable count(final Named named) { + Objects.requireNonNull(named, "named can't be null"); + return doCount(named, Materialized.with(keySerde, Serdes.Long())); } @Override public KTable count(final Materialized> materialized) { + return count(NamedInternal.empty(), materialized); + } + + @Override + public KTable count(final Named named, final Materialized> materialized) { Objects.requireNonNull(materialized, "materialized can't be null"); // TODO: remove this when we do a topology-incompatible release @@ -130,10 +161,10 @@ public KTable count(final Materialized doCount(final Materialized> materialized) { + private KTable doCount(final Named named, final Materialized> materialized) { final MaterializedInternal> materializedInternal = new 
MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); @@ -144,9 +175,10 @@ private KTable doCount(final Materialized(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), - AGGREGATE_NAME, + name, materializedInternal); } @@ -184,7 +216,7 @@ private KTable doAggregate(final KStreamAggProcessorSupplier> materializedInternal) { return aggregateBuilder.build( - functionName, + new NamedInternal(functionName), new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize(), aggregateSupplier, materializedInternal.queryableStoreName(), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 75c998a231b9b..e3d03a5bc2e59 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.KGroupedTable; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.internals.graph.GroupedTableOperationRepartitionNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; @@ -68,12 +69,13 @@ public class KGroupedTableImpl extends AbstractStream implements KGr } private KTable doAggregate(final ProcessorSupplier> aggregateSupplier, + final NamedInternal named, final String functionName, final MaterializedInternal> materialized) { - final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME); - final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME); - final String funcName = builder.newProcessorName(functionName); + final String sinkName = 
named.suffixWithOrElseGet("-sink", builder, KStreamImpl.SINK_NAME); + final String sourceName = named.suffixWithOrElseGet("-source", builder, KStreamImpl.SOURCE_NAME); + final String funcName = named.orElseGenerateWithPrefix(builder, functionName); final String repartitionTopic = (userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : materialized.storeName()) + KStreamImpl.REPARTITION_TOPIC_SUFFIX; @@ -122,8 +124,17 @@ private GroupedTableOperationRepartitionNode createRepartitionNode(final S public KTable reduce(final Reducer adder, final Reducer subtractor, final Materialized> materialized) { + return reduce(adder, subtractor, NamedInternal.empty(), materialized); + } + + @Override + public KTable reduce(final Reducer adder, + final Reducer subtractor, + final Named named, + final Materialized> materialized) { Objects.requireNonNull(adder, "adder can't be null"); Objects.requireNonNull(subtractor, "subtractor can't be null"); + Objects.requireNonNull(named, "named can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); @@ -134,10 +145,11 @@ public KTable reduce(final Reducer adder, if (materializedInternal.valueSerde() == null) { materializedInternal.withValueSerde(valSerde); } - final ProcessorSupplier> aggregateSupplier = new KTableReduce<>(materializedInternal.storeName(), - adder, - subtractor); - return doAggregate(aggregateSupplier, REDUCE_NAME, materializedInternal); + final ProcessorSupplier> aggregateSupplier = new KTableReduce<>( + materializedInternal.storeName(), + adder, + subtractor); + return doAggregate(aggregateSupplier, new NamedInternal(named), REDUCE_NAME, materializedInternal); } @Override @@ -146,8 +158,14 @@ public KTable reduce(final Reducer adder, return reduce(adder, subtractor, Materialized.with(keySerde, valSerde)); } + @Override public KTable count(final 
Materialized> materialized) { + return count(NamedInternal.empty(), materialized); + } + + @Override + public KTable count(final Named named, final Materialized> materialized) { final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); @@ -158,12 +176,13 @@ public KTable count(final Materialized> aggregateSupplier = new KTableAggregate<>(materializedInternal.storeName(), - countInitializer, - countAdder, - countSubtractor); + final ProcessorSupplier> aggregateSupplier = new KTableAggregate<>( + materializedInternal.storeName(), + countInitializer, + countAdder, + countSubtractor); - return doAggregate(aggregateSupplier, AGGREGATE_NAME, materializedInternal); + return doAggregate(aggregateSupplier, new NamedInternal(named), AGGREGATE_NAME, materializedInternal); } @Override @@ -171,14 +190,29 @@ public KTable count() { return count(Materialized.with(keySerde, Serdes.Long())); } + @Override + public KTable count(final Named named) { + return count(named, Materialized.with(keySerde, Serdes.Long())); + } + @Override public KTable aggregate(final Initializer initializer, final Aggregator adder, final Aggregator subtractor, final Materialized> materialized) { + return aggregate(initializer, adder, subtractor, NamedInternal.empty(), materialized); + } + + @Override + public KTable aggregate(final Initializer initializer, + final Aggregator adder, + final Aggregator subtractor, + final Named named, + final Materialized> materialized) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(adder, "adder can't be null"); Objects.requireNonNull(subtractor, "subtractor can't be null"); + Objects.requireNonNull(named, "named can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = @@ -187,11 +221,20 @@ public KTable aggregate(final Initializer initializer, if (materializedInternal.keySerde() == null) { 
materializedInternal.withKeySerde(keySerde); } - final ProcessorSupplier> aggregateSupplier = new KTableAggregate<>(materializedInternal.storeName(), - initializer, - adder, - subtractor); - return doAggregate(aggregateSupplier, AGGREGATE_NAME, materializedInternal); + final ProcessorSupplier> aggregateSupplier = new KTableAggregate<>( + materializedInternal.storeName(), + initializer, + adder, + subtractor); + return doAggregate(aggregateSupplier, new NamedInternal(named), AGGREGATE_NAME, materializedInternal); + } + + @Override + public KTable aggregate(final Initializer initializer, + final Aggregator adder, + final Aggregator subtractor, + final Named named) { + return aggregate(initializer, adder, subtractor, named, Materialized.with(keySerde, null)); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index 2106b1a28e3ec..ac49f9187cf76 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Merger; +import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.SessionWindowedKStream; import org.apache.kafka.streams.kstream.SessionWindows; @@ -63,11 +64,21 @@ public class SessionWindowedKStreamImpl extends AbstractStream imple @Override public KTable, Long> count() { - return doCount(Materialized.with(keySerde, Serdes.Long())); + return count(NamedInternal.empty()); + } + + @Override + public KTable, Long> count(final Named named) { + return doCount(named, Materialized.with(keySerde, Serdes.Long())); } 
@Override public KTable, Long> count(final Materialized> materialized) { + return count(NamedInternal.empty(), materialized); + } + + @Override + public KTable, Long> count(final Named named, final Materialized> materialized) { Objects.requireNonNull(materialized, "materialized can't be null"); // TODO: remove this when we do a topology-incompatible release @@ -76,12 +87,14 @@ public KTable, Long> count(final Materialized, Long> doCount(final Materialized> materialized) { + private KTable, Long> doCount(final Named named, + final Materialized> materialized) { final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } @@ -89,8 +102,9 @@ private KTable, Long> doCount(final Materialized( windows, @@ -105,13 +119,26 @@ private KTable, Long> doCount(final Materialized, V> reduce(final Reducer reducer) { - return reduce(reducer, Materialized.with(keySerde, valSerde)); + return reduce(reducer, NamedInternal.empty()); + } + + @Override + public KTable, V> reduce(final Reducer reducer, final Named named) { + return reduce(reducer, named, Materialized.with(keySerde, valSerde)); + } + + @Override + public KTable, V> reduce(final Reducer reducer, + final Materialized> materialized) { + return reduce(reducer, NamedInternal.empty(), materialized); } @Override public KTable, V> reduce(final Reducer reducer, + final Named named, final Materialized> materialized) { Objects.requireNonNull(reducer, "reducer can't be null"); + Objects.requireNonNull(named, "named can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final Aggregator reduceAggregator = aggregatorForReducer(reducer); final MaterializedInternal> materializedInternal = @@ -123,8 +150,9 @@ public KTable, V> reduce(final Reducer reducer, materializedInternal.withValueSerde(valSerde); } + final String reduceName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); return aggregateBuilder.build( - REDUCE_NAME, + new NamedInternal(reduceName), materialize(materializedInternal), new KStreamSessionWindowAggregate<>( windows, @@ -142,13 +170,30 @@ public KTable, V> reduce(final Reducer reducer, public KTable, T> aggregate(final Initializer initializer, final Aggregator aggregator, final Merger sessionMerger) { - return aggregate(initializer, aggregator, sessionMerger, Materialized.with(keySerde, null)); + return aggregate(initializer, aggregator, sessionMerger, NamedInternal.empty()); + } + + @Override + public KTable, T> aggregate(final Initializer initializer, + final Aggregator aggregator, + final Merger sessionMerger, + final Named named) { + return aggregate(initializer, aggregator, sessionMerger, named, Materialized.with(keySerde, null)); + } + + @Override + public KTable, VR> aggregate(final Initializer initializer, + final Aggregator aggregator, + final Merger sessionMerger, + final Materialized> materialized) { + return aggregate(initializer, aggregator, sessionMerger, NamedInternal.empty(), materialized); } @Override public KTable, VR> aggregate(final Initializer initializer, final Aggregator aggregator, final Merger sessionMerger, + final Named named, final Materialized> materialized) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); @@ -161,8 +206,10 @@ public KTable, VR> aggregate(final Initializer initializer, materializedInternal.withKeySerde(keySerde); } + final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); + return aggregateBuilder.build( - AGGREGATE_NAME, + new NamedInternal(aggregateName), materialize(materializedInternal), new KStreamSessionWindowAggregate<>( windows, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index a257603564bf6..84316cbe9ccf4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.Window; @@ -63,11 +64,22 @@ public class TimeWindowedKStreamImpl extends AbstractStr @Override public KTable, Long> count() { - return doCount(Materialized.with(keySerde, Serdes.Long())); + return count(NamedInternal.empty()); } + @Override + public KTable, Long> count(final Named named) { + return doCount(named, Materialized.with(keySerde, Serdes.Long())); + } + + @Override public KTable, Long> count(final Materialized> materialized) { + return count(NamedInternal.empty(), materialized); + } + + @Override + public KTable, Long> count(final Named named, final Materialized> materialized) { Objects.requireNonNull(materialized, "materialized can't be null"); // TODO: remove this when we do a topology-incompatible release @@ -76,10 +88,11 @@ public KTable, Long> count(final Materialized, Long> doCount(final Materialized> materialized) { + private KTable, Long> doCount(final Named named, + final Materialized> materialized) { final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); @@ -90,8 +103,9 @@ private KTable, Long> doCount(final Materialized(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), materializedInternal.queryableStoreName(), @@ -108,6 
+122,22 @@ public KTable, VR> aggregate(final Initializer initializer, @Override public KTable, VR> aggregate(final Initializer initializer, final Aggregator aggregator, + final Named named) { + return aggregate(initializer, aggregator, named, Materialized.with(keySerde, null)); + } + + + @Override + public KTable, VR> aggregate(final Initializer initializer, + final Aggregator aggregator, + final Materialized> materialized) { + return aggregate(initializer, aggregator, NamedInternal.empty(), materialized); + } + + @Override + public KTable, VR> aggregate(final Initializer initializer, + final Aggregator aggregator, + final Named named, final Materialized> materialized) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); @@ -117,8 +147,10 @@ public KTable, VR> aggregate(final Initializer initializer, if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } + + final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); return aggregateBuilder.build( - AGGREGATE_NAME, + new NamedInternal(aggregateName), materialize(materializedInternal), new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator), materializedInternal.queryableStoreName(), @@ -128,12 +160,26 @@ public KTable, VR> aggregate(final Initializer initializer, @Override public KTable, V> reduce(final Reducer reducer) { - return reduce(reducer, Materialized.with(keySerde, valSerde)); + return reduce(reducer, NamedInternal.empty()); + } + + @Override + public KTable, V> reduce(final Reducer reducer, final Named named) { + return reduce(reducer, named, Materialized.with(keySerde, valSerde)); + } + + @Override + public KTable, V> reduce(final Reducer reducer, + final Materialized> materialized) { + return reduce(reducer, NamedInternal.empty(), materialized); } @Override - public KTable, V> reduce(final Reducer 
reducer, final Materialized> materialized) { + public KTable, V> reduce(final Reducer reducer, + final Named named, + final Materialized> materialized) { Objects.requireNonNull(reducer, "reducer can't be null"); + Objects.requireNonNull(named, "named can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = @@ -146,8 +192,9 @@ public KTable, V> reduce(final Reducer reducer, final Materialize materializedInternal.withValueSerde(valSerde); } + final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); return aggregateBuilder.build( - REDUCE_NAME, + new NamedInternal(reduceName), materialize(materializedInternal), new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)), materializedInternal.queryableStoreName(), diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 92b471c77ad53..197e2344153dc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; @@ -450,8 +451,8 @@ public void shouldUseSpecifiedNameForTableSourceProcessor() { assertSpecifiedNameForOperation( topology, + expected + "-source", expected, - expected + "-table-source", "KSTREAM-SOURCE-0000000004", "KTABLE-SOURCE-0000000005"); } @@ -738,6 +739,27 @@ public void shouldUseSpecifiedNameForToStreamWithMapper() { 
"KSTREAM-KEY-SELECT-0000000004"); } + @Test + public void shouldUseSpecifiedNameForAggregateOperationGivenTable() { + builder.table(STREAM_TOPIC).groupBy(KeyValue::pair, Grouped.as("group-operation")).count(Named.as(STREAM_OPERATION_NAME)); + builder.build(); + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); + assertSpecifiedNameForStateStore( + topology.stateStores(), + STREAM_TOPIC + "-STATE-STORE-0000000000", + "KTABLE-AGGREGATE-STATE-STORE-0000000004"); + + assertSpecifiedNameForOperation( + topology, + "KSTREAM-SOURCE-0000000001", + "KTABLE-SOURCE-0000000002", + "group-operation", + STREAM_OPERATION_NAME + "-sink", + STREAM_OPERATION_NAME + "-source", + STREAM_OPERATION_NAME); + } + + private static void assertSpecifiedNameForOperation(final ProcessorTopology topology, final String... expected) { final List processors = topology.processors(); assertEquals("Invalid number of expected processors", expected.length, processors.size()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java index d1e5448fad069..b693dc7640fb5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Merger; +import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.SessionWindowedKStream; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.Windowed; @@ -277,14 +278,20 @@ public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() { } @Test(expected = 
NullPointerException.class) + @SuppressWarnings("unchecked") public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() { - stream.reduce(MockReducer.STRING_ADDER, - null); + stream.reduce(MockReducer.STRING_ADDER, (Materialized) null); + } + + @Test(expected = NullPointerException.class) + @SuppressWarnings("unchecked") + public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() { + stream.reduce(MockReducer.STRING_ADDER, (Named) null); } @Test(expected = NullPointerException.class) public void shouldThrowNullPointerOnCountIfMaterializedIsNull() { - stream.count(null); + stream.count((Materialized>) null); } private void processData(final TopologyTestDriver driver) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java index 0c4685a45e3a4..0327ebd20cadb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; @@ -292,15 +293,24 @@ public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() { } @Test(expected = NullPointerException.class) + @SuppressWarnings("unchecked") public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() { windowedStream.reduce( MockReducer.STRING_ADDER, - null); + (Materialized) null); + } + + @Test(expected = NullPointerException.class) + @SuppressWarnings("unchecked") + public void 
shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() { + windowedStream.reduce( + MockReducer.STRING_ADDER, + (Named) null); } @Test(expected = NullPointerException.class) public void shouldThrowNullPointerOnCountIfMaterializedIsNull() { - windowedStream.count(null); + windowedStream.count((Materialized>) null); } private void processData(final TopologyTestDriver driver) {