diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index cfe4b04454149..d89d57ac9e355 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.utils; +import java.nio.BufferUnderflowException; import java.util.EnumSet; import java.util.SortedSet; import java.util.TreeSet; @@ -284,6 +285,37 @@ public static byte[] toArray(ByteBuffer buffer, int offset, int size) { return dest; } + /** + * Starting from the current position, read an integer indicating the size of the byte array to read, + * then read the array. Consumes the buffer: upon returning, the buffer's position is after the array + * that is returned. + * @param buffer The buffer to read a size-prefixed array from + * @return The array + */ + public static byte[] getNullableSizePrefixedArray(final ByteBuffer buffer) { + final int size = buffer.getInt(); + return getNullableArray(buffer, size); + } + + /** + * Read a byte array of the given size. Consumes the buffer: upon returning, the buffer's position + * is after the array that is returned. + * @param buffer The buffer to read the array from + * @param size The number of bytes to read out of the buffer + * @return The array + */ + public static byte[] getNullableArray(final ByteBuffer buffer, final int size) { + if (size > buffer.remaining()) { + // preemptively throw this when the read is doomed to fail, so we don't have to allocate the array. + throw new BufferUnderflowException(); + } + final byte[] oldBytes = size == -1 ? null : new byte[size]; + if (oldBytes != null) { + buffer.get(oldBytes); + } + return oldBytes; + } + /** * Returns a copy of src byte array * @param src The byte array to copy diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index e134f7681be09..3cc0043b9629a 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -26,6 +26,7 @@ import java.io.EOFException; import java.io.File; import java.io.IOException; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; @@ -63,6 +64,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -83,7 +85,7 @@ public void testMurmur2() { cases.put("a-little-bit-long-string".getBytes(), -985981536); cases.put("a-little-bit-longer-string".getBytes(), -1486304829); cases.put("lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8".getBytes(), -58897971); - cases.put(new byte[]{'a', 'b', 'c'}, 479470107); + cases.put(new byte[] {'a', 'b', 'c'}, 479470107); for (Map.Entry c : cases.entrySet()) { assertEquals(c.getValue().intValue(), murmur2(c.getKey())); @@ -216,6 +218,65 @@ public void toArrayDirectByteBuffer() { assertEquals(2, buffer.position()); } + @Test + public void getNullableSizePrefixedArrayExact() { + byte[] input = {0, 0, 0, 2, 1, 0}; + final ByteBuffer buffer = ByteBuffer.wrap(input); + final byte[] array = 
Utils.getNullableSizePrefixedArray(buffer); + assertArrayEquals(new byte[] {1, 0}, array); + assertEquals(6, buffer.position()); + assertFalse(buffer.hasRemaining()); + } + + @Test + public void getNullableSizePrefixedArrayExactEmpty() { + byte[] input = {0, 0, 0, 0}; + final ByteBuffer buffer = ByteBuffer.wrap(input); + final byte[] array = Utils.getNullableSizePrefixedArray(buffer); + assertArrayEquals(new byte[] {}, array); + assertEquals(4, buffer.position()); + assertFalse(buffer.hasRemaining()); + } + + @Test + public void getNullableSizePrefixedArrayRemainder() { + byte[] input = {0, 0, 0, 2, 1, 0, 9}; + final ByteBuffer buffer = ByteBuffer.wrap(input); + final byte[] array = Utils.getNullableSizePrefixedArray(buffer); + assertArrayEquals(new byte[] {1, 0}, array); + assertEquals(6, buffer.position()); + assertTrue(buffer.hasRemaining()); + } + + @Test + public void getNullableSizePrefixedArrayNull() { + // -1 + byte[] input = {-1, -1, -1, -1}; + final ByteBuffer buffer = ByteBuffer.wrap(input); + final byte[] array = Utils.getNullableSizePrefixedArray(buffer); + assertNull(array); + assertEquals(4, buffer.position()); + assertFalse(buffer.hasRemaining()); + } + + @Test + public void getNullableSizePrefixedArrayInvalid() { + // -2 + byte[] input = {-1, -1, -1, -2}; + final ByteBuffer buffer = ByteBuffer.wrap(input); + assertThrows(NegativeArraySizeException.class, () -> Utils.getNullableSizePrefixedArray(buffer)); + } + + @Test + public void getNullableSizePrefixedArrayUnderflow() { + // Integer.MAX_VALUE + byte[] input = {127, -1, -1, -1}; + final ByteBuffer buffer = ByteBuffer.wrap(input); + // note, we get a buffer underflow exception instead of an OOME, even though the encoded size + // would be 2,147,483,647 aka 2.1 GB, probably larger than the available heap + assertThrows(BufferUnderflowException.class, () -> Utils.getNullableSizePrefixedArray(buffer)); + } + @Test public void utf8ByteArraySerde() { String utf8String = "A\u00ea\u00f1\u00fcC"; @@ -427,7 +488,7 @@ public void testReadFullyOrFailWithPartialFileChannelReads() throws IOException String expectedBufferContent = fileChannelMockExpectReadWithRandomBytes(channelMock, bufferSize); Utils.readFullyOrFail(channelMock, buffer, 0L, "test"); assertEquals("The buffer should be populated correctly", expectedBufferContent, - new String(buffer.array())); + new String(buffer.array())); assertFalse("The buffer should be filled", buffer.hasRemaining()); verify(channelMock, atLeastOnce()).read(any(), anyLong()); } @@ -444,7 +505,7 @@ public void testReadFullyWithPartialFileChannelReads() throws IOException { ByteBuffer buffer = ByteBuffer.allocate(bufferSize); Utils.readFully(channelMock, buffer, 0L); assertEquals("The buffer should be populated correctly.", expectedBufferContent, - new String(buffer.array())); + new String(buffer.array())); assertFalse("The buffer should be filled", buffer.hasRemaining()); verify(channelMock, atLeastOnce()).read(any(), anyLong()); } @@ -493,7 +554,7 @@ public void testLoadProps() throws IOException { * * @param channelMock The mocked FileChannel object * @param bufferSize The buffer size - * @return Expected buffer string + * @return Expected buffer string * @throws IOException If an I/O error occurs */ private String fileChannelMockExpectReadWithRandomBytes(final FileChannel channelMock, @@ -530,8 +591,9 @@ private static class TestCloseable implements Closeable { @Override public void close() throws IOException { closed = true; - if (closeException != null) + if (closeException != null) { throw 
closeException; + } } static TestCloseable[] createCloseables(boolean... exceptionOnClose) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java index 5d7c7e35697ee..3a3439448cbe0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import static java.util.Objects.requireNonNull; +import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray; public final class FullChangeSerde { private final Serde inner; @@ -68,33 +69,6 @@ public Change deserializeParts(final String topic, final Change seria return new Change<>(newValue, oldValue); } - /** - * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here - * so that we can produce the legacy format to test that we can still deserialize it. - */ - public static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change serialChange) { - if (serialChange == null) { - return null; - } - - final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length; - final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length; - - final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize)); - - - buffer.putInt(oldSize); - if (serialChange.oldValue != null) { - buffer.put(serialChange.oldValue); - } - - buffer.putInt(newSize); - if (serialChange.newValue != null) { - buffer.put(serialChange.newValue); - } - return buffer.array(); - } - /** * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still * need to be able to read it (so that we can load the state store from previously-written changelog records). @@ -104,19 +78,8 @@ public static Change decomposeLegacyFormattedArrayIntoChangeArrays(final return null; } final ByteBuffer buffer = ByteBuffer.wrap(data); - - final int oldSize = buffer.getInt(); - final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize]; - if (oldBytes != null) { - buffer.get(oldBytes); - } - - final int newSize = buffer.getInt(); - final byte[] newBytes = newSize == -1 ? 
null : new byte[newSize]; - if (newBytes != null) { - buffer.get(newBytes); - } - + final byte[] oldBytes = getNullableSizePrefixedArray(buffer); + final byte[] newBytes = getNullableSizePrefixedArray(buffer); return new Change<>(newBytes, oldBytes); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 5662417a21460..5dd00620196a6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -26,6 +26,8 @@ import java.util.Objects; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; +import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray; public class ProcessorRecordContext implements RecordContext { @@ -161,12 +163,10 @@ public byte[] serialize() { public static ProcessorRecordContext deserialize(final ByteBuffer buffer) { final long timestamp = buffer.getLong(); final long offset = buffer.getLong(); - final int topicSize = buffer.getInt(); final String topic; { - // not handling the null topic condition, because we believe the topic will never be null when we serialize - final byte[] topicBytes = new byte[topicSize]; - buffer.get(topicBytes); + // we believe the topic will never be null when we serialize + final byte[] topicBytes = requireNonNull(getNullableSizePrefixedArray(buffer)); topic = new String(topicBytes, UTF_8); } final int partition = buffer.getInt(); @@ -177,19 +177,8 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) { } else { final Header[] headerArr = new Header[headerCount]; for (int i = 0; i < headerCount; i++) { - final int keySize = buffer.getInt(); - final byte[] keyBytes = new byte[keySize]; - buffer.get(keyBytes); - - final int valueSize = buffer.getInt(); - final byte[] valueBytes; - if (valueSize == -1) { - valueBytes = null; - } else { - valueBytes = new byte[valueSize]; - buffer.get(valueBytes); - } - + final byte[] keyBytes = requireNonNull(getNullableSizePrefixedArray(buffer)); + final byte[] valueBytes = getNullableSizePrefixedArray(buffer); headerArr[i] = new RecordHeader(new String(keyBytes, UTF_8), valueBytes); } headers = new RecordHeaders(headerArr); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java index b52ec24d20cc1..f27ab19a002da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java @@ -22,6 +22,9 @@ import java.util.Arrays; import java.util.Objects; +import static org.apache.kafka.common.utils.Utils.getNullableArray; +import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray; + public final class BufferValue { private static final int NULL_VALUE_SENTINEL = -1; private static final int OLD_PREV_DUPLICATE_VALUE_SENTINEL = -2; @@ -67,35 +70,21 @@ ProcessorRecordContext context() { static BufferValue deserialize(final ByteBuffer buffer) { final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer); - final byte[] priorValue = extractValue(buffer); + final byte[] priorValue = getNullableSizePrefixedArray(buffer); final byte[] oldValue; final int oldValueLength 
= buffer.getInt(); - if (oldValueLength == NULL_VALUE_SENTINEL) { - oldValue = null; - } else if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) { + if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) { oldValue = priorValue; } else { - oldValue = new byte[oldValueLength]; - buffer.get(oldValue); + oldValue = getNullableArray(buffer, oldValueLength); } - final byte[] newValue = extractValue(buffer); + final byte[] newValue = getNullableSizePrefixedArray(buffer); return new BufferValue(priorValue, oldValue, newValue, context); } - private static byte[] extractValue(final ByteBuffer buffer) { - final int valueLength = buffer.getInt(); - if (valueLength == NULL_VALUE_SENTINEL) { - return null; - } else { - final byte[] value = new byte[valueLength]; - buffer.get(value); - return value; - } - } - ByteBuffer serialize(final int endPadding) { final int sizeOfValueLength = Integer.BYTES; @@ -120,7 +109,7 @@ ByteBuffer serialize(final int endPadding) { if (oldValue == null) { buffer.putInt(NULL_VALUE_SENTINEL); - } else if (priorValue == oldValue) { + } else if (Arrays.equals(priorValue, oldValue)) { buffer.putInt(OLD_PREV_DUPLICATE_VALUE_SENTINEL); } else { buffer.putInt(sizeOfOldValue); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java index 3c24f521c23ed..a26b4375c0cd7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java @@ -22,6 +22,8 @@ import java.util.Arrays; import java.util.Objects; +import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray; + public class ContextualRecord { private final byte[] value; private final ProcessorRecordContext recordContext; @@ -43,36 +45,10 @@ long residentMemorySizeEstimate() { return (value == null ? 0 : value.length) + recordContext.residentMemorySizeEstimate(); } - ByteBuffer serialize(final int endPadding) { - final byte[] serializedContext = recordContext.serialize(); - - final int sizeOfContext = serializedContext.length; - final int sizeOfValueLength = Integer.BYTES; - final int sizeOfValue = value == null ? 
0 : value.length; - final ByteBuffer buffer = ByteBuffer.allocate(sizeOfContext + sizeOfValueLength + sizeOfValue + endPadding); - - buffer.put(serializedContext); - if (value == null) { - buffer.putInt(-1); - } else { - buffer.putInt(value.length); - buffer.put(value); - } - - return buffer; - } - static ContextualRecord deserialize(final ByteBuffer buffer) { final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer); - - final int valueLength = buffer.getInt(); - if (valueLength == -1) { - return new ContextualRecord(null, context); - } else { - final byte[] value = new byte[valueLength]; - buffer.get(value); - return new ContextualRecord(value, context); - } + final byte[] value = getNullableSizePrefixedArray(buffer); + return new ContextualRecord(value, context); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index 9feccb9d57a1e..2909e2763f1a5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -38,9 +38,11 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.DeserializationResult; import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -54,14 +56,19 @@ import java.util.function.Supplier; import static java.util.Objects.requireNonNull; +import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV0; +import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV1; +import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV3; +import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2; public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer { private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer(); private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer(); - private static final RecordHeaders V_1_CHANGELOG_HEADERS = - new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})}); - private static final RecordHeaders V_2_CHANGELOG_HEADERS = - new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})}); + private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {(byte) 1}; + private static final byte[] V_2_CHANGELOG_HEADER_VALUE = {(byte) 2}; + private static final byte[] V_3_CHANGELOG_HEADER_VALUE = {(byte) 3}; + static final RecordHeaders CHANGELOG_HEADERS = + new RecordHeaders(new Header[] {new RecordHeader("v", V_3_CHANGELOG_HEADER_VALUE)}); private static final String METRIC_SCOPE = "in-memory-suppression"; private final Map index = new HashMap<>(); @@ -258,34 +265,43 @@ private void logValue(final Bytes key, final BufferKey bufferKey, final BufferVa 
final int sizeOfBufferTime = Long.BYTES; final ByteBuffer buffer = value.serialize(sizeOfBufferTime); buffer.putLong(bufferKey.time()); - + final byte[] array = buffer.array(); ((RecordCollector.Supplier) context).recordCollector().send( - changelogTopic, - key, - buffer.array(), - V_2_CHANGELOG_HEADERS, - partition, - null, - KEY_SERIALIZER, - VALUE_SERIALIZER + changelogTopic, + key, + array, + CHANGELOG_HEADERS, + partition, + null, + KEY_SERIALIZER, + VALUE_SERIALIZER ); } private void logTombstone(final Bytes key) { ((RecordCollector.Supplier) context).recordCollector().send( - changelogTopic, - key, - null, - null, - partition, - null, - KEY_SERIALIZER, - VALUE_SERIALIZER + changelogTopic, + key, + null, + null, + partition, + null, + KEY_SERIALIZER, + VALUE_SERIALIZER ); } private void restoreBatch(final Collection> batch) { for (final ConsumerRecord record : batch) { + if (record.partition() != partition) { + throw new IllegalStateException( + String.format( + "record partition [%d] is being restored by the wrong suppress partition [%d]", + record.partition(), + partition + ) + ); + } final Bytes key = Bytes.wrap(record.key()); if (record.value() == null) { // This was a tombstone. Delete the record. @@ -299,92 +315,63 @@ private void restoreBatch(final Collection> batch minTimestamp = sortedMap.isEmpty() ? Long.MAX_VALUE : sortedMap.firstKey().time(); } } - - if (record.partition() != partition) { - throw new IllegalStateException( - String.format( - "record partition [%d] is being restored by the wrong suppress partition [%d]", - record.partition(), - partition - ) - ); - } } else { - if (record.headers().lastHeader("v") == null) { - // in this case, the changelog value is just the serialized record value - final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value()); - final long time = timeAndValue.getLong(); - final byte[] changelogValue = new byte[record.value().length - 8]; - timeAndValue.get(changelogValue); - - final Change change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(changelogValue)); - - final ProcessorRecordContext recordContext = new ProcessorRecordContext( - record.timestamp(), - record.offset(), - record.partition(), - record.topic(), - record.headers() - ); - - cleanPut( - time, - key, - new BufferValue( - index.containsKey(key) - ? internalPriorValueForBuffered(key) - : change.oldValue, - change.oldValue, - change.newValue, - recordContext - ) - ); - } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) { - // in this case, the changelog value is a serialized ContextualRecord - final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value()); - final long time = timeAndValue.getLong(); - final byte[] changelogValue = new byte[record.value().length - 8]; - timeAndValue.get(changelogValue); - - final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue)); - final Change change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value())); - - cleanPut( - time, - key, - new BufferValue( - index.containsKey(key) - ? 
internalPriorValueForBuffered(key) - : change.oldValue, - change.oldValue, - change.newValue, - contextualRecord.recordContext() - ) - ); - } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) { - // in this case, the changelog value is a serialized BufferValue - - final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value()); - final BufferValue bufferValue = BufferValue.deserialize(valueAndTime); - final long time = valueAndTime.getLong(); - cleanPut(time, key, bufferValue); + final Header versionHeader = record.headers().lastHeader("v"); + if (versionHeader == null) { + // Version 0: + // value: + // - buffer time + // - old value + // - new value + final byte[] previousBufferedValue = index.containsKey(key) + ? internalPriorValueForBuffered(key) + : null; + final DeserializationResult deserializationResult = deserializeV0(record, key, previousBufferedValue); + cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue()); + } else if (Arrays.equals(versionHeader.value(), V_3_CHANGELOG_HEADER_VALUE)) { + // Version 3: + // value: + // - record context + // - prior value + // - old value + // - new value + // - buffer time + final DeserializationResult deserializationResult = deserializeV3(record, key); + cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue()); + + } else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) { + // Version 2: + // value: + // - record context + // - old value + // - new value + // - prior value + // - buffer time + // NOTE: 2.4.0, 2.4.1, and 2.5.0 actually encode Version 3 formatted data, + // but still set the Version 2 flag, so to deserialize, we have to duck type. + final DeserializationResult deserializationResult = duckTypeV2(record, key); + cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue()); + } else if (Arrays.equals(versionHeader.value(), V_1_CHANGELOG_HEADER_VALUE)) { + // Version 1: + // value: + // - buffer time + // - record context + // - old value + // - new value + final byte[] previousBufferedValue = index.containsKey(key) + ? internalPriorValueForBuffered(key) + : null; + final DeserializationResult deserializationResult = deserializeV1(record, key, previousBufferedValue); + cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue()); } else { throw new IllegalArgumentException("Restoring apparently invalid changelog record: " + record); } } - if (record.partition() != partition) { - throw new IllegalStateException( - String.format( - "record partition [%d] is being restored by the wrong suppress partition [%d]", - record.partition(), - partition - ) - ); - } } updateBufferMetrics(); } + @Override public void evictWhile(final Supplier predicate, final Consumer> callback) { @@ -481,8 +468,7 @@ public void put(final long time, final BufferValue buffered = getBuffered(serializedKey); final byte[] serializedPriorValue; if (buffered == null) { - final V priorValue = value.oldValue; - serializedPriorValue = (priorValue == null) ? 
null : valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue); + serializedPriorValue = serialChange.oldValue; } else { serializedPriorValue = buffered.priorValue(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java new file mode 100644 index 0000000000000..74489c230b99f --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; + +import java.nio.ByteBuffer; + +import static java.util.Objects.requireNonNull; + +final class TimeOrderedKeyValueBufferChangelogDeserializationHelper { + private TimeOrderedKeyValueBufferChangelogDeserializationHelper() {} + + static final class DeserializationResult { + private final long time; + private final Bytes key; + private final BufferValue bufferValue; + + private DeserializationResult(final long time, final Bytes key, final BufferValue bufferValue) { + this.time = time; + this.key = key; + this.bufferValue = bufferValue; + } + + long time() { + return time; + } + + Bytes key() { + return key; + } + + BufferValue bufferValue() { + return bufferValue; + } + } + + static DeserializationResult deserializeV0(final ConsumerRecord record, + final Bytes key, + final byte[] previousBufferedValue) { + + final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value()); + final long time = timeAndValue.getLong(); + final byte[] changelogValue = new byte[record.value().length - 8]; + timeAndValue.get(changelogValue); + + final Change change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(changelogValue)); + + final ProcessorRecordContext recordContext = new ProcessorRecordContext( + record.timestamp(), + record.offset(), + record.partition(), + record.topic(), + record.headers() + ); + + return new DeserializationResult( + time, + key, + new BufferValue( + previousBufferedValue == null ? 
change.oldValue : previousBufferedValue, + change.oldValue, + change.newValue, + recordContext + ) + ); + } + + static DeserializationResult deserializeV1(final ConsumerRecord record, + final Bytes key, + final byte[] previousBufferedValue) { + final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value()); + final long time = timeAndValue.getLong(); + final byte[] changelogValue = new byte[record.value().length - 8]; + timeAndValue.get(changelogValue); + + final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue)); + final Change change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value())); + + return new DeserializationResult( + time, + key, + new BufferValue( + previousBufferedValue == null ? change.oldValue : previousBufferedValue, + change.oldValue, + change.newValue, + contextualRecord.recordContext() + ) + ); + } + + static DeserializationResult duckTypeV2(final ConsumerRecord record, final Bytes key) { + DeserializationResult deserializationResult = null; + RuntimeException v2DeserializationException = null; + RuntimeException v3DeserializationException = null; + try { + deserializationResult = deserializeV2(record, key); + } catch (final RuntimeException e) { + v2DeserializationException = e; + } + // versions 2.4.0, 2.4.1, and 2.5.0 would have erroneously encoded a V3 record with the + // V2 header, so we'll try duck-typing to see if this is decodable as V3 + if (deserializationResult == null) { + try { + deserializationResult = deserializeV3(record, key); + } catch (final RuntimeException e) { + v3DeserializationException = e; + } + } + + if (deserializationResult == null) { + // ok, it wasn't V3 either. Throw both exceptions: + final RuntimeException exception = + new RuntimeException("Couldn't deserialize record as v2 or v3: " + record, + v2DeserializationException); + exception.addSuppressed(v3DeserializationException); + throw exception; + } + return deserializationResult; + } + + private static DeserializationResult deserializeV2(final ConsumerRecord record, + final Bytes key) { + final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value()); + final ContextualRecord contextualRecord = ContextualRecord.deserialize(valueAndTime); + final Change change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value())); + final byte[] priorValue = Utils.getNullableSizePrefixedArray(valueAndTime); + final long time = valueAndTime.getLong(); + final BufferValue bufferValue = new BufferValue(priorValue, change.oldValue, change.newValue, contextualRecord.recordContext()); + return new DeserializationResult(time, key, bufferValue); + } + + static DeserializationResult deserializeV3(final ConsumerRecord record, final Bytes key) { + final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value()); + final BufferValue bufferValue = BufferValue.deserialize(valueAndTime); + final long time = valueAndTime.getLong(); + return new DeserializationResult(time, key, bufferValue); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java index ac6762f504cf7..e7e0c88bd1eef 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java @@ -19,6 +19,8 @@ 
import org.apache.kafka.common.serialization.Serdes; import org.junit.Test; +import java.nio.ByteBuffer; + import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; @@ -26,10 +28,37 @@ public class FullChangeSerdeTest { private final FullChangeSerde serde = FullChangeSerde.wrap(Serdes.String()); + /** + * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here + * so that we can produce the legacy format to test that we can still deserialize it. + */ + private static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change serialChange) { + if (serialChange == null) { + return null; + } + + final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length; + final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length; + + final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize)); + + + buffer.putInt(oldSize); + if (serialChange.oldValue != null) { + buffer.put(serialChange.oldValue); + } + + buffer.putInt(newSize); + if (serialChange.newValue != null) { + buffer.put(serialChange.newValue); + } + return buffer.array(); + } + @Test public void shouldRoundTripNull() { assertThat(serde.serializeParts(null, null), nullValue()); - assertThat(FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(null), nullValue()); + assertThat(mergeChangeArraysIntoSingleLegacyFormattedArray(null), nullValue()); assertThat(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(null), nullValue()); assertThat(serde.deserializeParts(null, null), nullValue()); } @@ -47,7 +76,7 @@ public void shouldRoundTripNullChange() { is(new Change(null, null)) ); - final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(new Change<>(null, null)); + final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(new Change<>(null, null)); assertThat( FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat), is(new Change(null, null)) @@ -57,7 +86,7 @@ public void shouldRoundTripNullChange() { @Test public void shouldRoundTripOldNull() { final Change serialized = serde.serializeParts(null, new Change<>("new", null)); - final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized); + final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized); final Change decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat); assertThat( serde.deserializeParts(null, decomposedLegacyFormat), @@ -68,7 +97,7 @@ public void shouldRoundTripOldNull() { @Test public void shouldRoundTripNewNull() { final Change serialized = serde.serializeParts(null, new Change<>(null, "old")); - final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized); + final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized); final Change decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat); assertThat( serde.deserializeParts(null, decomposedLegacyFormat), @@ -79,7 +108,7 @@ public void shouldRoundTripNewNull() { @Test public void shouldRoundTripChange() { final Change serialized = serde.serializeParts(null, new Change<>("new", "old")); - final byte[] legacyFormat = 
FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized); + final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized); final Change decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat); assertThat( serde.deserializeParts(null, decomposedLegacyFormat), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java index 25a44c486f43b..a054ac9390cdc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.internals.Change; -import org.apache.kafka.streams.kstream.internals.FullChangeSerde; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; @@ -56,14 +55,13 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.CHANGELOG_HEADERS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.Assert.fail; @RunWith(Parameterized.class) public class TimeOrderedKeyValueBufferTest> { - private static final RecordHeaders V_2_CHANGELOG_HEADERS = - new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})}); private static final String APP_ID = "test-app"; private final Function bufferSupplier; @@ -73,7 +71,7 @@ public static final class NullRejectingStringSerializer extends StringSerializer @Override public byte[] serialize(final String topic, final String data) { if (data == null) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("null data not allowed"); } return super.serialize(topic, data); } @@ -347,14 +345,14 @@ public void shouldFlush() { null, "zxcv", new KeyValue<>(1L, getBufferValue("3gon4i", 1)), - V_2_CHANGELOG_HEADERS + CHANGELOG_HEADERS ), new ProducerRecord<>(APP_ID + "-" + testName + "-changelog", 0, null, "asdf", new KeyValue<>(2L, getBufferValue("2093j", 0)), - V_2_CHANGELOG_HEADERS + CHANGELOG_HEADERS ) ))); @@ -362,7 +360,7 @@ public void shouldFlush() { } @Test - public void shouldRestoreOldFormat() { + public void shouldRestoreOldUnversionedFormat() { final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); @@ -372,12 +370,14 @@ public void shouldRestoreOldFormat() { context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null)); - final FullChangeSerde serializer = FullChangeSerde.wrap(Serdes.String()); + // These serialized formats were captured by running version 2.1 code. + // They verify that an upgrade from 2.1 will work. + // Do not change them. 
+ final String toDeleteBinaryValue = "0000000000000000FFFFFFFF00000006646F6F6D6564"; + final String asdfBinaryValue = "0000000000000002FFFFFFFF0000000471776572"; + final String zxcvBinaryValue1 = "00000000000000010000000870726576696F757300000005656F34696D"; + final String zxcvBinaryValue2 = "000000000000000100000005656F34696D000000046E657874"; - final byte[] todeleteValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("doomed", null))); - final byte[] asdfValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("qwer", null))); - final byte[] zxcvValue1 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("eo4im", "previous"))); - final byte[] zxcvValue2 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("next", "eo4im"))); stateRestoreCallback.restoreBatch(asList( new ConsumerRecord<>("changelog-topic", 0, @@ -388,7 +388,7 @@ public void shouldRestoreOldFormat() { -1, -1, "todelete".getBytes(UTF_8), - ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array()), + hexStringToByteArray(toDeleteBinaryValue)), new ConsumerRecord<>("changelog-topic", 0, 1, @@ -398,7 +398,7 @@ public void shouldRestoreOldFormat() { -1, -1, "asdf".getBytes(UTF_8), - ByteBuffer.allocate(Long.BYTES + asdfValue.length).putLong(2L).put(asdfValue).array()), + hexStringToByteArray(asdfBinaryValue)), new ConsumerRecord<>("changelog-topic", 0, 2, @@ -408,7 +408,7 @@ public void shouldRestoreOldFormat() { -1, -1, "zxcv".getBytes(UTF_8), - ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).putLong(1L).put(zxcvValue1).array()), + hexStringToByteArray(zxcvBinaryValue1)), new ConsumerRecord<>("changelog-topic", 0, 3, @@ -418,7 +418,7 @@ public void shouldRestoreOldFormat() { -1, -1, "zxcv".getBytes(UTF_8), - ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).putLong(1L).put(zxcvValue2).array()) + hexStringToByteArray(zxcvBinaryValue2)) )); assertThat(buffer.numRecords(), is(3)); @@ -486,17 +486,14 @@ public void shouldRestoreV1Format() { final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})}); - final byte[] todeleteValue = getContextualRecord("doomed", 0).serialize(0).array(); - final byte[] asdfValue = getContextualRecord("qwer", 1).serialize(0).array(); - final FullChangeSerde fullChangeSerde = FullChangeSerde.wrap(Serdes.String()); - final byte[] zxcvValue1 = new ContextualRecord( - FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))), - getContext(2L) - ).serialize(0).array(); - final byte[] zxcvValue2 = new ContextualRecord( - FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))), - getContext(3L) - ).serialize(0).array(); + // These serialized formats were captured by running version 2.2 code. + // They verify that an upgrade from 2.2 will work. + // Do not change them. 
+ final String toDeleteBinary = "00000000000000000000000000000000000000000000000000000005746F70696300000000FFFFFFFF0000000EFFFFFFFF00000006646F6F6D6564"; + final String asdfBinary = "00000000000000020000000000000001000000000000000000000005746F70696300000000FFFFFFFF0000000CFFFFFFFF0000000471776572"; + final String zxcvBinary1 = "00000000000000010000000000000002000000000000000000000005746F70696300000000FFFFFFFF000000150000000870726576696F757300000005336F34696D"; + final String zxcvBinary2 = "00000000000000010000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000001100000005336F34696D000000046E657874"; + stateRestoreCallback.restoreBatch(asList( new ConsumerRecord<>("changelog-topic", 0, @@ -507,7 +504,7 @@ public void shouldRestoreV1Format() { -1, -1, "todelete".getBytes(UTF_8), - ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array(), + hexStringToByteArray(toDeleteBinary), v1FlagHeaders), new ConsumerRecord<>("changelog-topic", 0, @@ -518,7 +515,7 @@ public void shouldRestoreV1Format() { -1, -1, "asdf".getBytes(UTF_8), - ByteBuffer.allocate(Long.BYTES + asdfValue.length).putLong(2L).put(asdfValue).array(), + hexStringToByteArray(asdfBinary), v1FlagHeaders), new ConsumerRecord<>("changelog-topic", 0, @@ -529,7 +526,7 @@ public void shouldRestoreV1Format() { -1, -1, "zxcv".getBytes(UTF_8), - ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).putLong(1L).put(zxcvValue1).array(), + hexStringToByteArray(zxcvBinary1), v1FlagHeaders), new ConsumerRecord<>("changelog-topic", 0, @@ -540,7 +537,7 @@ public void shouldRestoreV1Format() { -1, -1, "zxcv".getBytes(UTF_8), - ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).putLong(1L).put(zxcvValue2).array(), + hexStringToByteArray(zxcvBinary2), v1FlagHeaders) )); @@ -596,6 +593,7 @@ public void shouldRestoreV1Format() { cleanup(context, buffer); } + @Test public void shouldRestoreV2Format() { final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); @@ -609,22 +607,14 @@ public void shouldRestoreV2Format() { final RecordHeaders v2FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})}); - final byte[] todeleteValue = getBufferValue("doomed", 0).serialize(0).array(); - final byte[] asdfValue = getBufferValue("qwer", 1).serialize(0).array(); - final byte[] zxcvValue1 = - new BufferValue( - Serdes.String().serializer().serialize(null, "previous"), - Serdes.String().serializer().serialize(null, "IGNORED"), - Serdes.String().serializer().serialize(null, "3o4im"), - getContext(2L) - ).serialize(0).array(); - final byte[] zxcvValue2 = - new BufferValue( - Serdes.String().serializer().serialize(null, "previous"), - Serdes.String().serializer().serialize(null, "3o4im"), - Serdes.String().serializer().serialize(null, "next"), - getContext(3L) - ).serialize(0).array(); + // These serialized formats were captured by running version 2.3 code. + // They verify that an upgrade from 2.3 will work. + // Do not change them. 
+ final String toDeleteBinary = "0000000000000000000000000000000000000005746F70696300000000FFFFFFFF0000000EFFFFFFFF00000006646F6F6D6564FFFFFFFF0000000000000000"; + final String asdfBinary = "0000000000000001000000000000000000000005746F70696300000000FFFFFFFF0000000CFFFFFFFF0000000471776572FFFFFFFF0000000000000002"; + final String zxcvBinary1 = "0000000000000002000000000000000000000005746F70696300000000FFFFFFFF000000140000000749474E4F52454400000005336F34696D0000000870726576696F75730000000000000001"; + final String zxcvBinary2 = "0000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000001100000005336F34696D000000046E6578740000000870726576696F75730000000000000001"; + stateRestoreCallback.restoreBatch(asList( new ConsumerRecord<>("changelog-topic", 0, @@ -635,7 +625,7 @@ public void shouldRestoreV2Format() { -1, -1, "todelete".getBytes(UTF_8), - ByteBuffer.allocate(Long.BYTES + todeleteValue.length).put(todeleteValue).putLong(0L).array(), + hexStringToByteArray(toDeleteBinary), v2FlagHeaders), new ConsumerRecord<>("changelog-topic", 0, @@ -646,7 +636,7 @@ public void shouldRestoreV2Format() { -1, -1, "asdf".getBytes(UTF_8), - ByteBuffer.allocate(Long.BYTES + asdfValue.length).put(asdfValue).putLong(2L).array(), + hexStringToByteArray(asdfBinary), v2FlagHeaders), new ConsumerRecord<>("changelog-topic", 0, @@ -657,7 +647,7 @@ public void shouldRestoreV2Format() { -1, -1, "zxcv".getBytes(UTF_8), - ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).put(zxcvValue1).putLong(1L).array(), + hexStringToByteArray(zxcvBinary1), v2FlagHeaders), new ConsumerRecord<>("changelog-topic", 0, @@ -668,7 +658,7 @@ public void shouldRestoreV2Format() { -1, -1, "zxcv".getBytes(UTF_8), - ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).put(zxcvValue2).putLong(1L).array(), + hexStringToByteArray(zxcvBinary2), v2FlagHeaders) )); @@ -724,6 +714,249 @@ public void shouldRestoreV2Format() { cleanup(context, buffer); } + @Test + public void shouldRestoreV3FormatWithV2Header() { + // versions 2.4.0, 2.4.1, and 2.5.0 would have erroneously encoded a V3 record with the + // V2 header, so we need to be sure to handle this case as well. + // Note the data is the same as the V3 test. + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final MockInternalProcessorContext context = makeContext(); + buffer.init(context, buffer); + + final RecordBatchingStateRestoreCallback stateRestoreCallback = + (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName); + + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null)); + + final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})}); + + // These serialized formats were captured by running version 2.4 code. + // They verify that an upgrade from 2.4 will work. + // Do not change them. 
+ final String toDeleteBinary = "0000000000000000000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000006646F6F6D65640000000000000000"; + final String asdfBinary = "0000000000000001000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000004717765720000000000000002"; + final String zxcvBinary1 = "0000000000000002000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F75730000000749474E4F52454400000005336F34696D0000000000000001"; + final String zxcvBinary2 = "0000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F757300000005336F34696D000000046E6578740000000000000001"; + + stateRestoreCallback.restoreBatch(asList( + new ConsumerRecord<>("changelog-topic", + 0, + 0, + 999, + TimestampType.CREATE_TIME, + -1L, + -1, + -1, + "todelete".getBytes(UTF_8), + hexStringToByteArray(toDeleteBinary), + headers), + new ConsumerRecord<>("changelog-topic", + 0, + 1, + 9999, + TimestampType.CREATE_TIME, + -1L, + -1, + -1, + "asdf".getBytes(UTF_8), + hexStringToByteArray(asdfBinary), + headers), + new ConsumerRecord<>("changelog-topic", + 0, + 2, + 99, + TimestampType.CREATE_TIME, + -1L, + -1, + -1, + "zxcv".getBytes(UTF_8), + hexStringToByteArray(zxcvBinary1), + headers), + new ConsumerRecord<>("changelog-topic", + 0, + 2, + 100, + TimestampType.CREATE_TIME, + -1L, + -1, + -1, + "zxcv".getBytes(UTF_8), + hexStringToByteArray(zxcvBinary2), + headers) + )); + + assertThat(buffer.numRecords(), is(3)); + assertThat(buffer.minTimestamp(), is(0L)); + assertThat(buffer.bufferSize(), is(142L)); + + stateRestoreCallback.restoreBatch(singletonList( + new ConsumerRecord<>("changelog-topic", + 0, + 3, + 3, + TimestampType.CREATE_TIME, + -1L, + -1, + -1, + "todelete".getBytes(UTF_8), + null) + )); + + assertThat(buffer.numRecords(), is(2)); + assertThat(buffer.minTimestamp(), is(1L)); + assertThat(buffer.bufferSize(), is(95L)); + + assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined())); + assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null))); + assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueAndTimestamp.make("previous", -1)))); + + // flush the buffer into a list in buffer order so we can make assertions about the contents. + + final List> evicted = new LinkedList<>(); + buffer.evictWhile(() -> true, evicted::add); + + // Several things to note: + // * The buffered records are ordered according to their buffer time (serialized in the value of the changelog) + // * The record timestamps are properly restored, and not conflated with the record's buffer time. 
+ // * The keys and values are properly restored + // * The record topic is set to the original input topic, *not* the changelog topic + // * The record offset preserves the original input record's offset, *not* the offset of the changelog record + + + assertThat(evicted, is(asList( + new Eviction<>( + "zxcv", + new Change<>("next", "3o4im"), + getContext(3L)), + new Eviction<>( + "asdf", + new Change<>("qwer", null), + getContext(1L) + )))); + + cleanup(context, buffer); + } + + @Test + public void shouldRestoreV3Format() { + final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final MockInternalProcessorContext context = makeContext(); + buffer.init(context, buffer); + + final RecordBatchingStateRestoreCallback stateRestoreCallback = + (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName); + + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null)); + + final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 3})}); + + // These serialized formats were captured by running version 2.4 code. + // They verify that an upgrade from 2.4 will work. + // Do not change them. + final String toDeleteBinary = "0000000000000000000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000006646F6F6D65640000000000000000"; + final String asdfBinary = "0000000000000001000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000004717765720000000000000002"; + final String zxcvBinary1 = "0000000000000002000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F75730000000749474E4F52454400000005336F34696D0000000000000001"; + final String zxcvBinary2 = "0000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F757300000005336F34696D000000046E6578740000000000000001"; + + stateRestoreCallback.restoreBatch(asList( + new ConsumerRecord<>("changelog-topic", + 0, + 0, + 999, + TimestampType.CREATE_TIME, + -1L, + -1, + -1, + "todelete".getBytes(UTF_8), + hexStringToByteArray(toDeleteBinary), + headers), + new ConsumerRecord<>("changelog-topic", + 0, + 1, + 9999, + TimestampType.CREATE_TIME, + -1L, + -1, + -1, + "asdf".getBytes(UTF_8), + hexStringToByteArray(asdfBinary), + headers), + new ConsumerRecord<>("changelog-topic", + 0, + 2, + 99, + TimestampType.CREATE_TIME, + -1L, + -1, + -1, + "zxcv".getBytes(UTF_8), + hexStringToByteArray(zxcvBinary1), + headers), + new ConsumerRecord<>("changelog-topic", + 0, + 2, + 100, + TimestampType.CREATE_TIME, + -1L, + -1, + -1, + "zxcv".getBytes(UTF_8), + hexStringToByteArray(zxcvBinary2), + headers) + )); + + assertThat(buffer.numRecords(), is(3)); + assertThat(buffer.minTimestamp(), is(0L)); + assertThat(buffer.bufferSize(), is(142L)); + + stateRestoreCallback.restoreBatch(singletonList( + new ConsumerRecord<>("changelog-topic", + 0, + 3, + 3, + TimestampType.CREATE_TIME, + -1L, + -1, + -1, + "todelete".getBytes(UTF_8), + null) + )); + + assertThat(buffer.numRecords(), is(2)); + assertThat(buffer.minTimestamp(), is(1L)); + assertThat(buffer.bufferSize(), is(95L)); + + assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined())); + assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null))); + assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueAndTimestamp.make("previous", -1)))); + + // flush the buffer into a list in buffer order so we can make assertions about the contents. 
+ + final List> evicted = new LinkedList<>(); + buffer.evictWhile(() -> true, evicted::add); + + // Several things to note: + // * The buffered records are ordered according to their buffer time (serialized in the value of the changelog) + // * The record timestamps are properly restored, and not conflated with the record's buffer time. + // * The keys and values are properly restored + // * The record topic is set to the original input topic, *not* the changelog topic + // * The record offset preserves the original input record's offset, *not* the offset of the changelog record + + + assertThat(evicted, is(asList( + new Eviction<>( + "zxcv", + new Change<>("next", "3o4im"), + getContext(3L)), + new Eviction<>( + "asdf", + new Change<>("qwer", null), + getContext(1L) + )))); + + cleanup(context, buffer); + } + @Test public void shouldNotRestoreUnrecognizedVersionRecord() { final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); @@ -780,15 +1013,30 @@ private static BufferValue getBufferValue(final String value, final long timesta ); } - private static ContextualRecord getContextualRecord(final String value, final long timestamp) { - final FullChangeSerde fullChangeSerde = FullChangeSerde.wrap(Serdes.String()); - return new ContextualRecord( - FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>(value, null))), - getContext(timestamp) - ); - } - private static ProcessorRecordContext getContext(final long recordTimestamp) { return new ProcessorRecordContext(recordTimestamp, 0, 0, "topic", null); } + + + // to be used to generate future hex-encoded values +// private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray(); +// private static String bytesToHex(final byte[] bytes) { +// final char[] hexChars = new char[bytes.length * 2]; +// for (int j = 0; j < bytes.length; j++) { +// final int v = bytes[j] & 0xFF; +// hexChars[j * 2] = HEX_ARRAY[v >>> 4]; +// hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F]; +// } +// return new String(hexChars); +// } + + private static byte[] hexStringToByteArray(final String hexString) { + final int len = hexString.length(); + final byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + data[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4) + + Character.digit(hexString.charAt(i + 1), 16)); + } + return data; + } }