Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -943,8 +943,14 @@ protected static SchemaAndValue parse(Parser parser, boolean embedded) throws No
} catch (ArithmeticException e) {
// continue
}
float fValue = decimal.floatValue();
if (fValue != Float.NEGATIVE_INFINITY && fValue != Float.POSITIVE_INFINITY
&& decimal.scale() != 0) {
return new SchemaAndValue(Schema.FLOAT32_SCHEMA, fValue);
}
double dValue = decimal.doubleValue();
if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY) {
if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY
&& decimal.scale() != 0) {
return new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue);
}
Schema schema = Decimal.schema(decimal.scale());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.kafka.connect.errors.DataException;
import org.junit.Test;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -726,6 +728,132 @@ public void shouldConvertTimestampValues() {
public void canConsume() {
}

@Test
public void shouldParseBigIntegerAsDecimalWithZeroScale() {
BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new BigInteger("1"));
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Decimal.schema(0), schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof BigDecimal);
assertEquals(value, ((BigDecimal) schemaAndValue.value()).unscaledValue());
value = BigInteger.valueOf(Long.MIN_VALUE).subtract(new BigInteger("1"));
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Decimal.schema(0), schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof BigDecimal);
assertEquals(value, ((BigDecimal) schemaAndValue.value()).unscaledValue());
}

@Test
public void shouldParseByteAsInt8() {
Byte value = Byte.MAX_VALUE;
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT8_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Byte);
assertEquals(value.byteValue(), ((Byte) schemaAndValue.value()).byteValue());
value = Byte.MIN_VALUE;
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT8_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Byte);
assertEquals(value.byteValue(), ((Byte) schemaAndValue.value()).byteValue());
}

@Test
public void shouldParseShortAsInt16() {
Short value = Short.MAX_VALUE;
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT16_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Short);
assertEquals(value.shortValue(), ((Short) schemaAndValue.value()).shortValue());
value = Short.MIN_VALUE;
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT16_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Short);
assertEquals(value.shortValue(), ((Short) schemaAndValue.value()).shortValue());
}

@Test
public void shouldParseIntegerAsInt32() {
Integer value = Integer.MAX_VALUE;
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT32_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Integer);
assertEquals(value.intValue(), ((Integer) schemaAndValue.value()).intValue());
value = Integer.MIN_VALUE;
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT32_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Integer);
assertEquals(value.intValue(), ((Integer) schemaAndValue.value()).intValue());
}

@Test
public void shouldParseLongAsInt64() {
Long value = Long.MAX_VALUE;
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT64_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Long);
assertEquals(value.longValue(), ((Long) schemaAndValue.value()).longValue());
value = Long.MIN_VALUE;
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT64_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Long);
assertEquals(value.longValue(), ((Long) schemaAndValue.value()).longValue());
}

@Test
public void shouldParseFloatAsFloat32() {
Float value = Float.MAX_VALUE;
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.FLOAT32_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Float);
assertEquals(value.floatValue(), ((Float) schemaAndValue.value()).floatValue(), 0);
value = -Float.MAX_VALUE;
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.FLOAT32_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Float);
assertEquals(value.floatValue(), ((Float) schemaAndValue.value()).floatValue(), 0);
}

@Test
public void shouldParseDoubleAsFloat64() {
Double value = Double.MAX_VALUE;
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.FLOAT64_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Double);
assertEquals(value.doubleValue(), ((Double) schemaAndValue.value()).doubleValue(), 0);
value = -Double.MAX_VALUE;
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.FLOAT64_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Double);
assertEquals(value.doubleValue(), ((Double) schemaAndValue.value()).doubleValue(), 0);
}

protected void assertParsed(String input) {
assertParsed(input, input);
}
Expand Down