Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from kafka.admin._configs import (
AlterConfigOp, ConfigFilterType, ConfigResource, ConfigResourceType,
ConfigType, ConfigSourceType)
from kafka.admin._groups import MemberToRemove
from kafka.admin._groups import GroupState, GroupType, MemberToRemove
from kafka.admin._partitions import NewPartitions, OffsetSpec
from kafka.admin._topics import NewTopic
from kafka.admin._users import (
Expand All @@ -18,6 +18,7 @@
'ResourceType', 'ResourcePattern', 'ResourcePatternFilter',
'AlterConfigOp', 'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType',
'UpdateFeatureType',
'MemberToRemove', 'OffsetSpec', # NewTopic + NewPartitions are deprecated and not included in __all__
'GroupState', 'GroupType', 'MemberToRemove',
'OffsetSpec', # NewTopic + NewPartitions are deprecated and not included in __all__
'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion',
]
56 changes: 48 additions & 8 deletions kafka/admin/_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

from enum import Enum
import itertools
import logging
from collections import defaultdict
Expand All @@ -19,6 +20,7 @@
ConsumerProtocolAssignment, ConsumerProtocolSubscription, ConsumerProtocolType,
)
from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.util import EnumHelper

if TYPE_CHECKING:
from kafka.net.manager import KafkaConnectionManager
Expand Down Expand Up @@ -106,10 +108,16 @@ def describe_groups(self, group_ids, group_coordinator_id=None, include_authoriz

# -- List groups --------------------------------------------------

def _list_groups_request(self):
# TODO: KIP-518: StatesFilter
# TODO: KIP-848: TypesFilter
return ListGroupsRequest()
@staticmethod
def _list_groups_request(states_filter=None, types_filter=None):
kwargs = {'min_version': 0}
if states_filter:
kwargs['states_filter'] = [GroupState.value_for(s) for s in states_filter]
kwargs['min_version'] = 4
if types_filter:
kwargs['types_filter'] = [GroupType.value_for(t) for t in types_filter]
kwargs['min_version'] = 5
return ListGroupsRequest(**kwargs)

def _list_groups_process_response(self, response):
"""Process a ListGroupsResponse into a list of groups."""
Expand All @@ -120,17 +128,18 @@ def _list_groups_process_response(self, response):
.format(response))
return [group.to_dict() for group in response.groups]

async def _async_list_groups(self, broker_ids=None):
async def _async_list_groups(self, broker_ids=None, states_filter=None, types_filter=None):
if broker_ids is None:
broker_ids = [broker.node_id for broker in self._manager.cluster.brokers()]
groups = []
for broker_id in broker_ids:
request = self._list_groups_request()
request = self._list_groups_request(states_filter=states_filter,
types_filter=types_filter)
response = await self._manager.send(request, node_id=broker_id)
groups.extend(self._list_groups_process_response(response))
return groups

def list_groups(self, broker_ids=None):
def list_groups(self, broker_ids=None, states_filter=None, types_filter=None):
"""List all consumer groups known to the cluster.

This returns a list of Group dicts. The tuples are
Expand All @@ -150,11 +159,22 @@ def list_groups(self, broker_ids=None):
groups. If set to None, will query all brokers in the cluster.
Explicitly specifying broker(s) can be useful for determining which
consumer groups are coordinated by those broker(s). Default: None
states_filter (list, optional): Filter groups by state. Values
may be :class:`GroupState` members, their string names
(case-insensitive, hyphen or underscore), or raw protocol
strings (e.g. ``['Stable', 'Empty']``). Requires broker
>= 3.0 (KIP-518). Default: None (no filter).
types_filter (list, optional): Filter groups by type. Values
may be :class:`GroupType` members, their string names
(case-insensitive), or raw protocol strings (e.g.
``['consumer', 'classic', 'share']``). Requires broker
>= 4.0 (KIP-848). Default: None (no filter).

Returns:
List of group data dicts, with key/vals from ListGroupsRequest
"""
return self._manager.run(self._async_list_groups, broker_ids)
return self._manager.run(self._async_list_groups, broker_ids,
states_filter, types_filter)

# -- List group offsets -------------------------------------------

Expand Down Expand Up @@ -604,3 +624,23 @@ def __eq__(self, other):

def __hash__(self):
return hash((self.member_id, self.group_instance_id, self.reason))


class GroupState(EnumHelper, str, Enum):
"""Consumer group states as reported by the broker (KIP-518, KIP-848)."""
UNKNOWN = 'Unknown'
PREPARING_REBALANCE = 'PreparingRebalance'
COMPLETING_REBALANCE = 'CompletingRebalance'
STABLE = 'Stable'
DEAD = 'Dead'
EMPTY = 'Empty'
ASSIGNING = 'Assigning'
RECONCILING = 'Reconciling'


class GroupType(EnumHelper, str, Enum):
"""Consumer group protocol types (KIP-848)."""
UNKNOWN = 'Unknown'
CLASSIC = 'classic'
CONSUMER = 'consumer'
SHARE = 'share'
21 changes: 19 additions & 2 deletions kafka/cli/admin/groups/list.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
from kafka.admin import GroupState, GroupType


class ListGroups:
COMMAND = 'list'
HELP = 'List Groups'

