Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,
ResourceType, ACLPermissionType, ACLResourcePatternType)
from kafka.admin._configs import (
AlterConfigOp, ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType)
AlterConfigOp, ConfigFilterType, ConfigResource, ConfigResourceType,
ConfigType, ConfigSourceType)
from kafka.admin._groups import MemberToRemove
from kafka.admin._partitions import NewPartitions, OffsetSpec
from kafka.admin._topics import NewTopic
Expand Down
30 changes: 6 additions & 24 deletions kafka/admin/_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
IncrementalAlterConfigsRequest,
ListConfigResourcesRequest,
)
from kafka.util import EnumHelper

if TYPE_CHECKING:
from kafka.net.manager import KafkaConnectionManager
Expand Down Expand Up @@ -104,11 +105,7 @@ def _describe_configs_request(cls, config_resources, include_synonyms=False):

@staticmethod
def _describe_configs_process_responses(responses, config_filter='modified'):
if isinstance(config_filter, str):
try:
config_filter = ConfigFilterType[config_filter.upper()]
except KeyError:
raise ValueError(f'{config_filter} is not a valid ConfigFilterType')
config_filter = ConfigFilterType.build_from(config_filter)
ret = defaultdict(dict)
for response in responses:
for result in response.results:
Expand Down Expand Up @@ -192,11 +189,7 @@ def _list_config_resources_process_response(response):
async def _async_list_config_resources(self, resource_types=None):
wire_types = []
for rt in resource_types or []:
if not isinstance(rt, ConfigResourceType):
try:
rt = ConfigResourceType[str(rt).upper().replace('-', '_')]
except KeyError:
raise ValueError(f'Unrecognized ConfigResourceType: {rt}')
rt = ConfigResourceType.build_from(rt)
wire_types.append(rt.value)
request = ListConfigResourcesRequest(resource_types=wire_types)
response = await self._manager.send(request)
Expand Down Expand Up @@ -371,25 +364,14 @@ def reset_configs(self, config_resources, validate_only=False, raise_on_unknown=
return self._manager.run(self._async_reset_configs, config_resources, validate_only, raise_on_unknown, incremental)


class AlterConfigOp(IntEnum):
class AlterConfigOp(EnumHelper, IntEnum):
SET = 0
DELETE = 1
APPEND = 2
SUBTRACT = 3

@staticmethod
def value_for(op):
if isinstance(op, AlterConfigOp):
return op.value
if isinstance(op, int):
return AlterConfigOp(op).value
try:
return AlterConfigOp[str(op).upper()].value
except KeyError:
raise ValueError(f'Unrecognized AlterConfigOp: {op}')


class ConfigFilterType(IntEnum):
class ConfigFilterType(EnumHelper, IntEnum):
ALL = 0
DYNAMIC = 1
MODIFIED = 2
Expand All @@ -406,7 +388,7 @@ def should_skip(self, config_source):
return False


class ConfigResourceType(IntEnum):
class ConfigResourceType(EnumHelper, IntEnum):
UNKNOWN = 0
TOPIC = 2
BROKER = 4
Expand Down
9 changes: 1 addition & 8 deletions kafka/admin/_partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,14 +462,7 @@ def _list_partition_offsets_process_response(response):
return results

async def _async_list_partition_offsets(self, topic_partition_specs, isolation_level='read_uncommitted'):
if isinstance(isolation_level, str):
try:
isolation_level = IsolationLevel[isolation_level.upper()]
except KeyError:
raise ValueError(f'Unrecognized isolation_level: {isolation_level}')
elif isinstance(isolation_level, int):
isolation_level = IsolationLevel(isolation_level)

isolation_level = IsolationLevel.build_from(isolation_level)
results = {}
topic_partitions = set(topic_partition_specs.keys())
while topic_partitions:
Expand Down
16 changes: 2 additions & 14 deletions kafka/cli/admin/groups/reset_offsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def command(cls, client, args):
offset_specs = {}
if args.spec:
offsets = client.list_group_offsets(args.group_id)
spec = cls._parse_spec(args.spec)
spec = OffsetSpec.build_from(args.spec)
offset_specs = {tp: spec for tp in offsets}
else:
offset_specs = cls._parse_partition_specs(args.partitions)
Expand All @@ -52,24 +52,12 @@ def command(cls, client, args):
output[tp.topic][tp.partition] = res
return dict(output)

@staticmethod
def _parse_spec(spec):
try:
return int(spec)
except ValueError:
pass
try:
spec_key = spec.upper().replace('-', '_')
return OffsetSpec[spec_key]
except KeyError:
raise ValueError(f'{spec_key} is not a valid OffsetSpec')

@classmethod
def _parse_partition_specs(cls, partitions):
tp_offsets = {}
for entry in partitions:
topic, partition, spec_str = entry.rsplit(':', 2)
spec = cls._parse_spec(spec_str)
spec = OffsetSpec.build_from(spec_str)
tp = TopicPartition(topic, int(partition))
if tp in tp_offsets:
# Passing multiple specs for a single partition results in an InvalidRequestError
Expand Down
14 changes: 1 addition & 13 deletions kafka/cli/admin/partitions/list_offsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,6 @@ def command(cls, client, args):
}
return dict(output)

