diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index daa853ffa..985eee6f6 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -7,7 +7,7 @@ AlterConfigOp, ConfigFilterType, ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType) from kafka.admin._groups import GroupState, GroupType, MemberToRemove -from kafka.admin._partitions import NewPartitions, OffsetSpec +from kafka.admin._partitions import NewPartitions, OffsetSpec, OffsetTimestamp from kafka.admin._topics import NewTopic from kafka.admin._users import ( ScramMechanism, UserScramCredentialDeletion, UserScramCredentialUpsertion) @@ -19,6 +19,6 @@ 'AlterConfigOp', 'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType', 'UpdateFeatureType', 'GroupState', 'GroupType', 'MemberToRemove', - 'OffsetSpec', # NewTopic + NewPartitions are deprecated and not included in __all__ + 'OffsetSpec', 'OffsetTimestamp', # NewTopic + NewPartitions are deprecated and not included in __all__ 'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion', ] diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py index 2da8238c0..861406104 100644 --- a/kafka/admin/_groups.py +++ b/kafka/admin/_groups.py @@ -14,6 +14,7 @@ from kafka.protocol.admin import DeleteGroupsRequest, DescribeGroupsRequest, ListGroupsRequest from kafka.protocol.consumer import ( LeaveGroupRequest, OffsetCommitRequest, OffsetDeleteRequest, OffsetFetchRequest, + OffsetSpec, OffsetTimestamp, ) from kafka.protocol.consumer.group import DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID from kafka.protocol.consumer.metadata import ( @@ -382,31 +383,83 @@ def _reset_group_offsets_process_response(response, to_reset): } return results + @staticmethod + def _clamp_offset(raw, earliest, latest): + if raw < 0 or raw > latest: + return latest + if raw < earliest: + return earliest + return raw + async def _async_reset_group_offsets(self, group_id, offset_specs, group_coordinator_id=None): if not offset_specs: return {} + all_tps = set(offset_specs.keys()) + + explicit_offsets = {} + for tp, val in list(offset_specs.items()): + if isinstance(val, (OffsetSpec, OffsetTimestamp)): + pass + elif isinstance(val, int): + explicit_offsets[tp] = offset_specs.pop(tp) + else: + raise TypeError( + f'Unsupported reset target for {tp}: {val!r} ' + '(expected OffsetSpec, OffsetTimestamp, or int offset)') + if group_coordinator_id is None: group_coordinator_id = await self._find_coordinator_id(group_id) - current = await self._async_list_group_offsets(group_id, group_coordinator_id, offset_specs.keys()) - offsets = await self._async_list_partition_offsets(offset_specs) + + current = await self._async_list_group_offsets(group_id, group_coordinator_id, all_tps) + earliest = await self._async_list_partition_offsets({tp: OffsetSpec.EARLIEST for tp in all_tps}) + latest = await self._async_list_partition_offsets({tp: OffsetSpec.LATEST for tp in all_tps}) + + offsets = {} + if offset_specs: + offsets = await self._async_list_partition_offsets(offset_specs) + to_reset = {} - for tp in offsets: - to_reset[tp] = current[tp]._replace(offset=offsets[tp].offset) + for tp in all_tps: + if tp in offsets: + raw = offsets[tp].offset + else: + raw = explicit_offsets[tp] + clamped = self._clamp_offset(raw, earliest[tp].offset, latest[tp].offset) + if tp in current: + to_reset[tp] = current[tp]._replace(offset=clamped) + else: + to_reset[tp] = OffsetAndMetadata(offset=clamped, metadata='', leader_epoch=None) + request = self._alter_group_offsets_request(group_id, to_reset) response = await self._manager.send(request, node_id=group_coordinator_id) return self._reset_group_offsets_process_response(response, to_reset) def reset_group_offsets(self, group_id, offset_specs, group_coordinator_id=None): - """Reset committed offsets for a consumer group to earliest or latest. + """Reset committed offsets for a consumer group. The group must have no active members (i.e. be empty or dead) for the reset to succeed; otherwise individual partitions may return ``UNKNOWN_MEMBER_ID`` or similar errors. + Each dict value selects how the target offset is produced. All + resulting offsets are clamped to the partition's + ``[earliest, latest]`` range; values that resolve to + ``UNKNOWN_OFFSET`` (e.g. a timestamp beyond the last record) are + clamped to ``latest``. + Arguments: group_id (str): The consumer group id. offset_specs (dict): A dict mapping :class:`~kafka.TopicPartition` to - :class:`~kafka.admin.OffsetSpec`. + one of: + + * :class:`~kafka.admin.OffsetSpec` (e.g. ``OffsetSpec.EARLIEST``, + ``OffsetSpec.LATEST``, ``OffsetSpec.MAX_TIMESTAMP``): + resolved server-side via ListOffsets. + * :class:`~kafka.admin.OffsetTimestamp` (ms since epoch): + resolved server-side to the earliest offset whose timestamp + is ``>=`` the given value. + * Plain ``int``: an explicit committed offset (no server-side + resolution), which is still clamped to the valid range. Keyword Arguments: group_coordinator_id (int, optional): The node_id of the group's @@ -415,7 +468,8 @@ def reset_group_offsets(self, group_id, offset_specs, group_coordinator_id=None) Returns: dict: A dict mapping :class:`~kafka.TopicPartition` to dict of - {'error': :class:`~kafka.errors.KafkaError` class, 'offset': int} + {'error': :class:`~kafka.errors.KafkaError` class, 'offset': int}. + The ``offset`` value is the post-clamp value that was committed. """ return self._manager.run( self._async_reset_group_offsets, group_id, offset_specs, group_coordinator_id) diff --git a/kafka/admin/_partitions.py b/kafka/admin/_partitions.py index 61b711d0a..d185874ac 100644 --- a/kafka/admin/_partitions.py +++ b/kafka/admin/_partitions.py @@ -20,7 +20,9 @@ ElectionType, ListPartitionReassignmentsRequest, ) -from kafka.protocol.consumer import ListOffsetsRequest, IsolationLevel, OffsetSpec +from kafka.protocol.consumer import ( + ListOffsetsRequest, IsolationLevel, OffsetSpec, OffsetTimestamp, +) from kafka.structs import TopicPartition, OffsetAndTimestamp diff --git a/kafka/cli/admin/groups/reset_offsets.py b/kafka/cli/admin/groups/reset_offsets.py index 66c19d34a..f82d7b810 100644 --- a/kafka/cli/admin/groups/reset_offsets.py +++ b/kafka/cli/admin/groups/reset_offsets.py @@ -1,7 +1,9 @@ +import re from collections import defaultdict +from datetime import datetime, timedelta, timezone -from kafka.admin import OffsetSpec -from kafka.structs import OffsetAndMetadata, TopicPartition +from kafka.admin import OffsetSpec, OffsetTimestamp +from kafka.structs import TopicPartition class ResetGroupOffsets: @@ -11,39 +13,49 @@ class ResetGroupOffsets: @classmethod def add_arguments(cls, parser): parser.add_argument('-g', '--group-id', type=str, required=True) - parser.add_argument( - '-s', '--spec', type=str, - help='Spec may be one of earliest, latest, max-timestamp, earliest-local, ' - 'latest-tiered, or a millisecond timestamp. ' - 'Applies to all topic/partitions currently in group. Mutually exclusive ' - 'with --partition') parser.add_argument( '-p', '--partition', type=str, action='append', dest='partitions', default=[], - help='TOPIC:PARTITION:SPEC triple (repeatable). PARTITION may be a ' - 'single partition, a closed range (0-2), an open range (1-), or ' - 'a single wildcard "*" for all partitions. SPEC may be one of ' - 'earliest, latest, max-timestamp, earliest-local, latest-tiered, ' - 'or a millisecond timestamp.') + help='TOPIC:PARTITION pair (repeatable). Scopes the reset to these ' + 'partitions. If omitted, the reset applies to every partition ' + 'currently committed by the group.') + mode = parser.add_mutually_exclusive_group(required=True) + mode.add_argument( + '-s', '--spec', type=str, + help='Spec may be one of earliest, latest, max-timestamp, ' + 'earliest-local, latest-tiered, or a millisecond timestamp.') + mode.add_argument( + '--to-offset', type=int, dest='to_offset', + help='Reset all in-scope partitions to this explicit offset ' + '(clamped to [earliest, latest]).') + mode.add_argument( + '--shift-by', type=int, dest='shift_by', + help='Shift each in-scope committed offset by N positions (may be ' + 'negative); clamped to [earliest, latest]. Requires a current ' + 'commit for each in-scope partition.') + mode.add_argument( + '--by-duration', type=str, dest='by_duration', + help='Reset to the offset at (now - DURATION). DURATION is ISO-8601, ' + 'e.g. P7D, PT1H, PT30M.') + mode.add_argument( + '--to-datetime', type=str, dest='to_datetime', + help='Reset to the offset at the given ISO-8601 datetime (UTC assumed ' + 'if no tz offset is provided).') + mode.add_argument( + '--to-current', action='store_true', dest='to_current', + help='Re-commit the current committed offsets, clamped to ' + '[earliest, latest]. Useful to heal out-of-range offsets. ' + 'Requires a current commit for each in-scope partition.') @classmethod def command(cls, client, args): - if not args.spec and not args.partitions: - raise ValueError('One of --spec or --partition is required') - elif args.spec and args.partitions: - raise ValueError('Only one of --spec and --partition are allowed') group = client.describe_groups([args.group_id]) state = group[args.group_id]['group_state'] if state not in ('Empty', 'Dead'): raise RuntimeError(f'Group {args.group_id} is {state}, expecting Empty or Dead!') - offset_specs = {} - if args.spec: - offsets = client.list_group_offsets(args.group_id) - spec = OffsetSpec.build_from(args.spec) - offset_specs = {tp: spec for tp in offsets} - else: - offset_specs = cls._parse_partition_specs(args.partitions) - result = client.reset_group_offsets(args.group_id, offset_specs) + + targets = cls._build_targets(client, args) + result = client.reset_group_offsets(args.group_id, targets) output = defaultdict(dict) for tp, res in result.items(): res['error'] = res['error'].__name__ @@ -51,14 +63,77 @@ def command(cls, client, args): return dict(output) @classmethod - def _parse_partition_specs(cls, partitions): - tp_offsets = {} - for entry in partitions: - topic, partition, spec_str = entry.rsplit(':', 2) - spec = OffsetSpec.build_from(spec_str) - tp = TopicPartition(topic, int(partition)) - if tp in tp_offsets: - # Passing multiple specs for a single partition results in an InvalidRequestError - raise ValueError('Only one spec allowed per partition') - tp_offsets[tp] = spec - return tp_offsets + def _build_targets(cls, client, args): + explicit_scope = [cls._parse_tp(p) for p in args.partitions] if args.partitions else None + + if args.spec is not None: + scope = explicit_scope if explicit_scope is not None else list(client.list_group_offsets(args.group_id)) + spec = cls._parse_spec(args.spec) + return {tp: spec for tp in scope} + + if args.to_offset is not None: + scope = explicit_scope if explicit_scope is not None else list(client.list_group_offsets(args.group_id)) + return {tp: args.to_offset for tp in scope} + + if args.by_duration: + delta = cls._parse_duration(args.by_duration) + ts = OffsetTimestamp(int((datetime.now(timezone.utc) - delta).timestamp() * 1000)) + scope = explicit_scope if explicit_scope is not None else list(client.list_group_offsets(args.group_id)) + return {tp: ts for tp in scope} + + if args.to_datetime: + dt = cls._parse_datetime(args.to_datetime) + ts = OffsetTimestamp(int(dt.timestamp() * 1000)) + scope = explicit_scope if explicit_scope is not None else list(client.list_group_offsets(args.group_id)) + return {tp: ts for tp in scope} + + current = client.list_group_offsets(args.group_id) + scope = explicit_scope if explicit_scope is not None else list(current) + missing = [tp for tp in scope if tp not in current] + if missing: + raise ValueError( + f'No committed offset for {missing}; --shift-by / --to-current ' + 'require a current commit per in-scope partition') + + if args.shift_by is not None: + return {tp: current[tp].offset + args.shift_by for tp in scope} + if args.to_current: + return {tp: current[tp].offset for tp in scope} + raise ValueError('No reset mode selected') + + @classmethod + def _parse_tp(cls, entry): + topic, partition = entry.rsplit(':', 1) + return TopicPartition(topic, int(partition)) + + @classmethod + def _parse_spec(cls, spec_str): + try: + return OffsetSpec.build_from(spec_str) + except ValueError: + return OffsetTimestamp(int(spec_str)) + + @classmethod + def _parse_duration(cls, duration): + m = re.match( + r'^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?)?$', + duration) + if not m or not any(m.groups()): + raise ValueError(f'Invalid ISO-8601 duration: {duration}') + days, hours, mins, secs = m.groups() + return timedelta( + days=int(days) if days else 0, + hours=int(hours) if hours else 0, + minutes=int(mins) if mins else 0, + seconds=float(secs) if secs else 0, + ) + + @classmethod + def _parse_datetime(cls, dt_str): + try: + dt = datetime.fromisoformat(dt_str) + except ValueError: + raise ValueError(f'Invalid ISO-8601 datetime: {dt_str}') + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt diff --git a/kafka/protocol/consumer/offsets.py b/kafka/protocol/consumer/offsets.py index 20ff3f2c6..0369b756f 100644 --- a/kafka/protocol/consumer/offsets.py +++ b/kafka/protocol/consumer/offsets.py @@ -26,6 +26,20 @@ class OffsetSpec(EnumHelper, IntEnum): LATEST_TIERED = -5 # the latest offset of the partition in remote storage (KIP-1005) +class OffsetTimestamp(int): + """Millisecond-timestamp spec for partition offset lookup. + + Wraps an int so it can be distinguished from a bare offset. Use with + :meth:`KafkaAdminClient.reset_group_offsets` (and anywhere else a spec + may be mixed with explicit offsets) to request "earliest offset whose + timestamp is >= N ms". + """ + __slots__ = () + + def __repr__(self): + return f'OffsetTimestamp({int(self)})' + + class ListOffsetsRequest(ApiMessage): @classmethod def min_version_for_timestamp(cls, ts): @@ -54,7 +68,8 @@ class OffsetForLeaderEpochResponse(ApiMessage): pass __all__ = [ - 'UNKNOWN_OFFSET', 'OffsetResetStrategy', 'IsolationLevel', 'OffsetSpec', + 'UNKNOWN_OFFSET', 'OffsetResetStrategy', 'IsolationLevel', + 'OffsetSpec', 'OffsetTimestamp', 'ListOffsetsRequest', 'ListOffsetsResponse', 'OffsetForLeaderEpochRequest', 'OffsetForLeaderEpochResponse', ] diff --git a/kafka/protocol/consumer/offsets.pyi b/kafka/protocol/consumer/offsets.pyi index 66c1a59ac..a456df2da 100644 --- a/kafka/protocol/consumer/offsets.pyi +++ b/kafka/protocol/consumer/offsets.pyi @@ -6,7 +6,7 @@ from enum import IntEnum from kafka.protocol.api_message import ApiMessage from kafka.protocol.data_container import DataContainer -__all__ = ['UNKNOWN_OFFSET', 'OffsetResetStrategy', 'IsolationLevel', 'OffsetSpec', 'ListOffsetsRequest', 'ListOffsetsResponse', 'OffsetForLeaderEpochRequest', 'OffsetForLeaderEpochResponse'] +__all__ = ['UNKNOWN_OFFSET', 'OffsetResetStrategy', 'IsolationLevel', 'OffsetSpec', 'OffsetTimestamp', 'ListOffsetsRequest', 'ListOffsetsResponse', 'OffsetForLeaderEpochRequest', 'OffsetForLeaderEpochResponse'] UNKNOWN_OFFSET: int @@ -26,6 +26,9 @@ class OffsetSpec(EnumHelper, IntEnum): EARLIEST_LOCAL: int LATEST_TIERED: int +class OffsetTimestamp(int): + ... + class ListOffsetsRequest(ApiMessage): class ListOffsetsTopic(DataContainer): class ListOffsetsPartition(DataContainer): diff --git a/test/admin/test_admin_groups.py b/test/admin/test_admin_groups.py index bb3041d42..9857f82ba 100644 --- a/test/admin/test_admin_groups.py +++ b/test/admin/test_admin_groups.py @@ -1,6 +1,10 @@ import pytest -from kafka.admin import GroupState, GroupType, KafkaAdminClient, MemberToRemove +from kafka.admin import ( + KafkaAdminClient, + GroupState, GroupType, MemberToRemove, + OffsetTimestamp, +) import kafka.errors as Errors from kafka.errors import ( GroupIdNotFoundError, @@ -12,9 +16,12 @@ from kafka.protocol.admin import ListGroupsRequest, ListGroupsResponse from kafka.protocol.consumer import ( LeaveGroupRequest, LeaveGroupResponse, + ListOffsetsRequest, ListOffsetsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, + OffsetFetchRequest, OffsetFetchResponse, ) +from kafka.protocol.metadata import MetadataResponse from kafka.protocol.consumer.group import DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID from kafka.structs import OffsetAndMetadata, TopicPartition @@ -515,3 +522,192 @@ def test_states_filter_rejected_on_pre_518_broker(self, broker, admin): def test_types_filter_rejected_on_pre_848_broker(self, broker, admin): with pytest.raises(Errors.IncompatibleBrokerVersion): admin.list_groups(types_filter=['consumer']) + + +# --------------------------------------------------------------------------- +# reset_group_offsets +# --------------------------------------------------------------------------- + + +def _set_metadata_for_topic(broker, name, num_partitions, leader_id=0): + Topic = MetadataResponse.MetadataResponseTopic + Partition = Topic.MetadataResponsePartition + Broker = MetadataResponse.MetadataResponseBroker + brokers = [Broker(node_id=0, host=broker.host, port=broker.port, rack=None)] + broker.set_metadata( + topics=[Topic( + version=8, error_code=0, name=name, is_internal=False, + partitions=[ + Partition(version=8, error_code=0, partition_index=i, + leader_id=leader_id, leader_epoch=0, + replica_nodes=[0], isr_nodes=[0], offline_replicas=[]) + for i in range(num_partitions) + ])], + brokers=brokers, + ) + + +def _offset_fetch_response(partitions): + """partitions: list of (topic, partition, committed_offset, metadata, leader_epoch).""" + _Topic = OffsetFetchResponse.OffsetFetchResponseTopic + _Partition = _Topic.OffsetFetchResponsePartition + by_topic = {} + for topic, part, offset, meta, le in partitions: + by_topic.setdefault(topic, []).append(_Partition( + partition_index=part, committed_offset=offset, + committed_leader_epoch=le, metadata=meta, error_code=0)) + return OffsetFetchResponse( + throttle_time_ms=0, error_code=0, + topics=[_Topic(name=t, partitions=parts) for t, parts in by_topic.items()], + ) + + +def _list_offsets_response(partitions): + """partitions: list of (topic, partition, offset, timestamp, leader_epoch).""" + Topic = ListOffsetsResponse.ListOffsetsTopicResponse + Partition = Topic.ListOffsetsPartitionResponse + by_topic = {} + for topic, part, offset, ts, le in partitions: + by_topic.setdefault(topic, []).append(Partition( + partition_index=part, error_code=0, timestamp=ts, + offset=offset, leader_epoch=le)) + return ListOffsetsResponse( + throttle_time_ms=0, + topics=[Topic(name=t, partitions=parts) for t, parts in by_topic.items()], + ) + + +def _offset_commit_response(partitions): + """partitions: list of (topic, partition, error_code).""" + _Topic = OffsetCommitResponse.OffsetCommitResponseTopic + _Partition = _Topic.OffsetCommitResponsePartition + by_topic = {} + for topic, part, err in partitions: + by_topic.setdefault(topic, []).append( + _Partition(partition_index=part, error_code=err)) + return OffsetCommitResponse( + throttle_time_ms=0, + topics=[_Topic(name=t, partitions=parts) for t, parts in by_topic.items()], + ) + + +class TestResetGroupOffsetsMockBroker: + def test_clamps_explicit_offset_above_latest(self, broker, admin): + _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) + broker.respond(OffsetFetchRequest, _offset_fetch_response( + [('topic-a', 0, 50, '', 0)])) + # Bounds: earliest=10, latest=100 + broker.respond(ListOffsetsRequest, _list_offsets_response( + [('topic-a', 0, 10, -1, 0)])) + broker.respond(ListOffsetsRequest, _list_offsets_response( + [('topic-a', 0, 100, -1, 0)])) + committed = {} + + def commit_handler(api_key, api_version, correlation_id, request_bytes): + req = OffsetCommitRequest.decode(request_bytes, version=api_version, header=True) + for t in req.topics: + for p in t.partitions: + committed[TopicPartition(t.name, p.partition_index)] = p.committed_offset + return _offset_commit_response([('topic-a', 0, 0)]) + + broker.respond_fn(OffsetCommitRequest, commit_handler) + + tp = TopicPartition('topic-a', 0) + result = admin.reset_group_offsets( + 'g1', {tp: 9999}, group_coordinator_id=0) + + assert committed[tp] == 100 + assert result[tp]['offset'] == 100 + assert result[tp]['error'] is NoError + + def test_clamps_explicit_offset_below_earliest(self, broker, admin): + _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) + broker.respond(OffsetFetchRequest, _offset_fetch_response( + [('topic-a', 0, 50, '', 0)])) + broker.respond(ListOffsetsRequest, _list_offsets_response( + [('topic-a', 0, 10, -1, 0)])) + broker.respond(ListOffsetsRequest, _list_offsets_response( + [('topic-a', 0, 100, -1, 0)])) + committed = {} + + def commit_handler(api_key, api_version, correlation_id, request_bytes): + req = OffsetCommitRequest.decode(request_bytes, version=api_version, header=True) + for t in req.topics: + for p in t.partitions: + committed[TopicPartition(t.name, p.partition_index)] = p.committed_offset + return _offset_commit_response([('topic-a', 0, 0)]) + + broker.respond_fn(OffsetCommitRequest, commit_handler) + + tp = TopicPartition('topic-a', 0) + result = admin.reset_group_offsets( + 'g1', {tp: 5}, group_coordinator_id=0) + + assert committed[tp] == 10 + assert result[tp]['offset'] == 10 + + def test_clamps_unknown_offset_to_latest(self, broker, admin): + # Simulate a timestamp beyond the last record: ListOffsets returns -1. + _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) + broker.respond(OffsetFetchRequest, _offset_fetch_response( + [('topic-a', 0, 50, '', 0)])) + broker.respond(ListOffsetsRequest, _list_offsets_response( + [('topic-a', 0, 10, -1, 0)])) # earliest + broker.respond(ListOffsetsRequest, _list_offsets_response( + [('topic-a', 0, 100, -1, 0)])) # latest + broker.respond(ListOffsetsRequest, _list_offsets_response( + [('topic-a', 0, -1, -1, -1)])) # spec resolution: UNKNOWN + committed = {} + + def commit_handler(api_key, api_version, correlation_id, request_bytes): + req = OffsetCommitRequest.decode(request_bytes, version=api_version, header=True) + for t in req.topics: + for p in t.partitions: + committed[TopicPartition(t.name, p.partition_index)] = p.committed_offset + return _offset_commit_response([('topic-a', 0, 0)]) + + broker.respond_fn(OffsetCommitRequest, commit_handler) + + tp = TopicPartition('topic-a', 0) + result = admin.reset_group_offsets( + 'g1', {tp: OffsetTimestamp(99999999999)}, group_coordinator_id=0) + + assert committed[tp] == 100 + assert result[tp]['offset'] == 100 + + def test_offset_in_range_not_clamped(self, broker, admin): + _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) + broker.respond(OffsetFetchRequest, _offset_fetch_response( + [('topic-a', 0, 50, 'm', 3)])) + broker.respond(ListOffsetsRequest, _list_offsets_response( + [('topic-a', 0, 10, -1, 0)])) + broker.respond(ListOffsetsRequest, _list_offsets_response( + [('topic-a', 0, 100, -1, 0)])) + committed = {} + + def commit_handler(api_key, api_version, correlation_id, request_bytes): + req = OffsetCommitRequest.decode(request_bytes, version=api_version, header=True) + for t in req.topics: + for p in t.partitions: + committed[TopicPartition(t.name, p.partition_index)] = ( + p.committed_offset, p.committed_metadata, p.committed_leader_epoch) + return _offset_commit_response([('topic-a', 0, 0)]) + + broker.respond_fn(OffsetCommitRequest, commit_handler) + + tp = TopicPartition('topic-a', 0) + result = admin.reset_group_offsets( + 'g1', {tp: 42}, group_coordinator_id=0) + + # Offset not clamped; existing metadata/leader_epoch preserved. + assert committed[tp] == (42, 'm', 3) + assert result[tp]['offset'] == 42 + + def test_empty_input_noop(self, broker, admin): + assert admin.reset_group_offsets('g1', {}, group_coordinator_id=0) == {} + + def test_unsupported_value_type_raises(self, broker, admin): + tp = TopicPartition('topic-a', 0) + with pytest.raises(TypeError, match='Unsupported reset target'): + admin.reset_group_offsets( + 'g1', {tp: 'earliest'}, group_coordinator_id=0)