Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
AlterConfigOp, ConfigFilterType, ConfigResource, ConfigResourceType,
ConfigType, ConfigSourceType)
from kafka.admin._groups import GroupState, GroupType, MemberToRemove
from kafka.admin._partitions import NewPartitions, OffsetSpec
from kafka.admin._partitions import NewPartitions, OffsetSpec, OffsetTimestamp
from kafka.admin._topics import NewTopic
from kafka.admin._users import (
ScramMechanism, UserScramCredentialDeletion, UserScramCredentialUpsertion)
Expand All @@ -19,6 +19,6 @@
'AlterConfigOp', 'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType',
'UpdateFeatureType',
'GroupState', 'GroupType', 'MemberToRemove',
'OffsetSpec', # NewTopic + NewPartitions are deprecated and not included in __all__
'OffsetSpec', 'OffsetTimestamp', # NewTopic + NewPartitions are deprecated and not included in __all__
'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion',
]
68 changes: 61 additions & 7 deletions kafka/admin/_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from kafka.protocol.admin import DeleteGroupsRequest, DescribeGroupsRequest, ListGroupsRequest
from kafka.protocol.consumer import (
LeaveGroupRequest, OffsetCommitRequest, OffsetDeleteRequest, OffsetFetchRequest,
OffsetSpec, OffsetTimestamp,
)
from kafka.protocol.consumer.group import DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID
from kafka.protocol.consumer.metadata import (
Expand Down Expand Up @@ -382,31 +383,83 @@ def _reset_group_offsets_process_response(response, to_reset):
}
return results

@staticmethod
def _clamp_offset(raw, earliest, latest):
if raw < 0 or raw > latest:
return latest
if raw < earliest:
return earliest
return raw

async def _async_reset_group_offsets(self, group_id, offset_specs, group_coordinator_id=None):
if not offset_specs:
return {}
all_tps = set(offset_specs.keys())

explicit_offsets = {}
for tp, val in list(offset_specs.items()):
if isinstance(val, (OffsetSpec, OffsetTimestamp)):
pass
elif isinstance(val, int):
explicit_offsets[tp] = offset_specs.pop(tp)
else:
raise TypeError(
f'Unsupported reset target for {tp}: {val!r} '
'(expected OffsetSpec, OffsetTimestamp, or int offset)')

if group_coordinator_id is None:
group_coordinator_id = await self._find_coordinator_id(group_id)
current = await self._async_list_group_offsets(group_id, group_coordinator_id, offset_specs.keys())
offsets = await self._async_list_partition_offsets(offset_specs)

current = await self._async_list_group_offsets(group_id, group_coordinator_id, all_tps)
earliest = await self._async_list_partition_offsets({tp: OffsetSpec.EARLIEST for tp in all_tps})
latest = await self._async_list_partition_offsets({tp: OffsetSpec.LATEST for tp in all_tps})

offsets = {}
if offset_specs:
offsets = await self._async_list_partition_offsets(offset_specs)

to_reset = {}
for tp in offsets:
to_reset[tp] = current[tp]._replace(offset=offsets[tp].offset)
for tp in all_tps:
if tp in offsets:
raw = offsets[tp].offset
else:
raw = explicit_offsets[tp]
clamped = self._clamp_offset(raw, earliest[tp].offset, latest[tp].offset)
if tp in current:
to_reset[tp] = current[tp]._replace(offset=clamped)
else:
to_reset[tp] = OffsetAndMetadata(offset=clamped, metadata='', leader_epoch=None)

request = self._alter_group_offsets_request(group_id, to_reset)
response = await self._manager.send(request, node_id=group_coordinator_id)
return self._reset_group_offsets_process_response(response, to_reset)

def reset_group_offsets(self, group_id, offset_specs, group_coordinator_id=None):
"""Reset committed offsets for a consumer group to earliest or latest.
"""Reset committed offsets for a consumer group.

The group must have no active members (i.e. be empty or dead) for
the reset to succeed; otherwise individual partitions may return
``UNKNOWN_MEMBER_ID`` or similar errors.

Each dict value selects how the target offset is produced. All
resulting offsets are clamped to the partition's
``[earliest, latest]`` range; values that resolve to
``UNKNOWN_OFFSET`` (e.g. a timestamp beyond the last record) are
clamped to ``latest``.

