Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
from kafka.consumer.subscription_state import ConsumerRebalanceListener
from kafka.producer import KafkaProducer
from kafka.serializer import Serializer, Deserializer
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.structs import TopicPartition, TopicPartitionReplica, OffsetAndMetadata
from kafka.protocol.consumer import IsolationLevel, OffsetSpec


__all__ = [
'KafkaAdminClient', 'KafkaConsumer', 'KafkaProducer',
'ConsumerRebalanceListener', 'Serializer', 'Deserializer',
'TopicPartition', 'OffsetAndMetadata', 'IsolationLevel', 'OffsetSpec',
'TopicPartition', 'TopicPartitionReplica', 'OffsetAndMetadata',
'IsolationLevel', 'OffsetSpec',
]
64 changes: 63 additions & 1 deletion kafka/admin/_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
import kafka.errors as Errors
from kafka.protocol.api_key import ApiKey
from kafka.protocol.metadata import ApiVersionsRequest, MetadataRequest
from kafka.protocol.admin import DescribeLogDirsRequest, UpdateFeaturesRequest
from kafka.protocol.admin import (
AlterReplicaLogDirsRequest,
DescribeLogDirsRequest,
UpdateFeaturesRequest,
)
from kafka.structs import TopicPartitionReplica
from kafka.util import EnumHelper

if TYPE_CHECKING:
Expand Down Expand Up @@ -80,6 +85,63 @@ def describe_log_dirs(self, topic_partitions=None, brokers=None):
topic_partitions = self._get_topic_partitions(topic_partitions)
return self._manager.run(self._async_describe_log_dirs, topic_partitions, brokers)

@staticmethod
def _alter_replica_log_dirs_requests(replica_assignments):
_Dir = AlterReplicaLogDirsRequest.AlterReplicaLogDir
_Topic = _Dir.AlterReplicaLogDirTopic
broker_to_dirs = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
for tpr, log_dir in replica_assignments.items():
if not isinstance(tpr, TopicPartitionReplica):
tpr = TopicPartitionReplica(*tpr)
broker_to_dirs[tpr.broker_id][log_dir][tpr.topic].append(tpr.partition)
return {
broker_id: AlterReplicaLogDirsRequest(dirs=[
_Dir(path=path, topics=[
_Topic(name=topic, partitions=parts)
for topic, parts in topics.items()
])
for path, topics in dirs.items()
])
for broker_id, dirs in broker_to_dirs.items()
}

async def _async_alter_replica_log_dirs(self, replica_assignments):
if not replica_assignments:
return {}
broker_requests = self._alter_replica_log_dirs_requests(replica_assignments)
result = {}
for broker_id, request in broker_requests.items():
response = await self._manager.send(request, node_id=broker_id)
for topic in response.results:
for partition in topic.partitions:
tpr = TopicPartitionReplica(
topic=topic.topic_name,
partition=partition.partition_index,
broker_id=broker_id)
result[tpr] = Errors.for_code(partition.error_code)
return result

def alter_replica_log_dirs(self, replica_assignments):
"""Move replicas between log directories on their hosting brokers.

Each entry instructs the targeted broker to move (or place) the
replica for a given partition into the specified absolute log
directory path. Requests are sent to each broker in parallel; a
broker will only act on replicas it currently hosts.

Arguments:
replica_assignments: A dict mapping
:class:`~kafka.TopicPartitionReplica` (``topic``,
``partition``, ``broker_id``) to the destination log
directory path (absolute string). Tuples of
``(topic, partition, broker_id)`` are also accepted.

Returns:
dict mapping :class:`~kafka.TopicPartitionReplica` to the
corresponding error class (``kafka.errors.NoError`` on success).
"""
return self._manager.run(self._async_alter_replica_log_dirs, replica_assignments)

async def _async_get_broker_version_data(self, broker_id):
conn = await self._manager.get_connection(broker_id)
return conn.broker_version_data
Expand Down
1 change: 0 additions & 1 deletion kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ def run_cli(args=None):
# abort (EndTxn)