@classmethod
def add_arguments(cls, parser):
pass
state_choices = sorted(s.value for s in GroupState)
type_choices = sorted(t.value for t in GroupType)
parser.add_argument(
'--state', type=str, action='append', dest='states_filter', default=[],
help='Filter by group state (repeatable). One of: '
+ ', '.join(state_choices)
+ '. Case-insensitive; names also accepted. '
'Requires broker >= 3.0 (KIP-518).')
parser.add_argument(
'--type', type=str, action='append', dest='types_filter', default=[],
help='Filter by group type (repeatable). One of: '
+ ', '.join(type_choices)
+ '. Requires broker >= 4.0 (KIP-848).')

@classmethod
def command(cls, client, args):
return client.list_groups()
return client.list_groups(
states_filter=args.states_filter or None,
types_filter=args.types_filter or None)
91 changes: 90 additions & 1 deletion test/admin/test_admin_groups.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import pytest

from kafka.admin import KafkaAdminClient, MemberToRemove
from kafka.admin import GroupState, GroupType, KafkaAdminClient, MemberToRemove
import kafka.errors as Errors
from kafka.errors import (
GroupIdNotFoundError,
GroupSubscribedToTopicError,
NoError,
UnknownMemberIdError,
UnsupportedVersionError,
)
from kafka.protocol.admin import ListGroupsRequest, ListGroupsResponse
from kafka.protocol.consumer import (
LeaveGroupRequest, LeaveGroupResponse,
OffsetCommitRequest, OffsetCommitResponse,
Expand Down Expand Up @@ -426,3 +428,90 @@ def test_fallback_requires_member_id(self, broker, admin):
[MemberToRemove()],
group_coordinator_id=0,
)


# ---------------------------------------------------------------------------
# list_groups
# ---------------------------------------------------------------------------


def _capture_list_groups(captured, response=None):
def handler(api_key, api_version, correlation_id, request_bytes):
captured['request'] = ListGroupsRequest.decode(
request_bytes, version=api_version, header=True)
captured['version'] = api_version
if response is not None:
return response
return ListGroupsResponse(throttle_time_ms=0, error_code=0, groups=[])
return handler


class TestListGroupsMockBroker:

def test_no_filters_uses_default_version(self, broker, admin):
captured = {}
broker.respond_fn(ListGroupsRequest, _capture_list_groups(captured))
admin.list_groups()
req = captured['request']
assert req.states_filter in (None, [])
assert req.types_filter in (None, [])

def test_states_filter_propagates(self, broker, admin):
captured = {}
broker.respond_fn(ListGroupsRequest, _capture_list_groups(captured))
admin.list_groups(states_filter=['Stable', 'Empty'])
assert captured['version'] >= 4
assert list(captured['request'].states_filter) == ['Stable', 'Empty']

def test_types_filter_propagates(self, broker, admin):
captured = {}
broker.respond_fn(ListGroupsRequest, _capture_list_groups(captured))
admin.list_groups(types_filter=['consumer', 'share'])
assert captured['version'] >= 5
assert list(captured['request'].types_filter) == ['consumer', 'share']

def test_both_filters_propagate(self, broker, admin):
captured = {}
broker.respond_fn(ListGroupsRequest, _capture_list_groups(captured))
admin.list_groups(states_filter=['Stable'], types_filter=['consumer'])
assert captured['version'] >= 5
assert list(captured['request'].states_filter) == ['Stable']
assert list(captured['request'].types_filter) == ['consumer']

def test_enum_filters_normalize_to_protocol_strings(self, broker, admin):
"""Enum members, lowercase names, and raw protocol strings all produce
the canonical wire value."""
captured = {}
broker.respond_fn(ListGroupsRequest, _capture_list_groups(captured))
admin.list_groups(
states_filter=[GroupState.STABLE, 'empty', 'preparing-rebalance'],
types_filter=[GroupType.CONSUMER, 'CLASSIC'])
assert list(captured['request'].states_filter) == [
'Stable', 'Empty', 'PreparingRebalance']
assert list(captured['request'].types_filter) == ['consumer', 'classic']

def test_response_groups_returned(self, broker, admin):
ListedGroup = ListGroupsResponse.ListedGroup
response = ListGroupsResponse(
throttle_time_ms=0, error_code=0,
groups=[
ListedGroup(group_id='g1', protocol_type='consumer',
group_state='Stable', group_type='consumer'),
ListedGroup(group_id='g2', protocol_type='consumer',
group_state='Empty', group_type='classic'),
])
broker.respond_fn(ListGroupsRequest, _capture_list_groups({}, response))
result = admin.list_groups(states_filter=['Stable', 'Empty'],
types_filter=['consumer', 'classic'])
ids = sorted(g['group_id'] for g in result)
assert ids == ['g1', 'g2']

@pytest.mark.parametrize("broker", [(2, 3, 0)], indirect=True)
def test_states_filter_rejected_on_pre_518_broker(self, broker, admin):
with pytest.raises(Errors.IncompatibleBrokerVersion):
admin.list_groups(states_filter=['Stable'])

@pytest.mark.parametrize("broker", [(3, 7, 0)], indirect=True)
def test_types_filter_rejected_on_pre_848_broker(self, broker, admin):
with pytest.raises(Errors.IncompatibleBrokerVersion):
admin.list_groups(types_filter=['consumer'])
Loading