diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index a84b4aa980d5b..91bcef94eb35d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -298,7 +298,7 @@ public void to(Serde keySerde, Serde valSerde, StreamPartitioner par String name = topology.newName(SINK_NAME); Serializer keySerializer = keySerde == null ? null : keySerde.serializer(); - Serializer valSerializer = keySerde == null ? null : valSerde.serializer(); + Serializer valSerializer = valSerde == null ? null : valSerde.serializer(); if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) { WindowedSerializer windowedSerializer = (WindowedSerializer) keySerializer; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index b5c3d47a80b10..3d45d1dcc8a29 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -133,4 +133,11 @@ public Integer apply(Integer value1, Integer value2) { 1, // process builder.build("X", null).processors().size()); } + + @Test + public void testToWithNullValueSerdeDoesntNPE() { + final KStreamBuilder builder = new KStreamBuilder(); + final KStream inputStream = builder.stream(stringSerde, stringSerde, "input"); + inputStream.to(stringSerde, null, "output"); + } }