diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index 72728df93..daa853ffa 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -6,7 +6,7 @@ from kafka.admin._configs import ( AlterConfigOp, ConfigFilterType, ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType) -from kafka.admin._groups import MemberToRemove +from kafka.admin._groups import GroupState, GroupType, MemberToRemove from kafka.admin._partitions import NewPartitions, OffsetSpec from kafka.admin._topics import NewTopic from kafka.admin._users import ( @@ -18,6 +18,7 @@ 'ResourceType', 'ResourcePattern', 'ResourcePatternFilter', 'AlterConfigOp', 'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType', 'UpdateFeatureType', - 'MemberToRemove', 'OffsetSpec', # NewTopic + NewPartitions are deprecated and not included in __all__ + 'GroupState', 'GroupType', 'MemberToRemove', + 'OffsetSpec', # NewTopic + NewPartitions are deprecated and not included in __all__ 'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion', ] diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py index e7937d8cd..2da8238c0 100644 --- a/kafka/admin/_groups.py +++ b/kafka/admin/_groups.py @@ -2,6 +2,7 @@ from __future__ import annotations +from enum import Enum import itertools import logging from collections import defaultdict @@ -19,6 +20,7 @@ ConsumerProtocolAssignment, ConsumerProtocolSubscription, ConsumerProtocolType, ) from kafka.structs import OffsetAndMetadata, TopicPartition +from kafka.util import EnumHelper if TYPE_CHECKING: from kafka.net.manager import KafkaConnectionManager @@ -106,10 +108,16 @@ def describe_groups(self, group_ids, group_coordinator_id=None, include_authoriz # -- List groups -------------------------------------------------- - def _list_groups_request(self): - # TODO: KIP-518: StatesFilter - # TODO: KIP-848: TypesFilter - return ListGroupsRequest() + @staticmethod + def _list_groups_request(states_filter=None, types_filter=None): + kwargs = {'min_version': 0} + if states_filter: + kwargs['states_filter'] = [GroupState.value_for(s) for s in states_filter] + kwargs['min_version'] = 4 + if types_filter: + kwargs['types_filter'] = [GroupType.value_for(t) for t in types_filter] + kwargs['min_version'] = 5 + return ListGroupsRequest(**kwargs) def _list_groups_process_response(self, response): """Process a ListGroupsResponse into a list of groups.""" @@ -120,17 +128,18 @@ def _list_groups_process_response(self, response): .format(response)) return [group.to_dict() for group in response.groups] - async def _async_list_groups(self, broker_ids=None): + async def _async_list_groups(self, broker_ids=None, states_filter=None, types_filter=None): if broker_ids is None: broker_ids = [broker.node_id for broker in self._manager.cluster.brokers()] groups = [] for broker_id in broker_ids: - request = self._list_groups_request() + request = self._list_groups_request(states_filter=states_filter, + types_filter=types_filter) response = await self._manager.send(request, node_id=broker_id) groups.extend(self._list_groups_process_response(response)) return groups - def list_groups(self, broker_ids=None): + def list_groups(self, broker_ids=None, states_filter=None, types_filter=None): """List all consumer groups known to the cluster. This returns a list of Group dicts. The tuples are @@ -150,11 +159,22 @@ def list_groups(self, broker_ids=None): groups. If set to None, will query all brokers in the cluster. Explicitly specifying broker(s) can be useful for determining which consumer groups are coordinated by those broker(s). Default: None + states_filter (list, optional): Filter groups by state. Values + may be :class:`GroupState` members, their string names + (case-insensitive, hyphen or underscore), or raw protocol + strings (e.g. ``['Stable', 'Empty']``). Requires broker + >= 3.0 (KIP-518). Default: None (no filter). + types_filter (list, optional): Filter groups by type. Values + may be :class:`GroupType` members, their string names + (case-insensitive), or raw protocol strings (e.g. + ``['consumer', 'classic', 'share']``). Requires broker + >= 4.0 (KIP-848). Default: None (no filter). Returns: List of group data dicts, with key/vals from ListGroupsRequest """ - return self._manager.run(self._async_list_groups, broker_ids) + return self._manager.run(self._async_list_groups, broker_ids, + states_filter, types_filter) # -- List group offsets ------------------------------------------- @@ -604,3 +624,23 @@ def __eq__(self, other): def __hash__(self): return hash((self.member_id, self.group_instance_id, self.reason)) + + +class GroupState(EnumHelper, str, Enum): + """Consumer group states as reported by the broker (KIP-518, KIP-848).""" + UNKNOWN = 'Unknown' + PREPARING_REBALANCE = 'PreparingRebalance' + COMPLETING_REBALANCE = 'CompletingRebalance' + STABLE = 'Stable' + DEAD = 'Dead' + EMPTY = 'Empty' + ASSIGNING = 'Assigning' + RECONCILING = 'Reconciling' + + +class GroupType(EnumHelper, str, Enum): + """Consumer group protocol types (KIP-848).""" + UNKNOWN = 'Unknown' + CLASSIC = 'classic' + CONSUMER = 'consumer' + SHARE = 'share' diff --git a/kafka/cli/admin/groups/list.py b/kafka/cli/admin/groups/list.py index 5acd1475a..50c1395b3 100644 --- a/kafka/cli/admin/groups/list.py +++ b/kafka/cli/admin/groups/list.py @@ -1,11 +1,28 @@ +from kafka.admin import GroupState, GroupType + + class ListGroups: COMMAND = 'list' HELP = 'List Groups' @classmethod def add_arguments(cls, parser): - pass + state_choices = sorted(s.value for s in GroupState) + type_choices = sorted(t.value for t in GroupType) + parser.add_argument( + '--state', type=str, action='append', dest='states_filter', default=[], + help='Filter by group state (repeatable). One of: ' + + ', '.join(state_choices) + + '. Case-insensitive; names also accepted. ' + 'Requires broker >= 3.0 (KIP-518).') + parser.add_argument( + '--type', type=str, action='append', dest='types_filter', default=[], + help='Filter by group type (repeatable). One of: ' + + ', '.join(type_choices) + + '. Requires broker >= 4.0 (KIP-848).') @classmethod def command(cls, client, args): - return client.list_groups() + return client.list_groups( + states_filter=args.states_filter or None, + types_filter=args.types_filter or None) diff --git a/test/admin/test_admin_groups.py b/test/admin/test_admin_groups.py index 43eca03cc..bb3041d42 100644 --- a/test/admin/test_admin_groups.py +++ b/test/admin/test_admin_groups.py @@ -1,6 +1,7 @@ import pytest -from kafka.admin import KafkaAdminClient, MemberToRemove +from kafka.admin import GroupState, GroupType, KafkaAdminClient, MemberToRemove +import kafka.errors as Errors from kafka.errors import ( GroupIdNotFoundError, GroupSubscribedToTopicError, @@ -8,6 +9,7 @@ UnknownMemberIdError, UnsupportedVersionError, ) +from kafka.protocol.admin import ListGroupsRequest, ListGroupsResponse from kafka.protocol.consumer import ( LeaveGroupRequest, LeaveGroupResponse, OffsetCommitRequest, OffsetCommitResponse, @@ -426,3 +428,90 @@ def test_fallback_requires_member_id(self, broker, admin): [MemberToRemove()], group_coordinator_id=0, ) + + +# --------------------------------------------------------------------------- +# list_groups +# --------------------------------------------------------------------------- + + +def _capture_list_groups(captured, response=None): + def handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = ListGroupsRequest.decode( + request_bytes, version=api_version, header=True) + captured['version'] = api_version + if response is not None: + return response + return ListGroupsResponse(throttle_time_ms=0, error_code=0, groups=[]) + return handler + + +class TestListGroupsMockBroker: + + def test_no_filters_uses_default_version(self, broker, admin): + captured = {} + broker.respond_fn(ListGroupsRequest, _capture_list_groups(captured)) + admin.list_groups() + req = captured['request'] + assert req.states_filter in (None, []) + assert req.types_filter in (None, []) + + def test_states_filter_propagates(self, broker, admin): + captured = {} + broker.respond_fn(ListGroupsRequest, _capture_list_groups(captured)) + admin.list_groups(states_filter=['Stable', 'Empty']) + assert captured['version'] >= 4 + assert list(captured['request'].states_filter) == ['Stable', 'Empty'] + + def test_types_filter_propagates(self, broker, admin): + captured = {} + broker.respond_fn(ListGroupsRequest, _capture_list_groups(captured)) + admin.list_groups(types_filter=['consumer', 'share']) + assert captured['version'] >= 5 + assert list(captured['request'].types_filter) == ['consumer', 'share'] + + def test_both_filters_propagate(self, broker, admin): + captured = {} + broker.respond_fn(ListGroupsRequest, _capture_list_groups(captured)) + admin.list_groups(states_filter=['Stable'], types_filter=['consumer']) + assert captured['version'] >= 5 + assert list(captured['request'].states_filter) == ['Stable'] + assert list(captured['request'].types_filter) == ['consumer'] + + def test_enum_filters_normalize_to_protocol_strings(self, broker, admin): + """Enum members, lowercase names, and raw protocol strings all produce + the canonical wire value.""" + captured = {} + broker.respond_fn(ListGroupsRequest, _capture_list_groups(captured)) + admin.list_groups( + states_filter=[GroupState.STABLE, 'empty', 'preparing-rebalance'], + types_filter=[GroupType.CONSUMER, 'CLASSIC']) + assert list(captured['request'].states_filter) == [ + 'Stable', 'Empty', 'PreparingRebalance'] + assert list(captured['request'].types_filter) == ['consumer', 'classic'] + + def test_response_groups_returned(self, broker, admin): + ListedGroup = ListGroupsResponse.ListedGroup + response = ListGroupsResponse( + throttle_time_ms=0, error_code=0, + groups=[ + ListedGroup(group_id='g1', protocol_type='consumer', + group_state='Stable', group_type='consumer'), + ListedGroup(group_id='g2', protocol_type='consumer', + group_state='Empty', group_type='classic'), + ]) + broker.respond_fn(ListGroupsRequest, _capture_list_groups({}, response)) + result = admin.list_groups(states_filter=['Stable', 'Empty'], + types_filter=['consumer', 'classic']) + ids = sorted(g['group_id'] for g in result) + assert ids == ['g1', 'g2'] + + @pytest.mark.parametrize("broker", [(2, 3, 0)], indirect=True) + def test_states_filter_rejected_on_pre_518_broker(self, broker, admin): + with pytest.raises(Errors.IncompatibleBrokerVersion): + admin.list_groups(states_filter=['Stable']) + + @pytest.mark.parametrize("broker", [(3, 7, 0)], indirect=True) + def test_types_filter_rejected_on_pre_848_broker(self, broker, admin): + with pytest.raises(Errors.IncompatibleBrokerVersion): + admin.list_groups(types_filter=['consumer'])