Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@

import org.slf4j.Logger;

import java.text.MessageFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -200,7 +202,8 @@ public <K, V> void send(final String topic,
try {
keyBytes = keySerializer.serialize(topic, headers, key);
} catch (final ClassCastException exception) {
throw createStreamsExceptionForKeyClassCastException(
throw createStreamsExceptionForClassCastException(
ProductionExceptionHandler.SerializationExceptionOrigin.KEY,
topic,
key,
keySerializer,
Expand All @@ -223,7 +226,8 @@ public <K, V> void send(final String topic,
try {
valBytes = valueSerializer.serialize(topic, headers, value);
} catch (final ClassCastException exception) {
throw createStreamsExceptionForValueClassCastException(
throw createStreamsExceptionForClassCastException(
ProductionExceptionHandler.SerializationExceptionOrigin.VALUE,
topic,
value,
valueSerializer,
Expand Down Expand Up @@ -335,39 +339,27 @@ private <K, V> void handleException(final ProductionExceptionHandler.Serializati

droppedRecordsSensor.record();
}
private <K> StreamsException createStreamsExceptionForKeyClassCastException(final String topic,
final K key,
final Serializer<K> keySerializer,
final ClassCastException exception) {
final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
return new StreamsException(
String.format(
"ClassCastException while producing data to topic %s. " +
"The key serializer %s is not compatible to the actual key type: %s. " +
"Change the default key serde in StreamConfig or provide the correct key serde via method parameters " +
"(for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with " +
"`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).",
topic,
keySerializer.getClass().getName(),
keyClass),
exception);
}

private <V> StreamsException createStreamsExceptionForValueClassCastException(final String topic,
final V value,
final Serializer<V> valueSerializer,
final ClassCastException exception) {
final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
private <KV> StreamsException createStreamsExceptionForClassCastException(final ProductionExceptionHandler.SerializationExceptionOrigin origin,
final String topic,
final KV keyOrValue,
final Serializer<KV> keyOrValueSerializer,
final ClassCastException exception) {
final String keyOrValueClass = keyOrValue == null
? String.format("unknown because %s is null", origin.toString().toLowerCase(Locale.ROOT)) : keyOrValue.getClass().getName();

return new StreamsException(
MessageFormat.format(
String.format(
"ClassCastException while producing data to topic %s. " +
"The value serializer %s is not compatible to the actual value type: %s. " +
"Change the default value serde in StreamConfig or provide the correct value serde via method parameters " +
"(for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with " +
"`Produced.valueSerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).",
"The {0} serializer %s is not compatible to the actual {0} type: %s. " +
"Change the default {0} serde in StreamConfig or provide the correct {0} serde via method parameters " +
"(for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with " +
"`Produced.{0}Serde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).",
topic,
valueSerializer.getClass().getName(),
valueClass),
keyOrValueSerializer.getClass().getName(),
keyOrValueClass),
origin.toString().toLowerCase(Locale.ROOT)),
exception);
}

Expand Down