diff --git a/kafka/admin/_cluster.py b/kafka/admin/_cluster.py index d78e168a7..2443154ad 100644 --- a/kafka/admin/_cluster.py +++ b/kafka/admin/_cluster.py @@ -5,6 +5,7 @@ import logging from typing import TYPE_CHECKING +from kafka.protocol.api_key import ApiKey from kafka.protocol.metadata import MetadataRequest from kafka.protocol.admin import DescribeLogDirsRequest @@ -74,3 +75,15 @@ def describe_log_dirs(self, topic_partitions=None, brokers=None): """ topic_partitions = self._get_topic_partitions(topic_partitions) return self._manager.run(self._async_describe_log_dirs, topic_partitions, brokers) + + async def _async_get_broker_version_data(self, broker_id): + conn = await self._manager.get_connection(broker_id) + return conn.broker_version_data + + def get_broker_version_data(self, broker_id): + """Return BrokerVersionData for a specific broker""" + return self._manager.run(self._async_get_broker_version_data, broker_id) + + def api_versions(self): + api_versions = self._manager.broker_version_data.api_versions + return {ApiKey(k): v for k, v in api_versions.items()} diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 94325a04b..7151ceae6 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -112,28 +112,24 @@ def run_cli(args=None): # --dry-run support # --trace ? - # [client-quotas] - # describe (DescribeClientQuotas - not supported yet) - # alter (AlterClientQuotas - not supported yet) - # [producers] - # describe (DescribeProducers - not supported yet) + # describe (DescribeProducers) # [transactions] - # describe (DescribeTransactions - not supported yet) - # list (ListTransactions - not supported yet) - # abort (not supported yet) + # describe (DescribeTransactions) + # list (ListTransactions) + # abort (EndTxn) # [cluster] - # describe-features (DescribeFeatures - not supported yet) - # update-features (UpdateFeatures - not supported yet) - # version - # api-versions - # alter-log-dirs (AlterReplicaLogDirs - not supported yet) - # DescribeQuorum (not supported yet) - # UnregisterBroker - # AddRaftVoter - # RemoveRaftVoter + # alter-log-dirs (AlterReplicaLogDirs) + # describe-features (ApiVersions) + # update-features (UpdateFeatures) + # describe-quorum (DescribeQuorum) + # unregister-broker (UnregisterBroker) + # add-raft-voter (AddRaftVoter) + # remove-raft-voter (RemoveRaftVoter) + # describe-quotas (DescribeClientQuotas) + # alter-quotas (AlterClientQuotas) # [tokens] *DelegationTokenRequest # create diff --git a/kafka/cli/admin/cluster/__init__.py b/kafka/cli/admin/cluster/__init__.py index 5baec8b44..085a617ce 100644 --- a/kafka/cli/admin/cluster/__init__.py +++ b/kafka/cli/admin/cluster/__init__.py @@ -1,5 +1,7 @@ import sys +from .api_versions import GetApiVersions +from .broker_version import GetBrokerVersion from .describe import DescribeCluster from .log_dirs import DescribeLogDirs @@ -10,6 +12,6 @@ class ClusterSubCommand: def add_subparser(cls, subparsers): parser = subparsers.add_parser('cluster', help='Manage Kafka Cluster') commands = parser.add_subparsers() - for cmd in [DescribeCluster, DescribeLogDirs]: + for cmd in [DescribeCluster, GetApiVersions, GetBrokerVersion, DescribeLogDirs]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/cluster/api_versions.py b/kafka/cli/admin/cluster/api_versions.py new file mode 100644 index 000000000..0d2ded02d --- /dev/null +++ b/kafka/cli/admin/cluster/api_versions.py @@ -0,0 +1,18 @@ +from kafka.protocol.api_key import ApiKey + + +class GetApiVersions: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('api-versions', help='Get Supported Api Versions') + parser.add_argument('-k', '--api-key', type=str, action='append', dest='api_keys', default=None) + parser.add_argument('--raw', action='store_true') + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + api_keys = set(ApiKey[k] for k in args.api_keys) if args.api_keys else set(ApiKey) + api_versions = client.api_versions() + return {(k.value if args.raw else k.name): v for k, v in api_versions.items() + if k in api_keys} diff --git a/kafka/cli/admin/cluster/broker_version.py b/kafka/cli/admin/cluster/broker_version.py new file mode 100644 index 000000000..bb9d19169 --- /dev/null +++ b/kafka/cli/admin/cluster/broker_version.py @@ -0,0 +1,13 @@ +class GetBrokerVersion: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('broker-version', help='Get Version for Broker') + parser.add_argument('-b', '--broker', required=True) + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + broker_id = int(args.broker) + bvd = client.get_broker_version_data(broker_id) + return {broker_id: '.'.join(map(str, bvd.broker_version))} diff --git a/kafka/net/connection.py b/kafka/net/connection.py index d3a4534b2..f7333002d 100644 --- a/kafka/net/connection.py +++ b/kafka/net/connection.py @@ -89,6 +89,10 @@ def __str__(self): def init_future(self): return self._init_future + def __await__(self): + yield self.init_future + return self + @property def close_future(self): return self._close_future diff --git a/kafka/protocol/api_key.py b/kafka/protocol/api_key.py new file mode 100644 index 000000000..58eab3f9f --- /dev/null +++ b/kafka/protocol/api_key.py @@ -0,0 +1,97 @@ +from enum import IntEnum + + +class ApiKey(IntEnum): + Produce = 0 + Fetch = 1 + ListOffsets = 2 + Metadata = 3 + LeaderAndIsr = 4 + StopReplica = 5 + UpdateMetadata = 6 + ControlledShutdown = 7 + OffsetCommit = 8 + OffsetFetch = 9 + FindCoordinator = 10 + JoinGroup = 11 + Heartbeat = 12 + LeaveGroup = 13 + SyncGroup = 14 + DescribeGroups = 15 + ListGroups = 16 + SaslHandshake = 17 + ApiVersions = 18 + CreateTopics = 19 + DeleteTopics = 20 + DeleteRecords = 21 + InitProducerId = 22 + OffsetForLeaderEpoch = 23 + AddPartitionsToTxn = 24 + AddOffsetsToTxn = 25 + EndTxn = 26 + WriteTxnMarkers = 27 + TxnOffsetCommit = 28 + DescribeAcls = 29 + CreateAcls = 30 + DeleteAcls = 31 + DescribeConfigs = 32 + AlterConfigs = 33 + AlterReplicaLogDirs = 34 + DescribeLogDirs = 35 + SaslAuthenticate = 36 + CreatePartitions = 37 + CreateDelegationToken = 38 + RenewDelegationToken = 39 + ExpireDelegationToken = 40 + DescribeDelegationToken = 41 + DeleteGroups = 42 + ElectLeaders = 43 + IncrementalAlterConfigs = 44 + AlterPartitionReassignments = 45 + ListPartitionReassignments = 46 + OffsetDelete = 47 + DescribeClientQuotas = 48 + AlterClientQuotas = 49 + DescribeUserScramCredentials = 50 + AlterUserScramCredentials = 51 + Vote = 52 + BeginQuorumEpoch = 53 + EndQuorumEpoch = 54 + DescribeQuorum = 55 + AlterPartition = 56 + UpdateFeatures = 57 + Envelope = 58 + FetchSnapshot = 59 + DescribeCluster = 60 + DescribeProducers = 61 + BrokerRegistration = 62 + BrokerHeartbeat = 63 + UnregisterBroker = 64 + DescribeTransactions = 65 + ListTransactions = 66 + AllocateProducerIds = 67 + ConsumerGroupHeartbeat = 68 + ConsumerGroupDescribe = 69 + ControllerRegistration = 70 + GetTelemetrySubscriptions = 71 + PushTelemetry = 72 + AssignReplicasToDirs = 73 + ListConfigResources = 74 + DescribeTopicPartitions = 75 + ShareGroupHeartbeat = 76 + ShareGroupDescribe = 77 + ShareFetch = 78 + ShareAcknowledge = 79 + AddRaftVoter = 80 + RemoveRaftVoter = 81 + UpdateRaftVoter = 82 + InitializeShareGroupState = 83 + ReadShareGroupState = 84 + WriteShareGroupState = 85 + DeleteShareGroupState = 86 + ReadShareGroupStateSummary = 87 + StreamsGroupHeartbeat = 88 + StreamsGroupDescribe = 89 + DescribeShareGroupOffsets = 90 + AlterShareGroupOffsets = 91 + DeleteShareGroupOffsets = 92 diff --git a/kafka/protocol/broker_version_data.py b/kafka/protocol/broker_version_data.py index b1b863acd..e8abe1e00 100644 --- a/kafka/protocol/broker_version_data.py +++ b/kafka/protocol/broker_version_data.py @@ -4,10 +4,7 @@ import functools import kafka.errors as Errors -from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest -from kafka.protocol.consumer import OffsetFetchRequest, FetchRequest, ListOffsetsRequest, JoinGroupRequest -from kafka.protocol.metadata import FindCoordinatorRequest, MetadataRequest -from kafka.protocol.producer import ProduceRequest, AddPartitionsToTxnRequest +from .api_key import ApiKey log = logging.getLogger('kafka.protocol') @@ -79,7 +76,7 @@ def api_version(self, operation, min_version=0, max_version=float('inf')): """ assert min_version <= max_version # if _max_version is a data descriptor, operation is a protocol class so no request min/max - if inspect.isdatadescriptor(operation._max_version): + if isinstance(operation, ApiKey) or inspect.isdatadescriptor(operation._max_version): request_max = float('inf') request_min = 0 else: @@ -88,7 +85,7 @@ def api_version(self, operation, min_version=0, max_version=float('inf')): max_version = min(max_version, operation.max_version, request_max) min_version = max(min_version, operation.min_version, request_min) broker_api_versions = self.api_versions - api_key = operation.API_KEY + api_key = operation.value if isinstance(operation, ApiKey) else operation.API_KEY if broker_api_versions is None or api_key not in broker_api_versions: raise Errors.IncompatibleBrokerVersion( f"Kafka broker does not support the '{operation.name}' Kafka protocol.") @@ -116,38 +113,38 @@ def infer_broker_version_from_api_versions(api_versions): test_cases = [ # format (, ) # Make sure to update consumer_integration test check when adding newer versions. - ((4, 2), ListOffsetsRequest.API_KEY, 11), - ((4, 1), ProduceRequest.API_KEY, 13), - ((4, 0), ListOffsetsRequest.API_KEY, 10), - ((3, 9), FetchRequest.API_KEY, 17), - ((3, 8), ProduceRequest.API_KEY, 11), - ((3, 7), FetchRequest.API_KEY, 16), - ((3, 6), AddPartitionsToTxnRequest.API_KEY, 4), - ((3, 5), FetchRequest.API_KEY, 15), - ((3, 4), 4, 7), # StopReplicaRequest[3]), # broker-internal api... - ((3, 3), DescribeAclsRequest.API_KEY, 3), - ((3, 2), JoinGroupRequest.API_KEY, 9), - ((3, 1), FetchRequest.API_KEY, 13), - ((3, 0), ListOffsetsRequest.API_KEY, 7), - ((2, 8), ProduceRequest.API_KEY, 9), - ((2, 7), FetchRequest.API_KEY, 12), - ((2, 6), DescribeClientQuotasRequest.API_KEY, 0), - ((2, 5), DescribeAclsRequest.API_KEY, 2), - ((2, 4), ProduceRequest.API_KEY, 8), - ((2, 3), FetchRequest.API_KEY, 11), - ((2, 2), ListOffsetsRequest.API_KEY, 5), - ((2, 1), FetchRequest.API_KEY, 10), - ((2, 0), FetchRequest.API_KEY, 8), - ((1, 1), FetchRequest.API_KEY, 7), - ((1, 0), MetadataRequest.API_KEY, 5), - ((0, 11), MetadataRequest.API_KEY, 4), - ((0, 10, 2), OffsetFetchRequest.API_KEY, 2), - ((0, 10, 1), MetadataRequest.API_KEY, 2), + ((4, 2), ApiKey.ListOffsets, 11), + ((4, 1), ApiKey.Produce, 13), + ((4, 0), ApiKey.ListOffsets, 10), + ((3, 9), ApiKey.Fetch, 17), + ((3, 8), ApiKey.Produce, 11), + ((3, 7), ApiKey.Fetch, 16), + ((3, 6), ApiKey.AddPartitionsToTxn, 4), + ((3, 5), ApiKey.Fetch, 15), + ((3, 4), ApiKey.LeaderAndIsr, 7), # broker-internal api... + ((3, 3), ApiKey.DescribeAcls, 3), + ((3, 2), ApiKey.JoinGroup, 9), + ((3, 1), ApiKey.Fetch, 13), + ((3, 0), ApiKey.ListOffsets, 7), + ((2, 8), ApiKey.Produce, 9), + ((2, 7), ApiKey.Fetch, 12), + ((2, 6), ApiKey.DescribeClientQuotas, 0), + ((2, 5), ApiKey.DescribeAcls, 2), + ((2, 4), ApiKey.Produce, 8), + ((2, 3), ApiKey.Fetch, 11), + ((2, 2), ApiKey.ListOffsets, 5), + ((2, 1), ApiKey.Fetch, 10), + ((2, 0), ApiKey.Fetch, 8), + ((1, 1), ApiKey.Fetch, 7), + ((1, 0), ApiKey.Metadata, 5), + ((0, 11), ApiKey.Metadata, 4), + ((0, 10, 2), ApiKey.OffsetFetch, 2), + ((0, 10, 1), ApiKey.Metadata, 2), ] # Get the best match of test cases for broker_version, api_key, version in sorted(test_cases, reverse=True): - if api_key not in api_versions: + if api_key.value not in api_versions: continue min_version, max_version = api_versions[api_key] if min_version <= version <= max_version: @@ -159,6 +156,9 @@ def infer_broker_version_from_api_versions(api_versions): # Fallback version checks for brokers that do not support ApiVersionsCheck +from kafka.protocol.admin import ListGroupsRequest +from kafka.protocol.consumer import OffsetFetchRequest +from kafka.protocol.metadata import FindCoordinatorRequest, MetadataRequest VERSION_CHECKS = ( ((0, 9), ListGroupsRequest[0]()), ((0, 8, 2), FindCoordinatorRequest[0]('kafka-python-default-group')),