Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ public ConsumerGroupDescription(String groupId,
}

ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
String partitionAssignor,
ConsumerGroupState state,
Node coordinator,
Set<AclOperation> authorizedOperations) {
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
String partitionAssignor,
ConsumerGroupState state,
Node coordinator,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.members = members == null ? Collections.emptyList() :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2716,8 +2716,9 @@ void handleResponse(AbstractResponse abstractResponse) {
deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
partitions = new HashSet<>(assignment.partitions());
}
final MemberDescription memberDescription =
new MemberDescription(groupMember.memberId(),
final MemberDescription memberDescription = new MemberDescription(
groupMember.memberId(),
Optional.ofNullable(groupMember.groupInstanceId()),
groupMember.clientId(),
groupMember.clientHost(),
new MemberAssignment(partitions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Objects;
import java.util.Optional;

/**
* A detailed description of a single group instance in the cluster.
*/
public class MemberDescription {
private final String memberId;
private final Optional<String> groupInstanceId;
private final String clientId;
private final String host;
private final MemberAssignment assignment;

public MemberDescription(String memberId, String clientId, String host, MemberAssignment assignment) {
MemberDescription(String memberId,
Optional<String> groupInstanceId,
String clientId,
String host,
MemberAssignment assignment) {
this.memberId = memberId == null ? "" : memberId;
this.groupInstanceId = groupInstanceId;
this.clientId = clientId == null ? "" : clientId;
this.host = host == null ? "" : host;
this.assignment = assignment == null ?
new MemberAssignment(Collections.<TopicPartition>emptySet()) : assignment;
new MemberAssignment(Collections.emptySet()) : assignment;
}


@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -62,6 +67,13 @@ public String consumerId() {
return memberId;
}

/**
* The instance id of the group member.
*/
public Optional<String> groupInstanceId() {
return groupInstanceId;
}

/**
* The client id of the group member.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ public DescribeGroupsResponse(Struct struct, short version) {

public static DescribedGroupMember groupMember(
final String memberId,
final String groupInstanceId,
final String clientId,
final String clientHost,
final byte[] assignment,
final byte[] metadata) {
return new DescribedGroupMember()
.setMemberId(memberId)
.setGroupInstanceId(groupInstanceId)
.setClientId(clientId)
.setClientHost(clientHost)
.setMemberAssignment(assignment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
"name": "DescribeGroupsRequest",
// Versions 1 and 2 are the same as version 0.
// Starting in version 3, authorized operations can be requested.
"validVersions": "0-3",
// Starting in version 4, the response will include group.instance.id info for members.
"validVersions": "0-4",
"fields": [
{ "name": "Groups", "type": "[]string", "versions": "0+", "entityType": "groupId",
"about": "The names of the groups to describe" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
// Version 1 added throttle time.
// Starting in version 2, on quota violation, brokers send out responses before throttling.
// Starting in version 3, brokers can send authorized operations.
"validVersions": "0-3",
// Starting in version 4, the response will include group.instance.id info for members.
"validVersions": "0-4",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
Expand All @@ -42,6 +43,9 @@
"about": "The group members.", "fields": [
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member ID assigned by the group coordinator." },
{ "name": "GroupInstanceId", "type": "string", "versions": "4+",
"nullableVersions": "4+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end user." },
{ "name": "ClientId", "type": "string", "versions": "0+",
"about": "The client ID used in the member's latest join group request." },
{ "name": "ClientHost", "type": "string", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import org.apache.kafka.common.message.DeleteTopicsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
Expand Down Expand Up @@ -1151,7 +1152,7 @@ public void testDescribeConsumerGroups() throws Exception {

DescribeGroupsResponseData data = new DescribeGroupsResponseData();

//Retriable errors should be retried
//Retriable errors should be retried
data.groups().add(DescribeGroupsResponse.groupMetadata(
"group-0",
Errors.COORDINATOR_LOAD_IN_PROGRESS,
Expand Down Expand Up @@ -1204,16 +1205,21 @@ public void testDescribeConsumerGroups() throws Exception {
byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
memberAssignment.get(memberAssignmentBytes);

DescribedGroupMember memberOne = DescribeGroupsResponse.groupMember("0", "instance1", "clientId0", "clientHost", memberAssignmentBytes, null);
DescribedGroupMember memberTwo = DescribeGroupsResponse.groupMember("1", "instance2", "clientId1", "clientHost", memberAssignmentBytes, null);

List<MemberDescription> expectedMemberDescriptions = new ArrayList<>();
expectedMemberDescriptions.add(convertToMemberDescriptions(memberOne,
new MemberAssignment(new HashSet<>(topicPartitions))));
expectedMemberDescriptions.add(convertToMemberDescriptions(memberTwo,
new MemberAssignment(new HashSet<>(topicPartitions))));
data.groups().add(DescribeGroupsResponse.groupMetadata(
"group-0",
Errors.NONE,
"",
ConsumerProtocol.PROTOCOL_TYPE,
"",
asList(
DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
),
asList(memberOne, memberTwo),
Collections.emptySet()));

env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
Expand All @@ -1224,6 +1230,7 @@ public void testDescribeConsumerGroups() throws Exception {
assertEquals(1, result.describedGroups().size());
assertEquals("group-0", groupDescription.groupId());
assertEquals(2, groupDescription.members().size());
assertEquals(expectedMemberDescriptions, groupDescription.members());
}
}

Expand Down Expand Up @@ -1266,8 +1273,8 @@ public void testDescribeMultipleConsumerGroups() throws Exception {
ConsumerProtocol.PROTOCOL_TYPE,
"",
asList(
DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
DescribeGroupsResponse.groupMember("0", null, "clientId0", "clientHost", memberAssignmentBytes, null),
DescribeGroupsResponse.groupMember("1", null, "clientId1", "clientHost", memberAssignmentBytes, null)
),
Collections.emptySet()));

Expand All @@ -1279,8 +1286,8 @@ public void testDescribeMultipleConsumerGroups() throws Exception {
"connect",
"",
asList(
DescribeGroupsResponse.groupMember("0", "clientId0", "clientHost", memberAssignmentBytes, null),
DescribeGroupsResponse.groupMember("1", "clientId1", "clientHost", memberAssignmentBytes, null)
DescribeGroupsResponse.groupMember("0", null, "clientId0", "clientHost", memberAssignmentBytes, null),
DescribeGroupsResponse.groupMember("1", null, "clientId1", "clientHost", memberAssignmentBytes, null)
),
Collections.emptySet()));

Expand Down Expand Up @@ -1511,6 +1518,15 @@ public void testIncrementalAlterConfigs() throws Exception {
}
}

private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
MemberAssignment assignment) {
return new MemberDescription(member.memberId(),
Optional.ofNullable(member.groupInstanceId()),
member.clientId(),
member.clientHost(),
assignment);
}

@SafeVarargs
private static <T> void assertCollectionIs(Collection<T> collection, T... elements) {
for (T element : elements) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ private DescribeGroupsResponse createDescribeGroupResponse() {
String clientId = "consumer-1";
String clientHost = "localhost";
DescribeGroupsResponseData describeGroupsResponseData = new DescribeGroupsResponseData();
DescribeGroupsResponseData.DescribedGroupMember member = DescribeGroupsResponse.groupMember("memberId",
DescribeGroupsResponseData.DescribedGroupMember member = DescribeGroupsResponse.groupMember("memberId", null,
clientId, clientHost, new byte[0], new byte[0]);
DescribeGroupsResponseData.DescribedGroup metadata = DescribeGroupsResponse.groupMetadata("test-group", Errors.NONE,
"STABLE", "consumer", "roundrobin", asList(member), Collections.emptySet());
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val members = summary.members.map { member =>
new DescribeGroupsResponseData.DescribedGroupMember()
.setMemberId(member.memberId)
.setGroupInstanceId(member.groupInstanceId.orNull)
.setClientId(member.clientId)
.setClientHost(member.clientHost)
.setMemberAssignment(member.assignment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2515,9 +2515,30 @@ class GroupCoordinatorTest {
}

@Test
def testDescribeGroupStable() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
def testDescribeGroupStableForDynamicMember() {
val joinGroupResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
val assignedMemberId = joinGroupResult.memberId
val generationId = joinGroupResult.generationId
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)

EasyMock.reset(replicaManager)
val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))

val syncGroupError = syncGroupResult._2
assertEquals(Errors.NONE, syncGroupError)

EasyMock.reset(replicaManager)
val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
assertEquals(Errors.NONE, error)
assertEquals(protocolType, summary.protocolType)
assertEquals("range", summary.protocol)
assertEquals(List(assignedMemberId), summary.members.map(_.memberId))
}

@Test
def testDescribeGroupStableForStaticMember() {
val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocols)
val assignedMemberId = joinGroupResult.memberId
val generationId = joinGroupResult.generationId
val joinGroupError = joinGroupResult.error
Expand All @@ -2535,6 +2556,7 @@ class GroupCoordinatorTest {
assertEquals(protocolType, summary.protocolType)
assertEquals("range", summary.protocol)
assertEquals(List(assignedMemberId), summary.members.map(_.memberId))
assertEquals(List(leaderInstanceId), summary.members.map(_.groupInstanceId))
}

@Test
Expand Down