Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public long readLong() {
return buf.getLong();
}

@Override
public double readDouble() {
return ByteUtils.readDouble(buf);
}

@Override
public void readArray(byte[] arr) {
buf.get(arr);
Expand Down Expand Up @@ -88,6 +93,11 @@ public void writeLong(long val) {
buf.putLong(val);
}

@Override
public void writeDouble(double val) {
ByteUtils.writeDouble(val, buf);
}

@Override
public void writeByteArray(byte[] arr) {
buf.put(arr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public interface Readable {
short readShort();
int readInt();
long readLong();
double readDouble();
void readArray(byte[] arr);
int readUnsignedVarint();
ByteBuffer readByteBuffer(int length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public interface Writable {
void writeShort(short val);
void writeInt(int val);
void writeLong(long val);
void writeDouble(double val);
void writeByteArray(byte[] arr);
void writeUnsignedVarint(int i);
void writeByteBuffer(ByteBuffer buf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ public Int16(String name, String docString) {
}
}

public static class Float64 extends Field {
public Float64(String name, String docString) {
super(name, Type.FLOAT64, docString, false, null);
}

public Float64(String name, String docString, double defaultValue) {
super(name, Type.FLOAT64, docString, true, defaultValue);
}
}

public static class Str extends Field {
public Str(String name, String docString) {
super(name, Type.STRING, docString, false, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public Short get(Field.Int16 field) {
return getShort(field.name);
}

public Double get(Field.Float64 field) {
return getDouble(field.name);
}

public String get(Field.Str field) {
return getString(field.name);
}
Expand Down Expand Up @@ -147,6 +151,12 @@ public Integer getOrElse(Field.Int32 field, int alternative) {
return alternative;
}

public Double getOrElse(Field.Float64 field, double alternative) {
if (hasField(field.name))
return getDouble(field.name);
return alternative;
}

public String getOrElse(Field.NullableStr field, String alternative) {
if (hasField(field.name))
return getString(field.name);
Expand Down Expand Up @@ -264,6 +274,14 @@ public UUID getUUID(String name) {
return (UUID) get(name);
}

public Double getDouble(BoundField field) {
return (Double) get(field);
}

public Double getDouble(String name) {
return (Double) get(name);
}

public Object[] getArray(BoundField field) {
return (Object[]) get(field);
}
Expand Down Expand Up @@ -369,6 +387,10 @@ public Struct set(Field.Int16 def, short value) {
return set(def.name, value);
}

public Struct set(Field.Float64 def, double value) {
return set(def.name, value);
}

public Struct set(Field.Bool def, boolean value) {
return set(def.name, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,42 @@ public String documentation() {
}
};

public static final DocumentedType FLOAT64 = new DocumentedType() {
@Override
public void write(ByteBuffer buffer, Object o) {
ByteUtils.writeDouble((Double) o, buffer);
}

@Override
public Object read(ByteBuffer buffer) {
return ByteUtils.readDouble(buffer);
}

@Override
public int sizeOf(Object o) {
return 8;
}

@Override
public String typeName() {
return "FLOAT64";
}

@Override
public Double validate(Object item) {
if (item instanceof Double)
return (Double) item;
else
throw new SchemaException(item + " is not a Double.");
}

@Override
public String documentation() {
return "Represents a double-precision 64-bit format IEEE 754 value. " +
"The values are encoded using eight bytes in network byte order (big-endian).";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my benefit, what was the reasoning for choosing big endian?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency - all of the integers types are big-endian as well.

}
};

public static final DocumentedType STRING = new DocumentedType() {
@Override
public void write(ByteBuffer buffer, Object o) {
Expand Down Expand Up @@ -956,7 +992,7 @@ public String documentation() {
private static String toHtml() {
DocumentedType[] types = {
BOOLEAN, INT8, INT16, INT32, INT64,
UNSIGNED_INT32, VARINT, VARLONG, UUID,
UNSIGNED_INT32, VARINT, VARLONG, UUID, FLOAT64,
STRING, COMPACT_STRING, NULLABLE_STRING, COMPACT_NULLABLE_STRING,
BYTES, COMPACT_BYTES, NULLABLE_BYTES, COMPACT_NULLABLE_BYTES,
RECORDS, new ArrayOf(STRING), new CompactArrayOf(COMPACT_STRING)};
Expand Down
40 changes: 40 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,26 @@ public static long readVarlong(ByteBuffer buffer) {
return (value >>> 1) ^ -(value & 1);
}

/**
* Read a double-precision 64-bit format IEEE 754 value.
*
* @param in The input to read from
* @return The double value read
*/
public static double readDouble(DataInput in) throws IOException {
return in.readDouble();
}

/**
* Read a double-precision 64-bit format IEEE 754 value.
*
* @param buffer The buffer to read from
* @return The long value read
*/
public static double readDouble(ByteBuffer buffer) {
return buffer.getDouble();
}

/**
* Write the given integer following the variable-length unsigned encoding from
* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>
Expand Down Expand Up @@ -346,6 +366,26 @@ public static void writeVarlong(long value, ByteBuffer buffer) {
buffer.put((byte) v);
}

/**
* Write the given double following the double-precision 64-bit format IEEE 754 value into the output.
*
* @param value The value to write
* @param out The output to write to
*/
public static void writeDouble(double value, DataOutput out) throws IOException {
out.writeDouble(value);
}

/**
* Write the given double following the double-precision 64-bit format IEEE 754 value into the buffer.
*
* @param value The value to write
* @param buffer The buffer to write to
*/
public static void writeDouble(double value, ByteBuffer buffer) {
buffer.putDouble(value);
}

/**
* Number of bytes needed to encode an integer in unsigned variable-length format.
*
Expand Down
22 changes: 13 additions & 9 deletions clients/src/main/resources/common/message/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ There are several primitive field types available.

* "int64": a 64-bit integer.

* "float64": is a double-precision floating point number (IEEE 754).

* "string": a UTF-8 string.

* "bytes": binary data.
Expand All @@ -93,8 +95,8 @@ Guide](https://kafka.apache.org/protocol.html).

Nullable Fields
---------------
Booleans and ints can never be null. However, fields that are strings, bytes,
or arrays may optionally be "nullable." When a field is "nullable," that
Booleans, ints, and floats can never be null. However, fields that are strings,
bytes, or arrays may optionally be "nullable." When a field is "nullable," that
simply means that we are prepared to serialize and deserialize null entries for
that field.

Expand Down Expand Up @@ -172,6 +174,8 @@ been set:

* Integer fields default to 0.

* Floats default to 0.

* Booleans default to false.

* Strings default to the empty string.
Expand All @@ -186,20 +190,20 @@ versions of the field are nullable.

Custom Default Values
---------------------
You may set a custom default for fields that are integers, booleans, or strings.
Just add a "default" entry in the JSON object. The custom default overrides the
normal default for the type. So for example, you could make a boolean field
default to true rather than false, and so forth.
You may set a custom default for fields that are integers, booleans, floats, or
strings. Just add a "default" entry in the JSON object. The custom default
overrides the normal default for the type. So for example, you could make a
boolean field default to true rather than false, and so forth.

Note that the default must be valid for the field type. So the default for an
int16 field must by an integer that fits in 16 bits, and so forth. You may
int16 field must be an integer that fits in 16 bits, and so forth. You may
specify hex or octal values, as long as they are prefixed with 0x or 0. It is
currently not possible to specify a custom default for bytes or array fields.

Custom defaults are useful when an older message version lacked some
information. For example, if an older request lacked a timeout field, you may
want to specify that the server should assume that the timeout for such a
request is 5000 ms (or some other arbitrary value.)
request is 5000 ms (or some other arbitrary value).

Ignorable Fields
----------------
Expand All @@ -223,7 +227,7 @@ Hash Sets
---------
One very common pattern in Kafka is to load array elements from a message into
a Map or Set for easier access. The message protocol makes this easier with
the "mapKey" concept.
the "mapKey" concept.

If some of the elements of an array are annotated with "mapKey": true, the
entire array will be treated as a linked hash set rather than a list. Elements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void setup() {
new Field("int64", Type.INT64),
new Field("varint", Type.VARINT),
new Field("varlong", Type.VARLONG),
new Field("float64", Type.FLOAT64),
new Field("string", Type.STRING),
new Field("compact_string", Type.COMPACT_STRING),
new Field("nullable_string", Type.NULLABLE_STRING),
Expand All @@ -64,6 +65,7 @@ public void setup() {
.set("int64", 1L)
.set("varint", 300)
.set("varlong", 500L)
.set("float64", 0.5D)
.set("string", "1")
.set("compact_string", "1")
.set("nullable_string", null)
Expand All @@ -87,6 +89,16 @@ public void testSimple() {
check(Type.INT16, (short) -11111, "INT16");
check(Type.INT32, -11111111, "INT32");
check(Type.INT64, -11111111111L, "INT64");
check(Type.FLOAT64, 2.5, "FLOAT64");
check(Type.FLOAT64, -0.5, "FLOAT64");
check(Type.FLOAT64, 1e300, "FLOAT64");
check(Type.FLOAT64, 0.0, "FLOAT64");
check(Type.FLOAT64, -0.0, "FLOAT64");
check(Type.FLOAT64, Double.MAX_VALUE, "FLOAT64");
check(Type.FLOAT64, Double.MIN_VALUE, "FLOAT64");
check(Type.FLOAT64, Double.NaN, "FLOAT64");
check(Type.FLOAT64, Double.NEGATIVE_INFINITY, "FLOAT64");
check(Type.FLOAT64, Double.POSITIVE_INFINITY, "FLOAT64");
Comment thread
bdbyrne marked this conversation as resolved.
check(Type.STRING, "", "STRING");
check(Type.STRING, "hello", "STRING");
check(Type.STRING, "A\u00ea\u00f1\u00fcC", "STRING");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class StructTest {
new Field.Int32("int32", ""),
new Field.Int64("int64", ""),
new Field.Bool("boolean", ""),
new Field.Float64("float64", ""),
new Field.Str("string", ""));

private static final Schema ARRAY_SCHEMA = new Schema(new Field.Array("array", new ArrayOf(Type.INT8), ""));
Expand All @@ -46,20 +47,23 @@ public void testEquals() {
.set("int32", 12)
.set("int64", (long) 12)
.set("boolean", true)
.set("float64", 0.5)
.set("string", "foobar");
Struct struct2 = new Struct(FLAT_STRUCT_SCHEMA)
.set("int8", (byte) 12)
.set("int16", (short) 12)
.set("int32", 12)
.set("int64", (long) 12)
.set("boolean", true)
.set("float64", 0.5)
.set("string", "foobar");
Struct struct3 = new Struct(FLAT_STRUCT_SCHEMA)
.set("int8", (byte) 12)
.set("int16", (short) 12)
.set("int32", 12)
.set("int64", (long) 12)
.set("boolean", true)
.set("float64", 0.5)
.set("string", "mismatching string");

assertEquals(struct1, struct2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,23 @@ public void testInvalidVarlong() {
ByteUtils.readVarlong(buf);
}

@Test
public void testDouble() throws IOException {
assertDoubleSerde(0.0, 0x0L);
assertDoubleSerde(-0.0, 0x8000000000000000L);
assertDoubleSerde(1.0, 0x3FF0000000000000L);
assertDoubleSerde(-1.0, 0xBFF0000000000000L);
assertDoubleSerde(123e45, 0x49B58B82C0E0BB00L);
assertDoubleSerde(-123e45, 0xC9B58B82C0E0BB00L);
assertDoubleSerde(Double.MIN_VALUE, 0x1L);
assertDoubleSerde(-Double.MIN_VALUE, 0x8000000000000001L);
assertDoubleSerde(Double.MAX_VALUE, 0x7FEFFFFFFFFFFFFFL);
assertDoubleSerde(-Double.MAX_VALUE, 0xFFEFFFFFFFFFFFFFL);
assertDoubleSerde(Double.NaN, 0x7FF8000000000000L);
assertDoubleSerde(Double.POSITIVE_INFINITY, 0x7FF0000000000000L);
assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
}

private void assertUnsignedVarintSerde(int value, byte[] expectedEncoding) throws IOException {
ByteBuffer buf = ByteBuffer.allocate(32);
ByteUtils.writeUnsignedVarint(value, buf);
Expand Down Expand Up @@ -269,4 +286,25 @@ private void assertVarlongSerde(long value, byte[] expectedEncoding) throws IOEx
assertEquals(value, ByteUtils.readVarlong(in));
}

private void assertDoubleSerde(double value, long expectedLongValue) throws IOException {
byte[] expectedEncoding = new byte[8];
for (int i = 0; i < 8; i++) {
expectedEncoding[7 - i] = (byte) (expectedLongValue & 0xFF);
expectedLongValue >>= 8;
}

ByteBuffer buf = ByteBuffer.allocate(8);
ByteUtils.writeDouble(value, buf);
buf.flip();
assertEquals(value, ByteUtils.readDouble(buf.duplicate()), 0.0);
assertArrayEquals(expectedEncoding, Utils.toArray(buf));

buf.rewind();
DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buf));
ByteUtils.writeDouble(value, out);
buf.flip();
assertArrayEquals(expectedEncoding, Utils.toArray(buf));
DataInputStream in = new DataInputStream(new ByteBufferInputStream(buf));
assertEquals(value, ByteUtils.readDouble(in), 0.0);
}
}
Loading