diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java index 5aaad260a7..7071b3978e 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java @@ -45,6 +45,7 @@ import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.interfaces.SqlIOConfig; +import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; import org.apache.samza.table.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,7 +114,8 @@ void translate(final LogicalJoin join, final TranslatorContext context) { tableFieldNames); Serde keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class); - Serde valueSerde = new JsonSerdeV2<>(SamzaSqlRelMessage.class); + SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde = + (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); // Always re-partition the messages from the input stream by the composite key and then join the messages // with the table.