Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public abstract class AbstractStream<K, V> {

protected final String name;
protected final Serde<K> keySerde;
protected final Serde<V> valSerde;
protected final Serde<V> valueSerde;
protected final Set<String> subTopologySourceNodes;
protected final StreamsGraphNode streamsGraphNode;
protected final InternalStreamsBuilder builder;
Expand All @@ -57,14 +57,14 @@ public AbstractStream(final AbstractStream<K, V> stream) {
this.name = stream.name;
this.builder = stream.builder;
this.keySerde = stream.keySerde;
this.valSerde = stream.valSerde;
this.valueSerde = stream.valueSerde;
this.subTopologySourceNodes = stream.subTopologySourceNodes;
this.streamsGraphNode = stream.streamsGraphNode;
}

AbstractStream(final String name,
final Serde<K> keySerde,
final Serde<V> valSerde,
final Serde<V> valueSerde,
final Set<String> subTopologySourceNodes,
final StreamsGraphNode streamsGraphNode,
final InternalStreamsBuilder builder) {
Expand All @@ -75,7 +75,7 @@ public AbstractStream(final AbstractStream<K, V> stream) {
this.name = name;
this.builder = builder;
this.keySerde = keySerde;
this.valSerde = valSerde;
this.valueSerde = valueSerde;
this.subTopologySourceNodes = subTopologySourceNodes;
this.streamsGraphNode = streamsGraphNode;
}
Expand Down Expand Up @@ -143,6 +143,6 @@ public Serde<K> keySerde() {
}

public Serde<V> valueSerde() {
return valSerde;
return valueSerde;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KR> keySerde,
final Serde<VOut> valSerde,
final Serde<VOut> valueSerde,
final String queryableName,
final Windows<W> windows,
final SessionWindows sessionWindows,
Expand All @@ -68,7 +68,7 @@ <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<
final String repartionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ?
repartitionReqs.userProvidedRepartitionTopicName : storeBuilder.name();

createRepartitionSource(repartionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valSerde);
createRepartitionSource(repartionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde);

if (!parentNodes.containsKey(repartitionReqs)) {
final StreamsGraphNode repartitionNode = repartitionNodeBuilder.build();
Expand Down Expand Up @@ -118,7 +118,7 @@ <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<
return new KTableImpl<KR, VIn, VOut>(
mergeProcessorName,
keySerde,
valSerde,
valueSerde,
Collections.singleton(mergeNode.nodeName()),
queryableName,
passThrough,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ public ConsumedInternal(final Consumed<K, V> consumed) {


public ConsumedInternal(final Serde<K> keySerde,
final Serde<V> valSerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset offsetReset) {
this(Consumed.with(keySerde, valSerde, timestampExtractor, offsetReset));
this(Consumed.with(keySerde, valueSerde, timestampExtractor, offsetReset));
}

public ConsumedInternal() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ <KR, VR> KTable<KR, VR> build(final NamedInternal functionName,
final KStreamAggProcessorSupplier<K, KR, V, VR> aggregateSupplier,
final String queryableStoreName,
final Serde<KR> keySerde,
final Serde<VR> valSerde) {
final Serde<VR> valueSerde) {
assert queryableStoreName == null || queryableStoreName.equals(storeBuilder.name());

final String aggFunctionName = functionName.name();
Expand Down Expand Up @@ -107,7 +107,7 @@ <KR, VR> KTable<KR, VR> build(final NamedInternal functionName,

return new KTableImpl<>(aggFunctionName,
keySerde,
valSerde,
valueSerde,
sourceName.equals(this.name) ? subTopologySourceNodes : Collections.singleton(sourceName),
queryableStoreName,
aggregateSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS

@Override
public KTable<K, V> reduce(final Reducer<V> reducer) {
return reduce(reducer, Materialized.with(keySerde, valSerde));
return reduce(reducer, Materialized.with(keySerde, valueSerde));
}

@Override
Expand All @@ -91,7 +91,7 @@ public KTable<K, V> reduce(final Reducer<V> reducer,
materializedInternal.withKeySerde(keySerde);
}
if (materializedInternal.valueSerde() == null) {
materializedInternal.withValueSerde(valSerde);
materializedInternal.withValueSerde(valueSerde);
}

final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
Expand Down Expand Up @@ -196,7 +196,7 @@ public <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W>
subTopologySourceNodes,
name,
keySerde,
valSerde,
valueSerde,
aggregateBuilder,
streamsGraphNode
);
Expand All @@ -211,7 +211,7 @@ public SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows) {
subTopologySourceNodes,
name,
keySerde,
valSerde,
valueSerde,
aggregateBuilder,
streamsGraphNode
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private GroupedTableOperationRepartitionNode<K, V> createRepartitionNode(final S
.withSinkName(sinkName)
.withSourceName(sourceName)
.withKeySerde(keySerde)
.withValueSerde(valSerde)
.withValueSerde(valueSerde)
.withNodeName(sourceName).build();
}

Expand All @@ -143,7 +143,7 @@ public KTable<K, V> reduce(final Reducer<V> adder,
materializedInternal.withKeySerde(keySerde);
}
if (materializedInternal.valueSerde() == null) {
materializedInternal.withValueSerde(valSerde);
materializedInternal.withValueSerde(valueSerde);
}
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(
materializedInternal.storeName(),
Expand All @@ -155,7 +155,7 @@ public KTable<K, V> reduce(final Reducer<V> adder,
@Override
public KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor) {
return reduce(adder, subtractor, Materialized.with(keySerde, valSerde));
return reduce(adder, subtractor, Materialized.with(keySerde, valueSerde));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate,
return new KStreamImpl<>(
name,
keySerde,
valSerde,
valueSerde,
subTopologySourceNodes,
repartitionRequired,
filterProcessorNode,
Expand Down Expand Up @@ -193,7 +193,7 @@ public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
return new KStreamImpl<>(
name,
keySerde,
valSerde,
valueSerde,
subTopologySourceNodes,
repartitionRequired,
filterNotProcessorNode,
Expand All @@ -220,7 +220,7 @@ public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V,
return new KStreamImpl<>(
selectKeyProcessorNode.nodeName(),
null,
valSerde,
valueSerde,
subTopologySourceNodes,
true,
selectKeyProcessorNode,
Expand Down Expand Up @@ -432,7 +432,7 @@ public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action,
return new KStreamImpl<>(
name,
keySerde,
valSerde,
valueSerde,
subTopologySourceNodes,
repartitionRequired,
peekNode,
Expand Down Expand Up @@ -485,7 +485,7 @@ private KStream<K, V>[] doBranch(final NamedInternal named,
new ProcessorGraphNode<>(childNames[i], innerProcessorParameters);

builder.addGraphNode(branchNode, branchChildNode);
branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, valSerde, subTopologySourceNodes, repartitionRequired, branchChildNode, builder);
branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, valueSerde, subTopologySourceNodes, repartitionRequired, branchChildNode, builder);
}

return branchChildren;
Expand Down Expand Up @@ -537,7 +537,7 @@ private KStream<K, V> merge(final InternalStreamsBuilder builder,
@Deprecated
@Override
public KStream<K, V> through(final String topic) {
return through(topic, Produced.with(keySerde, valSerde, null));
return through(topic, Produced.with(keySerde, valueSerde, null));
}

@Deprecated
Expand All @@ -552,7 +552,7 @@ public KStream<K, V> through(final String topic,
producedInternal.withKeySerde(keySerde);
}
if (producedInternal.valueSerde() == null) {
producedInternal.withValueSerde(valSerde);
producedInternal.withValueSerde(valueSerde);
}
to(topic, producedInternal);

Expand Down Expand Up @@ -585,7 +585,7 @@ private KStream<K, V> doRepartition(final Repartitioned<K, V> repartitioned) {
final String name = repartitionedInternal.name() != null ? repartitionedInternal.name() : builder
.newProcessorName(REPARTITION_NAME);

final Serde<V> valueSerde = repartitionedInternal.valueSerde() == null ? valSerde : repartitionedInternal.valueSerde();
final Serde<V> valueSerde = repartitionedInternal.valueSerde() == null ? this.valueSerde : repartitionedInternal.valueSerde();
final Serde<K> keySerde = repartitionedInternal.keySerde() == null ? this.keySerde : repartitionedInternal.keySerde();

final UnoptimizableRepartitionNodeBuilder<K, V> unoptimizableRepartitionNodeBuilder = UnoptimizableRepartitionNode
Expand Down Expand Up @@ -622,7 +622,7 @@ private KStream<K, V> doRepartition(final Repartitioned<K, V> repartitioned) {

@Override
public void to(final String topic) {
to(topic, Produced.with(keySerde, valSerde, null));
to(topic, Produced.with(keySerde, valueSerde, null));
}

@Override
Expand All @@ -636,14 +636,14 @@ public void to(final String topic,
producedInternal.withKeySerde(keySerde);
}
if (producedInternal.valueSerde() == null) {
producedInternal.withValueSerde(valSerde);
producedInternal.withValueSerde(valueSerde);
}
to(new StaticTopicNameExtractor<>(topic), producedInternal);
}

@Override
public void to(final TopicNameExtractor<K, V> topicExtractor) {
to(topicExtractor, Produced.with(keySerde, valSerde, null));
to(topicExtractor, Produced.with(keySerde, valueSerde, null));
}

@Override
Expand All @@ -657,7 +657,7 @@ public void to(final TopicNameExtractor<K, V> topicExtractor,
producedInternal.withKeySerde(keySerde);
}
if (producedInternal.valueSerde() == null) {
producedInternal.withValueSerde(valSerde);
producedInternal.withValueSerde(valueSerde);
}
to(topicExtractor, producedInternal);
}
Expand All @@ -676,12 +676,12 @@ private void to(final TopicNameExtractor<K, V> topicExtractor,

@Override
public KTable<K, V> toTable() {
return toTable(NamedInternal.empty(), Materialized.with(keySerde, valSerde));
return toTable(NamedInternal.empty(), Materialized.with(keySerde, valueSerde));
}

@Override
public KTable<K, V> toTable(final Named named) {
return toTable(named, Materialized.with(keySerde, valSerde));
return toTable(named, Materialized.with(keySerde, valueSerde));
}

@Override
Expand All @@ -705,7 +705,7 @@ public KTable<K, V> toTable(final Named named,
? keySerde
: materializedInternal.keySerde();
final Serde<V> valueSerdeOverride = materializedInternal.valueSerde() == null
? valSerde
? valueSerde
: materializedInternal.valueSerde();

final Set<String> subTopologySourceNodes;
Expand Down Expand Up @@ -757,7 +757,7 @@ public KTable<K, V> toTable(final Named named,

@Override
public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector) {
return groupBy(keySelector, Grouped.with(null, valSerde));
return groupBy(keySelector, Grouped.with(null, valueSerde));
}

@Override
Expand Down Expand Up @@ -795,7 +795,7 @@ public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? supe

@Override
public KGroupedStream<K, V> groupByKey() {
return groupByKey(Grouped.with(keySerde, valSerde));
return groupByKey(Grouped.with(keySerde, valueSerde));
}

@Override
Expand Down Expand Up @@ -982,7 +982,7 @@ private KStreamImpl<K, V> repartitionForJoin(final String repartitionName,
final Serde<K> keySerdeOverride,
final Serde<V> valueSerdeOverride) {
final Serde<K> repartitionKeySerde = keySerdeOverride != null ? keySerdeOverride : keySerde;
final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valSerde;
final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valueSerde;
final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder =
OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
// we still need to create the repartitioned source each time
Expand Down Expand Up @@ -1013,7 +1013,7 @@ private KStreamImpl<K, V> repartitionForJoin(final String repartitionName,

static <K1, V1, RN extends BaseRepartitionNode<K1, V1>> String createRepartitionedSource(final InternalStreamsBuilder builder,
final Serde<K1> keySerde,
final Serde<V1> valSerde,
final Serde<V1> valueSerde,
final String repartitionTopicNamePrefix,
final StreamPartitioner<K1, V1> streamPartitioner,
final BaseRepartitionNodeBuilder<K1, V1, RN> baseRepartitionNodeBuilder) {
Expand Down Expand Up @@ -1048,7 +1048,7 @@ static <K1, V1, RN extends BaseRepartitionNode<K1, V1>> String createRepartition
);

baseRepartitionNodeBuilder.withKeySerde(keySerde)
.withValueSerde(valSerde)
.withValueSerde(valueSerde)
.withSourceName(sourceName)
.withRepartitionTopic(repartitionTopicName)
.withSinkName(sinkName)
Expand Down
Loading