From 74d396d131dbc1bbbb4e69f3caa1c9031a6d9981 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Wed, 27 Apr 2016 08:08:51 +0100 Subject: [PATCH 1/3] fix npe in KStreamImpl.to(..) --- .../kafka/streams/kstream/internals/KStreamImplTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index b5c3d47a80b10..aa54298efe779 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -133,4 +133,11 @@ public Integer apply(Integer value1, Integer value2) { 1, // process builder.build("X", null).processors().size()); } + + @Test + public void testToWithNullValueSerdeDoesntNPE() { + final KStreamBuilder builder = new KStreamBuilder(); + final KStream inputStream = builder.stream(stringSerde, stringSerde, "input"); + inputStream.to(stringSerde,null,"output"); + } } From 07ce58942936505896675b912710a7606325ebf8 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Wed, 27 Apr 2016 08:11:15 +0100 Subject: [PATCH 2/3] fix npe in KStreamImpl.to(..) --- .../apache/kafka/streams/kstream/internals/KStreamImplTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index aa54298efe779..3d45d1dcc8a29 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -138,6 +138,6 @@ public Integer apply(Integer value1, Integer value2) { public void testToWithNullValueSerdeDoesntNPE() { final KStreamBuilder builder = new KStreamBuilder(); final KStream inputStream = builder.stream(stringSerde, stringSerde, "input"); - inputStream.to(stringSerde,null,"output"); + inputStream.to(stringSerde, null, "output"); } } From 49d48fb522d9cd0292b4e3ea4e684cf5387989b7 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Wed, 27 Apr 2016 08:12:53 +0100 Subject: [PATCH 3/3] actually commit the fix --- .../org/apache/kafka/streams/kstream/internals/KStreamImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index a84b4aa980d5b..91bcef94eb35d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -298,7 +298,7 @@ public void to(Serde keySerde, Serde valSerde, StreamPartitioner par String name = topology.newName(SINK_NAME); Serializer keySerializer = keySerde == null ? null : keySerde.serializer(); - Serializer valSerializer = keySerde == null ? null : valSerde.serializer(); + Serializer valSerializer = valSerde == null ? null : valSerde.serializer(); if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) { WindowedSerializer windowedSerializer = (WindowedSerializer) keySerializer;