diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index 425e2f3c9..0dd32be6b 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -3,7 +3,8 @@ ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation, ResourceType, ACLPermissionType, ACLResourcePatternType) from kafka.admin._configs import ( - AlterConfigOp, ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType) + AlterConfigOp, ConfigFilterType, ConfigResource, ConfigResourceType, + ConfigType, ConfigSourceType) from kafka.admin._groups import MemberToRemove from kafka.admin._partitions import NewPartitions, OffsetSpec from kafka.admin._topics import NewTopic diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py index b4c86aab8..24878dfdf 100644 --- a/kafka/admin/_configs.py +++ b/kafka/admin/_configs.py @@ -17,6 +17,7 @@ IncrementalAlterConfigsRequest, ListConfigResourcesRequest, ) +from kafka.util import EnumHelper if TYPE_CHECKING: from kafka.net.manager import KafkaConnectionManager @@ -104,11 +105,7 @@ def _describe_configs_request(cls, config_resources, include_synonyms=False): @staticmethod def _describe_configs_process_responses(responses, config_filter='modified'): - if isinstance(config_filter, str): - try: - config_filter = ConfigFilterType[config_filter.upper()] - except KeyError: - raise ValueError(f'{config_filter} is not a valid ConfigFilterType') + config_filter = ConfigFilterType.build_from(config_filter) ret = defaultdict(dict) for response in responses: for result in response.results: @@ -192,11 +189,7 @@ def _list_config_resources_process_response(response): async def _async_list_config_resources(self, resource_types=None): wire_types = [] for rt in resource_types or []: - if not isinstance(rt, ConfigResourceType): - try: - rt = ConfigResourceType[str(rt).upper().replace('-', '_')] - except KeyError: - raise ValueError(f'Unrecognized ConfigResourceType: {rt}') + rt = ConfigResourceType.build_from(rt) wire_types.append(rt.value) request = ListConfigResourcesRequest(resource_types=wire_types) response = await self._manager.send(request) @@ -371,25 +364,14 @@ def reset_configs(self, config_resources, validate_only=False, raise_on_unknown= return self._manager.run(self._async_reset_configs, config_resources, validate_only, raise_on_unknown, incremental) -class AlterConfigOp(IntEnum): +class AlterConfigOp(EnumHelper, IntEnum): SET = 0 DELETE = 1 APPEND = 2 SUBTRACT = 3 - @staticmethod - def value_for(op): - if isinstance(op, AlterConfigOp): - return op.value - if isinstance(op, int): - return AlterConfigOp(op).value - try: - return AlterConfigOp[str(op).upper()].value - except KeyError: - raise ValueError(f'Unrecognized AlterConfigOp: {op}') - -class ConfigFilterType(IntEnum): +class ConfigFilterType(EnumHelper, IntEnum): ALL = 0 DYNAMIC = 1 MODIFIED = 2 @@ -406,7 +388,7 @@ def should_skip(self, config_source): return False -class ConfigResourceType(IntEnum): +class ConfigResourceType(EnumHelper, IntEnum): UNKNOWN = 0 TOPIC = 2 BROKER = 4 diff --git a/kafka/admin/_partitions.py b/kafka/admin/_partitions.py index d9a6832d3..61b711d0a 100644 --- a/kafka/admin/_partitions.py +++ b/kafka/admin/_partitions.py @@ -462,14 +462,7 @@ def _list_partition_offsets_process_response(response): return results async def _async_list_partition_offsets(self, topic_partition_specs, isolation_level='read_uncommitted'): - if isinstance(isolation_level, str): - try: - isolation_level = IsolationLevel[isolation_level.upper()] - except KeyError: - raise ValueError(f'Unrecognized isolation_level: {isolation_level}') - elif isinstance(isolation_level, int): - isolation_level = IsolationLevel(isolation_level) - + isolation_level = IsolationLevel.build_from(isolation_level) results = {} topic_partitions = set(topic_partition_specs.keys()) while topic_partitions: diff --git a/kafka/cli/admin/groups/reset_offsets.py b/kafka/cli/admin/groups/reset_offsets.py index f54e5b122..1cdb07c3d 100644 --- a/kafka/cli/admin/groups/reset_offsets.py +++ b/kafka/cli/admin/groups/reset_offsets.py @@ -41,7 +41,7 @@ def command(cls, client, args): offset_specs = {} if args.spec: offsets = client.list_group_offsets(args.group_id) - spec = cls._parse_spec(args.spec) + spec = OffsetSpec.build_from(args.spec) offset_specs = {tp: spec for tp in offsets} else: offset_specs = cls._parse_partition_specs(args.partitions) @@ -52,24 +52,12 @@ def command(cls, client, args): output[tp.topic][tp.partition] = res return dict(output) - @staticmethod - def _parse_spec(spec): - try: - return int(spec) - except ValueError: - pass - try: - spec_key = spec.upper().replace('-', '_') - return OffsetSpec[spec_key] - except KeyError: - raise ValueError(f'{spec_key} is not a valid OffsetSpec') - @classmethod def _parse_partition_specs(cls, partitions): tp_offsets = {} for entry in partitions: topic, partition, spec_str = entry.rsplit(':', 2) - spec = cls._parse_spec(spec_str) + spec = OffsetSpec.build_from(spec_str) tp = TopicPartition(topic, int(partition)) if tp in tp_offsets: # Passing multiple specs for a single partition results in an InvalidRequestError diff --git a/kafka/cli/admin/partitions/list_offsets.py b/kafka/cli/admin/partitions/list_offsets.py index 51003c236..0175e7b02 100644 --- a/kafka/cli/admin/partitions/list_offsets.py +++ b/kafka/cli/admin/partitions/list_offsets.py @@ -41,18 +41,6 @@ def command(cls, client, args): } return dict(output) - @staticmethod - def _parse_spec(spec): - try: - return int(spec) - except ValueError: - pass - try: - spec_key = spec.upper().replace('-', '_') - return OffsetSpec[spec_key] - except KeyError: - raise ValueError(f'{spec_key} is not a valid OffsetSpec') - @classmethod def _parse_partition_specs(cls, client, args): if args.partitions: @@ -64,7 +52,7 @@ def _parse_partition_specs(cls, client, args): tp_offsets = {} for entry in partitions: topic, partition, spec_str = entry.rsplit(':', 2) - spec = cls._parse_spec(spec_str) + spec = OffsetSpec.build_from(spec_str) for tp in cls._parse_tp(client, topic, partition): if tp in tp_offsets: # Passing multiple specs for a single partition results in an InvalidRequestError diff --git a/kafka/protocol/consumer/offsets.py b/kafka/protocol/consumer/offsets.py index 452fd6f7b..20ff3f2c6 100644 --- a/kafka/protocol/consumer/offsets.py +++ b/kafka/protocol/consumer/offsets.py @@ -1,6 +1,7 @@ from enum import IntEnum from ..api_message import ApiMessage +from kafka.util import EnumHelper UNKNOWN_OFFSET = -1 @@ -11,12 +12,12 @@ class OffsetResetStrategy: NONE = 0 -class IsolationLevel(IntEnum): +class IsolationLevel(EnumHelper, IntEnum): READ_UNCOMMITTED = 0 READ_COMMITTED = 1 -class OffsetSpec(IntEnum): +class OffsetSpec(EnumHelper, IntEnum): # Any >= 0: # earliest offset whose timestamp is greater than or equal to the given timestamp and the timestamp of that record. LATEST = -1 # offset of the next message that will be appended to the log and a timestamp of -1 EARLIEST = -2 # first offset on the partition, including remote-storage, and a timestamp of -1 diff --git a/kafka/protocol/consumer/offsets.pyi b/kafka/protocol/consumer/offsets.pyi index 239ee774e..66c1a59ac 100644 --- a/kafka/protocol/consumer/offsets.pyi +++ b/kafka/protocol/consumer/offsets.pyi @@ -15,11 +15,11 @@ class OffsetResetStrategy: EARLIEST: int NONE: int -class IsolationLevel(IntEnum): +class IsolationLevel(EnumHelper, IntEnum): READ_UNCOMMITTED: int READ_COMMITTED: int -class OffsetSpec(IntEnum): +class OffsetSpec(EnumHelper, IntEnum): LATEST: int EARLIEST: int MAX_TIMESTAMP: int diff --git a/kafka/util.py b/kafka/util.py index 84c6d5fd1..5ba42be9a 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -131,3 +131,29 @@ def __init__(self, f): self.f = f def __get__(self, obj, owner): return self.f(owner) + + +class EnumHelper: + @classmethod + def build_from(cls, val): + if isinstance(val, cls): + return val + try: + return cls(val) + except ValueError: + pass + try: + return cls[str(val).strip().upper().replace('-', '_')] # pylint: disable=E1136 + except KeyError: + raise ValueError(f'Unrecognized {cls.__name__}: {val}') + + @classmethod + def value_for(cls, val): + if isinstance(val, cls): + return val.value + if isinstance(val, int): + return cls(val).value # pylint: disable=E1101 + try: + return cls[str(val).upper().replace('-', '_')].value # pylint: disable=E1136 + except KeyError: + raise ValueError(f'Unrecognized {cls.__name__}: {val}')