From 42c85e5c419107568eaacc83517fd23563de0f62 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 14:34:44 -0700 Subject: [PATCH 1/8] Admin: cluster describe_features --- kafka/admin/_cluster.py | 52 ++++++++++++++++++++++++++++++++++++++++- kafka/net/manager.py | 4 ++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/kafka/admin/_cluster.py b/kafka/admin/_cluster.py index 2443154ad..24417f66a 100644 --- a/kafka/admin/_cluster.py +++ b/kafka/admin/_cluster.py @@ -5,8 +5,9 @@ import logging from typing import TYPE_CHECKING +import kafka.errors as Errors from kafka.protocol.api_key import ApiKey -from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.metadata import ApiVersionsRequest, MetadataRequest from kafka.protocol.admin import DescribeLogDirsRequest if TYPE_CHECKING: @@ -87,3 +88,52 @@ def get_broker_version_data(self, broker_id): def api_versions(self): api_versions = self._manager.broker_version_data.api_versions return {ApiKey(k): v for k, v in api_versions.items()} + + async def _async_describe_features(self, send_request_to_controller=False): + request = ApiVersionsRequest( + client_software_name=self._manager.config['client_software_name'], + client_software_version=self._manager.config['client_software_version'], + min_version=3, + ) + if send_request_to_controller: + response = await self._send_request_to_controller(request) + else: + response = await self._manager.send(request) + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + raise error_type(f"ApiVersionsRequest failed: {response}") + supported = {feature.name: (feature.min_version, feature.max_version) + for feature in (response.supported_features or [])} + finalized = {feature.name: (feature.min_version_level, feature.max_version_level) + for feature in (response.finalized_features or [])} + epoch = response.finalized_features_epoch + if epoch is None or epoch < 0: + epoch = None + return { + 
'supported_features': supported, + 'finalized_features': finalized, + 'finalized_features_epoch': epoch, + } + + def describe_features(self, send_request_to_controller=False): + """Fetch the cluster's supported and finalized feature flags. + + Features are broker-level capabilities (e.g. ``metadata.version``) + that can be finalized cluster-wide via ``update_features`` (KIP-584). + Requires broker >= 2.4. + + Keyword Arguments: + send_request_to_controller (bool, optional): If True, route the + request to the active controller. By default the request is + sent to any available broker. Default: False. + + Returns: + dict with keys: + - ``supported_features``: dict of + ``{feature_name: (min_version, max_version)}`` + - ``finalized_features``: dict of + ``{feature_name: (min_version_level, max_version_level)}`` + - ``finalized_features_epoch``: int, or None if unknown + (broker did not report an epoch, or reported -1) + """ + return self._manager.run(self._async_describe_features, send_request_to_controller) diff --git a/kafka/net/manager.py b/kafka/net/manager.py index 1ffeac3f2..34d7df1c6 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -13,12 +13,16 @@ import kafka.errors as Errors from kafka.protocol.broker_version_data import BrokerVersionData from kafka.future import Future +from kafka.version import __version__ log = logging.getLogger(__name__) class KafkaConnectionManager: DEFAULT_CONFIG = { + 'client_id': 'kafka-python-' + __version__, + 'client_software_name': 'kafka-python', + 'client_software_version': __version__, 'reconnect_backoff_ms': 50, 'reconnect_backoff_max_ms': 30000, 'request_timeout_ms': 30000, From bd303d4928af95a67e147e49fe3a2add28d85626 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 14:44:27 -0700 Subject: [PATCH 2/8] cli cluster describe-features --- kafka/cli/admin/cluster/__init__.py | 3 ++- kafka/cli/admin/cluster/features.py | 10 ++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) create mode 
100644 kafka/cli/admin/cluster/features.py diff --git a/kafka/cli/admin/cluster/__init__.py b/kafka/cli/admin/cluster/__init__.py index 085a617ce..9ee815be6 100644 --- a/kafka/cli/admin/cluster/__init__.py +++ b/kafka/cli/admin/cluster/__init__.py @@ -4,6 +4,7 @@ from .broker_version import GetBrokerVersion from .describe import DescribeCluster from .log_dirs import DescribeLogDirs +from .features import DescribeFeatures class ClusterSubCommand: @@ -12,6 +13,6 @@ class ClusterSubCommand: def add_subparser(cls, subparsers): parser = subparsers.add_parser('cluster', help='Manage Kafka Cluster') commands = parser.add_subparsers() - for cmd in [DescribeCluster, GetApiVersions, GetBrokerVersion, DescribeLogDirs]: + for cmd in [DescribeCluster, DescribeFeatures, GetApiVersions, GetBrokerVersion, DescribeLogDirs]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/cluster/features.py b/kafka/cli/admin/cluster/features.py new file mode 100644 index 000000000..1626a5d1c --- /dev/null +++ b/kafka/cli/admin/cluster/features.py @@ -0,0 +1,10 @@ +class DescribeFeatures: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('describe-features', help='Describe Features of Kafka Cluster') + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + return client.describe_features() From 2a4cf807ffb168dde6883147476acaebadb0366d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 14:53:11 -0700 Subject: [PATCH 3/8] kafka.protocol UpdateFeatures --- kafka/protocol/admin/cluster.py | 4 + kafka/protocol/admin/cluster.pyi | 100 +++++++++++++++++- .../resources/UpdateFeaturesRequest.json | 43 ++++++++ .../resources/UpdateFeaturesResponse.json | 39 +++++++ 4 files changed, 185 insertions(+), 1 deletion(-) create mode 100644 kafka/protocol/schemas/resources/UpdateFeaturesRequest.json create mode 100644 
kafka/protocol/schemas/resources/UpdateFeaturesResponse.json diff --git a/kafka/protocol/admin/cluster.py b/kafka/protocol/admin/cluster.py index f9049e694..af242dd6e 100644 --- a/kafka/protocol/admin/cluster.py +++ b/kafka/protocol/admin/cluster.py @@ -11,8 +11,12 @@ def json_patch(cls, json): class DescribeLogDirsRequest(ApiMessage): pass class DescribeLogDirsResponse(ApiMessage): pass +class UpdateFeaturesRequest(ApiMessage): pass +class UpdateFeaturesResponse(ApiMessage): pass + __all__ = [ 'DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', + 'UpdateFeaturesRequest', 'UpdateFeaturesResponse', ] diff --git a/kafka/protocol/admin/cluster.pyi b/kafka/protocol/admin/cluster.pyi index d2686cc45..6070360a6 100644 --- a/kafka/protocol/admin/cluster.pyi +++ b/kafka/protocol/admin/cluster.pyi @@ -5,7 +5,7 @@ from typing import Any, Self from kafka.protocol.api_message import ApiMessage from kafka.protocol.data_container import DataContainer -__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse'] +__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', 'UpdateFeaturesRequest', 'UpdateFeaturesResponse'] class DescribeClusterRequest(ApiMessage): include_cluster_authorized_operations: bool @@ -223,3 +223,101 @@ class DescribeLogDirsResponse(ApiMessage): def is_request(cls) -> bool: ... def expect_response(self) -> bool: ... def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class UpdateFeaturesRequest(ApiMessage): + class FeatureUpdateKey(DataContainer): + feature: str + max_version_level: int + allow_downgrade: bool + upgrade_type: int + def __init__( + self, + *args: Any, + feature: str = ..., + max_version_level: int = ..., + allow_downgrade: bool = ..., + upgrade_type: int = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... 
+ @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + timeout_ms: int + feature_updates: list[FeatureUpdateKey] + validate_only: bool + def __init__( + self, + *args: Any, + timeout_ms: int = ..., + feature_updates: list[FeatureUpdateKey] = ..., + validate_only: bool = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class UpdateFeaturesResponse(ApiMessage): + class UpdatableFeatureResult(DataContainer): + feature: str + error_code: int + error_message: str | None + def __init__( + self, + *args: Any, + feature: str = ..., + error_code: int = ..., + error_message: str | None = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + throttle_time_ms: int + error_code: int + error_message: str | None + results: list[UpdatableFeatureResult] + def __init__( + self, + *args: Any, + throttle_time_ms: int = ..., + error_code: int = ..., + error_message: str | None = ..., + results: list[UpdatableFeatureResult] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... 
+ name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... diff --git a/kafka/protocol/schemas/resources/UpdateFeaturesRequest.json b/kafka/protocol/schemas/resources/UpdateFeaturesRequest.json new file mode 100644 index 000000000..e2f9b45d4 --- /dev/null +++ b/kafka/protocol/schemas/resources/UpdateFeaturesRequest.json @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 57, + "type": "request", + "listeners": ["broker", "controller"], + "name": "UpdateFeaturesRequest", + // Version 1 adds validate only field. + // + // Version 2 changes the response to not return feature level results. + "validVersions": "0-2", + "flexibleVersions": "0+", + "fields": [ + { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000", + "about": "How long to wait in milliseconds before timing out the request." 
}, + { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+", + "about": "The list of updates to finalized features.", "fields": [ + { "name": "Feature", "type": "string", "versions": "0+", "mapKey": true, + "about": "The name of the finalized feature to be updated."}, + { "name": "MaxVersionLevel", "type": "int16", "versions": "0+", + "about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1, is special, and can be used to request the deletion of the finalized feature."}, + { "name": "AllowDowngrade", "type": "bool", "versions": "0", + "about": "DEPRECATED in version 1 (see DowngradeType). When set to true, the finalized feature version level is allowed to be downgraded/deleted. The downgrade request will fail if the new maximum version level is a value that's not lower than the existing maximum finalized version level."}, + { "name": "UpgradeType", "type": "int8", "versions": "1+", "default": 1, + "about": "Determine which type of upgrade will be performed: 1 will perform an upgrade only (default), 2 is safe downgrades only (lossless), 3 is unsafe downgrades (lossy)."} + ]}, + { "name": "ValidateOnly", "type": "bool", "versions": "1+", "default": false, + "about": "True if we should validate the request, but not perform the upgrade or downgrade."} + ] +} diff --git a/kafka/protocol/schemas/resources/UpdateFeaturesResponse.json b/kafka/protocol/schemas/resources/UpdateFeaturesResponse.json new file mode 100644 index 000000000..440ae257e --- /dev/null +++ b/kafka/protocol/schemas/resources/UpdateFeaturesResponse.json @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 57, + "type": "response", + "name": "UpdateFeaturesResponse", + "validVersions": "0-2", + "flexibleVersions": "0+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or `0` if there was no top-level error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The top-level error message, or `null` if there was no top-level error." }, + { "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0-1", "ignorable": true, + "about": "Results for each feature update.", "fields": [ + { "name": "Feature", "type": "string", "versions": "0+", "mapKey": true, + "about": "The name of the finalized feature."}, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The feature update error code or `0` if the feature update succeeded." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The feature update error, or `null` if the feature update succeeded." 
} + ]} + ] +} From 5c0365d0b05c14aac206e513b7eadec3cb27c3aa Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 19:22:54 -0700 Subject: [PATCH 4/8] Admin: cluster update_features --- kafka/admin/__init__.py | 2 + kafka/admin/_cluster.py | 88 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index 0dd32be6b..72728df93 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -2,6 +2,7 @@ from kafka.admin._acls import ( ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation, ResourceType, ACLPermissionType, ACLResourcePatternType) +from kafka.admin._cluster import UpdateFeatureType from kafka.admin._configs import ( AlterConfigOp, ConfigFilterType, ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType) @@ -16,6 +17,7 @@ 'ACL', 'ACLFilter', 'ACLOperation', 'ACLPermissionType', 'ACLResourcePatternType', 'ResourceType', 'ResourcePattern', 'ResourcePatternFilter', 'AlterConfigOp', 'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType', + 'UpdateFeatureType', 'MemberToRemove', 'OffsetSpec', # NewTopic + NewPartitions are deprecated and not included in __all__ 'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion', ] diff --git a/kafka/admin/_cluster.py b/kafka/admin/_cluster.py index 24417f66a..0761e1684 100644 --- a/kafka/admin/_cluster.py +++ b/kafka/admin/_cluster.py @@ -2,13 +2,15 @@ from __future__ import annotations +from enum import IntEnum import logging from typing import TYPE_CHECKING import kafka.errors as Errors from kafka.protocol.api_key import ApiKey from kafka.protocol.metadata import ApiVersionsRequest, MetadataRequest -from kafka.protocol.admin import DescribeLogDirsRequest +from kafka.protocol.admin import DescribeLogDirsRequest, UpdateFeaturesRequest +from kafka.util import EnumHelper if TYPE_CHECKING: from kafka.net.manager import KafkaConnectionManager @@ 
-137,3 +139,87 @@ def describe_features(self, send_request_to_controller=False): (broker did not report an epoch, or reported -1) """ return self._manager.run(self._async_describe_features, send_request_to_controller) + + @staticmethod + def _build_feature_updates(feature_updates): + if not isinstance(feature_updates, dict): + raise TypeError('feature_updates must be a dict of ' + '{feature_name: (upgrade_type, max_version_level)} ' + 'or {feature_name: max_version_level}') + _FeatureUpdateKey = UpdateFeaturesRequest.FeatureUpdateKey + updates = [] + for feature, spec in feature_updates.items(): + if isinstance(spec, tuple): + upgrade_type, max_version_level = spec + else: + upgrade_type = UpdateFeatureType.UPGRADE + max_version_level = spec + upgrade_code = UpdateFeatureType.value_for(upgrade_type) + downgrade = upgrade_code in ( + UpdateFeatureType.SAFE_DOWNGRADE.value, + UpdateFeatureType.UNSAFE_DOWNGRADE.value) + updates.append(_FeatureUpdateKey( + feature=feature, + max_version_level=int(max_version_level), + allow_downgrade=downgrade, + upgrade_type=upgrade_code)) + return updates + + async def _async_update_features(self, feature_updates, validate_only=False, timeout_ms=60000): + min_version = 1 if validate_only else 0 + request = UpdateFeaturesRequest( + timeout_ms=timeout_ms, + feature_updates=self._build_feature_updates(feature_updates), + validate_only=validate_only, + min_version=min_version, + ) + response = await self._send_request_to_controller( + request, + get_errors_fn=lambda r: [Errors.for_code(r.error_code)], + ) + ret = {} + for result in response.results or []: + if result.error_code == 0: + ret[result.feature] = 'OK' + else: + ret[result.feature] = str(Errors.for_code(result.error_code)(result.error_message)) + # v2+ responses omit per-feature results; top-level error is already + # raised by _send_request_to_controller, so any feature we asked about + # succeeded.
+ for feature in feature_updates: + ret.setdefault(feature, 'OK') + return ret + + def update_features(self, feature_updates, validate_only=False, timeout_ms=60000): + """Update cluster-wide finalized feature flags. + + Finalize cluster-wide feature capabilities (e.g. ``metadata.version``). + The request is always routed to the active controller. See KIP-584. + Requires broker >= 2.7. + + Arguments: + feature_updates: A dict of + ``{feature_name: (upgrade_type, max_version_level)}`` or + ``{feature_name: max_version_level}`` (implicit UPGRADE). + ``upgrade_type`` may be a :class:`UpdateFeatureType`, + its name, or int value. A ``max_version_level < 1`` requests + deletion of the finalized feature. + + Keyword Arguments: + validate_only (bool, optional): If True, validate the request but + do not apply it. Default: False. + timeout_ms (int, optional): Broker-side timeout in milliseconds. + Default: 60000. + + Returns: + dict of {feature_name: 'OK' | error message} + """ + return self._manager.run(self._async_update_features, + feature_updates, validate_only, timeout_ms) + + +class UpdateFeatureType(EnumHelper, IntEnum): + UNKNOWN = 0 + UPGRADE = 1 + SAFE_DOWNGRADE = 2 + UNSAFE_DOWNGRADE = 3 From 58cfc4006a4bd52590d396805a5c0c66df5c52d2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 19:23:19 -0700 Subject: [PATCH 5/8] admin.cli: cluster update-features --- kafka/cli/admin/cluster/__init__.py | 4 ++-- kafka/cli/admin/cluster/features.py | 36 +++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/kafka/cli/admin/cluster/__init__.py b/kafka/cli/admin/cluster/__init__.py index 9ee815be6..b47b03674 100644 --- a/kafka/cli/admin/cluster/__init__.py +++ b/kafka/cli/admin/cluster/__init__.py @@ -4,7 +4,7 @@ from .broker_version import GetBrokerVersion from .describe import DescribeCluster from .log_dirs import DescribeLogDirs -from .features import DescribeFeatures +from .features import DescribeFeatures, UpdateFeatures 
class ClusterSubCommand: @@ -13,6 +13,6 @@ class ClusterSubCommand: def add_subparser(cls, subparsers): parser = subparsers.add_parser('cluster', help='Manage Kafka Cluster') commands = parser.add_subparsers() - for cmd in [DescribeCluster, DescribeFeatures, GetApiVersions, GetBrokerVersion, DescribeLogDirs]: + for cmd in [DescribeCluster, DescribeFeatures, UpdateFeatures, GetApiVersions, GetBrokerVersion, DescribeLogDirs]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/cluster/features.py b/kafka/cli/admin/cluster/features.py index 1626a5d1c..9369d1a2e 100644 --- a/kafka/cli/admin/cluster/features.py +++ b/kafka/cli/admin/cluster/features.py @@ -1,3 +1,6 @@ +from kafka.admin import UpdateFeatureType + + class DescribeFeatures: @classmethod @@ -8,3 +11,36 @@ def add_subparser(cls, subparsers): @classmethod def command(cls, client, args): return client.describe_features() + + +class UpdateFeatures: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('update-features', help='Update Features of Kafka Cluster') + parser.add_argument('-f', '--feature', type=str, action='append', dest='features', default=[], help='set feature=value') + parser.add_argument('--downgrade', action='store_true') + parser.add_argument('--unsafe', action='store_true') + parser.add_argument('--timeout', type=int, default=60) + parser.add_argument('--validate-only', action='store_true') + parser.set_defaults(command=cls.command) + + @staticmethod + def _feature_type(args): + if not args.downgrade: + return UpdateFeatureType.UPGRADE + elif args.unsafe: + return UpdateFeatureType.UNSAFE_DOWNGRADE + else: + return UpdateFeatureType.SAFE_DOWNGRADE + + @classmethod + def command(cls, client, args): + feature_type = cls._feature_type(args) + feature_updates = { + feature_name: (feature_type, version) + for feature_name, version in [feature.split('=') for feature in args.features] 
+ } + return client.update_features(feature_updates, + validate_only=args.validate_only, + timeout_ms=1000*args.timeout) From 9be6d4f3e7adbe4db69f24a53d212118b2f658be Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 21:39:19 -0700 Subject: [PATCH 6/8] tests --- test/admin/test_admin_cluster_features.py | 301 ++++++++++++++++++++++ 1 file changed, 301 insertions(+) create mode 100644 test/admin/test_admin_cluster_features.py diff --git a/test/admin/test_admin_cluster_features.py b/test/admin/test_admin_cluster_features.py new file mode 100644 index 000000000..c970c992a --- /dev/null +++ b/test/admin/test_admin_cluster_features.py @@ -0,0 +1,301 @@ +import pytest + +from kafka.admin import KafkaAdminClient, UpdateFeatureType +from kafka.errors import ( + ClusterAuthorizationFailedError, + FeatureUpdateFailedError, + InvalidUpdateVersionError, +) # noqa: F401 +from kafka.protocol.admin import UpdateFeaturesRequest, UpdateFeaturesResponse +from kafka.protocol.metadata import ApiVersionsRequest, ApiVersionsResponse + +from test.mock_broker import MockBroker + + +# --------------------------------------------------------------------------- +# fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def broker(request): + broker_version = getattr(request, 'param', (4, 2)) + return MockBroker(broker_version=broker_version) + + +@pytest.fixture +def admin(broker): + admin = KafkaAdminClient( + kafka_client=broker.client_factory(), + bootstrap_servers='%s:%d' % (broker.host, broker.port), + request_timeout_ms=5000, + ) + try: + yield admin + finally: + admin.close() + + +# --------------------------------------------------------------------------- +# describe_features +# --------------------------------------------------------------------------- + + +def _api_versions_response( + *, + error_code=0, + supported=(), + finalized=(), + finalized_epoch=-1, +): + """supported: iterable of (name, min, max). 
finalized: iterable of (name, min_level, max_level).""" + Api = ApiVersionsResponse.ApiVersion + SF = ApiVersionsResponse.SupportedFeatureKey + FF = ApiVersionsResponse.FinalizedFeatureKey + return ApiVersionsResponse( + error_code=error_code, + api_keys=[Api(api_key=ApiVersionsRequest.API_KEY, min_version=0, max_version=4)], + throttle_time_ms=0, + supported_features=[ + SF(name=n, min_version=mn, max_version=mx) for n, mn, mx in supported + ], + finalized_features_epoch=finalized_epoch, + finalized_features=[ + FF(name=n, min_version_level=mn, max_version_level=mx) for n, mn, mx in finalized + ], + zk_migration_ready=False, + ) + + +class TestDescribeFeaturesMockBroker: + + def test_happy_path(self, broker, admin): + # Set the response directly to avoid bootstrap consuming our queued responses + broker._api_versions_response = _api_versions_response( + supported=[ + ('metadata.version', 1, 19), + ('kraft.version', 0, 1), + ], + finalized=[('metadata.version', 8, 8)], + finalized_epoch=42, + ) + + result = admin.describe_features() + + assert result['supported_features'] == { + 'metadata.version': (1, 19), + 'kraft.version': (0, 1), + } + assert result['finalized_features'] == {'metadata.version': (8, 8)} + assert result['finalized_features_epoch'] == 42 + + def test_empty_features(self, broker, admin): + broker.respond(ApiVersionsRequest, _api_versions_response()) + result = admin.describe_features() + assert result == { + 'supported_features': {}, + 'finalized_features': {}, + 'finalized_features_epoch': None, + } + + def test_negative_epoch_normalized_to_none(self, broker, admin): + broker.respond(ApiVersionsRequest, _api_versions_response( + supported=[('metadata.version', 1, 19)], + finalized_epoch=-1, + )) + assert admin.describe_features()['finalized_features_epoch'] is None + + def test_sends_request_version_3_plus(self, broker, admin): + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + captured['version'] = api_version 
+ return _api_versions_response() + + broker.respond_fn(ApiVersionsRequest, handler) + admin.describe_features() + # Admin explicitly requests min_version=3, broker supports up to 4 + assert captured['version'] >= 3 + + +# --------------------------------------------------------------------------- +# update_features +# --------------------------------------------------------------------------- + + +def _update_features_response(results=(), error_code=0, error_message=None): + """results: iterable of (feature, error_code, error_message).""" + Result = UpdateFeaturesResponse.UpdatableFeatureResult + return UpdateFeaturesResponse( + throttle_time_ms=0, + error_code=error_code, + error_message=error_message, + results=[Result(feature=f, error_code=ec, error_message=em) + for f, ec, em in results], + ) + + +def _capture_update(captured, response=None): + def handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = UpdateFeaturesRequest.decode( + request_bytes, version=api_version, header=True) + captured['version'] = api_version + return response if response is not None else _update_features_response([ + ('metadata.version', 0, None), + ]) + return handler + + +def _sent_updates(captured): + """Return {feature: FeatureUpdateKey} from the decoded request. 
At v2 the + AllowDowngrade field is absent; only feature/max_version_level/upgrade_type + are present.""" + return {u.feature: u for u in captured['request'].feature_updates} + + +class TestUpdateFeaturesMockBroker: + + def test_upgrade_implicit(self, broker, admin): + """Bare int level means UPGRADE (type=1).""" + captured = {} + broker.respond_fn(UpdateFeaturesRequest, _capture_update(captured)) + + result = admin.update_features({'metadata.version': 19}) + + sent = _sent_updates(captured) + assert set(sent) == {'metadata.version'} + assert sent['metadata.version'].max_version_level == 19 + assert sent['metadata.version'].upgrade_type == UpdateFeatureType.UPGRADE.value + assert result == {'metadata.version': 'OK'} + + def test_safe_downgrade_tuple(self, broker, admin): + captured = {} + broker.respond_fn(UpdateFeaturesRequest, _capture_update(captured)) + + admin.update_features({'metadata.version': (UpdateFeatureType.SAFE_DOWNGRADE, 8)}) + + u = _sent_updates(captured)['metadata.version'] + assert u.max_version_level == 8 + assert u.upgrade_type == UpdateFeatureType.SAFE_DOWNGRADE.value + + def test_unsafe_downgrade_string(self, broker, admin): + captured = {} + broker.respond_fn(UpdateFeaturesRequest, _capture_update(captured)) + + admin.update_features({'metadata.version': ('unsafe-downgrade', 8)}) + + u = _sent_updates(captured)['metadata.version'] + assert u.upgrade_type == UpdateFeatureType.UNSAFE_DOWNGRADE.value + + def test_upgrade_type_by_int(self, broker, admin): + captured = {} + broker.respond_fn(UpdateFeaturesRequest, _capture_update(captured)) + + admin.update_features({'metadata.version': (2, 8)}) # 2 = SAFE_DOWNGRADE + + assert _sent_updates(captured)['metadata.version'].upgrade_type == 2 + + def test_deletion_request(self, broker, admin): + """max_version_level < 1 requests deletion of the finalized feature.""" + captured = {} + broker.respond_fn(UpdateFeaturesRequest, _capture_update(captured, + _update_features_response([('kraft.version', 0, 
None)]))) + + admin.update_features({'kraft.version': ('safe-downgrade', 0)}) + + assert _sent_updates(captured)['kraft.version'].max_version_level == 0 + + def test_validate_only_propagates(self, broker, admin): + captured = {} + broker.respond_fn(UpdateFeaturesRequest, _capture_update(captured)) + + admin.update_features({'metadata.version': 19}, validate_only=True) + + assert captured['request'].validate_only is True + + def test_timeout_ms_propagates(self, broker, admin): + captured = {} + broker.respond_fn(UpdateFeaturesRequest, _capture_update(captured)) + + admin.update_features({'metadata.version': 19}, timeout_ms=12345) + + assert captured['request'].timeout_ms == 12345 + + def test_multiple_features(self, broker, admin): + captured = {} + broker.respond_fn(UpdateFeaturesRequest, _capture_update(captured, + _update_features_response([ + ('metadata.version', 0, None), + ('transaction.version', 0, None), + ]))) + + result = admin.update_features({ + 'metadata.version': 19, + 'transaction.version': (UpdateFeatureType.UPGRADE, 2), + }) + + sent = _sent_updates(captured) + assert sent['metadata.version'].max_version_level == 19 + assert sent['metadata.version'].upgrade_type == UpdateFeatureType.UPGRADE.value + assert sent['transaction.version'].max_version_level == 2 + assert sent['transaction.version'].upgrade_type == UpdateFeatureType.UPGRADE.value + assert result == {'metadata.version': 'OK', 'transaction.version': 'OK'} + + @pytest.mark.parametrize('broker', [(3, 9, 0)], indirect=True) + def test_per_feature_error_surfaces_in_result(self, broker, admin): + broker.respond(UpdateFeaturesRequest, _update_features_response([ + ('metadata.version', InvalidUpdateVersionError.errno, + 'Unsafe metadata downgrade is not supported in this version.'), + ])) + + result = admin.update_features({'metadata.version': ('UNSAFE_DOWNGRADE', 8)}) + + assert 'metadata.version' in result + assert 'InvalidUpdateVersion' in result['metadata.version'] + assert 'Unsafe metadata 
downgrade' in result['metadata.version'] + + def test_top_level_error_raises(self, broker, admin): + broker.respond(UpdateFeaturesRequest, _update_features_response( + error_code=ClusterAuthorizationFailedError.errno, + error_message='not authorized', + )) + + with pytest.raises(ClusterAuthorizationFailedError): + admin.update_features({'metadata.version': 19}) + + def test_v2_empty_results_defaults_to_ok(self, broker, admin): + """v2 UpdateFeaturesResponse has no per-feature results; client sets 'OK' + for each requested feature when the top-level error code is NoError.""" + broker.respond(UpdateFeaturesRequest, _update_features_response(results=[])) + + result = admin.update_features({ + 'metadata.version': 19, + 'transaction.version': 2, + }) + + assert result == {'metadata.version': 'OK', 'transaction.version': 'OK'} + + def test_non_dict_raises(self, admin): + with pytest.raises(TypeError, match='feature_updates must be a dict'): + admin.update_features([('metadata.version', 19)]) + + def test_unrecognized_upgrade_type_string(self, admin): + with pytest.raises(ValueError, match='Unrecognized UpdateFeatureType'): + admin.update_features({'metadata.version': ('bogus', 19)}) + + @pytest.mark.parametrize('broker', [(3, 9, 0)], indirect=True) + def test_partial_error_reports_both(self, broker, admin): + """Mix of ok + failed features surfaces independently in the result dict.""" + broker.respond(UpdateFeaturesRequest, _update_features_response([ + ('metadata.version', 0, None), + ('transaction.version', FeatureUpdateFailedError.errno, 'dependency not met'), + ])) + + result = admin.update_features({ + 'metadata.version': 19, + 'transaction.version': 2, + }) + + assert result['metadata.version'] == 'OK' + assert 'FeatureUpdateFailed' in result['transaction.version'] + assert 'dependency not met' in result['transaction.version'] From 49d82a3550d3b244e5d5d5df8159d2e37107d8a6 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 21:41:50 -0700 Subject: 
[PATCH 7/8] update todo --- kafka/cli/admin/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 7151ceae6..5dbcc8c60 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -122,8 +122,6 @@ def run_cli(args=None): # [cluster] # alter-log-dirs (AlterReplicaLogDirs) - # describe-features (ApiVersions) - # update-features (UpdateFeatures) # describe-quorum (DescribeQuorum) # unregister-broker (UnregisterBroker) # add-raft-voter (AddRaftVoter) From 0b722e79d620cbfb48ec8d822a33d2172a7ed349 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 22:05:52 -0700 Subject: [PATCH 8/8] change describe shape: feature => supported/finalized => (min, max) --- kafka/admin/_cluster.py | 17 ++++++++--------- kafka/cli/admin/cluster/features.py | 8 +++++++- test/admin/test_admin_cluster_features.py | 21 ++++++++------------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/kafka/admin/_cluster.py b/kafka/admin/_cluster.py index 0761e1684..dc366bd4b 100644 --- a/kafka/admin/_cluster.py +++ b/kafka/admin/_cluster.py @@ -2,6 +2,7 @@ from __future__ import annotations +from collections import defaultdict from enum import IntEnum import logging from typing import TYPE_CHECKING @@ -104,18 +105,16 @@ async def _async_describe_features(self, send_request_to_controller=False): error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: raise error_type(f"ApiVersionsRequest failed: {response}") - supported = {feature.name: (feature.min_version, feature.max_version) - for feature in (response.supported_features or [])} - finalized = {feature.name: (feature.min_version_level, feature.max_version_level) - for feature in (response.finalized_features or [])} + result = defaultdict(dict) epoch = response.finalized_features_epoch if epoch is None or epoch < 0: epoch = None - return { - 'supported_features': supported, - 'finalized_features': finalized, 
- 'finalized_features_epoch': epoch, - } + for feature in (response.supported_features or []): + result[feature.name]['supported'] = (feature.min_version, feature.max_version) + for feature in (response.finalized_features or []): + result[feature.name]['finalized'] = (feature.min_version_level, feature.max_version_level) + result[feature.name]['finalized_epoch'] = epoch + return dict(result) def describe_features(self, send_request_to_controller=False): """Fetch the cluster's supported and finalized feature flags. diff --git a/kafka/cli/admin/cluster/features.py b/kafka/cli/admin/cluster/features.py index 9369d1a2e..c16880d11 100644 --- a/kafka/cli/admin/cluster/features.py +++ b/kafka/cli/admin/cluster/features.py @@ -6,11 +6,17 @@ class DescribeFeatures: @classmethod def add_subparser(cls, subparsers): parser = subparsers.add_parser('describe-features', help='Describe Features of Kafka Cluster') + parser.add_argument('-f', '--feature', type=str, action='append', dest='features', default=[], + help='Show one or more specific features. 
If not provided, returns all features.') parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): - return client.describe_features() + result = client.describe_features() + if args.features: + return {k: v for k, v in result.items() if k in args.features} + else: + return result class UpdateFeatures: diff --git a/test/admin/test_admin_cluster_features.py b/test/admin/test_admin_cluster_features.py index c970c992a..4e184871c 100644 --- a/test/admin/test_admin_cluster_features.py +++ b/test/admin/test_admin_cluster_features.py @@ -82,28 +82,23 @@ def test_happy_path(self, broker, admin): result = admin.describe_features() - assert result['supported_features'] == { - 'metadata.version': (1, 19), - 'kraft.version': (0, 1), + assert result == { + 'metadata.version': {'supported': (1, 19), 'finalized': (8, 8), 'finalized_epoch': 42}, + 'kraft.version': {'supported': (0, 1)}, } - assert result['finalized_features'] == {'metadata.version': (8, 8)} - assert result['finalized_features_epoch'] == 42 def test_empty_features(self, broker, admin): broker.respond(ApiVersionsRequest, _api_versions_response()) result = admin.describe_features() - assert result == { - 'supported_features': {}, - 'finalized_features': {}, - 'finalized_features_epoch': None, - } + assert result == {} def test_negative_epoch_normalized_to_none(self, broker, admin): - broker.respond(ApiVersionsRequest, _api_versions_response( + broker._api_versions_response = _api_versions_response( supported=[('metadata.version', 1, 19)], + finalized=[('metadata.version', 8, 8)], finalized_epoch=-1, - )) - assert admin.describe_features()['finalized_features_epoch'] is None + ) + assert admin.describe_features()['metadata.version']['finalized_epoch'] is None def test_sends_request_version_3_plus(self, broker, admin): captured = {}