diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java index 06216fc9d8c71..843606b4d79f9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java @@ -53,10 +53,7 @@ public void init(final ProcessorContext context) { @Override public void process(final K key, final V value) { - final R transformedValue = valueTransformer.transform(key, value); - if (transformedValue != null) { - context.forward(key, transformedValue); - } + context.forward(key, valueTransformer.transform(key, value)); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java index 4dc68e0d63115..196f71c9e0a95 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java @@ -34,7 +34,6 @@ import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.SingletonNoOpValueTransformer; import org.apache.kafka.test.StreamsTestUtils; -import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.Mock; import org.easymock.MockType; @@ -43,7 +42,6 @@ import java.util.Properties; -import static org.easymock.EasyMock.mock; import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertArrayEquals; @@ -140,24 +138,6 @@ public void close() { } assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray()); } - @Test - public void shouldEmitNoRecordIfTransformReturnsNull() { - final ProcessorContext context = mock(ProcessorContext.class); - final ValueTransformerWithKey valueTransformer = mock(ValueTransformerWithKey.class); - final KStreamTransformValues.KStreamTransformValuesProcessor processor = - new KStreamTransformValues.KStreamTransformValuesProcessor<>(valueTransformer); - processor.init(context); - - final Integer inputKey = 1; - final Integer inputValue = 10; - EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andStubReturn(null); - EasyMock.replay(context); - - processor.process(inputKey, inputValue); - - EasyMock.verify(context); - } - @SuppressWarnings("unchecked") @Test public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {