diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index 9e89d7af54ae0..abb5aa1bbd95c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -229,8 +229,10 @@ public synchronized KTable table(final String topic, Objects.requireNonNull(materialized, "materialized can't be null"); final ConsumedInternal consumedInternal = new ConsumedInternal<>(consumed); materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()); + final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"); + return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal); } @@ -280,8 +282,12 @@ public synchronized KTable table(final String topic, Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(consumed, "consumed can't be null"); final ConsumedInternal consumedInternal = new ConsumedInternal<>(consumed); + final MaterializedInternal> materializedInternal = - new MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), internalStreamsBuilder, topic + "-"); + new MaterializedInternal<>( + Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), + internalStreamsBuilder, topic + "-"); + return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal); } @@ -307,8 +313,10 @@ public synchronized KTable table(final String topic, final Materialized> materialized) { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); + final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"); + final ConsumedInternal consumedInternal = new 
ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), materializedInternal.valueSerde())); @@ -336,8 +344,11 @@ public synchronized GlobalKTable globalTable(final String topic, Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(consumed, "consumed can't be null"); final ConsumedInternal consumedInternal = new ConsumedInternal<>(consumed); + final MaterializedInternal> materializedInternal = - new MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), internalStreamsBuilder, topic + "-"); + new MaterializedInternal<>( + Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), + internalStreamsBuilder, topic + "-"); return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal); } @@ -403,6 +414,7 @@ public synchronized GlobalKTable globalTable(final String topic, final ConsumedInternal consumedInternal = new ConsumedInternal<>(consumed); // always use the serdes from consumed materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()); + final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java index 667b621cf2295..423ca6022d865 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java @@ -50,21 +50,24 @@ * @param type of record key * @param type of record value */ -public class Consumed { +public class Consumed implements NamedOperation> { protected Serde keySerde; protected Serde valueSerde; protected TimestampExtractor timestampExtractor; protected Topology.AutoOffsetReset resetPolicy; + protected String processorName; private Consumed(final Serde keySerde, final Serde 
valueSerde, final TimestampExtractor timestampExtractor, - final Topology.AutoOffsetReset resetPolicy) { + final Topology.AutoOffsetReset resetPolicy, + final String processorName) { this.keySerde = keySerde; this.valueSerde = valueSerde; this.timestampExtractor = timestampExtractor; this.resetPolicy = resetPolicy; + this.processorName = processorName; } /** @@ -72,7 +75,12 @@ private Consumed(final Serde keySerde, * @param consumed the instance of {@link Consumed} to copy */ protected Consumed(final Consumed consumed) { - this(consumed.keySerde, consumed.valueSerde, consumed.timestampExtractor, consumed.resetPolicy); + this(consumed.keySerde, + consumed.valueSerde, + consumed.timestampExtractor, + consumed.resetPolicy, + consumed.processorName + ); } /** @@ -90,7 +98,7 @@ public static Consumed with(final Serde keySerde, final Serde valueSerde, final TimestampExtractor timestampExtractor, final Topology.AutoOffsetReset resetPolicy) { - return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy); + return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, null); } @@ -105,7 +113,7 @@ public static Consumed with(final Serde keySerde, */ public static Consumed with(final Serde keySerde, final Serde valueSerde) { - return new Consumed<>(keySerde, valueSerde, null, null); + return new Consumed<>(keySerde, valueSerde, null, null, null); } /** @@ -117,7 +125,7 @@ public static Consumed with(final Serde keySerde, * @return a new instance of {@link Consumed} */ public static Consumed with(final TimestampExtractor timestampExtractor) { - return new Consumed<>(null, null, timestampExtractor, null); + return new Consumed<>(null, null, timestampExtractor, null, null); } /** @@ -129,7 +137,19 @@ public static Consumed with(final TimestampExtractor timestampExtra * @return a new instance of {@link Consumed} */ public static Consumed with(final Topology.AutoOffsetReset resetPolicy) { - return new Consumed<>(null, null, null, resetPolicy); + 
return new Consumed<>(null, null, null, resetPolicy, null); + } + + /** + * Create an instance of {@link Consumed} with provided processor name. + * + * @param processorName the processor name to be used. If {@code null} a default processor name will be generated + * @param key type + * @param value type + * @return a new instance of {@link Consumed} + */ + public static Consumed as(final String processorName) { + return new Consumed<>(null, null, null, null, processorName); } /** @@ -176,6 +196,18 @@ public Consumed withOffsetResetPolicy(final Topology.AutoOffsetReset reset return this; } + /** + * Configure the instance of {@link Consumed} with a processor name. + * + * @param processorName the processor name to be used. If {@code null} a default processor name will be generated + * @return this + */ + @Override + public Consumed withName(final String processorName) { + this.processorName = processorName; + return this; + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java index 3380fc8013eae..c196d933c40ea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java @@ -29,9 +29,9 @@ * @param the key type * @param the value type */ -public class Grouped { +public class Grouped implements NamedOperation> { - protected final Serde keySerde; + protected final Serde keySerde; protected final Serde valueSerde; protected final String name; @@ -128,9 +128,10 @@ public static Grouped with(final Serde keySerde, * Perform the grouping operation with the name for a repartition topic if required. Note * that Kafka Streams does not always create repartition topics for grouping operations. 
* - * @param name the name used as part of the repartition topic name if required + * @param name the name used for the processor name and as part of the repartition topic name if required * @return a new {@link Grouped} instance configured with the name * */ + @Override public Grouped withName(final String name) { return new Grouped<>(name, keySerde, valueSerde); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java index 15bdf92848101..38049329ecb76 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream; -import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.internals.ApiUtils; @@ -100,7 +99,7 @@ protected Materialized(final Materialized materialized) { * @return a new {@link Materialized} instance with the given storeName */ public static Materialized as(final String storeName) { - Topic.validate(storeName); + Named.validate(storeName); return new Materialized<>(storeName); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java new file mode 100644 index 0000000000000..1db031a027955 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.errors.TopologyException; + +import java.util.Objects; + +public class Named implements NamedOperation { + + private static final int MAX_NAME_LENGTH = 249; + + protected String name; + + protected Named(final String name) { + this.name = name; + if (name != null) { + validate(name); + } + } + + /** + * Create a Named instance with provided name. + * + * @param name the processor name to be used; can't be {@code null} + * @return A new {@link Named} instance configured with name + * + * @throws TopologyException if an invalid name is specified; valid characters are ASCII alphanumerics, '.', '_' and '-'. 
+ */ + public static Named as(final String name) { + Objects.requireNonNull(name, "name can't be null"); + return new Named(name); + } + + @Override + public Named withName(final String name) { + return new Named(name); + } + + static void validate(final String name) { + if (name.isEmpty()) + throw new TopologyException("Name is illegal, it can't be empty"); + if (name.equals(".") || name.equals("..")) + throw new TopologyException("Name cannot be \".\" or \"..\""); + if (name.length() > MAX_NAME_LENGTH) + throw new TopologyException("Name is illegal, it can't be longer than " + MAX_NAME_LENGTH + + " characters, name: " + name); + if (!containsValidPattern(name)) + throw new TopologyException("Name \"" + name + "\" is illegal, it contains a character other than " + + "ASCII alphanumerics, '.', '_' and '-'"); + } + + /** + * Valid characters for Kafka topics are the ASCII alphanumerics, '.', '_', and '-' + */ + private static boolean containsValidPattern(final String topic) { + for (int i = 0; i < topic.length(); ++i) { + final char c = topic.charAt(i); + + // We don't use Character.isLetterOrDigit(c) because it's slower + final boolean validLetterOrDigit = (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z'); + final boolean validChar = validLetterOrDigit || c == '.' 
|| c == '_' || c == '-'; + if (!validChar) { + return false; + } + } + return true; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java index 5002efc76d299..53c00c523035e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java @@ -31,9 +31,10 @@ * @param value type * @see KStream#print(Printed) */ -public class Printed { +public class Printed implements NamedOperation> { protected final OutputStream outputStream; protected String label; + protected String processorName; protected KeyValueMapper mapper = new KeyValueMapper() { @Override public String apply(final K key, final V value) { @@ -53,6 +54,7 @@ protected Printed(final Printed printed) { this.outputStream = printed.outputStream; this.label = printed.label; this.mapper = printed.mapper; + this.processorName = printed.processorName; } /** @@ -122,4 +124,16 @@ public Printed withKeyValueMapper(final KeyValueMapper withName(final String processorName) { + this.processorName = processorName; + return this; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java index a3d96bddb935f..6c3ed8ca8c6db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java @@ -30,24 +30,28 @@ * @param key type * @param value type */ -public class Produced { +public class Produced implements NamedOperation> { protected Serde keySerde; protected Serde valueSerde; protected StreamPartitioner partitioner; + protected String processorName; private Produced(final Serde keySerde, final Serde valueSerde, - final StreamPartitioner partitioner) { + final StreamPartitioner partitioner, + final String processorName) { this.keySerde = keySerde; 
this.valueSerde = valueSerde; this.partitioner = partitioner; + this.processorName = processorName; } protected Produced(final Produced produced) { this.keySerde = produced.keySerde; this.valueSerde = produced.valueSerde; this.partitioner = produced.partitioner; + this.processorName = produced.processorName; } /** @@ -62,7 +66,7 @@ protected Produced(final Produced produced) { */ public static Produced with(final Serde keySerde, final Serde valueSerde) { - return new Produced<>(keySerde, valueSerde, null); + return new Produced<>(keySerde, valueSerde, null, null); } /** @@ -82,7 +86,19 @@ public static Produced with(final Serde keySerde, public static Produced with(final Serde keySerde, final Serde valueSerde, final StreamPartitioner partitioner) { - return new Produced<>(keySerde, valueSerde, partitioner); + return new Produced<>(keySerde, valueSerde, partitioner, null); + } + + /** + * Create an instance of {@link Produced} with provided processor name. + * + * @param processorName the processor name to be used. 
If {@code null} a default processor name will be generated + * @param key type + * @param value type + * @return a new instance of {@link Produced} + */ + public static Produced as(final String processorName) { + return new Produced<>(null, null, null, processorName); } /** @@ -95,7 +111,7 @@ public static Produced with(final Serde keySerde, * @see KStream#to(String, Produced) */ public static Produced keySerde(final Serde keySerde) { - return new Produced<>(keySerde, null, null); + return new Produced<>(keySerde, null, null, null); } /** @@ -108,7 +124,7 @@ public static Produced keySerde(final Serde keySerde) { * @see KStream#to(String, Produced) */ public static Produced valueSerde(final Serde valueSerde) { - return new Produced<>(null, valueSerde, null); + return new Produced<>(null, valueSerde, null, null); } /** @@ -123,7 +139,7 @@ public static Produced valueSerde(final Serde valueSerde) { * @see KStream#to(String, Produced) */ public static Produced streamPartitioner(final StreamPartitioner partitioner) { - return new Produced<>(null, null, partitioner); + return new Produced<>(null, null, partitioner, null); } /** @@ -176,4 +192,10 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(keySerde, valueSerde, partitioner); } + + @Override + public Produced withName(final String name) { + this.processorName = name; + return this; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java index f33b0d8085d3b..94ceaffde08ed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; public class ConsumedInternal extends Consumed { + public ConsumedInternal(final Consumed consumed) { 
super(consumed); } @@ -62,4 +63,8 @@ public TimestampExtractor timestampExtractor() { public Topology.AutoOffsetReset offsetResetPolicy() { return resetPolicy; } + + public String name() { + return processorName; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java index 8d8ebfc917e87..bc35d68590889 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream.internals; public interface InternalNameProvider { - String newProcessorName(String prefix); + String newProcessorName(final String prefix); - String newStoreName(String prefix); + String newStoreName(final String prefix); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index c06b988eaf4d2..060f4061a36fa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -78,7 +78,8 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil public KStream stream(final Collection topics, final ConsumedInternal consumed) { - final String name = newProcessorName(KStreamImpl.SOURCE_NAME); + + final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME); final StreamSourceNode streamSourceNode = new StreamSourceNode<>(name, topics, consumed); addGraphNode(root, streamSourceNode); @@ -111,8 +112,10 @@ public KStream stream(final Pattern topicPattern, public KTable table(final String topic, final 
ConsumedInternal consumed, final MaterializedInternal> materialized) { - final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME); - final String tableSourceName = newProcessorName(KTableImpl.SOURCE_NAME); + final String sourceName = new NamedInternal(consumed.name()) + .orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME); + final String tableSourceName = new NamedInternal(consumed.name()) + .suffixWithOrElseGet("-table-source", () -> newProcessorName(KTableImpl.SOURCE_NAME)); final KTableSource tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName()); final ProcessorParameters processorParameters = new ProcessorParameters<>(tableSource, tableSourceName); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 41260c5a0162f..87ad4f2630312 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -168,7 +168,7 @@ public KStream filterNot(final Predicate predicate) public KStream selectKey(final KeyValueMapper mapper) { Objects.requireNonNull(mapper, "mapper can't be null"); - final ProcessorGraphNode selectKeyProcessorNode = internalSelectKey(mapper); + final ProcessorGraphNode selectKeyProcessorNode = internalSelectKey(mapper, NamedInternal.empty()); selectKeyProcessorNode.keyChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, selectKeyProcessorNode); @@ -178,9 +178,9 @@ public KStream selectKey(final KeyValueMapper ProcessorGraphNode internalSelectKey(final KeyValueMapper mapper) { - final String name = builder.newProcessorName(KEY_SELECT_NAME); - + private ProcessorGraphNode internalSelectKey(final KeyValueMapper mapper, + final NamedInternal named) { + final String name = named.orElseGenerateWithPrefix(builder, KEY_SELECT_NAME); final 
KStreamMap kStreamMap = new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value)); final ProcessorParameters processorParameters = new ProcessorParameters<>(kStreamMap, name); @@ -241,8 +241,7 @@ public KStream mapValues(final ValueMapperWithKey printed) { Objects.requireNonNull(printed, "printed can't be null"); final PrintedInternal printedInternal = new PrintedInternal<>(printed); - final String name = builder.newProcessorName(PRINTING_NAME); - + final String name = new NamedInternal(printedInternal.name()).orElseGenerateWithPrefix(builder, PRINTING_NAME); final ProcessorParameters processorParameters = new ProcessorParameters<>(printedInternal.build(this.name), name); final ProcessorGraphNode printNode = new ProcessorGraphNode<>(name, processorParameters); @@ -428,8 +427,7 @@ public void to(final TopicNameExtractor topicExtractor, final Produced topicExtractor, final ProducedInternal produced) { - final String name = builder.newProcessorName(SINK_NAME); - + final String name = new NamedInternal(produced.name()).orElseGenerateWithPrefix(builder, SINK_NAME); final StreamSinkNode sinkNode = new StreamSinkNode<>( name, topicExtractor, @@ -819,7 +817,7 @@ public KGroupedStream groupBy(final KeyValueMapper groupedInternal = new GroupedInternal<>(grouped); - final ProcessorGraphNode selectKeyMapNode = internalSelectKey(selector); + final ProcessorGraphNode selectKeyMapNode = internalSelectKey(selector, new NamedInternal(groupedInternal.name())); selectKeyMapNode.keyChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, selectKeyMapNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java new file mode 100644 index 0000000000000..e83728e92404c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Named; +import java.util.Optional; +import java.util.function.Supplier; + +public class NamedInternal extends Named { + + public static NamedInternal empty() { + return new NamedInternal(null); + } + + public static NamedInternal with(final String name) { + return new NamedInternal(name); + } + + /** + * Creates a new {@link NamedInternal} instance. + * + * @param internal the internal name. + */ + NamedInternal(final String internal) { + super(internal); + } + + /** + * @return a string name. + */ + public String name() { + return name; + } + + @Override + public NamedInternal withName(final String name) { + return new NamedInternal(name); + } + + /** + * Check whether an internal name is defined. + * @return {@code false} if no name is set. + */ + public boolean isDefined() { + return name != null; + } + + String suffixWithOrElseGet(final String suffix, final Supplier supplier) { + final Optional suffixed = Optional.ofNullable(this.name).map(s -> s + suffix); + // Creating a new named will re-validate generated name as suffixed string could be too large. 
+ return new NamedInternal(suffixed.orElseGet(supplier)).name(); + } + + String orElseGenerateWithPrefix(final InternalNameProvider provider, final String prefix) { + return orElseGet(() -> provider.newProcessorName(prefix)); + } + + /** + * Returns the internal name or the value returns from the supplier. + * + * @param supplier the supplier to be used if internal name is empty. + * @return an internal string name. + */ + private String orElseGet(final Supplier supplier) { + return Optional.ofNullable(this.name).orElseGet(supplier); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java index fe961ad4fa792..546e35321b2ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java @@ -27,4 +27,8 @@ public PrintedInternal(final Printed printed) { public ProcessorSupplier build(final String processorName) { return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? 
label : processorName)); } + + public String name() { + return processorName; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java index 358982b9e46fc..0f0620c990961 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java @@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner; public class ProducedInternal extends Produced { + public ProducedInternal(final Produced produced) { super(produced); } @@ -36,4 +37,9 @@ public Serde valueSerde() { public StreamPartitioner streamPartitioner() { return partitioner; } + + public String name() { + return processorName; + } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java index 1540b2df604c3..d24988dcbf204 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java @@ -106,11 +106,12 @@ public int hashCode() { @Override public String toString() { - return "SuppressedInternal{name='" + name + '\'' + - ", bufferConfig=" + bufferConfig + - ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents + - ", timeDefinition=" + timeDefinition + - ", safeToDropTombstones=" + safeToDropTombstones + - '}'; + return "SuppressedInternal{" + + "name='" + name + '\'' + + ", bufferConfig=" + bufferConfig + + ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents + + ", timeDefinition=" + timeDefinition + + ", safeToDropTombstones=" + safeToDropTombstones + + '}'; } } diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index b51eac8b9a697..0c91dd882c314 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -22,11 +22,15 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; +import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.test.ConsumerRecordFactory; @@ -35,11 +39,13 @@ import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.StreamsTestUtils; +import org.junit.Assert; import org.junit.Test; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -51,7 +57,14 @@ public class StreamsBuilderTest { + private static final String STREAM_TOPIC = "stream-topic"; + + private static final String STREAM_TOPIC_TWO = "stream-topic-two"; + + private static final String TABLE_TOPIC = "table-topic"; + private final StreamsBuilder builder = new StreamsBuilder(); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); @Test @@ -65,10 
+78,10 @@ public void shouldNotThrowNullPointerIfOptimizationsNotSpecified() { @Test public void shouldAllowJoinUnmaterializedFilteredKTable() { final KTable filteredKTable = builder - .table("table-topic") + .table(TABLE_TOPIC) .filter(MockPredicate.allGoodPredicate()); builder - .stream("stream-topic") + .stream(STREAM_TOPIC) .join(filteredKTable, MockValueJoiner.TOSTRING_JOINER); builder.build(); @@ -88,10 +101,10 @@ public void shouldAllowJoinUnmaterializedFilteredKTable() { @Test public void shouldAllowJoinMaterializedFilteredKTable() { final KTable filteredKTable = builder - .table("table-topic") + .table(TABLE_TOPIC) .filter(MockPredicate.allGoodPredicate(), Materialized.as("store")); builder - .stream("stream-topic") + .stream(STREAM_TOPIC) .join(filteredKTable, MockValueJoiner.TOSTRING_JOINER); builder.build(); @@ -112,10 +125,10 @@ public void shouldAllowJoinMaterializedFilteredKTable() { @Test public void shouldAllowJoinUnmaterializedMapValuedKTable() { final KTable mappedKTable = builder - .table("table-topic") + .table(TABLE_TOPIC) .mapValues(MockMapper.noOpValueMapper()); builder - .stream("stream-topic") + .stream(STREAM_TOPIC) .join(mappedKTable, MockValueJoiner.TOSTRING_JOINER); builder.build(); @@ -135,10 +148,10 @@ public void shouldAllowJoinUnmaterializedMapValuedKTable() { @Test public void shouldAllowJoinMaterializedMapValuedKTable() { final KTable mappedKTable = builder - .table("table-topic") + .table(TABLE_TOPIC) .mapValues(MockMapper.noOpValueMapper(), Materialized.as("store")); builder - .stream("stream-topic") + .stream(STREAM_TOPIC) .join(mappedKTable, MockValueJoiner.TOSTRING_JOINER); builder.build(); @@ -161,7 +174,7 @@ public void shouldAllowJoinUnmaterializedJoinedKTable() { final KTable table1 = builder.table("table-topic1"); final KTable table2 = builder.table("table-topic2"); builder - .stream("stream-topic") + .stream(STREAM_TOPIC) .join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER); 
builder.build(); @@ -183,7 +196,7 @@ public void shouldAllowJoinMaterializedJoinedKTable() { final KTable table1 = builder.table("table-topic1"); final KTable table2 = builder.table("table-topic2"); builder - .stream("stream-topic") + .stream(STREAM_TOPIC) .join( table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")), MockValueJoiner.TOSTRING_JOINER); @@ -205,8 +218,8 @@ public void shouldAllowJoinMaterializedJoinedKTable() { @Test public void shouldAllowJoinMaterializedSourceKTable() { - final KTable table = builder.table("table-topic"); - builder.stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER); + final KTable table = builder.table(TABLE_TOPIC); + builder.stream(STREAM_TOPIC).join(table, MockValueJoiner.TOSTRING_JOINER); builder.build(); final ProcessorTopology topology = @@ -403,4 +416,72 @@ public void shouldThrowExceptionWhenTopicNamesAreNull() { builder.stream(Arrays.asList(null, null)); builder.build(); } + + @Test + public void shouldUseSpecifiedNameForStreamSourceProcessor() { + final String expected = "source-node"; + builder.stream(STREAM_TOPIC, Consumed.as(expected)); + builder.stream(STREAM_TOPIC_TWO); + builder.build(); + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); + assertSpecifiedNameForOperation(topology, expected, "KSTREAM-SOURCE-0000000000"); + } + + @Test + public void shouldUseSpecifiedNameForTableSourceProcessor() { + final String expected = "source-node"; + builder.table(STREAM_TOPIC, Consumed.as(expected)); + builder.table(STREAM_TOPIC_TWO); + builder.build(); + + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); + + assertSpecifiedNameForOperation( + topology, + expected, + expected + "-table-source", + "KSTREAM-SOURCE-0000000002", + "KTABLE-SOURCE-0000000003"); + } + + @Test + public void shouldUseSpecifiedNameForGlobalTableSourceProcessor() { + final String 
expected = "source-processor"; + builder.globalTable(STREAM_TOPIC, Consumed.as(expected)); + builder.globalTable(STREAM_TOPIC_TWO); + builder.build(); + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); + + assertSpecifiedNameForStateStore( + topology.globalStateStores(), + "stream-topic-STATE-STORE-0000000000", + "stream-topic-two-STATE-STORE-0000000003" + ); + } + + @Test + public void shouldUseSpecifiedNameForSinkProcessor() { + final String expected = "sink-processor"; + final KStream stream = builder.stream(STREAM_TOPIC); + stream.to(STREAM_TOPIC_TWO, Produced.as(expected)); + stream.to(STREAM_TOPIC_TWO); + builder.build(); + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); + assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", expected, "KSTREAM-SINK-0000000001"); + } + + private void assertSpecifiedNameForOperation(final ProcessorTopology topology, final String... expected) { + final List processors = topology.processors(); + Assert.assertEquals("Invalid number of expected processors", expected.length, processors.size()); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processors.get(i).name()); + } + } + + private void assertSpecifiedNameForStateStore(final List stores, final String... 
expected) { + Assert.assertEquals("Invalid number of expected state stores", expected.length, stores.size()); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], stores.get(i).name()); + } + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java index de3e503fa0b77..8434e0f296933 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream; -import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; @@ -32,7 +32,7 @@ public void shouldAllowValidTopicNamesAsStoreName() { Materialized.as("valid_name"); } - @Test(expected = InvalidTopicException.class) + @Test(expected = TopologyException.class) public void shouldNotAllowInvalidTopicNames() { Materialized.as("not:valid"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/NamedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/NamedTest.java new file mode 100644 index 0000000000000..d959ec59c693a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/NamedTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.errors.TopologyException; +import org.junit.Test; + +import java.util.Arrays; + +import static org.junit.Assert.fail; + +public class NamedTest { + + @Test(expected = NullPointerException.class) + public void shouldThrowExceptionGivenNullName() { + Named.as(null); + } + + @Test + public void shouldThrowExceptionOnInvalidTopicNames() { + final char[] longString = new char[250]; + Arrays.fill(longString, 'a'); + final String[] invalidNames = {"", "foo bar", "..", "foo:bar", "foo=bar", ".", new String(longString)}; + + for (final String name : invalidNames) { + try { + Named.validate(name); + fail("No exception was thrown for named with invalid name: " + name); + } catch (final TopologyException e) { + // success + } + } + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index 9bdea13dfeb6e..acdbb39d205de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -18,13 +18,13 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import 
org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KGroupedStream; @@ -85,7 +85,7 @@ public void shouldNotHaveNullReducerOnReduce() { groupedStream.reduce(null); } - @Test(expected = InvalidTopicException.class) + @Test(expected = TopologyException.class) public void shouldNotHaveInvalidStoreNameOnReduce() { groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)); } @@ -102,7 +102,7 @@ public void shouldNotHaveNullWindowsWithWindowedReduce() { groupedStream.windowedBy((Windows) null); } - @Test(expected = InvalidTopicException.class) + @Test(expected = TopologyException.class) public void shouldNotHaveInvalidStoreNameWithWindowedReduce() { groupedStream .windowedBy(TimeWindows.of(ofMillis(10))) @@ -119,7 +119,7 @@ public void shouldNotHaveNullAdderOnAggregate() { groupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")); } - @Test(expected = InvalidTopicException.class) + @Test(expected = TopologyException.class) public void shouldNotHaveInvalidStoreNameOnAggregate() { groupedStream.aggregate( MockInitializer.STRING_INIT, @@ -146,7 +146,7 @@ public void shouldNotHaveNullWindowsOnWindowedAggregate() { groupedStream.windowedBy((Windows) null); } - @Test(expected = InvalidTopicException.class) + @Test(expected = TopologyException.class) public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() { groupedStream .windowedBy(TimeWindows.of(ofMillis(10))) @@ -284,7 +284,7 @@ public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() { groupedStream.windowedBy((SessionWindows) null); } - @Test(expected = InvalidTopicException.class) + @Test(expected = TopologyException.class) public 
void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() { groupedStream .windowedBy(SessionWindows.with(ofMillis(30))) @@ -349,7 +349,7 @@ public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() { Materialized.with(Serdes.String(), Serdes.String())); } - @Test(expected = InvalidTopicException.class) + @Test(expected = TopologyException.class) public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() { groupedStream .windowedBy(SessionWindows.with(ofMillis(10))) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java index 09f93e7dfefae..34002aee9c39a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.serialization.DoubleSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; @@ -24,6 +23,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KGroupedTable; @@ -64,7 +64,7 @@ public void before() { .groupBy(MockMapper.selectValueKeyValueMapper()); } - @Test(expected = InvalidTopicException.class) + @Test(expected = TopologyException.class) public void shouldNotAllowInvalidStoreNameOnAggregate() { groupedTable.aggregate( MockInitializer.STRING_INIT, @@ -116,7 +116,7 @@ public void shouldNotAllowNullSubtractorOnReduce() { 
Materialized.as("store")); } - @Test(expected = InvalidTopicException.class) + @Test(expected = TopologyException.class) public void shouldNotAllowInvalidStoreNameOnReduce() { groupedTable.reduce( MockReducer.STRING_ADDER, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java new file mode 100644 index 0000000000000..98b3a4d776176 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class NamedInternalTest { + + private static final String TEST_VALUE = "default-value"; + private static final String TEST_SUFFIX = "-suffix"; + + @Test + public void shouldSuffixNameOrReturnProviderValue() { + final String name = "foo"; + assertEquals( + name + TEST_SUFFIX, + NamedInternal.with(name).suffixWithOrElseGet(TEST_SUFFIX, () -> TEST_VALUE) + ); + assertEquals( + TEST_VALUE, + NamedInternal.with(null).suffixWithOrElseGet(TEST_SUFFIX, () -> TEST_VALUE) + ); + } + + @Test + public void shouldGenerateWithPrefixGivenEmptyName() { + final String prefix = "KSTREAM-MAP-"; + assertEquals(prefix + "PROCESSOR-NAME", NamedInternal.with(null).orElseGenerateWithPrefix( + new InternalNameProvider() { + @Override + public String newProcessorName(final String prefix) { + return prefix + "PROCESSOR-NAME"; + } + + @Override + public String newStoreName(final String prefix) { + return null; + } + }, + prefix) + ); + } + + @Test + public void shouldNotGenerateWithPrefixGivenValidName() { + final String validName = "validName"; + assertEquals(validName, NamedInternal.with(validName).orElseGenerateWithPrefix(null, "KSTREAM-MAP-") + ); + } +} \ No newline at end of file