From 2176b14a7a4f2ce626bea58e08856ecd39966d1b Mon Sep 17 00:00:00 2001 From: Jeff Kim Date: Tue, 18 Apr 2023 09:37:04 -0400 Subject: [PATCH] KAFKA-14869: Bump coordinator value records to flexible versions (KIP-915, Part-2) (#13526) This patch implemented the second part of KIP-915. It bumps the versions of the value records used by the group coordinator and the transaction coordinator to make them flexible versions. The new versions are not used when writing to the partitions but only when reading from the partitions. This allows downgrades from future versions that will include tagged fields. Reviewers: David Jacot --- .../common/message/GroupMetadataValue.json | 9 +- .../common/message/OffsetCommitValue.json | 7 +- .../common/message/TransactionLogValue.json | 25 +- .../group/GroupMetadataManager.scala | 4 + .../transaction/TransactionLog.scala | 4 +- .../group/GroupMetadataManagerTest.scala | 218 +++++++++++++++++- .../transaction/TransactionLogTest.scala | 122 +++++++++- 7 files changed, 369 insertions(+), 20 deletions(-) diff --git a/core/src/main/resources/common/message/GroupMetadataValue.json b/core/src/main/resources/common/message/GroupMetadataValue.json index 826a7c8d326d4..8405fbb991853 100644 --- a/core/src/main/resources/common/message/GroupMetadataValue.json +++ b/core/src/main/resources/common/message/GroupMetadataValue.json @@ -16,8 +16,11 @@ { "type": "data", "name": "GroupMetadataValue", - "validVersions": "0-3", - "flexibleVersions": "none", + // Version 4 is the first flexible version. + // KIP-915: bumping the version will no longer make this record backward compatible. + // We suggest to add/remove only tagged fields to maintain backward compatibility. + "validVersions": "0-4", + "flexibleVersions": "4+", "fields": [ { "name": "protocolType", "versions": "0+", "type": "string"}, { "name": "generation", "versions": "0+", "type": "int32" }, @@ -29,7 +32,7 @@ "commonStructs": [ { "name": "MemberMetadata", - "versions": "0-3", + "versions": "0+", "fields": [ { "name": "memberId", "versions": "0+", "type": "string" }, { "name": "groupInstanceId", "versions": "3+", "type": "string", "default": "null", "nullableVersions": "3+", "ignorable": true}, diff --git a/core/src/main/resources/common/message/OffsetCommitValue.json b/core/src/main/resources/common/message/OffsetCommitValue.json index db8a6281d43b6..2973c5ee12ab2 100644 --- a/core/src/main/resources/common/message/OffsetCommitValue.json +++ b/core/src/main/resources/common/message/OffsetCommitValue.json @@ -16,8 +16,11 @@ { "type": "data", "name": "OffsetCommitValue", - "validVersions": "0-3", - "flexibleVersions": "none", + // Version 4 is the first flexible version. + // KIP-915: bumping the version will no longer make this record backward compatible. + // We suggest to add/remove only tagged fields to maintain backward compatibility. + "validVersions": "0-4", + "flexibleVersions": "4+", "fields": [ { "name": "offset", "type": "int64", "versions": "0+" }, { "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true}, diff --git a/core/src/main/resources/common/message/TransactionLogValue.json b/core/src/main/resources/common/message/TransactionLogValue.json index 7915c3d7cb715..c6efc772d58db 100644 --- a/core/src/main/resources/common/message/TransactionLogValue.json +++ b/core/src/main/resources/common/message/TransactionLogValue.json @@ -16,24 +16,27 @@ { "type": "data", "name": "TransactionLogValue", - "validVersions": "0", - "flexibleVersions": "none", + // Version 1 is the first flexible version. + // KIP-915: bumping the version will no longer make this record backward compatible. + // We suggest to add/remove only tagged fields to maintain backward compatibility. + "validVersions": "0-1", + "flexibleVersions": "1+", "fields": [ - { "name": "ProducerId", "type": "int64", "versions": "0", + { "name": "ProducerId", "type": "int64", "versions": "0+", "about": "Producer id in use by the transactional id"}, - { "name": "ProducerEpoch", "type": "int16", "versions": "0", + { "name": "ProducerEpoch", "type": "int16", "versions": "0+", "about": "Epoch associated with the producer id"}, - { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0", + { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+", "about": "Transaction timeout in milliseconds"}, - { "name": "TransactionStatus", "type": "int8", "versions": "0", + { "name": "TransactionStatus", "type": "int8", "versions": "0+", "about": "TransactionState the transaction is in"}, - { "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0", "nullableVersions": "0", + { "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0+", "nullableVersions": "0+", "about": "Set of partitions involved in the transaction", "fields": [ - { "name": "Topic", "type": "string", "versions": "0"}, - { "name": "PartitionIds", "type": "[]int32", "versions": "0"}]}, - { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0", + { "name": "Topic", "type": "string", "versions": "0+"}, + { "name": "PartitionIds", "type": "[]int32", "versions": "0+"}]}, + { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0+", "about": "Time the transaction was last updated"}, - { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0", + { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0+", "about": "Time the transaction was started"} ] } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index ac3fc390eba97..c72678f0be238 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -1083,6 +1083,8 @@ object GroupMetadataManager { val version = if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty) 1.toShort else if (apiVersion < KAFKA_2_1_IV1) 2.toShort + // Serialize with the highest supported non-flexible version + // until a tagged field is introduced or the version is bumped. else 3.toShort MessageUtil.toVersionPrefixedBytes(version, new OffsetCommitValue() .setOffset(offsetAndMetadata.offset) @@ -1111,6 +1113,8 @@ object GroupMetadataManager { if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort else if (apiVersion < KAFKA_2_1_IV0) 1.toShort else if (apiVersion < KAFKA_2_3_IV0) 2.toShort + // Serialize with the highest supported non-flexible version + // until a tagged field is introduced or the version is bumped. else 3.toShort MessageUtil.toVersionPrefixedBytes(version, new GroupMetadataValue() diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala index cb501f774fd9d..0fa0280312164 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala @@ -82,7 +82,9 @@ object TransactionLog { .setPartitionIds(partitions.map(tp => Integer.valueOf(tp.partition)).toList.asJava) }.toList.asJava - MessageUtil.toVersionPrefixedBytes(TransactionLogValue.HIGHEST_SUPPORTED_VERSION, + // Serialize with the highest supported non-flexible version + // until a tagged field is introduced or the version is bumped. + MessageUtil.toVersionPrefixedBytes(0, new TransactionLogValue() .setProducerId(txnMetadata.producerId) .setProducerEpoch(txnMetadata.producerEpoch) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 5fe4bf95da4ae..939c8357a9225 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -27,7 +27,8 @@ import javax.management.ObjectName import kafka.api._ import kafka.cluster.Partition import kafka.common.OffsetAndMetadata -import kafka.log.{AppendOrigin, UnifiedLog, LogAppendInfo} +import kafka.internals.generated.{GroupMetadataValue, OffsetCommitValue} +import kafka.log.{AppendOrigin, LogAppendInfo, UnifiedLog} import kafka.metrics.KafkaYammerMetrics import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata, ReplicaManager, RequestLocal} import kafka.utils.{KafkaScheduler, MockTime, TestUtils} @@ -37,7 +38,9 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, Metrics => kMetrics} -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.types.Field.TaggedFieldsSection +import org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, Struct, Type} +import org.apache.kafka.common.protocol.{ByteBufferAccessor, Errors, MessageUtil} import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse @@ -2335,6 +2338,217 @@ class GroupMetadataManagerTest { verifySerde(version) } + @Test + def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = { + val generation = 935 + val protocolType = "consumer" + val protocol = "range" + val memberId = "98098230493" + val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment( + new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null) + )) + val record = TestUtils.records(Seq( + buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes) + )).records.asScala.head + assertEquals(3, record.value.getShort) + } + + @Test + def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = { + val committedOffsets = Map( + new TopicPartition("foo", 0) -> 23L, + new TopicPartition("foo", 1) -> 455L, + ) + + val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets) + offsetCommitRecords.foreach { record => + assertEquals(3, record.value.getShort) + } + } + + @Test + def testDeserializeHighestSupportedGroupMetadataValueVersion(): Unit = { + val member = new GroupMetadataValue.MemberMetadata() + .setMemberId("member") + .setClientId("client") + .setClientHost("host") + + val generation = 935 + val protocolType = "consumer" + val protocol = "range" + val leader = "leader" + val groupMetadataValue = new GroupMetadataValue() + .setProtocolType(protocolType) + .setGeneration(generation) + .setProtocol(protocol) + .setLeader(leader) + .setMembers(java.util.Collections.singletonList(member)) + + val deserialized = GroupMetadataManager.readGroupMessageValue("groupId", + MessageUtil.toVersionPrefixedByteBuffer(4, groupMetadataValue), time) + + assertEquals(generation, deserialized.generationId) + assertEquals(protocolType, deserialized.protocolType.get) + assertEquals(protocol, deserialized.protocolName.get) + assertEquals(leader, deserialized.leaderOrNull) + + val actualMember = deserialized.allMemberMetadata.head + assertEquals(member.memberId, actualMember.memberId) + assertEquals(member.clientId, actualMember.clientId) + assertEquals(member.clientHost, actualMember.clientHost) + } + + @Test + def testDeserializeHighestSupportedOffsetCommitValueVersion(): Unit = { + val offsetCommitValue = new OffsetCommitValue() + .setOffset(1000L) + .setMetadata("metadata") + .setCommitTimestamp(1500L) + .setLeaderEpoch(1) + + val serialized = MessageUtil.toVersionPrefixedByteBuffer(4, offsetCommitValue) + val deserialized = GroupMetadataManager.readOffsetMessageValue(serialized) + + assertEquals(1000L, deserialized.offset) + assertEquals("metadata", deserialized.metadata) + assertEquals(1500L, deserialized.commitTimestamp) + assertEquals(1, deserialized.leaderEpoch.get) + } + + @Test + def testDeserializeFutureOffsetCommitValue(): Unit = { + // Copy of OffsetCommitValue.SCHEMA_4 with a few + // additional tagged fields. + val futureOffsetCommitSchema = new Schema( + new Field("offset", Type.INT64, ""), + new Field("leader_epoch", Type.INT32, ""), + new Field("metadata", Type.COMPACT_STRING, ""), + new Field("commit_timestamp", Type.INT64, ""), + TaggedFieldsSection.of( + Int.box(0), new Field("offset_foo", Type.STRING, ""), + Int.box(1), new Field("offset_bar", Type.INT32, "") + ) + ) + + // Create OffsetCommitValue with tagged fields + val offsetCommit = new Struct(futureOffsetCommitSchema) + offsetCommit.set("offset", 1000L) + offsetCommit.set("leader_epoch", 100) + offsetCommit.set("metadata", "metadata") + offsetCommit.set("commit_timestamp", 2000L) + val offsetCommitTaggedFields = new java.util.TreeMap[Integer, Any]() + offsetCommitTaggedFields.put(0, "foo") + offsetCommitTaggedFields.put(1, 4000) + offsetCommit.set("_tagged_fields", offsetCommitTaggedFields) + + // Prepare the buffer. + val buffer = ByteBuffer.allocate(offsetCommit.sizeOf() + 2) + buffer.put(0.toByte) + buffer.put(4.toByte) // Add 4 as version. + offsetCommit.writeTo(buffer) + buffer.flip() + + // Read the buffer with the real schema and verify that tagged + // fields were read but ignored. + buffer.getShort() // Skip version. + val value = new OffsetCommitValue(new ByteBufferAccessor(buffer), 4.toShort) + assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag)) + + // Read the buffer with readOffsetMessageValue. + buffer.rewind() + val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer) + assertEquals(1000L, offsetAndMetadata.offset) + assertEquals(100, offsetAndMetadata.leaderEpoch.get) + assertEquals("metadata", offsetAndMetadata.metadata) + assertEquals(2000L, offsetAndMetadata.commitTimestamp) + } + + @Test + def testDeserializeFutureGroupMetadataValue(): Unit = { + // Copy of GroupMetadataValue.MemberMetadata.SCHEMA_4 with a few + // additional tagged fields. + val futureMemberSchema = new Schema( + new Field("member_id", Type.COMPACT_STRING, ""), + new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""), + new Field("client_id", Type.COMPACT_STRING, ""), + new Field("client_host", Type.COMPACT_STRING, ""), + new Field("rebalance_timeout", Type.INT32, ""), + new Field("session_timeout", Type.INT32, ""), + new Field("subscription", Type.COMPACT_BYTES, ""), + new Field("assignment", Type.COMPACT_BYTES, ""), + TaggedFieldsSection.of( + Int.box(0), new Field("member_foo", Type.STRING, ""), + Int.box(1), new Field("member_foo", Type.INT32, "") + ) + ) + + // Copy of GroupMetadataValue.SCHEMA_4 with a few + // additional tagged fields. + val futureGroupSchema = new Schema( + new Field("protocol_type", Type.COMPACT_STRING, ""), + new Field("generation", Type.INT32, ""), + new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""), + new Field("leader", Type.COMPACT_NULLABLE_STRING, ""), + new Field("current_state_timestamp", Type.INT64, ""), + new Field("members", new CompactArrayOf(futureMemberSchema), ""), + TaggedFieldsSection.of( + Int.box(0), new Field("group_foo", Type.STRING, ""), + Int.box(1), new Field("group_bar", Type.INT32, "") + ) + ) + + // Create a member with tagged fields. + val member = new Struct(futureMemberSchema) + member.set("member_id", "member_id") + member.set("group_instance_id", "group_instance_id") + member.set("client_id", "client_id") + member.set("client_host", "client_host") + member.set("rebalance_timeout", 1) + member.set("session_timeout", 2) + member.set("subscription", ByteBuffer.allocate(0)) + member.set("assignment", ByteBuffer.allocate(0)) + + val memberTaggedFields = new java.util.TreeMap[Integer, Any]() + memberTaggedFields.put(0, "foo") + memberTaggedFields.put(1, 4000) + member.set("_tagged_fields", memberTaggedFields) + + // Create a group with tagged fields. + val group = new Struct(futureGroupSchema) + group.set("protocol_type", "consumer") + group.set("generation", 10) + group.set("protocol", "range") + group.set("leader", "leader") + group.set("current_state_timestamp", 1000L) + group.set("members", Array(member)) + + val groupTaggedFields = new java.util.TreeMap[Integer, Any]() + groupTaggedFields.put(0, "foo") + groupTaggedFields.put(1, 4000) + group.set("_tagged_fields", groupTaggedFields) + + // Prepare the buffer. + val buffer = ByteBuffer.allocate(group.sizeOf() + 2) + buffer.put(0.toByte) + buffer.put(4.toByte) // Add 4 as version. + group.writeTo(buffer) + buffer.flip() + + // Read the buffer with the real schema and verify that tagged + // fields were read but ignored. + buffer.getShort() // Skip version. + val value = new GroupMetadataValue(new ByteBufferAccessor(buffer), 4.toShort) + assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag)) + assertEquals(Seq(0, 1), value.members().get(0).unknownTaggedFields().asScala.map(_.tag)) + + // Read the buffer with readGroupMessageValue. + buffer.rewind() + val groupMetadata = GroupMetadataManager.readGroupMessageValue("group", buffer, time) + assertEquals("consumer", groupMetadata.protocolType.get) + assertEquals("leader", groupMetadata.leaderOrNull) + assertTrue(groupMetadata.allMembers.contains("member_id")) + } + @Test def testLoadOffsetsWithEmptyControlBatch(): Unit = { val groupMetadataTopicPartition = groupTopicPartition diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala index 32e17d88a7b1e..59fddc03f6b12 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala @@ -19,10 +19,16 @@ package kafka.coordinator.transaction import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition +import kafka.internals.generated.TransactionLogValue +import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil} +import org.apache.kafka.common.protocol.types.Field.TaggedFieldsSection +import org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, Struct, Type} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} -import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.Test +import java.nio.ByteBuffer +import scala.collection.Seq import scala.jdk.CollectionConverters._ class TransactionLogTest { @@ -135,4 +141,118 @@ class TransactionLogTest { assertEquals(Some(""), valueStringOpt) } + @Test + def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = { + val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500) + val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata)) + assertEquals(0, txnLogValueBuffer.getShort) + } + + @Test + def testDeserializeHighestSupportedTransactionLogValue(): Unit = { + val txnPartitions = new TransactionLogValue.PartitionsSchema() + .setTopic("topic") + .setPartitionIds(java.util.Collections.singletonList(0)) + + val txnLogValue = new TransactionLogValue() + .setProducerId(100) + .setProducerEpoch(50.toShort) + .setTransactionStatus(CompleteCommit.id) + .setTransactionStartTimestampMs(750L) + .setTransactionLastUpdateTimestampMs(1000L) + .setTransactionTimeoutMs(500) + .setTransactionPartitions(java.util.Collections.singletonList(txnPartitions)) + + val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue) + val deserialized = TransactionLog.readTxnRecordValue("transactionId", serialized).get + + assertEquals(100, deserialized.producerId) + assertEquals(50, deserialized.producerEpoch) + assertEquals(CompleteCommit, deserialized.state) + assertEquals(750L, deserialized.txnStartTimestamp) + assertEquals(1000L, deserialized.txnLastUpdateTimestamp) + assertEquals(500, deserialized.txnTimeoutMs) + + val actualTxnPartitions = deserialized.topicPartitions + assertEquals(1, actualTxnPartitions.size) + assertTrue(actualTxnPartitions.contains(new TopicPartition("topic", 0))) + } + + @Test + def testDeserializeFutureTransactionLogValue(): Unit = { + // Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few + // additional tagged fields. + val futurePartitionsSchema = new Schema( + new Field("topic", Type.COMPACT_STRING, ""), + new Field("partition_ids", new CompactArrayOf(Type.INT32), ""), + TaggedFieldsSection.of( + Int.box(0), new Field("partition_foo", Type.STRING, ""), + Int.box(1), new Field("partition_foo", Type.INT32, "") + ) + ) + + // Create TransactionLogValue.PartitionsSchema with tagged fields + val txnPartitions = new Struct(futurePartitionsSchema) + txnPartitions.set("topic", "topic") + txnPartitions.set("partition_ids", Array(Integer.valueOf(1))) + val txnPartitionsTaggedFields = new java.util.TreeMap[Integer, Any]() + txnPartitionsTaggedFields.put(0, "foo") + txnPartitionsTaggedFields.put(1, 4000) + txnPartitions.set("_tagged_fields", txnPartitionsTaggedFields) + + // Copy of TransactionLogValue.SCHEMA_1 with a few + // additional tagged fields. + val futureTransactionLogValueSchema = new Schema( + new Field("producer_id", Type.INT64, ""), + new Field("producer_epoch", Type.INT16, ""), + new Field("transaction_timeout_ms", Type.INT32, ""), + new Field("transaction_status", Type.INT8, ""), + new Field("transaction_partitions", CompactArrayOf.nullable(futurePartitionsSchema), ""), + new Field("transaction_last_update_timestamp_ms", Type.INT64, ""), + new Field("transaction_start_timestamp_ms", Type.INT64, ""), + TaggedFieldsSection.of( + Int.box(0), new Field("txn_foo", Type.STRING, ""), + Int.box(1), new Field("txn_bar", Type.INT32, "") + ) + ) + + // Create TransactionLogValue with tagged fields + val transactionLogValue = new Struct(futureTransactionLogValueSchema) + transactionLogValue.set("producer_id", 1000L) + transactionLogValue.set("producer_epoch", 100.toShort) + transactionLogValue.set("transaction_timeout_ms", 1000) + transactionLogValue.set("transaction_status", CompleteCommit.id) + transactionLogValue.set("transaction_partitions", Array(txnPartitions)) + transactionLogValue.set("transaction_last_update_timestamp_ms", 2000L) + transactionLogValue.set("transaction_start_timestamp_ms", 3000L) + val txnLogValueTaggedFields = new java.util.TreeMap[Integer, Any]() + txnLogValueTaggedFields.put(0, "foo") + txnLogValueTaggedFields.put(1, 4000) + transactionLogValue.set("_tagged_fields", txnLogValueTaggedFields) + + // Prepare the buffer. + val buffer = ByteBuffer.allocate(transactionLogValue.sizeOf() + 2) + buffer.put(0.toByte) + buffer.put(1.toByte) // Add 1 as version. + transactionLogValue.writeTo(buffer) + buffer.flip() + + // Read the buffer with the real schema and verify that tagged + // fields were read but ignored. + buffer.getShort() // Skip version. + val value = new TransactionLogValue(new ByteBufferAccessor(buffer), 1.toShort) + assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag)) + assertEquals(Seq(0, 1), value.transactionPartitions().get(0).unknownTaggedFields().asScala.map(_.tag)) + + // Read the buffer with readTxnRecordValue. + buffer.rewind() + val txnMetadata = TransactionLog.readTxnRecordValue("transaction-id", buffer).get + assertEquals(1000L, txnMetadata.producerId) + assertEquals(100, txnMetadata.producerEpoch) + assertEquals(1000L, txnMetadata.txnTimeoutMs) + assertEquals(CompleteCommit, txnMetadata.state) + assertEquals(Set(new TopicPartition("topic", 1)), txnMetadata.topicPartitions) + assertEquals(2000L, txnMetadata.txnLastUpdateTimestamp) + assertEquals(3000L, txnMetadata.txnStartTimestamp) + } }