diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SerdeGetter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SerdeGetter.java index 72bfc99804b9f..74665bc38774e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SerdeGetter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SerdeGetter.java @@ -18,37 +18,34 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.ProcessorContext; + +import java.util.function.Supplier; /** * Allows serde access across different context types. */ public class SerdeGetter { - private final org.apache.kafka.streams.processor.ProcessorContext oldProcessorContext; - private final org.apache.kafka.streams.processor.api.ProcessorContext newProcessorContext; - private final StateStoreContext stateStorecontext; - public SerdeGetter(final org.apache.kafka.streams.processor.ProcessorContext context) { - oldProcessorContext = context; - newProcessorContext = null; - stateStorecontext = null; - } - public SerdeGetter(final org.apache.kafka.streams.processor.api.ProcessorContext context) { - oldProcessorContext = null; - newProcessorContext = context; - stateStorecontext = null; + private final Supplier> keySerdeSupplier; + private final Supplier> valueSerdeSupplier; + + public SerdeGetter(final ProcessorContext context) { + keySerdeSupplier = context::keySerde; + valueSerdeSupplier = context::valueSerde; } + public SerdeGetter(final StateStoreContext context) { - oldProcessorContext = null; - newProcessorContext = null; - stateStorecontext = context; + keySerdeSupplier = context::keySerde; + valueSerdeSupplier = context::valueSerde; } - public Serde keySerde() { - return oldProcessorContext != null ? oldProcessorContext.keySerde() : - newProcessorContext != null ? newProcessorContext.keySerde() : stateStorecontext.keySerde(); + + public Serde keySerde() { + return keySerdeSupplier.get(); } - public Serde valueSerde() { - return oldProcessorContext != null ? oldProcessorContext.valueSerde() : - newProcessorContext != null ? newProcessorContext.valueSerde() : stateStorecontext.valueSerde(); + + public Serde valueSerde() { + return valueSerdeSupplier.get(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java index 306d6bf9cfb8d..001f866e9632e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java @@ -58,7 +58,7 @@ public class RocksDBTimeOrderedKeyValueBuffer implements TimeOrderedKeyVal private final boolean loggingEnabled; private int partition; private String changelogTopic; - private InternalProcessorContext context; + private InternalProcessorContext context; private boolean minValid; public static class Builder implements StoreBuilder> { @@ -156,7 +156,7 @@ public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesSto @Override public void setSerdesIfNull(final SerdeGetter getter) { keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; - valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde; + valueSerde = valueSerde == null ? (Serde) getter.valueSerde() : valueSerde; } private long observedStreamTime() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java index cf44ca19bb1d7..813024fe7c13b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java @@ -20,13 +20,11 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.state.StateSerdes; - public class StoreSerdeInitializer { static StateSerdes prepareStoreSerde(final StateStoreContext context, final String storeName, @@ -41,19 +39,6 @@ static StateSerdes prepareStoreSerde(final StateStoreContext contex ); } - static StateSerdes prepareStoreSerde(final ProcessorContext context, - final String storeName, - final String changelogTopic, - final Serde keySerde, - final Serde valueSerde, - final PrepareFunc prepareValueSerdeFunc) { - return new StateSerdes<>( - changelogTopic, - prepareSerde(WrappingNullableUtils::prepareKeySerde, storeName, keySerde, new SerdeGetter(context), true, context.taskId()), - prepareSerde(prepareValueSerdeFunc, storeName, valueSerde, new SerdeGetter(context), false, context.taskId()) - ); - } - private static Serde prepareSerde(final PrepareFunc prepare, final String storeName, final Serde serde, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java index 8fbde2f78e172..c1ff46f873e80 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.StateStoreContext; @@ -59,10 +60,11 @@ public class RocksDBTimeOrderedKeyValueBufferTest { public Sensor sensor; public long offset; + @SuppressWarnings({"rawtypes", "unchecked"}) @BeforeEach public void setUp() { - when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde()); - when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde()); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); final Metrics metrics = new Metrics(); offset = 0; streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java index 2891825ba5207..eb85cde807cbf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java @@ -21,8 +21,6 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.MockInternalNewProcessorContext; @@ -62,7 +60,7 @@ public void shouldPrepareStoreSerdeForProcessorContext() { utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())).thenReturn(valueSerde); final StateSerdes result = StoreSerdeInitializer.prepareStoreSerde( - (ProcessorContext) context, "myStore", "topic", keySerde, valueSerde, WrappingNullableUtils::prepareValueSerde); + context, "myStore", "topic", keySerde, valueSerde, WrappingNullableUtils::prepareValueSerde); assertThat(result.keySerde(), equalTo(keySerde)); assertThat(result.valueSerde(), equalTo(valueSerde)); @@ -77,7 +75,7 @@ public void shouldThrowStreamsExceptionOnUndefinedKeySerdeForProcessorContext() .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); final Throwable exception = assertThrows(StreamsException.class, - () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) context, "myStore", "topic", + () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); @@ -92,7 +90,7 @@ public void shouldThrowStreamsExceptionOnUndefinedValueSerdeForProcessorContext( .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); final Throwable exception = assertThrows(StreamsException.class, - () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) context, "myStore", "topic", + () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore")); @@ -107,7 +105,7 @@ public void shouldThrowStreamsExceptionOnUndefinedKeySerdeForStateStoreContext() .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); final Throwable exception = assertThrows(StreamsException.class, - () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) context, "myStore", "topic", + () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); @@ -122,7 +120,7 @@ public void shouldThrowStreamsExceptionOnUndefinedValueSerdeForStateStoreContext .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); final Throwable exception = assertThrows(StreamsException.class, - () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) context, "myStore", "topic", + () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore")); @@ -136,7 +134,7 @@ public void shouldThrowStreamsExceptionWithExplicitErrorMessageForProcessorConte utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())).thenThrow(new StreamsException("")); final Throwable exception = assertThrows(StreamsException.class, - () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) context, "myStore", "topic", + () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); @@ -149,7 +147,7 @@ public void shouldThrowStreamsExceptionWithExplicitErrorMessageForStateStoreCont utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())).thenThrow(new StreamsException("")); final Throwable exception = assertThrows(StreamsException.class, - () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) context, "myStore", "topic", + () -> StoreSerdeInitializer.prepareStoreSerde(context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore"));