diff --git a/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java index 9e7225c21106d..92d53dc2e4d13 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java +++ b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java @@ -17,10 +17,9 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.InvalidRecordException; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.common.message.EndTxnMarker; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,23 +34,16 @@ public class EndTransactionMarker { private static final Logger log = LoggerFactory.getLogger(EndTransactionMarker.class); - private static final short CURRENT_END_TXN_MARKER_VERSION = 0; - private static final Schema END_TXN_MARKER_SCHEMA_VERSION_V0 = new Schema( - new Field("version", Type.INT16), - new Field("coordinator_epoch", Type.INT32)); - static final int CURRENT_END_TXN_MARKER_VALUE_SIZE = 6; - static final int CURRENT_END_TXN_SCHEMA_RECORD_SIZE = DefaultRecord.sizeInBytes(0, 0L, - ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE, - EndTransactionMarker.CURRENT_END_TXN_MARKER_VALUE_SIZE, - Record.EMPTY_HEADERS); - private final ControlRecordType type; private final int coordinatorEpoch; + private final ByteBuffer buffer; public EndTransactionMarker(ControlRecordType type, int coordinatorEpoch) { ensureTransactionMarkerControlType(type); this.type = type; this.coordinatorEpoch = coordinatorEpoch; + EndTxnMarker marker = new EndTxnMarker().setCoordinatorEpoch(coordinatorEpoch); + this.buffer = MessageUtil.toVersionPrefixedByteBuffer(EndTxnMarker.HIGHEST_SUPPORTED_VERSION, marker); } public int coordinatorEpoch() { @@ -62,19 +54,8 @@ public ControlRecordType controlType() { return type; } - private Struct buildRecordValue() { - Struct struct = new Struct(END_TXN_MARKER_SCHEMA_VERSION_V0); - struct.set("version", CURRENT_END_TXN_MARKER_VERSION); - struct.set("coordinator_epoch", coordinatorEpoch); - return struct; - } - public ByteBuffer serializeValue() { - Struct valueStruct = buildRecordValue(); - ByteBuffer value = ByteBuffer.allocate(valueStruct.sizeOf()); - valueStruct.writeTo(value); - value.flip(); - return value; + return buffer.duplicate(); } @Override @@ -95,7 +76,7 @@ public int hashCode() { private static void ensureTransactionMarkerControlType(ControlRecordType type) { if (type != ControlRecordType.COMMIT && type != ControlRecordType.ABORT) - throw new IllegalArgumentException("Invalid control record type for end transaction marker" + type); + throw new IllegalArgumentException("Invalid control record type for end transaction marker " + type); } public static EndTransactionMarker deserialize(Record record) { @@ -103,24 +84,27 @@ public static EndTransactionMarker deserialize(Record record) { return deserializeValue(type, record.value()); } + // Visible for testing static EndTransactionMarker deserializeValue(ControlRecordType type, ByteBuffer value) { ensureTransactionMarkerControlType(type); - if (value.remaining() < CURRENT_END_TXN_MARKER_VALUE_SIZE) - throw new InvalidRecordException("Invalid value size found for end transaction marker. Must have " + - "at least " + CURRENT_END_TXN_MARKER_VALUE_SIZE + " bytes, but found only " + value.remaining()); - - short version = value.getShort(0); - if (version < 0) + short version = value.getShort(); + if (version < EndTxnMarker.LOWEST_SUPPORTED_VERSION) throw new InvalidRecordException("Invalid version found for end transaction marker: " + version + ". May indicate data corruption"); - if (version > CURRENT_END_TXN_MARKER_VERSION) + if (version > EndTxnMarker.HIGHEST_SUPPORTED_VERSION) log.debug("Received end transaction marker value version {}. Parsing as version {}", version, - CURRENT_END_TXN_MARKER_VERSION); + EndTxnMarker.HIGHEST_SUPPORTED_VERSION); + EndTxnMarker marker = new EndTxnMarker(new ByteBufferAccessor(value), EndTxnMarker.HIGHEST_SUPPORTED_VERSION); + return new EndTransactionMarker(type, marker.coordinatorEpoch()); + } - int coordinatorEpoch = value.getInt(2); - return new EndTransactionMarker(type, coordinatorEpoch); + public int endTxnMarkerValueSize() { + return DefaultRecord.sizeInBytes(0, 0L, + ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE, + buffer.remaining(), + Record.EMPTY_HEADERS); } } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index c06188edf2220..c2fd231e4b7cd 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -678,8 +678,7 @@ public static MemoryRecords withEndTransactionMarker(long timestamp, long produc public static MemoryRecords withEndTransactionMarker(long initialOffset, long timestamp, int partitionLeaderEpoch, long producerId, short producerEpoch, EndTransactionMarker marker) { - int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD + - EndTransactionMarker.CURRENT_END_TXN_SCHEMA_RECORD_SIZE; + int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD + marker.endTxnMarkerValueSize(); ByteBuffer buffer = ByteBuffer.allocate(endTxnMarkerBatchSize); writeEndTransactionalMarker(buffer, initialOffset, timestamp, partitionLeaderEpoch, producerId, producerEpoch, marker); diff --git a/clients/src/main/resources/common/message/EndTxnMarker.json b/clients/src/main/resources/common/message/EndTxnMarker.json new file mode 100644 index 0000000000000..f6e1e209ae9ca --- /dev/null +++ b/clients/src/main/resources/common/message/EndTxnMarker.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "EndTxnMarker", + "validVersions": "0", + "flexibleVersions": "none", + "fields": [ + { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+", + "about": "The coordinator epoch when appending the record" + } + ] +} diff --git a/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java index 64224e003285f..c9a3b3b10b7d7 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java @@ -17,16 +17,31 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.message.EndTxnMarker; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.common.utils.ByteUtils; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class EndTransactionMarkerTest { + // Old hard-coded schema, used to validate old hard-coded schema format is exactly the same as new auto generated protocol format + private final Schema v0Schema = new Schema( + new Field("version", Type.INT16), + new Field("coordinator_epoch", Type.INT32)); + + private static final List VALID_CONTROLLER_RECORD_TYPE = Arrays.asList(ControlRecordType.COMMIT, ControlRecordType.ABORT); + @Test public void testUnknownControlTypeNotAllowed() { assertThrows(IllegalArgumentException.class, @@ -40,19 +55,13 @@ public void testCannotDeserializeUnknownControlType() { } @Test - public void testIllegalNegativeVersion() { + public void testIllegalVersion() { ByteBuffer buffer = ByteBuffer.allocate(2); buffer.putShort((short) -1); buffer.flip(); assertThrows(InvalidRecordException.class, () -> EndTransactionMarker.deserializeValue(ControlRecordType.ABORT, buffer)); } - @Test - public void testNotEnoughBytes() { - assertThrows(InvalidRecordException.class, - () -> EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, ByteBuffer.wrap(new byte[0]))); - } - @Test public void testSerde() { int coordinatorEpoch = 79; @@ -73,4 +82,70 @@ public void testDeserializeNewerVersion() { EndTransactionMarker deserialized = EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, buffer); assertEquals(coordinatorEpoch, deserialized.coordinatorEpoch()); } + + @Test + public void testSerializeAndDeserialize() { + for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) { + for (short version = EndTxnMarker.LOWEST_SUPPORTED_VERSION; + version <= EndTxnMarker.HIGHEST_SUPPORTED_VERSION; version++) { + EndTransactionMarker marker = new EndTransactionMarker(type, 1); + + ByteBuffer buffer = marker.serializeValue(); + EndTransactionMarker deserializedMarker = EndTransactionMarker.deserializeValue(type, buffer); + assertEquals(marker, deserializedMarker); + } + } + } + + @Test + public void testEndTxnMarkerValueSize() { + for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) { + EndTransactionMarker marker = new EndTransactionMarker(type, 1); + int offsetSize = ByteUtils.sizeOfVarint(0); + int timestampSize = ByteUtils.sizeOfVarlong(0); + int keySize = ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE; + int valueSize = marker.serializeValue().remaining(); + int headerSize = ByteUtils.sizeOfVarint(Record.EMPTY_HEADERS.length); + int totalSize = 1 + offsetSize + timestampSize + ByteUtils.sizeOfVarint(keySize) + keySize + ByteUtils.sizeOfVarint(valueSize) + valueSize + headerSize; + assertEquals(ByteUtils.sizeOfVarint(totalSize) + totalSize, marker.endTxnMarkerValueSize()); + } + } + + @Test + public void testBackwardDeserializeCompatibility() { + int coordinatorEpoch = 10; + for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) { + for (short version = EndTxnMarker.LOWEST_SUPPORTED_VERSION; + version <= EndTxnMarker.HIGHEST_SUPPORTED_VERSION; version++) { + + Struct struct = new Struct(v0Schema); + struct.set("version", version); + struct.set("coordinator_epoch", coordinatorEpoch); + + ByteBuffer oldVersionBuffer = ByteBuffer.allocate(struct.sizeOf()); + struct.writeTo(oldVersionBuffer); + oldVersionBuffer.flip(); + + EndTransactionMarker deserializedMarker = EndTransactionMarker.deserializeValue(type, oldVersionBuffer); + assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch()); + assertEquals(type, deserializedMarker.controlType()); + } + } + } + + @Test + public void testForwardDeserializeCompatibility() { + int coordinatorEpoch = 10; + for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) { + for (short version = EndTxnMarker.LOWEST_SUPPORTED_VERSION; + version <= EndTxnMarker.HIGHEST_SUPPORTED_VERSION; version++) { + EndTransactionMarker marker = new EndTransactionMarker(type, coordinatorEpoch); + ByteBuffer newVersionBuffer = marker.serializeValue(); + + Struct struct = v0Schema.read(newVersionBuffer); + EndTransactionMarker deserializedMarker = new EndTransactionMarker(type, struct.getInt("coordinator_epoch")); + assertEquals(marker, deserializedMarker); + } + } + } }