Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -439,17 +439,8 @@ private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInt
builder.addGraphNode(this.streamsGraphNode, sinkNode);
}

@Override
public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
return flatTransform(new TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames);
}

@Override
public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
private <K1, V1> KStream<K1, V1> doFlatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
final String... stateStoreNames) {
final String name = builder.newProcessorName(TRANSFORM_NAME);
final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
name,
Expand All @@ -464,6 +455,20 @@ public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super
return new KStreamImpl<>(name, null, null, sourceNodes, true, transformNode, builder);
}

@Override
public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
return doFlatTransform(new TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames);
}

@Override
public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
return doFlatTransform(transformerSupplier, stateStoreNames);
}

@Override
public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
final String... stateStoreNames) {
Expand Down