Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand All @@ -42,9 +43,11 @@ public class ConsumerGroupDescription {
private final GroupState groupState;
private final Node coordinator;
private final Set<AclOperation> authorizedOperations;
private final Optional<Integer> groupEpoch;
private final Optional<Integer> targetAssignmentEpoch;

/**
* @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupState, Node)}.
* @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, Optional)}.
*/
@Deprecated
public ConsumerGroupDescription(String groupId,
Expand All @@ -57,7 +60,7 @@ public ConsumerGroupDescription(String groupId,
}

/**
* @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupState, Node, Set)}.
* @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, Optional)}.
*/
@Deprecated
public ConsumerGroupDescription(String groupId,
Expand All @@ -71,7 +74,7 @@ public ConsumerGroupDescription(String groupId,
}

/**
* @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set)}.
* @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, Optional)}.
*/
@Deprecated
public ConsumerGroupDescription(String groupId,
Expand All @@ -90,25 +93,8 @@ public ConsumerGroupDescription(String groupId,
this.groupState = GroupState.parse(state.name());
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
}

public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
String partitionAssignor,
GroupState groupState,
Node coordinator) {
this(groupId, isSimpleConsumerGroup, members, partitionAssignor, groupState, coordinator, Collections.emptySet());
}

public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
String partitionAssignor,
GroupState groupState,
Node coordinator,
Set<AclOperation> authorizedOperations) {
this(groupId, isSimpleConsumerGroup, members, partitionAssignor, GroupType.CLASSIC, groupState, coordinator, authorizedOperations);
this.groupEpoch = Optional.empty();
this.targetAssignmentEpoch = Optional.empty();
}

public ConsumerGroupDescription(String groupId,
Expand All @@ -118,7 +104,9 @@ public ConsumerGroupDescription(String groupId,
GroupType type,
GroupState groupState,
Node coordinator,
Set<AclOperation> authorizedOperations) {
Set<AclOperation> authorizedOperations,
Optional<Integer> groupEpoch,
Optional<Integer> targetAssignmentEpoch) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.members = members == null ? Collections.emptyList() : List.copyOf(members);
Expand All @@ -127,6 +115,8 @@ public ConsumerGroupDescription(String groupId,
this.groupState = groupState;
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
this.groupEpoch = groupEpoch;
this.targetAssignmentEpoch = targetAssignmentEpoch;
}

@Override
Expand All @@ -141,12 +131,15 @@ public boolean equals(final Object o) {
type == that.type &&
groupState == that.groupState &&
Objects.equals(coordinator, that.coordinator) &&
Objects.equals(authorizedOperations, that.authorizedOperations);
Objects.equals(authorizedOperations, that.authorizedOperations) &&
Objects.equals(groupEpoch, that.groupEpoch) &&
Objects.equals(targetAssignmentEpoch, that.targetAssignmentEpoch);
}

@Override
public int hashCode() {
return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, type, groupState, coordinator, authorizedOperations);
return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, type, groupState, coordinator,
authorizedOperations, groupEpoch, targetAssignmentEpoch);
}

/**
Expand Down Expand Up @@ -215,6 +208,24 @@ public Set<AclOperation> authorizedOperations() {
return authorizedOperations;
}

/**
* The epoch of the consumer group.
* The optional is set to an integer if it is a {@link GroupType#CONSUMER} group, and to empty if it
* is a {@link GroupType#CLASSIC} group.
*/
public Optional<Integer> groupEpoch() {
return groupEpoch;
}

/**
* The epoch of the target assignment.
* The optional is set to an integer if it is a {@link GroupType#CONSUMER} group, and to empty if it
* is a {@link GroupType#CLASSIC} group.
*/
public Optional<Integer> targetAssignmentEpoch() {
return targetAssignmentEpoch;
}

@Override
public String toString() {
return "(groupId=" + groupId +
Expand All @@ -225,6 +236,8 @@ public String toString() {
", groupState=" + groupState +
", coordinator=" + coordinator +
", authorizedOperations=" + authorizedOperations +
", groupEpoch=" + groupEpoch.orElse(null) +
", targetAssignmentEpoch=" + targetAssignmentEpoch.orElse(null) +
")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.GroupType;

import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -30,13 +32,18 @@ public class MemberDescription {
private final String host;
private final MemberAssignment assignment;
private final Optional<MemberAssignment> targetAssignment;
private final Optional<Integer> memberEpoch;
private final Optional<Boolean> upgraded;

public MemberDescription(String memberId,
public MemberDescription(
String memberId,
Optional<String> groupInstanceId,
String clientId,
String host,
MemberAssignment assignment,
Optional<MemberAssignment> targetAssignment
Optional<MemberAssignment> targetAssignment,
Optional<Integer> memberEpoch,
Optional<Boolean> upgraded
) {
this.memberId = memberId == null ? "" : memberId;
this.groupInstanceId = groupInstanceId;
Expand All @@ -45,8 +52,38 @@ public MemberDescription(String memberId,
this.assignment = assignment == null ?
new MemberAssignment(Collections.emptySet()) : assignment;
this.targetAssignment = targetAssignment;
this.memberEpoch = memberEpoch;
this.upgraded = upgraded;
}

/**
* @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}.
*/
@Deprecated
public MemberDescription(
String memberId,
Optional<String> groupInstanceId,
String clientId,
String host,
MemberAssignment assignment,
Optional<MemberAssignment> targetAssignment
) {
this(
memberId,
groupInstanceId,
clientId,
host,
assignment,
targetAssignment,
Optional.empty(),
Optional.empty()
);
}

/**
* @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}.
*/
@Deprecated
public MemberDescription(
String memberId,
Optional<String> groupInstanceId,
Expand All @@ -64,6 +101,10 @@ public MemberDescription(
);
}

/**
* @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}.
*/
@Deprecated
public MemberDescription(String memberId,
String clientId,
String host,
Expand All @@ -81,12 +122,14 @@ public boolean equals(Object o) {
clientId.equals(that.clientId) &&
host.equals(that.host) &&
assignment.equals(that.assignment) &&
targetAssignment.equals(that.targetAssignment);
targetAssignment.equals(that.targetAssignment) &&
memberEpoch.equals(that.memberEpoch) &&
upgraded.equals(that.upgraded);
}

@Override
public int hashCode() {
return Objects.hash(memberId, groupInstanceId, clientId, host, assignment, targetAssignment);
return Objects.hash(memberId, groupInstanceId, clientId, host, assignment, targetAssignment, memberEpoch, upgraded);
}

/**
Expand Down Expand Up @@ -131,13 +174,35 @@ public Optional<MemberAssignment> targetAssignment() {
return targetAssignment;
}

/**
* The epoch of the group member.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's also mention that this is only provided in the CONSUMER group case.

* The optional is set to an integer if the member is in a {@link GroupType#CONSUMER} group, and to empty if it
* is in a {@link GroupType#CLASSIC} group.
*/
public Optional<Integer> memberEpoch() {
return memberEpoch;
}

/**
* The flag indicating whether a member within a {@link GroupType#CONSUMER} group uses the
* {@link GroupType#CONSUMER} protocol.
* The optional is set to true if it does, to false if it does not, and to empty if it is unknown or if the group
* is a {@link GroupType#CLASSIC} group.
*/
public Optional<Boolean> upgraded() {
return upgraded;
}

@Override
public String toString() {
return "(memberId=" + memberId +
", groupInstanceId=" + groupInstanceId.orElse("null") +
", clientId=" + clientId +
", host=" + host +
", assignment=" + assignment +
", targetAssignment=" + targetAssignment + ")";
", targetAssignment=" + targetAssignment +
", memberEpoch=" + memberEpoch.orElse(null) +
", upgraded=" + upgraded.orElse(null) +
")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ public ApiResult<CoordinatorKey, ClassicGroupDescription> handleResponse(
Optional.ofNullable(groupMember.groupInstanceId()),
groupMember.clientId(),
groupMember.clientHost(),
new MemberAssignment(partitions)));
new MemberAssignment(partitions),
Optional.empty(),
Optional.empty(),
Optional.empty()));
});

final ClassicGroupDescription classicGroupDescription =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ private ApiResult<CoordinatorKey, ConsumerGroupDescription> handledConsumerGroup
groupMember.clientId(),
groupMember.clientHost(),
new MemberAssignment(convertAssignment(groupMember.assignment())),
Optional.of(new MemberAssignment(convertAssignment(groupMember.targetAssignment())))
Optional.of(new MemberAssignment(convertAssignment(groupMember.targetAssignment()))),
Optional.of(groupMember.memberEpoch()),
groupMember.memberType() == -1 ? Optional.empty() : Optional.of(groupMember.memberType() == 1)
))
);

