diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java index a78f175b2a0d9..d902f049faea2 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java @@ -48,6 +48,11 @@ public long readLong() { return buf.getLong(); } + @Override + public double readDouble() { + return ByteUtils.readDouble(buf); + } + @Override public void readArray(byte[] arr) { buf.get(arr); @@ -88,6 +93,11 @@ public void writeLong(long val) { buf.putLong(val); } + @Override + public void writeDouble(double val) { + ByteUtils.writeDouble(val, buf); + } + @Override public void writeByteArray(byte[] arr) { buf.put(arr); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java index a2903f8b44274..899e8e487abcf 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java @@ -30,6 +30,7 @@ public interface Readable { short readShort(); int readInt(); long readLong(); + double readDouble(); void readArray(byte[] arr); int readUnsignedVarint(); ByteBuffer readByteBuffer(int length); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Writable.java b/clients/src/main/java/org/apache/kafka/common/protocol/Writable.java index ac9133fc16066..82e2201478007 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Writable.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Writable.java @@ -25,6 +25,7 @@ public interface Writable { void writeShort(short val); void writeInt(int val); void writeLong(long val); + void writeDouble(double val); void writeByteArray(byte[] arr); void writeUnsignedVarint(int i); void writeByteBuffer(ByteBuffer buf); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java index 749b7f3dc961f..7d5c3f5d3d4bc 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java @@ -91,6 +91,16 @@ public Int16(String name, String docString) { } } + public static class Float64 extends Field { + public Float64(String name, String docString) { + super(name, Type.FLOAT64, docString, false, null); + } + + public Float64(String name, String docString, double defaultValue) { + super(name, Type.FLOAT64, docString, true, defaultValue); + } + } + public static class Str extends Field { public Str(String name, String docString) { super(name, Type.STRING, docString, false, null); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index f2a3476ac0fe6..b58ae91711440 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -97,6 +97,10 @@ public Short get(Field.Int16 field) { return getShort(field.name); } + public Double get(Field.Float64 field) { + return getDouble(field.name); + } + public String get(Field.Str field) { return getString(field.name); } @@ -147,6 +151,12 @@ public Integer getOrElse(Field.Int32 field, int alternative) { return alternative; } + public Double getOrElse(Field.Float64 field, double alternative) { + if (hasField(field.name)) + return getDouble(field.name); + return alternative; + } + public String getOrElse(Field.NullableStr field, String alternative) { if (hasField(field.name)) return getString(field.name); @@ -264,6 +274,14 @@ public UUID getUUID(String name) { return (UUID) get(name); } + public Double getDouble(BoundField field) { + return (Double) get(field); + } + + public Double getDouble(String name) { + return (Double) get(name); + } + public Object[] getArray(BoundField field) { return (Object[]) get(field); } @@ -369,6 +387,10 @@ public Struct set(Field.Int16 def, short value) { return set(def.name, value); } + public Struct set(Field.Float64 def, double value) { + return set(def.name, value); + } + public Struct set(Field.Bool def, boolean value) { return set(def.name, value); } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java index 6183625a785f8..6d9c2503be26e 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java @@ -367,6 +367,42 @@ public String documentation() { } }; + public static final DocumentedType FLOAT64 = new DocumentedType() { + @Override + public void write(ByteBuffer buffer, Object o) { + ByteUtils.writeDouble((Double) o, buffer); + } + + @Override + public Object read(ByteBuffer buffer) { + return ByteUtils.readDouble(buffer); + } + + @Override + public int sizeOf(Object o) { + return 8; + } + + @Override + public String typeName() { + return "FLOAT64"; + } + + @Override + public Double validate(Object item) { + if (item instanceof Double) + return (Double) item; + else + throw new SchemaException(item + " is not a Double."); + } + + @Override + public String documentation() { + return "Represents a double-precision 64-bit format IEEE 754 value. " + + "The values are encoded using eight bytes in network byte order (big-endian)."; + } + }; + public static final DocumentedType STRING = new DocumentedType() { @Override public void write(ByteBuffer buffer, Object o) { @@ -956,7 +992,7 @@ public String documentation() { private static String toHtml() { DocumentedType[] types = { BOOLEAN, INT8, INT16, INT32, INT64, - UNSIGNED_INT32, VARINT, VARLONG, UUID, + UNSIGNED_INT32, VARINT, VARLONG, UUID, FLOAT64, STRING, COMPACT_STRING, NULLABLE_STRING, COMPACT_NULLABLE_STRING, BYTES, COMPACT_BYTES, NULLABLE_BYTES, COMPACT_NULLABLE_BYTES, RECORDS, new ArrayOf(STRING), new CompactArrayOf(COMPACT_STRING)}; diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java index 681e95b5299e1..15868721da9ea 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java @@ -253,6 +253,26 @@ public static long readVarlong(ByteBuffer buffer) { return (value >>> 1) ^ -(value & 1); } + /** + * Read a double-precision 64-bit format IEEE 754 value. + * + * @param in The input to read from + * @return The double value read + */ + public static double readDouble(DataInput in) throws IOException { + return in.readDouble(); + } + + /** + * Read a double-precision 64-bit format IEEE 754 value. + * + * @param buffer The buffer to read from + * @return The long value read + */ + public static double readDouble(ByteBuffer buffer) { + return buffer.getDouble(); + } + /** * Write the given integer following the variable-length unsigned encoding from * Google Protocol Buffers @@ -346,6 +366,26 @@ public static void writeVarlong(long value, ByteBuffer buffer) { buffer.put((byte) v); } + /** + * Write the given double following the double-precision 64-bit format IEEE 754 value into the output. + * + * @param value The value to write + * @param out The output to write to + */ + public static void writeDouble(double value, DataOutput out) throws IOException { + out.writeDouble(value); + } + + /** + * Write the given double following the double-precision 64-bit format IEEE 754 value into the buffer. + * + * @param value The value to write + * @param buffer The buffer to write to + */ + public static void writeDouble(double value, ByteBuffer buffer) { + buffer.putDouble(value); + } + /** * Number of bytes needed to encode an integer in unsigned variable-length format. * diff --git a/clients/src/main/resources/common/message/README.md b/clients/src/main/resources/common/message/README.md index 64e17b16b5075..774059399446a 100644 --- a/clients/src/main/resources/common/message/README.md +++ b/clients/src/main/resources/common/message/README.md @@ -79,6 +79,8 @@ There are several primitive field types available. * "int64": a 64-bit integer. +* "float64": is a double-precision floating point number (IEEE 754). + * "string": a UTF-8 string. * "bytes": binary data. @@ -93,8 +95,8 @@ Guide](https://kafka.apache.org/protocol.html). Nullable Fields --------------- -Booleans and ints can never be null. However, fields that are strings, bytes, -or arrays may optionally be "nullable." When a field is "nullable," that +Booleans, ints, and floats can never be null. However, fields that are strings, +bytes, or arrays may optionally be "nullable." When a field is "nullable," that simply means that we are prepared to serialize and deserialize null entries for that field. @@ -172,6 +174,8 @@ been set: * Integer fields default to 0. +* Floats default to 0. + * Booleans default to false. * Strings default to the empty string. @@ -186,20 +190,20 @@ versions of the field are nullable. Custom Default Values --------------------- -You may set a custom default for fields that are integers, booleans, or strings. -Just add a "default" entry in the JSON object. The custom default overrides the -normal default for the type. So for example, you could make a boolean field -default to true rather than false, and so forth. +You may set a custom default for fields that are integers, booleans, floats, or +strings. Just add a "default" entry in the JSON object. The custom default +overrides the normal default for the type. So for example, you could make a +boolean field default to true rather than false, and so forth. Note that the default must be valid for the field type. So the default for an -int16 field must by an integer that fits in 16 bits, and so forth. You may +int16 field must be an integer that fits in 16 bits, and so forth. You may specify hex or octal values, as long as they are prefixed with 0x or 0. It is currently not possible to specify a custom default for bytes or array fields. Custom defaults are useful when an older message version lacked some information. For example, if an older request lacked a timeout field, you may want to specify that the server should assume that the timeout for such a -request is 5000 ms (or some other arbitrary value.) +request is 5000 ms (or some other arbitrary value). Ignorable Fields ---------------- @@ -223,7 +227,7 @@ Hash Sets --------- One very common pattern in Kafka is to load array elements from a message into a Map or Set for easier access. The message protocol makes this easier with -the "mapKey" concept. +the "mapKey" concept. If some of the elements of an array are annotated with "mapKey": true, the entire array will be treated as a linked hash set rather than a list. Elements diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java index e1c85671c51f5..dfce6aad53604 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java @@ -44,6 +44,7 @@ public void setup() { new Field("int64", Type.INT64), new Field("varint", Type.VARINT), new Field("varlong", Type.VARLONG), + new Field("float64", Type.FLOAT64), new Field("string", Type.STRING), new Field("compact_string", Type.COMPACT_STRING), new Field("nullable_string", Type.NULLABLE_STRING), @@ -64,6 +65,7 @@ public void setup() { .set("int64", 1L) .set("varint", 300) .set("varlong", 500L) + .set("float64", 0.5D) .set("string", "1") .set("compact_string", "1") .set("nullable_string", null) @@ -87,6 +89,16 @@ public void testSimple() { check(Type.INT16, (short) -11111, "INT16"); check(Type.INT32, -11111111, "INT32"); check(Type.INT64, -11111111111L, "INT64"); + check(Type.FLOAT64, 2.5, "FLOAT64"); + check(Type.FLOAT64, -0.5, "FLOAT64"); + check(Type.FLOAT64, 1e300, "FLOAT64"); + check(Type.FLOAT64, 0.0, "FLOAT64"); + check(Type.FLOAT64, -0.0, "FLOAT64"); + check(Type.FLOAT64, Double.MAX_VALUE, "FLOAT64"); + check(Type.FLOAT64, Double.MIN_VALUE, "FLOAT64"); + check(Type.FLOAT64, Double.NaN, "FLOAT64"); + check(Type.FLOAT64, Double.NEGATIVE_INFINITY, "FLOAT64"); + check(Type.FLOAT64, Double.POSITIVE_INFINITY, "FLOAT64"); check(Type.STRING, "", "STRING"); check(Type.STRING, "hello", "STRING"); check(Type.STRING, "A\u00ea\u00f1\u00fcC", "STRING"); diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/StructTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/StructTest.java index 44cd09bc19d01..12e561c4f53c0 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/types/StructTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/StructTest.java @@ -29,6 +29,7 @@ public class StructTest { new Field.Int32("int32", ""), new Field.Int64("int64", ""), new Field.Bool("boolean", ""), + new Field.Float64("float64", ""), new Field.Str("string", "")); private static final Schema ARRAY_SCHEMA = new Schema(new Field.Array("array", new ArrayOf(Type.INT8), "")); @@ -46,6 +47,7 @@ public void testEquals() { .set("int32", 12) .set("int64", (long) 12) .set("boolean", true) + .set("float64", 0.5) .set("string", "foobar"); Struct struct2 = new Struct(FLAT_STRUCT_SCHEMA) .set("int8", (byte) 12) @@ -53,6 +55,7 @@ public void testEquals() { .set("int32", 12) .set("int64", (long) 12) .set("boolean", true) + .set("float64", 0.5) .set("string", "foobar"); Struct struct3 = new Struct(FLAT_STRUCT_SCHEMA) .set("int8", (byte) 12) @@ -60,6 +63,7 @@ public void testEquals() { .set("int32", 12) .set("int64", (long) 12) .set("boolean", true) + .set("float64", 0.5) .set("string", "mismatching string"); assertEquals(struct1, struct2); diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java index f15e26d222b05..7e599f9614027 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java @@ -221,6 +221,23 @@ public void testInvalidVarlong() { ByteUtils.readVarlong(buf); } + @Test + public void testDouble() throws IOException { + assertDoubleSerde(0.0, 0x0L); + assertDoubleSerde(-0.0, 0x8000000000000000L); + assertDoubleSerde(1.0, 0x3FF0000000000000L); + assertDoubleSerde(-1.0, 0xBFF0000000000000L); + assertDoubleSerde(123e45, 0x49B58B82C0E0BB00L); + assertDoubleSerde(-123e45, 0xC9B58B82C0E0BB00L); + assertDoubleSerde(Double.MIN_VALUE, 0x1L); + assertDoubleSerde(-Double.MIN_VALUE, 0x8000000000000001L); + assertDoubleSerde(Double.MAX_VALUE, 0x7FEFFFFFFFFFFFFFL); + assertDoubleSerde(-Double.MAX_VALUE, 0xFFEFFFFFFFFFFFFFL); + assertDoubleSerde(Double.NaN, 0x7FF8000000000000L); + assertDoubleSerde(Double.POSITIVE_INFINITY, 0x7FF0000000000000L); + assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L); + } + private void assertUnsignedVarintSerde(int value, byte[] expectedEncoding) throws IOException { ByteBuffer buf = ByteBuffer.allocate(32); ByteUtils.writeUnsignedVarint(value, buf); @@ -269,4 +286,25 @@ private void assertVarlongSerde(long value, byte[] expectedEncoding) throws IOEx assertEquals(value, ByteUtils.readVarlong(in)); } + private void assertDoubleSerde(double value, long expectedLongValue) throws IOException { + byte[] expectedEncoding = new byte[8]; + for (int i = 0; i < 8; i++) { + expectedEncoding[7 - i] = (byte) (expectedLongValue & 0xFF); + expectedLongValue >>= 8; + } + + ByteBuffer buf = ByteBuffer.allocate(8); + ByteUtils.writeDouble(value, buf); + buf.flip(); + assertEquals(value, ByteUtils.readDouble(buf.duplicate()), 0.0); + assertArrayEquals(expectedEncoding, Utils.toArray(buf)); + + buf.rewind(); + DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buf)); + ByteUtils.writeDouble(value, out); + buf.flip(); + assertArrayEquals(expectedEncoding, Utils.toArray(buf)); + DataInputStream in = new DataInputStream(new ByteBufferInputStream(buf)); + assertEquals(value, ByteUtils.readDouble(in), 0.0); + } } diff --git a/clients/src/test/resources/common/message/SimpleExampleMessage.json b/clients/src/test/resources/common/message/SimpleExampleMessage.json index 0346ca522ab5d..b39e7aba1b5ec 100644 --- a/clients/src/test/resources/common/message/SimpleExampleMessage.json +++ b/clients/src/test/resources/common/message/SimpleExampleMessage.json @@ -25,13 +25,15 @@ "nullableVersions": "1+", "taggedVersions": "1+", "tag": 1 }, { "name": "myInt16", "type": "int16", "default": "123", "taggedVersions": "1+", "tag": 2 }, - { "name": "myString", "type": "string", "taggedVersions": "1+", "tag": 3 }, + { "name": "myFloat64", "type": "float64", "default": "12.34", + "taggedVersions": "1+", "tag": 3 }, + { "name": "myString", "type": "string", "taggedVersions": "1+", "tag": 4 }, { "name": "myBytes", "type": "bytes", - "nullableVersions": "1+", "taggedVersions": "1+", "tag": 4 }, + "nullableVersions": "1+", "taggedVersions": "1+", "tag": 5 }, { "name": "taggedUuid", "type": "uuid", "default": "212d5494-4a8b-4fdf-94b3-88b470beb367", - "taggedVersions": "1+", "tag": 5}, + "taggedVersions": "1+", "tag": 6 }, { "name": "taggedLong", "type": "int64", "default": "0xcafcacafcacafca", - "taggedVersions": "1+", "tag": 6}, + "taggedVersions": "1+", "tag": 7 }, { "name": "zeroCopyByteBuffer", "versions": "1", "type": "bytes", "zeroCopy": true }, { "name": "nullableZeroCopyByteBuffer", "versions": "1", "nullableVersions": "0+", "type": "bytes", "zeroCopy": true } diff --git a/generator/src/main/java/org/apache/kafka/message/FieldType.java b/generator/src/main/java/org/apache/kafka/message/FieldType.java index b14449ba9d97a..0b4880d9940db 100644 --- a/generator/src/main/java/org/apache/kafka/message/FieldType.java +++ b/generator/src/main/java/org/apache/kafka/message/FieldType.java @@ -112,6 +112,21 @@ public String toString() { } } + final class Float64FieldType implements FieldType { + static final Float64FieldType INSTANCE = new Float64FieldType(); + private static final String NAME = "float64"; + + @Override + public Optional fixedLength() { + return Optional.of(8); + } + + @Override + public String toString() { + return NAME; + } + } + final class StringFieldType implements FieldType { static final StringFieldType INSTANCE = new StringFieldType(); private static final String NAME = "string"; @@ -241,6 +256,8 @@ static FieldType parse(String string) { return Int64FieldType.INSTANCE; case UUIDFieldType.NAME: return UUIDFieldType.INSTANCE; + case Float64FieldType.NAME: + return Float64FieldType.INSTANCE; case StringFieldType.NAME: return StringFieldType.INSTANCE; case BytesFieldType.NAME: diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java index 420f689708430..e1bd59aeea1c4 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java @@ -345,6 +345,8 @@ private String fieldAbstractJavaType(FieldSpec field) { } else if (field.type() instanceof FieldType.UUIDFieldType) { headerGenerator.addImport(MessageGenerator.UUID_CLASS); return "UUID"; + } else if (field.type() instanceof FieldType.Float64FieldType) { + return "double"; } else if (field.type().isString()) { return "String"; } else if (field.type().isBytes()) { @@ -573,6 +575,8 @@ private String primitiveReadExpression(FieldType type) { return "_readable.readLong()"; } else if (type instanceof FieldType.UUIDFieldType) { return "_readable.readUUID()"; + } else if (type instanceof FieldType.Float64FieldType) { + return "_readable.readDouble()"; } else if (type.isStruct()) { return String.format("new %s(_readable, _version)", type.toString()); } else { @@ -821,6 +825,8 @@ private String getBoxedJavaType(FieldType type) { } else if (type instanceof FieldType.UUIDFieldType) { headerGenerator.addImport(MessageGenerator.UUID_CLASS); return "UUID"; + } else if (type instanceof FieldType.Float64FieldType) { + return "Double"; } else if (type.isString()) { return "String"; } else if (type.isStruct()) { @@ -843,6 +849,8 @@ private String readFieldFromStruct(FieldType type, String name, boolean zeroCopy return String.format("struct.getLong(\"%s\")", name); } else if (type instanceof FieldType.UUIDFieldType) { return String.format("struct.getUUID(\"%s\")", name); + } else if (type instanceof FieldType.Float64FieldType) { + return String.format("struct.getDouble(\"%s\")", name); } else if (type.isString()) { return String.format("struct.getString(\"%s\")", name); } else if (type.isBytes()) { @@ -1052,6 +1060,8 @@ private String primitiveWriteExpression(FieldType type, String name) { return String.format("_writable.writeLong(%s)", name); } else if (type instanceof FieldType.UUIDFieldType) { return String.format("_writable.writeUUID(%s)", name); + } else if (type instanceof FieldType.Float64FieldType) { + return String.format("_writable.writeDouble(%s)", name); } else if (type instanceof FieldType.StructType) { return String.format("%s.write(_writable, _cache, _version)", name); } else { @@ -1278,6 +1288,7 @@ private void generateFieldToStruct(FieldSpec field, Versions versions) { (field.type() instanceof FieldType.Int32FieldType) || (field.type() instanceof FieldType.Int64FieldType) || (field.type() instanceof FieldType.UUIDFieldType) || + (field.type() instanceof FieldType.Float64FieldType) || (field.type() instanceof FieldType.StringFieldType)) { buffer.printf("struct.set(\"%s\", this.%s);%n", field.snakeCaseName(), field.camelCaseName()); @@ -1317,6 +1328,7 @@ private void generateTaggedFieldToMap(FieldSpec field, Versions versions) { (field.type() instanceof FieldType.Int32FieldType) || (field.type() instanceof FieldType.Int64FieldType) || (field.type() instanceof FieldType.UUIDFieldType) || + (field.type() instanceof FieldType.Float64FieldType) || (field.type() instanceof FieldType.StringFieldType)) { buffer.printf("_taggedFields.put(%d, %s);%n", field.tag().get(), field.camelCaseName()); @@ -1726,6 +1738,9 @@ private void generateFieldHashCode(FieldSpec field) { } else if (field.type() instanceof FieldType.UUIDFieldType) { buffer.printf("hashCode = 31 * hashCode + %s.hashCode();%n", field.camelCaseName()); + } else if (field.type() instanceof FieldType.Float64FieldType) { + buffer.printf("hashCode = 31 * hashCode + Double.hashCode(%s);%n", + field.camelCaseName(), field.camelCaseName()); } else if (field.type().isBytes()) { if (field.zeroCopy()) { headerGenerator.addImport(MessageGenerator.OBJECTS_CLASS); @@ -1770,7 +1785,8 @@ private void generateFieldToString(String prefix, FieldSpec field) { } else if ((field.type() instanceof FieldType.Int8FieldType) || (field.type() instanceof FieldType.Int16FieldType) || (field.type() instanceof FieldType.Int32FieldType) || - (field.type() instanceof FieldType.Int64FieldType)) { + (field.type() instanceof FieldType.Int64FieldType) || + (field.type() instanceof FieldType.Float64FieldType)) { buffer.printf("+ \"%s%s=\" + %s%n", prefix, field.camelCaseName(), field.camelCaseName()); } else if (field.type().isString()) { @@ -1893,6 +1909,18 @@ private String fieldDefault(FieldSpec field) { headerGenerator.addImport(MessageGenerator.UUID_CLASS); return "UUID.fromString(\"" + field.defaultString() + "\")"; } + } else if (field.type() instanceof FieldType.Float64FieldType) { + if (field.defaultString().isEmpty()) { + return "0.0"; + } else { + try { + Double.parseDouble(field.defaultString()); + } catch (NumberFormatException e) { + throw new RuntimeException("Invalid default for float64 field " + + field.name() + ": " + field.defaultString(), e); + } + return "Double.parseDouble(\"" + field.defaultString() + "\")"; + } } else if (field.type() instanceof FieldType.StringFieldType) { if (field.defaultString().equals("null")) { validateNullDefault(field); diff --git a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java index 2f1b448e64910..717eeda3796a4 100644 --- a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java +++ b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java @@ -260,6 +260,12 @@ private String fieldTypeToSchemaType(FieldType type, throw new RuntimeException("Type " + type + " cannot be nullable."); } return "Type.UUID"; + } else if (type instanceof FieldType.Float64FieldType) { + headerGenerator.addImport(MessageGenerator.TYPE_CLASS); + if (nullable) { + throw new RuntimeException("Type " + type + " cannot be nullable."); + } + return "Type.FLOAT64"; } else if (type instanceof FieldType.StringFieldType) { headerGenerator.addImport(MessageGenerator.TYPE_CLASS); if (fieldFlexibleVersions.contains(version)) {