diff --git a/kafka/admin/_cluster.py b/kafka/admin/_cluster.py index aee5b326b..c14e3f66e 100644 --- a/kafka/admin/_cluster.py +++ b/kafka/admin/_cluster.py @@ -13,6 +13,7 @@ from kafka.protocol.admin import ( AlterReplicaLogDirsRequest, DescribeLogDirsRequest, + DescribeQuorumRequest, UpdateFeaturesRequest, ) from kafka.structs import TopicPartitionReplica @@ -142,6 +143,43 @@ def alter_replica_log_dirs(self, replica_assignments): """ return self._manager.run(self._async_alter_replica_log_dirs, replica_assignments) + async def _async_describe_quorum(self, topic, partition): + _Topic = DescribeQuorumRequest.TopicData + _Partition = _Topic.PartitionData + request = DescribeQuorumRequest(topics=[ + _Topic(topic_name=topic, partitions=[_Partition(partition_index=partition)]) + ]) + response = await self._manager.send(request) + top_error = Errors.for_code(response.error_code) + if top_error is not Errors.NoError: + raise top_error(response.error_message or '') + result = response.to_dict() + result.pop('throttle_time_ms', None) + result.pop('error_code', None) + result.pop('error_message', None) + for topic in result['topics']: + for partition in topic['partitions']: + error = Errors.for_code(partition.pop('error_code'))(partition.pop('error_message')) + if not isinstance(error, Errors.NoError): + partition['error'] = str(error) + else: + partition['error'] = None + return result + + def describe_metadata_quorum(self): + """Describe the KRaft quorum state for the cluster metadata log. + + Returns quorum info for the ``__cluster_metadata`` topic + (partition 0), including the current leader, leader epoch, high + watermark, voters, and observers. On broker version >= 3.8 (KIP-853), + the response also reports controller node endpoints in ``nodes``. + Requires a KRaft cluster. + + Returns: + dict matching the DescribeQuorumResponse shape. + """ + return self._manager.run(self._async_describe_quorum, '__cluster_metadata', 0) + async def _async_get_broker_version_data(self, broker_id): conn = await self._manager.get_connection(broker_id) return conn.broker_version_data diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 71fb3213d..3fd23830f 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -102,7 +102,6 @@ def run_cli(args=None): # abort (EndTxn) # [cluster] - # describe-quorum (DescribeQuorum) # unregister-broker (UnregisterBroker) # add-raft-voter (AddRaftVoter) # remove-raft-voter (RemoveRaftVoter) diff --git a/kafka/cli/admin/cluster/__init__.py b/kafka/cli/admin/cluster/__init__.py index c7b21891b..0472d4e84 100644 --- a/kafka/cli/admin/cluster/__init__.py +++ b/kafka/cli/admin/cluster/__init__.py @@ -1,4 +1,5 @@ from .describe import DescribeCluster +from .describe_quorum import DescribeQuorum from .features import DescribeFeatures, UpdateFeatures from .log_dirs import DescribeLogDirs, AlterLogDirs from .versions import GetApiVersions, GetBrokerVersion @@ -7,7 +8,7 @@ class ClusterCommandGroup: GROUP = 'cluster' HELP = 'Manage Kafka Cluster' - COMMANDS = [DescribeCluster, + COMMANDS = [DescribeCluster, DescribeQuorum, GetApiVersions, GetBrokerVersion, DescribeFeatures, UpdateFeatures, DescribeLogDirs, AlterLogDirs] diff --git a/kafka/cli/admin/cluster/describe_quorum.py b/kafka/cli/admin/cluster/describe_quorum.py new file mode 100644 index 000000000..f70b4127c --- /dev/null +++ b/kafka/cli/admin/cluster/describe_quorum.py @@ -0,0 +1,11 @@ +class DescribeQuorum: + COMMAND = 'describe-quorum' + HELP = 'Describe the KRaft metadata quorum' + + @classmethod + def add_arguments(cls, parser): + pass + + @classmethod + def command(cls, client, args): + return client.describe_metadata_quorum() diff --git a/kafka/protocol/admin/cluster.py b/kafka/protocol/admin/cluster.py index b95568cb4..493ae4d84 100644 --- a/kafka/protocol/admin/cluster.py +++ b/kafka/protocol/admin/cluster.py @@ -14,6 +14,9 @@ class DescribeLogDirsResponse(ApiMessage): pass class AlterReplicaLogDirsRequest(ApiMessage): pass class AlterReplicaLogDirsResponse(ApiMessage): pass +class DescribeQuorumRequest(ApiMessage): pass +class DescribeQuorumResponse(ApiMessage): pass + class UpdateFeaturesRequest(ApiMessage): pass class UpdateFeaturesResponse(ApiMessage): pass @@ -22,5 +25,6 @@ class UpdateFeaturesResponse(ApiMessage): pass 'DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', 'AlterReplicaLogDirsRequest', 'AlterReplicaLogDirsResponse', + 'DescribeQuorumRequest', 'DescribeQuorumResponse', 'UpdateFeaturesRequest', 'UpdateFeaturesResponse', ] diff --git a/kafka/protocol/admin/cluster.pyi b/kafka/protocol/admin/cluster.pyi index 0b108aa11..3acc5a8fa 100644 --- a/kafka/protocol/admin/cluster.pyi +++ b/kafka/protocol/admin/cluster.pyi @@ -5,7 +5,7 @@ from typing import Any, Self from kafka.protocol.api_message import ApiMessage from kafka.protocol.data_container import DataContainer -__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', 'AlterReplicaLogDirsRequest', 'AlterReplicaLogDirsResponse', 'UpdateFeaturesRequest', 'UpdateFeaturesResponse'] +__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', 'AlterReplicaLogDirsRequest', 'AlterReplicaLogDirsResponse', 'DescribeQuorumRequest', 'DescribeQuorumResponse', 'UpdateFeaturesRequest', 'UpdateFeaturesResponse'] class DescribeClusterRequest(ApiMessage): include_cluster_authorized_operations: bool @@ -338,6 +338,187 @@ class AlterReplicaLogDirsResponse(ApiMessage): def expect_response(self) -> bool: ... def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... +class DescribeQuorumRequest(ApiMessage): + class TopicData(DataContainer): + class PartitionData(DataContainer): + partition_index: int + def __init__( + self, + *args: Any, + partition_index: int = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + topic_name: str + partitions: list[PartitionData] + def __init__( + self, + *args: Any, + topic_name: str = ..., + partitions: list[PartitionData] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + topics: list[TopicData] + def __init__( + self, + *args: Any, + topics: list[TopicData] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class DescribeQuorumResponse(ApiMessage): + class TopicData(DataContainer): + class PartitionData(DataContainer): + class ReplicaState(DataContainer): + replica_id: int + replica_directory_id: uuid.UUID + log_end_offset: int + last_fetch_timestamp: int + last_caught_up_timestamp: int + def __init__( + self, + *args: Any, + replica_id: int = ..., + replica_directory_id: uuid.UUID = ..., + log_end_offset: int = ..., + last_fetch_timestamp: int = ..., + last_caught_up_timestamp: int = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + partition_index: int + error_code: int + error_message: str | None + leader_id: int + leader_epoch: int + high_watermark: int + current_voters: list[ReplicaState] + observers: list[ReplicaState] + def __init__( + self, + *args: Any, + partition_index: int = ..., + error_code: int = ..., + error_message: str | None = ..., + leader_id: int = ..., + leader_epoch: int = ..., + high_watermark: int = ..., + current_voters: list[ReplicaState] = ..., + observers: list[ReplicaState] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + topic_name: str + partitions: list[PartitionData] + def __init__( + self, + *args: Any, + topic_name: str = ..., + partitions: list[PartitionData] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + class Node(DataContainer): + class Listener(DataContainer): + name: str + host: str + port: int + def __init__( + self, + *args: Any, + name: str = ..., + host: str = ..., + port: int = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + node_id: int + listeners: list[Listener] + def __init__( + self, + *args: Any, + node_id: int = ..., + listeners: list[Listener] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + error_code: int + error_message: str | None + topics: list[TopicData] + nodes: list[Node] + def __init__( + self, + *args: Any, + error_code: int = ..., + error_message: str | None = ..., + topics: list[TopicData] = ..., + nodes: list[Node] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + class UpdateFeaturesRequest(ApiMessage): class FeatureUpdateKey(DataContainer): feature: str diff --git a/kafka/protocol/generate_stubs.py b/kafka/protocol/generate_stubs.py index 38c367bcb..3b4753d07 100644 --- a/kafka/protocol/generate_stubs.py +++ b/kafka/protocol/generate_stubs.py @@ -24,6 +24,7 @@ SIMPLE_TYPE_MAP = { 'int8': 'int', 'int16': 'int', + 'uint16': 'int', 'int32': 'int', 'int64': 'int', 'float64': 'float', diff --git a/kafka/protocol/schemas/fields/codecs/__init__.py b/kafka/protocol/schemas/fields/codecs/__init__.py index 68484f6a3..48bfc4843 100644 --- a/kafka/protocol/schemas/fields/codecs/__init__.py +++ b/kafka/protocol/schemas/fields/codecs/__init__.py @@ -2,11 +2,11 @@ from .tagged_fields import TaggedFields from .types import ( BitField, Boolean, UUID, Bytes, String, - Int8, Int16, Int32, Int64, UnsignedVarInt32, Float64, + Int8, Int16, Int32, Int64, UnsignedInt16, UnsignedVarInt32, Float64, ) __all__ = [ 'BitField', 'Boolean', 'UUID', 'Bytes', 'String', - 'Int8', 'Int16', 'Int32', 'Int64', 'UnsignedVarInt32', 'Float64', + 'Int8', 'Int16', 'Int32', 'Int64', 'UnsignedInt16', 'UnsignedVarInt32', 'Float64', 'TaggedFields', 'EncodeBuffer', ] diff --git a/kafka/protocol/schemas/fields/codecs/types.py b/kafka/protocol/schemas/fields/codecs/types.py index 1e504501f..8deca2e6f 100644 --- a/kafka/protocol/schemas/fields/codecs/types.py +++ b/kafka/protocol/schemas/fields/codecs/types.py @@ -60,6 +60,11 @@ class Int16(FixedCodec): size = 2 +class UnsignedInt16(FixedCodec): + fmt = 'H' + size = 2 + + class Int32(FixedCodec): fmt = 'i' size = 4 diff --git a/kafka/protocol/schemas/fields/simple.py b/kafka/protocol/schemas/fields/simple.py index 8d3a7416d..66e18d05b 100644 --- a/kafka/protocol/schemas/fields/simple.py +++ b/kafka/protocol/schemas/fields/simple.py @@ -3,7 +3,7 @@ from .base import BaseField from .codecs import ( BitField, Boolean, Bytes, - Float64, Int8, Int16, Int32, Int64, String, UUID + Float64, Int8, Int16, Int32, Int64, String, UnsignedInt16, UUID ) @@ -11,7 +11,7 @@ class SimpleField(BaseField): TYPES = { 'int8': Int8, 'int16': Int16, - #'uint16': UnsignedInt16, + 'uint16': UnsignedInt16, 'int32': Int32, #'uint32': UnsignedInt32, 'int64': Int64, @@ -52,7 +52,7 @@ def _calculate_default(self, default): else: raise ValueError('Invalid default for boolean field %s: %s' % (self._name, default)) return bool(default) - elif self._type in (Int8, Int16, Int32, Int64): + elif self._type in (Int8, Int16, Int32, Int64, UnsignedInt16): if not default: return 0 if isinstance(default, str): diff --git a/kafka/protocol/schemas/resources/DescribeQuorumRequest.json b/kafka/protocol/schemas/resources/DescribeQuorumRequest.json new file mode 100644 index 000000000..7b9ee5a23 --- /dev/null +++ b/kafka/protocol/schemas/resources/DescribeQuorumRequest.json @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 55, + "type": "request", + "listeners": ["broker", "controller"], + "name": "DescribeQuorumRequest", + // Version 1 adds additional fields in the response. The request is unchanged (KIP-836). + // Version 2 adds additional fields in the response. The request is unchanged (KIP-853). + "validVersions": "0-2", + "flexibleVersions": "0+", + "latestVersionUnstable": false, + "fields": [ + { "name": "Topics", "type": "[]TopicData", "versions": "0+", + "about": "The topics to describe.", "fields": [ + { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", + "about": "The partitions to describe.", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." } + ] + }] + } + ] +} diff --git a/kafka/protocol/schemas/resources/DescribeQuorumResponse.json b/kafka/protocol/schemas/resources/DescribeQuorumResponse.json new file mode 100644 index 000000000..b5b51d1a7 --- /dev/null +++ b/kafka/protocol/schemas/resources/DescribeQuorumResponse.json @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 55, + "type": "response", + "name": "DescribeQuorumResponse", + // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836). + // Version 2 adds ErrorMessage, Nodes, ErrorMessage in PartitionData, ReplicaDirectoryId in ReplicaState (KIP-853). + "validVersions": "0-2", + "flexibleVersions": "0+", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top level error code."}, + { "name": "ErrorMessage", "type": "string", "versions": "2+", "nullableVersions": "2+", "ignorable": true, + "about": "The error message, or null if there was no error." }, + { "name": "Topics", "type": "[]TopicData", "versions": "0+", + "about": "The response from the describe quorum API.", "fields": [ + { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", + "about": "The partition data.", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The partition error code."}, + { "name": "ErrorMessage", "type": "string", "versions": "2+", "nullableVersions": "2+", "ignorable": true, + "about": "The error message, or null if there was no error." }, + { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the current leader or -1 if the leader is unknown."}, + { "name": "LeaderEpoch", "type": "int32", "versions": "0+", + "about": "The latest known leader epoch."}, + { "name": "HighWatermark", "type": "int64", "versions": "0+", + "about": "The high water mark."}, + { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+", + "about": "The current voters of the partition."}, + { "name": "Observers", "type": "[]ReplicaState", "versions": "0+", + "about": "The observers of the partition."} + ]} + ]}, + { "name": "Nodes", "type": "[]Node", "versions": "2+", + "about": "The nodes in the quorum.", "fields": [ + { "name": "NodeId", "type": "int32", "versions": "2+", + "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." }, + { "name": "Listeners", "type": "[]Listener", + "about": "The listeners of this controller.", "versions": "2+", "fields": [ + { "name": "Name", "type": "string", "versions": "2+", "mapKey": true, + "about": "The name of the endpoint." }, + { "name": "Host", "type": "string", "versions": "2+", + "about": "The hostname." }, + { "name": "Port", "type": "uint16", "versions": "2+", + "about": "The port." } + ]} + ]} + ], + "commonStructs": [ + { "name": "ReplicaState", "versions": "0+", "fields": [ + { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the replica."}, + { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "2+", + "about": "The replica directory ID of the replica."}, + { "name": "LogEndOffset", "type": "int64", "versions": "0+", + "about": "The last known log end offset of the follower or -1 if it is unknown."}, + { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1, + "about": "The last known leader wall clock time time when a follower fetched from the leader. This is reported as -1 both for the current leader or if it is unknown for a voter."}, + { "name": "LastCaughtUpTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1, + "about": "The leader wall clock append time of the offset for which the follower made the most recent fetch request. This is reported as the current time for the leader and -1 if unknown for a voter."} + ]} + ] +} diff --git a/test/admin/test_admin_cluster_quorum.py b/test/admin/test_admin_cluster_quorum.py new file mode 100644 index 000000000..d9c113cde --- /dev/null +++ b/test/admin/test_admin_cluster_quorum.py @@ -0,0 +1,121 @@ +import pytest + +import kafka.errors as Errors +from kafka.admin import KafkaAdminClient +from kafka.protocol.admin import DescribeQuorumRequest, DescribeQuorumResponse + +from test.mock_broker import MockBroker + + +@pytest.fixture +def broker(request): + broker_version = getattr(request, 'param', (4, 2)) + return MockBroker(broker_version=broker_version) + + +@pytest.fixture +def admin(broker): + admin = KafkaAdminClient( + kafka_client=broker.client_factory(), + bootstrap_servers='%s:%d' % (broker.host, broker.port), + request_timeout_ms=5000, + ) + try: + yield admin + finally: + admin.close() + + +def _quorum_response( + *, + error_code=0, + error_message=None, + leader_id=1, + leader_epoch=5, + high_watermark=100, + voter_ids=(1, 2, 3), + observer_ids=(), + nodes=(), + partition_error_code=0, + partition_error_message=None, +): + Topic = DescribeQuorumResponse.TopicData + Part = Topic.PartitionData + State = Part.ReplicaState + voters = [State(replica_id=rid, + replica_directory_id='00000000-0000-0000-0000-000000000000', + log_end_offset=high_watermark, + last_fetch_timestamp=-1, + last_caught_up_timestamp=-1) + for rid in voter_ids] + observers = [State(replica_id=rid, + replica_directory_id='00000000-0000-0000-0000-000000000000', + log_end_offset=high_watermark, + last_fetch_timestamp=-1, + last_caught_up_timestamp=-1) + for rid in observer_ids] + return DescribeQuorumResponse( + error_code=error_code, + error_message=error_message, + topics=[Topic(topic_name='__cluster_metadata', partitions=[ + Part(partition_index=0, + error_code=partition_error_code, + error_message=partition_error_message, + leader_id=leader_id, leader_epoch=leader_epoch, + high_watermark=high_watermark, + current_voters=voters, observers=observers), + ])], + nodes=list(nodes), + ) + + +class TestDescribeMetadataQuorumMockBroker: + + def test_returns_quorum_state(self, broker, admin): + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = DescribeQuorumRequest.decode( + request_bytes, version=api_version, header=True) + return _quorum_response(leader_id=2, leader_epoch=7, + high_watermark=42, + voter_ids=(1, 2, 3), observer_ids=(4,)) + + broker.respond_fn(DescribeQuorumRequest, handler) + + result = admin.describe_metadata_quorum() + + req = captured['request'] + assert len(req.topics) == 1 + assert req.topics[0].topic_name == '__cluster_metadata' + assert [p.partition_index for p in req.topics[0].partitions] == [0] + + assert 'error_code' not in result + assert 'error_message' not in result + partition = result['topics'][0]['partitions'][0] + assert partition['error'] is None + assert partition['leader_id'] == 2 + assert partition['leader_epoch'] == 7 + assert partition['high_watermark'] == 42 + assert [v['replica_id'] for v in partition['current_voters']] == [1, 2, 3] + assert [o['replica_id'] for o in partition['observers']] == [4] + + def test_top_level_error_raises(self, broker, admin): + broker.respond( + DescribeQuorumRequest, + _quorum_response( + error_code=Errors.ClusterAuthorizationFailedError.errno, + error_message='nope')) + + with pytest.raises(Errors.ClusterAuthorizationFailedError): + admin.describe_metadata_quorum() + + def test_partition_level_error_returned(self, broker, admin): + broker.respond( + DescribeQuorumRequest, + _quorum_response( + partition_error_code=Errors.ClusterAuthorizationFailedError.errno, + partition_error_message='nope')) + + result = admin.describe_metadata_quorum() + assert result['topics'][0]['partitions'][0]['error'] == str(Errors.ClusterAuthorizationFailedError('nope')) diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index ac56a482f..d5bf7afd2 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -507,6 +507,26 @@ def test_alter_replica_log_dirs(kafka_admin_client, topic): assert result[tpr] is NoError +@pytest.mark.skipif(env_kafka_version() < (4, 0), reason="DescribeQuorum requires KRaft (broker >=4.0)") +def test_describe_metadata_quorum(kafka_admin_client): + result = kafka_admin_client.describe_metadata_quorum() + assert 'error_code' not in result + assert 'error_message' not in result + assert len(result['topics']) == 1 + t = result['topics'][0] + assert t['topic_name'] == '__cluster_metadata' + assert len(t['partitions']) == 1 + p = t['partitions'][0] + assert p['partition_index'] == 0 + assert 'error_code' not in p + assert 'error_message' not in p + assert p['error'] is None + assert p['leader_id'] >= 0 + assert p['leader_epoch'] >= 0 + assert p['high_watermark'] >= 0 + assert len(p['current_voters']) >= 1 + + @pytest.mark.skipif(env_kafka_version() < (2, 4), reason="AlterPartitionReassignments requires broker >=2.4") def test_alter_partition_reassignments(kafka_admin_client, topic): topic_metadata = kafka_admin_client.describe_topics([topic])[0] diff --git a/test/protocol/schemas/test_codec_types.py b/test/protocol/schemas/test_codec_types.py index b5fecf601..4332e4860 100644 --- a/test/protocol/schemas/test_codec_types.py +++ b/test/protocol/schemas/test_codec_types.py @@ -6,7 +6,7 @@ from kafka.protocol.schemas.fields.codecs.types import ( Int8, Int16, Int32, Int64, Float64, Boolean, UUID, - UnsignedVarInt32, VarInt32, VarInt64, + UnsignedInt16, UnsignedVarInt32, VarInt32, VarInt64, String, Bytes, BitField, ) @@ -18,6 +18,9 @@ (Int16, 0, b'\x00\x00'), (Int16, 32767, b'\x7f\xff'), (Int16, -32768, b'\x80\x00'), + (UnsignedInt16, 0, b'\x00\x00'), + (UnsignedInt16, 32768, b'\x80\x00'), + (UnsignedInt16, 65535, b'\xff\xff'), (Int32, 0, b'\x00\x00\x00\x00'), (Int32, 2147483647, b'\x7f\xff\xff\xff'), (Int32, -2147483648, b'\x80\x00\x00\x00'), @@ -89,6 +92,12 @@ def test_error_handling(): with pytest.raises(struct.error): Int8.decode(io.BytesIO(b'')) # Too short + with pytest.raises(struct.error): + UnsignedInt16.encode(-1) # Negative not allowed + + with pytest.raises(struct.error): + UnsignedInt16.encode(65536) # Too large + s = String() with pytest.raises(ValueError): s.decode(io.BytesIO(b'\x00\x05foo')) # length 5 but only 3 bytes