Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
{
"type": "data",
"name": "GroupMetadataValue",
"validVersions": "0-3",
"flexibleVersions": "none",
// Version 4 is the first flexible version.
// KIP-915: bumping the version will no longer make this record backward compatible.
// We suggest to add/remove only tagged fields to maintain backward compatibility.
"validVersions": "0-4",
"flexibleVersions": "4+",
"fields": [
{ "name": "protocolType", "versions": "0+", "type": "string"},
{ "name": "generation", "versions": "0+", "type": "int32" },
Expand All @@ -29,7 +32,7 @@
"commonStructs": [
{
"name": "MemberMetadata",
"versions": "0-3",
"versions": "0+",
"fields": [
{ "name": "memberId", "versions": "0+", "type": "string" },
{ "name": "groupInstanceId", "versions": "3+", "type": "string", "default": "null", "nullableVersions": "3+", "ignorable": true},
Expand Down
7 changes: 5 additions & 2 deletions core/src/main/resources/common/message/OffsetCommitValue.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
{
"type": "data",
"name": "OffsetCommitValue",
"validVersions": "0-3",
"flexibleVersions": "none",
// Version 4 is the first flexible version.
// KIP-915: bumping the version will no longer make this record backward compatible.
// We suggest to add/remove only tagged fields to maintain backward compatibility.
"validVersions": "0-4",
"flexibleVersions": "4+",
"fields": [
{ "name": "offset", "type": "int64", "versions": "0+" },
{ "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true},
Expand Down
25 changes: 14 additions & 11 deletions core/src/main/resources/common/message/TransactionLogValue.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,27 @@
{
"type": "data",
"name": "TransactionLogValue",
"validVersions": "0",
"flexibleVersions": "none",
// Version 1 is the first flexible version.
// KIP-915: bumping the version will no longer make this record backward compatible.
// We suggest to add/remove only tagged fields to maintain backward compatibility.
"validVersions": "0-1",
"flexibleVersions": "1+",
"fields": [
{ "name": "ProducerId", "type": "int64", "versions": "0",
{ "name": "ProducerId", "type": "int64", "versions": "0+",
"about": "Producer id in use by the transactional id"},
{ "name": "ProducerEpoch", "type": "int16", "versions": "0",
{ "name": "ProducerEpoch", "type": "int16", "versions": "0+",
"about": "Epoch associated with the producer id"},
{ "name": "TransactionTimeoutMs", "type": "int32", "versions": "0",
{ "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
"about": "Transaction timeout in milliseconds"},
{ "name": "TransactionStatus", "type": "int8", "versions": "0",
{ "name": "TransactionStatus", "type": "int8", "versions": "0+",
"about": "TransactionState the transaction is in"},
{ "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0", "nullableVersions": "0",
{ "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0+", "nullableVersions": "0+",
"about": "Set of partitions involved in the transaction", "fields": [
{ "name": "Topic", "type": "string", "versions": "0"},
{ "name": "PartitionIds", "type": "[]int32", "versions": "0"}]},
{ "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0",
{ "name": "Topic", "type": "string", "versions": "0+"},
{ "name": "PartitionIds", "type": "[]int32", "versions": "0+"}]},
{ "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0+",
"about": "Time the transaction was last updated"},
{ "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0",
{ "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0+",
"about": "Time the transaction was started"}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,8 @@ object GroupMetadataManager {
val version =
if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty) 1.toShort
else if (apiVersion < KAFKA_2_1_IV1) 2.toShort
// Serialize with the highest supported non-flexible version
// until a tagged field is introduced or the version is bumped.
else 3.toShort
MessageUtil.toVersionPrefixedBytes(version, new OffsetCommitValue()
.setOffset(offsetAndMetadata.offset)
Expand Down Expand Up @@ -1112,6 +1114,8 @@ object GroupMetadataManager {
if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort
else if (apiVersion < KAFKA_2_1_IV0) 1.toShort
else if (apiVersion < KAFKA_2_3_IV0) 2.toShort
// Serialize with the highest supported non-flexible version
// until a tagged field is introduced or the version is bumped.
else 3.toShort

MessageUtil.toVersionPrefixedBytes(version, new GroupMetadataValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ object TransactionLog {
.setPartitionIds(partitions.map(tp => Integer.valueOf(tp.partition)).toList.asJava)
}.toList.asJava

MessageUtil.toVersionPrefixedBytes(TransactionLogValue.HIGHEST_SUPPORTED_VERSION,
// Serialize with the highest supported non-flexible version
// until a tagged field is introduced or the version is bumped.
MessageUtil.toVersionPrefixedBytes(0,
new TransactionLogValue()
.setProducerId(txnMetadata.producerId)
.setProducerEpoch(txnMetadata.producerEpoch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import javax.management.ObjectName
import kafka.api._
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
import kafka.log.{AppendOrigin, UnifiedLog, LogAppendInfo}
import kafka.internals.generated.{GroupMetadataValue, OffsetCommitValue}
import kafka.log.{AppendOrigin, LogAppendInfo, UnifiedLog}
import kafka.metrics.KafkaYammerMetrics
import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata, ReplicaManager, RequestLocal}
import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
Expand All @@ -37,7 +38,9 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, Metrics => kMetrics}
import org.apache.kafka.common.protocol.{Errors, MessageUtil}
import org.apache.kafka.common.protocol.types.Field.TaggedFieldsSection
import org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, Struct, Type}
import org.apache.kafka.common.protocol.{ByteBufferAccessor, Errors, MessageUtil}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
Expand Down Expand Up @@ -2336,6 +2339,217 @@ class GroupMetadataManagerTest {
verifySerde(version)
}

@Test
def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
val generation = 935
val protocolType = "consumer"
val protocol = "range"
val memberId = "98098230493"
val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
))
val record = TestUtils.records(Seq(
buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
)).records.asScala.head
assertEquals(3, record.value.getShort)
}

@Test
def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
new TopicPartition("foo", 1) -> 455L,
)

val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
offsetCommitRecords.foreach { record =>
assertEquals(3, record.value.getShort)
}
}

@Test
def testDeserializeHighestSupportedGroupMetadataValueVersion(): Unit = {
val member = new GroupMetadataValue.MemberMetadata()
.setMemberId("member")
.setClientId("client")
.setClientHost("host")

val generation = 935
val protocolType = "consumer"
val protocol = "range"
val leader = "leader"
val groupMetadataValue = new GroupMetadataValue()
.setProtocolType(protocolType)
.setGeneration(generation)
.setProtocol(protocol)
.setLeader(leader)
.setMembers(java.util.Collections.singletonList(member))

val deserialized = GroupMetadataManager.readGroupMessageValue("groupId",
MessageUtil.toVersionPrefixedByteBuffer(4, groupMetadataValue), time)

assertEquals(generation, deserialized.generationId)
assertEquals(protocolType, deserialized.protocolType.get)
assertEquals(protocol, deserialized.protocolName.get)
assertEquals(leader, deserialized.leaderOrNull)

val actualMember = deserialized.allMemberMetadata.head
assertEquals(member.memberId, actualMember.memberId)
assertEquals(member.clientId, actualMember.clientId)
assertEquals(member.clientHost, actualMember.clientHost)
}

@Test
def testDeserializeHighestSupportedOffsetCommitValueVersion(): Unit = {
val offsetCommitValue = new OffsetCommitValue()
.setOffset(1000L)
.setMetadata("metadata")
.setCommitTimestamp(1500L)
.setLeaderEpoch(1)

val serialized = MessageUtil.toVersionPrefixedByteBuffer(4, offsetCommitValue)
val deserialized = GroupMetadataManager.readOffsetMessageValue(serialized)

assertEquals(1000L, deserialized.offset)
assertEquals("metadata", deserialized.metadata)
assertEquals(1500L, deserialized.commitTimestamp)
assertEquals(1, deserialized.leaderEpoch.get)
}

@Test
def testDeserializeFutureOffsetCommitValue(): Unit = {
// Copy of OffsetCommitValue.SCHEMA_4 with a few
// additional tagged fields.
val futureOffsetCommitSchema = new Schema(
new Field("offset", Type.INT64, ""),
new Field("leader_epoch", Type.INT32, ""),
new Field("metadata", Type.COMPACT_STRING, ""),
new Field("commit_timestamp", Type.INT64, ""),
TaggedFieldsSection.of(
Int.box(0), new Field("offset_foo", Type.STRING, ""),
Int.box(1), new Field("offset_bar", Type.INT32, "")
)
)

// Create OffsetCommitValue with tagged fields
val offsetCommit = new Struct(futureOffsetCommitSchema)
offsetCommit.set("offset", 1000L)
offsetCommit.set("leader_epoch", 100)
offsetCommit.set("metadata", "metadata")
offsetCommit.set("commit_timestamp", 2000L)
val offsetCommitTaggedFields = new java.util.TreeMap[Integer, Any]()
offsetCommitTaggedFields.put(0, "foo")
offsetCommitTaggedFields.put(1, 4000)
offsetCommit.set("_tagged_fields", offsetCommitTaggedFields)

// Prepare the buffer.
val buffer = ByteBuffer.allocate(offsetCommit.sizeOf() + 2)
buffer.put(0.toByte)
buffer.put(4.toByte) // Add 4 as version.
offsetCommit.writeTo(buffer)
buffer.flip()

// Read the buffer with the real schema and verify that tagged
// fields were read but ignored.
buffer.getShort() // Skip version.
val value = new OffsetCommitValue(new ByteBufferAccessor(buffer), 4.toShort)
assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag))

// Read the buffer with readOffsetMessageValue.
buffer.rewind()
val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer)
assertEquals(1000L, offsetAndMetadata.offset)
assertEquals(100, offsetAndMetadata.leaderEpoch.get)
assertEquals("metadata", offsetAndMetadata.metadata)
assertEquals(2000L, offsetAndMetadata.commitTimestamp)
}

@Test
def testDeserializeFutureGroupMetadataValue(): Unit = {
// Copy of GroupMetadataValue.MemberMetadata.SCHEMA_4 with a few
// additional tagged fields.
val futureMemberSchema = new Schema(
new Field("member_id", Type.COMPACT_STRING, ""),
new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""),
new Field("client_id", Type.COMPACT_STRING, ""),
new Field("client_host", Type.COMPACT_STRING, ""),
new Field("rebalance_timeout", Type.INT32, ""),
new Field("session_timeout", Type.INT32, ""),
new Field("subscription", Type.COMPACT_BYTES, ""),
new Field("assignment", Type.COMPACT_BYTES, ""),
TaggedFieldsSection.of(
Int.box(0), new Field("member_foo", Type.STRING, ""),
Int.box(1), new Field("member_foo", Type.INT32, "")
)
)

// Copy of GroupMetadataValue.SCHEMA_4 with a few
// additional tagged fields.
val futureGroupSchema = new Schema(
new Field("protocol_type", Type.COMPACT_STRING, ""),
new Field("generation", Type.INT32, ""),
new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""),
new Field("leader", Type.COMPACT_NULLABLE_STRING, ""),
new Field("current_state_timestamp", Type.INT64, ""),
new Field("members", new CompactArrayOf(futureMemberSchema), ""),
TaggedFieldsSection.of(
Int.box(0), new Field("group_foo", Type.STRING, ""),
Int.box(1), new Field("group_bar", Type.INT32, "")
)
)

// Create a member with tagged fields.
val member = new Struct(futureMemberSchema)
member.set("member_id", "member_id")
member.set("group_instance_id", "group_instance_id")
member.set("client_id", "client_id")
member.set("client_host", "client_host")
member.set("rebalance_timeout", 1)
member.set("session_timeout", 2)
member.set("subscription", ByteBuffer.allocate(0))
member.set("assignment", ByteBuffer.allocate(0))

val memberTaggedFields = new java.util.TreeMap[Integer, Any]()
memberTaggedFields.put(0, "foo")
memberTaggedFields.put(1, 4000)
member.set("_tagged_fields", memberTaggedFields)

// Create a group with tagged fields.
val group = new Struct(futureGroupSchema)
group.set("protocol_type", "consumer")
group.set("generation", 10)
group.set("protocol", "range")
group.set("leader", "leader")
group.set("current_state_timestamp", 1000L)
group.set("members", Array(member))

val groupTaggedFields = new java.util.TreeMap[Integer, Any]()
groupTaggedFields.put(0, "foo")
groupTaggedFields.put(1, 4000)
group.set("_tagged_fields", groupTaggedFields)

// Prepare the buffer.
val buffer = ByteBuffer.allocate(group.sizeOf() + 2)
buffer.put(0.toByte)
buffer.put(4.toByte) // Add 4 as version.
group.writeTo(buffer)
buffer.flip()

// Read the buffer with the real schema and verify that tagged
// fields were read but ignored.
buffer.getShort() // Skip version.
val value = new GroupMetadataValue(new ByteBufferAccessor(buffer), 4.toShort)
assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag))
assertEquals(Seq(0, 1), value.members().get(0).unknownTaggedFields().asScala.map(_.tag))

// Read the buffer with readGroupMessageValue.
buffer.rewind()
val groupMetadata = GroupMetadataManager.readGroupMessageValue("group", buffer, time)
assertEquals("consumer", groupMetadata.protocolType.get)
assertEquals("leader", groupMetadata.leaderOrNull)
assertTrue(groupMetadata.allMembers.contains("member_id"))
}

@Test
def testLoadOffsetsWithEmptyControlBatch(): Unit = {
val groupMetadataTopicPartition = groupTopicPartition
Expand Down
Loading