diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 0eda64ff8abca..856536cc78831 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -439,17 +439,8 @@ private void to(final TopicNameExtractor topicExtractor, final ProducedInt builder.addGraphNode(this.streamsGraphNode, sinkNode); } - @Override - public KStream transform(final TransformerSupplier> transformerSupplier, - final String... stateStoreNames) { - Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null"); - return flatTransform(new TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames); - } - - @Override - public KStream flatTransform(final TransformerSupplier>> transformerSupplier, - final String... stateStoreNames) { - Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null"); + private KStream doFlatTransform(final TransformerSupplier>> transformerSupplier, + final String... stateStoreNames) { final String name = builder.newProcessorName(TRANSFORM_NAME); final StatefulProcessorNode transformNode = new StatefulProcessorNode<>( name, @@ -464,6 +455,20 @@ public KStream flatTransform(final TransformerSupplier(name, null, null, sourceNodes, true, transformNode, builder); } + @Override + public KStream transform(final TransformerSupplier> transformerSupplier, + final String... stateStoreNames) { + Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null"); + return doFlatTransform(new TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames); + } + + @Override + public KStream flatTransform(final TransformerSupplier>> transformerSupplier, + final String... stateStoreNames) { + Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null"); + return doFlatTransform(transformerSupplier, stateStoreNames); + } + @Override public KStream transformValues(final ValueTransformerSupplier valueTransformerSupplier, final String... stateStoreNames) {