Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions kafka/admin/_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from kafka.protocol.admin import (
AlterReplicaLogDirsRequest,
DescribeLogDirsRequest,
DescribeQuorumRequest,
UpdateFeaturesRequest,
)
from kafka.structs import TopicPartitionReplica
Expand Down Expand Up @@ -142,6 +143,43 @@ def alter_replica_log_dirs(self, replica_assignments):
"""
return self._manager.run(self._async_alter_replica_log_dirs, replica_assignments)

async def _async_describe_quorum(self, topic, partition):
    """Send a DescribeQuorumRequest for a single topic-partition.

    Args:
        topic: topic name whose quorum state is requested.
        partition: partition index within *topic*.

    Returns:
        dict: the DescribeQuorumResponse as a dict with the throttle and
        top-level error fields stripped; each partition entry carries a
        single ``error`` key (string, or None on success) in place of the
        raw ``error_code``/``error_message`` pair.

    Raises:
        The top-level response error class, if the broker reported one.
    """
    _Topic = DescribeQuorumRequest.TopicData
    _Partition = _Topic.PartitionData
    request = DescribeQuorumRequest(topics=[
        _Topic(topic_name=topic, partitions=[_Partition(partition_index=partition)])
    ])
    response = await self._manager.send(request)
    top_error = Errors.for_code(response.error_code)
    if top_error is not Errors.NoError:
        raise top_error(response.error_message or '')
    result = response.to_dict()
    # Drop response plumbing that callers don't need.
    result.pop('throttle_time_ms', None)
    result.pop('error_code', None)
    result.pop('error_message', None)
    # Fold each partition's error_code/error_message into a single
    # 'error' field. Use fresh loop names so the `topic`/`partition`
    # parameters are not shadowed.
    for topic_info in result['topics']:
        for partition_info in topic_info['partitions']:
            # `or ''` keeps a None message from rendering as "None",
            # matching the top-level error path above.
            error = Errors.for_code(partition_info.pop('error_code'))(
                partition_info.pop('error_message') or '')
            if isinstance(error, Errors.NoError):
                partition_info['error'] = None
            else:
                partition_info['error'] = str(error)
    return result

def describe_metadata_quorum(self):
    """Describe the KRaft quorum for the cluster metadata log.

    Queries quorum state for partition 0 of the internal
    ``__cluster_metadata`` topic: current leader, leader epoch, high
    watermark, voters, and observers. Brokers >= 3.8 (KIP-853)
    additionally report controller endpoints under ``nodes``. Only
    meaningful on a KRaft cluster.

    Returns:
        dict shaped like a DescribeQuorumResponse.
    """
    return self._manager.run(self._async_describe_quorum, '__cluster_metadata', 0)

async def _async_get_broker_version_data(self, broker_id):
conn = await self._manager.get_connection(broker_id)
return conn.broker_version_data
Expand Down
1 change: 0 additions & 1 deletion kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ def run_cli(args=None):
# abort (EndTxn)

# [cluster]
# describe-quorum (DescribeQuorum)
# unregister-broker (UnregisterBroker)
# add-raft-voter (AddRaftVoter)
# remove-raft-voter (RemoveRaftVoter)
Expand Down
3 changes: 2 additions & 1 deletion kafka/cli/admin/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .describe import DescribeCluster
from .describe_quorum import DescribeQuorum
from .features import DescribeFeatures, UpdateFeatures
from .log_dirs import DescribeLogDirs, AlterLogDirs
from .versions import GetApiVersions, GetBrokerVersion
Expand All @@ -7,7 +8,7 @@
class ClusterCommandGroup:
    """CLI command group bundling cluster-level admin subcommands.

    Note: the diff rendering left both the pre- and post-change
    ``COMMANDS = [...`` opening lines in place, which is not valid
    Python; this is the clean post-change definition.
    """
    GROUP = 'cluster'
    HELP = 'Manage Kafka Cluster'
    COMMANDS = [DescribeCluster, DescribeQuorum,
                GetApiVersions, GetBrokerVersion,
                DescribeFeatures, UpdateFeatures,
                DescribeLogDirs, AlterLogDirs]
11 changes: 11 additions & 0 deletions kafka/cli/admin/cluster/describe_quorum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
class DescribeQuorum:
    """CLI command: print the KRaft metadata quorum state."""

    COMMAND = 'describe-quorum'
    HELP = 'Describe the KRaft metadata quorum'

    @classmethod
    def add_arguments(cls, parser):
        """No command-specific arguments to register."""

    @classmethod
    def command(cls, client, args):
        """Delegate to the admin client; *args* is unused."""
        return client.describe_metadata_quorum()
4 changes: 4 additions & 0 deletions kafka/protocol/admin/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ class DescribeLogDirsResponse(ApiMessage): pass
class AlterReplicaLogDirsRequest(ApiMessage): pass
class AlterReplicaLogDirsResponse(ApiMessage): pass

class DescribeQuorumRequest(ApiMessage): pass  # wire schema presumably attached by ApiMessage machinery at import/registration time — confirm
class DescribeQuorumResponse(ApiMessage): pass  # wire schema presumably attached by ApiMessage machinery at import/registration time — confirm

class UpdateFeaturesRequest(ApiMessage): pass
class UpdateFeaturesResponse(ApiMessage): pass

Expand All @@ -22,5 +25,6 @@ class UpdateFeaturesResponse(ApiMessage): pass
'DescribeClusterRequest', 'DescribeClusterResponse',
'DescribeLogDirsRequest', 'DescribeLogDirsResponse',
'AlterReplicaLogDirsRequest', 'AlterReplicaLogDirsResponse',
'DescribeQuorumRequest', 'DescribeQuorumResponse',
'UpdateFeaturesRequest', 'UpdateFeaturesResponse',
]
183 changes: 182 additions & 1 deletion kafka/protocol/admin/cluster.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ from typing import Any, Self
from kafka.protocol.api_message import ApiMessage
from kafka.protocol.data_container import DataContainer

__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', 'AlterReplicaLogDirsRequest', 'AlterReplicaLogDirsResponse', 'UpdateFeaturesRequest', 'UpdateFeaturesResponse']
__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', 'AlterReplicaLogDirsRequest', 'AlterReplicaLogDirsResponse', 'DescribeQuorumRequest', 'DescribeQuorumResponse', 'UpdateFeaturesRequest', 'UpdateFeaturesResponse']

class DescribeClusterRequest(ApiMessage):
include_cluster_authorized_operations: bool
Expand Down Expand Up @@ -338,6 +338,187 @@ class AlterReplicaLogDirsResponse(ApiMessage):
def expect_response(self) -> bool: ...
def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class DescribeQuorumRequest(ApiMessage):
    """Static type stub for the DescribeQuorum request message.

    Declares only the field names and types; the wire format and
    behavior live in the runtime ApiMessage class this stub mirrors.
    """

    class TopicData(DataContainer):
        class PartitionData(DataContainer):
            # Index of the partition whose quorum state is requested.
            partition_index: int
            def __init__(
                self,
                *args: Any,
                partition_index: int = ...,
                version: int | None = None,
                **kwargs: Any,
            ) -> None: ...
            @property
            def version(self) -> int | None: ...
            def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

        topic_name: str
        partitions: list[PartitionData]
        def __init__(
            self,
            *args: Any,
            topic_name: str = ...,
            partitions: list[PartitionData] = ...,
            version: int | None = None,
            **kwargs: Any,
        ) -> None: ...
        @property
        def version(self) -> int | None: ...
        def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

    topics: list[TopicData]
    def __init__(
        self,
        *args: Any,
        topics: list[TopicData] = ...,
        version: int | None = None,
        **kwargs: Any,
    ) -> None: ...
    @property
    def version(self) -> int | None: ...
    def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...
    # Message metadata — presumably populated by the ApiMessage machinery.
    name: str
    type: str
    API_KEY: int
    API_VERSION: int
    valid_versions: tuple[int, int]
    min_version: int
    max_version: int
    @property
    def header(self) -> Any: ...
    @classmethod
    def is_request(cls) -> bool: ...
    def expect_response(self) -> bool: ...
    def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class DescribeQuorumResponse(ApiMessage):
    """Static type stub for the DescribeQuorum response message.

    Declares only the field names and types; the wire format and
    behavior live in the runtime ApiMessage class this stub mirrors.
    """

    class TopicData(DataContainer):
        class PartitionData(DataContainer):
            class ReplicaState(DataContainer):
                # Per-replica quorum progress (used for both voters and observers).
                replica_id: int
                replica_directory_id: uuid.UUID
                log_end_offset: int
                last_fetch_timestamp: int
                last_caught_up_timestamp: int
                def __init__(
                    self,
                    *args: Any,
                    replica_id: int = ...,
                    replica_directory_id: uuid.UUID = ...,
                    log_end_offset: int = ...,
                    last_fetch_timestamp: int = ...,
                    last_caught_up_timestamp: int = ...,
                    version: int | None = None,
                    **kwargs: Any,
                ) -> None: ...
                @property
                def version(self) -> int | None: ...
                def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

            partition_index: int
            error_code: int
            error_message: str | None
            leader_id: int
            leader_epoch: int
            high_watermark: int
            current_voters: list[ReplicaState]
            observers: list[ReplicaState]
            def __init__(
                self,
                *args: Any,
                partition_index: int = ...,
                error_code: int = ...,
                error_message: str | None = ...,
                leader_id: int = ...,
                leader_epoch: int = ...,
                high_watermark: int = ...,
                current_voters: list[ReplicaState] = ...,
                observers: list[ReplicaState] = ...,
                version: int | None = None,
                **kwargs: Any,
            ) -> None: ...
            @property
            def version(self) -> int | None: ...
            def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

        topic_name: str
        partitions: list[PartitionData]
        def __init__(
            self,
            *args: Any,
            topic_name: str = ...,
            partitions: list[PartitionData] = ...,
            version: int | None = None,
            **kwargs: Any,
        ) -> None: ...
        @property
        def version(self) -> int | None: ...
        def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

    class Node(DataContainer):
        class Listener(DataContainer):
            # A single advertised endpoint of a controller node.
            name: str
            host: str
            port: int
            def __init__(
                self,
                *args: Any,
                name: str = ...,
                host: str = ...,
                port: int = ...,
                version: int | None = None,
                **kwargs: Any,
            ) -> None: ...
            @property
            def version(self) -> int | None: ...
            def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

        node_id: int
        listeners: list[Listener]
        def __init__(
            self,
            *args: Any,
            node_id: int = ...,
            listeners: list[Listener] = ...,
            version: int | None = None,
            **kwargs: Any,
        ) -> None: ...
        @property
        def version(self) -> int | None: ...
        def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

    error_code: int
    error_message: str | None
    topics: list[TopicData]
    nodes: list[Node]
    def __init__(
        self,
        *args: Any,
        error_code: int = ...,
        error_message: str | None = ...,
        topics: list[TopicData] = ...,
        nodes: list[Node] = ...,
        version: int | None = None,
        **kwargs: Any,
    ) -> None: ...
    @property
    def version(self) -> int | None: ...
    def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...
    # Message metadata — presumably populated by the ApiMessage machinery.
    name: str
    type: str
    API_KEY: int
    API_VERSION: int
    valid_versions: tuple[int, int]
    min_version: int
    max_version: int
    @property
    def header(self) -> Any: ...
    @classmethod
    def is_request(cls) -> bool: ...
    def expect_response(self) -> bool: ...
    def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class UpdateFeaturesRequest(ApiMessage):
class FeatureUpdateKey(DataContainer):
feature: str
Expand Down
1 change: 1 addition & 0 deletions kafka/protocol/generate_stubs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
SIMPLE_TYPE_MAP = {
'int8': 'int',
'int16': 'int',
'uint16': 'int',
'int32': 'int',
'int64': 'int',
'float64': 'float',
Expand Down
4 changes: 2 additions & 2 deletions kafka/protocol/schemas/fields/codecs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
from .tagged_fields import TaggedFields
from .types import (
BitField, Boolean, UUID, Bytes, String,
Int8, Int16, Int32, Int64, UnsignedVarInt32, Float64,
Int8, Int16, Int32, Int64, UnsignedInt16, UnsignedVarInt32, Float64,
)

__all__ = [
'BitField', 'Boolean', 'UUID', 'Bytes', 'String',
'Int8', 'Int16', 'Int32', 'Int64', 'UnsignedVarInt32', 'Float64',
'Int8', 'Int16', 'Int32', 'Int64', 'UnsignedInt16', 'UnsignedVarInt32', 'Float64',
'TaggedFields', 'EncodeBuffer',
]
5 changes: 5 additions & 0 deletions kafka/protocol/schemas/fields/codecs/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class Int16(FixedCodec):
size = 2


class UnsignedInt16(FixedCodec):
    """Fixed-size codec for an unsigned 16-bit integer.

    struct format character 'H' over 2 bytes; the byte order is whatever
    FixedCodec applies to its format — presumably big-endian per the
    Kafka wire protocol; confirm in FixedCodec.
    """
    fmt = 'H'
    size = 2


class Int32(FixedCodec):
fmt = 'i'
size = 4
Expand Down
6 changes: 3 additions & 3 deletions kafka/protocol/schemas/fields/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
from .base import BaseField
from .codecs import (
BitField, Boolean, Bytes,
Float64, Int8, Int16, Int32, Int64, String, UUID
Float64, Int8, Int16, Int32, Int64, String, UnsignedInt16, UUID
)


class SimpleField(BaseField):
TYPES = {
'int8': Int8,
'int16': Int16,
#'uint16': UnsignedInt16,
'uint16': UnsignedInt16,
'int32': Int32,
#'uint32': UnsignedInt32,
'int64': Int64,
Expand Down Expand Up @@ -52,7 +52,7 @@ def _calculate_default(self, default):
else:
raise ValueError('Invalid default for boolean field %s: %s' % (self._name, default))
return bool(default)
elif self._type in (Int8, Int16, Int32, Int64):
elif self._type in (Int8, Int16, Int32, Int64, UnsignedInt16):
if not default:
return 0
if isinstance(default, str):
Expand Down
Loading
Loading