Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
package org.apache.kafka.common.record;

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.message.EndTxnMarker;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.MessageUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,23 +34,16 @@
public class EndTransactionMarker {
private static final Logger log = LoggerFactory.getLogger(EndTransactionMarker.class);

private static final short CURRENT_END_TXN_MARKER_VERSION = 0;
private static final Schema END_TXN_MARKER_SCHEMA_VERSION_V0 = new Schema(
new Field("version", Type.INT16),
new Field("coordinator_epoch", Type.INT32));
static final int CURRENT_END_TXN_MARKER_VALUE_SIZE = 6;
static final int CURRENT_END_TXN_SCHEMA_RECORD_SIZE = DefaultRecord.sizeInBytes(0, 0L,
ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE,
EndTransactionMarker.CURRENT_END_TXN_MARKER_VALUE_SIZE,
Record.EMPTY_HEADERS);

private final ControlRecordType type;
private final int coordinatorEpoch;
private final ByteBuffer buffer;

public EndTransactionMarker(ControlRecordType type, int coordinatorEpoch) {
ensureTransactionMarkerControlType(type);
this.type = type;
this.coordinatorEpoch = coordinatorEpoch;
EndTxnMarker marker = new EndTxnMarker().setCoordinatorEpoch(coordinatorEpoch);
this.buffer = MessageUtil.toVersionPrefixedByteBuffer(EndTxnMarker.HIGHEST_SUPPORTED_VERSION, marker);
}

public int coordinatorEpoch() {
Expand All @@ -62,19 +54,8 @@ public ControlRecordType controlType() {
return type;
}

private Struct buildRecordValue() {
Struct struct = new Struct(END_TXN_MARKER_SCHEMA_VERSION_V0);
struct.set("version", CURRENT_END_TXN_MARKER_VERSION);
struct.set("coordinator_epoch", coordinatorEpoch);
return struct;
}

public ByteBuffer serializeValue() {
Struct valueStruct = buildRecordValue();
ByteBuffer value = ByteBuffer.allocate(valueStruct.sizeOf());
valueStruct.writeTo(value);
value.flip();
return value;
return buffer.duplicate();
}

@Override
Expand All @@ -95,32 +76,35 @@ public int hashCode() {

private static void ensureTransactionMarkerControlType(ControlRecordType type) {
if (type != ControlRecordType.COMMIT && type != ControlRecordType.ABORT)
throw new IllegalArgumentException("Invalid control record type for end transaction marker" + type);
throw new IllegalArgumentException("Invalid control record type for end transaction marker " + type);
}

public static EndTransactionMarker deserialize(Record record) {
ControlRecordType type = ControlRecordType.parse(record.key());
return deserializeValue(type, record.value());
}

// Visible for testing
static EndTransactionMarker deserializeValue(ControlRecordType type, ByteBuffer value) {
ensureTransactionMarkerControlType(type);

if (value.remaining() < CURRENT_END_TXN_MARKER_VALUE_SIZE)
throw new InvalidRecordException("Invalid value size found for end transaction marker. Must have " +
"at least " + CURRENT_END_TXN_MARKER_VALUE_SIZE + " bytes, but found only " + value.remaining());

short version = value.getShort(0);
if (version < 0)
Comment thread
dengziming marked this conversation as resolved.
short version = value.getShort();
if (version < EndTxnMarker.LOWEST_SUPPORTED_VERSION)
throw new InvalidRecordException("Invalid version found for end transaction marker: " + version +
". May indicate data corruption");

if (version > CURRENT_END_TXN_MARKER_VERSION)
if (version > EndTxnMarker.HIGHEST_SUPPORTED_VERSION)
log.debug("Received end transaction marker value version {}. Parsing as version {}", version,
CURRENT_END_TXN_MARKER_VERSION);
EndTxnMarker.HIGHEST_SUPPORTED_VERSION);
EndTxnMarker marker = new EndTxnMarker(new ByteBufferAccessor(value), EndTxnMarker.HIGHEST_SUPPORTED_VERSION);
return new EndTransactionMarker(type, marker.coordinatorEpoch());
}

int coordinatorEpoch = value.getInt(2);
return new EndTransactionMarker(type, coordinatorEpoch);
public int endTxnMarkerValueSize() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add unit test for this method?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's good to test it since it's public.

return DefaultRecord.sizeInBytes(0, 0L,
ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE,
buffer.remaining(),
Record.EMPTY_HEADERS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -678,8 +678,7 @@ public static MemoryRecords withEndTransactionMarker(long timestamp, long produc
public static MemoryRecords withEndTransactionMarker(long initialOffset, long timestamp, int partitionLeaderEpoch,
long producerId, short producerEpoch,
EndTransactionMarker marker) {
int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD +
EndTransactionMarker.CURRENT_END_TXN_SCHEMA_RECORD_SIZE;
int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD + marker.endTxnMarkerValueSize();
ByteBuffer buffer = ByteBuffer.allocate(endTxnMarkerBatchSize);
writeEndTransactionalMarker(buffer, initialOffset, timestamp, partitionLeaderEpoch, producerId,
producerEpoch, marker);
Expand Down
26 changes: 26 additions & 0 deletions clients/src/main/resources/common/message/EndTxnMarker.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"type": "data",
"name": "EndTxnMarker",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "CoordinatorEpoch", "type": "int32", "versions": "0+",
"about": "The coordinator epoch when appending the record"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,31 @@
package org.apache.kafka.common.record;

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.message.EndTxnMarker;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.utils.ByteUtils;

import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class EndTransactionMarkerTest {

// Old hard-coded schema, used to validate old hard-coded schema format is exactly the same as new auto generated protocol format
private final Schema v0Schema = new Schema(
new Field("version", Type.INT16),
new Field("coordinator_epoch", Type.INT32));

private static final List<ControlRecordType> VALID_CONTROLLER_RECORD_TYPE = Arrays.asList(ControlRecordType.COMMIT, ControlRecordType.ABORT);

@Test
public void testUnknownControlTypeNotAllowed() {
assertThrows(IllegalArgumentException.class,
Expand All @@ -40,19 +55,13 @@ public void testCannotDeserializeUnknownControlType() {
}

@Test
public void testIllegalNegativeVersion() {
public void testIllegalVersion() {
ByteBuffer buffer = ByteBuffer.allocate(2);
buffer.putShort((short) -1);
buffer.flip();
assertThrows(InvalidRecordException.class, () -> EndTransactionMarker.deserializeValue(ControlRecordType.ABORT, buffer));
}

@Test
public void testNotEnoughBytes() {
assertThrows(InvalidRecordException.class,
() -> EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, ByteBuffer.wrap(new byte[0])));
}

@Test
public void testSerde() {
int coordinatorEpoch = 79;
Expand All @@ -73,4 +82,70 @@ public void testDeserializeNewerVersion() {
EndTransactionMarker deserialized = EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, buffer);
assertEquals(coordinatorEpoch, deserialized.coordinatorEpoch());
}

@Test
public void testSerializeAndDeserialize() {
for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) {
for (short version = EndTxnMarker.LOWEST_SUPPORTED_VERSION;
version <= EndTxnMarker.HIGHEST_SUPPORTED_VERSION; version++) {
EndTransactionMarker marker = new EndTransactionMarker(type, 1);

ByteBuffer buffer = marker.serializeValue();
EndTransactionMarker deserializedMarker = EndTransactionMarker.deserializeValue(type, buffer);
assertEquals(marker, deserializedMarker);
}
}
}

@Test
public void testEndTxnMarkerValueSize() {
for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) {
EndTransactionMarker marker = new EndTransactionMarker(type, 1);
int offsetSize = ByteUtils.sizeOfVarint(0);
int timestampSize = ByteUtils.sizeOfVarlong(0);
int keySize = ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE;
int valueSize = marker.serializeValue().remaining();
int headerSize = ByteUtils.sizeOfVarint(Record.EMPTY_HEADERS.length);
int totalSize = 1 + offsetSize + timestampSize + ByteUtils.sizeOfVarint(keySize) + keySize + ByteUtils.sizeOfVarint(valueSize) + valueSize + headerSize;
assertEquals(ByteUtils.sizeOfVarint(totalSize) + totalSize, marker.endTxnMarkerValueSize());
}
}

@Test
public void testBackwardDeserializeCompatibility() {
int coordinatorEpoch = 10;
for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) {
for (short version = EndTxnMarker.LOWEST_SUPPORTED_VERSION;
version <= EndTxnMarker.HIGHEST_SUPPORTED_VERSION; version++) {

Struct struct = new Struct(v0Schema);
struct.set("version", version);
struct.set("coordinator_epoch", coordinatorEpoch);

ByteBuffer oldVersionBuffer = ByteBuffer.allocate(struct.sizeOf());
struct.writeTo(oldVersionBuffer);
oldVersionBuffer.flip();

EndTransactionMarker deserializedMarker = EndTransactionMarker.deserializeValue(type, oldVersionBuffer);
assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
assertEquals(type, deserializedMarker.controlType());
}
}
}

@Test
public void testForwardDeserializeCompatibility() {
int coordinatorEpoch = 10;
for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) {
for (short version = EndTxnMarker.LOWEST_SUPPORTED_VERSION;
version <= EndTxnMarker.HIGHEST_SUPPORTED_VERSION; version++) {
EndTransactionMarker marker = new EndTransactionMarker(type, coordinatorEpoch);
ByteBuffer newVersionBuffer = marker.serializeValue();

Struct struct = v0Schema.read(newVersionBuffer);
EndTransactionMarker deserializedMarker = new EndTransactionMarker(type, struct.getInt("coordinator_epoch"));
assertEquals(marker, deserializedMarker);
}
}
}
}