Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,7 @@ project(':core') {
api libs.scalaLibrary

implementation project(':server-common')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':group-coordinator')
implementation project(':transaction-coordinator')
implementation project(':metadata')
Expand Down Expand Up @@ -1345,6 +1346,66 @@ project(':metadata') {
}
}

project(':group-coordinator:group-coordinator-api') {
base {
archivesName = "kafka-group-coordinator-api"
}

dependencies {
implementation project(':clients')
}

task createVersionFile() {
def receiptFile = file("$buildDir/kafka/$buildVersionFileName")
inputs.property "commitId", commitId
inputs.property "version", version
outputs.file receiptFile

doLast {
def data = [
commitId: commitId,
version: version,
]

receiptFile.parentFile.mkdirs()
def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n")
receiptFile.setText(content, "ISO-8859-1")
}
}

sourceSets {
main {
java {
srcDirs = ["src/main/java"]
}
}
test {
java {
srcDirs = ["src/test/java"]
}
}
}

jar {
dependsOn createVersionFile
from("$buildDir") {
include "kafka/$buildVersionFileName"
}
}

clean.doFirst {
delete "$buildDir/kafka/"
}

javadoc {
include "**/org/apache/kafka/coordinator/group/api/**"
}

checkstyle {
configProperties = checkstyleConfigProperties("import-control-group-coordinator.xml")
}
}

