Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,10 @@ public synchronized <K, V> KTable<K, V> table(final String topic,
Objects.requireNonNull(materialized, "materialized can't be null");
final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());

final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");

return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal);
}

Expand Down Expand Up @@ -280,8 +282,12 @@ public synchronized <K, V> KTable<K, V> table(final String topic,
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);

final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), internalStreamsBuilder, topic + "-");
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
internalStreamsBuilder, topic + "-");
Comment thread
fhussonnois marked this conversation as resolved.
Outdated

return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal);
}

Expand All @@ -307,8 +313,10 @@ public synchronized <K, V> KTable<K, V> table(final String topic,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");

final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");

final ConsumedInternal<K, V> consumedInternal =
new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), materializedInternal.valueSerde()));

Expand Down Expand Up @@ -336,8 +344,11 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);

final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), internalStreamsBuilder, topic + "-");
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
internalStreamsBuilder, topic + "-");

return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal);
}
Expand Down Expand Up @@ -403,6 +414,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
// always use the serdes from consumed
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());

final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,37 @@
* @param <K> type of record key
* @param <V> type of record value
*/
public class Consumed<K, V> {
public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {

protected Serde<K> keySerde;
protected Serde<V> valueSerde;
protected TimestampExtractor timestampExtractor;
protected Topology.AutoOffsetReset resetPolicy;
protected String processorName;

private Consumed(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset resetPolicy) {
final Topology.AutoOffsetReset resetPolicy,
final String processorName) {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.timestampExtractor = timestampExtractor;
this.resetPolicy = resetPolicy;
this.processorName = processorName;
}

/**
* Create an instance of {@link Consumed} from an existing instance.
* @param consumed the instance of {@link Consumed} to copy
*/
protected Consumed(final Consumed<K, V> consumed) {
this(consumed.keySerde, consumed.valueSerde, consumed.timestampExtractor, consumed.resetPolicy);
this(consumed.keySerde,
consumed.valueSerde,
consumed.timestampExtractor,
consumed.resetPolicy,
consumed.processorName
);
}

/**
Expand All @@ -90,7 +98,7 @@ public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset resetPolicy) {
return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy);
return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, null);

}

Expand All @@ -105,7 +113,7 @@ public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
*/
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new Consumed<>(keySerde, valueSerde, null, null);
return new Consumed<>(keySerde, valueSerde, null, null, null);
}

/**
Expand All @@ -117,7 +125,7 @@ public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtractor) {
return new Consumed<>(null, null, timestampExtractor, null);
return new Consumed<>(null, null, timestampExtractor, null, null);
}

/**
Expand All @@ -129,7 +137,19 @@ public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtra
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) {
return new Consumed<>(null, null, null, resetPolicy);
return new Consumed<>(null, null, null, resetPolicy, null);
}

/**
* Create an instance of {@link Consumed} with provided processor name.
*
* @param processorName the processor name to be used. If {@code null} a default processor name will be generated
* @param <K> key type
* @param <V> value type
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> as(final String processorName) {
return new Consumed<>(null, null, null, null, processorName);
}

/**
Expand Down Expand Up @@ -176,6 +196,18 @@ public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset reset
return this;
}

/**
* Configure the instance of {@link Consumed} with a processor name.
*
* @param processorName the processor name to be used. If {@code null} a default processor name will be generated
* @return this
*/
@Override
public Consumed<K, V> withName(final String processorName) {
Comment thread
fhussonnois marked this conversation as resolved.
Outdated
this.processorName = processorName;
return this;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
* @param <K> the key type
* @param <V> the value type
*/
public class Grouped<K, V> {
public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> {

protected final Serde<K> keySerde;
protected final Serde<K> keySerde;
protected final Serde<V> valueSerde;
protected final String name;

Expand Down Expand Up @@ -128,9 +128,10 @@ public static <K, V> Grouped<K, V> with(final Serde<K> keySerde,
* Perform the grouping operation with the name for a repartition topic if required. Note
* that Kafka Streams does not always create repartition topics for grouping operations.
*
* @param name the name used as part of the repartition topic name if required
* @param name the name used for the processor name and as part of the repartition topic name if required
* @return a new {@link Grouped} instance configured with the name
* */
@Override
public Grouped<K, V> withName(final String name) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because Grouped has withName already, should it be part of #6409 instead?

return new Grouped<>(name, keySerde, valueSerde);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream;

import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.internals.ApiUtils;
Expand Down Expand Up @@ -100,7 +99,7 @@ protected Materialized(final Materialized<K, V, S> materialized) {
* @return a new {@link Materialized} instance with the given storeName
*/
public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName) {
Topic.validate(storeName);
Named.validate(storeName);
return new Materialized<>(storeName);
}

Expand Down
83 changes: 83 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/kstream/Named.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.errors.TopologyException;

import java.util.Objects;

public class Named implements NamedOperation<Named> {

private static final int MAX_NAME_LENGTH = 249;

protected String name;

protected Named(final String name) {
this.name = name;
if (name != null) {
validate(name);
}
}

/**
* Create a Named instance with provided name.
*
* @param name the processor name to be used. If {@code null} a default processor name will be generated.
* @return A new {@link Named} instance configured with name
*
* @throws TopologyException if an invalid name is specified; valid characters are ASCII alphanumerics, '.', '_' and '-'.
*/
public static Named as(final String name) {
Objects.requireNonNull(name, "name can't be null");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems not to align with the JavaDoc. I think passing in null should be fine.

return new Named(name);
}

@Override
public Named withName(final String name) {
return new Named(name);
}

static void validate(final String name) {
if (name.isEmpty())
throw new TopologyException("Name is illegal, it can't be empty");
if (name.equals(".") || name.equals(".."))
throw new TopologyException("Name cannot be \".\" or \"..\"");
if (name.length() > MAX_NAME_LENGTH)
throw new TopologyException("Name is illegal, it can't be longer than " + MAX_NAME_LENGTH +
" characters, name: " + name);
if (!containsValidPattern(name))
throw new TopologyException("Name \"" + name + "\" is illegal, it contains a character other than " +
"ASCII alphanumerics, '.', '_' and '-'");
}

/**
* Valid characters for Kafka topics are the ASCII alphanumerics, '.', '_', and '-'
*/
private static boolean containsValidPattern(final String topic) {
for (int i = 0; i < topic.length(); ++i) {
final char c = topic.charAt(i);

// We don't use Character.isLetterOrDigit(c) because it's slower
final boolean validLetterOrDigit = (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z');
final boolean validChar = validLetterOrDigit || c == '.' || c == '_' || c == '-';
if (!validChar) {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@
* @param <V> value type
* @see KStream#print(Printed)
*/
public class Printed<K, V> {
public class Printed<K, V> implements NamedOperation<Printed<K, V>> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the KIP Printed should add a Printed.as(final String name) method

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbejeck If we add a method Printed.as, then currently there is no method withFile or withSysOut to set the output stream.

I think we should not add the Printed.as method and update the KIP ?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds good to me, and if I recall correctly we had similar reasons for not adding as to Suppressed.

\cc @guozhangwang @mjsax @vvcephei @ableegoldman

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

If we update the KIP, we should send a follow up email to the VOTE thread summarizing the changes (we can do this after all PRs are merged in case there is more)

protected final OutputStream outputStream;
protected String label;
protected String processorName;
protected KeyValueMapper<? super K, ? super V, String> mapper = new KeyValueMapper<K, V, String>() {
@Override
public String apply(final K key, final V value) {
Expand All @@ -53,6 +54,7 @@ protected Printed(final Printed<K, V> printed) {
this.outputStream = printed.outputStream;
this.label = printed.label;
this.mapper = printed.mapper;
this.processorName = printed.processorName;
}

/**
Expand Down Expand Up @@ -122,4 +124,16 @@ public Printed<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super
this.mapper = mapper;
return this;
}

/**
* Print the records of a {@link KStream} with provided processor name.
*
* @param processorName the processor name to be used. If {@code null} a default processor name will be generated
* @return this
*/
@Override
public Printed<K, V> withName(final String processorName) {
Comment thread
fhussonnois marked this conversation as resolved.
Outdated
this.processorName = processorName;
return this;
}
}
Loading