32 changes: 32 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.utils;

import java.nio.BufferUnderflowException;
import java.util.EnumSet;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -284,6 +285,37 @@ public static byte[] toArray(ByteBuffer buffer, int offset, int size) {
return dest;
}

/**
* Starting from the current position, read an integer indicating the size of the byte array to read,
* then read the array. Consumes the buffer: upon returning, the buffer's position is after the array
* that is returned. A size of -1 indicates a null array.
* @param buffer The buffer to read a size-prefixed array from
* @return The array, or null if the encoded size was -1
*/
public static byte[] getNullableSizePrefixedArray(final ByteBuffer buffer) {
final int size = buffer.getInt();
return getNullableArray(buffer, size);
}

/**
* Read a byte array of the given size. Consumes the buffer: upon returning, the buffer's position
* is after the array that is returned. A size of -1 indicates a null array.
* @param buffer The buffer to read the array from
* @param size The number of bytes to read out of the buffer, or -1 for a null array
* @return The array, or null if size was -1
*/
public static byte[] getNullableArray(final ByteBuffer buffer, final int size) {
if (size > buffer.remaining()) {
// preemptively throw this when the read is doomed to fail, so we don't have to allocate the array.
throw new BufferUnderflowException();
}
final byte[] oldBytes = size == -1 ? null : new byte[size];
if (oldBytes != null) {
buffer.get(oldBytes);
}
return oldBytes;
}

/**
* Returns a copy of src byte array
* @param src The byte array to copy
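For context, a minimal writer-side sketch of the format these helpers consume (the tests below exercise the read side). The class name and values here are invented for illustration; only the Utils calls come from the patch:

import java.nio.ByteBuffer;
import org.apache.kafka.common.utils.Utils;

public class NullableArrayRoundTrip {
    public static void main(String[] args) {
        final byte[] payload = {1, 0};
        // The writer emits an int size prefix, then the bytes; -1 encodes a null array.
        final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + payload.length);
        buffer.putInt(payload.length); // putInt(-1) here would round-trip to null
        buffer.put(payload);
        buffer.flip(); // switch the buffer from writing to reading
        final byte[] read = Utils.getNullableSizePrefixedArray(buffer);
        assert java.util.Arrays.equals(payload, read);
    }
}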
72 changes: 67 additions & 5 deletions clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -26,6 +26,7 @@
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
@@ -63,6 +64,7 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -83,7 +85,7 @@ public void testMurmur2() {
cases.put("a-little-bit-long-string".getBytes(), -985981536);
cases.put("a-little-bit-longer-string".getBytes(), -1486304829);
cases.put("lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8".getBytes(), -58897971);
cases.put(new byte[]{'a', 'b', 'c'}, 479470107);
cases.put(new byte[] {'a', 'b', 'c'}, 479470107);

for (Map.Entry<byte[], Integer> c : cases.entrySet()) {
assertEquals(c.getValue().intValue(), murmur2(c.getKey()));
@@ -216,6 +218,65 @@ public void toArrayDirectByteBuffer() {
assertEquals(2, buffer.position());
}

@Test
public void getNullableSizePrefixedArrayExact() {
byte[] input = {0, 0, 0, 2, 1, 0};
final ByteBuffer buffer = ByteBuffer.wrap(input);
final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
assertArrayEquals(new byte[] {1, 0}, array);
assertEquals(6, buffer.position());
assertFalse(buffer.hasRemaining());
}

@Test
public void getNullableSizePrefixedArrayExactEmpty() {
byte[] input = {0, 0, 0, 0};
final ByteBuffer buffer = ByteBuffer.wrap(input);
final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
assertArrayEquals(new byte[] {}, array);
assertEquals(4, buffer.position());
assertFalse(buffer.hasRemaining());
}

@Test
public void getNullableSizePrefixedArrayRemainder() {
byte[] input = {0, 0, 0, 2, 1, 0, 9};
final ByteBuffer buffer = ByteBuffer.wrap(input);
final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
assertArrayEquals(new byte[] {1, 0}, array);
assertEquals(6, buffer.position());
assertTrue(buffer.hasRemaining());
}

@Test
public void getNullableSizePrefixedArrayNull() {
// -1
byte[] input = {-1, -1, -1, -1};
final ByteBuffer buffer = ByteBuffer.wrap(input);
final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
assertNull(array);
assertEquals(4, buffer.position());
assertFalse(buffer.hasRemaining());
}

@Test
public void getNullableSizePrefixedArrayInvalid() {
// -2
byte[] input = {-1, -1, -1, -2};
final ByteBuffer buffer = ByteBuffer.wrap(input);
assertThrows(NegativeArraySizeException.class, () -> Utils.getNullableSizePrefixedArray(buffer));
}

@Test
public void getNullableSizePrefixedArrayUnderflow() {
// Integer.MAX_VALUE
byte[] input = {127, -1, -1, -1};
final ByteBuffer buffer = ByteBuffer.wrap(input);
// note that we get a BufferUnderflowException rather than an OutOfMemoryError: the encoded size
// is 2,147,483,647 (2.1 GB), likely larger than the available heap, but the helper throws
// before allocating the array
assertThrows(BufferUnderflowException.class, () -> Utils.getNullableSizePrefixedArray(buffer));
}

@Test
public void utf8ByteArraySerde() {
String utf8String = "A\u00ea\u00f1\u00fcC";
@@ -427,7 +488,7 @@ public void testReadFullyOrFailWithPartialFileChannelReads() throws IOException
String expectedBufferContent = fileChannelMockExpectReadWithRandomBytes(channelMock, bufferSize);
Utils.readFullyOrFail(channelMock, buffer, 0L, "test");
assertEquals("The buffer should be populated correctly", expectedBufferContent,
new String(buffer.array()));
new String(buffer.array()));
assertFalse("The buffer should be filled", buffer.hasRemaining());
verify(channelMock, atLeastOnce()).read(any(), anyLong());
}
@@ -444,7 +505,7 @@ public void testReadFullyWithPartialFileChannelReads() throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
Utils.readFully(channelMock, buffer, 0L);
assertEquals("The buffer should be populated correctly.", expectedBufferContent,
new String(buffer.array()));
new String(buffer.array()));
assertFalse("The buffer should be filled", buffer.hasRemaining());
verify(channelMock, atLeastOnce()).read(any(), anyLong());
}
@@ -493,7 +554,7 @@ public void testLoadProps() throws IOException {
*
* @param channelMock The mocked FileChannel object
* @param bufferSize The buffer size
* @return Expected buffer string
* @return Expected buffer string
* @throws IOException If an I/O error occurs
*/
private String fileChannelMockExpectReadWithRandomBytes(final FileChannel channelMock,
@@ -530,8 +591,9 @@ private static class TestCloseable implements Closeable {
@Override
public void close() throws IOException {
closed = true;
if (closeException != null)
if (closeException != null) {
throw closeException;
}
}

static TestCloseable[] createCloseables(boolean... exceptionOnClose) {
streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -23,6 +23,7 @@
import java.nio.ByteBuffer;

import static java.util.Objects.requireNonNull;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;

public final class FullChangeSerde<T> {
private final Serde<T> inner;
@@ -68,33 +69,6 @@ public Change<T> deserializeParts(final String topic, final Change<byte[]> seria
return new Change<>(newValue, oldValue);
}

/**
* We used to serialize a Change into a single byte[]. We no longer do, but we keep this logic here
* so that we can produce the legacy format in tests to verify that we can still deserialize it.
*/
public static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {
Contributor Author: Only used in the test now, so I moved it.
if (serialChange == null) {
return null;
}

final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length;
final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length;

final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize));


buffer.putInt(oldSize);
if (serialChange.oldValue != null) {
buffer.put(serialChange.oldValue);
}

buffer.putInt(newSize);
if (serialChange.newValue != null) {
buffer.put(serialChange.newValue);
}
return buffer.array();
}

/**
* We used to serialize a Change into a single byte[]. We no longer do, but we still
* need to be able to read the old format (so that we can load the state store from previously written changelog records).
@@ -104,19 +78,8 @@ public static Change<byte[]> decomposeLegacyFormattedArrayIntoChangeArrays(final
return null;
}
final ByteBuffer buffer = ByteBuffer.wrap(data);

final int oldSize = buffer.getInt();
final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
if (oldBytes != null) {
buffer.get(oldBytes);
}

final int newSize = buffer.getInt();
final byte[] newBytes = newSize == -1 ? null : new byte[newSize];
if (newBytes != null) {
buffer.get(newBytes);
}

final byte[] oldBytes = getNullableSizePrefixedArray(buffer);
final byte[] newBytes = getNullableSizePrefixedArray(buffer);
return new Change<>(newBytes, oldBytes);
}
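To make the legacy wire format concrete, here is a hedged sketch of a buffer this method can decompose. The values and class name are invented for illustration, and it assumes Change exposes public oldValue/newValue fields, as the constructor call above suggests:

import java.nio.ByteBuffer;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;

public class LegacyChangeFormat {
    public static void main(String[] args) {
        // Legacy layout: [int oldSize][oldValue bytes][int newSize][newValue bytes]; -1 means null.
        final ByteBuffer legacy = ByteBuffer.allocate(Integer.BYTES * 2 + 1);
        legacy.putInt(-1);     // oldValue is null
        legacy.putInt(1);      // newValue is one byte long
        legacy.put((byte) 42);
        final Change<byte[]> change =
            FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacy.array());
        assert change.oldValue == null && change.newValue[0] == 42;
    }
}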

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -26,6 +26,8 @@
import java.util.Objects;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;

public class ProcessorRecordContext implements RecordContext {

@@ -161,12 +163,10 @@ public byte[] serialize() {
public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
final long timestamp = buffer.getLong();
final long offset = buffer.getLong();
final int topicSize = buffer.getInt();
final String topic;
{
// not handling the null topic condition, because we believe the topic will never be null when we serialize
final byte[] topicBytes = new byte[topicSize];
buffer.get(topicBytes);
// we believe the topic will never be null when we serialize
final byte[] topicBytes = requireNonNull(getNullableSizePrefixedArray(buffer));
topic = new String(topicBytes, UTF_8);
}
final int partition = buffer.getInt();
@@ -177,19 +177,8 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
} else {
final Header[] headerArr = new Header[headerCount];
for (int i = 0; i < headerCount; i++) {
final int keySize = buffer.getInt();
final byte[] keyBytes = new byte[keySize];
buffer.get(keyBytes);

final int valueSize = buffer.getInt();
final byte[] valueBytes;
if (valueSize == -1) {
valueBytes = null;
} else {
valueBytes = new byte[valueSize];
buffer.get(valueBytes);
}

final byte[] keyBytes = requireNonNull(getNullableSizePrefixedArray(buffer));
final byte[] valueBytes = getNullableSizePrefixedArray(buffer);
headerArr[i] = new RecordHeader(new String(keyBytes, UTF_8), valueBytes);
}
headers = new RecordHeaders(headerArr);
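Putting the reads in deserialize together, the record context wire layout is: [long timestamp][long offset][size-prefixed topic][int partition][int headerCount][per header: size-prefixed key, nullable size-prefixed value]. A hedged writer-side sketch follows; the topic, header name, and all values are invented for illustration:

import java.nio.ByteBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;

public class RecordContextLayout {
    public static void main(String[] args) {
        final byte[] topic = "events".getBytes(UTF_8);
        final byte[] headerKey = "trace-id".getBytes(UTF_8);
        final ByteBuffer buffer = ByteBuffer.allocate(
            2 * Long.BYTES                          // timestamp, offset
                + Integer.BYTES + topic.length      // size-prefixed topic
                + Integer.BYTES                     // partition
                + Integer.BYTES                     // header count
                + Integer.BYTES + headerKey.length  // size-prefixed header key
                + Integer.BYTES);                   // null header value (-1)
        buffer.putLong(100L);                       // timestamp
        buffer.putLong(7L);                         // offset
        buffer.putInt(topic.length).put(topic);
        buffer.putInt(0);                           // partition
        buffer.putInt(1);                           // one header
        buffer.putInt(headerKey.length).put(headerKey);
        buffer.putInt(-1);                          // header value is null
        buffer.flip();
        // ProcessorRecordContext.deserialize(buffer) would now rebuild the context.
    }
}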
streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
@@ -22,6 +22,9 @@
import java.util.Arrays;
import java.util.Objects;

import static org.apache.kafka.common.utils.Utils.getNullableArray;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;

public final class BufferValue {
private static final int NULL_VALUE_SENTINEL = -1;
private static final int OLD_PREV_DUPLICATE_VALUE_SENTINEL = -2;
@@ -67,35 +70,21 @@ ProcessorRecordContext context() {
static BufferValue deserialize(final ByteBuffer buffer) {
final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);

final byte[] priorValue = extractValue(buffer);
final byte[] priorValue = getNullableSizePrefixedArray(buffer);

final byte[] oldValue;
final int oldValueLength = buffer.getInt();
if (oldValueLength == NULL_VALUE_SENTINEL) {
oldValue = null;
} else if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
oldValue = priorValue;
} else {
oldValue = new byte[oldValueLength];
buffer.get(oldValue);
oldValue = getNullableArray(buffer, oldValueLength);
}

final byte[] newValue = extractValue(buffer);
final byte[] newValue = getNullableSizePrefixedArray(buffer);

return new BufferValue(priorValue, oldValue, newValue, context);
}

private static byte[] extractValue(final ByteBuffer buffer) {
final int valueLength = buffer.getInt();
if (valueLength == NULL_VALUE_SENTINEL) {
return null;
} else {
final byte[] value = new byte[valueLength];
buffer.get(value);
return value;
}
}

ByteBuffer serialize(final int endPadding) {

final int sizeOfValueLength = Integer.BYTES;
@@ -120,7 +109,7 @@ ByteBuffer serialize(final int endPadding) {

if (oldValue == null) {
buffer.putInt(NULL_VALUE_SENTINEL);
} else if (priorValue == oldValue) {
} else if (Arrays.equals(priorValue, oldValue)) {
Contributor Author: This was correct before, since we check equality and enforce identity in the constructor, but Arrays.equals is extremely cheap when the arrays are identical, so explicitly doing an identity check instead of equality was a micro-optimization.
buffer.putInt(OLD_PREV_DUPLICATE_VALUE_SENTINEL);
} else {
buffer.putInt(sizeOfOldValue);
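A hedged sketch of the three value fields that follow the serialized record context in a BufferValue. All values are invented, and the leading ProcessorRecordContext bytes are omitted, so this fragment illustrates only the layout that the sentinel logic above reads:

import java.nio.ByteBuffer;

public class BufferValueLayout {
    public static void main(String[] args) {
        // After the record context: [priorValue][oldValue][newValue], each size-prefixed.
        // -1 encodes null; -2 on oldValue means "same bytes as priorValue", stored once.
        final ByteBuffer buffer = ByteBuffer.allocate(3 * Integer.BYTES + 2);
        buffer.putInt(2).put(new byte[] {5, 6}); // priorValue = {5, 6}
        buffer.putInt(-2);                       // oldValue duplicates priorValue
        buffer.putInt(-1);                       // newValue is null
        buffer.flip();
        // BufferValue.deserialize would read these fields after consuming the context,
        // returning oldValue as the same array instance as priorValue.
    }
}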
streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
@@ -22,6 +22,8 @@
import java.util.Arrays;
import java.util.Objects;

import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;

public class ContextualRecord {
private final byte[] value;
private final ProcessorRecordContext recordContext;
@@ -43,36 +45,10 @@ long residentMemorySizeEstimate() {
return (value == null ? 0 : value.length) + recordContext.residentMemorySizeEstimate();
}

ByteBuffer serialize(final int endPadding) {
final byte[] serializedContext = recordContext.serialize();

final int sizeOfContext = serializedContext.length;
final int sizeOfValueLength = Integer.BYTES;
final int sizeOfValue = value == null ? 0 : value.length;
final ByteBuffer buffer = ByteBuffer.allocate(sizeOfContext + sizeOfValueLength + sizeOfValue + endPadding);

buffer.put(serializedContext);
if (value == null) {
buffer.putInt(-1);
} else {
buffer.putInt(value.length);
buffer.put(value);
}

return buffer;
}

static ContextualRecord deserialize(final ByteBuffer buffer) {
final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);

final int valueLength = buffer.getInt();
if (valueLength == -1) {
return new ContextualRecord(null, context);
} else {
final byte[] value = new byte[valueLength];
buffer.get(value);
return new ContextualRecord(value, context);
}
final byte[] value = getNullableSizePrefixedArray(buffer);
return new ContextualRecord(value, context);
}

@Override