From 9f1ebf3ef16dc965e7b8c21aa18cc1a7ff7187bd Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Thu, 5 Mar 2020 17:37:04 +0000 Subject: [PATCH 1/2] KAFKA-9667: Connect JSON serde strip trailing zeros This change turns on exact decimal processing in Jackson for decimals, meaning trailing zeros are maintained. This means a value of `1.2300` can be deserialized and re-serialized to JSON without any loss of information. --- .../kafka/connect/json/JsonConverter.java | 59 ++++++++++--------- .../kafka/connect/json/JsonDeserializer.java | 10 +++- .../kafka/connect/json/JsonConverterTest.java | 31 +++++++++- 3 files changed, 67 insertions(+), 33 deletions(-) diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index ada278509829c..a4948a3c79244 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -191,6 +191,9 @@ public Object convert(Schema schema, JsonNode value) { // Convert values in Kafka Connect form into/from their logical types. These logical converters are discovered by logical type // names specified in the field private static final HashMap LOGICAL_CONVERTERS = new HashMap<>(); + + private static final JsonNodeFactory JSON_NODE_FACTORY = JsonNodeFactory.withExactBigDecimals(true); + static { LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() { @Override @@ -201,9 +204,9 @@ public JsonNode toJson(final Schema schema, final Object value, final JsonConver final BigDecimal decimal = (BigDecimal) value; switch (config.decimalFormat()) { case NUMERIC: - return JsonNodeFactory.instance.numberNode(decimal); + return JSON_NODE_FACTORY.numberNode(decimal); case BASE64: - return JsonNodeFactory.instance.binaryNode(Decimal.fromLogical(schema, decimal)); + return JSON_NODE_FACTORY.binaryNode(Decimal.fromLogical(schema, decimal)); default: throw new DataException("Unexpected " + JsonConverterConfig.DECIMAL_FORMAT_CONFIG + ": " + config.decimalFormat()); } @@ -229,7 +232,7 @@ public Object toConnect(final Schema schema, final JsonNode value) { public JsonNode toJson(final Schema schema, final Object value, final JsonConverterConfig config) { if (!(value instanceof java.util.Date)) throw new DataException("Invalid type for Date, expected Date but was " + value.getClass()); - return JsonNodeFactory.instance.numberNode(Date.fromLogical(schema, (java.util.Date) value)); + return JSON_NODE_FACTORY.numberNode(Date.fromLogical(schema, (java.util.Date) value)); } @Override @@ -245,7 +248,7 @@ public Object toConnect(final Schema schema, final JsonNode value) { public JsonNode toJson(final Schema schema, final Object value, final JsonConverterConfig config) { if (!(value instanceof java.util.Date)) throw new DataException("Invalid type for Time, expected Date but was " + value.getClass()); - return JsonNodeFactory.instance.numberNode(Time.fromLogical(schema, (java.util.Date) value)); + return JSON_NODE_FACTORY.numberNode(Time.fromLogical(schema, (java.util.Date) value)); } @Override @@ -261,7 +264,7 @@ public Object toConnect(final Schema schema, final JsonNode value) { public JsonNode toJson(final Schema schema, final Object value, final JsonConverterConfig config) { if (!(value instanceof java.util.Date)) throw new DataException("Invalid type for Timestamp, expected Date but was " + value.getClass()); - return JsonNodeFactory.instance.numberNode(Timestamp.fromLogical(schema, (java.util.Date) value)); + return JSON_NODE_FACTORY.numberNode(Timestamp.fromLogical(schema, (java.util.Date) value)); } @Override @@ -285,7 +288,7 @@ public JsonConverter() { // floating point numbers that cannot fit into float64 final Set deserializationFeatures = new HashSet<>(); deserializationFeatures.add(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); - deserializer = new JsonDeserializer(deserializationFeatures); + deserializer = new JsonDeserializer(deserializationFeatures, true); } @Override @@ -362,7 +365,7 @@ public SchemaAndValue toConnectData(String topic, byte[] value) { // The deserialized data should either be an envelope object containing the schema and the payload or the schema // was stripped during serialization and we need to fill in an all-encompassing schema. if (!config.schemasEnabled()) { - ObjectNode envelope = JsonNodeFactory.instance.objectNode(); + ObjectNode envelope = JSON_NODE_FACTORY.objectNode(); envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null); envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue); jsonValue = envelope; @@ -413,17 +416,17 @@ public ObjectNode asJsonSchema(Schema schema) { jsonSchema = JsonSchema.STRING_SCHEMA.deepCopy(); break; case ARRAY: - jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME); + jsonSchema = JSON_NODE_FACTORY.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME); jsonSchema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.valueSchema())); break; case MAP: - jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.MAP_TYPE_NAME); + jsonSchema = JSON_NODE_FACTORY.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.MAP_TYPE_NAME); jsonSchema.set(JsonSchema.MAP_KEY_FIELD_NAME, asJsonSchema(schema.keySchema())); jsonSchema.set(JsonSchema.MAP_VALUE_FIELD_NAME, asJsonSchema(schema.valueSchema())); break; case STRUCT: - jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME); - ArrayNode fields = JsonNodeFactory.instance.arrayNode(); + jsonSchema = JSON_NODE_FACTORY.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME); + ArrayNode fields = JSON_NODE_FACTORY.arrayNode(); for (Field field : schema.fields()) { ObjectNode fieldJsonSchema = asJsonSchema(field.schema()).deepCopy(); fieldJsonSchema.put(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME, field.name()); @@ -443,7 +446,7 @@ public ObjectNode asJsonSchema(Schema schema) { if (schema.doc() != null) jsonSchema.put(JsonSchema.SCHEMA_DOC_FIELD_NAME, schema.doc()); if (schema.parameters() != null) { - ObjectNode jsonSchemaParams = JsonNodeFactory.instance.objectNode(); + ObjectNode jsonSchemaParams = JSON_NODE_FACTORY.objectNode(); for (Map.Entry prop : schema.parameters().entrySet()) jsonSchemaParams.put(prop.getKey(), prop.getValue()); jsonSchema.set(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams); @@ -596,7 +599,7 @@ private JsonNode convertToJson(Schema schema, Object value) { if (schema.defaultValue() != null) return convertToJson(schema, schema.defaultValue()); if (schema.isOptional()) - return JsonNodeFactory.instance.nullNode(); + return JSON_NODE_FACTORY.nullNode(); throw new DataException("Conversion error: null value for field that is required and has no default value"); } @@ -617,32 +620,32 @@ private JsonNode convertToJson(Schema schema, Object value) { } switch (schemaType) { case INT8: - return JsonNodeFactory.instance.numberNode((Byte) value); + return JSON_NODE_FACTORY.numberNode((Byte) value); case INT16: - return JsonNodeFactory.instance.numberNode((Short) value); + return JSON_NODE_FACTORY.numberNode((Short) value); case INT32: - return JsonNodeFactory.instance.numberNode((Integer) value); + return JSON_NODE_FACTORY.numberNode((Integer) value); case INT64: - return JsonNodeFactory.instance.numberNode((Long) value); + return JSON_NODE_FACTORY.numberNode((Long) value); case FLOAT32: - return JsonNodeFactory.instance.numberNode((Float) value); + return JSON_NODE_FACTORY.numberNode((Float) value); case FLOAT64: - return JsonNodeFactory.instance.numberNode((Double) value); + return JSON_NODE_FACTORY.numberNode((Double) value); case BOOLEAN: - return JsonNodeFactory.instance.booleanNode((Boolean) value); + return JSON_NODE_FACTORY.booleanNode((Boolean) value); case STRING: CharSequence charSeq = (CharSequence) value; - return JsonNodeFactory.instance.textNode(charSeq.toString()); + return JSON_NODE_FACTORY.textNode(charSeq.toString()); case BYTES: if (value instanceof byte[]) - return JsonNodeFactory.instance.binaryNode((byte[]) value); + return JSON_NODE_FACTORY.binaryNode((byte[]) value); else if (value instanceof ByteBuffer) - return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array()); + return JSON_NODE_FACTORY.binaryNode(((ByteBuffer) value).array()); else throw new DataException("Invalid type for bytes type: " + value.getClass()); case ARRAY: { Collection collection = (Collection) value; - ArrayNode list = JsonNodeFactory.instance.arrayNode(); + ArrayNode list = JSON_NODE_FACTORY.arrayNode(); for (Object elem : collection) { Schema valueSchema = schema == null ? null : schema.valueSchema(); JsonNode fieldValue = convertToJson(valueSchema, elem); @@ -668,9 +671,9 @@ else if (value instanceof ByteBuffer) ObjectNode obj = null; ArrayNode list = null; if (objectMode) - obj = JsonNodeFactory.instance.objectNode(); + obj = JSON_NODE_FACTORY.objectNode(); else - list = JsonNodeFactory.instance.arrayNode(); + list = JSON_NODE_FACTORY.arrayNode(); for (Map.Entry entry : map.entrySet()) { Schema keySchema = schema == null ? null : schema.keySchema(); Schema valueSchema = schema == null ? null : schema.valueSchema(); @@ -680,7 +683,7 @@ else if (value instanceof ByteBuffer) if (objectMode) obj.set(mapKey.asText(), mapValue); else - list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue)); + list.add(JSON_NODE_FACTORY.arrayNode().add(mapKey).add(mapValue)); } return objectMode ? obj : list; } @@ -688,7 +691,7 @@ else if (value instanceof ByteBuffer) Struct struct = (Struct) value; if (!struct.schema().equals(schema)) throw new DataException("Mismatching schema."); - ObjectNode obj = JsonNodeFactory.instance.objectNode(); + ObjectNode obj = JSON_NODE_FACTORY.objectNode(); for (Field field : schema.fields()) { obj.set(field.name(), convertToJson(field.schema(), struct.get(field))); } diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java index a656e53283fea..e2d20dbf3c990 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; import java.util.Collections; import java.util.Set; import org.apache.kafka.common.errors.SerializationException; @@ -35,7 +36,7 @@ public class JsonDeserializer implements Deserializer { * Default constructor needed by Kafka */ public JsonDeserializer() { - this(Collections.emptySet()); + this(Collections.emptySet(), false); } /** @@ -43,9 +44,14 @@ public JsonDeserializer() { * for the deserializer * * @param deserializationFeatures the specified deserialization features + * @param exactDecimals {@code true} if trailing zeros on decimals should be maintained. */ - JsonDeserializer(final Set deserializationFeatures) { + JsonDeserializer( + final Set deserializationFeatures, + final boolean exactDecimals + ) { deserializationFeatures.forEach(objectMapper::enable); + objectMapper.setNodeFactory(JsonNodeFactory.withExactBigDecimals(exactDecimals)); } @Override diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java index 2a5695031e49d..2e189e2d584ae 100644 --- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java +++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.json; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -65,8 +66,11 @@ public class JsonConverterTest { private static final String TOPIC = "topic"; - ObjectMapper objectMapper = new ObjectMapper(); - JsonConverter converter = new JsonConverter(); + private final ObjectMapper objectMapper = new ObjectMapper() + .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS) + .setNodeFactory(JsonNodeFactory.withExactBigDecimals(true)); + + private final JsonConverter converter = new JsonConverter(); @Before public void setUp() { @@ -272,6 +276,16 @@ public void numericDecimalToConnect() { assertEquals(reference, schemaAndValue.value()); } + @Test + public void numericDecimalWithTrailingZerosToConnect() { + BigDecimal reference = new BigDecimal(new BigInteger("15600"), 4); + Schema schema = Decimal.schema(4); + String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": { \"scale\": \"4\" } }, \"payload\": 1.5600 }"; + SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes()); + assertEquals(schema, schemaAndValue.schema()); + assertEquals(reference, schemaAndValue.value()); + } + @Test public void highPrecisionNumericDecimalToConnect() { // this number is too big to be kept in a float64! @@ -634,7 +648,18 @@ public void decimalToNumericJson() { } @Test - public void decimalToJsonWithoutSchema() throws IOException { + public void decimalWithTrailingZerosToNumericJson() { + converter.configure(Collections.singletonMap(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name()), false); + JsonNode converted = parse(converter.fromConnectData(TOPIC, Decimal.schema(4), new BigDecimal(new BigInteger("15600"), 4))); + validateEnvelope(converted); + assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": { \"scale\": \"4\" } }"), + converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); + assertTrue("expected node to be numeric", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isNumber()); + assertEquals(new BigDecimal("1.5600"), converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).decimalValue()); + } + + @Test + public void decimalToJsonWithoutSchema() { assertThrows( "expected data exception when serializing BigDecimal without schema", DataException.class, From d20bf6bf2e3976b75efabb34bf482617643e855b Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Fri, 6 Mar 2020 15:30:37 +0000 Subject: [PATCH 2/2] requested changes --- .../kafka/connect/json/JsonConverter.java | 24 ++++++++++++------- .../kafka/connect/json/JsonDeserializer.java | 10 ++++---- .../kafka/connect/json/JsonSerializer.java | 20 ++++++++++++++++ 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index a4948a3c79244..8a2d6768cc96f 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; -import java.util.HashSet; -import java.util.Set; import org.apache.kafka.common.cache.Cache; import org.apache.kafka.common.cache.LRUCache; import org.apache.kafka.common.cache.SynchronizedCache; @@ -54,6 +52,8 @@ import java.util.Iterator; import java.util.Map; +import static org.apache.kafka.common.utils.Utils.mkSet; + /** * Implementation of Converter that uses JSON to store schemas and objects. By default this converter will serialize Connect keys, values, * and headers with schemas, although this can be disabled with {@link JsonConverterConfig#SCHEMAS_ENABLE_CONFIG schemas.enable} @@ -280,15 +280,23 @@ public Object toConnect(final Schema schema, final JsonNode value) { private Cache fromConnectSchemaCache; private Cache toConnectSchemaCache; - private final JsonSerializer serializer = new JsonSerializer(); + private final JsonSerializer serializer; private final JsonDeserializer deserializer; public JsonConverter() { - // this ensures that the JsonDeserializer maintains full precision on - // floating point numbers that cannot fit into float64 - final Set deserializationFeatures = new HashSet<>(); - deserializationFeatures.add(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); - deserializer = new JsonDeserializer(deserializationFeatures, true); + serializer = new JsonSerializer( + mkSet(), + JSON_NODE_FACTORY + ); + + deserializer = new JsonDeserializer( + mkSet( + // this ensures that the JsonDeserializer maintains full precision on + // floating point numbers that cannot fit into float64 + DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS + ), + JSON_NODE_FACTORY + ); } @Override diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java index e2d20dbf3c990..2e6e821b2d3bc 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java @@ -30,13 +30,13 @@ * structured data without having associated Java classes. This deserializer also supports Connect schemas. */ public class JsonDeserializer implements Deserializer { - private ObjectMapper objectMapper = new ObjectMapper(); + private final ObjectMapper objectMapper = new ObjectMapper(); /** * Default constructor needed by Kafka */ public JsonDeserializer() { - this(Collections.emptySet(), false); + this(Collections.emptySet(), JsonNodeFactory.withExactBigDecimals(true)); } /** @@ -44,14 +44,14 @@ public JsonDeserializer() { * for the deserializer * * @param deserializationFeatures the specified deserialization features - * @param exactDecimals {@code true} if trailing zeros on decimals should be maintained. + * @param jsonNodeFactory the json node factory to use. */ JsonDeserializer( final Set deserializationFeatures, - final boolean exactDecimals + final JsonNodeFactory jsonNodeFactory ) { deserializationFeatures.forEach(objectMapper::enable); - objectMapper.setNodeFactory(JsonNodeFactory.withExactBigDecimals(exactDecimals)); + objectMapper.setNodeFactory(jsonNodeFactory); } @Override diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java index 94ec0a83e8ce6..0f2b62bd0a40e 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java @@ -18,9 +18,14 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; +import java.util.Collections; +import java.util.Set; + /** * Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily * structured data without corresponding Java classes. This serializer also supports Connect schemas. @@ -32,7 +37,22 @@ public class JsonSerializer implements Serializer { * Default constructor needed by Kafka */ public JsonSerializer() { + this(Collections.emptySet(), JsonNodeFactory.withExactBigDecimals(true)); + } + /** + * A constructor that additionally specifies some {@link SerializationFeature} + * for the serializer + * + * @param serializationFeatures the specified serialization features + * @param jsonNodeFactory the json node factory to use. + */ + JsonSerializer( + final Set serializationFeatures, + final JsonNodeFactory jsonNodeFactory + ) { + serializationFeatures.forEach(objectMapper::enable); + objectMapper.setNodeFactory(jsonNodeFactory); } @Override