diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e654e737789c0..3730dab640098 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -99,6 +99,8 @@
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
+import org.apache.kafka.common.message.DescribeLogDirsRequestData;
+import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;
 import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
@@ -2267,7 +2269,7 @@ public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options) {
                 @Override
                 public DescribeLogDirsRequest.Builder createRequest(int timeoutMs) {
                     // Query selected partitions in all log directories
-                    return new DescribeLogDirsRequest.Builder(null);
+                    return new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(null));
                 }
 
                 @Override
@@ -2299,20 +2301,33 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options) {
             futures.put(replica, new KafkaFutureImpl<>());
         }
 
-        Map<Integer, Set<TopicPartition>> partitionsByBroker = new HashMap<>();
+        Map<Integer, DescribeLogDirsRequestData> partitionsByBroker = new HashMap<>();
 
-        for (TopicPartitionReplica replica : replicas) {
-            partitionsByBroker.computeIfAbsent(replica.brokerId(), key -> new HashSet<>())
-                .add(new TopicPartition(replica.topic(), replica.partition()));
+        for (TopicPartitionReplica replica: replicas) {
+            DescribeLogDirsRequestData requestData = partitionsByBroker.computeIfAbsent(replica.brokerId(),
+                brokerId -> new DescribeLogDirsRequestData());
+            DescribableLogDirTopic describableLogDirTopic = requestData.topics().find(replica.topic());
+            if (describableLogDirTopic == null) {
+                List<Integer> partitionIndex = new ArrayList<>();
+                partitionIndex.add(replica.partition());
+                describableLogDirTopic = new DescribableLogDirTopic().setTopic(replica.topic())
+                    .setPartitionIndex(partitionIndex);
+                requestData.topics().add(describableLogDirTopic);
+            } else {
+                describableLogDirTopic.partitionIndex().add(replica.partition());
+            }
         }
 
         final long now = time.milliseconds();
-        for (Map.Entry<Integer, Set<TopicPartition>> entry: partitionsByBroker.entrySet()) {
+        for (Map.Entry<Integer, DescribeLogDirsRequestData> entry: partitionsByBroker.entrySet()) {
             final int brokerId = entry.getKey();
-            final Set<TopicPartition> topicPartitions = entry.getValue();
+            final DescribeLogDirsRequestData topicPartitions = entry.getValue();
             final Map<TopicPartition, ReplicaLogDirInfo> replicaDirInfoByPartition = new HashMap<>();
-            for (TopicPartition topicPartition: topicPartitions)
-                replicaDirInfoByPartition.put(topicPartition, new ReplicaLogDirInfo());
+            for (DescribableLogDirTopic topicPartition: topicPartitions.topics()) {
+                for (Integer partitionId : topicPartition.partitionIndex()) {
+                    replicaDirInfoByPartition.put(new TopicPartition(topicPartition.topic(), partitionId), new ReplicaLogDirInfo());
+                }
+            }
 
             runnable.call(new Call("describeReplicaLogDirs", calcDeadlineMs(now, options.timeoutMs()),
                 new ConstantNodeIdProvider(brokerId)) {
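The admin-client change above replaces the old per-broker `Set<TopicPartition>` with one generated `DescribeLogDirsRequestData` per broker. A minimal, self-contained sketch of that grouping step, using only calls that appear in the hunk; the class name and sample replicas are illustrative:

```java
// Sketch of the grouping in describeReplicaLogDirs above; sample replicas are hypothetical.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;

public class GroupReplicasByBrokerSketch {
    public static void main(String[] args) {
        List<TopicPartitionReplica> replicas = Arrays.asList(
            new TopicPartitionReplica("orders", 0, 1),   // topic, partition, broker
            new TopicPartitionReplica("orders", 1, 1),
            new TopicPartitionReplica("payments", 0, 2));

        Map<Integer, DescribeLogDirsRequestData> partitionsByBroker = new HashMap<>();
        for (TopicPartitionReplica replica : replicas) {
            DescribeLogDirsRequestData requestData = partitionsByBroker
                .computeIfAbsent(replica.brokerId(), brokerId -> new DescribeLogDirsRequestData());
            // topics() is keyed by topic name, so find() returns the existing entry if any
            DescribableLogDirTopic topic = requestData.topics().find(replica.topic());
            if (topic == null) {
                List<Integer> partitionIndex = new ArrayList<>();
                partitionIndex.add(replica.partition());
                requestData.topics().add(new DescribableLogDirTopic()
                    .setTopic(replica.topic())
                    .setPartitionIndex(partitionIndex));
            } else {
                topic.partitionIndex().add(replica.partition());
            }
        }
        partitionsByBroker.forEach((brokerId, data) -> System.out.println(brokerId + " -> " + data));
    }
}
```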
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 6549d8fbb12d3..da63a3164481b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.protocol;
 
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.ApiVersionsRequestData;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
@@ -47,6 +49,8 @@
 import org.apache.kafka.common.message.DescribeDelegationTokenResponseData;
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.DescribeLogDirsRequestData;
+import org.apache.kafka.common.message.DescribeLogDirsResponseData;
 import org.apache.kafka.common.message.ElectLeadersRequestData;
 import org.apache.kafka.common.message.ElectLeadersResponseData;
 import org.apache.kafka.common.message.EndTxnRequestData;
@@ -69,8 +73,6 @@
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
-import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
-import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.MetadataRequestData;
@@ -110,8 +112,6 @@
 import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
 import org.apache.kafka.common.requests.DescribeConfigsRequest;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
-import org.apache.kafka.common.requests.DescribeLogDirsRequest;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
@@ -190,8 +190,8 @@ public Struct parseResponse(short version, ByteBuffer buffer) {
         AlterConfigsResponse.schemaVersions()),
     ALTER_REPLICA_LOG_DIRS(34, "AlterReplicaLogDirs", AlterReplicaLogDirsRequest.schemaVersions(),
         AlterReplicaLogDirsResponse.schemaVersions()),
-    DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequest.schemaVersions(),
-        DescribeLogDirsResponse.schemaVersions()),
+    DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequestData.SCHEMAS,
+        DescribeLogDirsResponseData.SCHEMAS),
     SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequestData.SCHEMAS,
         SaslAuthenticateResponseData.SCHEMAS),
     CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequestData.SCHEMAS,
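With `ApiKeys` now sourcing the DescribeLogDirs schemas from the generated `SCHEMAS` arrays, the per-version lookup path stays the same. A small sketch of that lookup; it assumes the `oldestVersion()`/`latestVersion()` accessors on `ApiKeys`, while `requestSchema`/`responseSchema` appear verbatim elsewhere in this patch:

```java
// Sketch: SCHEMAS arrays are indexed by version, so requestSchema/responseSchema
// resolve the generated schemas exactly like the hand-written ones did before.
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Schema;

public class DescribeLogDirsSchemaSketch {
    public static void main(String[] args) {
        ApiKeys api = ApiKeys.DESCRIBE_LOG_DIRS;
        for (short v = api.oldestVersion(); v <= api.latestVersion(); v++) {
            Schema request = api.requestSchema(v);
            Schema response = api.responseSchema(v);
            System.out.println("DescribeLogDirs v" + v + " schemas resolved: "
                + (request != null && response != null));
        }
    }
}
```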
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 9d3f39d4f5124..a5799af81367f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -151,7 +151,7 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct, short version) {
             case ALTER_REPLICA_LOG_DIRS:
                 return new AlterReplicaLogDirsResponse(struct);
             case DESCRIBE_LOG_DIRS:
-                return new DescribeLogDirsResponse(struct);
+                return new DescribeLogDirsResponse(struct, version);
             case SASL_AUTHENTICATE:
                 return new SaslAuthenticateResponse(struct, version);
             case CREATE_PARTITIONS:
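`parseResponse` now has to forward the version because a struct produced by a flexible version (v2+) cannot be interpreted without knowing which version wrote it. A round-trip sketch using only calls that appear in this patch:

```java
// Sketch: write with data.toStruct(version), read back with the version-aware constructor.
import org.apache.kafka.common.message.DescribeLogDirsResponseData;
import org.apache.kafka.common.protocol.types.Struct;

public class VersionedRoundTripSketch {
    public static void main(String[] args) {
        short version = 2; // the first flexible version introduced below
        DescribeLogDirsResponseData data = new DescribeLogDirsResponseData().setThrottleTimeMs(100);
        Struct struct = data.toStruct(version);                // serialize side
        DescribeLogDirsResponseData parsed =
            new DescribeLogDirsResponseData(struct, version);  // decode side needs the version too
        System.out.println(parsed.throttleTimeMs());           // 100
    }
}
```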
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
index 84f6c9111a3b5..a58ef1fedb532 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
@@ -17,137 +17,62 @@
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.DescribeLogDirsRequestData;
+import org.apache.kafka.common.message.DescribeLogDirsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
 
 public class DescribeLogDirsRequest extends AbstractRequest {
 
-    // request level key names
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level key names
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    private static final Schema DESCRIBE_LOG_DIRS_REQUEST_V0 = new Schema(
-        new Field(TOPICS_KEY_NAME, ArrayOf.nullable(new Schema(
-            TOPIC_NAME,
-            new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32), "List of partition ids of the topic.")))));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema DESCRIBE_LOG_DIRS_REQUEST_V1 = DESCRIBE_LOG_DIRS_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_LOG_DIRS_REQUEST_V0, DESCRIBE_LOG_DIRS_REQUEST_V1};
-    }
-
-    private final Set<TopicPartition> topicPartitions;
+    private final DescribeLogDirsRequestData data;
 
     public static class Builder extends AbstractRequest.Builder<DescribeLogDirsRequest> {
-        private final Set<TopicPartition> topicPartitions;
+        private final DescribeLogDirsRequestData data;
 
-        // topicPartitions == null indicates requesting all partitions, and an empty list indicates requesting no partitions.
-        public Builder(Set<TopicPartition> partitions) {
+        public Builder(DescribeLogDirsRequestData data) {
             super(ApiKeys.DESCRIBE_LOG_DIRS);
-            this.topicPartitions = partitions;
+            this.data = data;
         }
 
         @Override
         public DescribeLogDirsRequest build(short version) {
-            return new DescribeLogDirsRequest(topicPartitions, version);
+            return new DescribeLogDirsRequest(data, version);
         }
 
         @Override
        public String toString() {
-            StringBuilder builder = new StringBuilder();
-            builder.append("(type=DescribeLogDirsRequest")
-                .append(", topicPartitions=")
-                .append(topicPartitions)
-                .append(")");
-            return builder.toString();
+            return data.toString();
         }
     }
 
     public DescribeLogDirsRequest(Struct struct, short version) {
         super(ApiKeys.DESCRIBE_LOG_DIRS, version);
-
-        if (struct.getArray(TOPICS_KEY_NAME) == null) {
-            topicPartitions = null;
-        } else {
-            topicPartitions = new HashSet<>();
-            for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
-                Struct topicStruct = (Struct) topicStructObj;
-                String topic = topicStruct.get(TOPIC_NAME);
-                for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
-                    int partition = (Integer) partitionObj;
-                    topicPartitions.add(new TopicPartition(topic, partition));
-                }
-            }
-        }
+        this.data = new DescribeLogDirsRequestData(struct, version);
     }
 
-    // topicPartitions == null indicates requesting all partitions, and an empty list indicates requesting no partitions.
-    public DescribeLogDirsRequest(Set<TopicPartition> topicPartitions, short version) {
+    public DescribeLogDirsRequest(DescribeLogDirsRequestData data, short version) {
         super(ApiKeys.DESCRIBE_LOG_DIRS, version);
-        this.topicPartitions = topicPartitions;
+        this.data = data;
+    }
+
+    public DescribeLogDirsRequestData data() {
+        return data;
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.DESCRIBE_LOG_DIRS.requestSchema(version()));
-        if (topicPartitions == null) {
-            struct.set(TOPICS_KEY_NAME, null);
-            return struct;
-        }
-
-        Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
-        for (TopicPartition tp : topicPartitions) {
-            if (!partitionsByTopic.containsKey(tp.topic())) {
-                partitionsByTopic.put(tp.topic(), new ArrayList<Integer>());
-            }
-            partitionsByTopic.get(tp.topic()).add(tp.partition());
-        }
-
-        List<Struct> topicStructArray = new ArrayList<>();
-        for (Map.Entry<String, List<Integer>> partitionsByTopicEntry : partitionsByTopic.entrySet()) {
-            Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
-            topicStruct.set(TOPIC_NAME, partitionsByTopicEntry.getKey());
-            topicStruct.set(PARTITIONS_KEY_NAME, partitionsByTopicEntry.getValue().toArray());
-            topicStructArray.add(topicStruct);
-        }
-        struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
-
-        return struct;
+        return data.toStruct(version());
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        return new DescribeLogDirsResponse(throttleTimeMs, new HashMap<String, LogDirInfo>());
+        return new DescribeLogDirsResponse(new DescribeLogDirsResponseData().setThrottleTimeMs(throttleTimeMs));
     }
 
     public boolean isAllTopicPartitions() {
-        return topicPartitions == null;
-    }
-
-    public Set<TopicPartition> topicPartitions() {
-        return topicPartitions;
+        return data.topics() == null;
     }
 
     public static DescribeLogDirsRequest parse(ByteBuffer buffer, short version) {
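The rewritten request class keeps the old convention that a null Topics array means "all partitions in all log directories"; it is simply expressed as `data.topics() == null` now. A sketch of both request shapes (the topic name is illustrative):

```java
// Sketch: the two request shapes the builder now accepts.
import java.util.Collections;

import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;
import org.apache.kafka.common.requests.DescribeLogDirsRequest;

public class RequestShapesSketch {
    public static void main(String[] args) {
        // Null topics: describe every partition in every log directory.
        DescribeLogDirsRequest all = new DescribeLogDirsRequest.Builder(
            new DescribeLogDirsRequestData().setTopics(null)).build((short) 2);
        System.out.println(all.isAllTopicPartitions()); // true

        // Explicit topics: describe a single topic-partition.
        DescribeLogDirsRequestData data = new DescribeLogDirsRequestData();
        data.topics().add(new DescribableLogDirTopic()
            .setTopic("orders")
            .setPartitionIndex(Collections.singletonList(0)));
        DescribeLogDirsRequest some = new DescribeLogDirsRequest.Builder(data).build((short) 2);
        System.out.println(some.isAllTopicPartitions()); // false
    }
}
```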
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index ee6d95cfff843..b744769b43d75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -18,172 +18,77 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.DescribeLogDirsResponseData;
+import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsPartition;
+import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsResult;
+import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
-
 public class DescribeLogDirsResponse extends AbstractResponse {
 
     public static final long INVALID_OFFSET_LAG = -1L;
 
-    // request level key names
-    private static final String LOG_DIRS_KEY_NAME = "log_dirs";
-
-    // dir level key names
-    private static final String LOG_DIR_KEY_NAME = "log_dir";
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level key names
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    // partition level key names
-    private static final String SIZE_KEY_NAME = "size";
-    private static final String OFFSET_LAG_KEY_NAME = "offset_lag";
-    private static final String IS_FUTURE_KEY_NAME = "is_future";
-
-    private static final Schema DESCRIBE_LOG_DIRS_RESPONSE_V0 = new Schema(
-        THROTTLE_TIME_MS,
-        new Field(LOG_DIRS_KEY_NAME, new ArrayOf(new Schema(
-            ERROR_CODE,
-            new Field(LOG_DIR_KEY_NAME, STRING, "The absolute log directory path."),
-            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-                TOPIC_NAME,
-                new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema(
-                    PARTITION_ID,
-                    new Field(SIZE_KEY_NAME, INT64, "The size of the log segments of the partition in bytes."),
-                    new Field(OFFSET_LAG_KEY_NAME, INT64, "The lag of the log's LEO w.r.t. partition's HW " +
-                        "(if it is the current log for the partition) or current replica's LEO " +
-                        "(if it is the future log for the partition)"),
-                    new Field(IS_FUTURE_KEY_NAME, BOOLEAN, "True if this log is created by " +
-                        "AlterReplicaLogDirsRequest and will replace the current log of the replica " +
-                        "in the future.")))))))))));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema DESCRIBE_LOG_DIRS_RESPONSE_V1 = DESCRIBE_LOG_DIRS_RESPONSE_V0;
+    private final DescribeLogDirsResponseData data;
 
-    public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_LOG_DIRS_RESPONSE_V0, DESCRIBE_LOG_DIRS_RESPONSE_V1};
+    public DescribeLogDirsResponse(Struct struct, short version) {
+        this.data = new DescribeLogDirsResponseData(struct, version);
     }
 
-    private final int throttleTimeMs;
-    private final Map<String, LogDirInfo> logDirInfos;
-
-    public DescribeLogDirsResponse(Struct struct) {
-        throttleTimeMs = struct.get(THROTTLE_TIME_MS);
-        logDirInfos = new HashMap<>();
-
-        for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) {
-            Struct logDirStruct = (Struct) logDirStructObj;
-            Errors error = Errors.forCode(logDirStruct.get(ERROR_CODE));
-            String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
-            Map<TopicPartition, ReplicaInfo> replicaInfos = new HashMap<>();
-
-            for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) {
-                Struct topicStruct = (Struct) topicStructObj;
-                String topic = topicStruct.get(TOPIC_NAME);
-
-                for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
-                    Struct partitionStruct = (Struct) partitionStructObj;
-                    int partition = partitionStruct.get(PARTITION_ID);
-                    long size = partitionStruct.getLong(SIZE_KEY_NAME);
-                    long offsetLag = partitionStruct.getLong(OFFSET_LAG_KEY_NAME);
-                    boolean isFuture = partitionStruct.getBoolean(IS_FUTURE_KEY_NAME);
-                    ReplicaInfo replicaInfo = new ReplicaInfo(size, offsetLag, isFuture);
-                    replicaInfos.put(new TopicPartition(topic, partition), replicaInfo);
-                }
-            }
-
-            logDirInfos.put(logDir, new LogDirInfo(error, replicaInfos));
-        }
+    public DescribeLogDirsResponse(DescribeLogDirsResponseData data) {
+        this.data = data;
     }
 
-    /**
-     * Constructor for version 0.
-     */
-    public DescribeLogDirsResponse(int throttleTimeMs, Map<String, LogDirInfo> logDirInfos) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.logDirInfos = logDirInfos;
+    public DescribeLogDirsResponseData data() {
+        return data;
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version));
-        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
-        List<Struct> logDirStructArray = new ArrayList<>();
-        for (Map.Entry<String, LogDirInfo> logDirInfosEntry : logDirInfos.entrySet()) {
-            LogDirInfo logDirInfo = logDirInfosEntry.getValue();
-            Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME);
-            logDirStruct.set(ERROR_CODE, logDirInfo.error.code());
-            logDirStruct.set(LOG_DIR_KEY_NAME, logDirInfosEntry.getKey());
-
-            Map<String, Map<Integer, ReplicaInfo>> replicaInfosByTopic = CollectionUtils.groupPartitionDataByTopic(logDirInfo.replicaInfos);
-            List<Struct> topicStructArray = new ArrayList<>();
-            for (Map.Entry<String, Map<Integer, ReplicaInfo>> replicaInfosByTopicEntry : replicaInfosByTopic.entrySet()) {
-                Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
-                topicStruct.set(TOPIC_NAME, replicaInfosByTopicEntry.getKey());
-                List<Struct> partitionStructArray = new ArrayList<>();
-
-                for (Map.Entry<Integer, ReplicaInfo> replicaInfosByPartitionEntry : replicaInfosByTopicEntry.getValue().entrySet()) {
-                    Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
-                    ReplicaInfo replicaInfo = replicaInfosByPartitionEntry.getValue();
-                    partitionStruct.set(PARTITION_ID, replicaInfosByPartitionEntry.getKey());
-                    partitionStruct.set(SIZE_KEY_NAME, replicaInfo.size);
-                    partitionStruct.set(OFFSET_LAG_KEY_NAME, replicaInfo.offsetLag);
-                    partitionStruct.set(IS_FUTURE_KEY_NAME, replicaInfo.isFuture);
-                    partitionStructArray.add(partitionStruct);
-                }
-                topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
-                topicStructArray.add(topicStruct);
-            }
-            logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
-            logDirStructArray.add(logDirStruct);
-        }
-        struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray());
-        return struct;
+        return data.toStruct(version);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
+        return data.throttleTimeMs();
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
-        for (LogDirInfo logDirInfo : logDirInfos.values())
-            updateErrorCounts(errorCounts, logDirInfo.error);
+        data.results().forEach(result -> {
+            updateErrorCounts(errorCounts, Errors.forCode(result.errorCode()));
+        });
         return errorCounts;
     }
 
     public Map<String, LogDirInfo> logDirInfos() {
-        return logDirInfos;
+        HashMap<String, LogDirInfo> result = new HashMap<>(data.results().size());
+        for (DescribeLogDirsResult logDirResult : data.results()) {
+            Map<TopicPartition, ReplicaInfo> replicaInfoMap = new HashMap<>();
+            for (DescribeLogDirsTopic t : logDirResult.topics()) {
+                for (DescribeLogDirsPartition p : t.partitions()) {
+                    replicaInfoMap.put(
+                        new TopicPartition(t.name(), p.partitionIndex()),
+                        new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey()));
+                }
+            }
+            result.put(logDirResult.logDir(), new LogDirInfo(Errors.forCode(logDirResult.errorCode()), replicaInfoMap));
+        }
+        return result;
     }
 
     public static DescribeLogDirsResponse parse(ByteBuffer buffer, short version) {
-        return new DescribeLogDirsResponse(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version).read(buffer));
+        return new DescribeLogDirsResponse(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version).read(buffer), version);
    }
 
+    // Note this class is part of the public API, reachable from Admin.describeLogDirs()
     /**
      * Possible error code:
      *
@@ -211,6 +116,7 @@ public String toString() {
         }
     }
 
+    // Note this class is part of the public API, reachable from Admin.describeLogDirs()
     static public class ReplicaInfo {
 
         public final long size;
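`logDirInfos()` is now a translation from the generated results -> topics -> partitions tree back to the legacy `Map<String, LogDirInfo>` view that existing callers keep using. A sketch that builds a response by hand and reads it through that view; the directory and topic names are illustrative:

```java
// Sketch: populating DescribeLogDirsResponseData and reading the legacy view.
import java.util.Collections;

import org.apache.kafka.common.message.DescribeLogDirsResponseData;
import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsPartition;
import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsResult;
import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;

public class ResponseViewSketch {
    public static void main(String[] args) {
        DescribeLogDirsResponseData data = new DescribeLogDirsResponseData()
            .setResults(Collections.singletonList(new DescribeLogDirsResult()
                .setLogDir("/var/kafka-logs")
                .setErrorCode(Errors.NONE.code())
                .setTopics(Collections.singletonList(new DescribeLogDirsTopic()
                    .setName("orders")
                    .setPartitions(Collections.singletonList(new DescribeLogDirsPartition()
                        .setPartitionIndex(0)
                        .setPartitionSize(1024L)
                        .setOffsetLag(0L)
                        .setIsFutureKey(false)))))));

        new DescribeLogDirsResponse(data).logDirInfos()
            .forEach((dir, info) -> System.out.println(dir + " -> " + info.replicaInfos));
    }
}
```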
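The new `"mapKey": true` on Topic is what makes the generated `topics()` collection keyed by topic name, giving the `find()` lookup that `KafkaAdminClient` relies on above. A minimal sketch:

```java
// Sketch: "mapKey": true generates a collection keyed by topic name,
// so lookups by topic avoid a linear scan.
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;

public class MapKeySketch {
    public static void main(String[] args) {
        DescribeLogDirsRequestData data = new DescribeLogDirsRequestData();
        data.topics().add(new DescribableLogDirTopic().setTopic("orders")); // illustrative topic
        System.out.println(data.topics().find("orders") != null);  // true
        System.out.println(data.topics().find("missing") == null); // true
    }
}
```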
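Both message specs bump `validVersions` to 0-2 and declare v2 the first flexible version, so v2 payloads use compact arrays/strings and can carry tagged fields. A sketch that serializes the same request at each version to observe the effect; the exact byte counts are only illustrative of the idea:

```java
// Sketch: comparing serialized sizes across versions; v2 is flexible.
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.protocol.types.Struct;

public class FlexibleVersionSketch {
    public static void main(String[] args) {
        DescribeLogDirsRequestData data = new DescribeLogDirsRequestData().setTopics(null);
        for (short v = 0; v <= 2; v++) {
            Struct struct = data.toStruct(v);
            System.out.println("v" + v + " serialized size: " + struct.sizeOf() + " bytes");
        }
    }
}
```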
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1049c5c53a343..e235cf06fa56a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -50,7 +50,7 @@ import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
-import org.apache.kafka.common.message.{AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
+import org.apache.kafka.common.message.{AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
 import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse}
@@ -66,7 +66,6 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
-import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
@@ -2551,14 +2550,18 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (describeLogDirsDirRequest.isAllTopicPartitions)
           replicaManager.logManager.allLogs.map(_.topicPartition).toSet
         else
-          describeLogDirsDirRequest.topicPartitions.asScala
+          describeLogDirsDirRequest.data.topics.asScala.flatMap(
+            logDirTopic => logDirTopic.partitionIndex.asScala.map(partitionIndex =>
+              new TopicPartition(logDirTopic.topic, partitionIndex))).toSet
 
         replicaManager.describeLogDirs(partitions)
       } else {
-        Map.empty[String, LogDirInfo]
+        List.empty[DescribeLogDirsResponseData.DescribeLogDirsResult]
      }
    }
-    sendResponseMaybeThrottle(request, throttleTimeMs => new DescribeLogDirsResponse(throttleTimeMs, logDirInfos.asJava))
+    sendResponseMaybeThrottle(request, throttleTimeMs => new DescribeLogDirsResponse(new DescribeLogDirsResponseData()
+      .setThrottleTimeMs(throttleTimeMs)
+      .setResults(logDirInfos.asJava)))
  }
 
  def handleCreateTokenRequest(request: RequestChannel.Request): Unit = {
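`handleDescribeLogDirsRequest` now flattens topics x partitionIndex into the `Set[TopicPartition]` that `ReplicaManager` expects. Since the generated classes are plain Java, the same conversion reads like this outside Scala (the helper name is illustrative):

```java
// Sketch: flattening DescribeLogDirsRequestData into TopicPartitions,
// the Java equivalent of the flatMap above.
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;

public class FlattenSketch {
    static Set<TopicPartition> partitionsOf(DescribeLogDirsRequestData data) {
        Set<TopicPartition> partitions = new HashSet<>();
        for (DescribableLogDirTopic topic : data.topics()) {
            for (Integer partition : topic.partitionIndex()) {
                partitions.add(new TopicPartition(topic.topic(), partition));
            }
        }
        return partitions;
    }
}
```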
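`ReplicaManager.describeLogDirs` now emits one generated `DescribeLogDirsResult` per configured log directory, with error-only entries for directories that fail; the per-directory result shape, sketched with illustrative paths:

```java
// Sketch: the per-log-dir results ReplicaManager now produces. An offline
// directory carries only logDir + errorCode, mirroring the catch branches above.
import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsResult;
import org.apache.kafka.common.protocol.Errors;

public class ResultShapeSketch {
    public static void main(String[] args) {
        DescribeLogDirsResult healthy = new DescribeLogDirsResult()
            .setLogDir("/var/kafka-logs-0")
            .setErrorCode(Errors.NONE.code());
        DescribeLogDirsResult offline = new DescribeLogDirsResult()
            .setLogDir("/var/kafka-logs-1")
            .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code());
        System.out.println(healthy + "\n" + offline);
    }
}
```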
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 008f422234f58..47d96839833ce 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -39,8 +39,8 @@ import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
-import org.apache.kafka.common.message.LeaderAndIsrResponseData
 import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
+import org.apache.kafka.common.message.{DescribeLogDirsResponseData, LeaderAndIsrResponseData}
 import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
@@ -50,7 +50,6 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.replica.PartitionView.DefaultPartitionView
 import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
 import org.apache.kafka.common.replica.{ClientMetadata, _}
-import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, ReplicaInfo}
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
@@ -661,7 +660,7 @@ class ReplicaManager(val config: KafkaConfig,
    * 2) size and lag of current and future logs for each partition in the given log directory. Only logs of the queried partitions
    * are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented.
    */
-  def describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo] = {
+  def describeLogDirs(partitions: Set[TopicPartition]): List[DescribeLogDirsResponseData.DescribeLogDirsResult] = {
     val logsByDir = logManager.allLogs.groupBy(log => log.dir.getParent)
 
     config.logDirs.toSet.map { logDir: String =>
@@ -672,25 +671,38 @@ class ReplicaManager(val config: KafkaConfig,
 
         logsByDir.get(absolutePath) match {
           case Some(logs) =>
-            val replicaInfos = logs.filter { log =>
-              partitions.contains(log.topicPartition)
-            }.map { log =>
-              log.topicPartition -> new ReplicaInfo(log.size, getLogEndOffsetLag(log.topicPartition, log.logEndOffset, log.isFuture), log.isFuture)
-            }.toMap
-
-            (absolutePath, new LogDirInfo(Errors.NONE, replicaInfos.asJava))
+            val topicInfos = logs.groupBy(_.topicPartition.topic).map{case (topic, logs) =>
+              new DescribeLogDirsResponseData.DescribeLogDirsTopic().setName(topic).setPartitions(
+                logs.filter { log =>
+                  partitions.contains(log.topicPartition)
+                }.map { log =>
+                  new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                    .setPartitionSize(log.size)
+                    .setPartitionIndex(log.topicPartition.partition)
+                    .setOffsetLag(getLogEndOffsetLag(log.topicPartition, log.logEndOffset, log.isFuture))
+                    .setIsFutureKey(log.isFuture)
+                }.toList.asJava)
+            }.toList.asJava
+
+            new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
+              .setErrorCode(Errors.NONE.code).setTopics(topicInfos)
           case None =>
-            (absolutePath, new LogDirInfo(Errors.NONE, Map.empty[TopicPartition, ReplicaInfo].asJava))
+            new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
+              .setErrorCode(Errors.NONE.code)
         }
       } catch {
         case _: KafkaStorageException =>
-          (absolutePath, new LogDirInfo(Errors.KAFKA_STORAGE_ERROR, Map.empty[TopicPartition, ReplicaInfo].asJava))
+          new DescribeLogDirsResponseData.DescribeLogDirsResult()
+            .setLogDir(absolutePath)
+            .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code)
         case t: Throwable =>
           error(s"Error while describing replica in dir $absolutePath", t)
-          (absolutePath, new LogDirInfo(Errors.forException(t), Map.empty[TopicPartition, ReplicaInfo].asJava))
+          new DescribeLogDirsResponseData.DescribeLogDirsResult()
+            .setLogDir(absolutePath)
+            .setErrorCode(Errors.forException(t).code)
       }
-    }.toMap
+    }.toList
   }
 
   def getLogEndOffsetLag(topicPartition: TopicPartition, logEndOffset: Long, isFuture: Boolean): Long = {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 9c9eda6318124..b932f462b4320 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
-import org.apache.kafka.common.message.{AlterPartitionReassignmentsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, OffsetCommitRequestData, SyncGroupRequestData}
+import org.apache.kafka.common.message.{AlterPartitionReassignmentsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, OffsetCommitRequestData, SyncGroupRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, Records, SimpleRecord}
@@ -528,7 +528,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   private def alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build()
 
-  private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(Collections.singleton(tp)).build()
+  private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(new DescribeLogDirsRequestData.DescribableLogDirTopicCollection(Collections.singleton(
+    new DescribeLogDirsRequestData.DescribableLogDirTopic().setTopic(tp.topic).setPartitionIndex(Collections.singletonList(tp.partition))).iterator()))).build()
 
   private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build()
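The authorizer test above builds the topics collection in a single expression via the Iterator-based constructor of `DescribableLogDirTopicCollection`. The same construction in isolation (topic name illustrative):

```java
// Sketch: building a singleton DescribableLogDirTopicCollection from an Iterator,
// as the authorizer test does.
import java.util.Collections;

import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;
import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopicCollection;

public class TopicCollectionSketch {
    public static void main(String[] args) {
        DescribableLogDirTopicCollection topics = new DescribableLogDirTopicCollection(
            Collections.singleton(new DescribableLogDirTopic()
                .setTopic("orders")
                .setPartitionIndex(Collections.singletonList(0))).iterator());
        System.out.println(topics.find("orders") != null); // true
    }
}
```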
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
index 7062016e977d6..e8bad37a9c03d 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.DescribeLogDirsRequestData
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests._
 import org.junit.Assert._
@@ -43,7 +44,7 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
     createTopic(topic, partitionNum, 1)
     TestUtils.generateAndProduceMessages(servers, topic, 10)
 
-    val request = new DescribeLogDirsRequest.Builder(null).build()
+    val request = new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(null)).build()
     val response = connectAndReceive[DescribeLogDirsResponse](request, destination = controllerSocketServer)
 
     val logDirInfos = response.logDirInfos()
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 913868f52b1a4..6cc794567ad84 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -454,7 +454,11 @@ class RequestQuotaTest extends BaseRequestTest {
           new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir))
 
         case ApiKeys.DESCRIBE_LOG_DIRS =>
-          new DescribeLogDirsRequest.Builder(Collections.singleton(tp))
+          val data = new DescribeLogDirsRequestData()
+          data.topics.add(new DescribeLogDirsRequestData.DescribableLogDirTopic()
+            .setTopic(tp.topic)
+            .setPartitionIndex(Collections.singletonList(tp.partition)))
+          new DescribeLogDirsRequest.Builder(data)
 
         case ApiKeys.CREATE_PARTITIONS =>
           val data = new CreatePartitionsRequestData()
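End to end, all of this feeds the public `Admin.describeLogDirs` API, whose `LogDirInfo`/`ReplicaInfo` return types the new comments above call out as public. A hedged usage sketch; the bootstrap address and broker id are illustrative, and the `Map`-returning `all()` signature shown is the one current at the time of this patch:

```java
// Sketch: the user-facing path this refactor serves.
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;

public class DescribeLogDirsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // broker id 1 is illustrative
            Map<Integer, Map<String, LogDirInfo>> byBroker =
                admin.describeLogDirs(Collections.singleton(1)).all().get();
            byBroker.forEach((broker, dirs) -> dirs.forEach((dir, info) ->
                System.out.println(broker + " " + dir + " " + info.error)));
        }
    }
}
```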