Arguments:
group_id (str): The consumer group id.
offset_specs (dict): A dict mapping :class:`~kafka.TopicPartition` to
:class:`~kafka.admin.OffsetSpec`.
one of:

* :class:`~kafka.admin.OffsetSpec` (e.g. ``OffsetSpec.EARLIEST``,
``OffsetSpec.LATEST``, ``OffsetSpec.MAX_TIMESTAMP``):
resolved server-side via ListOffsets.
* :class:`~kafka.admin.OffsetTimestamp` (ms since epoch):
resolved server-side to the earliest offset whose timestamp
is ``>=`` the given value.
* Plain ``int``: an explicit committed offset (no server-side
resolution), which is still clamped to the valid range.

Keyword Arguments:
group_coordinator_id (int, optional): The node_id of the group's
Expand All @@ -415,7 +468,8 @@ def reset_group_offsets(self, group_id, offset_specs, group_coordinator_id=None)

Returns:
dict: A dict mapping :class:`~kafka.TopicPartition` to dict of
{'error': :class:`~kafka.errors.KafkaError` class, 'offset': int}
{'error': :class:`~kafka.errors.KafkaError` class, 'offset': int}.
The ``offset`` value is the post-clamp value that was committed.
"""
return self._manager.run(
self._async_reset_group_offsets, group_id, offset_specs, group_coordinator_id)
Expand Down
4 changes: 3 additions & 1 deletion kafka/admin/_partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
ElectionType,
ListPartitionReassignmentsRequest,
)
from kafka.protocol.consumer import ListOffsetsRequest, IsolationLevel, OffsetSpec
from kafka.protocol.consumer import (
ListOffsetsRequest, IsolationLevel, OffsetSpec, OffsetTimestamp,
)
from kafka.structs import TopicPartition, OffsetAndTimestamp


Expand Down
147 changes: 111 additions & 36 deletions kafka/cli/admin/groups/reset_offsets.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import re
from collections import defaultdict
from datetime import datetime, timedelta, timezone

from kafka.admin import OffsetSpec
from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.admin import OffsetSpec, OffsetTimestamp
from kafka.structs import TopicPartition


class ResetGroupOffsets:
Expand All @@ -11,54 +13,127 @@ class ResetGroupOffsets:
@classmethod
def add_arguments(cls, parser):
parser.add_argument('-g', '--group-id', type=str, required=True)
parser.add_argument(
'-s', '--spec', type=str,
help='Spec may be one of earliest, latest, max-timestamp, earliest-local, '
'latest-tiered, or a millisecond timestamp. '
'Applies to all topic/partitions currently in group. Mutually exclusive '
'with --partition')
parser.add_argument(
'-p', '--partition', type=str, action='append',
dest='partitions', default=[],
help='TOPIC:PARTITION:SPEC triple (repeatable). PARTITION may be a '
'single partition, a closed range (0-2), an open range (1-), or '
'a single wildcard "*" for all partitions. SPEC may be one of '
'earliest, latest, max-timestamp, earliest-local, latest-tiered, '
'or a millisecond timestamp.')
help='TOPIC:PARTITION pair (repeatable). Scopes the reset to these '
'partitions. If omitted, the reset applies to every partition '
'currently committed by the group.')
mode = parser.add_mutually_exclusive_group(required=True)
mode.add_argument(
'-s', '--spec', type=str,
help='Spec may be one of earliest, latest, max-timestamp, '
'earliest-local, latest-tiered, or a millisecond timestamp.')
mode.add_argument(
'--to-offset', type=int, dest='to_offset',
help='Reset all in-scope partitions to this explicit offset '
'(clamped to [earliest, latest]).')
mode.add_argument(
'--shift-by', type=int, dest='shift_by',
help='Shift each in-scope committed offset by N positions (may be '
'negative); clamped to [earliest, latest]. Requires a current '
'commit for each in-scope partition.')
mode.add_argument(
'--by-duration', type=str, dest='by_duration',
help='Reset to the offset at (now - DURATION). DURATION is ISO-8601, '
'e.g. P7D, PT1H, PT30M.')
mode.add_argument(
'--to-datetime', type=str, dest='to_datetime',
help='Reset to the offset at the given ISO-8601 datetime (UTC assumed '
'if no tz offset is provided).')
mode.add_argument(
'--to-current', action='store_true', dest='to_current',
help='Re-commit the current committed offsets, clamped to '
'[earliest, latest]. Useful to heal out-of-range offsets. '
'Requires a current commit for each in-scope partition.')

@classmethod
def command(cls, client, args):
if not args.spec and not args.partitions:
raise ValueError('One of --spec or --partition is required')
elif args.spec and args.partitions:
raise ValueError('Only one of --spec and --partition are allowed')
group = client.describe_groups([args.group_id])
state = group[args.group_id]['group_state']
if state not in ('Empty', 'Dead'):
raise RuntimeError(f'Group {args.group_id} is {state}, expecting Empty or Dead!')
offset_specs = {}
if args.spec:
offsets = client.list_group_offsets(args.group_id)
spec = OffsetSpec.build_from(args.spec)
offset_specs = {tp: spec for tp in offsets}
else:
offset_specs = cls._parse_partition_specs(args.partitions)
result = client.reset_group_offsets(args.group_id, offset_specs)

targets = cls._build_targets(client, args)
result = client.reset_group_offsets(args.group_id, targets)
output = defaultdict(dict)
for tp, res in result.items():
res['error'] = res['error'].__name__
output[tp.topic][tp.partition] = res
return dict(output)

@classmethod
def _parse_partition_specs(cls, partitions):
tp_offsets = {}
for entry in partitions:
topic, partition, spec_str = entry.rsplit(':', 2)
spec = OffsetSpec.build_from(spec_str)
tp = TopicPartition(topic, int(partition))
if tp in tp_offsets:
# Passing multiple specs for a single partition results in an InvalidRequestError
raise ValueError('Only one spec allowed per partition')
tp_offsets[tp] = spec
return tp_offsets
def _build_targets(cls, client, args):
explicit_scope = [cls._parse_tp(p) for p in args.partitions] if args.partitions else None

if args.spec is not None:
scope = explicit_scope if explicit_scope is not None else list(client.list_group_offsets(args.group_id))
spec = cls._parse_spec(args.spec)
return {tp: spec for tp in scope}

if args.to_offset is not None:
scope = explicit_scope if explicit_scope is not None else list(client.list_group_offsets(args.group_id))
return {tp: args.to_offset for tp in scope}

if args.by_duration:
delta = cls._parse_duration(args.by_duration)
ts = OffsetTimestamp(int((datetime.now(timezone.utc) - delta).timestamp() * 1000))
scope = explicit_scope if explicit_scope is not None else list(client.list_group_offsets(args.group_id))
return {tp: ts for tp in scope}

if args.to_datetime:
dt = cls._parse_datetime(args.to_datetime)
ts = OffsetTimestamp(int(dt.timestamp() * 1000))
scope = explicit_scope if explicit_scope is not None else list(client.list_group_offsets(args.group_id))
return {tp: ts for tp in scope}

current = client.list_group_offsets(args.group_id)
scope = explicit_scope if explicit_scope is not None else list(current)
missing = [tp for tp in scope if tp not in current]
if missing:
raise ValueError(
f'No committed offset for {missing}; --shift-by / --to-current '
'require a current commit per in-scope partition')

if args.shift_by is not None:
return {tp: current[tp].offset + args.shift_by for tp in scope}
if args.to_current:
return {tp: current[tp].offset for tp in scope}
raise ValueError('No reset mode selected')

@classmethod
def _parse_tp(cls, entry):
topic, partition = entry.rsplit(':', 1)
return TopicPartition(topic, int(partition))

@classmethod
def _parse_spec(cls, spec_str):
    """Parse a ``--spec`` value into an offset spec or a timestamp.

    The value is first offered to ``OffsetSpec.build_from``; if that raises
    ``ValueError`` it is interpreted as an integer millisecond timestamp.

    Arguments:
        spec_str (str): Raw command-line value.

    Returns:
        Whatever ``OffsetSpec.build_from`` returns for the value, or an
        ``OffsetTimestamp`` wrapping the parsed integer.

    Raises:
        ValueError: If the value is neither accepted by
            ``OffsetSpec.build_from`` nor a valid integer.
    """
    try:
        return OffsetSpec.build_from(spec_str)
    except ValueError:
        # Not a recognised spec; fall through to the timestamp form.
        pass
    try:
        return OffsetTimestamp(int(spec_str))
    except ValueError:
        # Replace the bare "invalid literal for int()" error with one that
        # tells the user which forms are accepted.
        raise ValueError(
            f'Invalid spec {spec_str!r}: expected one of earliest, latest, '
            'max-timestamp, earliest-local, latest-tiered, or a millisecond '
            'timestamp') from None

@classmethod
def _parse_duration(cls, duration):
m = re.match(
r'^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?)?$',
duration)
if not m or not any(m.groups()):
raise ValueError(f'Invalid ISO-8601 duration: {duration}')
days, hours, mins, secs = m.groups()
return timedelta(
days=int(days) if days else 0,
hours=int(hours) if hours else 0,
minutes=int(mins) if mins else 0,
seconds=float(secs) if secs else 0,
)

@classmethod
def _parse_datetime(cls, dt_str):
try:
dt = datetime.fromisoformat(dt_str)
except ValueError:
raise ValueError(f'Invalid ISO-8601 datetime: {dt_str}')
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
17 changes: 16 additions & 1 deletion kafka/protocol/consumer/offsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ class OffsetSpec(EnumHelper, IntEnum):
LATEST_TIERED = -5 # the latest offset of the partition in remote storage (KIP-1005)


class OffsetTimestamp(int):
    """An ``int`` subclass marking a value as a millisecond timestamp.

    A distinct type lets callers tell a timestamp apart from a bare offset
    when both may appear in the same mapping. Pass it to
    :meth:`KafkaAdminClient.reset_group_offsets` (or anywhere specs and
    explicit offsets can be mixed) to request "earliest offset whose
    timestamp is >= N ms".
    """
    # Empty slots: instances carry no per-instance attribute dict.
    __slots__ = ()

    def __repr__(self):
        # Show the wrapper type so it is not mistaken for a plain int offset.
        return 'OffsetTimestamp({})'.format(int(self))


class ListOffsetsRequest(ApiMessage):
@classmethod
def min_version_for_timestamp(cls, ts):
Expand Down Expand Up @@ -54,7 +68,8 @@ class OffsetForLeaderEpochResponse(ApiMessage): pass


__all__ = [
'UNKNOWN_OFFSET', 'OffsetResetStrategy', 'IsolationLevel', 'OffsetSpec',
'UNKNOWN_OFFSET', 'OffsetResetStrategy', 'IsolationLevel',
'OffsetSpec', 'OffsetTimestamp',
'ListOffsetsRequest', 'ListOffsetsResponse',
'OffsetForLeaderEpochRequest', 'OffsetForLeaderEpochResponse',
]
5 changes: 4 additions & 1 deletion kafka/protocol/consumer/offsets.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ from enum import IntEnum
from kafka.protocol.api_message import ApiMessage
from kafka.protocol.data_container import DataContainer

__all__ = ['UNKNOWN_OFFSET', 'OffsetResetStrategy', 'IsolationLevel', 'OffsetSpec', 'ListOffsetsRequest', 'ListOffsetsResponse', 'OffsetForLeaderEpochRequest', 'OffsetForLeaderEpochResponse']
__all__ = ['UNKNOWN_OFFSET', 'OffsetResetStrategy', 'IsolationLevel', 'OffsetSpec', 'OffsetTimestamp', 'ListOffsetsRequest', 'ListOffsetsResponse', 'OffsetForLeaderEpochRequest', 'OffsetForLeaderEpochResponse']

UNKNOWN_OFFSET: int

Expand All @@ -26,6 +26,9 @@ class OffsetSpec(EnumHelper, IntEnum):
EARLIEST_LOCAL: int
LATEST_TIERED: int

class OffsetTimestamp(int):
...

class ListOffsetsRequest(ApiMessage):
class ListOffsetsTopic(DataContainer):
class ListOffsetsPartition(DataContainer):
Expand Down
Loading
Loading