# [cluster]
# alter-log-dirs (AlterReplicaLogDirs)
# describe-quorum (DescribeQuorum)
# unregister-broker (UnregisterBroker)
# add-raft-voter (AddRaftVoter)
Expand Down
8 changes: 5 additions & 3 deletions kafka/cli/admin/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from .api_versions import GetApiVersions
from .broker_version import GetBrokerVersion
from .describe import DescribeCluster
from .log_dirs import DescribeLogDirs
from .log_dirs import DescribeLogDirs, AlterLogDirs
from .features import DescribeFeatures, UpdateFeatures


class ClusterCommandGroup:
GROUP = 'cluster'
HELP = 'Manage Kafka Cluster'
COMMANDS = [DescribeCluster, DescribeFeatures, UpdateFeatures,
GetApiVersions, GetBrokerVersion, DescribeLogDirs]
COMMANDS = [DescribeCluster,
GetApiVersions, GetBrokerVersion,
DescribeFeatures, UpdateFeatures,
DescribeLogDirs, AlterLogDirs]
33 changes: 32 additions & 1 deletion kafka/cli/admin/cluster/log_dirs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from kafka.structs import TopicPartitionReplica


class DescribeLogDirs:
COMMAND = 'log-dirs'
COMMAND = 'describe-log-dirs'
HELP = 'Get topic log directories and stats'

@classmethod
Expand All @@ -10,3 +13,31 @@ def add_arguments(cls, parser):
@classmethod
def command(cls, client, args):
return client.describe_log_dirs(topic_partitions=args.topics, brokers=args.brokers)


class AlterLogDirs:
COMMAND = 'alter-log-dirs'
HELP = 'Move replicas between log directories on their hosting brokers'

@classmethod
def add_arguments(cls, parser):
parser.add_argument(
'-a', '--assignment', type=str, action='append',
dest='assignments', default=[], required=True,
help='TOPIC:PARTITION:BROKER_ID=/absolute/log/dir/path (repeatable). '
'Instructs BROKER_ID to move its replica of TOPIC:PARTITION '
'into the given log directory.')

@classmethod
def command(cls, client, args):
assignments = {}
for spec in args.assignments:
tpr_str, log_dir = spec.rsplit('=', 1)
topic, partition, broker_id = tpr_str.rsplit(':', 2)
tpr = TopicPartitionReplica(topic, int(partition), int(broker_id))
assignments[tpr] = log_dir
result = client.alter_replica_log_dirs(assignments)
return {
f'{tpr.topic}:{tpr.partition}:{tpr.broker_id}': err.__name__
for tpr, err in result.items()
}
4 changes: 4 additions & 0 deletions kafka/protocol/admin/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ def json_patch(cls, json):
class DescribeLogDirsRequest(ApiMessage): pass
class DescribeLogDirsResponse(ApiMessage): pass

class AlterReplicaLogDirsRequest(ApiMessage): pass
class AlterReplicaLogDirsResponse(ApiMessage): pass

class UpdateFeaturesRequest(ApiMessage): pass
class UpdateFeaturesResponse(ApiMessage): pass


__all__ = [
'DescribeClusterRequest', 'DescribeClusterResponse',
'DescribeLogDirsRequest', 'DescribeLogDirsResponse',
'AlterReplicaLogDirsRequest', 'AlterReplicaLogDirsResponse',
'UpdateFeaturesRequest', 'UpdateFeaturesResponse',
]
116 changes: 115 additions & 1 deletion kafka/protocol/admin/cluster.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ from typing import Any, Self
from kafka.protocol.api_message import ApiMessage
from kafka.protocol.data_container import DataContainer

__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', 'UpdateFeaturesRequest', 'UpdateFeaturesResponse']
__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', 'AlterReplicaLogDirsRequest', 'AlterReplicaLogDirsResponse', 'UpdateFeaturesRequest', 'UpdateFeaturesResponse']

class DescribeClusterRequest(ApiMessage):
include_cluster_authorized_operations: bool
Expand Down Expand Up @@ -224,6 +224,120 @@ class DescribeLogDirsResponse(ApiMessage):
def expect_response(self) -> bool: ...
def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class AlterReplicaLogDirsRequest(ApiMessage):
class AlterReplicaLogDir(DataContainer):
class AlterReplicaLogDirTopic(DataContainer):
name: str
partitions: list[int]
def __init__(
self,
*args: Any,
name: str = ...,
partitions: list[int] = ...,
version: int | None = None,
**kwargs: Any,
) -> None: ...
@property
def version(self) -> int | None: ...
def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

