diff --git a/kafka/__init__.py b/kafka/__init__.py index d5cc26de5..aff51c166 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -15,12 +15,13 @@ from kafka.consumer.subscription_state import ConsumerRebalanceListener from kafka.producer import KafkaProducer from kafka.serializer import Serializer, Deserializer -from kafka.structs import TopicPartition, OffsetAndMetadata +from kafka.structs import TopicPartition, TopicPartitionReplica, OffsetAndMetadata from kafka.protocol.consumer import IsolationLevel, OffsetSpec __all__ = [ 'KafkaAdminClient', 'KafkaConsumer', 'KafkaProducer', 'ConsumerRebalanceListener', 'Serializer', 'Deserializer', - 'TopicPartition', 'OffsetAndMetadata', 'IsolationLevel', 'OffsetSpec', + 'TopicPartition', 'TopicPartitionReplica', 'OffsetAndMetadata', + 'IsolationLevel', 'OffsetSpec', ] diff --git a/kafka/admin/_cluster.py b/kafka/admin/_cluster.py index dc366bd4b..aee5b326b 100644 --- a/kafka/admin/_cluster.py +++ b/kafka/admin/_cluster.py @@ -10,7 +10,12 @@ import kafka.errors as Errors from kafka.protocol.api_key import ApiKey from kafka.protocol.metadata import ApiVersionsRequest, MetadataRequest -from kafka.protocol.admin import DescribeLogDirsRequest, UpdateFeaturesRequest +from kafka.protocol.admin import ( + AlterReplicaLogDirsRequest, + DescribeLogDirsRequest, + UpdateFeaturesRequest, +) +from kafka.structs import TopicPartitionReplica from kafka.util import EnumHelper if TYPE_CHECKING: @@ -80,6 +85,63 @@ def describe_log_dirs(self, topic_partitions=None, brokers=None): topic_partitions = self._get_topic_partitions(topic_partitions) return self._manager.run(self._async_describe_log_dirs, topic_partitions, brokers) + @staticmethod + def _alter_replica_log_dirs_requests(replica_assignments): + _Dir = AlterReplicaLogDirsRequest.AlterReplicaLogDir + _Topic = _Dir.AlterReplicaLogDirTopic + broker_to_dirs = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) + for tpr, log_dir in replica_assignments.items(): + if not 
isinstance(tpr, TopicPartitionReplica): + tpr = TopicPartitionReplica(*tpr) + broker_to_dirs[tpr.broker_id][log_dir][tpr.topic].append(tpr.partition) + return { + broker_id: AlterReplicaLogDirsRequest(dirs=[ + _Dir(path=path, topics=[ + _Topic(name=topic, partitions=parts) + for topic, parts in topics.items() + ]) + for path, topics in dirs.items() + ]) + for broker_id, dirs in broker_to_dirs.items() + } + + async def _async_alter_replica_log_dirs(self, replica_assignments): + if not replica_assignments: + return {} + broker_requests = self._alter_replica_log_dirs_requests(replica_assignments) + result = {} + for broker_id, request in broker_requests.items(): + response = await self._manager.send(request, node_id=broker_id) + for topic in response.results: + for partition in topic.partitions: + tpr = TopicPartitionReplica( + topic=topic.topic_name, + partition=partition.partition_index, + broker_id=broker_id) + result[tpr] = Errors.for_code(partition.error_code) + return result + + def alter_replica_log_dirs(self, replica_assignments): + """Move replicas between log directories on their hosting brokers. + + Each entry instructs the targeted broker to move (or place) the + replica for a given partition into the specified absolute log + directory path. Requests are sent to each broker in parallel; a + broker will only act on replicas it currently hosts. + + Arguments: + replica_assignments: A dict mapping + :class:`~kafka.TopicPartitionReplica` (``topic``, + ``partition``, ``broker_id``) to the destination log + directory path (absolute string). Tuples of + ``(topic, partition, broker_id)`` are also accepted. + + Returns: + dict mapping :class:`~kafka.TopicPartitionReplica` to the + corresponding error class (``kafka.errors.NoError`` on success). 
+ """ + return self._manager.run(self._async_alter_replica_log_dirs, replica_assignments) + async def _async_get_broker_version_data(self, broker_id): conn = await self._manager.get_connection(broker_id) return conn.broker_version_data diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 5b62f9661..71fb3213d 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -102,7 +102,6 @@ def run_cli(args=None): # abort (EndTxn) # [cluster] - # alter-log-dirs (AlterReplicaLogDirs) # describe-quorum (DescribeQuorum) # unregister-broker (UnregisterBroker) # add-raft-voter (AddRaftVoter) diff --git a/kafka/cli/admin/cluster/__init__.py b/kafka/cli/admin/cluster/__init__.py index ebfb034dc..25b7877f6 100644 --- a/kafka/cli/admin/cluster/__init__.py +++ b/kafka/cli/admin/cluster/__init__.py @@ -1,12 +1,14 @@ from .api_versions import GetApiVersions from .broker_version import GetBrokerVersion from .describe import DescribeCluster -from .log_dirs import DescribeLogDirs +from .log_dirs import DescribeLogDirs, AlterLogDirs from .features import DescribeFeatures, UpdateFeatures class ClusterCommandGroup: GROUP = 'cluster' HELP = 'Manage Kafka Cluster' - COMMANDS = [DescribeCluster, DescribeFeatures, UpdateFeatures, - GetApiVersions, GetBrokerVersion, DescribeLogDirs] + COMMANDS = [DescribeCluster, + GetApiVersions, GetBrokerVersion, + DescribeFeatures, UpdateFeatures, + DescribeLogDirs, AlterLogDirs] diff --git a/kafka/cli/admin/cluster/log_dirs.py b/kafka/cli/admin/cluster/log_dirs.py index 055d47b90..beed9bb5b 100644 --- a/kafka/cli/admin/cluster/log_dirs.py +++ b/kafka/cli/admin/cluster/log_dirs.py @@ -1,5 +1,8 @@ +from kafka.structs import TopicPartitionReplica + + class DescribeLogDirs: - COMMAND = 'log-dirs' + COMMAND = 'describe-log-dirs' HELP = 'Get topic log directories and stats' @classmethod @@ -10,3 +13,31 @@ def add_arguments(cls, parser): @classmethod def command(cls, client, args): return 
client.describe_log_dirs(topic_partitions=args.topics, brokers=args.brokers) + + +class AlterLogDirs: + COMMAND = 'alter-log-dirs' + HELP = 'Move replicas between log directories on their hosting brokers' + + @classmethod + def add_arguments(cls, parser): + parser.add_argument( + '-a', '--assignment', type=str, action='append', + dest='assignments', default=[], required=True, + help='TOPIC:PARTITION:BROKER_ID=/absolute/log/dir/path (repeatable). ' + 'Instructs BROKER_ID to move its replica of TOPIC:PARTITION ' + 'into the given log directory.') + + @classmethod + def command(cls, client, args): + assignments = {} + for spec in args.assignments: + tpr_str, log_dir = spec.rsplit('=', 1) + topic, partition, broker_id = tpr_str.rsplit(':', 2) + tpr = TopicPartitionReplica(topic, int(partition), int(broker_id)) + assignments[tpr] = log_dir + result = client.alter_replica_log_dirs(assignments) + return { + f'{tpr.topic}:{tpr.partition}:{tpr.broker_id}': err.__name__ + for tpr, err in result.items() + } diff --git a/kafka/protocol/admin/cluster.py b/kafka/protocol/admin/cluster.py index af242dd6e..b95568cb4 100644 --- a/kafka/protocol/admin/cluster.py +++ b/kafka/protocol/admin/cluster.py @@ -11,6 +11,9 @@ def json_patch(cls, json): class DescribeLogDirsRequest(ApiMessage): pass class DescribeLogDirsResponse(ApiMessage): pass +class AlterReplicaLogDirsRequest(ApiMessage): pass +class AlterReplicaLogDirsResponse(ApiMessage): pass + class UpdateFeaturesRequest(ApiMessage): pass class UpdateFeaturesResponse(ApiMessage): pass @@ -18,5 +21,6 @@ class UpdateFeaturesResponse(ApiMessage): pass __all__ = [ 'DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', + 'AlterReplicaLogDirsRequest', 'AlterReplicaLogDirsResponse', 'UpdateFeaturesRequest', 'UpdateFeaturesResponse', ] diff --git a/kafka/protocol/admin/cluster.pyi b/kafka/protocol/admin/cluster.pyi index 6070360a6..0b108aa11 100644 --- a/kafka/protocol/admin/cluster.pyi +++ 
b/kafka/protocol/admin/cluster.pyi @@ -5,7 +5,7 @@ from typing import Any, Self from kafka.protocol.api_message import ApiMessage from kafka.protocol.data_container import DataContainer -__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', 'UpdateFeaturesRequest', 'UpdateFeaturesResponse'] +__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', 'AlterReplicaLogDirsRequest', 'AlterReplicaLogDirsResponse', 'UpdateFeaturesRequest', 'UpdateFeaturesResponse'] class DescribeClusterRequest(ApiMessage): include_cluster_authorized_operations: bool @@ -224,6 +224,120 @@ class DescribeLogDirsResponse(ApiMessage): def expect_response(self) -> bool: ... def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... +class AlterReplicaLogDirsRequest(ApiMessage): + class AlterReplicaLogDir(DataContainer): + class AlterReplicaLogDirTopic(DataContainer): + name: str + partitions: list[int] + def __init__( + self, + *args: Any, + name: str = ..., + partitions: list[int] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + path: str + topics: list[AlterReplicaLogDirTopic] + def __init__( + self, + *args: Any, + path: str = ..., + topics: list[AlterReplicaLogDirTopic] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + dirs: list[AlterReplicaLogDir] + def __init__( + self, + *args: Any, + dirs: list[AlterReplicaLogDir] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... 
+ name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class AlterReplicaLogDirsResponse(ApiMessage): + class AlterReplicaLogDirTopicResult(DataContainer): + class AlterReplicaLogDirPartitionResult(DataContainer): + partition_index: int + error_code: int + def __init__( + self, + *args: Any, + partition_index: int = ..., + error_code: int = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + topic_name: str + partitions: list[AlterReplicaLogDirPartitionResult] + def __init__( + self, + *args: Any, + topic_name: str = ..., + partitions: list[AlterReplicaLogDirPartitionResult] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + throttle_time_ms: int + results: list[AlterReplicaLogDirTopicResult] + def __init__( + self, + *args: Any, + throttle_time_ms: int = ..., + results: list[AlterReplicaLogDirTopicResult] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... 
+ class UpdateFeaturesRequest(ApiMessage): class FeatureUpdateKey(DataContainer): feature: str diff --git a/kafka/protocol/schemas/resources/AlterReplicaLogDirsRequest.json b/kafka/protocol/schemas/resources/AlterReplicaLogDirsRequest.json new file mode 100644 index 000000000..42ef66933 --- /dev/null +++ b/kafka/protocol/schemas/resources/AlterReplicaLogDirsRequest.json @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 34, + "type": "request", + "listeners": ["broker"], + "name": "AlterReplicaLogDirsRequest", + // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. + // + // Version 1 is the same as version 0. + // Version 2 enables flexible versions. + "validVersions": "1-2", + "flexibleVersions": "2+", + "fields": [ + { "name": "Dirs", "type": "[]AlterReplicaLogDir", "versions": "0+", + "about": "The alterations to make for each directory.", "fields": [ + { "name": "Path", "type": "string", "versions": "0+", "mapKey": true, + "about": "The absolute directory path." 
}, + { "name": "Topics", "type": "[]AlterReplicaLogDirTopic", "versions": "0+", + "about": "The topics to add to the directory.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "The partition indexes." } + ]} + ]} + ] +} diff --git a/kafka/protocol/schemas/resources/AlterReplicaLogDirsResponse.json b/kafka/protocol/schemas/resources/AlterReplicaLogDirsResponse.json new file mode 100644 index 000000000..d26c9e873 --- /dev/null +++ b/kafka/protocol/schemas/resources/AlterReplicaLogDirsResponse.json @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 34, + "type": "response", + "name": "AlterReplicaLogDirsResponse", + // Version 0 was removed in Apache Kafka 4.0, Version 1 is the new baseline. + // Starting in version 1, on quota violation brokers send out responses before throttling. + // Version 2 enables flexible versions. 
+ "validVersions": "1-2", + "flexibleVersions": "2+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Results", "type": "[]AlterReplicaLogDirTopicResult", "versions": "0+", + "about": "The results for each topic.", "fields": [ + { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", + "about": "The name of the topic." }, + { "name": "Partitions", "type": "[]AlterReplicaLogDirPartitionResult", "versions": "0+", + "about": "The results for each partition.", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index."}, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error." } + ]} + ]} + ] +} diff --git a/kafka/structs.py b/kafka/structs.py index 9fae24f3b..4189cc1bc 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -13,6 +13,17 @@ """ +TopicPartitionReplica = namedtuple("TopicPartitionReplica", + ["topic", "partition", "broker_id"]) +TopicPartitionReplica.__doc__ = """A topic / partition / broker replica tuple + +Keyword Arguments: + topic (str): A topic name + partition (int): A partition id + broker_id (int): The node_id of the broker hosting the replica +""" + + OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata", "leader_epoch"], defaults=[None, '', -1]) OffsetAndMetadata.__doc__ = """Container for committed group offset data. 
diff --git a/test/admin/test_admin_cluster_log_dirs.py b/test/admin/test_admin_cluster_log_dirs.py new file mode 100644 index 000000000..8c2c88f0c --- /dev/null +++ b/test/admin/test_admin_cluster_log_dirs.py @@ -0,0 +1,156 @@ +import pytest + +import kafka.errors as Errors +from kafka.admin import KafkaAdminClient +from kafka.protocol.admin import ( + AlterReplicaLogDirsRequest, AlterReplicaLogDirsResponse, +) +from kafka.protocol.metadata import MetadataResponse +from kafka.structs import TopicPartitionReplica + +from test.mock_broker import MockBroker + + +@pytest.fixture +def broker(request): + broker_version = getattr(request, 'param', (4, 2)) + return MockBroker(broker_version=broker_version) + + +@pytest.fixture +def multi_broker(broker): + Broker = MetadataResponse.MetadataResponseBroker + broker.set_metadata(brokers=[ + Broker(node_id=0, host=broker.host, port=broker.port, rack=None), + Broker(node_id=1, host=broker.host, port=broker.port, rack=None), + ]) + return broker + + +@pytest.fixture +def admin(broker): + admin = KafkaAdminClient( + kafka_client=broker.client_factory(), + bootstrap_servers='%s:%d' % (broker.host, broker.port), + request_timeout_ms=5000, + ) + try: + yield admin + finally: + admin.close() + + +def _alter_response(results): + """results: iterable of (topic, partition, error_code).""" + Topic = AlterReplicaLogDirsResponse.AlterReplicaLogDirTopicResult + Partition = Topic.AlterReplicaLogDirPartitionResult + by_topic = {} + for topic, partition, err in results: + by_topic.setdefault(topic, []).append( + Partition(partition_index=partition, error_code=err)) + return AlterReplicaLogDirsResponse( + throttle_time_ms=0, + results=[Topic(topic_name=t, partitions=parts) + for t, parts in by_topic.items()], + ) + + +class TestAlterReplicaLogDirsMockBroker: + + def test_empty_input_is_noop(self, admin): + assert admin.alter_replica_log_dirs({}) == {} + + def test_success_returns_per_replica_errors(self, broker, admin): + captured = {} + + def 
handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = AlterReplicaLogDirsRequest.decode( + request_bytes, version=api_version, header=True) + return _alter_response([('topic-a', 0, 0), ('topic-a', 1, 0)]) + + broker.respond_fn(AlterReplicaLogDirsRequest, handler) + + result = admin.alter_replica_log_dirs({ + TopicPartitionReplica('topic-a', 0, 0): '/tmp/kafka-logs-a', + TopicPartitionReplica('topic-a', 1, 0): '/tmp/kafka-logs-a', + }) + + assert result == { + TopicPartitionReplica('topic-a', 0, 0): Errors.NoError, + TopicPartitionReplica('topic-a', 1, 0): Errors.NoError, + } + + req = captured['request'] + assert len(req.dirs) == 1 + assert req.dirs[0].path == '/tmp/kafka-logs-a' + assert req.dirs[0].topics[0].name == 'topic-a' + assert list(req.dirs[0].topics[0].partitions) == [0, 1] + + def test_groups_by_dir_within_broker(self, broker, admin): + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = AlterReplicaLogDirsRequest.decode( + request_bytes, version=api_version, header=True) + return _alter_response([('t', 0, 0), ('t', 1, 0)]) + + broker.respond_fn(AlterReplicaLogDirsRequest, handler) + + admin.alter_replica_log_dirs({ + TopicPartitionReplica('t', 0, 0): '/disk/a', + TopicPartitionReplica('t', 1, 0): '/disk/b', + }) + + req = captured['request'] + by_dir = {d.path: d for d in req.dirs} + assert set(by_dir) == {'/disk/a', '/disk/b'} + + def test_tuple_key_accepted(self, broker, admin): + broker.respond( + AlterReplicaLogDirsRequest, + _alter_response([('topic-a', 0, 0)])) + + result = admin.alter_replica_log_dirs({ + ('topic-a', 0, 0): '/tmp/kafka-logs-a', + }) + assert result == {TopicPartitionReplica('topic-a', 0, 0): Errors.NoError} + + def test_one_request_per_broker(self, multi_broker, admin): + captured = [] + + def handler(api_key, api_version, correlation_id, request_bytes): + captured.append(AlterReplicaLogDirsRequest.decode( + request_bytes, 
version=api_version, header=True)) + return _alter_response([('topic-a', 0, 0)]) + + multi_broker.respond_fn(AlterReplicaLogDirsRequest, handler) + multi_broker.respond_fn(AlterReplicaLogDirsRequest, handler) + + result = admin.alter_replica_log_dirs({ + TopicPartitionReplica('topic-a', 0, 0): '/disk/a', + TopicPartitionReplica('topic-a', 0, 1): '/disk/b', + }) + + assert len(captured) == 2 + paths_sent = {r.dirs[0].path for r in captured} + assert paths_sent == {'/disk/a', '/disk/b'} + assert set(result) == { + TopicPartitionReplica('topic-a', 0, 0), + TopicPartitionReplica('topic-a', 0, 1), + } + + def test_error_surfaces_per_replica(self, broker, admin): + broker.respond( + AlterReplicaLogDirsRequest, + _alter_response([ + ('topic-a', 0, 0), + ('topic-a', 1, Errors.LogDirNotFoundError.errno), + ])) + + result = admin.alter_replica_log_dirs({ + TopicPartitionReplica('topic-a', 0, 0): '/disk/a', + TopicPartitionReplica('topic-a', 1, 0): '/disk/a', + }) + + assert result[TopicPartitionReplica('topic-a', 0, 0)] is Errors.NoError + assert result[TopicPartitionReplica('topic-a', 1, 0)] is Errors.LogDirNotFoundError diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index af54448e0..ac56a482f 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -16,7 +16,7 @@ UnknownTopicOrPartitionError, ElectionNotNeededError, KafkaTimeoutError, IncompatibleBrokerVersion ) -from kafka.structs import TopicPartition, OffsetAndTimestamp +from kafka.structs import TopicPartition, OffsetAndTimestamp, TopicPartitionReplica from test.testutil import env_kafka_version, random_string from test.integration.fixtures import create_topics @@ -494,6 +494,19 @@ def test_describe_log_dirs(kafka_admin_client): assert log_dir['error_code'] == 0 +@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="AlterReplicaLogDirs requires broker >=1.1") +def test_alter_replica_log_dirs(kafka_admin_client, 
topic):
+    log_dirs = kafka_admin_client.describe_log_dirs()
+    target_dir = log_dirs[0]['log_dirs'][0]['log_dir']
+    broker_id = log_dirs[0]['broker']
+    tpr = TopicPartitionReplica(topic, 0, broker_id)
+
+    result = kafka_admin_client.alter_replica_log_dirs({tpr: target_dir})
+    assert tpr in result
+    # Moving a replica to its current log dir is a no-op that returns NoError
+    # (error code 0); compare via .errno since NoError is not imported here.
+    assert result[tpr].errno == 0
+
 @pytest.mark.skipif(env_kafka_version() < (2, 4), reason="AlterPartitionReassignments requires broker >=2.4")
 def test_alter_partition_reassignments(kafka_admin_client, topic):
     topic_metadata = kafka_admin_client.describe_topics([topic])[0]