Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ class GroupedStreamAggregateBuilder<K, V> {
this.userProvidedRepartitionTopicName = groupedInternal.name();
}

<KR, VR> KTable<KR, VR> build(final String functionName,
<KR, VR> KTable<KR, VR> build(final NamedInternal functionName,
final StoreBuilder<? extends StateStore> storeBuilder,
final KStreamAggProcessorSupplier<K, KR, V, VR> aggregateSupplier,
final String queryableStoreName,
final Serde<KR> keySerde,
final Serde<VR> valSerde) {
assert queryableStoreName == null || queryableStoreName.equals(storeBuilder.name());

final String aggFunctionName = builder.newProcessorName(functionName);
final String aggFunctionName = functionName.name();

String sourceName = this.name;
StreamsGraphNode parentNode = streamsGraphNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@

public class InternalStreamsBuilder implements InternalNameProvider {

private static final String TABLE_SOURCE_SUFFIX = "-source";

final InternalTopologyBuilder internalTopologyBuilder;
private final AtomicInteger index = new AtomicInteger(0);

Expand Down Expand Up @@ -115,10 +117,15 @@ public <K, V> KStream<K, V> stream(final Pattern topicPattern,
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
final String sourceName = new NamedInternal(consumed.name())
.orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
final String tableSourceName = new NamedInternal(consumed.name())
.suffixWithOrElseGet("-table-source", this, KTableImpl.SOURCE_NAME);

final NamedInternal named = new NamedInternal(consumed.name());

final String sourceName = named
.suffixWithOrElseGet(TABLE_SOURCE_SUFFIX, this, KStreamImpl.SOURCE_NAME);

final String tableSourceName = named
.orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME);

final KTableSource<K, V> tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName());
final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(tableSource, tableSourceName);

Expand Down Expand Up @@ -150,8 +157,15 @@ public <K, V> GlobalKTable<K, V> globalTable(final String topic,
Objects.requireNonNull(materialized, "materialized can't be null");
// explicitly disable logging for global stores
materialized.withLoggingDisabled();
final String sourceName = newProcessorName(KTableImpl.SOURCE_NAME);
final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);

final NamedInternal named = new NamedInternal(consumed.name());

final String sourceName = named
.suffixWithOrElseGet(TABLE_SOURCE_SUFFIX, this, KStreamImpl.SOURCE_NAME);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this result in a different name for the source than the prior code? (Not sure if it matters...)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @fhussonnois or @bbejeck , what do you think about this?


final String processorName = named
.orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME);

// enforce store name as queryable name to always materialize global table stores
final String storeName = materialized.storeName();
final KTableSource<K, V> tableSource = new KTableSource<>(storeName, storeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
Expand Down Expand Up @@ -67,8 +68,16 @@ public KTable<K, V> reduce(final Reducer<V> reducer) {
@Override
public KTable<K, V> reduce(final Reducer<V> reducer,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return reduce(reducer, NamedInternal.empty(), materialized);
}

@Override
public KTable<K, V> reduce(final Reducer<V> reducer,
final Named named,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you want to add a null check for this as well (like you did for aggregate)?

final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(reducer, "reducer can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
Objects.requireNonNull(named, "name can't be null");

final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
Expand All @@ -80,9 +89,10 @@ public KTable<K, V> reduce(final Reducer<V> reducer,
materializedInternal.withValueSerde(valSerde);
}

final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
return doAggregate(
new KStreamReduce<>(materializedInternal.storeName(), reducer),
REDUCE_NAME,
name,
materializedInternal
);
}
Expand All @@ -91,9 +101,18 @@ public KTable<K, V> reduce(final Reducer<V> reducer,
public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
}

@Override
public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
Objects.requireNonNull(named, "named can't be null");

final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
Expand All @@ -102,9 +121,10 @@ public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
materializedInternal.withKeySerde(keySerde);
}

final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
return doAggregate(
new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
AGGREGATE_NAME,
name,
materializedInternal
);
}
Expand All @@ -117,11 +137,22 @@ public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,

@Override
public KTable<K, Long> count() {
return doCount(Materialized.with(keySerde, Serdes.Long()));
return doCount(NamedInternal.empty(), Materialized.with(keySerde, Serdes.Long()));
}

@Override
public KTable<K, Long> count(final Named named) {
Objects.requireNonNull(named, "named can't be null");
return doCount(named, Materialized.with(keySerde, Serdes.Long()));
}

@Override
public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return count(NamedInternal.empty(), materialized);
}

@Override
public KTable<K, Long> count(final Named named, final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(materialized, "materialized can't be null");

// TODO: remove this when we do a topology-incompatible release
Expand All @@ -130,10 +161,10 @@ public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, by
builder.newStoreName(AGGREGATE_NAME);
}

return doCount(materialized);
return doCount(named, materialized);
}

