From 3285ed17a465ac452a7a0cbcca5d528b91075ae7 Mon Sep 17 00:00:00 2001 From: Ayoub Omari Date: Tue, 23 Apr 2024 20:24:21 +0200 Subject: [PATCH 1/5] KAFKA-16573: Specify node and store where serdes are needed --- .../apache/kafka/streams/StreamsConfig.java | 4 +- .../internals/WrappingNullableUtils.java | 5 ++- .../streams/processor/internals/SinkNode.java | 12 +++++- .../processor/internals/SourceNode.java | 12 +++++- .../state/internals/MeteredKeyValueStore.java | 34 ++++++++++++----- .../state/internals/MeteredSessionStore.java | 37 +++++++++++++------ .../MeteredVersionedKeyValueStore.java | 35 +++++++++++++----- .../state/internals/MeteredWindowStore.java | 30 +++++++++++---- 8 files changed, 123 insertions(+), 46 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 44c064f40fbd3..1d3aaceb50d8d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1856,12 +1856,12 @@ public KafkaClientSupplier getKafkaClientSupplier() { * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #DEFAULT_KEY_SERDE_CLASS_CONFIG key Serde * class}. * - * @return an configured instance of key Serde class + * @return a configured instance of key Serde class */ @SuppressWarnings("WeakerAccess") public Serde defaultKeySerde() { final Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG); - if (keySerdeConfigSetting == null) { + if (keySerdeConfigSetting == null) { throw new ConfigException("Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"); } try { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java index b904608c3d1a1..9b01158984944 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java @@ -42,6 +42,7 @@ private static Deserializer prepareDeserializer(final Deserializer spe } return deserializerToUse; } + @SuppressWarnings("unchecked") private static Serializer prepareSerializer(final Serializer specificSerializer, final ProcessorContext context, final boolean isKey, final String name) { final Serializer serializerToUse; @@ -60,7 +61,7 @@ private static Serializer prepareSerializer(final Serializer specificS private static Serde prepareSerde(final Serde specificSerde, final SerdeGetter getter, final boolean isKey) { final Serde serdeToUse; if (specificSerde == null) { - serdeToUse = (Serde) (isKey ? getter.keySerde() : getter.valueSerde()); + serdeToUse = (Serde) (isKey ? getter.keySerde() : getter.valueSerde()); } else { serdeToUse = specificSerde; } @@ -93,12 +94,14 @@ public static Serde prepareKeySerde(final Serde specificSerde, final S public static Serde prepareValueSerde(final Serde specificSerde, final SerdeGetter getter) { return prepareSerde(specificSerde, getter, false); } + @SuppressWarnings({"rawtypes", "unchecked"}) public static void initNullableSerializer(final Serializer specificSerializer, final SerdeGetter getter) { if (specificSerializer instanceof WrappingNullableSerializer) { ((WrappingNullableSerializer) specificSerializer).setIfUnset(getter); } } + @SuppressWarnings({"rawtypes", "unchecked"}) public static void initNullableDeserializer(final Deserializer specificDeserializer, final SerdeGetter getter) { if (specificDeserializer instanceof WrappingNullableDeserializer) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 6f508eff2792a..b86b3fdf6b415 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.Record; @@ -58,8 +60,14 @@ public void addChild(final ProcessorNode child) { public void init(final InternalProcessorContext context) { super.init(context); this.context = context; - keySerializer = prepareKeySerializer(keySerializer, context, this.name()); - valSerializer = prepareValueSerializer(valSerializer, context, this.name()); + try { + keySerializer = prepareKeySerializer(keySerializer, context, this.name()); + valSerializer = prepareValueSerializer(valSerializer, context, this.name()); + } catch (final ConfigException e) { + throw new ConfigException(String.format("Failed to initialize serdes for sink node %s", name()), e); + } catch (final StreamsException e) { + throw new StreamsException(String.format("Failed to initialize serdes for sink node %s", name()), e); + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 5d0c04b96a89f..1de37215825f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; @@ -74,8 +76,14 @@ public void init(final InternalProcessorContext context) { super.init(context); this.context = context; - keyDeserializer = prepareKeyDeserializer(keyDeserializer, context, name()); - valDeserializer = prepareValueDeserializer(valDeserializer, context, name()); + try { + keyDeserializer = prepareKeyDeserializer(keyDeserializer, context, name()); + valDeserializer = prepareValueDeserializer(valDeserializer, context, name()); + } catch (final ConfigException e) { + throw new ConfigException(String.format("Failed to initialize serdes for source node %s", name()), e); + } catch (final StreamsException e) { + throw new StreamsException(String.format("Failed to initialize serdes for source node %s", name()), e); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 57fd8cb15fe2a..303bdb4ac2c40 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; @@ -23,6 +24,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.ProcessorContext; @@ -173,21 +175,33 @@ protected Serde prepareValueSerdeForStore(final Serde valueSerde, final Se protected void initStoreSerde(final ProcessorContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerdeForStore(valueSerde, new SerdeGetter(context)) - ); + try { + serdes = new StateSerdes<>( + changelogTopic, + prepareKeySerde(keySerde, new SerdeGetter(context)), + prepareValueSerdeForStore(valueSerde, new SerdeGetter(context)) + ); + } catch (final ConfigException e) { + throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); + } catch (final StreamsException e) { + throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); + } } protected void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerdeForStore(valueSerde, new SerdeGetter(context)) - ); + try { + serdes = new StateSerdes<>( + changelogTopic, + prepareKeySerde(keySerde, new SerdeGetter(context)), + prepareValueSerdeForStore(valueSerde, new SerdeGetter(context)) + ); + } catch (final ConfigException e) { + throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); + } catch (final StreamsException e) { + throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); + } } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index b697a2602f6de..a4608df53a213 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -16,14 +16,15 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.Change; -import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -50,6 +51,8 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde; +import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerde; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; public class MeteredSessionStore @@ -137,21 +140,33 @@ private void registerMetrics() { private void initStoreSerde(final ProcessorContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = new StateSerdes<>( - changelogTopic, - WrappingNullableUtils.prepareKeySerde(keySerde, new SerdeGetter(context)), - WrappingNullableUtils.prepareValueSerde(valueSerde, new SerdeGetter(context)) - ); + try { + serdes = new StateSerdes<>( + changelogTopic, + prepareKeySerde(keySerde, new SerdeGetter(context)), + prepareValueSerde(valueSerde, new SerdeGetter(context)) + ); + } catch (final ConfigException e) { + throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); + } catch (final StreamsException e) { + throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); + } } private void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = new StateSerdes<>( - changelogTopic, - WrappingNullableUtils.prepareKeySerde(keySerde, new SerdeGetter(context)), - WrappingNullableUtils.prepareValueSerde(valueSerde, new SerdeGetter(context)) - ); + try { + serdes = new StateSerdes<>( + changelogTopic, + prepareKeySerde(keySerde, new SerdeGetter(context)), + prepareValueSerde(valueSerde, new SerdeGetter(context)) + ); + } catch (final ConfigException e) { + throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); + } catch (final StreamsException e) { + throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); + } } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java index eaf1e20c5fe78..708e21d82564c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java @@ -26,10 +26,13 @@ import java.time.Instant; import java.util.Map; import java.util.Objects; + +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -296,11 +299,17 @@ protected void initStoreSerde(final ProcessorContext context) { // additionally init raw value serde final String storeName = super.name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - plainValueSerdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(plainValueSerde, new SerdeGetter(context)) - ); + try { + plainValueSerdes = new StateSerdes<>( + changelogTopic, + prepareKeySerde(keySerde, new SerdeGetter(context)), + prepareValueSerde(plainValueSerde, new SerdeGetter(context)) + ); + } catch (final ConfigException e) { + throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); + } catch (final StreamsException e) { + throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); + } } @Override @@ -310,11 +319,17 @@ protected void initStoreSerde(final StateStoreContext context) { // additionally init raw value serde final String storeName = super.name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - plainValueSerdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(plainValueSerde, new SerdeGetter(context)) - ); + try { + plainValueSerdes = new StateSerdes<>( + changelogTopic, + prepareKeySerde(keySerde, new SerdeGetter(context)), + prepareValueSerde(plainValueSerde, new SerdeGetter(context)) + ); + } catch (final ConfigException e) { + throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); + } catch (final StreamsException e) { + throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 12284b48d5a29..560282de9b836 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -16,11 +16,13 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; @@ -156,19 +158,31 @@ private void registerMetrics() { private void initStoreSerde(final ProcessorContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(valueSerde, new SerdeGetter(context))); + try { + serdes = new StateSerdes<>( + changelogTopic, + prepareKeySerde(keySerde, new SerdeGetter(context)), + prepareValueSerde(valueSerde, new SerdeGetter(context))); + } catch (final ConfigException e) { + throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); + } catch (final StreamsException e) { + throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); + } } private void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(valueSerde, new SerdeGetter(context))); + try { + serdes = new StateSerdes<>( + changelogTopic, + prepareKeySerde(keySerde, new SerdeGetter(context)), + prepareValueSerde(valueSerde, new SerdeGetter(context))); + } catch (final ConfigException e) { + throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); + } catch (final StreamsException e) { + throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); + } } @SuppressWarnings("unchecked") From b720677d55e5f21e5500079922359cda4c23a45f Mon Sep 17 00:00:00 2001 From: Ayoub Omari Date: Sat, 4 May 2024 01:55:13 +0200 Subject: [PATCH 2/5] KAFKA-16573: add tests of serdes initialization exceptions --- .../streams/processor/internals/SinkNode.java | 11 +- .../processor/internals/SourceNode.java | 11 +- .../state/internals/MeteredKeyValueStore.java | 27 +--- .../state/internals/MeteredSessionStore.java | 29 +--- .../MeteredVersionedKeyValueStore.java | 28 +--- .../state/internals/MeteredWindowStore.java | 25 +-- .../internals/StoreSerdeInitializer.java | 67 ++++++++ .../internals/ProcessorNodeTest.java | 3 +- .../processor/internals/SinkNodeTest.java | 52 ++++++- .../processor/internals/SourceNodeTest.java | 67 +++++++- .../internals/StoreSerdeInitializerTest.java | 144 ++++++++++++++++++ 11 files changed, 351 insertions(+), 113 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index b86b3fdf6b415..9501218a96c47 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -62,11 +62,18 @@ public void init(final InternalProcessorContext context) { this.context = context; try { keySerializer = prepareKeySerializer(keySerializer, context, this.name()); + } catch (final ConfigException e) { + throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name())); + } catch (final StreamsException e) { + throw new StreamsException(String.format("Failed to initialize key serdes for sink node %s", name()), e); + } + + try { valSerializer = prepareValueSerializer(valSerializer, context, this.name()); } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize serdes for sink node %s", name()), e); + throw new ConfigException(String.format("Failed to initialize value serdes for sink node %s", name())); } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize serdes for sink node %s", name()), e); + throw new StreamsException(String.format("Failed to initialize value serdes for sink node %s", name()), e); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 1de37215825f7..456acfdefb767 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -78,11 +78,18 @@ public void init(final InternalProcessorContext context) { try { keyDeserializer = prepareKeyDeserializer(keyDeserializer, context, name()); + } catch (final ConfigException e) { + throw new ConfigException(String.format("Failed to initialize key serdes for source node %s", name())); + } catch (final StreamsException e) { + throw new StreamsException(String.format("Failed to initialize key serdes for source node %s", name()), e); + } + + try { valDeserializer = prepareValueDeserializer(valDeserializer, context, name()); } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize serdes for source node %s", name()), e); + throw new ConfigException(String.format("Failed to initialize value serdes for source node %s", name())); } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize serdes for source node %s", name()), e); + throw new StreamsException(String.format("Failed to initialize value serdes for source node %s", name()), e); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 303bdb4ac2c40..6c0615eefd1dd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; @@ -24,7 +23,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.ProcessorContext; @@ -58,7 +56,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue; @@ -175,33 +172,13 @@ protected Serde prepareValueSerdeForStore(final Serde valueSerde, final Se protected void initStoreSerde(final ProcessorContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - try { - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerdeForStore(valueSerde, new SerdeGetter(context)) - ); - } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); - } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); - } + serdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, valueSerde); } protected void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - try { - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerdeForStore(valueSerde, new SerdeGetter(context)) - ); - } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); - } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); - } + serdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, valueSerde); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index a4608df53a213..5af009c123e7d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -16,13 +16,11 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.processor.ProcessorContext; @@ -31,7 +29,6 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; -import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.query.FailureReason; import org.apache.kafka.streams.query.PositionBound; @@ -51,8 +48,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde; -import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerde; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; public class MeteredSessionStore @@ -140,33 +135,13 @@ private void registerMetrics() { private void initStoreSerde(final ProcessorContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - try { - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(valueSerde, new SerdeGetter(context)) - ); - } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); - } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); - } + serdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, valueSerde); } private void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - try { - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(valueSerde, new SerdeGetter(context)) - ); - } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); - } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); - } + serdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, valueSerde); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java index 708e21d82564c..432c9c955d91b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java @@ -18,8 +18,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde; -import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerde; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; @@ -27,12 +25,10 @@ import java.util.Map; import java.util.Objects; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -299,17 +295,7 @@ protected void initStoreSerde(final ProcessorContext context) { // additionally init raw value serde final String storeName = super.name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - try { - plainValueSerdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(plainValueSerde, new SerdeGetter(context)) - ); - } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); - } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); - } + plainValueSerdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, plainValueSerde); } @Override @@ -319,17 +305,7 @@ protected void initStoreSerde(final StateStoreContext context) { // additionally init raw value serde final String storeName = super.name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - try { - plainValueSerdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(plainValueSerde, new SerdeGetter(context)) - ); - } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); - } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); - } + plainValueSerdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, plainValueSerde); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 560282de9b836..adbd03c5df524 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -16,13 +16,11 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; @@ -54,7 +52,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue; @@ -158,31 +155,13 @@ private void registerMetrics() { private void initStoreSerde(final ProcessorContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - try { - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(valueSerde, new SerdeGetter(context))); - } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); - } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); - } + serdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, valueSerde); } private void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - try { - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(valueSerde, new SerdeGetter(context))); - } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize serdes for store %s", storeName), e); - } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize serdes for store %s", storeName), e); - } + serdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, valueSerde); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java new file mode 100644 index 0000000000000..9987d4b4049f2 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.StateSerdes; + + +public class StoreSerdeInitializer { + static StateSerdes prepareStoreSerde(final StateStoreContext context, final String storeName, + final String changelogTopic, final Serde keySerde, + final Serde valueSerde) { + return new StateSerdes<>( + changelogTopic, + prepareSerde(WrappingNullableUtils::prepareKeySerde, storeName, keySerde, new SerdeGetter(context), true), + prepareSerde(WrappingNullableUtils::prepareValueSerde, storeName, valueSerde, new SerdeGetter(context), false) + ); + } + + static StateSerdes prepareStoreSerde(final ProcessorContext context, final String storeName, + final String changelogTopic, final Serde keySerde, + final Serde valueSerde) { + return new StateSerdes<>( + changelogTopic, + prepareSerde(WrappingNullableUtils::prepareKeySerde, storeName, keySerde, new SerdeGetter(context), true), + prepareSerde(WrappingNullableUtils::prepareValueSerde, storeName, valueSerde, new SerdeGetter(context), false) + ); + } + + private static Serde prepareSerde(final PrepareFunc prepare, final String storeName, final Serde serde, + final SerdeGetter getter, final Boolean isKey) { + + final String serdeType = isKey ? "key" : "value"; + try { + return prepare.prepareSerde(serde, getter); + } catch (final ConfigException e) { + throw new ConfigException(String.format("Failed to initialize %s serdes for store %s", serdeType, storeName)); + } catch (final StreamsException e) { + throw new StreamsException(String.format("Failed to initialize %s serdes for store %s", serdeType, storeName), e); + } + } +} + +interface PrepareFunc { + Serde prepareSerde(Serde serde, SerdeGetter getter); +} + diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index dfa3f9e422a75..16442ec562247 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -156,8 +156,7 @@ public void testTopologyLevelConfigException() { final ConfigException se = assertThrows(ConfigException.class, () -> new TopologyTestDriver(topology)); final String msg = se.getMessage(); - assertTrue("Error about class cast with serdes", msg.contains("StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); - assertTrue("Error about class cast with serdes", msg.contains("specify a key serde")); + assertTrue("Error about class cast with serdes", msg.contains("Failed to initialize key serdes for source node")); } private static class ClassCastProcessor extends ExceptionalProcessor { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java index 7e7f7b824b5c2..86748ee71c6e0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -16,18 +16,27 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockRecordCollector; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; public class SinkNodeTest { private final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class); @@ -40,14 +49,21 @@ public class SinkNodeTest { // Used to verify that the correct exceptions are thrown if the compiler checks are bypassed @SuppressWarnings({"unchecked", "rawtypes"}) private final SinkNode illTypedSink = (SinkNode) sink; + private MockedStatic utilsMock; - @Before - public void before() { - sink.init(context); + @BeforeEach + public void setup() { + utilsMock = Mockito.mockStatic(WrappingNullableUtils.class); + } + + @AfterEach + public void cleanup() { + utilsMock.close(); } @Test public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() { + sink.init(context); // When/Then context.setTime(-1); // ensures a negative timestamp is set for the record we send next try { @@ -58,4 +74,30 @@ public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() { } } + @Test + public void shouldThrowConfigExceptionOnUndefinedKeySerde() { + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any(), any())).thenThrow(new ConfigException("")); + + final Throwable exception = assertThrows(ConfigException.class, () -> sink.init(context)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for sink node anyNodeName")); + } + + @Test + public void shouldThrowConfigExceptionOnUndefinedValueSerde() { + utilsMock.when(() -> WrappingNullableUtils.prepareValueSerializer(any(), any(), any())).thenThrow(new ConfigException("")); + + final Throwable exception = assertThrows(ConfigException.class, () -> sink.init(context)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for sink node anyNodeName")); + } + + @Test + public void shouldThrowStreamsExceptionOnUndefinedSerde() { + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any(), any())).thenThrow(new StreamsException("")); + + final Throwable exception = assertThrows(StreamsException.class, () -> sink.init(context)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for sink node anyNodeName")); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index 03f22a3a917d6..ef9d14aff012a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; @@ -24,11 +25,17 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.StreamsTestUtils; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import java.nio.charset.StandardCharsets; import java.util.Map; @@ -39,9 +46,25 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; public class SourceNodeTest { + private MockedStatic utilsMock; + + @BeforeEach + public void setup() { + utilsMock = Mockito.mockStatic(WrappingNullableUtils.class); + } + + @AfterEach + public void cleanup() { + utilsMock.close(); + } + + @Test public void shouldProvideTopicHeadersAndDataToKeyDeserializer() { final SourceNode sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer()); @@ -106,4 +129,46 @@ public void shouldExposeProcessMetrics() { contains(sensorNamePrefix + ".s.process") ); } + + @Test + public void shouldThrowConfigExceptionOnUndefinedKeySerde() { + final InternalMockProcessorContext context = new InternalMockProcessorContext<>(); + + final SourceNode node = + new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); + + utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any(), any())).thenThrow(new ConfigException("")); + + final Throwable exception = assertThrows(ConfigException.class, () -> node.init(context)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for source node TESTING_NODE")); + } + + @Test + public void shouldThrowConfigExceptionOnUndefinedValueSerde() { + final InternalMockProcessorContext context = new InternalMockProcessorContext<>(); + + final SourceNode node = + new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); + + utilsMock.when(() -> WrappingNullableUtils.prepareValueDeserializer(any(), any(), any())).thenThrow(new ConfigException("")); + + final Throwable exception = assertThrows(ConfigException.class, () -> node.init(context)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for source node TESTING_NODE")); + } + + @Test + public void shouldThrowStreamsExceptionOnUndefinedSerde() { + final InternalMockProcessorContext context = new InternalMockProcessorContext<>(); + + final SourceNode node = + new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); + + utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any(), any())).thenThrow(new StreamsException("")); + + final Throwable exception = assertThrows(StreamsException.class, () -> node.init(context)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for source node TESTING_NODE")); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java new file mode 100644 index 0000000000000..c82e56707879e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.test.MockInternalNewProcessorContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; + +public class StoreSerdeInitializerTest { + + private MockedStatic utilsMock; + + @BeforeEach + public void setup() { + utilsMock = Mockito.mockStatic(WrappingNullableUtils.class); + } + + @AfterEach + public void cleanup() { + utilsMock.close(); + } + + @Test + public void shouldPrepareStoreSerdeForProcessorContext() { + final Serde keySerde = new Serdes.StringSerde(); + final Serde valueSerde = new Serdes.StringSerde(); + + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())).thenReturn(keySerde); + utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())).thenReturn(valueSerde); + + final StateSerdes result = StoreSerdeInitializer.prepareStoreSerde( + (ProcessorContext) context, "myStore", "topic", keySerde, valueSerde); + + assertThat(result.keySerde(), equalTo(keySerde)); + assertThat(result.valueSerde(), equalTo(valueSerde)); + assertThat(result.topic(), equalTo("topic")); + } + + @Test + public void shouldThrowConfigExceptionOnUndefinedKeySerdeForProcessorContext() { + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())).thenThrow(new ConfigException("")); + + final Throwable exception = assertThrows(ConfigException.class, () -> StoreSerdeInitializer.prepareStoreSerde( + (ProcessorContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde())); + + assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); + } + + @Test + public void shouldThrowConfigExceptionOnUndefinedValueSerdeForProcessorContext() { + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())) + .thenThrow(new ConfigException("")); + + final Throwable exception = assertThrows(ConfigException.class, () -> StoreSerdeInitializer.prepareStoreSerde( + (ProcessorContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde())); + + assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore")); + } + + @Test + public void shouldThrowConfigExceptionOnUndefinedKeySerdeForStateStoreContext() { + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())).thenThrow(new ConfigException("")); + + final Throwable exception = assertThrows(ConfigException.class, () -> StoreSerdeInitializer.prepareStoreSerde( + (StateStoreContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde())); + + assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); + } + + @Test + public void shouldThrowConfigExceptionOnUndefinedValueSerdeForStateStoreContext() { + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())).thenThrow(new ConfigException("")); + + final Throwable exception = assertThrows(ConfigException.class, () -> StoreSerdeInitializer.prepareStoreSerde( + (StateStoreContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde())); + + assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore")); + } + + @Test + public void shouldThrowStreamsExceptionOnUndefinedKeySerdeForProcessorContext() { + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())).thenThrow(new StreamsException("")); + + final Throwable exception = assertThrows(StreamsException.class, () -> StoreSerdeInitializer.prepareStoreSerde( + (ProcessorContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde())); + + assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); + } + + @Test + public void shouldThrowStreamsExceptionOnUndefinedValueSerdeForStateStoreContext() { + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())).thenThrow(new StreamsException("")); + + final Throwable exception = assertThrows(StreamsException.class, () -> StoreSerdeInitializer.prepareStoreSerde( + (StateStoreContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde())); + + assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore")); + } +} From 95d332677a8cf1ca89a92c2d73badace1b4e1cf7 Mon Sep 17 00:00:00 2001 From: Ayoub Omari Date: Sat, 4 May 2024 11:56:12 +0200 Subject: [PATCH 3/5] KAFKA-16573: parametrize valueSerde function for TimestampedStores --- .../state/internals/MeteredKeyValueStore.java | 6 ++-- .../state/internals/MeteredSessionStore.java | 7 ++-- .../MeteredTimestampedKeyValueStore.java | 2 +- .../MeteredVersionedKeyValueStore.java | 7 ++-- .../state/internals/MeteredWindowStore.java | 6 ++-- .../internals/StoreSerdeInitializer.java | 10 +++--- .../internals/StoreSerdeInitializerTest.java | 34 +++++++++++-------- 7 files changed, 45 insertions(+), 27 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 6c0615eefd1dd..83c4f1ba99d80 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -172,13 +172,15 @@ protected Serde prepareValueSerdeForStore(final Serde valueSerde, final Se protected void initStoreSerde(final ProcessorContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, valueSerde); + serdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore); } protected void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, valueSerde); + serdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 5af009c123e7d..fdf0f09b3b908 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -135,13 +136,15 @@ private void registerMetrics() { private void initStoreSerde(final ProcessorContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, valueSerde); + serdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, valueSerde, WrappingNullableUtils::prepareValueSerde); } private void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, valueSerde); + serdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, valueSerde, WrappingNullableUtils::prepareValueSerde); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java index b5c774fef3c8c..a568a1e85e4e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java @@ -86,6 +86,7 @@ public class MeteredTimestampedKeyValueStore (query, positionBound, config, store) -> runTimestampedKeyQuery(query, positionBound, config) ) ); + @SuppressWarnings("unchecked") @Override protected Serde> prepareValueSerdeForStore(final Serde> valueSerde, final SerdeGetter getter) { @@ -96,7 +97,6 @@ protected Serde> prepareValueSerdeForStore(final Serde getWithBinary(final K key) { try { return maybeMeasureLatency(() -> { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java index 432c9c955d91b..6b168d9cd0e4e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -295,7 +296,8 @@ protected void initStoreSerde(final ProcessorContext context) { // additionally init raw value serde final String storeName = super.name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - plainValueSerdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, plainValueSerde); + plainValueSerdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, plainValueSerde, WrappingNullableUtils::prepareValueSerde); } @Override @@ -305,7 +307,8 @@ protected void initStoreSerde(final StateStoreContext context) { // additionally init raw value serde final String storeName = super.name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - plainValueSerdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, plainValueSerde); + plainValueSerdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, plainValueSerde, WrappingNullableUtils::prepareValueSerde); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index adbd03c5df524..149083d45710f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -155,13 +155,15 @@ private void registerMetrics() { private void initStoreSerde(final ProcessorContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, valueSerde); + serdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerde); } private void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = StoreSerdeInitializer.prepareStoreSerde(context, storeName, changelogTopic, keySerde, valueSerde); + serdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerde); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java index 9987d4b4049f2..93c3f2b39ccd1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java @@ -29,21 +29,23 @@ public class StoreSerdeInitializer { static StateSerdes prepareStoreSerde(final StateStoreContext context, final String storeName, final String changelogTopic, final Serde keySerde, - final Serde valueSerde) { + final Serde valueSerde, + final PrepareFunc prepareValueSerdeFunc) { return new StateSerdes<>( changelogTopic, prepareSerde(WrappingNullableUtils::prepareKeySerde, storeName, keySerde, new SerdeGetter(context), true), - prepareSerde(WrappingNullableUtils::prepareValueSerde, storeName, valueSerde, new SerdeGetter(context), false) + prepareSerde(prepareValueSerdeFunc, storeName, valueSerde, new SerdeGetter(context), false) ); } static StateSerdes prepareStoreSerde(final ProcessorContext context, final String storeName, final String changelogTopic, final Serde keySerde, - final Serde valueSerde) { + final Serde valueSerde, + final PrepareFunc prepareValueSerdeFunc) { return new StateSerdes<>( changelogTopic, prepareSerde(WrappingNullableUtils::prepareKeySerde, storeName, keySerde, new SerdeGetter(context), true), - prepareSerde(WrappingNullableUtils::prepareValueSerde, storeName, valueSerde, new SerdeGetter(context), false) + prepareSerde(prepareValueSerdeFunc, storeName, valueSerde, new SerdeGetter(context), false) ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java index c82e56707879e..9e9d2309dc648 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java @@ -31,10 +31,10 @@ import org.mockito.MockedStatic; import org.mockito.Mockito; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; public class StoreSerdeInitializerTest { @@ -62,7 +62,7 @@ public void shouldPrepareStoreSerdeForProcessorContext() { utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())).thenReturn(valueSerde); final StateSerdes result = StoreSerdeInitializer.prepareStoreSerde( - (ProcessorContext) context, "myStore", "topic", keySerde, valueSerde); + (ProcessorContext) context, "myStore", "topic", keySerde, valueSerde, WrappingNullableUtils::prepareValueSerde); assertThat(result.keySerde(), equalTo(keySerde)); assertThat(result.valueSerde(), equalTo(valueSerde)); @@ -75,8 +75,9 @@ public void shouldThrowConfigExceptionOnUndefinedKeySerdeForProcessorContext() { utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())).thenThrow(new ConfigException("")); - final Throwable exception = assertThrows(ConfigException.class, () -> StoreSerdeInitializer.prepareStoreSerde( - (ProcessorContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde())); + final Throwable exception = assertThrows(ConfigException.class, + () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) context, "myStore", "topic", + new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); } @@ -88,8 +89,9 @@ public void shouldThrowConfigExceptionOnUndefinedValueSerdeForProcessorContext() utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())) .thenThrow(new ConfigException("")); - final Throwable exception = assertThrows(ConfigException.class, () -> StoreSerdeInitializer.prepareStoreSerde( - (ProcessorContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde())); + final Throwable exception = assertThrows(ConfigException.class, + () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) context, "myStore", "topic", + new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore")); } @@ -100,8 +102,9 @@ public void shouldThrowConfigExceptionOnUndefinedKeySerdeForStateStoreContext() utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())).thenThrow(new ConfigException("")); - final Throwable exception = assertThrows(ConfigException.class, () -> StoreSerdeInitializer.prepareStoreSerde( - (StateStoreContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde())); + final Throwable exception = assertThrows(ConfigException.class, + () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) context, "myStore", "topic", + new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); } @@ -112,8 +115,9 @@ public void shouldThrowConfigExceptionOnUndefinedValueSerdeForStateStoreContext( utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())).thenThrow(new ConfigException("")); - final Throwable exception = assertThrows(ConfigException.class, () -> StoreSerdeInitializer.prepareStoreSerde( - (StateStoreContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde())); + final Throwable exception = assertThrows(ConfigException.class, + () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) context, "myStore", "topic", + new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore")); } @@ -124,8 +128,9 @@ public void shouldThrowStreamsExceptionOnUndefinedKeySerdeForProcessorContext() utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())).thenThrow(new StreamsException("")); - final Throwable exception = assertThrows(StreamsException.class, () -> StoreSerdeInitializer.prepareStoreSerde( - (ProcessorContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde())); + final Throwable exception = assertThrows(StreamsException.class, + () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) context, "myStore", "topic", + new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); } @@ -136,8 +141,9 @@ public void shouldThrowStreamsExceptionOnUndefinedValueSerdeForStateStoreContext utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())).thenThrow(new StreamsException("")); - final Throwable exception = assertThrows(StreamsException.class, () -> StoreSerdeInitializer.prepareStoreSerde( - (StateStoreContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde())); + final Throwable exception = assertThrows(StreamsException.class, + () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) context, "myStore", "topic", + new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore")); } From e7dadd618edee0e5ddaa2645992746dca7c94562 Mon Sep 17 00:00:00 2001 From: Ayoub Omari Date: Fri, 10 May 2024 11:05:08 +0200 Subject: [PATCH 4/5] KAFKA-16573: review improvements --- .../internals/WrappingNullableUtils.java | 8 ++------ .../streams/processor/internals/SinkNode.java | 4 ++-- .../processor/internals/SourceNode.java | 4 ++-- .../internals/StoreSerdeInitializer.java | 19 +++++++++++++------ .../internals/ProcessorNodeTest.java | 3 ++- .../processor/internals/SinkNodeTest.java | 16 ++++++++++++---- .../processor/internals/SourceNodeTest.java | 16 ++++++++++++---- 7 files changed, 45 insertions(+), 25 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java index 9b01158984944..5dff888e621f2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java @@ -33,9 +33,7 @@ private static Deserializer prepareDeserializer(final Deserializer spe final Deserializer deserializerToUse; if (specificDeserializer == null) { - final Deserializer contextKeyDeserializer = context.keySerde().deserializer(); - final Deserializer contextValueDeserializer = context.valueSerde().deserializer(); - deserializerToUse = (Deserializer) (isKey ? contextKeyDeserializer : contextValueDeserializer); + deserializerToUse = (Deserializer) (isKey ? context.keySerde().deserializer() : context.valueSerde().deserializer()); } else { deserializerToUse = specificDeserializer; initNullableDeserializer(deserializerToUse, new SerdeGetter(context)); @@ -47,9 +45,7 @@ private static Deserializer prepareDeserializer(final Deserializer spe private static Serializer prepareSerializer(final Serializer specificSerializer, final ProcessorContext context, final boolean isKey, final String name) { final Serializer serializerToUse; if (specificSerializer == null) { - final Serializer contextKeySerializer = context.keySerde().serializer(); - final Serializer contextValueSerializer = context.valueSerde().serializer(); - serializerToUse = (Serializer) (isKey ? contextKeySerializer : contextValueSerializer); + serializerToUse = (Serializer) (isKey ? context.keySerde().serializer() : context.valueSerde().serializer()); } else { serializerToUse = specificSerializer; initNullableSerializer(serializerToUse, new SerdeGetter(context)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 9501218a96c47..f44af68683668 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -63,7 +63,7 @@ public void init(final InternalProcessorContext context) { try { keySerializer = prepareKeySerializer(keySerializer, context, this.name()); } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name())); + throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s. %s", name(), e.getMessage())); } catch (final StreamsException e) { throw new StreamsException(String.format("Failed to initialize key serdes for sink node %s", name()), e); } @@ -71,7 +71,7 @@ public void init(final InternalProcessorContext context) { try { valSerializer = prepareValueSerializer(valSerializer, context, this.name()); } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize value serdes for sink node %s", name())); + throw new ConfigException(String.format("Failed to initialize value serdes for sink node %s. %s", name(), e.getMessage())); } catch (final StreamsException e) { throw new StreamsException(String.format("Failed to initialize value serdes for sink node %s", name()), e); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 456acfdefb767..53a1caec11643 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -79,7 +79,7 @@ public void init(final InternalProcessorContext context) { try { keyDeserializer = prepareKeyDeserializer(keyDeserializer, context, name()); } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize key serdes for source node %s", name())); + throw new ConfigException(String.format("Failed to initialize key serdes for source node %s. %s", name(), e.getMessage())); } catch (final StreamsException e) { throw new StreamsException(String.format("Failed to initialize key serdes for source node %s", name()), e); } @@ -87,7 +87,7 @@ public void init(final InternalProcessorContext context) { try { valDeserializer = prepareValueDeserializer(valDeserializer, context, name()); } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize value serdes for source node %s", name())); + throw new ConfigException(String.format("Failed to initialize value serdes for source node %s. %s", name(), e.getMessage())); } catch (final StreamsException e) { throw new StreamsException(String.format("Failed to initialize value serdes for source node %s", name()), e); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java index 93c3f2b39ccd1..f22bf6f49bcc4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java @@ -27,8 +27,10 @@ public class StoreSerdeInitializer { - static StateSerdes prepareStoreSerde(final StateStoreContext context, final String storeName, - final String changelogTopic, final Serde keySerde, + static StateSerdes prepareStoreSerde(final StateStoreContext context, + final String storeName, + final String changelogTopic, + final Serde keySerde, final Serde valueSerde, final PrepareFunc prepareValueSerdeFunc) { return new StateSerdes<>( @@ -38,8 +40,10 @@ static StateSerdes prepareStoreSerde(final StateStoreContext contex ); } - static StateSerdes prepareStoreSerde(final ProcessorContext context, final String storeName, - final String changelogTopic, final Serde keySerde, + static StateSerdes prepareStoreSerde(final ProcessorContext context, + final String storeName, + final String changelogTopic, + final Serde keySerde, final Serde valueSerde, final PrepareFunc prepareValueSerdeFunc) { return new StateSerdes<>( @@ -49,8 +53,11 @@ static StateSerdes prepareStoreSerde(final ProcessorContext context ); } - private static Serde prepareSerde(final PrepareFunc prepare, final String storeName, final Serde serde, - final SerdeGetter getter, final Boolean isKey) { + private static Serde prepareSerde(final PrepareFunc prepare, + final String storeName, + final Serde serde, + final SerdeGetter getter, + final Boolean isKey) { final String serdeType = isKey ? "key" : "value"; try { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index 16442ec562247..b4e811dc561c1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -156,7 +156,8 @@ public void testTopologyLevelConfigException() { final ConfigException se = assertThrows(ConfigException.class, () -> new TopologyTestDriver(topology)); final String msg = se.getMessage(); - assertTrue("Error about class cast with serdes", msg.contains("Failed to initialize key serdes for source node")); + assertThat(msg, containsString("Failed to initialize key serdes for source node")); + assertThat(msg, containsString("Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); } private static class ClassCastProcessor extends ExceptionalProcessor { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java index 86748ee71c6e0..fdc97e086da97 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -76,20 +76,28 @@ public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() { @Test public void shouldThrowConfigExceptionOnUndefinedKeySerde() { - utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any(), any())).thenThrow(new ConfigException("")); + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); final Throwable exception = assertThrows(ConfigException.class, () -> sink.init(context)); - assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for sink node anyNodeName")); + assertThat( + exception.getMessage(), + equalTo("Failed to initialize key serdes for sink node anyNodeName. Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG") + ); } @Test public void shouldThrowConfigExceptionOnUndefinedValueSerde() { - utilsMock.when(() -> WrappingNullableUtils.prepareValueSerializer(any(), any(), any())).thenThrow(new ConfigException("")); + utilsMock.when(() -> WrappingNullableUtils.prepareValueSerializer(any(), any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); final Throwable exception = assertThrows(ConfigException.class, () -> sink.init(context)); - assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for sink node anyNodeName")); + assertThat( + exception.getMessage(), + equalTo("Failed to initialize value serdes for sink node anyNodeName. Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG") + ); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index ef9d14aff012a..feeda48f138be 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -137,11 +137,15 @@ public void shouldThrowConfigExceptionOnUndefinedKeySerde() { final SourceNode node = new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); - utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any(), any())).thenThrow(new ConfigException("")); + utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); final Throwable exception = assertThrows(ConfigException.class, () -> node.init(context)); - assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for source node TESTING_NODE")); + assertThat( + exception.getMessage(), + equalTo("Failed to initialize key serdes for source node TESTING_NODE. Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG") + ); } @Test @@ -151,11 +155,15 @@ public void shouldThrowConfigExceptionOnUndefinedValueSerde() { final SourceNode node = new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); - utilsMock.when(() -> WrappingNullableUtils.prepareValueDeserializer(any(), any(), any())).thenThrow(new ConfigException("")); + utilsMock.when(() -> WrappingNullableUtils.prepareValueDeserializer(any(), any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); final Throwable exception = assertThrows(ConfigException.class, () -> node.init(context)); - assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for source node TESTING_NODE")); + assertThat( + exception.getMessage(), + equalTo("Failed to initialize value serdes for source node TESTING_NODE. Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG") + ); } @Test From a8e320fe4d538cf54191abc9ae5084c654ac1cc1 Mon Sep 17 00:00:00 2001 From: Ayoub Omari Date: Fri, 31 May 2024 21:02:55 +0200 Subject: [PATCH 5/5] KAFKA-16573: catch Config & Streams exceptions into StreamsException --- .../streams/processor/internals/SinkNode.java | 12 +++---- .../processor/internals/SourceNode.java | 12 +++---- .../internals/StoreSerdeInitializer.java | 18 +++++----- .../internals/ProcessorNodeTest.java | 8 ++--- .../processor/internals/SinkNodeTest.java | 22 ++++++++---- .../processor/internals/SourceNodeTest.java | 22 ++++++++---- .../internals/StoreSerdeInitializerTest.java | 35 +++++++++++-------- 7 files changed, 71 insertions(+), 58 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index f44af68683668..6e79616d30a9c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -62,18 +62,14 @@ public void init(final InternalProcessorContext context) { this.context = context; try { keySerializer = prepareKeySerializer(keySerializer, context, this.name()); - } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s. %s", name(), e.getMessage())); - } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize key serdes for sink node %s", name()), e); + } catch (ConfigException | StreamsException e) { + throw new StreamsException(String.format("Failed to initialize key serdes for sink node %s", name()), e, context.taskId()); } try { valSerializer = prepareValueSerializer(valSerializer, context, this.name()); - } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize value serdes for sink node %s. %s", name(), e.getMessage())); - } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize value serdes for sink node %s", name()), e); + } catch (final ConfigException | StreamsException e) { + throw new StreamsException(String.format("Failed to initialize value serdes for sink node %s", name()), e, context.taskId()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 53a1caec11643..2f53840acc94f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -78,18 +78,14 @@ public void init(final InternalProcessorContext context) { try { keyDeserializer = prepareKeyDeserializer(keyDeserializer, context, name()); - } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize key serdes for source node %s. %s", name(), e.getMessage())); - } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize key serdes for source node %s", name()), e); + } catch (final ConfigException | StreamsException e) { + throw new StreamsException(String.format("Failed to initialize key serdes for source node %s", name()), e, context.taskId()); } try { valDeserializer = prepareValueDeserializer(valDeserializer, context, name()); - } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize value serdes for source node %s. %s", name(), e.getMessage())); - } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize value serdes for source node %s", name()), e); + } catch (final ConfigException | StreamsException e) { + throw new StreamsException(String.format("Failed to initialize value serdes for source node %s", name()), e, context.taskId()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java index f22bf6f49bcc4..1a9aa02f3c555 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.state.StateSerdes; @@ -35,8 +36,8 @@ static StateSerdes prepareStoreSerde(final StateStoreContext contex final PrepareFunc prepareValueSerdeFunc) { return new StateSerdes<>( changelogTopic, - prepareSerde(WrappingNullableUtils::prepareKeySerde, storeName, keySerde, new SerdeGetter(context), true), - prepareSerde(prepareValueSerdeFunc, storeName, valueSerde, new SerdeGetter(context), false) + prepareSerde(WrappingNullableUtils::prepareKeySerde, storeName, keySerde, new SerdeGetter(context), true, context.taskId()), + prepareSerde(prepareValueSerdeFunc, storeName, valueSerde, new SerdeGetter(context), false, context.taskId()) ); } @@ -48,8 +49,8 @@ static StateSerdes prepareStoreSerde(final ProcessorContext context final PrepareFunc prepareValueSerdeFunc) { return new StateSerdes<>( changelogTopic, - prepareSerde(WrappingNullableUtils::prepareKeySerde, storeName, keySerde, new SerdeGetter(context), true), - prepareSerde(prepareValueSerdeFunc, storeName, valueSerde, new SerdeGetter(context), false) + prepareSerde(WrappingNullableUtils::prepareKeySerde, storeName, keySerde, new SerdeGetter(context), true, context.taskId()), + prepareSerde(prepareValueSerdeFunc, storeName, valueSerde, new SerdeGetter(context), false, context.taskId()) ); } @@ -57,15 +58,14 @@ private static Serde prepareSerde(final PrepareFunc prepare, final String storeName, final Serde serde, final SerdeGetter getter, - final Boolean isKey) { + final Boolean isKey, + final TaskId taskId) { final String serdeType = isKey ? "key" : "value"; try { return prepare.prepareSerde(serde, getter); - } catch (final ConfigException e) { - throw new ConfigException(String.format("Failed to initialize %s serdes for store %s", serdeType, storeName)); - } catch (final StreamsException e) { - throw new StreamsException(String.format("Failed to initialize %s serdes for store %s", serdeType, storeName), e); + } catch (final ConfigException | StreamsException e) { + throw new StreamsException(String.format("Failed to initialize %s serdes for store %s", serdeType, storeName), e, taskId); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index b4e811dc561c1..fdd5214385ad1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; @@ -154,10 +153,9 @@ public void testTopologyLevelConfigException() { .flatMapValues(value -> Collections.singletonList("")); final Topology topology = builder.build(); - final ConfigException se = assertThrows(ConfigException.class, () -> new TopologyTestDriver(topology)); - final String msg = se.getMessage(); - assertThat(msg, containsString("Failed to initialize key serdes for source node")); - assertThat(msg, containsString("Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); + final StreamsException se = assertThrows(StreamsException.class, () -> new TopologyTestDriver(topology)); + assertThat(se.getMessage(), containsString("Failed to initialize key serdes for source node")); + assertThat(se.getCause().getMessage(), containsString("Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); } private static class ClassCastProcessor extends ExceptionalProcessor { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java index fdc97e086da97..805f2fd5db46e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -75,33 +75,41 @@ public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() { } @Test - public void shouldThrowConfigExceptionOnUndefinedKeySerde() { + public void shouldThrowStreamsExceptionOnUndefinedKeySerde() { utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any(), any())) .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); - final Throwable exception = assertThrows(ConfigException.class, () -> sink.init(context)); + final Throwable exception = assertThrows(StreamsException.class, () -> sink.init(context)); assertThat( exception.getMessage(), - equalTo("Failed to initialize key serdes for sink node anyNodeName. Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG") + equalTo("Failed to initialize key serdes for sink node anyNodeName") + ); + assertThat( + exception.getCause().getMessage(), + equalTo("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG") ); } @Test - public void shouldThrowConfigExceptionOnUndefinedValueSerde() { + public void shouldThrowStreamsExceptionOnUndefinedValueSerde() { utilsMock.when(() -> WrappingNullableUtils.prepareValueSerializer(any(), any(), any())) .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); - final Throwable exception = assertThrows(ConfigException.class, () -> sink.init(context)); + final Throwable exception = assertThrows(StreamsException.class, () -> sink.init(context)); assertThat( exception.getMessage(), - equalTo("Failed to initialize value serdes for sink node anyNodeName. Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG") + equalTo("Failed to initialize value serdes for sink node anyNodeName") + ); + assertThat( + exception.getCause().getMessage(), + equalTo("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG") ); } @Test - public void shouldThrowStreamsExceptionOnUndefinedSerde() { + public void shouldThrowStreamsExceptionWithExplicitErrorMessage() { utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any(), any())).thenThrow(new StreamsException("")); final Throwable exception = assertThrows(StreamsException.class, () -> sink.init(context)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index feeda48f138be..f048f9948dece 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -131,7 +131,7 @@ public void shouldExposeProcessMetrics() { } @Test - public void shouldThrowConfigExceptionOnUndefinedKeySerde() { + public void shouldThrowStreamsExceptionOnUndefinedKeySerde() { final InternalMockProcessorContext context = new InternalMockProcessorContext<>(); final SourceNode node = @@ -140,16 +140,20 @@ public void shouldThrowConfigExceptionOnUndefinedKeySerde() { utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any(), any())) .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); - final Throwable exception = assertThrows(ConfigException.class, () -> node.init(context)); + final Throwable exception = assertThrows(StreamsException.class, () -> node.init(context)); assertThat( exception.getMessage(), - equalTo("Failed to initialize key serdes for source node TESTING_NODE. Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG") + equalTo("Failed to initialize key serdes for source node TESTING_NODE") + ); + assertThat( + exception.getCause().getMessage(), + equalTo("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG") ); } @Test - public void shouldThrowConfigExceptionOnUndefinedValueSerde() { + public void shouldThrowStreamsExceptionOnUndefinedValueSerde() { final InternalMockProcessorContext context = new InternalMockProcessorContext<>(); final SourceNode node = @@ -158,16 +162,20 @@ public void shouldThrowConfigExceptionOnUndefinedValueSerde() { utilsMock.when(() -> WrappingNullableUtils.prepareValueDeserializer(any(), any(), any())) .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); - final Throwable exception = assertThrows(ConfigException.class, () -> node.init(context)); + final Throwable exception = assertThrows(StreamsException.class, () -> node.init(context)); assertThat( exception.getMessage(), - equalTo("Failed to initialize value serdes for source node TESTING_NODE. Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG") + equalTo("Failed to initialize value serdes for source node TESTING_NODE") + ); + assertThat( + exception.getCause().getMessage(), + equalTo("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG") ); } @Test - public void shouldThrowStreamsExceptionOnUndefinedSerde() { + public void shouldThrowStreamsExceptionWithExplicitErrorMessage() { final InternalMockProcessorContext context = new InternalMockProcessorContext<>(); final SourceNode node = diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java index 9e9d2309dc648..2a692f278e34c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java @@ -70,60 +70,67 @@ public void shouldPrepareStoreSerdeForProcessorContext() { } @Test - public void shouldThrowConfigExceptionOnUndefinedKeySerdeForProcessorContext() { + public void shouldThrowStreamsExceptionOnUndefinedKeySerdeForProcessorContext() { final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); - utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())).thenThrow(new ConfigException("")); + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); - final Throwable exception = assertThrows(ConfigException.class, + final Throwable exception = assertThrows(StreamsException.class, () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); + assertThat(exception.getCause().getMessage(), equalTo("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); } @Test - public void shouldThrowConfigExceptionOnUndefinedValueSerdeForProcessorContext() { + public void shouldThrowStreamsExceptionOnUndefinedValueSerdeForProcessorContext() { final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())) - .thenThrow(new ConfigException("")); + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); - final Throwable exception = assertThrows(ConfigException.class, + final Throwable exception = assertThrows(StreamsException.class, () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore")); + assertThat(exception.getCause().getMessage(), equalTo("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); } @Test - public void shouldThrowConfigExceptionOnUndefinedKeySerdeForStateStoreContext() { + public void shouldThrowStreamsExceptionOnUndefinedKeySerdeForStateStoreContext() { final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); - utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())).thenThrow(new ConfigException("")); + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); - final Throwable exception = assertThrows(ConfigException.class, + final Throwable exception = assertThrows(StreamsException.class, () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); + assertThat(exception.getCause().getMessage(), equalTo("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); } @Test - public void shouldThrowConfigExceptionOnUndefinedValueSerdeForStateStoreContext() { + public void shouldThrowStreamsExceptionOnUndefinedValueSerdeForStateStoreContext() { final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); - utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())).thenThrow(new ConfigException("")); + utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); - final Throwable exception = assertThrows(ConfigException.class, + final Throwable exception = assertThrows(StreamsException.class, () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) context, "myStore", "topic", new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore")); + assertThat(exception.getCause().getMessage(), equalTo("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); } @Test - public void shouldThrowStreamsExceptionOnUndefinedKeySerdeForProcessorContext() { + public void shouldThrowStreamsExceptionWithExplicitErrorMessageForProcessorContext() { final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())).thenThrow(new StreamsException("")); @@ -136,7 +143,7 @@ public void shouldThrowStreamsExceptionOnUndefinedKeySerdeForProcessorContext() } @Test - public void shouldThrowStreamsExceptionOnUndefinedValueSerdeForStateStoreContext() { + public void shouldThrowStreamsExceptionWithExplicitErrorMessageForStateStoreContext() { final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())).thenThrow(new StreamsException(""));