@staticmethod
def _parse_spec(spec):
try:
return int(spec)
except ValueError:
pass
try:
spec_key = spec.upper().replace('-', '_')
return OffsetSpec[spec_key]
except KeyError:
raise ValueError(f'{spec_key} is not a valid OffsetSpec')

@classmethod
def _parse_partition_specs(cls, client, args):
if args.partitions:
Expand All @@ -64,7 +52,7 @@ def _parse_partition_specs(cls, client, args):
tp_offsets = {}
for entry in partitions:
topic, partition, spec_str = entry.rsplit(':', 2)
spec = cls._parse_spec(spec_str)
spec = OffsetSpec.build_from(spec_str)
for tp in cls._parse_tp(client, topic, partition):
if tp in tp_offsets:
# Passing multiple specs for a single partition results in an InvalidRequestError
Expand Down
5 changes: 3 additions & 2 deletions kafka/protocol/consumer/offsets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from enum import IntEnum

from ..api_message import ApiMessage
from kafka.util import EnumHelper


UNKNOWN_OFFSET = -1
Expand All @@ -11,12 +12,12 @@ class OffsetResetStrategy:
NONE = 0


class IsolationLevel(IntEnum):
class IsolationLevel(EnumHelper, IntEnum):
READ_UNCOMMITTED = 0
READ_COMMITTED = 1


class OffsetSpec(IntEnum):
class OffsetSpec(EnumHelper, IntEnum):
# Any >= 0: # earliest offset whose timestamp is greater than or equal to the given timestamp and the timestamp of that record.
LATEST = -1 # offset of the next message that will be appended to the log and a timestamp of -1
EARLIEST = -2 # first offset on the partition, including remote-storage, and a timestamp of -1
Expand Down
4 changes: 2 additions & 2 deletions kafka/protocol/consumer/offsets.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ class OffsetResetStrategy:
EARLIEST: int
NONE: int

class IsolationLevel(IntEnum):
class IsolationLevel(EnumHelper, IntEnum):
READ_UNCOMMITTED: int
READ_COMMITTED: int

class OffsetSpec(IntEnum):
class OffsetSpec(EnumHelper, IntEnum):
LATEST: int
EARLIEST: int
MAX_TIMESTAMP: int
Expand Down
26 changes: 26 additions & 0 deletions kafka/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,29 @@ def __init__(self, f):
self.f = f
def __get__(self, obj, owner):
return self.f(owner)


class EnumHelper:
@classmethod
def build_from(cls, val):
if isinstance(val, cls):
return val
try:
return cls(val)
except ValueError:
pass
try:
return cls[str(val).strip().upper().replace('-', '_')] # pylint: disable=E1136
except KeyError:
raise ValueError(f'Unrecognized {cls.__name__}: {val}')

@classmethod
def value_for(cls, val):
if isinstance(val, cls):
return val.value
if isinstance(val, int):
return cls(val).value # pylint: disable=E1101
try:
return cls[str(val).upper().replace('-', '_')].value # pylint: disable=E1136
except KeyError:
raise ValueError(f'Unrecognized {cls.__name__}: {val}')
Loading