Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from kafka.admin._acls import (
ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,
ResourceType, ACLPermissionType, ACLResourcePatternType)
from kafka.admin._cluster import UpdateFeatureType
from kafka.admin._configs import (
AlterConfigOp, ConfigFilterType, ConfigResource, ConfigResourceType,
ConfigType, ConfigSourceType)
Expand All @@ -16,6 +17,7 @@
'ACL', 'ACLFilter', 'ACLOperation', 'ACLPermissionType', 'ACLResourcePatternType',
'ResourceType', 'ResourcePattern', 'ResourcePatternFilter',
'AlterConfigOp', 'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType',
'UpdateFeatureType',
'MemberToRemove', 'OffsetSpec', # NewTopic + NewPartitions are deprecated and not included in __all__
'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion',
]
139 changes: 137 additions & 2 deletions kafka/admin/_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

from __future__ import annotations

from collections import defaultdict
from enum import IntEnum
import logging
from typing import TYPE_CHECKING

import kafka.errors as Errors
from kafka.protocol.api_key import ApiKey
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.admin import DescribeLogDirsRequest
from kafka.protocol.metadata import ApiVersionsRequest, MetadataRequest
from kafka.protocol.admin import DescribeLogDirsRequest, UpdateFeaturesRequest
from kafka.util import EnumHelper

if TYPE_CHECKING:
from kafka.net.manager import KafkaConnectionManager
Expand Down Expand Up @@ -87,3 +91,134 @@ def get_broker_version_data(self, broker_id):
def api_versions(self):
api_versions = self._manager.broker_version_data.api_versions
return {ApiKey(k): v for k, v in api_versions.items()}

async def _async_describe_features(self, send_request_to_controller=False):
    """Coroutine behind describe_features(): issue an ApiVersionsRequest
    (v3+, which carries supported/finalized feature fields) and fold the
    response into ``{feature_name: {'supported': ..., 'finalized': ...,
    'finalized_epoch': ...}}``.
    """
    cfg = self._manager.config
    request = ApiVersionsRequest(
        client_software_name=cfg['client_software_name'],
        client_software_version=cfg['client_software_version'],
        min_version=3,
    )
    # Route to the controller only when explicitly requested; otherwise any
    # broker can answer.
    dispatch = self._send_request_to_controller if send_request_to_controller else self._manager.send
    response = await dispatch(request)
    error_type = Errors.for_code(response.error_code)
    if error_type is not Errors.NoError:
        raise error_type(f"ApiVersionsRequest failed: {response}")
    # Normalize a missing/negative epoch to None ("unknown").
    raw_epoch = response.finalized_features_epoch
    epoch = raw_epoch if (raw_epoch is not None and raw_epoch >= 0) else None
    features = {}
    for entry in (response.supported_features or []):
        features.setdefault(entry.name, {})['supported'] = (entry.min_version, entry.max_version)
    for entry in (response.finalized_features or []):
        info = features.setdefault(entry.name, {})
        info['finalized'] = (entry.min_version_level, entry.max_version_level)
        info['finalized_epoch'] = epoch
    return features

def describe_features(self, send_request_to_controller=False):
    """Fetch the cluster's supported and finalized feature flags.

    Features are broker-level capabilities (e.g. ``metadata.version``)
    that can be finalized cluster-wide via ``update_features`` (KIP-584).
    Requires broker >= 2.4.

    Keyword Arguments:
        send_request_to_controller (bool, optional): If True, route the
            request to the active controller. By default the request is
            sent to any available broker. Default: False.

    Returns:
        dict keyed by feature name. Each per-feature value is a dict that
        may contain:
            - ``supported``: ``(min_version, max_version)`` range the
              broker supports
            - ``finalized``: ``(min_version_level, max_version_level)``
              finalized cluster-wide
            - ``finalized_epoch``: int, or None if unknown (broker did
              not report an epoch, or reported -1)
    """
    return self._manager.run(self._async_describe_features, send_request_to_controller)