private KTable<K, Long> doCount(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
private KTable<K, Long> doCount(final Named named, final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);

Expand All @@ -144,9 +175,10 @@ private KTable<K, Long> doCount(final Materialized<K, Long, KeyValueStore<Bytes,
materializedInternal.withValueSerde(Serdes.Long());
}

final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
return doAggregate(
new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
AGGREGATE_NAME,
name,
materializedInternal);
}

Expand Down Expand Up @@ -184,7 +216,7 @@ private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, K, V,
final String functionName,
final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
return aggregateBuilder.build(
functionName,
new NamedInternal(functionName),
new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize(),
aggregateSupplier,
materializedInternal.queryableStoreName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.internals.graph.GroupedTableOperationRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
Expand Down Expand Up @@ -68,12 +69,13 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
}

private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
final NamedInternal named,
final String functionName,
final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {

final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME);
final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
final String funcName = builder.newProcessorName(functionName);
final String sinkName = named.suffixWithOrElseGet("-sink", builder, KStreamImpl.SINK_NAME);
final String sourceName = named.suffixWithOrElseGet("-source", builder, KStreamImpl.SOURCE_NAME);
final String funcName = named.orElseGenerateWithPrefix(builder, functionName);
final String repartitionTopic = (userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : materialized.storeName())
+ KStreamImpl.REPARTITION_TOPIC_SUFFIX;

Expand Down Expand Up @@ -122,8 +124,17 @@ private GroupedTableOperationRepartitionNode<K, V> createRepartitionNode(final S
public KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return reduce(adder, subtractor, NamedInternal.empty(), materialized);
}

@Override
public KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor,
final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(adder, "adder can't be null");
Objects.requireNonNull(subtractor, "subtractor can't be null");
Objects.requireNonNull(named, "named can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
Expand All @@ -134,10 +145,11 @@ public KTable<K, V> reduce(final Reducer<V> adder,
if (materializedInternal.valueSerde() == null) {
materializedInternal.withValueSerde(valSerde);
}
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(materializedInternal.storeName(),
adder,
subtractor);
return doAggregate(aggregateSupplier, REDUCE_NAME, materializedInternal);
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(
materializedInternal.storeName(),
adder,
subtractor);
return doAggregate(aggregateSupplier, new NamedInternal(named), REDUCE_NAME, materializedInternal);
}

@Override
Expand All @@ -146,8 +158,14 @@ public KTable<K, V> reduce(final Reducer<V> adder,
return reduce(adder, subtractor, Materialized.with(keySerde, valSerde));
}


@Override
public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return count(NamedInternal.empty(), materialized);
}

@Override
public KTable<K, Long> count(final Named named, final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);

Expand All @@ -158,27 +176,43 @@ public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, by
materializedInternal.withValueSerde(Serdes.Long());
}

final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(materializedInternal.storeName(),
countInitializer,
countAdder,
countSubtractor);
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
materializedInternal.storeName(),
countInitializer,
countAdder,
countSubtractor);

return doAggregate(aggregateSupplier, AGGREGATE_NAME, materializedInternal);
return doAggregate(aggregateSupplier, new NamedInternal(named), AGGREGATE_NAME, materializedInternal);
}

@Override
public KTable<K, Long> count() {
return count(Materialized.with(keySerde, Serdes.Long()));
}

@Override
public KTable<K, Long> count(final Named named) {
return count(named, Materialized.with(keySerde, Serdes.Long()));
}

@Override
public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return aggregate(initializer, adder, subtractor, NamedInternal.empty(), materialized);
}

@Override
public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(adder, "adder can't be null");
Objects.requireNonNull(subtractor, "subtractor can't be null");
Objects.requireNonNull(named, "named can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");

final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
Expand All @@ -187,11 +221,20 @@ public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
if (materializedInternal.keySerde() == null) {
materializedInternal.withKeySerde(keySerde);
}
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(materializedInternal.storeName(),
initializer,
adder,
subtractor);
return doAggregate(aggregateSupplier, AGGREGATE_NAME, materializedInternal);
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
materializedInternal.storeName(),
initializer,
adder,
subtractor);
return doAggregate(aggregateSupplier, new NamedInternal(named), AGGREGATE_NAME, materializedInternal);
}

@Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> adder,
final Aggregator<? super K, ? super V, T> subtractor,
final Named named) {
return aggregate(initializer, adder, subtractor, named, Materialized.with(keySerde, null));
}

@Override
Expand Down
Loading