Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
Expand Down Expand Up @@ -54,6 +52,8 @@
import java.util.Iterator;
import java.util.Map;

import static org.apache.kafka.common.utils.Utils.mkSet;

/**
* Implementation of Converter that uses JSON to store schemas and objects. By default this converter will serialize Connect keys, values,
* and headers with schemas, although this can be disabled with {@link JsonConverterConfig#SCHEMAS_ENABLE_CONFIG schemas.enable}
Expand Down Expand Up @@ -191,6 +191,9 @@ public Object convert(Schema schema, JsonNode value) {
// Convert values in Kafka Connect form into/from their logical types. These logical converters are discovered by logical type
// names specified in the field
private static final HashMap<String, LogicalTypeConverter> LOGICAL_CONVERTERS = new HashMap<>();

private static final JsonNodeFactory JSON_NODE_FACTORY = JsonNodeFactory.withExactBigDecimals(true);

static {
LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() {
@Override
Expand All @@ -201,9 +204,9 @@ public JsonNode toJson(final Schema schema, final Object value, final JsonConver
final BigDecimal decimal = (BigDecimal) value;
switch (config.decimalFormat()) {
case NUMERIC:
return JsonNodeFactory.instance.numberNode(decimal);
return JSON_NODE_FACTORY.numberNode(decimal);
case BASE64:
return JsonNodeFactory.instance.binaryNode(Decimal.fromLogical(schema, decimal));
return JSON_NODE_FACTORY.binaryNode(Decimal.fromLogical(schema, decimal));
default:
throw new DataException("Unexpected " + JsonConverterConfig.DECIMAL_FORMAT_CONFIG + ": " + config.decimalFormat());
}
Expand All @@ -229,7 +232,7 @@ public Object toConnect(final Schema schema, final JsonNode value) {
public JsonNode toJson(final Schema schema, final Object value, final JsonConverterConfig config) {
if (!(value instanceof java.util.Date))
throw new DataException("Invalid type for Date, expected Date but was " + value.getClass());
return JsonNodeFactory.instance.numberNode(Date.fromLogical(schema, (java.util.Date) value));
return JSON_NODE_FACTORY.numberNode(Date.fromLogical(schema, (java.util.Date) value));
}

@Override
Expand All @@ -245,7 +248,7 @@ public Object toConnect(final Schema schema, final JsonNode value) {
public JsonNode toJson(final Schema schema, final Object value, final JsonConverterConfig config) {
if (!(value instanceof java.util.Date))
throw new DataException("Invalid type for Time, expected Date but was " + value.getClass());
return JsonNodeFactory.instance.numberNode(Time.fromLogical(schema, (java.util.Date) value));
return JSON_NODE_FACTORY.numberNode(Time.fromLogical(schema, (java.util.Date) value));
}

@Override
Expand All @@ -261,7 +264,7 @@ public Object toConnect(final Schema schema, final JsonNode value) {
public JsonNode toJson(final Schema schema, final Object value, final JsonConverterConfig config) {
if (!(value instanceof java.util.Date))
throw new DataException("Invalid type for Timestamp, expected Date but was " + value.getClass());
return JsonNodeFactory.instance.numberNode(Timestamp.fromLogical(schema, (java.util.Date) value));
return JSON_NODE_FACTORY.numberNode(Timestamp.fromLogical(schema, (java.util.Date) value));
}

@Override
Expand All @@ -277,15 +280,23 @@ public Object toConnect(final Schema schema, final JsonNode value) {
private Cache<Schema, ObjectNode> fromConnectSchemaCache;
private Cache<JsonNode, Schema> toConnectSchemaCache;

private final JsonSerializer serializer = new JsonSerializer();
private final JsonSerializer serializer;
private final JsonDeserializer deserializer;

public JsonConverter() {
// this ensures that the JsonDeserializer maintains full precision on
// floating point numbers that cannot fit into float64
final Set<DeserializationFeature> deserializationFeatures = new HashSet<>();
deserializationFeatures.add(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
deserializer = new JsonDeserializer(deserializationFeatures);
serializer = new JsonSerializer(
mkSet(),
JSON_NODE_FACTORY
);

deserializer = new JsonDeserializer(
mkSet(
// this ensures that the JsonDeserializer maintains full precision on
// floating point numbers that cannot fit into float64
DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS
),
JSON_NODE_FACTORY
);
}

@Override
Expand Down Expand Up @@ -362,7 +373,7 @@ public SchemaAndValue toConnectData(String topic, byte[] value) {
// The deserialized data should either be an envelope object containing the schema and the payload or the schema
// was stripped during serialization and we need to fill in an all-encompassing schema.
if (!config.schemasEnabled()) {
ObjectNode envelope = JsonNodeFactory.instance.objectNode();
ObjectNode envelope = JSON_NODE_FACTORY.objectNode();
envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null);
envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue);
jsonValue = envelope;
Expand Down Expand Up @@ -413,17 +424,17 @@ public ObjectNode asJsonSchema(Schema schema) {
jsonSchema = JsonSchema.STRING_SCHEMA.deepCopy();
break;
case ARRAY:
jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
jsonSchema = JSON_NODE_FACTORY.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
jsonSchema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.valueSchema()));
break;
case MAP:
jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.MAP_TYPE_NAME);
jsonSchema = JSON_NODE_FACTORY.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.MAP_TYPE_NAME);
jsonSchema.set(JsonSchema.MAP_KEY_FIELD_NAME, asJsonSchema(schema.keySchema()));
jsonSchema.set(JsonSchema.MAP_VALUE_FIELD_NAME, asJsonSchema(schema.valueSchema()));
break;
case STRUCT:
jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME);
ArrayNode fields = JsonNodeFactory.instance.arrayNode();
jsonSchema = JSON_NODE_FACTORY.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME);
ArrayNode fields = JSON_NODE_FACTORY.arrayNode();
for (Field field : schema.fields()) {
ObjectNode fieldJsonSchema = asJsonSchema(field.schema()).deepCopy();
fieldJsonSchema.put(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME, field.name());
Expand All @@ -443,7 +454,7 @@ public ObjectNode asJsonSchema(Schema schema) {
if (schema.doc() != null)
jsonSchema.put(JsonSchema.SCHEMA_DOC_FIELD_NAME, schema.doc());
if (schema.parameters() != null) {
ObjectNode jsonSchemaParams = JsonNodeFactory.instance.objectNode();
ObjectNode jsonSchemaParams = JSON_NODE_FACTORY.objectNode();
for (Map.Entry<String, String> prop : schema.parameters().entrySet())
jsonSchemaParams.put(prop.getKey(), prop.getValue());
jsonSchema.set(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams);
Expand Down Expand Up @@ -596,7 +607,7 @@ private JsonNode convertToJson(Schema schema, Object value) {
if (schema.defaultValue() != null)
return convertToJson(schema, schema.defaultValue());
if (schema.isOptional())
return JsonNodeFactory.instance.nullNode();
return JSON_NODE_FACTORY.nullNode();
throw new DataException("Conversion error: null value for field that is required and has no default value");
}

Expand All @@ -617,32 +628,32 @@ private JsonNode convertToJson(Schema schema, Object value) {
}
switch (schemaType) {
case INT8:
return JsonNodeFactory.instance.numberNode((Byte) value);
return JSON_NODE_FACTORY.numberNode((Byte) value);
case INT16:
return JsonNodeFactory.instance.numberNode((Short) value);
return JSON_NODE_FACTORY.numberNode((Short) value);
case INT32:
return JsonNodeFactory.instance.numberNode((Integer) value);
return JSON_NODE_FACTORY.numberNode((Integer) value);
case INT64:
return JsonNodeFactory.instance.numberNode((Long) value);
return JSON_NODE_FACTORY.numberNode((Long) value);
case FLOAT32:
return JsonNodeFactory.instance.numberNode((Float) value);
return JSON_NODE_FACTORY.numberNode((Float) value);
case FLOAT64:
return JsonNodeFactory.instance.numberNode((Double) value);
return JSON_NODE_FACTORY.numberNode((Double) value);
case BOOLEAN:
return JsonNodeFactory.instance.booleanNode((Boolean) value);
return JSON_NODE_FACTORY.booleanNode((Boolean) value);
case STRING:
CharSequence charSeq = (CharSequence) value;
return JsonNodeFactory.instance.textNode(charSeq.toString());
return JSON_NODE_FACTORY.textNode(charSeq.toString());
case BYTES:
if (value instanceof byte[])
return JsonNodeFactory.instance.binaryNode((byte[]) value);
return JSON_NODE_FACTORY.binaryNode((byte[]) value);
else if (value instanceof ByteBuffer)
return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array());
return JSON_NODE_FACTORY.binaryNode(((ByteBuffer) value).array());
else
throw new DataException("Invalid type for bytes type: " + value.getClass());
case ARRAY: {
Collection collection = (Collection) value;
ArrayNode list = JsonNodeFactory.instance.arrayNode();
ArrayNode list = JSON_NODE_FACTORY.arrayNode();
for (Object elem : collection) {
Schema valueSchema = schema == null ? null : schema.valueSchema();
JsonNode fieldValue = convertToJson(valueSchema, elem);
Expand All @@ -668,9 +679,9 @@ else if (value instanceof ByteBuffer)
ObjectNode obj = null;
ArrayNode list = null;
if (objectMode)
obj = JsonNodeFactory.instance.objectNode();
obj = JSON_NODE_FACTORY.objectNode();
else
list = JsonNodeFactory.instance.arrayNode();
list = JSON_NODE_FACTORY.arrayNode();
for (Map.Entry<?, ?> entry : map.entrySet()) {
Schema keySchema = schema == null ? null : schema.keySchema();
Schema valueSchema = schema == null ? null : schema.valueSchema();
Expand All @@ -680,15 +691,15 @@ else if (value instanceof ByteBuffer)
if (objectMode)
obj.set(mapKey.asText(), mapValue);
else
list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
list.add(JSON_NODE_FACTORY.arrayNode().add(mapKey).add(mapValue));
}
return objectMode ? obj : list;
}
case STRUCT: {
Struct struct = (Struct) value;
if (!struct.schema().equals(schema))
throw new DataException("Mismatching schema.");
ObjectNode obj = JsonNodeFactory.instance.objectNode();
ObjectNode obj = JSON_NODE_FACTORY.objectNode();
for (Field field : schema.fields()) {
obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.errors.SerializationException;
Expand All @@ -29,23 +30,28 @@
* structured data without having associated Java classes. This deserializer also supports Connect schemas.
*/
public class JsonDeserializer implements Deserializer<JsonNode> {
private ObjectMapper objectMapper = new ObjectMapper();
private final ObjectMapper objectMapper = new ObjectMapper();

/**
* Default constructor needed by Kafka
*/
public JsonDeserializer() {
this(Collections.emptySet());
this(Collections.emptySet(), JsonNodeFactory.withExactBigDecimals(true));
}

/**
* A constructor that additionally specifies some {@link DeserializationFeature}
* for the deserializer
*
* @param deserializationFeatures the specified deserialization features
* @param jsonNodeFactory the json node factory to use.
*/
JsonDeserializer(final Set<DeserializationFeature> deserializationFeatures) {
JsonDeserializer(
    final Set<DeserializationFeature> deserializationFeatures,
    final JsonNodeFactory jsonNodeFactory
) {
    // Install the node factory first, then switch on each requested feature;
    // the two mapper settings are independent, so the order is immaterial.
    objectMapper.setNodeFactory(jsonNodeFactory);
    for (final DeserializationFeature feature : deserializationFeatures) {
        objectMapper.enable(feature);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Collections;
import java.util.Set;

/**
* Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily
* structured data without corresponding Java classes. This serializer also supports Connect schemas.
Expand All @@ -32,7 +37,22 @@ public class JsonSerializer implements Serializer<JsonNode> {
* Default constructor needed by Kafka
*/
public JsonSerializer() {
// Delegates to the configurable constructor: no extra serialization
// features, and a node factory with exact BigDecimal handling so that
// trailing zeros on decimal payloads are preserved.
this(Collections.emptySet(), JsonNodeFactory.withExactBigDecimals(true));
}

/**
 * Creates a serializer whose shared {@link ObjectMapper} has the given
 * {@link SerializationFeature}s enabled and uses the supplied node factory
 * when building {@link JsonNode} trees.
 *
 * @param serializationFeatures serialization features to enable on the mapper
 * @param jsonNodeFactory the json node factory to use.
 */
JsonSerializer(
    final Set<SerializationFeature> serializationFeatures,
    final JsonNodeFactory jsonNodeFactory
) {
    // The two mapper settings are independent of one another.
    objectMapper.setNodeFactory(jsonNodeFactory);
    for (final SerializationFeature feature : serializationFeatures) {
        objectMapper.enable(feature);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.json;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
Expand Down Expand Up @@ -65,8 +66,11 @@
public class JsonConverterTest {
private static final String TOPIC = "topic";

ObjectMapper objectMapper = new ObjectMapper();
JsonConverter converter = new JsonConverter();
private final ObjectMapper objectMapper = new ObjectMapper()
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));

private final JsonConverter converter = new JsonConverter();

@Before
public void setUp() {
Expand Down Expand Up @@ -272,6 +276,16 @@ public void numericDecimalToConnect() {
assertEquals(reference, schemaAndValue.value());
}

@Test
public void numericDecimalWithTrailingZerosToConnect() {
    // 1.5600 at scale 4 — the trailing zeros must survive deserialization.
    String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": { \"scale\": \"4\" } }, \"payload\": 1.5600 }";
    SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
    Schema expectedSchema = Decimal.schema(4);
    BigDecimal expectedValue = new BigDecimal(new BigInteger("15600"), 4);
    assertEquals(expectedSchema, schemaAndValue.schema());
    assertEquals(expectedValue, schemaAndValue.value());
}

@Test
public void highPrecisionNumericDecimalToConnect() {
// this number is too big to be kept in a float64!
Expand Down Expand Up @@ -634,7 +648,18 @@ public void decimalToNumericJson() {
}

@Test
public void decimalToJsonWithoutSchema() throws IOException {
public void decimalWithTrailingZerosToNumericJson() {
    // NUMERIC decimal format: serialize 1.5600 (scale 4) and verify the
    // payload stays a number with its trailing zeros intact.
    converter.configure(Collections.singletonMap(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name()), false);
    BigDecimal input = new BigDecimal(new BigInteger("15600"), 4);
    JsonNode converted = parse(converter.fromConnectData(TOPIC, Decimal.schema(4), input));
    validateEnvelope(converted);
    JsonNode payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
    assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": { \"scale\": \"4\" } }"),
        converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
    assertTrue("expected node to be numeric", payload.isNumber());
    assertEquals(new BigDecimal("1.5600"), payload.decimalValue());
}

@Test
public void decimalToJsonWithoutSchema() {
assertThrows(
"expected data exception when serializing BigDecimal without schema",
DataException.class,
Expand Down