@staticmethod
def _build_feature_updates(feature_updates):
    """Normalize user-supplied feature updates into FeatureUpdateKey structs.

    Arguments:
        feature_updates: dict of
            ``{feature_name: (upgrade_type, max_version_level)}`` or
            ``{feature_name: max_version_level}`` (implicit UPGRADE).
            ``upgrade_type`` may be an :class:`UpdateFeatureType`, its
            name, or its int value.

    Returns:
        list of UpdateFeaturesRequest.FeatureUpdateKey

    Raises:
        TypeError: if feature_updates is not a dict.
    """
    if not isinstance(feature_updates, dict):
        # Fixed: message previously reversed the tuple order; the code
        # (and update_features docstring) expect (upgrade_type, max_version_level).
        raise TypeError('feature_updates must be a dict of '
                        '{feature_name: (upgrade_type, max_version_level)} '
                        'or {feature_name: max_version_level}')
    _FeatureUpdateKey = UpdateFeaturesRequest.FeatureUpdateKey
    updates = []
    for feature, spec in feature_updates.items():
        if isinstance(spec, tuple):
            upgrade_type, max_version_level = spec
        else:
            upgrade_type = UpdateFeatureType.UPGRADE
            max_version_level = spec
        upgrade_code = UpdateFeatureType.value_for(upgrade_type)
        # allow_downgrade must be set for either downgrade variant so the
        # broker accepts a version decrease.
        downgrade = upgrade_code in (
            UpdateFeatureType.SAFE_DOWNGRADE.value,
            UpdateFeatureType.UNSAFE_DOWNGRADE.value)
        updates.append(_FeatureUpdateKey(
            feature=feature,
            max_version_level=int(max_version_level),
            allow_downgrade=downgrade,
            upgrade_type=upgrade_code))
    return updates

async def _async_update_features(self, feature_updates, validate_only=False, timeout_ms=60000):
    """Coroutine behind update_features(): send UpdateFeaturesRequest to
    the active controller and map each requested feature to 'OK' or an
    error-message string.
    """
    # NOTE(review): presumably ``validate_only`` only exists in request
    # v1+, hence the bumped minimum version — confirm against the
    # UpdateFeatures protocol schema.
    min_version = 1 if validate_only else 0
    request = UpdateFeaturesRequest(
        timeout_ms=timeout_ms,
        feature_updates=self._build_feature_updates(feature_updates),
        validate_only=validate_only,
        min_version=min_version,
    )
    # Top-level response errors are raised inside _send_request_to_controller
    # via get_errors_fn; only per-feature results are handled below.
    response = await self._send_request_to_controller(
        request,
        get_errors_fn=lambda r: [Errors.for_code(r.error_code)],
    )
    ret = {}
    for result in response.results or []:
        if result.error_code == 0:
            ret[result.feature] = 'OK'
        else:
            ret[result.feature] = str(Errors.for_code(result.error_code)(result.error_message))
    # v2+ responses omit per-feature results; top-level error is already
    # raised by _send_request_to_controller, so any feature we asked about
    # succeeded.
    for feature in feature_updates:
        ret.setdefault(feature, 'OK')
    return ret

def update_features(self, feature_updates, validate_only=False, timeout_ms=60000):
    """Finalize cluster-wide feature capabilities (KIP-584).

    The request is always routed to the active controller.
    Requires broker >= 2.7.

    Arguments:
        feature_updates: dict mapping each feature name either to a
            ``(upgrade_type, max_version_level)`` tuple or directly to a
            ``max_version_level`` (which implies an UPGRADE).
            ``upgrade_type`` accepts an :class:`UpdateFeatureType`
            member, its name, or its int value. Requesting a
            ``max_version_level < 1`` deletes the finalized feature.

    Keyword Arguments:
        validate_only (bool, optional): When True, the controller only
            validates the update without applying it. Default: False.
        timeout_ms (int, optional): Broker-side timeout in milliseconds.
            Default: 60000.

    Returns:
        dict of {feature_name: 'OK' | error message}
    """
    return self._manager.run(
        self._async_update_features, feature_updates, validate_only, timeout_ms)


class UpdateFeatureType(EnumHelper, IntEnum):
    """Upgrade type for a single feature update (KIP-584 UpdateFeatures).

    Used as the ``upgrade_type`` element of an ``update_features`` spec;
    the two DOWNGRADE members additionally cause ``allow_downgrade`` to be
    set on the request.
    """
    UNKNOWN = 0
    UPGRADE = 1
    SAFE_DOWNGRADE = 2
    UNSAFE_DOWNGRADE = 3
2 changes: 0 additions & 2 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ def run_cli(args=None):

