From 1984fb92d16c086d011b91b8543e81f1c9fa4cd8 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 15:15:04 -0700 Subject: [PATCH 1/4] EnumHelper --- kafka/admin/_configs.py | 14 ++------------ kafka/util.py | 13 +++++++++++++ 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py index b4c86aab8..3821e8331 100644 --- a/kafka/admin/_configs.py +++ b/kafka/admin/_configs.py @@ -17,6 +17,7 @@ IncrementalAlterConfigsRequest, ListConfigResourcesRequest, ) +from kafka.util import EnumHelper if TYPE_CHECKING: from kafka.net.manager import KafkaConnectionManager @@ -371,23 +372,12 @@ def reset_configs(self, config_resources, validate_only=False, raise_on_unknown= return self._manager.run(self._async_reset_configs, config_resources, validate_only, raise_on_unknown, incremental) -class AlterConfigOp(IntEnum): +class AlterConfigOp(EnumHelper, IntEnum): SET = 0 DELETE = 1 APPEND = 2 SUBTRACT = 3 - @staticmethod - def value_for(op): - if isinstance(op, AlterConfigOp): - return op.value - if isinstance(op, int): - return AlterConfigOp(op).value - try: - return AlterConfigOp[str(op).upper()].value - except KeyError: - raise ValueError(f'Unrecognized AlterConfigOp: {op}') - class ConfigFilterType(IntEnum): ALL = 0 diff --git a/kafka/util.py b/kafka/util.py index 84c6d5fd1..304cc291d 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -131,3 +131,16 @@ def __init__(self, f): self.f = f def __get__(self, obj, owner): return self.f(owner) + + +class EnumHelper: + @classmethod + def value_for(cls, val): + if isinstance(val, cls): + return val.value + if isinstance(val, int): + return cls(val).value + try: + return cls[str(val).upper().replace('-', '_')].value + except KeyError: + raise ValueError(f'Unrecognized {cls.__name__}: {val}') From 4860ad4ed40afd9692707b044cbdbeed477f2c1f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 15:26:36 -0700 Subject: [PATCH 2/4] Use EnumHelper --- kafka/admin/__init__.py | 3 ++- kafka/admin/_configs.py | 16 ++++------------ kafka/admin/_partitions.py | 9 +-------- kafka/cli/admin/groups/reset_offsets.py | 16 ++-------------- kafka/cli/admin/partitions/list_offsets.py | 14 +------------- kafka/protocol/consumer/offsets.py | 5 +++-- kafka/util.py | 13 +++++++++++++ 7 files changed, 26 insertions(+), 50 deletions(-) diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index 425e2f3c9..0dd32be6b 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -3,7 +3,8 @@ ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation, ResourceType, ACLPermissionType, ACLResourcePatternType) from kafka.admin._configs import ( - AlterConfigOp, ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType) + AlterConfigOp, ConfigFilterType, ConfigResource, ConfigResourceType, + ConfigType, ConfigSourceType) from kafka.admin._groups import MemberToRemove from kafka.admin._partitions import NewPartitions, OffsetSpec from kafka.admin._topics import NewTopic diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py index 3821e8331..24878dfdf 100644 --- a/kafka/admin/_configs.py +++ b/kafka/admin/_configs.py @@ -105,11 +105,7 @@ def _describe_configs_request(cls, config_resources, include_synonyms=False): @staticmethod def _describe_configs_process_responses(responses, config_filter='modified'): - if isinstance(config_filter, str): - try: - config_filter = ConfigFilterType[config_filter.upper()] - except KeyError: - raise ValueError(f'{config_filter} is not a valid ConfigFilterType') + config_filter = ConfigFilterType.build_from(config_filter) ret = defaultdict(dict) for response in responses: for result in response.results: @@ -193,11 +189,7 @@ def _list_config_resources_process_response(response): async def _async_list_config_resources(self, resource_types=None): wire_types = [] for rt in resource_types or []: - if not isinstance(rt, ConfigResourceType): - try: - rt = ConfigResourceType[str(rt).upper().replace('-', '_')] - except KeyError: - raise ValueError(f'Unrecognized ConfigResourceType: {rt}') + rt = ConfigResourceType.build_from(rt) wire_types.append(rt.value) request = ListConfigResourcesRequest(resource_types=wire_types) response = await self._manager.send(request) @@ -379,7 +371,7 @@ class AlterConfigOp(EnumHelper, IntEnum): SUBTRACT = 3 -class ConfigFilterType(IntEnum): +class ConfigFilterType(EnumHelper, IntEnum): ALL = 0 DYNAMIC = 1 MODIFIED = 2 @@ -396,7 +388,7 @@ def should_skip(self, config_source): return False -class ConfigResourceType(IntEnum): +class ConfigResourceType(EnumHelper, IntEnum): UNKNOWN = 0 TOPIC = 2 BROKER = 4 diff --git a/kafka/admin/_partitions.py b/kafka/admin/_partitions.py index d9a6832d3..61b711d0a 100644 --- a/kafka/admin/_partitions.py +++ b/kafka/admin/_partitions.py @@ -462,14 +462,7 @@ def _list_partition_offsets_process_response(response): return results async def _async_list_partition_offsets(self, topic_partition_specs, isolation_level='read_uncommitted'): - if isinstance(isolation_level, str): - try: - isolation_level = IsolationLevel[isolation_level.upper()] - except KeyError: - raise ValueError(f'Unrecognized isolation_level: {isolation_level}') - elif isinstance(isolation_level, int): - isolation_level = IsolationLevel(isolation_level) - + isolation_level = IsolationLevel.build_from(isolation_level) results = {} topic_partitions = set(topic_partition_specs.keys()) while topic_partitions: diff --git a/kafka/cli/admin/groups/reset_offsets.py b/kafka/cli/admin/groups/reset_offsets.py index f54e5b122..1cdb07c3d 100644 --- a/kafka/cli/admin/groups/reset_offsets.py +++ b/kafka/cli/admin/groups/reset_offsets.py @@ -41,7 +41,7 @@ def command(cls, client, args): offset_specs = {} if args.spec: offsets = client.list_group_offsets(args.group_id) - spec = cls._parse_spec(args.spec) + spec = OffsetSpec.build_from(args.spec) offset_specs = {tp: spec for tp in offsets} else: offset_specs = cls._parse_partition_specs(args.partitions) @@ -52,24 +52,12 @@ def command(cls, client, args): output[tp.topic][tp.partition] = res return dict(output) - @staticmethod - def _parse_spec(spec): - try: - return int(spec) - except ValueError: - pass - try: - spec_key = spec.upper().replace('-', '_') - return OffsetSpec[spec_key] - except KeyError: - raise ValueError(f'{spec_key} is not a valid OffsetSpec') - @classmethod def _parse_partition_specs(cls, partitions): tp_offsets = {} for entry in partitions: topic, partition, spec_str = entry.rsplit(':', 2) - spec = cls._parse_spec(spec_str) + spec = OffsetSpec.build_from(spec_str) tp = TopicPartition(topic, int(partition)) if tp in tp_offsets: # Passing multiple specs for a single partition results in an InvalidRequestError diff --git a/kafka/cli/admin/partitions/list_offsets.py b/kafka/cli/admin/partitions/list_offsets.py index 51003c236..0175e7b02 100644 --- a/kafka/cli/admin/partitions/list_offsets.py +++ b/kafka/cli/admin/partitions/list_offsets.py @@ -41,18 +41,6 @@ def command(cls, client, args): } return dict(output) - @staticmethod - def _parse_spec(spec): - try: - return int(spec) - except ValueError: - pass - try: - spec_key = spec.upper().replace('-', '_') - return OffsetSpec[spec_key] - except KeyError: - raise ValueError(f'{spec_key} is not a valid OffsetSpec') - @classmethod def _parse_partition_specs(cls, client, args): if args.partitions: @@ -64,7 +52,7 @@ def _parse_partition_specs(cls, client, args): tp_offsets = {} for entry in partitions: topic, partition, spec_str = entry.rsplit(':', 2) - spec = cls._parse_spec(spec_str) + spec = OffsetSpec.build_from(spec_str) for tp in cls._parse_tp(client, topic, partition): if tp in tp_offsets: # Passing multiple specs for a single partition results in an InvalidRequestError diff --git a/kafka/protocol/consumer/offsets.py b/kafka/protocol/consumer/offsets.py index 452fd6f7b..20ff3f2c6 100644 --- a/kafka/protocol/consumer/offsets.py +++ b/kafka/protocol/consumer/offsets.py @@ -1,6 +1,7 @@ from enum import IntEnum from ..api_message import ApiMessage +from kafka.util import EnumHelper UNKNOWN_OFFSET = -1 @@ -11,12 +12,12 @@ class OffsetResetStrategy: NONE = 0 -class IsolationLevel(IntEnum): +class IsolationLevel(EnumHelper, IntEnum): READ_UNCOMMITTED = 0 READ_COMMITTED = 1 -class OffsetSpec(IntEnum): +class OffsetSpec(EnumHelper, IntEnum): # Any >= 0: # earliest offset whose timestamp is greater than or equal to the given timestamp and the timestamp of that record. LATEST = -1 # offset of the next message that will be appended to the log and a timestamp of -1 EARLIEST = -2 # first offset on the partition, including remote-storage, and a timestamp of -1 diff --git a/kafka/util.py b/kafka/util.py index 304cc291d..aa2f80961 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -134,6 +134,19 @@ def __get__(self, obj, owner): class EnumHelper: + @classmethod + def build_from(cls, val): + if isinstance(val, cls): + return val + try: + return cls(val) + except ValueError: + pass + try: + return cls[str(val).strip().upper().replace('-', '_')] + except KeyError: + raise ValueError(f'Unrecognized {cls.__name__}: {val}') + @classmethod def value_for(cls, val): if isinstance(val, cls): From d88a3f0fb4599962d0ab6d8b92002ca9f0d0939d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 15:45:38 -0700 Subject: [PATCH 3/4] pylint --- kafka/util.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/util.py b/kafka/util.py index aa2f80961..5ba42be9a 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -143,7 +143,7 @@ def build_from(cls, val): except ValueError: pass try: - return cls[str(val).strip().upper().replace('-', '_')] + return cls[str(val).strip().upper().replace('-', '_')] # pylint: disable=E1136 except KeyError: raise ValueError(f'Unrecognized {cls.__name__}: {val}') @@ -152,8 +152,8 @@ def value_for(cls, val): if isinstance(val, cls): return val.value if isinstance(val, int): - return cls(val).value + return cls(val).value # pylint: disable=E1101 try: - return cls[str(val).upper().replace('-', '_')].value + return cls[str(val).upper().replace('-', '_')].value # pylint: disable=E1136 except KeyError: raise ValueError(f'Unrecognized {cls.__name__}: {val}') From 29e9b033edfbe3a3beba52304208c8149f2c875d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 15:53:20 -0700 Subject: [PATCH 4/4] update stubs --- kafka/protocol/consumer/offsets.pyi | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/protocol/consumer/offsets.pyi b/kafka/protocol/consumer/offsets.pyi index 239ee774e..66c1a59ac 100644 --- a/kafka/protocol/consumer/offsets.pyi +++ b/kafka/protocol/consumer/offsets.pyi @@ -15,11 +15,11 @@ class OffsetResetStrategy: EARLIEST: int NONE: int -class IsolationLevel(IntEnum): +class IsolationLevel(EnumHelper, IntEnum): READ_UNCOMMITTED: int READ_COMMITTED: int -class OffsetSpec(IntEnum): +class OffsetSpec(EnumHelper, IntEnum): LATEST: int EARLIEST: int MAX_TIMESTAMP: int