Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,9 @@ class GroupMetadataManager(val brokerId: Int,
// if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
// retry removing this group.
val groupPartition = partitionFor(group.groupId)
val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(groupPartition)
val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
timestamp = time.milliseconds(), magicValue = getMessageFormatVersion(groupPartition))
timestamp = timestamp, magicValue = magicValue)

val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
partitionOpt.foreach { partition =>
Expand All @@ -168,12 +169,12 @@ class GroupMetadataManager(val brokerId: Int,
def prepareStoreGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Short => Unit): DelayedStore = {
val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
val message = new Message(
key = GroupMetadataManager.groupMetadataKey(group.groupId),
bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment),
timestamp = time.milliseconds(),
magicValue = getMessageFormatVersion(partitionFor(group.groupId))
)
timestamp = timestamp,
magicValue = magicValue)

val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))

Expand Down Expand Up @@ -252,11 +253,12 @@ class GroupMetadataManager(val brokerId: Int,

// construct the message set to append
val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(groupId))
new Message(
key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),
bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
timestamp = time.milliseconds(),
magicValue = getMessageFormatVersion(partitionFor(groupId))
timestamp = timestamp,
magicValue = magicValue
)
}.toSeq

Expand Down Expand Up @@ -556,8 +558,8 @@ class GroupMetadataManager(val brokerId: Int,
val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group,
groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)

(offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = time.milliseconds(),
magicValue = getMessageFormatVersion(offsetsPartition)))
val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition)
(offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue))
}.groupBy { case (partition, tombstone) => partition }

// Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
Expand Down Expand Up @@ -626,11 +628,13 @@ class GroupMetadataManager(val brokerId: Int,
config.offsetsTopicNumPartitions
}

private def getMessageFormatVersion(partition: Int): Byte = {
private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = {
val groupMetadataTopicAndPartition = new TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partition)
replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found")
}
val timestamp = if (messageFormatVersion == Message.MagicValue_V0) Message.NoTimestamp else time.milliseconds()
(messageFormatVersion, timestamp)
}

/**
Expand Down