# [cluster]
# alter-log-dirs (AlterReplicaLogDirs)
# describe-features (ApiVersions)
# update-features (UpdateFeatures)
# describe-quorum (DescribeQuorum)
# unregister-broker (UnregisterBroker)
# add-raft-voter (AddRaftVoter)
Expand Down
3 changes: 2 additions & 1 deletion kafka/cli/admin/cluster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .broker_version import GetBrokerVersion
from .describe import DescribeCluster
from .log_dirs import DescribeLogDirs
from .features import DescribeFeatures, UpdateFeatures


class ClusterSubCommand:
Expand All @@ -12,6 +13,6 @@ class ClusterSubCommand:
def add_subparser(cls, subparsers):
    """Register the 'cluster' subcommand group and its commands.

    Note: the diff rendering left both the pre- and post-image of the
    command-list line; this is the merged post-image, which includes the
    new DescribeFeatures and UpdateFeatures commands.
    """
    parser = subparsers.add_parser('cluster', help='Manage Kafka Cluster')
    commands = parser.add_subparsers()
    # Each command class attaches its own subparser to `commands`.
    for cmd in [DescribeCluster, DescribeFeatures, UpdateFeatures, GetApiVersions, GetBrokerVersion, DescribeLogDirs]:
        cmd.add_subparser(commands)
    # With no subcommand given, print help and exit non-zero.
    parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
52 changes: 52 additions & 0 deletions kafka/cli/admin/cluster/features.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from kafka.admin import UpdateFeatureType


class DescribeFeatures:
    """CLI command: show the cluster's supported/finalized feature flags."""

    @classmethod
    def add_subparser(cls, subparsers):
        parser = subparsers.add_parser(
            'describe-features',
            help='Describe Features of Kafka Cluster')
        parser.add_argument(
            '-f', '--feature', type=str, action='append', dest='features', default=[],
            help='Show one or more specific features. If not provided, returns all features.')
        parser.set_defaults(command=cls.command)

    @classmethod
    def command(cls, client, args):
        features = client.describe_features()
        # No -f filter: return everything.
        if not args.features:
            return features
        wanted = set(args.features)
        return {name: info for name, info in features.items() if name in wanted}


class UpdateFeatures:
    """CLI command: finalize cluster-wide feature flags (update-features)."""

    @classmethod
    def add_subparser(cls, subparsers):
        parser = subparsers.add_parser('update-features', help='Update Features of Kafka Cluster')
        parser.add_argument('-f', '--feature', type=str, action='append', dest='features', default=[], help='set feature=value')
        parser.add_argument('--downgrade', action='store_true')
        parser.add_argument('--unsafe', action='store_true')
        parser.add_argument('--timeout', type=int, default=60)
        parser.add_argument('--validate-only', action='store_true')
        parser.set_defaults(command=cls.command)

    @staticmethod
    def _feature_type(args):
        # --unsafe is only meaningful together with --downgrade.
        if not args.downgrade:
            return UpdateFeatureType.UPGRADE
        elif args.unsafe:
            return UpdateFeatureType.UNSAFE_DOWNGRADE
        else:
            return UpdateFeatureType.SAFE_DOWNGRADE

    @staticmethod
    def _parse_feature(spec):
        """Split one 'feature=value' argument into (name, value).

        Fixed: the previous ``spec.split('=')`` + tuple unpacking raised an
        unhelpful ValueError when the spec had no '=' and broke outright on
        values containing '='; partition keeps everything after the first '='.

        Raises:
            ValueError: on a spec with no '=' or an empty feature name.
        """
        name, sep, version = spec.partition('=')
        if not sep or not name:
            raise ValueError(f"invalid feature spec {spec!r}, expected feature=value")
        return name, version

    @classmethod
    def command(cls, client, args):
        feature_type = cls._feature_type(args)
        feature_updates = {}
        for spec in args.features:
            name, version = cls._parse_feature(spec)
            feature_updates[name] = (feature_type, version)
        return client.update_features(feature_updates,
                                      validate_only=args.validate_only,
                                      timeout_ms=1000*args.timeout)
4 changes: 4 additions & 0 deletions kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
import kafka.errors as Errors
from kafka.protocol.broker_version_data import BrokerVersionData
from kafka.future import Future
from kafka.version import __version__


log = logging.getLogger(__name__)

