From 9310ca82f4ca96f531cd51875ac39c5c5867bfc6 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 24 Jan 2025 22:59:32 -0800 Subject: [PATCH 1/3] MINOR: cleanup KStream JavaDocs (7/N) - repartition/to/toTable --- .../apache/kafka/streams/kstream/KStream.java | 199 ++++++------------ 1 file changed, 66 insertions(+), 133 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 304ce8bb709bf..b59d417074664 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -653,185 +653,118 @@ KStream flatMapValues(final ValueMapperWithKey - * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance. - * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. - * The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * from the auto-generated topic. + * + *

The created topic is considered an internal topic and is meant to be used only by the current + * Kafka Streams instance. + * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", + * where "applicationId" is user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix. + * The number of partitions for the repartition topic is determined based on the upstream topics' partition numbers. + * Furthermore, the topic will be created with infinite retention time and data will be automatically purged + * by Kafka Streams. + * + *

You can retrieve all generated internal topic names via {@link Topology#describe()}. + * To explicitly set key/value serdes, specify the number of used partitions or the partitioning strategy, + * or to customize the name of the repartition topic, use {@link #repartition(Repartitioned)}. * - * @return {@code KStream} that contains the exact same repartitioned records as this {@code KStream}. + * @return A {@code KStream} that contains the exact same, but repartitioned records as this {@code KStream}. */ KStream repartition(); /** - * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream} - * from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner}, - * number of partitions, and topic name part as defined by {@link Repartitioned}. - *

- * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance. - * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. - * The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, - * "<name>" is either provided via {@link Repartitioned#as(String)} or an internally - * generated name, and "-repartition" is a fixed suffix. - * - * @param repartitioned the {@link Repartitioned} instance used to specify {@link Serdes}, - * {@link StreamPartitioner} which determines how records are distributed among partitions of the topic, - * part of the topic name, and number of partitions for a repartition topic. - * @return a {@code KStream} that contains the exact same repartitioned records as this {@code KStream}. + * See {@link #repartition()}. */ KStream repartition(final Repartitioned repartitioned); /** - * Materialize this stream to a topic using default serializers specified in the config and producer's - * default partitioning strategy. - * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is + * Materialize this stream to a topic. + * The topic should be manually created before it is used (i.e., before the Kafka Streams application is * started). * - * @param topic the topic name + *

To explicitly set key/value serdes or the partitioning strategy, use {@link #to(String, Produced)}. + * + * @param topic + * the output topic name + * + * @see #to(TopicNameExtractor) */ void to(final String topic); /** - * Materialize this stream to a topic using the provided {@link Produced} instance. - * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is - * started). - * - * @param topic the topic name - * @param produced the options to use when producing to the topic + * See {@link #to(String)}. */ void to(final String topic, final Produced produced); /** - * Dynamically materialize this stream to topics using default serializers specified in the config and producer's - * default partitioning strategy. - * The topic names for each record to send to is dynamically determined based on the {@link TopicNameExtractor}. + * Materialize the record of this stream to different topics. + * The provided {@link TopicNameExtractor} is applied to each input record to compute the output topic name. + * All topics should be manually created before they are use (i.e., before the Kafka Streams application is started). + * + *

To explicitly set key/value serdes or the partitioning strategy, use {@link #to(TopicNameExtractor, Produced)}. * - * @param topicExtractor the extractor to determine the name of the Kafka topic to write to for each record + * @param topicExtractor + * the extractor to determine the name of the Kafka topic to write to for each record */ void to(final TopicNameExtractor topicExtractor); /** - * Dynamically materialize this stream to topics using the provided {@link Produced} instance. - * The topic names for each record to send to is dynamically determined based on the {@link TopicNameExtractor}. - * - * @param topicExtractor the extractor to determine the name of the Kafka topic to write to for each record - * @param produced the options to use when producing to the topic + * See {@link #to(TopicNameExtractor)}. */ void to(final TopicNameExtractor topicExtractor, final Produced produced); /** * Convert this stream to a {@link KTable}. - *

- * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, - * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or - * {@link #process(ProcessorSupplier, String...)}) an internal repartitioning topic will be created in Kafka. - * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * The conversion is a logical operation and only changes the "interpretation" of the records, i.e., each record of + * this stream is a "fact/event" and is re-interpreted as a "change/update-per-key" now + * (cf. {@link KStream} vs {@link KTable}). The resulting {@link KTable} is essentially a changelog stream. + * To "upsert" the records of this stream into a materialized {@link KTable} (i.e., into a state store), + * use {@link #toTable(Materialized)}. + * + *

Note that {@code null} keys are not supported by {@code KTables} and records with {@code null} key will be dropped. + * + *

If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, + * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or {@link #process(ProcessorSupplier, String...)}) + * Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in + * Kafka and write and re-read the data via this topic such that the resulting {@link KTable} is correctly + * partitioned by its key. + * + *

This internal repartitioning topic will be named "${applicationId}-<name>-repartition", + * where "applicationId" is user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned - * correctly on its key. - * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because - * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of + * The number of partitions for the repartition topic is determined based on the upstream topics' partition numbers. + * Furthermore, the topic will be created with infinite retention time and data will be automatically purged + * by Kafka Streams. + * + *

Note: If the result {@link KTable} is materialized, it is not possible to apply + * {@link StreamsConfig#REUSE_KTABLE_SOURCE_TOPICS "source topic optimization"}, because + * repartition topics are considered transient and don't allow recovering the result {@link KTable} in case of + * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance. - *

- * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of - * it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}). * - * @return a {@link KTable} that contains the same records as this {@code KStream} + *

You can retrieve all generated internal topic names via {@link Topology#describe()}. + * To customize the name of the repartition topic, use {@link #toTable(Named)}. + * For more control over the repartitioning, use {@link #repartition(Repartitioned)} before {@code toTable()}. + * + * @return A {@link KTable} that contains the same records as this {@code KStream}. */ KTable toTable(); /** - * Convert this stream to a {@link KTable}. - *

- * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, - * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or - * {@link #process(ProcessorSupplier, String...)}}) an internal repartitioning topic will be created in Kafka. - * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, - * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned - * correctly on its key. - * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because - * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of - * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance. - *

- * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of - * it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}). - * - * @param named a {@link Named} config used to name the processor in the topology - * @return a {@link KTable} that contains the same records as this {@code KStream} + * See {@link #toTable()}. */ KTable toTable(final Named named); - /** - * Convert this stream to a {@link KTable}. - *

- * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, - * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or - * {@link #process(ProcessorSupplier, String...)}}) an internal repartitioning topic will be created in Kafka. - * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, - * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned - * correctly on its key. - * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because - * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of - * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance. - *

- * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of - * it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}). - * - * @param materialized an instance of {@link Materialized} used to describe how the state store of the - * resulting table should be materialized. - * @return a {@link KTable} that contains the same records as this {@code KStream} + /** + * See {@link #toTable()}. */ KTable toTable(final Materialized> materialized); /** - * Convert this stream to a {@link KTable}. - *

- * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, - * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or - * {@link #process(ProcessorSupplier, String...)}}) an internal repartitioning topic will be created in Kafka. - * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in - * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, - * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned - * correctly on its key. - * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because - * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of - * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance. - *

- * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of - * it was a "fact/event" and is re-interpreted as update now (cf. {@link KStream} vs {@code KTable}). - * - * @param named a {@link Named} config used to name the processor in the topology - * @param materialized an instance of {@link Materialized} used to describe how the state store of the - * resulting table should be materialized. - * @return a {@link KTable} that contains the same records as this {@code KStream} + * See {@link #toTable()}. */ KTable toTable(final Named named, final Materialized> materialized); From 5e28e7af3463b45f2be78eb48fd1c7059cf0b1fe Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 31 Jan 2025 15:53:30 -0800 Subject: [PATCH 2/3] spotless --- .../src/main/java/org/apache/kafka/streams/kstream/KStream.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index b59d417074664..84c3fb6cd28c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -17,14 +17,12 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.ConnectedStoreProvider; -import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.FixedKeyProcessor; import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; From 
2142d8813bca83570425fa9b0db62071834cb4dd Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 4 Feb 2025 20:57:40 -0800 Subject: [PATCH 3/3] some github comments --- .../main/java/org/apache/kafka/streams/kstream/KStream.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 84c3fb6cd28c6..b84ea0f1a9fae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -699,12 +699,14 @@ void to(final String topic, /** * Materialize the record of this stream to different topics. * The provided {@link TopicNameExtractor} is applied to each input record to compute the output topic name. - * All topics should be manually created before they are use (i.e., before the Kafka Streams application is started). + * All topics should be manually created before they are used (i.e., before the Kafka Streams application is started). * *

To explicitly set key/value serdes or the partitioning strategy, use {@link #to(TopicNameExtractor, Produced)}. * * @param topicExtractor * the extractor to determine the name of the Kafka topic to write to for each record + * + * @see #to(String) */ void to(final TopicNameExtractor topicExtractor);