diff --git a/core/src/main/resources/common/message/GroupMetadataValue.json b/core/src/main/resources/common/message/GroupMetadataValue.json index 826a7c8d326d4..8405fbb991853 100644 --- a/core/src/main/resources/common/message/GroupMetadataValue.json +++ b/core/src/main/resources/common/message/GroupMetadataValue.json @@ -16,8 +16,11 @@ { "type": "data", "name": "GroupMetadataValue", - "validVersions": "0-3", - "flexibleVersions": "none", + // Version 4 is the first flexible version. + // KIP-915: bumping the version will no longer make this record backward compatible. + // We suggest to add/remove only tagged fields to maintain backward compatibility. + "validVersions": "0-4", + "flexibleVersions": "4+", "fields": [ { "name": "protocolType", "versions": "0+", "type": "string"}, { "name": "generation", "versions": "0+", "type": "int32" }, @@ -29,7 +32,7 @@ "commonStructs": [ { "name": "MemberMetadata", - "versions": "0-3", + "versions": "0+", "fields": [ { "name": "memberId", "versions": "0+", "type": "string" }, { "name": "groupInstanceId", "versions": "3+", "type": "string", "default": "null", "nullableVersions": "3+", "ignorable": true}, diff --git a/core/src/main/resources/common/message/OffsetCommitValue.json b/core/src/main/resources/common/message/OffsetCommitValue.json index db8a6281d43b6..2973c5ee12ab2 100644 --- a/core/src/main/resources/common/message/OffsetCommitValue.json +++ b/core/src/main/resources/common/message/OffsetCommitValue.json @@ -16,8 +16,11 @@ { "type": "data", "name": "OffsetCommitValue", - "validVersions": "0-3", - "flexibleVersions": "none", + // Version 4 is the first flexible version. + // KIP-915: bumping the version will no longer make this record backward compatible. + // We suggest to add/remove only tagged fields to maintain backward compatibility. + "validVersions": "0-4", + "flexibleVersions": "4+", "fields": [ { "name": "offset", "type": "int64", "versions": "0+" }, { "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true}, diff --git a/core/src/main/resources/common/message/TransactionLogValue.json b/core/src/main/resources/common/message/TransactionLogValue.json index 7915c3d7cb715..c6efc772d58db 100644 --- a/core/src/main/resources/common/message/TransactionLogValue.json +++ b/core/src/main/resources/common/message/TransactionLogValue.json @@ -16,24 +16,27 @@ { "type": "data", "name": "TransactionLogValue", - "validVersions": "0", - "flexibleVersions": "none", + // Version 1 is the first flexible version. + // KIP-915: bumping the version will no longer make this record backward compatible. + // We suggest to add/remove only tagged fields to maintain backward compatibility. + "validVersions": "0-1", + "flexibleVersions": "1+", "fields": [ - { "name": "ProducerId", "type": "int64", "versions": "0", + { "name": "ProducerId", "type": "int64", "versions": "0+", "about": "Producer id in use by the transactional id"}, - { "name": "ProducerEpoch", "type": "int16", "versions": "0", + { "name": "ProducerEpoch", "type": "int16", "versions": "0+", "about": "Epoch associated with the producer id"}, - { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0", + { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+", "about": "Transaction timeout in milliseconds"}, - { "name": "TransactionStatus", "type": "int8", "versions": "0", + { "name": "TransactionStatus", "type": "int8", "versions": "0+", "about": "TransactionState the transaction is in"}, - { "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0", "nullableVersions": "0", + { "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0+", "nullableVersions": "0+", "about": "Set of partitions involved in the transaction", "fields": [ - { "name": "Topic", "type": "string", "versions": "0"}, - { "name": "PartitionIds", "type": "[]int32", "versions": "0"}]}, - { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0", + { "name": "Topic", "type": "string", "versions": "0+"}, + { "name": "PartitionIds", "type": "[]int32", "versions": "0+"}]}, + { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0+", "about": "Time the transaction was last updated"}, - { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0", + { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0+", "about": "Time the transaction was started"} ] } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 8afcfcfaef40e..543fbde7d2038 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -1084,6 +1084,8 @@ object GroupMetadataManager { val version = if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty) 1.toShort else if (apiVersion < KAFKA_2_1_IV1) 2.toShort + // Serialize with the highest supported non-flexible version + // until a tagged field is introduced or the version is bumped. else 3.toShort MessageUtil.toVersionPrefixedBytes(version, new OffsetCommitValue() .setOffset(offsetAndMetadata.offset) @@ -1112,6 +1114,8 @@ object GroupMetadataManager { if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort else if (apiVersion < KAFKA_2_1_IV0) 1.toShort else if (apiVersion < KAFKA_2_3_IV0) 2.toShort + // Serialize with the highest supported non-flexible version + // until a tagged field is introduced or the version is bumped. else 3.toShort MessageUtil.toVersionPrefixedBytes(version, new GroupMetadataValue() diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala index 30bd517c0093e..f07e7293fc23c 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala @@ -81,7 +81,9 @@ object TransactionLog { .setPartitionIds(partitions.map(tp => Integer.valueOf(tp.partition)).toList.asJava) }.toList.asJava - MessageUtil.toVersionPrefixedBytes(TransactionLogValue.HIGHEST_SUPPORTED_VERSION, + // Serialize with the highest supported non-flexible version + // until a tagged field is introduced or the version is bumped. + MessageUtil.toVersionPrefixedBytes(0, new TransactionLogValue() .setProducerId(txnMetadata.producerId) .setProducerEpoch(txnMetadata.producerEpoch) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index ee2092a3bb0da..25d5e3756501d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -27,7 +27,8 @@ import javax.management.ObjectName import kafka.api._ import kafka.cluster.Partition import kafka.common.OffsetAndMetadata -import kafka.log.{AppendOrigin, UnifiedLog, LogAppendInfo} +import kafka.internals.generated.{GroupMetadataValue, OffsetCommitValue} +import kafka.log.{AppendOrigin, LogAppendInfo, UnifiedLog} import kafka.metrics.KafkaYammerMetrics import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata, ReplicaManager, RequestLocal} import kafka.utils.{KafkaScheduler, MockTime, TestUtils} @@ -37,7 +38,9 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, Metrics => kMetrics} -import org.apache.kafka.common.protocol.{Errors, MessageUtil} +import org.apache.kafka.common.protocol.types.Field.TaggedFieldsSection +import org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, Struct, Type} +import org.apache.kafka.common.protocol.{ByteBufferAccessor, Errors, MessageUtil} import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse @@ -2336,6 +2339,217 @@ class GroupMetadataManagerTest { verifySerde(version) } + @Test + def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = { + val generation = 935 + val protocolType = "consumer" + val protocol = "range" + val memberId = "98098230493" + val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment( + new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null) + )) + val record = TestUtils.records(Seq( + buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes) + )).records.asScala.head + assertEquals(3, record.value.getShort) + } + + @Test + def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = { + val committedOffsets = Map( + new TopicPartition("foo", 0) -> 23L, + new TopicPartition("foo", 1) -> 455L, + ) + + val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets) + offsetCommitRecords.foreach { record => + assertEquals(3, record.value.getShort) + } + } + + @Test + def testDeserializeHighestSupportedGroupMetadataValueVersion(): Unit = { + val member = new GroupMetadataValue.MemberMetadata() + .setMemberId("member") + .setClientId("client") + .setClientHost("host") + + val generation = 935 + val protocolType = "consumer" + val protocol = "range" + val leader = "leader" + val groupMetadataValue = new GroupMetadataValue() + .setProtocolType(protocolType) + .setGeneration(generation) + .setProtocol(protocol) + .setLeader(leader) + .setMembers(java.util.Collections.singletonList(member)) + + val deserialized = GroupMetadataManager.readGroupMessageValue("groupId", + MessageUtil.toVersionPrefixedByteBuffer(4, groupMetadataValue), time) + + assertEquals(generation, deserialized.generationId) + assertEquals(protocolType, deserialized.protocolType.get) + assertEquals(protocol, deserialized.protocolName.get) + assertEquals(leader, deserialized.leaderOrNull) + + val actualMember = deserialized.allMemberMetadata.head + assertEquals(member.memberId, actualMember.memberId) + assertEquals(member.clientId, actualMember.clientId) + assertEquals(member.clientHost, actualMember.clientHost) + } + + @Test + def testDeserializeHighestSupportedOffsetCommitValueVersion(): Unit = { + val offsetCommitValue = new OffsetCommitValue() + .setOffset(1000L) + .setMetadata("metadata") + .setCommitTimestamp(1500L) + .setLeaderEpoch(1) + + val serialized = MessageUtil.toVersionPrefixedByteBuffer(4, offsetCommitValue) + val deserialized = GroupMetadataManager.readOffsetMessageValue(serialized) + + assertEquals(1000L, deserialized.offset) + assertEquals("metadata", deserialized.metadata) + assertEquals(1500L, deserialized.commitTimestamp) + assertEquals(1, deserialized.leaderEpoch.get) + } + + @Test + def testDeserializeFutureOffsetCommitValue(): Unit = { + // Copy of OffsetCommitValue.SCHEMA_4 with a few + // additional tagged fields. + val futureOffsetCommitSchema = new Schema( + new Field("offset", Type.INT64, ""), + new Field("leader_epoch", Type.INT32, ""), + new Field("metadata", Type.COMPACT_STRING, ""), + new Field("commit_timestamp", Type.INT64, ""), + TaggedFieldsSection.of( + Int.box(0), new Field("offset_foo", Type.STRING, ""), + Int.box(1), new Field("offset_bar", Type.INT32, "") + ) + ) + + // Create OffsetCommitValue with tagged fields + val offsetCommit = new Struct(futureOffsetCommitSchema) + offsetCommit.set("offset", 1000L) + offsetCommit.set("leader_epoch", 100) + offsetCommit.set("metadata", "metadata") + offsetCommit.set("commit_timestamp", 2000L) + val offsetCommitTaggedFields = new java.util.TreeMap[Integer, Any]() + offsetCommitTaggedFields.put(0, "foo") + offsetCommitTaggedFields.put(1, 4000) + offsetCommit.set("_tagged_fields", offsetCommitTaggedFields) + + // Prepare the buffer. + val buffer = ByteBuffer.allocate(offsetCommit.sizeOf() + 2) + buffer.put(0.toByte) + buffer.put(4.toByte) // Add 4 as version. + offsetCommit.writeTo(buffer) + buffer.flip() + + // Read the buffer with the real schema and verify that tagged + // fields were read but ignored. + buffer.getShort() // Skip version. + val value = new OffsetCommitValue(new ByteBufferAccessor(buffer), 4.toShort) + assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag)) + + // Read the buffer with readOffsetMessageValue. + buffer.rewind() + val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer) + assertEquals(1000L, offsetAndMetadata.offset) + assertEquals(100, offsetAndMetadata.leaderEpoch.get) + assertEquals("metadata", offsetAndMetadata.metadata) + assertEquals(2000L, offsetAndMetadata.commitTimestamp) + } + + @Test + def testDeserializeFutureGroupMetadataValue(): Unit = { + // Copy of GroupMetadataValue.MemberMetadata.SCHEMA_4 with a few + // additional tagged fields. + val futureMemberSchema = new Schema( + new Field("member_id", Type.COMPACT_STRING, ""), + new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""), + new Field("client_id", Type.COMPACT_STRING, ""), + new Field("client_host", Type.COMPACT_STRING, ""), + new Field("rebalance_timeout", Type.INT32, ""), + new Field("session_timeout", Type.INT32, ""), + new Field("subscription", Type.COMPACT_BYTES, ""), + new Field("assignment", Type.COMPACT_BYTES, ""), + TaggedFieldsSection.of( + Int.box(0), new Field("member_foo", Type.STRING, ""), + Int.box(1), new Field("member_foo", Type.INT32, "") + ) + ) + + // Copy of GroupMetadataValue.SCHEMA_4 with a few + // additional tagged fields. + val futureGroupSchema = new Schema( + new Field("protocol_type", Type.COMPACT_STRING, ""), + new Field("generation", Type.INT32, ""), + new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""), + new Field("leader", Type.COMPACT_NULLABLE_STRING, ""), + new Field("current_state_timestamp", Type.INT64, ""), + new Field("members", new CompactArrayOf(futureMemberSchema), ""), + TaggedFieldsSection.of( + Int.box(0), new Field("group_foo", Type.STRING, ""), + Int.box(1), new Field("group_bar", Type.INT32, "") + ) + ) + + // Create a member with tagged fields. + val member = new Struct(futureMemberSchema) + member.set("member_id", "member_id") + member.set("group_instance_id", "group_instance_id") + member.set("client_id", "client_id") + member.set("client_host", "client_host") + member.set("rebalance_timeout", 1) + member.set("session_timeout", 2) + member.set("subscription", ByteBuffer.allocate(0)) + member.set("assignment", ByteBuffer.allocate(0)) + + val memberTaggedFields = new java.util.TreeMap[Integer, Any]() + memberTaggedFields.put(0, "foo") + memberTaggedFields.put(1, 4000) + member.set("_tagged_fields", memberTaggedFields) + + // Create a group with tagged fields. + val group = new Struct(futureGroupSchema) + group.set("protocol_type", "consumer") + group.set("generation", 10) + group.set("protocol", "range") + group.set("leader", "leader") + group.set("current_state_timestamp", 1000L) + group.set("members", Array(member)) + + val groupTaggedFields = new java.util.TreeMap[Integer, Any]() + groupTaggedFields.put(0, "foo") + groupTaggedFields.put(1, 4000) + group.set("_tagged_fields", groupTaggedFields) + + // Prepare the buffer. + val buffer = ByteBuffer.allocate(group.sizeOf() + 2) + buffer.put(0.toByte) + buffer.put(4.toByte) // Add 4 as version. + group.writeTo(buffer) + buffer.flip() + + // Read the buffer with the real schema and verify that tagged + // fields were read but ignored. + buffer.getShort() // Skip version. + val value = new GroupMetadataValue(new ByteBufferAccessor(buffer), 4.toShort) + assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag)) + assertEquals(Seq(0, 1), value.members().get(0).unknownTaggedFields().asScala.map(_.tag)) + + // Read the buffer with readGroupMessageValue. + buffer.rewind() + val groupMetadata = GroupMetadataManager.readGroupMessageValue("group", buffer, time) + assertEquals("consumer", groupMetadata.protocolType.get) + assertEquals("leader", groupMetadata.leaderOrNull) + assertTrue(groupMetadata.allMembers.contains("member_id")) + } + @Test def testLoadOffsetsWithEmptyControlBatch(): Unit = { val groupMetadataTopicPartition = groupTopicPartition diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala index eb1284278d29b..332dd669b4e98 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala @@ -20,9 +20,12 @@ package kafka.coordinator.transaction import kafka.internals.generated.TransactionLogKey import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.protocol.MessageUtil +import kafka.internals.generated.TransactionLogValue +import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil} +import org.apache.kafka.common.protocol.types.Field.TaggedFieldsSection +import org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, Struct, Type} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} -import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.Test import java.nio.ByteBuffer @@ -139,6 +142,120 @@ class TransactionLogTest { } @Test + def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = { + val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500) + val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata)) + assertEquals(0, txnLogValueBuffer.getShort) + } + + @Test + def testDeserializeHighestSupportedTransactionLogValue(): Unit = { + val txnPartitions = new TransactionLogValue.PartitionsSchema() + .setTopic("topic") + .setPartitionIds(java.util.Collections.singletonList(0)) + + val txnLogValue = new TransactionLogValue() + .setProducerId(100) + .setProducerEpoch(50.toShort) + .setTransactionStatus(CompleteCommit.id) + .setTransactionStartTimestampMs(750L) + .setTransactionLastUpdateTimestampMs(1000L) + .setTransactionTimeoutMs(500) + .setTransactionPartitions(java.util.Collections.singletonList(txnPartitions)) + + val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue) + val deserialized = TransactionLog.readTxnRecordValue("transactionId", serialized).get + + assertEquals(100, deserialized.producerId) + assertEquals(50, deserialized.producerEpoch) + assertEquals(CompleteCommit, deserialized.state) + assertEquals(750L, deserialized.txnStartTimestamp) + assertEquals(1000L, deserialized.txnLastUpdateTimestamp) + assertEquals(500, deserialized.txnTimeoutMs) + + val actualTxnPartitions = deserialized.topicPartitions + assertEquals(1, actualTxnPartitions.size) + assertTrue(actualTxnPartitions.contains(new TopicPartition("topic", 0))) + } + + @Test + def testDeserializeFutureTransactionLogValue(): Unit = { + // Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few + // additional tagged fields. + val futurePartitionsSchema = new Schema( + new Field("topic", Type.COMPACT_STRING, ""), + new Field("partition_ids", new CompactArrayOf(Type.INT32), ""), + TaggedFieldsSection.of( + Int.box(0), new Field("partition_foo", Type.STRING, ""), + Int.box(1), new Field("partition_foo", Type.INT32, "") + ) + ) + + // Create TransactionLogValue.PartitionsSchema with tagged fields + val txnPartitions = new Struct(futurePartitionsSchema) + txnPartitions.set("topic", "topic") + txnPartitions.set("partition_ids", Array(Integer.valueOf(1))) + val txnPartitionsTaggedFields = new java.util.TreeMap[Integer, Any]() + txnPartitionsTaggedFields.put(0, "foo") + txnPartitionsTaggedFields.put(1, 4000) + txnPartitions.set("_tagged_fields", txnPartitionsTaggedFields) + + // Copy of TransactionLogValue.SCHEMA_1 with a few + // additional tagged fields. + val futureTransactionLogValueSchema = new Schema( + new Field("producer_id", Type.INT64, ""), + new Field("producer_epoch", Type.INT16, ""), + new Field("transaction_timeout_ms", Type.INT32, ""), + new Field("transaction_status", Type.INT8, ""), + new Field("transaction_partitions", CompactArrayOf.nullable(futurePartitionsSchema), ""), + new Field("transaction_last_update_timestamp_ms", Type.INT64, ""), + new Field("transaction_start_timestamp_ms", Type.INT64, ""), + TaggedFieldsSection.of( + Int.box(0), new Field("txn_foo", Type.STRING, ""), + Int.box(1), new Field("txn_bar", Type.INT32, "") + ) + ) + + // Create TransactionLogValue with tagged fields + val transactionLogValue = new Struct(futureTransactionLogValueSchema) + transactionLogValue.set("producer_id", 1000L) + transactionLogValue.set("producer_epoch", 100.toShort) + transactionLogValue.set("transaction_timeout_ms", 1000) + transactionLogValue.set("transaction_status", CompleteCommit.id) + transactionLogValue.set("transaction_partitions", Array(txnPartitions)) + transactionLogValue.set("transaction_last_update_timestamp_ms", 2000L) + transactionLogValue.set("transaction_start_timestamp_ms", 3000L) + val txnLogValueTaggedFields = new java.util.TreeMap[Integer, Any]() + txnLogValueTaggedFields.put(0, "foo") + txnLogValueTaggedFields.put(1, 4000) + transactionLogValue.set("_tagged_fields", txnLogValueTaggedFields) + + // Prepare the buffer. + val buffer = ByteBuffer.allocate(transactionLogValue.sizeOf() + 2) + buffer.put(0.toByte) + buffer.put(1.toByte) // Add 1 as version. + transactionLogValue.writeTo(buffer) + buffer.flip() + + // Read the buffer with the real schema and verify that tagged + // fields were read but ignored. + buffer.getShort() // Skip version. + val value = new TransactionLogValue(new ByteBufferAccessor(buffer), 1.toShort) + assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag)) + assertEquals(Seq(0, 1), value.transactionPartitions().get(0).unknownTaggedFields().asScala.map(_.tag)) + + // Read the buffer with readTxnRecordValue. + buffer.rewind() + val txnMetadata = TransactionLog.readTxnRecordValue("transaction-id", buffer).get + assertEquals(1000L, txnMetadata.producerId) + assertEquals(100, txnMetadata.producerEpoch) + assertEquals(1000L, txnMetadata.txnTimeoutMs) + assertEquals(CompleteCommit, txnMetadata.state) + assertEquals(Set(new TopicPartition("topic", 1)), txnMetadata.topicPartitions) + assertEquals(2000L, txnMetadata.txnLastUpdateTimestamp) + assertEquals(3000L, txnMetadata.txnStartTimestamp) + } + def testReadTxnRecordKeyCanReadUnknownMessage(): Unit = { val record = new TransactionLogKey() val unknownRecord = MessageUtil.toVersionPrefixedBytes(Short.MaxValue, record)