class KafkaConnectionManager:
DEFAULT_CONFIG = {
'client_id': 'kafka-python-' + __version__,
'client_software_name': 'kafka-python',
'client_software_version': __version__,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 30000,
'request_timeout_ms': 30000,
Expand Down
4 changes: 4 additions & 0 deletions kafka/protocol/admin/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ def json_patch(cls, json):
class DescribeLogDirsRequest(ApiMessage): pass
class DescribeLogDirsResponse(ApiMessage): pass

# UpdateFeatures (KIP-584): finalize cluster-wide feature flags.
# Schemas are filled in by the ApiMessage machinery; these are markers.
class UpdateFeaturesRequest(ApiMessage): pass
class UpdateFeaturesResponse(ApiMessage): pass


__all__ = [
    'DescribeClusterRequest', 'DescribeClusterResponse',
    'DescribeLogDirsRequest', 'DescribeLogDirsResponse',
    'UpdateFeaturesRequest', 'UpdateFeaturesResponse',
]
100 changes: 99 additions & 1 deletion kafka/protocol/admin/cluster.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ from typing import Any, Self
from kafka.protocol.api_message import ApiMessage
from kafka.protocol.data_container import DataContainer

__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse']
__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', 'UpdateFeaturesRequest', 'UpdateFeaturesResponse']

class DescribeClusterRequest(ApiMessage):
include_cluster_authorized_operations: bool
Expand Down Expand Up @@ -223,3 +223,101 @@ class DescribeLogDirsResponse(ApiMessage):
def is_request(cls) -> bool: ...
def expect_response(self) -> bool: ...
def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class UpdateFeaturesRequest(ApiMessage):
    """Typed stub for the UpdateFeatures request (KIP-584)."""

    class FeatureUpdateKey(DataContainer):
        """One per-feature update entry carried in the request."""
        feature: str
        max_version_level: int
        allow_downgrade: bool
        upgrade_type: int
        def __init__(
            self,
            *args: Any,
            feature: str = ...,
            max_version_level: int = ...,
            allow_downgrade: bool = ...,
            upgrade_type: int = ...,
            version: int | None = None,
            **kwargs: Any,
        ) -> None: ...
        @property
        def version(self) -> int | None: ...
        def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

    timeout_ms: int
    feature_updates: list[FeatureUpdateKey]
    validate_only: bool
    def __init__(
        self,
        *args: Any,
        timeout_ms: int = ...,
        feature_updates: list[FeatureUpdateKey] = ...,
        validate_only: bool = ...,
        version: int | None = None,
        **kwargs: Any,
    ) -> None: ...
    @property
    def version(self) -> int | None: ...
    def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...
    name: str
    type: str
    API_KEY: int
    API_VERSION: int
    valid_versions: tuple[int, int]
    min_version: int
    max_version: int
    @property
    def header(self) -> Any: ...
    @classmethod
    def is_request(cls) -> bool: ...
    def expect_response(self) -> bool: ...
    def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class UpdateFeaturesResponse(ApiMessage):
    """Typed stub for the UpdateFeatures response (KIP-584)."""

    class UpdatableFeatureResult(DataContainer):
        """Per-feature outcome reported by the controller."""
        feature: str
        error_code: int
        error_message: str | None
        def __init__(
            self,
            *args: Any,
            feature: str = ...,
            error_code: int = ...,
            error_message: str | None = ...,
            version: int | None = None,
            **kwargs: Any,
        ) -> None: ...
        @property
        def version(self) -> int | None: ...
        def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

    throttle_time_ms: int
    error_code: int
    error_message: str | None
    results: list[UpdatableFeatureResult]
    def __init__(
        self,
        *args: Any,
        throttle_time_ms: int = ...,
        error_code: int = ...,
        error_message: str | None = ...,
        results: list[UpdatableFeatureResult] = ...,
        version: int | None = None,
        **kwargs: Any,
    ) -> None: ...
    @property
    def version(self) -> int | None: ...
    def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...
    name: str
    type: str
    API_KEY: int
    API_VERSION: int
    valid_versions: tuple[int, int]
    min_version: int
    max_version: int
    @property
    def header(self) -> Any: ...
    @classmethod
    def is_request(cls) -> bool: ...
    def expect_response(self) -> bool: ...
    def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...
Loading
Loading