project(':group-coordinator') {
base {
archivesName = "kafka-group-coordinator"
Expand All @@ -1358,6 +1419,7 @@ project(':group-coordinator') {
implementation project(':server-common')
implementation project(':clients')
implementation project(':metadata')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':storage')
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
Expand Down Expand Up @@ -2902,6 +2964,7 @@ project(':jmh-benchmarks') {
implementation project(':raft')
implementation project(':clients')
implementation project(':group-coordinator')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':metadata')
implementation project(':storage')
implementation project(':streams')
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Server-side partition assignor for consumer groups used by the GroupCoordinator.
*
* The interface is kept in an internal module until KIP-848 is fully
* implemented and ready to be released.
* The new consumer group protocol is in preview so this interface is considered
* unstable until Apache Kafka 4.0.
*/
@InterfaceStability.Unstable
public interface ConsumerGroupPartitionAssignor extends PartitionAssignor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Map;
import java.util.Objects;

/**
* The partition assignment for a consumer group.
*/
@InterfaceStability.Unstable
public class GroupAssignment {
/**
* The member assignments keyed by member id.
Expand All @@ -31,8 +34,7 @@ public class GroupAssignment {
public GroupAssignment(
Map<String, MemberAssignment> members
) {
Objects.requireNonNull(members);
this.members = members;
this.members = Objects.requireNonNull(members);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.Uuid;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that not all interfaces have @InterfaceStability.Unstable. Could you share the context to me?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For another, GroupSpec#MemberSubscriptionSpec return the interface but GroupSpec#memberAssignment return a map struct. If it is public API now, maybe return MemberAssignment is more flexible if we want to enrich it in the future?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that not all interfaces have @InterfaceStability.Unstable. Could you share the context to me?

Fixed. Thanks.

For another, GroupSpec#MemberSubscriptionSpec return the interface but GroupSpec#memberAssignment return a map struct. If it is public API now, maybe return MemberAssignment is more flexible if we want to enrich it in the future?

Good question. Reusing MemberAssignment is not ideal because we want to have the ability to pass internal objects. In this case, it could be backed by the Assignment. So, we could think of using an interface though.

On thing to consider is whether the MemberAssignment returned by the assignor would always be the same as the one provided to the assignor. The actual assignment (the map) is for sure. What do you think?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we could think of using an interface though.

+1

On thing to consider is whether the MemberAssignment returned by the assignor would always be the same as the one provided to the assignor. The actual assignment (the map) is for sure. What do you think?

not sure whether I have got the point. The member assignment in GroupAssignment returned from PartitionAssignor#assign is the new assignment. Hence, it should be different to assignment in GroupSpec.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You suggested to use MemberAssignment in your previous comment. MemberAssignment is already used in GroupAssignment returned by PartitionAssignor#assign. I am not sure if you meant to reuse the same MemberAssignment in both places.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You suggested to use MemberAssignment in your previous comment. MemberAssignment is already used in GroupAssignment returned by PartitionAssignor#assign. I am not sure if you meant to reuse the same MemberAssignment in both places.

ummm, I guess I did not notice MemberAssignment is used by GroupAssignment before :_

Anyway, having a interface to replace Map struct is good way to me. Or we can keep current version if the tag "unstable" give the room to modify those public stuff :)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am playing with this. I will suggest something a bit later today.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 I gave it a go. You can check the last commit to see how it looks like. I also moved some classes between internal packages while here.

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

/**
* The group metadata specifications required to compute the target assignment.
*/
@InterfaceStability.Unstable
public interface GroupSpec {
/**
* @return All the member Ids of the consumer group.
Expand All @@ -45,18 +45,18 @@ public interface GroupSpec {
/**
* Gets the member subscription specification for a member.
*
* @param memberId The member Id.
* @param memberId The member Id.
* @return The member's subscription metadata.
* @throws IllegalArgumentException If the member Id isn't found.
*/
MemberSubscriptionSpec memberSubscription(String memberId);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why did we rename it to MemberSubscription?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we have MemberAssignment, MemberSubscriptionSpec looked weird. So I renamed it to align them.

MemberSubscription memberSubscription(String memberId);

/**
* Gets the current assignment of the member.
*
* @param memberId The member Id.
* @return A map of topic Ids to sets of partition numbers.
* An empty map is returned if the member Id isn't found.
* @param memberId The member Id.
* @return The member's assignment or an empty assignment if the
* member does not have one.
*/
Map<Uuid, Set<Integer>> memberAssignment(String memberId);
MemberAssignment memberAssignment(String memberId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Map;
import java.util.Set;

/**
* The partition assignment for a consumer group member.
*/
@InterfaceStability.Unstable
public interface MemberAssignment {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the change! I was wondering if this should be in a separate PR? Since it's not related to moving files to different modules and we're adding a new interface and making changes as a consequence?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have done a separate PR but it is also fine doing it here as the main reviewer asked for it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, i neglected this PR aims to complete the migration, but I do love this change 😄

/**
* @return The assigned partitions keyed by topic Ids.
*/
Map<Uuid, Set<Integer>> partitions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Optional;
import java.util.Set;

/**
* Interface representing the subscription metadata for a group member.
*/
public interface MemberSubscriptionSpec {
@InterfaceStability.Unstable
public interface MemberSubscription {
/**
* Gets the rack Id if present.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Server-side partition assignor used by the GroupCoordinator.
*
* The interface is kept in an internal module until KIP-848 is fully
* implemented and ready to be released.
* The new consumer group protocol is in preview so this interface is considered
* unstable until Apache Kafka 4.0.
*/
@InterfaceStability.Unstable
public interface PartitionAssignor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.errors.ApiException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* The subscription type followed by a consumer group.
*/
@InterfaceStability.Unstable
public enum SubscriptionType {
/**
* A homogeneous subscription type means that all the members
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
Expand Down Expand Up @@ -1926,7 +1926,7 @@ private Assignment updateTargetAssignment(

MemberAssignment newMemberAssignment = assignmentResult.targetAssignment().get(updatedMember.memberId());
if (newMemberAssignment != null) {
return new Assignment(newMemberAssignment.targetPartitions());
return new Assignment(newMemberAssignment.partitions());
} else {
return Assignment.EMPTY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.server.common.TopicIdPartition;

import java.util.Collection;
Expand All @@ -42,7 +45,7 @@ protected static void addPartitionToAssignment(
int partition
) {
memberAssignments.get(memberId)
.targetPartitions()
.partitions()
.computeIfAbsent(topicId, __ -> new HashSet<>())
.add(partition);
}
Expand Down
Loading