diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index 51fe820a52d18..41cdec2269a44 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -131,10 +131,10 @@ public VR apply(final K readOnlyKey, final V value) { static InternalValueTransformerWithKeySupplier toInternalValueTransformerSupplier(final ValueTransformerSupplier valueTransformerSupplier) { Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); - final ValueTransformer valueTransformer = valueTransformerSupplier.get(); return new InternalValueTransformerWithKeySupplier() { @Override public InternalValueTransformerWithKey get() { + final ValueTransformer valueTransformer = valueTransformerSupplier.get(); return new InternalValueTransformerWithKey() { @Override public VR punctuate(final long timestamp) { @@ -162,10 +162,10 @@ public void close() { static InternalValueTransformerWithKeySupplier toInternalValueTransformerSupplier(final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier) { Objects.requireNonNull(valueTransformerWithKeySupplier, "valueTransformerSupplier can't be null"); - final ValueTransformerWithKey valueTransformerWithKey = valueTransformerWithKeySupplier.get(); return new InternalValueTransformerWithKeySupplier() { @Override public InternalValueTransformerWithKey get() { + final ValueTransformerWithKey valueTransformerWithKey = valueTransformerWithKeySupplier.get(); return new InternalValueTransformerWithKey() { @Override public VR punctuate(final long timestamp) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java index e16d8e48ad255..dcfb9ba5d789e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java @@ -20,6 +20,8 @@ import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.ValueTransformerSupplier; +import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -30,6 +32,10 @@ import java.util.Random; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; import static org.junit.Assert.assertTrue; public class AbstractStreamTest { @@ -38,6 +44,33 @@ public class AbstractStreamTest { @Rule public final KStreamTestDriver driver = new KStreamTestDriver(); + @Test + public void testToInternlValueTransformerSupplierSuppliesNewTransformers() { + final ValueTransformerSupplier vts = createMock(ValueTransformerSupplier.class); + expect(vts.get()).andReturn(null).times(3); + final InternalValueTransformerWithKeySupplier ivtwks = + AbstractStream.toInternalValueTransformerSupplier(vts); + replay(vts); + ivtwks.get(); + ivtwks.get(); + ivtwks.get(); + verify(vts); + } + + @Test + public void testToInternalValueTransformerSupplierSuppliesNewTransformers() { + final ValueTransformerWithKeySupplier vtwks = + createMock(ValueTransformerWithKeySupplier.class); + expect(vtwks.get()).andReturn(null).times(3); + final InternalValueTransformerWithKeySupplier ivtwks = + AbstractStream.toInternalValueTransformerSupplier(vtwks); + replay(vtwks); + ivtwks.get(); + ivtwks.get(); + ivtwks.get(); + verify(vtwks); + } + @Test public void testShouldBeExtensible() { final StreamsBuilder builder = new StreamsBuilder();