Skip to content
Merged
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1560,7 +1560,8 @@ project(':group-coordinator') {
args = [ "-p", "org.apache.kafka.coordinator.group.generated",
"-o", "${projectDir}/build/generated/main/java/org/apache/kafka/coordinator/group/generated",
"-i", "src/main/resources/common/message",
"-m", "MessageDataGenerator", "JsonConverterGenerator"
"-m", "MessageDataGenerator", "JsonConverterGenerator",
"-t", "CoordinatorRecordTypeGenerator"
]
inputs.dir("src/main/resources/common/message")
.withPropertyName("messages")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,35 @@ public static ByteBuffer toVersionPrefixedByteBuffer(final short version, final

public static byte[] toVersionPrefixedBytes(final short version, final Message message) {
ByteBuffer buffer = toVersionPrefixedByteBuffer(version, message);
// take the inner array directly if it is full with data
// take the inner array directly if it is full of data.
if (buffer.hasArray() &&
buffer.arrayOffset() == 0 &&
buffer.position() == 0 &&
buffer.limit() == buffer.array().length) return buffer.array();
buffer.arrayOffset() == 0 &&
buffer.position() == 0 &&
buffer.limit() == buffer.array().length) return buffer.array();
else return Utils.toArray(buffer);
}

public static ByteBuffer toCoordinatorTypePrefixedByteBuffer(final short type, final Message message) {
    // Coordinator record payloads are only ever written at version 0; refuse anything else.
    if (message.lowestSupportedVersion() != 0 || message.highestSupportedVersion() != 0) {
        throw new IllegalArgumentException("Cannot serialize a message with a different version than 0.");
    }

    final ObjectSerializationCache serializationCache = new ObjectSerializationCache();
    final int payloadSize = message.size(serializationCache, (short) 0);
    // Two extra bytes hold the coordinator record-type prefix written ahead of the payload.
    final ByteBufferAccessor accessor = new ByteBufferAccessor(ByteBuffer.allocate(payloadSize + 2));
    accessor.writeShort(type);
    message.write(accessor, serializationCache, (short) 0);
    accessor.flip();
    return accessor.buffer();
}

public static byte[] toCoordinatorTypePrefixedBytes(final short type, final Message message) {
    final ByteBuffer buffer = toCoordinatorTypePrefixedByteBuffer(type, message);
    // When the backing array is exactly the readable region, hand it back without copying.
    final boolean wholeArrayReadable =
        buffer.hasArray()
            && buffer.arrayOffset() == 0
            && buffer.position() == 0
            && buffer.limit() == buffer.array().length;
    return wholeArrayReadable ? buffer.array() : Utils.toArray(buffer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import kafka.server.ReplicaManager
import kafka.utils.CoreUtils.inLock
import kafka.utils._
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.{Metrics, Sensor}
import org.apache.kafka.common.metrics.stats.{Avg, Max, Meter}
Expand All @@ -40,7 +41,7 @@ import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchRespons
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
import org.apache.kafka.coordinator.group.{OffsetAndMetadata, OffsetConfig}
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
import org.apache.kafka.coordinator.group.generated.{CoordinatorRecordType, GroupMetadataValue, LegacyOffsetCommitKey, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
import org.apache.kafka.server.common.{MetadataVersion, RequestLocal}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
Expand Down Expand Up @@ -1069,7 +1070,7 @@ object GroupMetadataManager {
* @return key for offset commit message
*/
def offsetCommitKey(groupId: String, topicPartition: TopicPartition): Array[Byte] = {
MessageUtil.toVersionPrefixedBytes(OffsetCommitKey.HIGHEST_SUPPORTED_VERSION,
MessageUtil.toCoordinatorTypePrefixedBytes(CoordinatorRecordType.OFFSET_COMMIT.id(),
new OffsetCommitKey()
.setGroup(groupId)
.setTopic(topicPartition.topic)
Expand All @@ -1083,7 +1084,7 @@ object GroupMetadataManager {
* @return key bytes for group metadata message
*/
def groupMetadataKey(groupId: String): Array[Byte] = {
MessageUtil.toVersionPrefixedBytes(GroupMetadataKeyData.HIGHEST_SUPPORTED_VERSION,
MessageUtil.toCoordinatorTypePrefixedBytes(CoordinatorRecordType.GROUP_METADATA.id(),
new GroupMetadataKeyData()
.setGroup(groupId))
}
Expand Down Expand Up @@ -1164,16 +1165,28 @@ object GroupMetadataManager {
*/
def readMessageKey(buffer: ByteBuffer): BaseKey = {
val version = buffer.getShort
if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) {
// version 0 and 1 refer to offset
val key = new OffsetCommitKey(new ByteBufferAccessor(buffer), version)
OffsetKey(version, GroupTopicPartition(key.group, new TopicPartition(key.topic, key.partition)))
} else if (version >= GroupMetadataKeyData.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKeyData.HIGHEST_SUPPORTED_VERSION) {
// version 2 refers to group metadata
val key = new GroupMetadataKeyData(new ByteBufferAccessor(buffer), version)
GroupMetadataKey(version, key.group)
} else {
UnknownKey(version)
try {
CoordinatorRecordType.fromId(version) match {
case CoordinatorRecordType.LEGACY_OFFSET_COMMIT =>
// version 0 refers to the legacy offset commit.
val key = new LegacyOffsetCommitKey(new ByteBufferAccessor(buffer), 0.toShort)
OffsetKey(version, GroupTopicPartition(key.group, new TopicPartition(key.topic, key.partition)))

case CoordinatorRecordType.OFFSET_COMMIT =>
// version 1 refers to offset commit.
val key = new OffsetCommitKey(new ByteBufferAccessor(buffer), 0.toShort)
OffsetKey(version, GroupTopicPartition(key.group, new TopicPartition(key.topic, key.partition)))

case CoordinatorRecordType.GROUP_METADATA =>
// version 2 refers to group metadata.
val key = new GroupMetadataKeyData(new ByteBufferAccessor(buffer), 0.toShort)
GroupMetadataKey(version, key.group)

case _ =>
UnknownKey(version)
}
} catch {
case _: UnsupportedVersionException => UnknownKey(version)
}
}

Expand Down
38 changes: 19 additions & 19 deletions core/src/main/scala/kafka/tools/DumpLogSegments.scala
Original file line number Diff line number Diff line change
Expand Up @@ -422,45 +422,45 @@ object DumpLogSegments {
class OffsetsMessageParser extends MessageParser[String, String] {
private val serde = new GroupCoordinatorRecordSerde()

private def prepareKey(message: Message, version: Short): String = {
private def prepareKey(message: Message, recordType: Short): String = {
val messageAsJson = message match {
case m: OffsetCommitKey =>
OffsetCommitKeyJsonConverter.write(m, version)
OffsetCommitKeyJsonConverter.write(m, 0.toShort)
case m: GroupMetadataKey =>
GroupMetadataKeyJsonConverter.write(m, version)
GroupMetadataKeyJsonConverter.write(m, 0.toShort)
case m: ConsumerGroupMetadataKey =>
ConsumerGroupMetadataKeyJsonConverter.write(m, version)
ConsumerGroupMetadataKeyJsonConverter.write(m, 0.toShort)
case m: ConsumerGroupPartitionMetadataKey =>
ConsumerGroupPartitionMetadataKeyJsonConverter.write(m, version)
ConsumerGroupPartitionMetadataKeyJsonConverter.write(m, 0.toShort)
case m: ConsumerGroupMemberMetadataKey =>
ConsumerGroupMemberMetadataKeyJsonConverter.write(m, version)
ConsumerGroupMemberMetadataKeyJsonConverter.write(m, 0.toShort)
case m: ConsumerGroupTargetAssignmentMetadataKey =>
ConsumerGroupTargetAssignmentMetadataKeyJsonConverter.write(m, version)
ConsumerGroupTargetAssignmentMetadataKeyJsonConverter.write(m, 0.toShort)
case m: ConsumerGroupTargetAssignmentMemberKey =>
ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, version)
ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, 0.toShort)
case m: ConsumerGroupCurrentMemberAssignmentKey =>
ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, version)
ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, 0.toShort)
case m: ConsumerGroupRegularExpressionKey =>
ConsumerGroupRegularExpressionKeyJsonConverter.write(m, version)
ConsumerGroupRegularExpressionKeyJsonConverter.write(m, 0.toShort)
case m: ShareGroupMetadataKey =>
ShareGroupMetadataKeyJsonConverter.write(m, version)
ShareGroupMetadataKeyJsonConverter.write(m, 0.toShort)
case m: ShareGroupPartitionMetadataKey =>
ShareGroupPartitionMetadataKeyJsonConverter.write(m, version)
ShareGroupPartitionMetadataKeyJsonConverter.write(m, 0.toShort)
case m: ShareGroupMemberMetadataKey =>
ShareGroupMemberMetadataKeyJsonConverter.write(m, version)
ShareGroupMemberMetadataKeyJsonConverter.write(m, 0.toShort)
case m: ShareGroupTargetAssignmentMetadataKey =>
ShareGroupTargetAssignmentMetadataKeyJsonConverter.write(m, version)
ShareGroupTargetAssignmentMetadataKeyJsonConverter.write(m, 0.toShort)
case m: ShareGroupTargetAssignmentMemberKey =>
ShareGroupTargetAssignmentMemberKeyJsonConverter.write(m, version)
ShareGroupTargetAssignmentMemberKeyJsonConverter.write(m, 0.toShort)
case m: ShareGroupCurrentMemberAssignmentKey =>
ShareGroupCurrentMemberAssignmentKeyJsonConverter.write(m, version)
ShareGroupCurrentMemberAssignmentKeyJsonConverter.write(m, 0.toShort)
case m: ShareGroupStatePartitionMetadataKey =>
ShareGroupStatePartitionMetadataKeyJsonConverter.write(m, version)
case _ => throw new UnknownRecordTypeException(version)
ShareGroupStatePartitionMetadataKeyJsonConverter.write(m, 0.toShort)
case _ => throw new UnknownRecordTypeException(recordType)
}

val json = new ObjectNode(JsonNodeFactory.instance)
json.set("type", new TextNode(version.toString))
json.set("type", new TextNode(recordType.toString))
json.set("data", messageAsJson)
json.toString
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.{GroupCoordinatorConfig, OffsetAndMetadata, OffsetConfig}
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitValue}
import org.apache.kafka.coordinator.group.generated.{CoordinatorRecordType, GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
import org.apache.kafka.server.common.{MetadataVersion, RequestLocal}
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaYammerMetrics
Expand Down Expand Up @@ -3115,8 +3115,7 @@ class GroupMetadataManagerTest {

// Should ignore unknown record
val unknownKey = new org.apache.kafka.coordinator.group.generated.GroupMetadataKey()
val lowestUnsupportedVersion = (org.apache.kafka.coordinator.group.generated.GroupMetadataKey
.HIGHEST_SUPPORTED_VERSION + 1).toShort
val lowestUnsupportedVersion = (CoordinatorRecordType.GROUP_METADATA.id + 1).toShort

val unknownMessage1 = MessageUtil.toVersionPrefixedBytes(Short.MaxValue, unknownKey)
val unknownMessage2 = MessageUtil.toVersionPrefixedBytes(lowestUnsupportedVersion, unknownKey)
Expand Down Expand Up @@ -3144,4 +3143,33 @@ class GroupMetadataManagerTest {
assertTrue(group.offset(topicPartition).map(_.expireTimestampMs).get.isEmpty)
}
}

@Test
def testOffsetCommitKey(): Unit = {
  // The serialized key must start with the coordinator record type for offset
  // commits (1) followed by the version-0 encoding of OffsetCommitKey.
  val bytes = GroupMetadataManager.offsetCommitKey(
    "foo",
    new TopicPartition("__consumer_offsets", 0)
  )
  val buffer = ByteBuffer.wrap(bytes)
  assertEquals(1.toShort, buffer.getShort)
  // assertEquals takes (expected, actual): the hand-built key is the expectation,
  // the value deserialized from the buffer is what we are checking.
  assertEquals(
    new OffsetCommitKey()
      .setGroup("foo")
      .setTopic("__consumer_offsets")
      .setPartition(0),
    new OffsetCommitKey(new ByteBufferAccessor(buffer), 0.toShort)
  )
}

@Test
def testGroupMetadataKey(): Unit = {
  // The serialized key must start with the coordinator record type for group
  // metadata (2) followed by the version-0 encoding of GroupMetadataKey.
  val bytes = GroupMetadataManager.groupMetadataKey("foo")
  val buffer = ByteBuffer.wrap(bytes)
  assertEquals(2.toShort, buffer.getShort())
  // assertEquals takes (expected, actual): expected key first, then the one
  // deserialized from the buffer.
  assertEquals(
    new GroupMetadataKeyData()
      .setGroup("foo"),
    new GroupMetadataKeyData(new ByteBufferAccessor(buffer), 0.toShort)
  )
}
}
Loading