path: str
topics: list[AlterReplicaLogDirTopic]
def __init__(
self,
*args: Any,
path: str = ...,
topics: list[AlterReplicaLogDirTopic] = ...,
version: int | None = None,
**kwargs: Any,
) -> None: ...
@property
def version(self) -> int | None: ...
def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

dirs: list[AlterReplicaLogDir]
def __init__(
self,
*args: Any,
dirs: list[AlterReplicaLogDir] = ...,
version: int | None = None,
**kwargs: Any,
) -> None: ...
@property
def version(self) -> int | None: ...
def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...
name: str
type: str
API_KEY: int
API_VERSION: int
valid_versions: tuple[int, int]
min_version: int
max_version: int
@property
def header(self) -> Any: ...
@classmethod
def is_request(cls) -> bool: ...
def expect_response(self) -> bool: ...
def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class AlterReplicaLogDirsResponse(ApiMessage):
class AlterReplicaLogDirTopicResult(DataContainer):
class AlterReplicaLogDirPartitionResult(DataContainer):
partition_index: int
error_code: int
def __init__(
self,
*args: Any,
partition_index: int = ...,
error_code: int = ...,
version: int | None = None,
**kwargs: Any,
) -> None: ...
@property
def version(self) -> int | None: ...
def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

topic_name: str
partitions: list[AlterReplicaLogDirPartitionResult]
def __init__(
self,
*args: Any,
topic_name: str = ...,
partitions: list[AlterReplicaLogDirPartitionResult] = ...,
version: int | None = None,
**kwargs: Any,
) -> None: ...
@property
def version(self) -> int | None: ...
def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

throttle_time_ms: int
results: list[AlterReplicaLogDirTopicResult]
def __init__(
self,
*args: Any,
throttle_time_ms: int = ...,
results: list[AlterReplicaLogDirTopicResult] = ...,
version: int | None = None,
**kwargs: Any,
) -> None: ...
@property
def version(self) -> int | None: ...
def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...
name: str
type: str
API_KEY: int
API_VERSION: int
valid_versions: tuple[int, int]
min_version: int
max_version: int
@property
def header(self) -> Any: ...
@classmethod
def is_request(cls) -> bool: ...
def expect_response(self) -> bool: ...
def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class UpdateFeaturesRequest(ApiMessage):
class FeatureUpdateKey(DataContainer):
feature: str
Expand Down
41 changes: 41 additions & 0 deletions kafka/protocol/schemas/resources/AlterReplicaLogDirsRequest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 34,
"type": "request",
"listeners": ["broker"],
"name": "AlterReplicaLogDirsRequest",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
//
// Version 1 is the same as version 0.
// Version 2 enables flexible versions.
"validVersions": "1-2",
"flexibleVersions": "2+",
"fields": [
{ "name": "Dirs", "type": "[]AlterReplicaLogDir", "versions": "0+",
"about": "The alterations to make for each directory.", "fields": [
{ "name": "Path", "type": "string", "versions": "0+", "mapKey": true,
"about": "The absolute directory path." },
{ "name": "Topics", "type": "[]AlterReplicaLogDirTopic", "versions": "0+",
"about": "The topics to add to the directory.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partition indexes." }
]}
]}
]
}
41 changes: 41 additions & 0 deletions kafka/protocol/schemas/resources/AlterReplicaLogDirsResponse.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 34,
"type": "response",
"name": "AlterReplicaLogDirsResponse",
// Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline.
// Starting in version 1, on quota violation brokers send out responses before throttling.
// Version 2 enables flexible versions.
"validVersions": "1-2",
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Results", "type": "[]AlterReplicaLogDirTopicResult", "versions": "0+",
"about": "The results for each topic.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The name of the topic." },
{ "name": "Partitions", "type": "[]AlterReplicaLogDirPartitionResult", "versions": "0+",
"about": "The results for each partition.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index."},
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." }
]}
]}
]
}
Loading
Loading