diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index de4afc2c924fa..a4dc0a68062ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -53,9 +53,11 @@ import org.slf4j.Logger; +import java.text.MessageFormat; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -200,7 +202,8 @@ public void send(final String topic, try { keyBytes = keySerializer.serialize(topic, headers, key); } catch (final ClassCastException exception) { - throw createStreamsExceptionForKeyClassCastException( + throw createStreamsExceptionForClassCastException( + ProductionExceptionHandler.SerializationExceptionOrigin.KEY, topic, key, keySerializer, @@ -223,7 +226,8 @@ public void send(final String topic, try { valBytes = valueSerializer.serialize(topic, headers, value); } catch (final ClassCastException exception) { - throw createStreamsExceptionForValueClassCastException( + throw createStreamsExceptionForClassCastException( + ProductionExceptionHandler.SerializationExceptionOrigin.VALUE, topic, value, valueSerializer, @@ -335,39 +339,27 @@ private void handleException(final ProductionExceptionHandler.Serializati droppedRecordsSensor.record(); } - private StreamsException createStreamsExceptionForKeyClassCastException(final String topic, - final K key, - final Serializer keySerializer, - final ClassCastException exception) { - final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); - return new StreamsException( - String.format( - "ClassCastException while producing data to topic %s. " + - "The key serializer %s is not compatible to the actual key type: %s. " + - "Change the default key serde in StreamConfig or provide the correct key serde via method parameters " + - "(for example if using the DSL, `#to(String topic, Produced produced)` with " + - "`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", - topic, - keySerializer.getClass().getName(), - keyClass), - exception); - } - private StreamsException createStreamsExceptionForValueClassCastException(final String topic, - final V value, - final Serializer valueSerializer, - final ClassCastException exception) { - final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); + private StreamsException createStreamsExceptionForClassCastException(final ProductionExceptionHandler.SerializationExceptionOrigin origin, + final String topic, + final KV keyOrValue, + final Serializer keyOrValueSerializer, + final ClassCastException exception) { + final String keyOrValueClass = keyOrValue == null + ? String.format("unknown because %s is null", origin.toString().toLowerCase(Locale.ROOT)) : keyOrValue.getClass().getName(); + return new StreamsException( + MessageFormat.format( String.format( "ClassCastException while producing data to topic %s. " + - "The value serializer %s is not compatible to the actual value type: %s. " + - "Change the default value serde in StreamConfig or provide the correct value serde via method parameters " + - "(for example if using the DSL, `#to(String topic, Produced produced)` with " + - "`Produced.valueSerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", + "The {0} serializer %s is not compatible to the actual {0} type: %s. " + + "Change the default {0} serde in StreamConfig or provide the correct {0} serde via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with " + + "`Produced.{0}Serde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", topic, - valueSerializer.getClass().getName(), - valueClass), + keyOrValueSerializer.getClass().getName(), + keyOrValueClass), + origin.toString().toLowerCase(Locale.ROOT)), exception); }