From 1b8666b8bbd52469b9deebf906b2cf0773df07ae Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 18 Dec 2024 16:38:14 +0100 Subject: [PATCH 01/14] MINOR: Improve coordinator records --- build.gradle | 3 +- .../CoordinatorRecordTypeGenerator.java | 204 ++++++++++++++++++ .../kafka/message/MessageGenerator.java | 5 + .../org/apache/kafka/message/MessageSpec.java | 18 ++ .../apache/kafka/message/MessageSpecType.java | 8 +- .../group/GroupCoordinatorShard.java | 76 +++++-- ...nsumerGroupCurrentMemberAssignmentKey.json | 9 +- ...umerGroupCurrentMemberAssignmentValue.json | 3 +- .../ConsumerGroupMemberMetadataKey.json | 9 +- .../ConsumerGroupMemberMetadataValue.json | 3 +- .../message/ConsumerGroupMetadataKey.json | 7 +- .../message/ConsumerGroupMetadataValue.json | 3 +- .../ConsumerGroupPartitionMetadataKey.json | 7 +- .../ConsumerGroupPartitionMetadataValue.json | 3 +- .../ConsumerGroupRegularExpressionKey.json | 9 +- .../ConsumerGroupRegularExpressionValue.json | 3 +- ...onsumerGroupTargetAssignmentMemberKey.json | 9 +- ...sumerGroupTargetAssignmentMemberValue.json | 3 +- ...sumerGroupTargetAssignmentMetadataKey.json | 7 +- ...merGroupTargetAssignmentMetadataValue.json | 3 +- .../common/message/GroupMetadataKey.json | 7 +- .../common/message/GroupMetadataValue.json | 3 +- .../common/message/OffsetCommitKey.json | 11 +- .../common/message/OffsetCommitKeyV0.json | 30 +++ .../common/message/OffsetCommitValue.json | 3 +- .../common/message/OffsetCommitValueV0.json | 37 ++++ .../ShareGroupCurrentMemberAssignmentKey.json | 9 +- ...hareGroupCurrentMemberAssignmentValue.json | 3 +- .../message/ShareGroupMemberMetadataKey.json | 9 +- .../ShareGroupMemberMetadataValue.json | 3 +- .../common/message/ShareGroupMetadataKey.json | 7 +- .../message/ShareGroupMetadataValue.json | 3 +- .../ShareGroupPartitionMetadataKey.json | 7 +- .../ShareGroupPartitionMetadataValue.json | 3 +- .../ShareGroupStatePartitionMetadataKey.json | 7 +- ...ShareGroupStatePartitionMetadataValue.json | 3 +- .../ShareGroupTargetAssignmentMemberKey.json | 9 +- ...ShareGroupTargetAssignmentMemberValue.json | 3 +- ...ShareGroupTargetAssignmentMetadataKey.json | 7 +- ...areGroupTargetAssignmentMetadataValue.json | 3 +- .../group/GroupCoordinatorShardTest.java | 74 ++++++- .../GroupMetadataManagerTestContext.java | 31 +-- .../group/OffsetMetadataManagerTest.java | 5 +- 43 files changed, 550 insertions(+), 119 deletions(-) create mode 100644 generator/src/main/java/org/apache/kafka/message/CoordinatorRecordTypeGenerator.java create mode 100644 group-coordinator/src/main/resources/common/message/OffsetCommitKeyV0.json create mode 100644 group-coordinator/src/main/resources/common/message/OffsetCommitValueV0.json diff --git a/build.gradle b/build.gradle index 6d47cd22ed604..5139587d4cd95 100644 --- a/build.gradle +++ b/build.gradle @@ -1560,7 +1560,8 @@ project(':group-coordinator') { args = [ "-p", "org.apache.kafka.coordinator.group.generated", "-o", "${projectDir}/build/generated/main/java/org/apache/kafka/coordinator/group/generated", "-i", "src/main/resources/common/message", - "-m", "MessageDataGenerator", "JsonConverterGenerator" + "-m", "MessageDataGenerator", "JsonConverterGenerator", + "-t", "CoordinatorRecordTypeGenerator" ] inputs.dir("src/main/resources/common/message") .withPropertyName("messages") diff --git a/generator/src/main/java/org/apache/kafka/message/CoordinatorRecordTypeGenerator.java b/generator/src/main/java/org/apache/kafka/message/CoordinatorRecordTypeGenerator.java new file mode 100644 index 0000000000000..354fc835d480f --- /dev/null +++ b/generator/src/main/java/org/apache/kafka/message/CoordinatorRecordTypeGenerator.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.message; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; + +public class CoordinatorRecordTypeGenerator implements TypeClassGenerator { + private final HeaderGenerator headerGenerator; + private final CodeBuffer buffer; + private final TreeMap records; + + private static final class CoordinatorRecord { + final short id; + MessageSpec key; + MessageSpec value; + + CoordinatorRecord(short id) { + this.id = id; + } + } + + public CoordinatorRecordTypeGenerator(String packageName) { + this.headerGenerator = new HeaderGenerator(packageName); + this.records = new TreeMap<>(); + this.buffer = new CodeBuffer(); + } + + @Override + public String outputName() { + return MessageGenerator.COORDINATOR_RECORD_TYPE_JAVA; + } + + @Override + public void registerMessageType(MessageSpec spec) { + switch (spec.type()) { + case COORDINATOR_KEY: { + short id = spec.apiKey().get(); + CoordinatorRecord record = records.computeIfAbsent(id, __ -> new CoordinatorRecord(id)); + if (record.key != null) { + throw new RuntimeException("Duplicate coordinator record key for type " + + id + ". Original claimant: " + record.key.name() + ". New " + + "claimant: " + spec.name()); + } + record.key = spec; + break; + } + + case COORDINATOR_VALUE: { + short id = spec.apiKey().get(); + CoordinatorRecord record = records.computeIfAbsent(id, __ -> new CoordinatorRecord(id)); + if (record.value != null) { + throw new RuntimeException("Duplicate coordinator record value for type " + + id + ". Original claimant: " + record.key.name() + ". New " + + "claimant: " + spec.name()); + } + record.value = spec; + break; + } + + default: + // Ignore + } + } + + @Override + public void generateAndWrite(BufferedWriter writer) throws IOException { + generate(); + write(writer); + } + + private void generate() { + buffer.printf("public enum CoordinatorRecordType {%n"); + buffer.incrementIndent(); + generateEnumValues(); + buffer.printf("%n"); + generateInstanceVariables(); + buffer.printf("%n"); + generateEnumConstructor(); + buffer.printf("%n"); + generateFromApiKey(); + buffer.printf("%n"); + generateAccessor("id", "short"); + buffer.printf("%n"); + generateAccessor("lowestSupportedVersion", "short"); + buffer.printf("%n"); + generateAccessor("highestSupportedVersion", "short"); + buffer.printf("%n"); + generateToString(); + buffer.decrementIndent(); + buffer.printf("}%n"); + headerGenerator.generate(); + } + + private String cleanName(String name) { + return name + .replace("Key", "") + .replace("Value", ""); + } + + private void generateEnumValues() { + int numProcessed = 0; + for (Map.Entry entry : records.entrySet()) { + MessageSpec key = entry.getValue().key; + if (key == null) { + throw new RuntimeException("Coordinator record " + entry.getKey() + " has not key."); + } + MessageSpec value = entry.getValue().value; + if (value == null) { + throw new RuntimeException("Coordinator record " + entry.getKey() + " has not key."); + } + String name = cleanName(key.name()); + numProcessed++; + buffer.printf("%s(\"%s\", (short) %d, (short) %d, (short) %d)%s%n", + MessageGenerator.toSnakeCase(name).toUpperCase(Locale.ROOT), + MessageGenerator.capitalizeFirst(name), + entry.getKey(), + value.validVersions().lowest(), + value.validVersions().highest(), + (numProcessed == records.size()) ? ";" : ","); + } + } + + private void generateInstanceVariables() { + buffer.printf("private final String name;%n"); + buffer.printf("private final short id;%n"); + buffer.printf("private final short lowestSupportedVersion;%n"); + buffer.printf("private final short highestSupportedVersion;%n"); + } + + private void generateEnumConstructor() { + buffer.printf("CoordinatorRecordType(String name, short id, short lowestSupportedVersion, short highestSupportedVersion) {%n"); + buffer.incrementIndent(); + buffer.printf("this.name = name;%n"); + buffer.printf("this.id = id;%n"); + buffer.printf("this.lowestSupportedVersion = lowestSupportedVersion;%n"); + buffer.printf("this.highestSupportedVersion = highestSupportedVersion;%n"); + buffer.decrementIndent(); + buffer.printf("}%n"); + } + + private void generateFromApiKey() { + buffer.printf("public static CoordinatorRecordType fromId(short id) {%n"); + buffer.incrementIndent(); + buffer.printf("switch (id) {%n"); + buffer.incrementIndent(); + for (Map.Entry entry : records.entrySet()) { + buffer.printf("case %d:%n", entry.getKey()); + buffer.incrementIndent(); + buffer.printf("return %s;%n", MessageGenerator. + toSnakeCase(cleanName(entry.getValue().key.name())).toUpperCase(Locale.ROOT)); + buffer.decrementIndent(); + } + buffer.printf("default:%n"); + buffer.incrementIndent(); + headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS); + buffer.printf("throw new UnsupportedVersionException(\"Unknown metadata id \"" + + " + id);%n"); + buffer.decrementIndent(); + buffer.decrementIndent(); + buffer.printf("}%n"); + buffer.decrementIndent(); + buffer.printf("}%n"); + } + + private void generateAccessor(String name, String type) { + buffer.printf("public %s %s() {%n", type, name); + buffer.incrementIndent(); + buffer.printf("return this.%s;%n", name); + buffer.decrementIndent(); + buffer.printf("}%n"); + } + + private void generateToString() { + buffer.printf("@Override%n"); + buffer.printf("public String toString() {%n"); + buffer.incrementIndent(); + buffer.printf("return this.name();%n"); + buffer.decrementIndent(); + buffer.printf("}%n"); + } + + private void write(BufferedWriter writer) throws IOException { + headerGenerator.buffer().write(writer); + buffer.write(writer); + } +} diff --git a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java index 4d7903cd6affd..bcf4a6e4afda7 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java @@ -56,6 +56,8 @@ public final class MessageGenerator { static final String API_SCOPE_JAVA = "ApiScope.java"; + static final String COORDINATOR_RECORD_TYPE_JAVA = "CoordinatorRecordType.java"; + static final String METADATA_RECORD_TYPE_JAVA = "MetadataRecordType.java"; static final String METADATA_JSON_CONVERTERS_JAVA = "MetadataJsonConverters.java"; @@ -193,6 +195,9 @@ private static List createTypeClassGenerators(String package case "MetadataJsonConvertersGenerator": generators.add(new MetadataJsonConvertersGenerator(packageName)); break; + case "CoordinatorRecordTypeGenerator": + generators.add(new CoordinatorRecordTypeGenerator(packageName)); + break; default: throw new RuntimeException("Unknown type class generator type '" + type + "'"); } diff --git a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java index 66a720f2ffd8f..36c36a418f1f9 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java @@ -81,6 +81,24 @@ public MessageSpec(@JsonProperty("name") String name, "messages with type `request`"); } this.latestVersionUnstable = latestVersionUnstable; + + if (type == MessageSpecType.COORDINATOR_KEY) { + if (this.apiKey.isEmpty()) { + throw new RuntimeException("The ApiKey must be set for messages " + name + " with type `record-key`"); + } + if (!this.validVersions().equals(new Versions((short) 0, ((short) 0)))) { + throw new RuntimeException("The Versions must be set to `0` for messages " + name + " with type `record-key`"); + } + if (!this.flexibleVersions.empty()) { + throw new RuntimeException("The FlexibleVersions are not supported for messages " + name + " with type `record-key`"); + } + } + + if (type == MessageSpecType.COORDINATOR_VALUE) { + if (this.apiKey.isEmpty()) { + throw new RuntimeException("The ApiKey must be set for messages with type `record-key`"); + } + } } public StructSpec struct() { diff --git a/generator/src/main/java/org/apache/kafka/message/MessageSpecType.java b/generator/src/main/java/org/apache/kafka/message/MessageSpecType.java index a7e7b5e7bd452..3bc9bf6a9339a 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageSpecType.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageSpecType.java @@ -48,5 +48,11 @@ public enum MessageSpecType { * Other message spec types. */ @JsonProperty("data") - DATA + DATA, + + @JsonProperty("coordinator-key") + COORDINATOR_KEY, + + @JsonProperty("coordinator-value") + COORDINATOR_VALUE } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index a2ead8effa011..4dc25588c04ae 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; @@ -70,10 +71,13 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyV0; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValueV0; import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey; import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; @@ -755,6 +759,28 @@ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { offsetMetadataManager.onNewMetadataImage(newImage, delta); } + private static OffsetCommitKey convertOffsetCommitKeyV0( + OffsetCommitKeyV0 key + ) { + return new OffsetCommitKey() + .setGroup(key.group()) + .setTopic(key.topic()) + .setPartition(key.partition()); + } + + private static OffsetCommitValue convertOffsetCommitValueV0( + OffsetCommitValueV0 value + ) { + if (value == null) return null; + + return new OffsetCommitValue() + .setOffset(value.offset()) + .setCommitTimestamp(value.commitTimestamp()) + .setExpireTimestamp(value.expireTimestamp()) + .setMetadata(value.metadata()) + .setLeaderEpoch(value.leaderEpoch()); + } + /** * Replays the Record to update the hard state of the group coordinator. * @@ -775,9 +801,25 @@ public void replay( ApiMessageAndVersion key = record.key(); ApiMessageAndVersion value = record.value(); - switch (key.version()) { - case 0: - case 1: + CoordinatorRecordType recordType; + try { + recordType = CoordinatorRecordType.fromId(key.version()); + } catch (UnsupportedVersionException ex) { + throw new IllegalStateException("Received an unknown record type " + key.version() + + " in " + record, ex); + } + + switch (recordType) { + case OFFSET_COMMIT_V0: + offsetMetadataManager.replay( + offset, + producerId, + convertOffsetCommitKeyV0((OffsetCommitKeyV0) key.message()), + convertOffsetCommitValueV0((OffsetCommitValueV0) Utils.messageOrNull(value)) + ); + break; + + case OFFSET_COMMIT: offsetMetadataManager.replay( offset, producerId, @@ -786,98 +828,98 @@ public void replay( ); break; - case 2: + case GROUP_METADATA: groupMetadataManager.replay( (GroupMetadataKey) key.message(), (GroupMetadataValue) Utils.messageOrNull(value) ); break; - case 3: + case CONSUMER_GROUP_METADATA: groupMetadataManager.replay( (ConsumerGroupMetadataKey) key.message(), (ConsumerGroupMetadataValue) Utils.messageOrNull(value) ); break; - case 4: + case CONSUMER_GROUP_PARTITION_METADATA: groupMetadataManager.replay( (ConsumerGroupPartitionMetadataKey) key.message(), (ConsumerGroupPartitionMetadataValue) Utils.messageOrNull(value) ); break; - case 5: + case CONSUMER_GROUP_MEMBER_METADATA: groupMetadataManager.replay( (ConsumerGroupMemberMetadataKey) key.message(), (ConsumerGroupMemberMetadataValue) Utils.messageOrNull(value) ); break; - case 6: + case CONSUMER_GROUP_TARGET_ASSIGNMENT_METADATA: groupMetadataManager.replay( (ConsumerGroupTargetAssignmentMetadataKey) key.message(), (ConsumerGroupTargetAssignmentMetadataValue) Utils.messageOrNull(value) ); break; - case 7: + case CONSUMER_GROUP_TARGET_ASSIGNMENT_MEMBER: groupMetadataManager.replay( (ConsumerGroupTargetAssignmentMemberKey) key.message(), (ConsumerGroupTargetAssignmentMemberValue) Utils.messageOrNull(value) ); break; - case 8: + case CONSUMER_GROUP_CURRENT_MEMBER_ASSIGNMENT: groupMetadataManager.replay( (ConsumerGroupCurrentMemberAssignmentKey) key.message(), (ConsumerGroupCurrentMemberAssignmentValue) Utils.messageOrNull(value) ); break; - case 9: + case SHARE_GROUP_PARTITION_METADATA: groupMetadataManager.replay( (ShareGroupPartitionMetadataKey) key.message(), (ShareGroupPartitionMetadataValue) Utils.messageOrNull(value) ); break; - case 10: + case SHARE_GROUP_MEMBER_METADATA: groupMetadataManager.replay( (ShareGroupMemberMetadataKey) key.message(), (ShareGroupMemberMetadataValue) Utils.messageOrNull(value) ); break; - case 11: + case SHARE_GROUP_METADATA: groupMetadataManager.replay( (ShareGroupMetadataKey) key.message(), (ShareGroupMetadataValue) Utils.messageOrNull(value) ); break; - case 12: + case SHARE_GROUP_TARGET_ASSIGNMENT_METADATA: groupMetadataManager.replay( (ShareGroupTargetAssignmentMetadataKey) key.message(), (ShareGroupTargetAssignmentMetadataValue) Utils.messageOrNull(value) ); break; - case 13: + case SHARE_GROUP_TARGET_ASSIGNMENT_MEMBER: groupMetadataManager.replay( (ShareGroupTargetAssignmentMemberKey) key.message(), (ShareGroupTargetAssignmentMemberValue) Utils.messageOrNull(value) ); break; - case 14: + case SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT: groupMetadataManager.replay( (ShareGroupCurrentMemberAssignmentKey) key.message(), (ShareGroupCurrentMemberAssignmentValue) Utils.messageOrNull(value) ); break; - case 16: + case CONSUMER_GROUP_REGULAR_EXPRESSION: groupMetadataManager.replay( (ConsumerGroupRegularExpressionKey) key.message(), (ConsumerGroupRegularExpressionValue) Utils.messageOrNull(value) diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentKey.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentKey.json index 5be2615802a8d..22ea2457667d9 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentKey.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentKey.json @@ -14,14 +14,15 @@ // limitations under the License. { - "type": "data", + "apiKey": 8, + "type": "coordinator-key", "name": "ConsumerGroupCurrentMemberAssignmentKey", - "validVersions": "8", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "8", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." }, - { "name": "MemberId", "type": "string", "versions": "8", + { "name": "MemberId", "type": "string", "versions": "0", "about": "The member id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json index 8e1bec2196b2d..f94604e946c0b 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json @@ -14,7 +14,8 @@ // limitations under the License. { - "type": "data", + "apiKey": 8, + "type": "coordinator-value", "name": "ConsumerGroupCurrentMemberAssignmentValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataKey.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataKey.json index dd230d7797d35..868b0bec9ee84 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataKey.json @@ -14,14 +14,15 @@ // limitations under the License. { - "type": "data", + "apiKey": 5, + "type": "coordinator-key", "name": "ConsumerGroupMemberMetadataKey", - "validVersions": "5", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "5", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." }, - { "name": "MemberId", "type": "string", "versions": "5", + { "name": "MemberId", "type": "string", "versions": "0", "about": "The member id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json index eb404513b8fb9..ab10d12756660 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json @@ -14,7 +14,8 @@ // limitations under the License. { - "type": "data", + "apiKey": 5, + "type": "coordinator-value", "name": "ConsumerGroupMemberMetadataValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupMetadataKey.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupMetadataKey.json index 3938f511c01b6..d67c850bc0fc1 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupMetadataKey.json @@ -14,12 +14,13 @@ // limitations under the License. { - "type": "data", + "apiKey": 3, + "type": "coordinator-key", "name": "ConsumerGroupMetadataKey", - "validVersions": "3", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "3", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupMetadataValue.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupMetadataValue.json index 6c058cad982fb..c01cabb521ec6 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupMetadataValue.json @@ -14,7 +14,8 @@ // limitations under the License. { - "type": "data", + "apiKey": 3, + "type": "coordinator-value", "name": "ConsumerGroupMetadataValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataKey.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataKey.json index 1e7253ae78d89..e4c3dc1babcfa 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataKey.json @@ -14,12 +14,13 @@ // limitations under the License. { - "type": "data", + "apiKey": 4, + "type": "coordinator-key", "name": "ConsumerGroupPartitionMetadataKey", - "validVersions": "4", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "4", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json index cff3af124e445..413ee101b5b96 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json @@ -14,7 +14,8 @@ // limitations under the License. { - "type": "data", + "apiKey": 4, + "type": "coordinator-value", "name": "ConsumerGroupPartitionMetadataValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionKey.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionKey.json index 2efe257b82153..be66dc867eb51 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionKey.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionKey.json @@ -14,14 +14,15 @@ // limitations under the License. { - "type": "data", + "apiKey": 16, + "type": "coordinator-key", "name": "ConsumerGroupRegularExpressionKey", - "validVersions": "16", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "16", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." }, - { "name": "RegularExpression", "type": "string", "versions": "16", + { "name": "RegularExpression", "type": "string", "versions": "0", "about": "The regular expression." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionValue.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionValue.json index 8c4262379e35b..f55a9b0d892e2 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionValue.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupRegularExpressionValue.json @@ -14,7 +14,8 @@ // limitations under the License. { - "type": "data", + "apiKey": 16, + "type": "coordinator-value", "name": "ConsumerGroupRegularExpressionValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMemberKey.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMemberKey.json index 838567dcec71c..169efb9097321 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMemberKey.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMemberKey.json @@ -14,14 +14,15 @@ // limitations under the License. { - "type": "data", + "apiKey": 7, + "type": "coordinator-key", "name": "ConsumerGroupTargetAssignmentMemberKey", - "validVersions": "7", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "7", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." }, - { "name": "MemberId", "type": "string", "versions": "7", + { "name": "MemberId", "type": "string", "versions": "0", "about": "The member id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMemberValue.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMemberValue.json index 04a7a09aa8aeb..e96893852a8e4 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMemberValue.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMemberValue.json @@ -14,7 +14,8 @@ // limitations under the License. { - "type": "data", + "apiKey": 7, + "type": "coordinator-value", "name": "ConsumerGroupTargetAssignmentMemberValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataKey.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataKey.json index a358b5c8e9072..90f4ce0294d52 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataKey.json @@ -14,12 +14,13 @@ // limitations under the License. { - "type": "data", + "apiKey": 6, + "type": "coordinator-key", "name": "ConsumerGroupTargetAssignmentMetadataKey", - "validVersions": "6", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "6", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataValue.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataValue.json index 26625416253dc..939794e1e5c73 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataValue.json @@ -14,7 +14,8 @@ // limitations under the License. { - "type": "data", + "apiKey": 6, + "type": "coordinator-value", "name": "ConsumerGroupTargetAssignmentMetadataValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/GroupMetadataKey.json b/group-coordinator/src/main/resources/common/message/GroupMetadataKey.json index 2db7b7d3469cf..1a468638bd8cc 100644 --- a/group-coordinator/src/main/resources/common/message/GroupMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/GroupMetadataKey.json @@ -14,12 +14,13 @@ // limitations under the License. { - "type": "data", + "apiKey": 2, + "type": "coordinator-key", "name": "GroupMetadataKey", - "validVersions": "2", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "group", "type": "string", "versions": "2", + { "name": "group", "type": "string", "versions": "0", "about": "The group id."} ] } diff --git a/group-coordinator/src/main/resources/common/message/GroupMetadataValue.json b/group-coordinator/src/main/resources/common/message/GroupMetadataValue.json index 4d5bce160f9a9..4bf87c9df7d0b 100644 --- a/group-coordinator/src/main/resources/common/message/GroupMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/GroupMetadataValue.json @@ -14,7 +14,8 @@ // limitations under the License. { - "type": "data", + "apiKey": 2, + "type": "coordinator-value", "name": "GroupMetadataValue", // Version 4 is the first flexible version. // KIP-915: bumping the version will no longer make this record backward compatible. diff --git a/group-coordinator/src/main/resources/common/message/OffsetCommitKey.json b/group-coordinator/src/main/resources/common/message/OffsetCommitKey.json index 3e2d0de3b70a9..f5898f6860209 100644 --- a/group-coordinator/src/main/resources/common/message/OffsetCommitKey.json +++ b/group-coordinator/src/main/resources/common/message/OffsetCommitKey.json @@ -14,16 +14,17 @@ // limitations under the License. { - "type": "data", + "apiKey": 1, + "type": "coordinator-key", "name": "OffsetCommitKey", - "validVersions": "0-1", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "group", "type": "string", "versions": "0-1", + { "name": "group", "type": "string", "versions": "0", "about": "The consumer group id."}, - { "name": "topic", "type": "string", "versions": "0-1", + { "name": "topic", "type": "string", "versions": "0", "about": "The topic name."}, - { "name": "partition", "type": "int32", "versions": "0-1", + { "name": "partition", "type": "int32", "versions": "0", "about": "The topic partition index."} ] } diff --git a/group-coordinator/src/main/resources/common/message/OffsetCommitKeyV0.json b/group-coordinator/src/main/resources/common/message/OffsetCommitKeyV0.json new file mode 100644 index 0000000000000..d5609644650bb --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/OffsetCommitKeyV0.json @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 0, + "type": "coordinator-key", + "name": "OffsetCommitKeyV0", + "validVersions": "0", + "flexibleVersions": "none", + "fields": [ + { "name": "group", "type": "string", "versions": "0", + "about": "The consumer group id."}, + { "name": "topic", "type": "string", "versions": "0", + "about": "The topic name."}, + { "name": "partition", "type": "int32", "versions": "0", + "about": "The topic partition index."} + ] +} diff --git a/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json b/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json index 49919b05f08b2..8f7d32d544796 100644 --- a/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json +++ b/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json @@ -14,7 +14,8 @@ // limitations under the License. { - "type": "data", + "apiKey": 1, + "type": "coordinator-value", "name": "OffsetCommitValue", // Version 4 is the first flexible version. // KIP-915: bumping the version will no longer make this record backward compatible. diff --git a/group-coordinator/src/main/resources/common/message/OffsetCommitValueV0.json b/group-coordinator/src/main/resources/common/message/OffsetCommitValueV0.json new file mode 100644 index 0000000000000..f1f12fc7e7e43 --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/OffsetCommitValueV0.json @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 0, + "type": "coordinator-value", + "name": "OffsetCommitValueV0", + // Version 4 is the first flexible version. + // KIP-915: bumping the version will no longer make this record backward compatible. + // We suggest to add/remove only tagged fields to maintain backward compatibility. + "validVersions": "0-4", + "flexibleVersions": "4+", + "fields": [ + { "name": "offset", "type": "int64", "versions": "0+", + "about": "The offset that the consumer wants to store (for this partition)."}, + { "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true, + "about": "The leader epoch of the last consumed record."}, + { "name": "metadata", "type": "string", "versions": "0+", + "about": "Any metadata the client wants to keep."}, + { "name": "commitTimestamp", "type": "int64", "versions": "0+", + "about": "The time at which the commit was added to the log."}, + { "name": "expireTimestamp", "type": "int64", "versions": "1", "default": -1, "ignorable": true, + "about": "The time at which the offset will expire."} + ] +} diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupCurrentMemberAssignmentKey.json b/group-coordinator/src/main/resources/common/message/ShareGroupCurrentMemberAssignmentKey.json index 1aa74133cfcb2..c88926933f1fa 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupCurrentMemberAssignmentKey.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupCurrentMemberAssignmentKey.json @@ -15,14 +15,15 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 14, + "type": "coordinator-key", "name": "ShareGroupCurrentMemberAssignmentKey", - "validVersions": "14", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "14", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." }, - { "name": "MemberId", "type": "string", "versions": "14", + { "name": "MemberId", "type": "string", "versions": "0", "about": "The member id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupCurrentMemberAssignmentValue.json b/group-coordinator/src/main/resources/common/message/ShareGroupCurrentMemberAssignmentValue.json index 109ae6e870fd4..64959a42cc82a 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupCurrentMemberAssignmentValue.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupCurrentMemberAssignmentValue.json @@ -15,7 +15,8 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 14, + "type": "coordinator-value", "name": "ShareGroupCurrentMemberAssignmentValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataKey.json b/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataKey.json index cbe1a8dd8718d..e5e0250502be1 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataKey.json @@ -15,14 +15,15 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 10, + "type": "coordinator-key", "name": "ShareGroupMemberMetadataKey", - "validVersions": "10", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "10", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." }, - { "name": "MemberId", "type": "string", "versions": "10", + { "name": "MemberId", "type": "string", "versions": "0", "about": "The member id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataValue.json b/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataValue.json index c5d4c7abd4504..e4cf5e1690074 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataValue.json @@ -15,7 +15,8 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 10, + "type": "coordinator-value", "name": "ShareGroupMemberMetadataValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupMetadataKey.json b/group-coordinator/src/main/resources/common/message/ShareGroupMetadataKey.json index 309b67ba31ff7..4c3fe1affb8a4 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupMetadataKey.json @@ -15,12 +15,13 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 11, + "type": "coordinator-key", "name": "ShareGroupMetadataKey", - "validVersions": "11", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "11", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupMetadataValue.json b/group-coordinator/src/main/resources/common/message/ShareGroupMetadataValue.json index 02ca3eacd0454..5ff037a2fcc1f 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupMetadataValue.json @@ -15,7 +15,8 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 11, + "type": "coordinator-value", "name": "ShareGroupMetadataValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataKey.json b/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataKey.json index 8a34f5dfc893d..146b9fdcb9103 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataKey.json @@ -15,12 +15,13 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 9, + "type": "coordinator-key", "name": "ShareGroupPartitionMetadataKey", - "validVersions": "9", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "9", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataValue.json b/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataValue.json index 22424e93b967d..af63f047126e5 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataValue.json @@ -15,7 +15,8 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 9, + "type": "coordinator-value", "name": "ShareGroupPartitionMetadataValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupStatePartitionMetadataKey.json b/group-coordinator/src/main/resources/common/message/ShareGroupStatePartitionMetadataKey.json index 6bae1b6d0af91..d24e7bb9d4a49 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupStatePartitionMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupStatePartitionMetadataKey.json @@ -15,12 +15,13 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 15, + "type": "coordinator-key", "name": "ShareGroupStatePartitionMetadataKey", - "validVersions": "15", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "15", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupStatePartitionMetadataValue.json b/group-coordinator/src/main/resources/common/message/ShareGroupStatePartitionMetadataValue.json index 2e797910002d3..18a9df0885fef 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupStatePartitionMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupStatePartitionMetadataValue.json @@ -15,7 +15,8 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 15, + "type": "coordinator-value", "name": "ShareGroupStatePartitionMetadataValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMemberKey.json b/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMemberKey.json index 80a80112cba75..e8b364d289651 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMemberKey.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMemberKey.json @@ -15,14 +15,15 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 13, + "type": "coordinator-key", "name": "ShareGroupTargetAssignmentMemberKey", - "validVersions": "13", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "13", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." }, - { "name": "MemberId", "type": "string", "versions": "13", + { "name": "MemberId", "type": "string", "versions": "0", "about": "The member id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMemberValue.json b/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMemberValue.json index 1b460bf6a62c0..f6b52f3c86b99 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMemberValue.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMemberValue.json @@ -15,7 +15,8 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 13, + "type": "coordinator-value", "name": "ShareGroupTargetAssignmentMemberValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataKey.json b/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataKey.json index ccf394d068ea3..202c57de7d55b 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataKey.json @@ -15,12 +15,13 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 12, + "type": "coordinator-key", "name": "ShareGroupTargetAssignmentMetadataKey", - "validVersions": "12", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "12", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataValue.json b/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataValue.json index 52a38e865c00d..0e0028855fbf5 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataValue.json @@ -15,7 +15,8 @@ // KIP-932 is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 12, + "type": "coordinator-value", "name": "ShareGroupTargetAssignmentMetadataValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index cb68771c8a5f1..7792f5de942b8 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -56,7 +56,9 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyV0; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValueV0; import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; @@ -337,12 +339,34 @@ public void testReplayOffsetCommit() { metricsShard ); - OffsetCommitKey key = new OffsetCommitKey(); - OffsetCommitValue value = new OffsetCommitValue(); + OffsetCommitKey key = new OffsetCommitKey() + .setGroup("goo") + .setTopic("foo") + .setPartition(0); + OffsetCommitValue value = new OffsetCommitValue() + .setOffset(100L) + .setCommitTimestamp(12345L) + .setExpireTimestamp(6789L) + .setMetadata("Metadata") + .setLeaderEpoch(10); coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new CoordinatorRecord( - new ApiMessageAndVersion(key, (short) 0), - new ApiMessageAndVersion(value, (short) 0) + new ApiMessageAndVersion( + new OffsetCommitKeyV0() + .setGroup("goo") + .setTopic("foo") + .setPartition(0), + (short) 0 + ), + new ApiMessageAndVersion( + new OffsetCommitValueV0() + .setOffset(100L) + .setCommitTimestamp(12345L) + .setExpireTimestamp(6789L) + .setMetadata("Metadata") + .setLeaderEpoch(10), + (short) 0 + ) )); coordinator.replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new CoordinatorRecord( @@ -382,12 +406,34 @@ public void testReplayTransactionalOffsetCommit() { metricsShard ); - OffsetCommitKey key = new OffsetCommitKey(); - OffsetCommitValue value = new OffsetCommitValue(); + OffsetCommitKey key = new OffsetCommitKey() + .setGroup("goo") + .setTopic("foo") + .setPartition(0); + OffsetCommitValue value = new OffsetCommitValue() + .setOffset(100L) + .setCommitTimestamp(12345L) + .setExpireTimestamp(6789L) + .setMetadata("Metadata") + .setLeaderEpoch(10); coordinator.replay(0L, 100L, (short) 0, new CoordinatorRecord( - new ApiMessageAndVersion(key, (short) 0), - new ApiMessageAndVersion(value, (short) 0) + new ApiMessageAndVersion( + new OffsetCommitKeyV0() + .setGroup("goo") + .setTopic("foo") + .setPartition(0), + (short) 0 + ), + new ApiMessageAndVersion( + new OffsetCommitValueV0() + .setOffset(100L) + .setCommitTimestamp(12345L) + .setExpireTimestamp(6789L) + .setMetadata("Metadata") + .setLeaderEpoch(10), + (short) 0 + ) )); coordinator.replay(1L, 101L, (short) 1, new CoordinatorRecord( @@ -427,10 +473,18 @@ public void testReplayOffsetCommitWithNullValue() { metricsShard ); - OffsetCommitKey key = new OffsetCommitKey(); + OffsetCommitKey key = new OffsetCommitKey() + .setGroup("goo") + .setTopic("foo") + .setPartition(0); coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new CoordinatorRecord( - new ApiMessageAndVersion(key, (short) 0), + new ApiMessageAndVersion( + new OffsetCommitKeyV0() + .setGroup("goo") + .setTopic("foo") + .setPartition(0), + (short) 0), null )); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index 0c930f2986259..cb833274cbc33 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -72,6 +72,7 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey; @@ -1520,99 +1521,99 @@ public void replay( throw new IllegalStateException("Received a null key in " + record); } - switch (key.version()) { - case GroupMetadataKey.HIGHEST_SUPPORTED_VERSION: + switch (CoordinatorRecordType.fromId(key.version())) { + case GROUP_METADATA: groupMetadataManager.replay( (GroupMetadataKey) key.message(), (GroupMetadataValue) messageOrNull(value) ); break; - case ConsumerGroupMemberMetadataKey.HIGHEST_SUPPORTED_VERSION: + case CONSUMER_GROUP_MEMBER_METADATA: groupMetadataManager.replay( (ConsumerGroupMemberMetadataKey) key.message(), (ConsumerGroupMemberMetadataValue) messageOrNull(value) ); break; - case ConsumerGroupMetadataKey.HIGHEST_SUPPORTED_VERSION: + case CONSUMER_GROUP_METADATA: groupMetadataManager.replay( (ConsumerGroupMetadataKey) key.message(), (ConsumerGroupMetadataValue) messageOrNull(value) ); break; - case ConsumerGroupPartitionMetadataKey.HIGHEST_SUPPORTED_VERSION: + case CONSUMER_GROUP_PARTITION_METADATA: groupMetadataManager.replay( (ConsumerGroupPartitionMetadataKey) key.message(), (ConsumerGroupPartitionMetadataValue) messageOrNull(value) ); break; - case ConsumerGroupTargetAssignmentMemberKey.HIGHEST_SUPPORTED_VERSION: + case CONSUMER_GROUP_TARGET_ASSIGNMENT_MEMBER: groupMetadataManager.replay( (ConsumerGroupTargetAssignmentMemberKey) key.message(), (ConsumerGroupTargetAssignmentMemberValue) messageOrNull(value) ); break; - case ConsumerGroupTargetAssignmentMetadataKey.HIGHEST_SUPPORTED_VERSION: + case CONSUMER_GROUP_TARGET_ASSIGNMENT_METADATA: groupMetadataManager.replay( (ConsumerGroupTargetAssignmentMetadataKey) key.message(), (ConsumerGroupTargetAssignmentMetadataValue) messageOrNull(value) ); break; - case ConsumerGroupCurrentMemberAssignmentKey.HIGHEST_SUPPORTED_VERSION: + case CONSUMER_GROUP_CURRENT_MEMBER_ASSIGNMENT: groupMetadataManager.replay( (ConsumerGroupCurrentMemberAssignmentKey) key.message(), (ConsumerGroupCurrentMemberAssignmentValue) messageOrNull(value) ); break; - case ShareGroupMemberMetadataKey.HIGHEST_SUPPORTED_VERSION: + case SHARE_GROUP_MEMBER_METADATA: groupMetadataManager.replay( (ShareGroupMemberMetadataKey) key.message(), (ShareGroupMemberMetadataValue) messageOrNull(value) ); break; - case ShareGroupMetadataKey.HIGHEST_SUPPORTED_VERSION: + case SHARE_GROUP_METADATA: groupMetadataManager.replay( (ShareGroupMetadataKey) key.message(), (ShareGroupMetadataValue) messageOrNull(value) ); break; - case ShareGroupPartitionMetadataKey.HIGHEST_SUPPORTED_VERSION: + case SHARE_GROUP_PARTITION_METADATA: groupMetadataManager.replay( (ShareGroupPartitionMetadataKey) key.message(), (ShareGroupPartitionMetadataValue) messageOrNull(value) ); break; - case ShareGroupTargetAssignmentMemberKey.HIGHEST_SUPPORTED_VERSION: + case SHARE_GROUP_TARGET_ASSIGNMENT_MEMBER: groupMetadataManager.replay( (ShareGroupTargetAssignmentMemberKey) key.message(), (ShareGroupTargetAssignmentMemberValue) messageOrNull(value) ); break; - case ShareGroupTargetAssignmentMetadataKey.HIGHEST_SUPPORTED_VERSION: + case SHARE_GROUP_TARGET_ASSIGNMENT_METADATA: groupMetadataManager.replay( (ShareGroupTargetAssignmentMetadataKey) key.message(), (ShareGroupTargetAssignmentMetadataValue) messageOrNull(value) ); break; - case ShareGroupCurrentMemberAssignmentKey.HIGHEST_SUPPORTED_VERSION: + case SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT: groupMetadataManager.replay( (ShareGroupCurrentMemberAssignmentKey) key.message(), (ShareGroupCurrentMemberAssignmentValue) messageOrNull(value) ); break; - case ConsumerGroupRegularExpressionKey.HIGHEST_SUPPORTED_VERSION: + case CONSUMER_GROUP_REGULAR_EXPRESSION: groupMetadataManager.replay( (ConsumerGroupRegularExpressionKey) key.message(), (ConsumerGroupRegularExpressionValue) messageOrNull(value) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 00ec5e9739ef8..528d59f04b7d4 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -55,6 +55,7 @@ import org.apache.kafka.coordinator.group.classic.ClassicGroupMember; import org.apache.kafka.coordinator.group.classic.ClassicGroupState; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; @@ -496,8 +497,8 @@ private void replay( throw new IllegalStateException("Received a null key in " + record); } - switch (key.version()) { - case OffsetCommitKey.HIGHEST_SUPPORTED_VERSION: + switch (CoordinatorRecordType.fromId(key.version())) { + case OFFSET_COMMIT: offsetMetadataManager.replay( lastWrittenOffset, producerId, From 3565ab88fb13c875f06ae93a3b8708e82ebb6f68 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 18 Dec 2024 16:46:04 +0100 Subject: [PATCH 02/14] update record helpers --- .../group/GroupCoordinatorRecordHelpers.java | 63 ++++++++++--------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index 05cf4d5bd3a2b..8695d84d97cfb 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -35,6 +35,7 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; @@ -89,7 +90,7 @@ public static CoordinatorRecord newConsumerGroupMemberSubscriptionRecord( new ConsumerGroupMemberMetadataKey() .setGroupId(groupId) .setMemberId(member.memberId()), - (short) 5 + CoordinatorRecordType.CONSUMER_GROUP_MEMBER_METADATA.id() ), new ApiMessageAndVersion( new ConsumerGroupMemberMetadataValue() @@ -123,7 +124,7 @@ public static CoordinatorRecord newConsumerGroupMemberSubscriptionTombstoneRecor new ConsumerGroupMemberMetadataKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 5 + CoordinatorRecordType.CONSUMER_GROUP_MEMBER_METADATA.id() ), null // Tombstone. ); @@ -153,7 +154,7 @@ public static CoordinatorRecord newConsumerGroupSubscriptionMetadataRecord( new ApiMessageAndVersion( new ConsumerGroupPartitionMetadataKey() .setGroupId(groupId), - (short) 4 + CoordinatorRecordType.CONSUMER_GROUP_PARTITION_METADATA.id() ), new ApiMessageAndVersion( value, @@ -175,7 +176,7 @@ public static CoordinatorRecord newConsumerGroupSubscriptionMetadataTombstoneRec new ApiMessageAndVersion( new ConsumerGroupPartitionMetadataKey() .setGroupId(groupId), - (short) 4 + CoordinatorRecordType.CONSUMER_GROUP_PARTITION_METADATA.id() ), null // Tombstone. ); @@ -196,7 +197,7 @@ public static CoordinatorRecord newConsumerGroupEpochRecord( new ApiMessageAndVersion( new ConsumerGroupMetadataKey() .setGroupId(groupId), - (short) 3 + CoordinatorRecordType.CONSUMER_GROUP_METADATA.id() ), new ApiMessageAndVersion( new ConsumerGroupMetadataValue() @@ -219,7 +220,7 @@ public static CoordinatorRecord newConsumerGroupEpochTombstoneRecord( new ApiMessageAndVersion( new ConsumerGroupMetadataKey() .setGroupId(groupId), - (short) 3 + CoordinatorRecordType.CONSUMER_GROUP_METADATA.id() ), null // Tombstone. ); @@ -254,7 +255,7 @@ public static CoordinatorRecord newConsumerGroupTargetAssignmentRecord( new ConsumerGroupTargetAssignmentMemberKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 7 + CoordinatorRecordType.CONSUMER_GROUP_TARGET_ASSIGNMENT_MEMBER.id() ), new ApiMessageAndVersion( new ConsumerGroupTargetAssignmentMemberValue() @@ -280,7 +281,7 @@ public static CoordinatorRecord newConsumerGroupTargetAssignmentTombstoneRecord( new ConsumerGroupTargetAssignmentMemberKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 7 + CoordinatorRecordType.CONSUMER_GROUP_TARGET_ASSIGNMENT_MEMBER.id() ), null // Tombstone. ); @@ -301,7 +302,7 @@ public static CoordinatorRecord newConsumerGroupTargetAssignmentEpochRecord( new ApiMessageAndVersion( new ConsumerGroupTargetAssignmentMetadataKey() .setGroupId(groupId), - (short) 6 + CoordinatorRecordType.CONSUMER_GROUP_TARGET_ASSIGNMENT_METADATA.id() ), new ApiMessageAndVersion( new ConsumerGroupTargetAssignmentMetadataValue() @@ -324,7 +325,7 @@ public static CoordinatorRecord newConsumerGroupTargetAssignmentEpochTombstoneRe new ApiMessageAndVersion( new ConsumerGroupTargetAssignmentMetadataKey() .setGroupId(groupId), - (short) 6 + CoordinatorRecordType.CONSUMER_GROUP_TARGET_ASSIGNMENT_METADATA.id() ), null // Tombstone. ); @@ -346,7 +347,7 @@ public static CoordinatorRecord newConsumerGroupCurrentAssignmentRecord( new ConsumerGroupCurrentMemberAssignmentKey() .setGroupId(groupId) .setMemberId(member.memberId()), - (short) 8 + CoordinatorRecordType.CONSUMER_GROUP_CURRENT_MEMBER_ASSIGNMENT.id() ), new ApiMessageAndVersion( new ConsumerGroupCurrentMemberAssignmentValue() @@ -376,7 +377,7 @@ public static CoordinatorRecord newConsumerGroupCurrentAssignmentTombstoneRecord new ConsumerGroupCurrentMemberAssignmentKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 8 + CoordinatorRecordType.CONSUMER_GROUP_CURRENT_MEMBER_ASSIGNMENT.id() ), null // Tombstone ); @@ -403,7 +404,7 @@ public static CoordinatorRecord newConsumerGroupRegularExpressionRecord( new ConsumerGroupRegularExpressionKey() .setGroupId(groupId) .setRegularExpression(regex), - (short) 16 + CoordinatorRecordType.CONSUMER_GROUP_REGULAR_EXPRESSION.id() ), new ApiMessageAndVersion( new ConsumerGroupRegularExpressionValue() @@ -431,7 +432,7 @@ public static CoordinatorRecord newConsumerGroupRegularExpressionTombstone( new ConsumerGroupRegularExpressionKey() .setGroupId(groupId) .setRegularExpression(regex), - (short) 16 + CoordinatorRecordType.CONSUMER_GROUP_REGULAR_EXPRESSION.id() ), null // Tombstone ); @@ -480,7 +481,7 @@ public static CoordinatorRecord newGroupMetadataRecord( new ApiMessageAndVersion( new GroupMetadataKey() .setGroup(group.groupId()), - (short) 2 + CoordinatorRecordType.GROUP_METADATA.id() ), new ApiMessageAndVersion( new GroupMetadataValue() @@ -508,7 +509,7 @@ public static CoordinatorRecord newGroupMetadataTombstoneRecord( new ApiMessageAndVersion( new GroupMetadataKey() .setGroup(groupId), - (short) 2 + CoordinatorRecordType.GROUP_METADATA.id() ), null // Tombstone ); @@ -529,7 +530,7 @@ public static CoordinatorRecord newEmptyGroupMetadataRecord( new ApiMessageAndVersion( new GroupMetadataKey() .setGroup(group.groupId()), - (short) 2 + CoordinatorRecordType.GROUP_METADATA.id() ), new ApiMessageAndVersion( new GroupMetadataValue() @@ -569,7 +570,7 @@ public static CoordinatorRecord newOffsetCommitRecord( .setGroup(groupId) .setTopic(topic) .setPartition(partitionId), - (short) 1 + CoordinatorRecordType.OFFSET_COMMIT.id() ), new ApiMessageAndVersion( new OffsetCommitValue() @@ -603,7 +604,7 @@ public static CoordinatorRecord newOffsetCommitTombstoneRecord( .setGroup(groupId) .setTopic(topic) .setPartition(partitionId), - (short) 1 + CoordinatorRecordType.OFFSET_COMMIT.id() ), null ); @@ -627,7 +628,7 @@ public static CoordinatorRecord newShareGroupMemberSubscriptionRecord( new ShareGroupMemberMetadataKey() .setGroupId(groupId) .setMemberId(member.memberId()), - (short) 10 + CoordinatorRecordType.SHARE_GROUP_MEMBER_METADATA.id() ), new ApiMessageAndVersion( new ShareGroupMemberMetadataValue() @@ -656,7 +657,7 @@ public static CoordinatorRecord newShareGroupMemberSubscriptionTombstoneRecord( new ShareGroupMemberMetadataKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 10 + CoordinatorRecordType.SHARE_GROUP_MEMBER_METADATA.id() ), null // Tombstone. ); @@ -686,7 +687,7 @@ public static CoordinatorRecord newShareGroupSubscriptionMetadataRecord( new ApiMessageAndVersion( new ShareGroupPartitionMetadataKey() .setGroupId(groupId), - (short) 9 + CoordinatorRecordType.SHARE_GROUP_PARTITION_METADATA.id() ), new ApiMessageAndVersion( value, @@ -708,7 +709,7 @@ public static CoordinatorRecord newShareGroupSubscriptionMetadataTombstoneRecord new ApiMessageAndVersion( new ShareGroupPartitionMetadataKey() .setGroupId(groupId), - (short) 9 + CoordinatorRecordType.SHARE_GROUP_PARTITION_METADATA.id() ), null // Tombstone. ); @@ -729,7 +730,7 @@ public static CoordinatorRecord newShareGroupEpochRecord( new ApiMessageAndVersion( new ShareGroupMetadataKey() .setGroupId(groupId), - (short) 11 + CoordinatorRecordType.SHARE_GROUP_METADATA.id() ), new ApiMessageAndVersion( new ShareGroupMetadataValue() @@ -752,7 +753,7 @@ public static CoordinatorRecord newShareGroupEpochTombstoneRecord( new ApiMessageAndVersion( new ShareGroupMetadataKey() .setGroupId(groupId), - (short) 11 + CoordinatorRecordType.SHARE_GROUP_METADATA.id() ), null // Tombstone. ); @@ -787,7 +788,7 @@ public static CoordinatorRecord newShareGroupTargetAssignmentRecord( new ShareGroupTargetAssignmentMemberKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 13 + CoordinatorRecordType.SHARE_GROUP_TARGET_ASSIGNMENT_MEMBER.id() ), new ApiMessageAndVersion( new ShareGroupTargetAssignmentMemberValue() @@ -813,7 +814,7 @@ public static CoordinatorRecord newShareGroupTargetAssignmentTombstoneRecord( new ShareGroupTargetAssignmentMemberKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 13 + CoordinatorRecordType.SHARE_GROUP_TARGET_ASSIGNMENT_MEMBER.id() ), null // Tombstone. ); @@ -834,7 +835,7 @@ public static CoordinatorRecord newShareGroupTargetAssignmentEpochRecord( new ApiMessageAndVersion( new ShareGroupTargetAssignmentMetadataKey() .setGroupId(groupId), - (short) 12 + CoordinatorRecordType.SHARE_GROUP_TARGET_ASSIGNMENT_METADATA.id() ), new ApiMessageAndVersion( new ShareGroupTargetAssignmentMetadataValue() @@ -857,7 +858,7 @@ public static CoordinatorRecord newShareGroupTargetAssignmentEpochTombstoneRecor new ApiMessageAndVersion( new ShareGroupTargetAssignmentMetadataKey() .setGroupId(groupId), - (short) 12 + CoordinatorRecordType.SHARE_GROUP_TARGET_ASSIGNMENT_METADATA.id() ), null // Tombstone. ); @@ -879,7 +880,7 @@ public static CoordinatorRecord newShareGroupCurrentAssignmentRecord( new ShareGroupCurrentMemberAssignmentKey() .setGroupId(groupId) .setMemberId(member.memberId()), - (short) 14 + CoordinatorRecordType.SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT.id() ), new ApiMessageAndVersion( new ShareGroupCurrentMemberAssignmentValue() @@ -908,7 +909,7 @@ public static CoordinatorRecord newShareGroupCurrentAssignmentTombstoneRecord( new ShareGroupCurrentMemberAssignmentKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 14 + CoordinatorRecordType.SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT.id() ), null // Tombstone ); From c52401c830ab1b4e1e6a737a2a01110344638479 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 18 Dec 2024 20:28:06 +0100 Subject: [PATCH 03/14] update old coordinator --- .../group/GroupMetadataManager.scala | 34 ++++++++++++------- .../group/GroupMetadataManagerTest.scala | 5 ++- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 0c667aa7fa5cf..beac83c4e5b46 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -29,6 +29,7 @@ import kafka.server.ReplicaManager import kafka.utils.CoreUtils.inLock import kafka.utils._ import org.apache.kafka.common.compress.Compression +import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.{Metrics, Sensor} import org.apache.kafka.common.metrics.stats.{Avg, Max, Meter} @@ -40,7 +41,7 @@ import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchRespons import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{TopicIdPartition, TopicPartition} import org.apache.kafka.coordinator.group.{OffsetAndMetadata, OffsetConfig} -import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData} +import org.apache.kafka.coordinator.group.generated.{CoordinatorRecordType, GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData} import org.apache.kafka.server.common.{MetadataVersion, RequestLocal} import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0} import org.apache.kafka.server.metrics.KafkaMetricsGroup @@ -1069,7 +1070,7 @@ object GroupMetadataManager { * @return key for offset commit message */ def offsetCommitKey(groupId: String, topicPartition: TopicPartition): Array[Byte] = { - MessageUtil.toVersionPrefixedBytes(OffsetCommitKey.HIGHEST_SUPPORTED_VERSION, + MessageUtil.toVersionPrefixedBytes(CoordinatorRecordType.OFFSET_COMMIT.id(), new OffsetCommitKey() .setGroup(groupId) .setTopic(topicPartition.topic) @@ -1083,7 +1084,7 @@ object GroupMetadataManager { * @return key bytes for group metadata message */ def groupMetadataKey(groupId: String): Array[Byte] = { - MessageUtil.toVersionPrefixedBytes(GroupMetadataKeyData.HIGHEST_SUPPORTED_VERSION, + MessageUtil.toVersionPrefixedBytes(CoordinatorRecordType.GROUP_METADATA.id(), new GroupMetadataKeyData() .setGroup(groupId)) } @@ -1164,16 +1165,23 @@ object GroupMetadataManager { */ def readMessageKey(buffer: ByteBuffer): BaseKey = { val version = buffer.getShort - if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) { - // version 0 and 1 refer to offset - val key = new OffsetCommitKey(new ByteBufferAccessor(buffer), version) - OffsetKey(version, GroupTopicPartition(key.group, new TopicPartition(key.topic, key.partition))) - } else if (version >= GroupMetadataKeyData.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKeyData.HIGHEST_SUPPORTED_VERSION) { - // version 2 refers to group metadata - val key = new GroupMetadataKeyData(new ByteBufferAccessor(buffer), version) - GroupMetadataKey(version, key.group) - } else { - UnknownKey(version) + try { + CoordinatorRecordType.fromId(version) match { + case CoordinatorRecordType.OFFSET_COMMIT | CoordinatorRecordType.OFFSET_COMMIT_V0 => + // version 0 and 1 refer to offset + val key = new OffsetCommitKey(new ByteBufferAccessor(buffer), 0.toShort) + OffsetKey(version, GroupTopicPartition(key.group, new TopicPartition(key.topic, key.partition))) + + case CoordinatorRecordType.GROUP_METADATA => + // version 2 refers to group metadata + val key = new GroupMetadataKeyData(new ByteBufferAccessor(buffer), 0.toShort) + GroupMetadataKey(version, key.group) + + case _ => + UnknownKey(version) + } + } catch { + case _: UnsupportedVersionException => UnknownKey(version) } } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index f16dba44cbd37..b4328ea751240 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -43,7 +43,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.{GroupCoordinatorConfig, OffsetAndMetadata, OffsetConfig} -import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitValue} +import org.apache.kafka.coordinator.group.generated.{CoordinatorRecordType, GroupMetadataValue, OffsetCommitValue} import org.apache.kafka.server.common.{MetadataVersion, RequestLocal} import org.apache.kafka.server.common.MetadataVersion._ import org.apache.kafka.server.metrics.KafkaYammerMetrics @@ -3115,8 +3115,7 @@ class GroupMetadataManagerTest { // Should ignore unknown record val unknownKey = new org.apache.kafka.coordinator.group.generated.GroupMetadataKey() - val lowestUnsupportedVersion = (org.apache.kafka.coordinator.group.generated.GroupMetadataKey - .HIGHEST_SUPPORTED_VERSION + 1).toShort + val lowestUnsupportedVersion = (CoordinatorRecordType.GROUP_METADATA.id + 1).toShort val unknownMessage1 = MessageUtil.toVersionPrefixedBytes(Short.MaxValue, unknownKey) val unknownMessage2 = MessageUtil.toVersionPrefixedBytes(lowestUnsupportedVersion, unknownKey) From c39272270644a4acbd6896027617468c95fba199 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 18 Dec 2024 20:43:26 +0100 Subject: [PATCH 04/14] fix checkstyle --- .../src/main/java/org/apache/kafka/message/MessageSpec.java | 1 + 1 file changed, 1 insertion(+) diff --git a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java index 36c36a418f1f9..3bad8016c448e 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java @@ -42,6 +42,7 @@ public final class MessageSpec { private final boolean latestVersionUnstable; @JsonCreator + @SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"}) public MessageSpec(@JsonProperty("name") String name, @JsonProperty("validVersions") String validVersions, @JsonProperty("deprecatedVersions") String deprecatedVersions, From ace122f8a7e8711033dde2f3c96351a5b5793e4c Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 18 Dec 2024 20:48:34 +0100 Subject: [PATCH 05/14] cleanup --- .../group/GroupCoordinatorShard.java | 4 +-- .../common/message/OffsetCommitValueV0.json | 13 ++------- .../group/GroupCoordinatorShardTest.java | 28 ++++++++++++------- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 4dc25588c04ae..feecdff0a9d1e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -776,9 +776,7 @@ private static OffsetCommitValue convertOffsetCommitValueV0( return new OffsetCommitValue() .setOffset(value.offset()) .setCommitTimestamp(value.commitTimestamp()) - .setExpireTimestamp(value.expireTimestamp()) - .setMetadata(value.metadata()) - .setLeaderEpoch(value.leaderEpoch()); + .setMetadata(value.metadata()); } /** diff --git a/group-coordinator/src/main/resources/common/message/OffsetCommitValueV0.json b/group-coordinator/src/main/resources/common/message/OffsetCommitValueV0.json index f1f12fc7e7e43..2319aba55812f 100644 --- a/group-coordinator/src/main/resources/common/message/OffsetCommitValueV0.json +++ b/group-coordinator/src/main/resources/common/message/OffsetCommitValueV0.json @@ -17,21 +17,14 @@ "apiKey": 0, "type": "coordinator-value", "name": "OffsetCommitValueV0", - // Version 4 is the first flexible version. - // KIP-915: bumping the version will no longer make this record backward compatible. - // We suggest to add/remove only tagged fields to maintain backward compatibility. - "validVersions": "0-4", - "flexibleVersions": "4+", + "validVersions": "0", + "flexibleVersions": "none", "fields": [ { "name": "offset", "type": "int64", "versions": "0+", "about": "The offset that the consumer wants to store (for this partition)."}, - { "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true, - "about": "The leader epoch of the last consumed record."}, { "name": "metadata", "type": "string", "versions": "0+", "about": "Any metadata the client wants to keep."}, { "name": "commitTimestamp", "type": "int64", "versions": "0+", - "about": "The time at which the commit was added to the log."}, - { "name": "expireTimestamp", "type": "int64", "versions": "1", "default": -1, "ignorable": true, - "about": "The time at which the offset will expire."} + "about": "The time at which the commit was added to the log."} ] } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 7792f5de942b8..703f9be4be87d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -362,9 +362,7 @@ public void testReplayOffsetCommit() { new OffsetCommitValueV0() .setOffset(100L) .setCommitTimestamp(12345L) - .setExpireTimestamp(6789L) - .setMetadata("Metadata") - .setLeaderEpoch(10), + .setMetadata("Metadata"), (short) 0 ) )); @@ -377,8 +375,14 @@ public void testReplayOffsetCommit() { verify(offsetMetadataManager, times(1)).replay( 0L, RecordBatch.NO_PRODUCER_ID, - key, - value + new OffsetCommitKey() + .setGroup("goo") + .setTopic("foo") + .setPartition(0), + new OffsetCommitValue() + .setOffset(100L) + .setCommitTimestamp(12345L) + .setMetadata("Metadata") ); verify(offsetMetadataManager, times(1)).replay( @@ -429,9 +433,7 @@ public void testReplayTransactionalOffsetCommit() { new OffsetCommitValueV0() .setOffset(100L) .setCommitTimestamp(12345L) - .setExpireTimestamp(6789L) - .setMetadata("Metadata") - .setLeaderEpoch(10), + .setMetadata("Metadata"), (short) 0 ) )); @@ -444,8 +446,14 @@ public void testReplayTransactionalOffsetCommit() { verify(offsetMetadataManager, times(1)).replay( 0L, 100L, - key, - value + new OffsetCommitKey() + .setGroup("goo") + .setTopic("foo") + .setPartition(0), + new OffsetCommitValue() + .setOffset(100L) + .setCommitTimestamp(12345L) + .setMetadata("Metadata") ); verify(offsetMetadataManager, times(1)).replay( From a1c62c12ca09d3ec3ef19a184079af66612891fd Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 18 Dec 2024 21:19:14 +0100 Subject: [PATCH 06/14] fixup --- .../GroupMetadataMessageFormatter.java | 27 ++++++++++++------- .../consumer/OffsetsMessageFormatter.java | 27 ++++++++++++------- 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java index 754c43193c8cd..9e6926421a3e4 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.tools.consumer; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataKeyJsonConverter; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; @@ -36,7 +38,7 @@ public class GroupMetadataMessageFormatter extends ApiMessageFormatter { @Override protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) { return readToGroupMetadataKey(byteBuffer) - .map(logKey -> transferKeyMessageToJsonNode(logKey, version)) + .map(logKey -> transferKeyMessageToJsonNode(logKey, (short) 0)) .orElseGet(() -> new TextNode(UNKNOWN)); } @@ -49,12 +51,19 @@ protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) { private Optional readToGroupMetadataKey(ByteBuffer byteBuffer) { short version = byteBuffer.getShort(); - if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION - && version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) { - return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), version)); - } else if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) { - return Optional.of(new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), version)); - } else { + try { + switch (CoordinatorRecordType.fromId(version)) { + case OFFSET_COMMIT_V0: + case OFFSET_COMMIT: + return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), (short) 0)); + + case GROUP_METADATA: + return Optional.of(new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), (short) 0)); + + default: + return Optional.empty(); + } + } catch (UnsupportedVersionException ex) { return Optional.empty(); } } @@ -71,8 +80,8 @@ private JsonNode transferKeyMessageToJsonNode(ApiMessage message, short version) private Optional readToGroupMetadataValue(ByteBuffer byteBuffer) { short version = byteBuffer.getShort(); - if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION - && version <= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) { + if (version >= CoordinatorRecordType.GROUP_METADATA.lowestSupportedVersion() + && version <= CoordinatorRecordType.GROUP_METADATA.highestSupportedVersion()) { return Optional.of(new GroupMetadataValue(new ByteBufferAccessor(byteBuffer), version)); } else { return Optional.empty(); diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java index 2927ed46c9f7f..bb823344e604d 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.tools.consumer; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter; @@ -39,7 +41,7 @@ public class OffsetsMessageFormatter extends ApiMessageFormatter { @Override protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) { return readToGroupMetadataKey(byteBuffer) - .map(logKey -> transferKeyMessageToJsonNode(logKey, version)) + .map(logKey -> transferKeyMessageToJsonNode(logKey, (short) 0)) .orElseGet(() -> new TextNode(UNKNOWN)); } @@ -52,12 +54,19 @@ protected JsonNode readToValueJson(ByteBuffer byteBuffer, short version) { private Optional readToGroupMetadataKey(ByteBuffer byteBuffer) { short version = byteBuffer.getShort(); - if (version >= OffsetCommitKey.LOWEST_SUPPORTED_VERSION - && version <= OffsetCommitKey.HIGHEST_SUPPORTED_VERSION) { - return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), version)); - } else if (version >= GroupMetadataKey.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataKey.HIGHEST_SUPPORTED_VERSION) { - return Optional.of(new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), version)); - } else { + try { + switch (CoordinatorRecordType.fromId(version)) { + case OFFSET_COMMIT_V0: + case OFFSET_COMMIT: + return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), (short) 0)); + + case GROUP_METADATA: + return Optional.of(new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), (short) 0)); + + default: + return Optional.empty(); + } + } catch (UnsupportedVersionException ex) { return Optional.empty(); } } @@ -74,8 +83,8 @@ private JsonNode transferKeyMessageToJsonNode(ApiMessage logKey, short keyVersio private Optional readToOffsetMessageValue(ByteBuffer byteBuffer) { short version = byteBuffer.getShort(); - if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION - && version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) { + if (version >= CoordinatorRecordType.OFFSET_COMMIT.lowestSupportedVersion() + && version <= CoordinatorRecordType.OFFSET_COMMIT.highestSupportedVersion()) { return Optional.of(new OffsetCommitValue(new ByteBufferAccessor(byteBuffer), version)); } else { return Optional.empty(); From fdc2c2fa20538231eb53ac64d189af8a99be7b70 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 18 Dec 2024 21:32:28 +0100 Subject: [PATCH 07/14] update dump-log --- .../scala/kafka/tools/DumpLogSegments.scala | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index da5565b87d414..8e333d378854b 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -422,45 +422,45 @@ object DumpLogSegments { class OffsetsMessageParser extends MessageParser[String, String] { private val serde = new GroupCoordinatorRecordSerde() - private def prepareKey(message: Message, version: Short): String = { + private def prepareKey(message: Message, recordType: Short): String = { val messageAsJson = message match { case m: OffsetCommitKey => - OffsetCommitKeyJsonConverter.write(m, version) + OffsetCommitKeyJsonConverter.write(m, 0.toShort) case m: GroupMetadataKey => - GroupMetadataKeyJsonConverter.write(m, version) + GroupMetadataKeyJsonConverter.write(m, 0.toShort) case m: ConsumerGroupMetadataKey => - ConsumerGroupMetadataKeyJsonConverter.write(m, version) + ConsumerGroupMetadataKeyJsonConverter.write(m, 0.toShort) case m: ConsumerGroupPartitionMetadataKey => - ConsumerGroupPartitionMetadataKeyJsonConverter.write(m, version) + ConsumerGroupPartitionMetadataKeyJsonConverter.write(m, 0.toShort) case m: ConsumerGroupMemberMetadataKey => - ConsumerGroupMemberMetadataKeyJsonConverter.write(m, version) + ConsumerGroupMemberMetadataKeyJsonConverter.write(m, 0.toShort) case m: ConsumerGroupTargetAssignmentMetadataKey => - ConsumerGroupTargetAssignmentMetadataKeyJsonConverter.write(m, version) + ConsumerGroupTargetAssignmentMetadataKeyJsonConverter.write(m, 0.toShort) case m: ConsumerGroupTargetAssignmentMemberKey => - ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, version) + ConsumerGroupTargetAssignmentMemberKeyJsonConverter.write(m, 0.toShort) case m: ConsumerGroupCurrentMemberAssignmentKey => - ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, version) + ConsumerGroupCurrentMemberAssignmentKeyJsonConverter.write(m, 0.toShort) case m: ConsumerGroupRegularExpressionKey => - ConsumerGroupRegularExpressionKeyJsonConverter.write(m, version) + ConsumerGroupRegularExpressionKeyJsonConverter.write(m, 0.toShort) case m: ShareGroupMetadataKey => - ShareGroupMetadataKeyJsonConverter.write(m, version) + ShareGroupMetadataKeyJsonConverter.write(m, 0.toShort) case m: ShareGroupPartitionMetadataKey => - ShareGroupPartitionMetadataKeyJsonConverter.write(m, version) + ShareGroupPartitionMetadataKeyJsonConverter.write(m, 0.toShort) case m: ShareGroupMemberMetadataKey => - ShareGroupMemberMetadataKeyJsonConverter.write(m, version) + ShareGroupMemberMetadataKeyJsonConverter.write(m, 0.toShort) case m: ShareGroupTargetAssignmentMetadataKey => - ShareGroupTargetAssignmentMetadataKeyJsonConverter.write(m, version) + ShareGroupTargetAssignmentMetadataKeyJsonConverter.write(m, 0.toShort) case m: ShareGroupTargetAssignmentMemberKey => - ShareGroupTargetAssignmentMemberKeyJsonConverter.write(m, version) + ShareGroupTargetAssignmentMemberKeyJsonConverter.write(m, 0.toShort) case m: ShareGroupCurrentMemberAssignmentKey => - ShareGroupCurrentMemberAssignmentKeyJsonConverter.write(m, version) + ShareGroupCurrentMemberAssignmentKeyJsonConverter.write(m, 0.toShort) case m: ShareGroupStatePartitionMetadataKey => - ShareGroupStatePartitionMetadataKeyJsonConverter.write(m, version) - case _ => throw new UnknownRecordTypeException(version) + ShareGroupStatePartitionMetadataKeyJsonConverter.write(m, 0.toShort) + case _ => throw new UnknownRecordTypeException(recordType) } val json = new ObjectNode(JsonNodeFactory.instance) - json.set("type", new TextNode(version.toString)) + json.set("type", new TextNode(recordType.toString)) json.set("data", messageAsJson) json.toString } From e1bd0084a06d2e4d41add69bfdae3d59d7ef3b52 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 19 Dec 2024 08:29:52 +0100 Subject: [PATCH 08/14] fix typos --- .../main/java/org/apache/kafka/message/MessageSpec.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java index 3bad8016c448e..2901557daabe3 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java @@ -85,19 +85,19 @@ public MessageSpec(@JsonProperty("name") String name, if (type == MessageSpecType.COORDINATOR_KEY) { if (this.apiKey.isEmpty()) { - throw new RuntimeException("The ApiKey must be set for messages " + name + " with type `record-key`"); + throw new RuntimeException("The ApiKey must be set for messages " + name + " with type `coordinator-key`"); } if (!this.validVersions().equals(new Versions((short) 0, ((short) 0)))) { - throw new RuntimeException("The Versions must be set to `0` for messages " + name + " with type `record-key`"); + throw new RuntimeException("The Versions must be set to `0` for messages " + name + " with type `coordinator-key`"); } if (!this.flexibleVersions.empty()) { - throw new RuntimeException("The FlexibleVersions are not supported for messages " + name + " with type `record-key`"); + throw new RuntimeException("The FlexibleVersions are not supported for messages " + name + " with type `coordinator-key`"); } } if (type == MessageSpecType.COORDINATOR_VALUE) { if (this.apiKey.isEmpty()) { - throw new RuntimeException("The ApiKey must be set for messages with type `record-key`"); + throw new RuntimeException("The ApiKey must be set for messages with type `coordinator-value`"); } } } From 8d9d7aca3e17e6c4d8a8bd274bc78ede86440226 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 19 Dec 2024 08:33:49 +0100 Subject: [PATCH 09/14] rename offset commit v0 --- .../coordinator/group/GroupMetadataManager.scala | 2 +- .../coordinator/group/GroupCoordinatorShard.java | 14 +++++++------- ...CommitKeyV0.json => LegacyOffsetCommitKey.json} | 2 +- ...itValueV0.json => LegacyOffsetCommitValue.json} | 8 ++++---- .../group/GroupCoordinatorShardTest.java | 14 +++++++------- .../consumer/GroupMetadataMessageFormatter.java | 2 +- .../tools/consumer/OffsetsMessageFormatter.java | 2 +- 7 files changed, 22 insertions(+), 22 deletions(-) rename group-coordinator/src/main/resources/common/message/{OffsetCommitKeyV0.json => LegacyOffsetCommitKey.json} (97%) rename group-coordinator/src/main/resources/common/message/{OffsetCommitValueV0.json => LegacyOffsetCommitValue.json} (88%) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index beac83c4e5b46..8e1dda9ea4b46 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -1167,7 +1167,7 @@ object GroupMetadataManager { val version = buffer.getShort try { CoordinatorRecordType.fromId(version) match { - case CoordinatorRecordType.OFFSET_COMMIT | CoordinatorRecordType.OFFSET_COMMIT_V0 => + case CoordinatorRecordType.OFFSET_COMMIT | CoordinatorRecordType.LEGACY_OFFSET_COMMIT => // version 0 and 1 refer to offset val key = new OffsetCommitKey(new ByteBufferAccessor(buffer), 0.toShort) OffsetKey(version, GroupTopicPartition(key.group, new TopicPartition(key.topic, key.partition))) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index feecdff0a9d1e..8360240ce478f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -74,10 +74,10 @@ import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.LegacyOffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.LegacyOffsetCommitValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; -import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyV0; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; -import org.apache.kafka.coordinator.group.generated.OffsetCommitValueV0; import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey; import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; @@ -760,7 +760,7 @@ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { } private static OffsetCommitKey convertOffsetCommitKeyV0( - OffsetCommitKeyV0 key + LegacyOffsetCommitKey key ) { return new OffsetCommitKey() .setGroup(key.group()) @@ -769,7 +769,7 @@ private static OffsetCommitKey convertOffsetCommitKeyV0( } private static OffsetCommitValue convertOffsetCommitValueV0( - OffsetCommitValueV0 value + LegacyOffsetCommitValue value ) { if (value == null) return null; @@ -808,12 +808,12 @@ public void replay( } switch (recordType) { - case OFFSET_COMMIT_V0: + case LEGACY_OFFSET_COMMIT: offsetMetadataManager.replay( offset, producerId, - convertOffsetCommitKeyV0((OffsetCommitKeyV0) key.message()), - convertOffsetCommitValueV0((OffsetCommitValueV0) Utils.messageOrNull(value)) + convertOffsetCommitKeyV0((LegacyOffsetCommitKey) key.message()), + convertOffsetCommitValueV0((LegacyOffsetCommitValue) Utils.messageOrNull(value)) ); break; diff --git a/group-coordinator/src/main/resources/common/message/OffsetCommitKeyV0.json b/group-coordinator/src/main/resources/common/message/LegacyOffsetCommitKey.json similarity index 97% rename from group-coordinator/src/main/resources/common/message/OffsetCommitKeyV0.json rename to group-coordinator/src/main/resources/common/message/LegacyOffsetCommitKey.json index d5609644650bb..b244be13175c6 100644 --- a/group-coordinator/src/main/resources/common/message/OffsetCommitKeyV0.json +++ b/group-coordinator/src/main/resources/common/message/LegacyOffsetCommitKey.json @@ -16,7 +16,7 @@ { "apiKey": 0, "type": "coordinator-key", - "name": "OffsetCommitKeyV0", + "name": "LegacyOffsetCommitKey", "validVersions": "0", "flexibleVersions": "none", "fields": [ diff --git a/group-coordinator/src/main/resources/common/message/OffsetCommitValueV0.json b/group-coordinator/src/main/resources/common/message/LegacyOffsetCommitValue.json similarity index 88% rename from group-coordinator/src/main/resources/common/message/OffsetCommitValueV0.json rename to group-coordinator/src/main/resources/common/message/LegacyOffsetCommitValue.json index 2319aba55812f..c81af345c6e67 100644 --- a/group-coordinator/src/main/resources/common/message/OffsetCommitValueV0.json +++ b/group-coordinator/src/main/resources/common/message/LegacyOffsetCommitValue.json @@ -16,15 +16,15 @@ { "apiKey": 0, "type": "coordinator-value", - "name": "OffsetCommitValueV0", + "name": "LegacyOffsetCommitValue", "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "offset", "type": "int64", "versions": "0+", + { "name": "offset", "type": "int64", "versions": "0", "about": "The offset that the consumer wants to store (for this partition)."}, - { "name": "metadata", "type": "string", "versions": "0+", + { "name": "metadata", "type": "string", "versions": "0", "about": "Any metadata the client wants to keep."}, - { "name": "commitTimestamp", "type": "int64", "versions": "0+", + { "name": "commitTimestamp", "type": "int64", "versions": "0", "about": "The time at which the commit was added to the log."} ] } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 703f9be4be87d..4e449a4b8b17f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -55,10 +55,10 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.LegacyOffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.LegacyOffsetCommitValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; -import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyV0; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; -import org.apache.kafka.coordinator.group.generated.OffsetCommitValueV0; import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; @@ -352,14 +352,14 @@ public void testReplayOffsetCommit() { coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new CoordinatorRecord( new ApiMessageAndVersion( - new OffsetCommitKeyV0() + new LegacyOffsetCommitKey() .setGroup("goo") .setTopic("foo") .setPartition(0), (short) 0 ), new ApiMessageAndVersion( - new OffsetCommitValueV0() + new LegacyOffsetCommitValue() .setOffset(100L) .setCommitTimestamp(12345L) .setMetadata("Metadata"), @@ -423,14 +423,14 @@ public void testReplayTransactionalOffsetCommit() { coordinator.replay(0L, 100L, (short) 0, new CoordinatorRecord( new ApiMessageAndVersion( - new OffsetCommitKeyV0() + new LegacyOffsetCommitKey() .setGroup("goo") .setTopic("foo") .setPartition(0), (short) 0 ), new ApiMessageAndVersion( - new OffsetCommitValueV0() + new LegacyOffsetCommitValue() .setOffset(100L) .setCommitTimestamp(12345L) .setMetadata("Metadata"), @@ -488,7 +488,7 @@ public void testReplayOffsetCommitWithNullValue() { coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new CoordinatorRecord( new ApiMessageAndVersion( - new OffsetCommitKeyV0() + new LegacyOffsetCommitKey() .setGroup("goo") .setTopic("foo") .setPartition(0), diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java index 9e6926421a3e4..1fca941ad85b6 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java @@ -53,7 +53,7 @@ private Optional readToGroupMetadataKey(ByteBuffer byteBuffer) { short version = byteBuffer.getShort(); try { switch (CoordinatorRecordType.fromId(version)) { - case OFFSET_COMMIT_V0: + case LEGACY_OFFSET_COMMIT: case OFFSET_COMMIT: return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), (short) 0)); diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java index bb823344e604d..ca154a94077a4 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java @@ -56,7 +56,7 @@ private Optional readToGroupMetadataKey(ByteBuffer byteBuffer) { short version = byteBuffer.getShort(); try { switch (CoordinatorRecordType.fromId(version)) { - case OFFSET_COMMIT_V0: + case LEGACY_OFFSET_COMMIT: case OFFSET_COMMIT: return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), (short) 0)); From 7075b338d6bca7c2d027d8d3660f313a00e604ac Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 19 Dec 2024 08:58:21 +0100 Subject: [PATCH 10/14] cleanups --- .../kafka/common/protocol/MessageUtil.java | 32 ++++++++++++++++--- .../group/GroupMetadataManager.scala | 4 +-- .../group/GroupCoordinatorRecordSerde.java | 13 ++++++++ 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java b/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java index ab6600a7d059f..90db613841de9 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java @@ -225,11 +225,35 @@ public static ByteBuffer toVersionPrefixedByteBuffer(final short version, final public static byte[] toVersionPrefixedBytes(final short version, final Message message) { ByteBuffer buffer = toVersionPrefixedByteBuffer(version, message); - // take the inner array directly if it is full with data + // take the inner array directly if it is full of data. if (buffer.hasArray() && - buffer.arrayOffset() == 0 && - buffer.position() == 0 && - buffer.limit() == buffer.array().length) return buffer.array(); + buffer.arrayOffset() == 0 && + buffer.position() == 0 && + buffer.limit() == buffer.array().length) return buffer.array(); + else return Utils.toArray(buffer); + } + + public static ByteBuffer toCoordinatorTypePrefixedByteBuffer(final short type, final Message message) { + if (message.highestSupportedVersion() != 0 || message.lowestSupportedVersion() != 0) { + throw new IllegalArgumentException("Cannot serialize a message with a different version than 0."); + } + + ObjectSerializationCache cache = new ObjectSerializationCache(); + int messageSize = message.size(cache, (short) 0); + ByteBufferAccessor bytes = new ByteBufferAccessor(ByteBuffer.allocate(messageSize + 2)); + bytes.writeShort(type); + message.write(bytes, cache, (short) 0); + bytes.flip(); + return bytes.buffer(); + } + + public static byte[] toCoordinatorTypePrefixedBytes(final short type, final Message message) { + ByteBuffer buffer = toCoordinatorTypePrefixedByteBuffer(type, message); + // take the inner array directly if it is full of data. + if (buffer.hasArray() && + buffer.arrayOffset() == 0 && + buffer.position() == 0 && + buffer.limit() == buffer.array().length) return buffer.array(); else return Utils.toArray(buffer); } } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 8e1dda9ea4b46..80b14d4dca35c 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -1070,7 +1070,7 @@ object GroupMetadataManager { * @return key for offset commit message */ def offsetCommitKey(groupId: String, topicPartition: TopicPartition): Array[Byte] = { - MessageUtil.toVersionPrefixedBytes(CoordinatorRecordType.OFFSET_COMMIT.id(), + MessageUtil.toCoordinatorTypePrefixedBytes(CoordinatorRecordType.OFFSET_COMMIT.id(), new OffsetCommitKey() .setGroup(groupId) .setTopic(topicPartition.topic) @@ -1084,7 +1084,7 @@ object GroupMetadataManager { * @return key bytes for group metadata message */ def groupMetadataKey(groupId: String): Array[Byte] = { - MessageUtil.toVersionPrefixedBytes(CoordinatorRecordType.GROUP_METADATA.id(), + MessageUtil.toCoordinatorTypePrefixedBytes(CoordinatorRecordType.GROUP_METADATA.id(), new GroupMetadataKeyData() .setGroup(groupId)) } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java index 38abc233f5d44..afa489502d8b0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java @@ -17,7 +17,9 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde; import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; @@ -56,6 +58,17 @@ * Please ensure any new record added here stays in sync with DumpLogSegments. */ public class GroupCoordinatorRecordSerde extends CoordinatorRecordSerde { + // This method is temporary until the share coordinator is converted to + // using the new coordinator records. + @Override + public byte[] serializeKey(CoordinatorRecord record) { + // Record does not accept a null key. + return MessageUtil.toCoordinatorTypePrefixedBytes( + record.key().version(), + record.key().message() + ); + } + @Override protected ApiMessage apiMessageKeyFor(short recordVersion) { switch (recordVersion) { From 31888f5ac1eeab27af01b1f7ba6c25c7869ffaa9 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 19 Dec 2024 09:19:57 +0100 Subject: [PATCH 11/14] more cleanups --- .../coordinator/group/GroupMetadataManager.scala | 11 ++++++++--- .../consumer/GroupMetadataMessageFormatter.java | 7 ++++++- .../tools/consumer/OffsetsMessageFormatter.java | 14 ++++++++++---- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 80b14d4dca35c..b5d0348db30ef 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -41,7 +41,7 @@ import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchRespons import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{TopicIdPartition, TopicPartition} import org.apache.kafka.coordinator.group.{OffsetAndMetadata, OffsetConfig} -import org.apache.kafka.coordinator.group.generated.{CoordinatorRecordType, GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData} +import org.apache.kafka.coordinator.group.generated.{CoordinatorRecordType, GroupMetadataValue, LegacyOffsetCommitKey, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData} import org.apache.kafka.server.common.{MetadataVersion, RequestLocal} import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0} import org.apache.kafka.server.metrics.KafkaMetricsGroup @@ -1167,13 +1167,18 @@ object GroupMetadataManager { val version = buffer.getShort try { CoordinatorRecordType.fromId(version) match { + case CoordinatorRecordType.LEGACY_OFFSET_COMMIT => + // version 0 refers to the legacy offset commit. + val key = new LegacyOffsetCommitKey(new ByteBufferAccessor(buffer), 0.toShort) + OffsetKey(version, GroupTopicPartition(key.group, new TopicPartition(key.topic, key.partition))) + case CoordinatorRecordType.OFFSET_COMMIT | CoordinatorRecordType.LEGACY_OFFSET_COMMIT => - // version 0 and 1 refer to offset + // version 1 refers to offset commit. val key = new OffsetCommitKey(new ByteBufferAccessor(buffer), 0.toShort) OffsetKey(version, GroupTopicPartition(key.group, new TopicPartition(key.topic, key.partition))) case CoordinatorRecordType.GROUP_METADATA => - // version 2 refers to group metadata + // version 2 refers to group metadata. val key = new GroupMetadataKeyData(new ByteBufferAccessor(buffer), 0.toShort) GroupMetadataKey(version, key.group) diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java index 1fca941ad85b6..bc42f55d89f9f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java @@ -24,6 +24,7 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKeyJsonConverter; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataValueJsonConverter; +import org.apache.kafka.coordinator.group.generated.LegacyOffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import com.fasterxml.jackson.databind.JsonNode; @@ -54,6 +55,8 @@ private Optional readToGroupMetadataKey(ByteBuffer byteBuffer) { try { switch (CoordinatorRecordType.fromId(version)) { case LEGACY_OFFSET_COMMIT: + return Optional.of(new LegacyOffsetCommitKey(new ByteBufferAccessor(byteBuffer), (short) 0)); + case OFFSET_COMMIT: return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), (short) 0)); @@ -69,7 +72,9 @@ private Optional readToGroupMetadataKey(ByteBuffer byteBuffer) { } private JsonNode transferKeyMessageToJsonNode(ApiMessage message, short version) { - if (message instanceof OffsetCommitKey) { + if (message instanceof LegacyOffsetCommitKey) { + return NullNode.getInstance(); + } else if (message instanceof OffsetCommitKey) { return NullNode.getInstance(); } else if (message instanceof GroupMetadataKey) { return GroupMetadataKeyJsonConverter.write((GroupMetadataKey) message, version); diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java index ca154a94077a4..5f9dc4c23ab59 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.LegacyOffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.LegacyOffsetCommitKeyJsonConverter; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; @@ -41,7 +43,7 @@ public class OffsetsMessageFormatter extends ApiMessageFormatter { @Override protected JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) { return readToGroupMetadataKey(byteBuffer) - .map(logKey -> transferKeyMessageToJsonNode(logKey, (short) 0)) + .map(logKey -> transferKeyMessageToJsonNode(logKey)) .orElseGet(() -> new TextNode(UNKNOWN)); } @@ -57,6 +59,8 @@ private Optional readToGroupMetadataKey(ByteBuffer byteBuffer) { try { switch (CoordinatorRecordType.fromId(version)) { case LEGACY_OFFSET_COMMIT: + return Optional.of(new LegacyOffsetCommitKey(new ByteBufferAccessor(byteBuffer), (short) 0)); + case OFFSET_COMMIT: return Optional.of(new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), (short) 0)); @@ -71,9 +75,11 @@ private Optional readToGroupMetadataKey(ByteBuffer byteBuffer) { } } - private JsonNode transferKeyMessageToJsonNode(ApiMessage logKey, short keyVersion) { - if (logKey instanceof OffsetCommitKey) { - return OffsetCommitKeyJsonConverter.write((OffsetCommitKey) logKey, keyVersion); + private JsonNode transferKeyMessageToJsonNode(ApiMessage logKey) { + if (logKey instanceof LegacyOffsetCommitKey) { + return LegacyOffsetCommitKeyJsonConverter.write((LegacyOffsetCommitKey) logKey, (short) 0); + } else if (logKey instanceof OffsetCommitKey) { + return OffsetCommitKeyJsonConverter.write((OffsetCommitKey) logKey, (short) 0); } else if (logKey instanceof GroupMetadataKey) { return NullNode.getInstance(); } else { From e1106c6659e90977bb1593fccc48814fe1b301d6 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 19 Dec 2024 12:03:34 +0100 Subject: [PATCH 12/14] more --- .../group/GroupMetadataManagerTest.scala | 31 ++++++++++++++++++- ...treamsGroupCurrentMemberAssignmentKey.json | 9 +++--- ...eamsGroupCurrentMemberAssignmentValue.json | 3 +- .../StreamsGroupMemberMetadataKey.json | 9 +++--- .../StreamsGroupMemberMetadataValue.json | 3 +- .../message/StreamsGroupMetadataKey.json | 7 +++-- .../message/StreamsGroupMetadataValue.json | 3 +- .../StreamsGroupPartitionMetadataKey.json | 7 +++-- .../StreamsGroupPartitionMetadataValue.json | 3 +- ...StreamsGroupTargetAssignmentMemberKey.json | 9 +++--- ...reamsGroupTargetAssignmentMemberValue.json | 3 +- ...reamsGroupTargetAssignmentMetadataKey.json | 7 +++-- ...amsGroupTargetAssignmentMetadataValue.json | 3 +- .../message/StreamsGroupTopologyKey.json | 7 +++-- .../message/StreamsGroupTopologyValue.json | 3 +- 15 files changed, 75 insertions(+), 32 deletions(-) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index b4328ea751240..214295b332ca3 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -43,7 +43,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.{GroupCoordinatorConfig, OffsetAndMetadata, OffsetConfig} -import org.apache.kafka.coordinator.group.generated.{CoordinatorRecordType, GroupMetadataValue, OffsetCommitValue} +import org.apache.kafka.coordinator.group.generated.{CoordinatorRecordType, GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData} import org.apache.kafka.server.common.{MetadataVersion, RequestLocal} import org.apache.kafka.server.common.MetadataVersion._ import org.apache.kafka.server.metrics.KafkaYammerMetrics @@ -3143,4 +3143,33 @@ class GroupMetadataManagerTest { assertTrue(group.offset(topicPartition).map(_.expireTimestampMs).get.isEmpty) } } + + @Test + def testOffsetCommitKey(): Unit = { + val bytes = GroupMetadataManager.offsetCommitKey( + "foo", + new TopicPartition("__consumer_offsets", 0) + ) + val buffer = ByteBuffer.wrap(bytes) + assertEquals(1.toShort, buffer.getShort) + assertEquals( + new OffsetCommitKey(new ByteBufferAccessor(buffer), 0.toShort), + new OffsetCommitKey() + .setGroup("foo") + .setTopic("__consumer_offsets") + .setPartition(0) + ) + } + + @Test + def testGroupMetadataKey(): Unit = { + val bytes = GroupMetadataManager.groupMetadataKey("foo") + val buffer = ByteBuffer.wrap(bytes) + assertEquals(2.toShort, buffer.getShort()) + assertEquals( + new GroupMetadataKeyData(new ByteBufferAccessor(buffer), 0.toShort), + new GroupMetadataKeyData() + .setGroup("foo") + ) + } } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json index c9134865b58f5..236d66de03cb5 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json @@ -15,14 +15,15 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 22, + "type": "coordinator-key", "name": "StreamsGroupCurrentMemberAssignmentKey", - "validVersions": "22", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "22", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group ID." }, - { "name": "MemberId", "type": "string", "versions": "22", + { "name": "MemberId", "type": "string", "versions": "0", "about": "The member ID." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json index 371527b632512..463c1e84e1702 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json @@ -15,7 +15,8 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 22, + "type": "coordinator-value", "name": "StreamsGroupCurrentMemberAssignmentValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json index 4e447a497ce31..ae1fbc8d1a758 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json @@ -15,14 +15,15 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 19, + "type": "coordinator-key", "name": "StreamsGroupMemberMetadataKey", - "validVersions": "19", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "19", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group ID." }, - { "name": "MemberId", "type": "string", "versions": "19", + { "name": "MemberId", "type": "string", "versions": "0", "about": "The member ID." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json index cc2b35e30d87d..1ecc047f17a84 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json @@ -15,7 +15,8 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 19, + "type": "coordinator-value", "name": "StreamsGroupMemberMetadataValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json index 14171e9c2661b..3d583ebb66ed5 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json @@ -15,12 +15,13 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 17, + "type": "coordinator-key", "name": "StreamsGroupMetadataKey", - "validVersions": "17", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "17", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group ID." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json index 7d63be55eff06..2b4d371570aa4 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json @@ -15,7 +15,8 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 17, + "type": "coordinator-value", "name": "StreamsGroupMetadataValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json index 0364ae6cabb6c..cb82e930a09c4 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json @@ -15,12 +15,13 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 18, + "type": "coordinator-key", "name": "StreamsGroupPartitionMetadataKey", - "validVersions": "18", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "18", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group ID." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json index 73bf194fae1ea..1f5eb8e8dcb24 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json @@ -15,7 +15,8 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 18, + "type": "coordinator-value", "name": "StreamsGroupPartitionMetadataValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json index b3a82c174115c..7563f01faded4 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json @@ -15,14 +15,15 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 21, + "type": "coordinator-key", "name": "StreamsGroupTargetAssignmentMemberKey", - "validVersions": "21", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "21", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group ID." }, - { "name": "MemberId", "type": "string", "versions": "21", + { "name": "MemberId", "type": "string", "versions": "0", "about": "The member ID." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json index 8ec2eac70a4ab..c96dd608c7fa6 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json @@ -15,7 +15,8 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 21, + "type": "coordinator-value", "name": "StreamsGroupTargetAssignmentMemberValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json index 44a924f8f16dd..22fb861083a6b 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json @@ -15,12 +15,13 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 20, + "type": "coordinator-key", "name": "StreamsGroupTargetAssignmentMetadataKey", - "validVersions": "20", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "20", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group ID." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json index b35fc7e1ddade..b9de317cbde6e 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json @@ -15,7 +15,8 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 20, + "type": "coordinator-value", "name": "StreamsGroupTargetAssignmentMetadataValue", "validVersions": "0", "flexibleVersions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json index 84b6599573d9a..ac2b8d5932aba 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json @@ -15,12 +15,13 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 23, + "type": "coordinator-key", "name": "StreamsGroupTopologyKey", - "validVersions": "23", + "validVersions": "0", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "23", + { "name": "GroupId", "type": "string", "versions": "0", "about": "The group ID." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json index 4fa702c34e9b7..26ac1ff66750b 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json @@ -15,7 +15,8 @@ // The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "type": "data", + "apiKey": 23, + "type": "coordinator-value", "name": "StreamsGroupTopologyValue", "validVersions": "0", "flexibleVersions": "0+", From fbed9a8bb1c8cfb0b51467a2034461061453015a Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 19 Dec 2024 18:08:27 +0100 Subject: [PATCH 13/14] fix --- .../scala/kafka/coordinator/group/GroupMetadataManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index b5d0348db30ef..653430f730442 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -1172,7 +1172,7 @@ object GroupMetadataManager { val key = new LegacyOffsetCommitKey(new ByteBufferAccessor(buffer), 0.toShort) OffsetKey(version, GroupTopicPartition(key.group, new TopicPartition(key.topic, key.partition))) - case CoordinatorRecordType.OFFSET_COMMIT | CoordinatorRecordType.LEGACY_OFFSET_COMMIT => + case CoordinatorRecordType.OFFSET_COMMIT => // version 1 refers to offset commit. val key = new OffsetCommitKey(new ByteBufferAccessor(buffer), 0.toShort) OffsetKey(version, GroupTopicPartition(key.group, new TopicPartition(key.topic, key.partition))) From 68451a8ea3cbf75568b62fabaca246d5ad1cde7f Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 20 Dec 2024 07:51:58 +0100 Subject: [PATCH 14/14] rename methods --- .../kafka/coordinator/group/GroupCoordinatorShard.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 8360240ce478f..f65919019e707 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -759,7 +759,7 @@ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { offsetMetadataManager.onNewMetadataImage(newImage, delta); } - private static OffsetCommitKey convertOffsetCommitKeyV0( + private static OffsetCommitKey convertLegacyOffsetCommitKey( LegacyOffsetCommitKey key ) { return new OffsetCommitKey() @@ -768,7 +768,7 @@ private static OffsetCommitKey convertOffsetCommitKeyV0( .setPartition(key.partition()); } - private static OffsetCommitValue convertOffsetCommitValueV0( + private static OffsetCommitValue convertLegacyOffsetCommitValue( LegacyOffsetCommitValue value ) { if (value == null) return null; @@ -812,8 +812,8 @@ public void replay( offsetMetadataManager.replay( offset, producerId, - convertOffsetCommitKeyV0((LegacyOffsetCommitKey) key.message()), - convertOffsetCommitValueV0((LegacyOffsetCommitValue) Utils.messageOrNull(value)) + convertLegacyOffsetCommitKey((LegacyOffsetCommitKey) key.message()), + convertLegacyOffsetCommitValue((LegacyOffsetCommitValue) Utils.messageOrNull(value)) ); break;