diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java index 6db8ccd0b8289..614a0f567447e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java @@ -57,6 +57,16 @@ public byte[] serialize(final String topic, return null; } final byte[] rawValue = valueSerializer.serialize(topic, data); + + // Since we can't control the result of the internal serializer, we make sure that the result + // is not null as well. + // Serializing non-null values to null can be useful when working with Optional-like values + // where the Optional.empty case is serialized to null. + // See the discussion here: https://github.com/apache/kafka/pull/7679 + if (rawValue == null) { + return null; + } + final byte[] rawTimestamp = timestampSerializer.serialize(topic, timestamp); return ByteBuffer .allocate(rawTimestamp.length + rawValue.length) @@ -70,4 +80,4 @@ public void close() { valueSerializer.close(); timestampSerializer.close(); } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java new file mode 100644 index 0000000000000..5fa918feb3d2c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class ValueAndTimestampSerializerTest { + private static final String topic = "some-topic"; + private static final long timestamp = 23; + + private static final ValueAndTimestampSerde stringSerde = + new ValueAndTimestampSerde<>(Serdes.String()); + + @Test + public void shouldSerializeNonNullDataUsingTheInternalSerializer() { + final String value = "some-string"; + + final ValueAndTimestamp valueAndTimestamp = ValueAndTimestamp.make(value, timestamp); + + final byte[] serialized = + stringSerde.serializer().serialize(topic, valueAndTimestamp); + + assertThat(serialized, is(notNullValue())); + + final ValueAndTimestamp deserialized = + stringSerde.deserializer().deserialize(topic, serialized); + + assertThat(deserialized, is(valueAndTimestamp)); + } + + @Test + public void shouldSerializeNullDataAsNull() { + final byte[] serialized = + stringSerde.serializer().serialize(topic, ValueAndTimestamp.make(null, timestamp)); + + assertThat(serialized, is(nullValue())); + } + + @Test + public void shouldReturnNullWhenTheInternalSerializerReturnsNull() { + // Testing against regressions with respect to https://github.com/apache/kafka/pull/7679 + + final Serializer alwaysNullSerializer = (topic, data) -> null; + + final ValueAndTimestampSerializer serializer = + new ValueAndTimestampSerializer<>(alwaysNullSerializer); + + final byte[] serialized = serializer.serialize(topic, "non-null-data", timestamp); + + assertThat(serialized, is(nullValue())); + } +}