Expand All @@ -235,7 +237,9 @@ private ApiResult<CoordinatorKey, ConsumerGroupDescription> handledConsumerGroup
GroupType.CONSUMER,
GroupState.parse(describedGroup.groupState()),
coordinator,
authorizedOperations
authorizedOperations,
Optional.of(describedGroup.groupEpoch()),
Optional.of(describedGroup.assignmentEpoch())
);
completed.put(groupIdKey, consumerGroupDescription);
}
Expand Down Expand Up @@ -281,7 +285,10 @@ private ApiResult<CoordinatorKey, ConsumerGroupDescription> handledClassicGroupR
Optional.ofNullable(groupMember.groupInstanceId()),
groupMember.clientId(),
groupMember.clientHost(),
new MemberAssignment(partitions)));
new MemberAssignment(partitions),
Optional.empty(),
Optional.empty(),
Optional.empty()));
}
final ConsumerGroupDescription consumerGroupDescription =
new ConsumerGroupDescription(groupIdKey.idValue, protocolType.isEmpty(),
Expand All @@ -290,7 +297,9 @@ private ApiResult<CoordinatorKey, ConsumerGroupDescription> handledClassicGroupR
GroupType.CLASSIC,
GroupState.parse(describedGroup.groupState()),
coordinator,
authorizedOperations);
authorizedOperations,
Optional.empty(),
Optional.empty());
completed.put(groupIdKey, consumerGroupDescription);
} else {
failed.put(groupIdKey, new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4057,6 +4057,7 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception {
.setTopicName("foo")
.setPartitions(singletonList(1))
)))
.setMemberType((byte) 1)
)),
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp2")
Expand Down Expand Up @@ -4110,14 +4111,18 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception {
),
Optional.of(new MemberAssignment(
Collections.singleton(new TopicPartition("foo", 1))
))
)),
Optional.of(10),
Optional.of(true)
)
),
"range",
GroupType.CONSUMER,
GroupState.STABLE,
env.cluster().controller(),
Collections.emptySet()
Collections.emptySet(),
Optional.of(10),
Optional.of(10)
));
expectedResult.put("grp2", new ConsumerGroupDescription(
"grp2",
Expand All @@ -4130,14 +4135,19 @@ public void testDescribeOldAndNewConsumerGroups() throws Exception {
"clientHost",
new MemberAssignment(
Collections.singleton(new TopicPartition("bar", 0))
)
),
Optional.empty(),
Optional.empty(),
Optional.empty()
)
),
"range",
GroupType.CLASSIC,
GroupState.STABLE,
env.cluster().controller(),
Collections.emptySet()
Collections.emptySet(),
Optional.empty(),
Optional.empty()
));

assertEquals(expectedResult, result.all().get());
Expand Down Expand Up @@ -8674,7 +8684,10 @@ private static MemberDescription convertToMemberDescriptions(DescribedGroupMembe
Optional.ofNullable(member.groupInstanceId()),
member.clientId(),
member.clientHost(),
assignment);
assignment,
Optional.empty(),
Optional.empty(),
Optional.empty());
}

private static ShareMemberDescription convertToShareMemberDescriptions(ShareGroupDescribeResponseData.Member member,
Expand Down
Loading