Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions kafka/admin/_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
from typing import TYPE_CHECKING

from kafka.protocol.api_key import ApiKey
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.admin import DescribeLogDirsRequest

Expand Down Expand Up @@ -74,3 +75,15 @@ def describe_log_dirs(self, topic_partitions=None, brokers=None):
"""
topic_partitions = self._get_topic_partitions(topic_partitions)
return self._manager.run(self._async_describe_log_dirs, topic_partitions, brokers)

async def _async_get_broker_version_data(self, broker_id):
conn = await self._manager.get_connection(broker_id)
return conn.broker_version_data

def get_broker_version_data(self, broker_id):
"""Return BrokerVersionData for a specific broker"""
return self._manager.run(self._async_get_broker_version_data, broker_id)

def api_versions(self):
api_versions = self._manager.broker_version_data.api_versions
return {ApiKey(k): v for k, v in api_versions.items()}
30 changes: 13 additions & 17 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,28 +112,24 @@ def run_cli(args=None):
# --dry-run support
# --trace ?

# [client-quotas]
# describe (DescribeClientQuotas - not supported yet)
# alter (AlterClientQuotas - not supported yet)

# [producers]
# describe (DescribeProducers - not supported yet)
# describe (DescribeProducers)

# [transactions]
# describe (DescribeTransactions - not supported yet)
# list (ListTransactions - not supported yet)
# abort (not supported yet)
# describe (DescribeTransactions)
# list (ListTransactions)
# abort (EndTxn)

# [cluster]
# describe-features (DescribeFeatures - not supported yet)
# update-features (UpdateFeatures - not supported yet)
# version
# api-versions
# alter-log-dirs (AlterReplicaLogDirs - not supported yet)
# DescribeQuorum (not supported yet)
# UnregisterBroker
# AddRaftVoter
# RemoveRaftVoter
# alter-log-dirs (AlterReplicaLogDirs)
# describe-features (ApiVersions)
# update-features (UpdateFeatures)
# describe-quorum (DescribeQuorum)
# unregister-broker (UnregisterBroker)
# add-raft-voter (AddRaftVoter)
# remove-raft-voter (RemoveRaftVoter)
# describe-quotas (DescribeClientQuotas)
# alter-quotas (AlterClientQuotas)

# [tokens] *DelegationTokenRequest
# create
Expand Down
4 changes: 3 additions & 1 deletion kafka/cli/admin/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import sys

from .api_versions import GetApiVersions
from .broker_version import GetBrokerVersion
from .describe import DescribeCluster
from .log_dirs import DescribeLogDirs

Expand All @@ -10,6 +12,6 @@ class ClusterSubCommand:
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('cluster', help='Manage Kafka Cluster')
commands = parser.add_subparsers()
for cmd in [DescribeCluster, DescribeLogDirs]:
for cmd in [DescribeCluster, GetApiVersions, GetBrokerVersion, DescribeLogDirs]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
18 changes: 18 additions & 0 deletions kafka/cli/admin/cluster/api_versions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from kafka.protocol.api_key import ApiKey


class GetApiVersions:
    """CLI subcommand: list the API version ranges supported by the cluster."""

    @classmethod
    def add_subparser(cls, subparsers):
        """Attach the 'api-versions' subparser and its options."""
        parser = subparsers.add_parser('api-versions', help='Get Supported Api Versions')
        # Repeatable filter; each value must be an ApiKey enum member name.
        parser.add_argument('-k', '--api-key', type=str, action='append', dest='api_keys', default=None)
        # --raw keys the output by numeric api key instead of enum name.
        parser.add_argument('--raw', action='store_true')
        parser.set_defaults(command=cls.command)

    @classmethod
    def command(cls, client, args):
        """Return a mapping of api key (name, or value with --raw) -> version range.

        When ``--api-key`` was not given, every ApiKey is included.
        """
        if args.api_keys:
            selected = {ApiKey[name] for name in args.api_keys}
        else:
            selected = set(ApiKey)
        filtered = {}
        for key, version_range in client.api_versions().items():
            if key not in selected:
                continue
            filtered[key.value if args.raw else key.name] = version_range
        return filtered
13 changes: 13 additions & 0 deletions kafka/cli/admin/cluster/broker_version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
class GetBrokerVersion:
    """CLI subcommand: report the inferred Kafka version of a single broker."""

    @classmethod
    def add_subparser(cls, subparsers):
        """Attach the 'broker-version' subparser; -b/--broker is mandatory."""
        parser = subparsers.add_parser('broker-version', help='Get Version for Broker')
        parser.add_argument('-b', '--broker', required=True)
        parser.set_defaults(command=cls.command)

    @classmethod
    def command(cls, client, args):
        """Return {broker_id: dotted-version-string} for the requested broker."""
        broker_id = int(args.broker)
        version_data = client.get_broker_version_data(broker_id)
        dotted = '.'.join(str(part) for part in version_data.broker_version)
        return {broker_id: dotted}
4 changes: 4 additions & 0 deletions kafka/net/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def __str__(self):
def init_future(self):
return self._init_future

    def __await__(self):
        """Make the connection awaitable: ``conn = await connection`` suspends
        until :attr:`init_future` is done, then evaluates to the connection
        itself.
        """
        # NOTE(review): this yields the future object directly rather than
        # delegating via ``yield from self.init_future.__await__()``. Inside an
        # asyncio Task, a bare-yielded Future whose blocking flag is unset can
        # raise "yield was used instead of yield from" -- confirm the loop
        # machinery driving this coroutine accepts a plain yielded future.
        yield self.init_future
        return self

@property
def close_future(self):
return self._close_future
Expand Down
97 changes: 97 additions & 0 deletions kafka/protocol/api_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from enum import IntEnum


class ApiKey(IntEnum):
    """Numeric API keys of the Kafka wire protocol.

    Each member maps a request type name to the integer api_key carried in
    every Kafka request header. ``IntEnum`` keeps members interchangeable
    with plain ints, so they can be compared against / looked up by the raw
    integer keys received from brokers.
    """
    Produce = 0
    Fetch = 1
    ListOffsets = 2
    Metadata = 3
    LeaderAndIsr = 4
    StopReplica = 5
    UpdateMetadata = 6
    ControlledShutdown = 7
    OffsetCommit = 8
    OffsetFetch = 9
    FindCoordinator = 10
    JoinGroup = 11
    Heartbeat = 12
    LeaveGroup = 13
    SyncGroup = 14
    DescribeGroups = 15
    ListGroups = 16
    SaslHandshake = 17
    ApiVersions = 18
    CreateTopics = 19
    DeleteTopics = 20
    DeleteRecords = 21
    InitProducerId = 22
    OffsetForLeaderEpoch = 23
    AddPartitionsToTxn = 24
    AddOffsetsToTxn = 25
    EndTxn = 26
    WriteTxnMarkers = 27
    TxnOffsetCommit = 28
    DescribeAcls = 29
    CreateAcls = 30
    DeleteAcls = 31
    DescribeConfigs = 32
    AlterConfigs = 33
    AlterReplicaLogDirs = 34
    DescribeLogDirs = 35
    SaslAuthenticate = 36
    CreatePartitions = 37
    CreateDelegationToken = 38
    RenewDelegationToken = 39
    ExpireDelegationToken = 40
    DescribeDelegationToken = 41
    DeleteGroups = 42
    ElectLeaders = 43
    IncrementalAlterConfigs = 44
    AlterPartitionReassignments = 45
    ListPartitionReassignments = 46
    OffsetDelete = 47
    DescribeClientQuotas = 48
    AlterClientQuotas = 49
    DescribeUserScramCredentials = 50
    AlterUserScramCredentials = 51
    Vote = 52
    BeginQuorumEpoch = 53
    EndQuorumEpoch = 54
    DescribeQuorum = 55
    AlterPartition = 56
    UpdateFeatures = 57
    Envelope = 58
    FetchSnapshot = 59
    DescribeCluster = 60
    DescribeProducers = 61
    BrokerRegistration = 62
    BrokerHeartbeat = 63
    UnregisterBroker = 64
    DescribeTransactions = 65
    ListTransactions = 66
    AllocateProducerIds = 67
    ConsumerGroupHeartbeat = 68
    ConsumerGroupDescribe = 69
    ControllerRegistration = 70
    GetTelemetrySubscriptions = 71
    PushTelemetry = 72
    AssignReplicasToDirs = 73
    ListConfigResources = 74
    DescribeTopicPartitions = 75
    ShareGroupHeartbeat = 76
    ShareGroupDescribe = 77
    ShareFetch = 78
    ShareAcknowledge = 79
    AddRaftVoter = 80
    RemoveRaftVoter = 81
    UpdateRaftVoter = 82
    InitializeShareGroupState = 83
    ReadShareGroupState = 84
    WriteShareGroupState = 85
    DeleteShareGroupState = 86
    ReadShareGroupStateSummary = 87
    StreamsGroupHeartbeat = 88
    StreamsGroupDescribe = 89
    DescribeShareGroupOffsets = 90
    AlterShareGroupOffsets = 91
    DeleteShareGroupOffsets = 92
68 changes: 34 additions & 34 deletions kafka/protocol/broker_version_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import functools

import kafka.errors as Errors
from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest
from kafka.protocol.consumer import OffsetFetchRequest, FetchRequest, ListOffsetsRequest, JoinGroupRequest
from kafka.protocol.metadata import FindCoordinatorRequest, MetadataRequest
from kafka.protocol.producer import ProduceRequest, AddPartitionsToTxnRequest
from .api_key import ApiKey

log = logging.getLogger('kafka.protocol')

Expand Down Expand Up @@ -79,7 +76,7 @@ def api_version(self, operation, min_version=0, max_version=float('inf')):
"""
assert min_version <= max_version
# if _max_version is a data descriptor, operation is a protocol class so no request min/max
if inspect.isdatadescriptor(operation._max_version):
if isinstance(operation, ApiKey) or inspect.isdatadescriptor(operation._max_version):
request_max = float('inf')
request_min = 0
else:
Expand All @@ -88,7 +85,7 @@ def api_version(self, operation, min_version=0, max_version=float('inf')):
max_version = min(max_version, operation.max_version, request_max)
min_version = max(min_version, operation.min_version, request_min)
broker_api_versions = self.api_versions
api_key = operation.API_KEY
api_key = operation.value if isinstance(operation, ApiKey) else operation.API_KEY
if broker_api_versions is None or api_key not in broker_api_versions:
raise Errors.IncompatibleBrokerVersion(
f"Kafka broker does not support the '{operation.name}' Kafka protocol.")
Expand Down Expand Up @@ -116,38 +113,38 @@ def infer_broker_version_from_api_versions(api_versions):
test_cases = [
# format (<broker version>, <needed struct>)
# Make sure to update consumer_integration test check when adding newer versions.
((4, 2), ListOffsetsRequest.API_KEY, 11),
((4, 1), ProduceRequest.API_KEY, 13),
((4, 0), ListOffsetsRequest.API_KEY, 10),
((3, 9), FetchRequest.API_KEY, 17),
((3, 8), ProduceRequest.API_KEY, 11),
((3, 7), FetchRequest.API_KEY, 16),
((3, 6), AddPartitionsToTxnRequest.API_KEY, 4),
((3, 5), FetchRequest.API_KEY, 15),
((3, 4), 4, 7), # StopReplicaRequest[3]), # broker-internal api...
((3, 3), DescribeAclsRequest.API_KEY, 3),
((3, 2), JoinGroupRequest.API_KEY, 9),
((3, 1), FetchRequest.API_KEY, 13),
((3, 0), ListOffsetsRequest.API_KEY, 7),
((2, 8), ProduceRequest.API_KEY, 9),
((2, 7), FetchRequest.API_KEY, 12),
((2, 6), DescribeClientQuotasRequest.API_KEY, 0),
((2, 5), DescribeAclsRequest.API_KEY, 2),
((2, 4), ProduceRequest.API_KEY, 8),
((2, 3), FetchRequest.API_KEY, 11),
((2, 2), ListOffsetsRequest.API_KEY, 5),
((2, 1), FetchRequest.API_KEY, 10),
((2, 0), FetchRequest.API_KEY, 8),
((1, 1), FetchRequest.API_KEY, 7),
((1, 0), MetadataRequest.API_KEY, 5),
((0, 11), MetadataRequest.API_KEY, 4),
((0, 10, 2), OffsetFetchRequest.API_KEY, 2),
((0, 10, 1), MetadataRequest.API_KEY, 2),
((4, 2), ApiKey.ListOffsets, 11),
((4, 1), ApiKey.Produce, 13),
((4, 0), ApiKey.ListOffsets, 10),
((3, 9), ApiKey.Fetch, 17),
((3, 8), ApiKey.Produce, 11),
((3, 7), ApiKey.Fetch, 16),
((3, 6), ApiKey.AddPartitionsToTxn, 4),
((3, 5), ApiKey.Fetch, 15),
((3, 4), ApiKey.LeaderAndIsr, 7), # broker-internal api...
((3, 3), ApiKey.DescribeAcls, 3),
((3, 2), ApiKey.JoinGroup, 9),
((3, 1), ApiKey.Fetch, 13),
((3, 0), ApiKey.ListOffsets, 7),
((2, 8), ApiKey.Produce, 9),
((2, 7), ApiKey.Fetch, 12),
((2, 6), ApiKey.DescribeClientQuotas, 0),
((2, 5), ApiKey.DescribeAcls, 2),
((2, 4), ApiKey.Produce, 8),
((2, 3), ApiKey.Fetch, 11),
((2, 2), ApiKey.ListOffsets, 5),
((2, 1), ApiKey.Fetch, 10),
((2, 0), ApiKey.Fetch, 8),
((1, 1), ApiKey.Fetch, 7),
((1, 0), ApiKey.Metadata, 5),
((0, 11), ApiKey.Metadata, 4),
((0, 10, 2), ApiKey.OffsetFetch, 2),
((0, 10, 1), ApiKey.Metadata, 2),
]

# Get the best match of test cases
for broker_version, api_key, version in sorted(test_cases, reverse=True):
if api_key not in api_versions:
if api_key.value not in api_versions:
continue
min_version, max_version = api_versions[api_key]
if min_version <= version <= max_version:
Expand All @@ -159,6 +156,9 @@ def infer_broker_version_from_api_versions(api_versions):


# Fallback version checks for brokers that do not support ApiVersionsCheck
from kafka.protocol.admin import ListGroupsRequest
from kafka.protocol.consumer import OffsetFetchRequest
from kafka.protocol.metadata import FindCoordinatorRequest, MetadataRequest
VERSION_CHECKS = (
((0, 9), ListGroupsRequest[0]()),
((0, 8, 2), FindCoordinatorRequest[0]('kafka-python-default-group')),
Expand Down
Loading