From 4f5c42f5778eb705fc197f518de7dd4b209b5a5b Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Thu, 16 Jan 2020 12:06:03 +0000 Subject: [PATCH 1/7] Fix KAFKA-9435, using automated protocol for DescribeLogDirs --- .../kafka/clients/admin/KafkaAdminClient.java | 36 +++- .../apache/kafka/common/protocol/ApiKeys.java | 12 +- .../common/requests/AbstractResponse.java | 2 +- .../requests/DescribeLogDirsRequest.java | 95 +++-------- .../requests/DescribeLogDirsResponse.java | 159 ++++-------------- .../message/DescribeLogDirsRequest.json | 2 +- .../main/scala/kafka/server/KafkaApis.scala | 13 +- .../scala/kafka/server/ReplicaManager.scala | 38 +++-- .../kafka/api/AuthorizerIntegrationTest.scala | 5 +- .../server/DescribeLogDirsRequestTest.scala | 3 +- .../unit/kafka/server/RequestQuotaTest.scala | 6 +- 11 files changed, 133 insertions(+), 238 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e654e737789c0..2e4c7a2444124 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -99,6 +99,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; +import org.apache.kafka.common.message.DescribeLogDirsRequestData; +import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic; import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; @@ -2267,7 +2269,7 @@ public DescribeLogDirsResult describeLogDirs(Collection brokers, Descri @Override public DescribeLogDirsRequest.Builder createRequest(int timeoutMs) { // Query selected partitions in all log directories - return new DescribeLogDirsRequest.Builder(null); + return new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(null)); } @Override @@ -2299,20 +2301,36 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection()); } - Map> partitionsByBroker = new HashMap<>(); + Map partitionsByBroker = new HashMap<>(); - for (TopicPartitionReplica replica : replicas) { - partitionsByBroker.computeIfAbsent(replica.brokerId(), key -> new HashSet<>()) - .add(new TopicPartition(replica.topic(), replica.partition())); + for (TopicPartitionReplica replica: replicas) { + DescribeLogDirsRequestData requestData = partitionsByBroker.get(replica.brokerId()); + if (requestData == null) { + requestData = new DescribeLogDirsRequestData(); + partitionsByBroker.put(replica.brokerId(), requestData); + } + DescribableLogDirTopic describableLogDirTopic = requestData.topics().find(replica.topic()); + if (describableLogDirTopic == null) { + ArrayList v = new ArrayList<>(); + v.add(replica.partition()); + describableLogDirTopic = new DescribableLogDirTopic().setTopic(replica.topic()) + .setPartitionIndex(v); + requestData.topics().add(describableLogDirTopic); + } else { + describableLogDirTopic.partitionIndex().add(replica.partition()); + } } final long now = time.milliseconds(); - for (Map.Entry> entry: partitionsByBroker.entrySet()) { + for (Map.Entry entry: partitionsByBroker.entrySet()) { final int brokerId = entry.getKey(); - final Set topicPartitions = entry.getValue(); + final DescribeLogDirsRequestData topicPartitions = entry.getValue(); final Map replicaDirInfoByPartition = new HashMap<>(); - for (TopicPartition topicPartition: topicPartitions) - replicaDirInfoByPartition.put(topicPartition, new ReplicaLogDirInfo()); + for (DescribableLogDirTopic topicPartition: topicPartitions.topics()) { + for (Integer partitionId : topicPartition.partitionIndex()) { + replicaDirInfoByPartition.put(new TopicPartition(topicPartition.topic(), partitionId), new ReplicaLogDirInfo()); + } + } runnable.call(new Call("describeReplicaLogDirs", calcDeadlineMs(now, options.timeoutMs()), new ConstantNodeIdProvider(brokerId)) { diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 6549d8fbb12d3..da63a3164481b 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.protocol; +import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; +import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; import org.apache.kafka.common.message.ApiMessageType; import org.apache.kafka.common.message.ApiVersionsRequestData; import org.apache.kafka.common.message.ApiVersionsResponseData; @@ -47,6 +49,8 @@ import org.apache.kafka.common.message.DescribeDelegationTokenResponseData; import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.DescribeLogDirsRequestData; +import org.apache.kafka.common.message.DescribeLogDirsResponseData; import org.apache.kafka.common.message.ElectLeadersRequestData; import org.apache.kafka.common.message.ElectLeadersResponseData; import org.apache.kafka.common.message.EndTxnRequestData; @@ -69,8 +73,6 @@ import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; -import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; -import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.MetadataRequestData; @@ -110,8 +112,6 @@ import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse; import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; -import org.apache.kafka.common.requests.DescribeLogDirsRequest; -import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.ListOffsetRequest; @@ -190,8 +190,8 @@ public Struct parseResponse(short version, ByteBuffer buffer) { AlterConfigsResponse.schemaVersions()), ALTER_REPLICA_LOG_DIRS(34, "AlterReplicaLogDirs", AlterReplicaLogDirsRequest.schemaVersions(), AlterReplicaLogDirsResponse.schemaVersions()), - DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequest.schemaVersions(), - DescribeLogDirsResponse.schemaVersions()), + DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequestData.SCHEMAS, + DescribeLogDirsResponseData.SCHEMAS), SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequestData.SCHEMAS, SaslAuthenticateResponseData.SCHEMAS), CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequestData.SCHEMAS, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 9d3f39d4f5124..a5799af81367f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -151,7 +151,7 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct, shor case ALTER_REPLICA_LOG_DIRS: return new AlterReplicaLogDirsResponse(struct); case DESCRIBE_LOG_DIRS: - return new DescribeLogDirsResponse(struct); + return new DescribeLogDirsResponse(struct, version); case SASL_AUTHENTICATE: return new SaslAuthenticateResponse(struct, version); case CREATE_PARTITIONS: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java index 84f6c9111a3b5..370703eb923ab 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java @@ -17,21 +17,15 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.DescribeLogDirsRequestData; +import org.apache.kafka.common.message.DescribeLogDirsResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; import static org.apache.kafka.common.protocol.types.Type.INT32; @@ -49,105 +43,58 @@ public class DescribeLogDirsRequest extends AbstractRequest { TOPIC_NAME, new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32), "List of partition ids of the topic."))))); - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema DESCRIBE_LOG_DIRS_REQUEST_V1 = DESCRIBE_LOG_DIRS_REQUEST_V0; - - public static Schema[] schemaVersions() { - return new Schema[]{DESCRIBE_LOG_DIRS_REQUEST_V0, DESCRIBE_LOG_DIRS_REQUEST_V1}; - } - - private final Set topicPartitions; + private final DescribeLogDirsRequestData data; + private final short version; public static class Builder extends AbstractRequest.Builder { - private final Set topicPartitions; + private final DescribeLogDirsRequestData data; // topicPartitions == null indicates requesting all partitions, and an empty list indicates requesting no partitions. - public Builder(Set partitions) { + public Builder(DescribeLogDirsRequestData data) { super(ApiKeys.DESCRIBE_LOG_DIRS); - this.topicPartitions = partitions; + this.data = data; } @Override public DescribeLogDirsRequest build(short version) { - return new DescribeLogDirsRequest(topicPartitions, version); + return new DescribeLogDirsRequest(data, version); } @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("(type=DescribeLogDirsRequest") - .append(", topicPartitions=") - .append(topicPartitions) - .append(")"); - return builder.toString(); + return data.toString(); } } public DescribeLogDirsRequest(Struct struct, short version) { super(ApiKeys.DESCRIBE_LOG_DIRS, version); - - if (struct.getArray(TOPICS_KEY_NAME) == null) { - topicPartitions = null; - } else { - topicPartitions = new HashSet<>(); - for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) { - Struct topicStruct = (Struct) topicStructObj; - String topic = topicStruct.get(TOPIC_NAME); - for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) { - int partition = (Integer) partitionObj; - topicPartitions.add(new TopicPartition(topic, partition)); - } - } - } + this.data = new DescribeLogDirsRequestData(struct, version); + this.version = version; } // topicPartitions == null indicates requesting all partitions, and an empty list indicates requesting no partitions. - public DescribeLogDirsRequest(Set topicPartitions, short version) { + public DescribeLogDirsRequest(DescribeLogDirsRequestData data, short version) { super(ApiKeys.DESCRIBE_LOG_DIRS, version); - this.topicPartitions = topicPartitions; + this.data = data; + this.version = version; + } + + public DescribeLogDirsRequestData data() { + return data; } @Override protected Struct toStruct() { - Struct struct = new Struct(ApiKeys.DESCRIBE_LOG_DIRS.requestSchema(version())); - if (topicPartitions == null) { - struct.set(TOPICS_KEY_NAME, null); - return struct; - } - - Map> partitionsByTopic = new HashMap<>(); - for (TopicPartition tp : topicPartitions) { - if (!partitionsByTopic.containsKey(tp.topic())) { - partitionsByTopic.put(tp.topic(), new ArrayList()); - } - partitionsByTopic.get(tp.topic()).add(tp.partition()); - } - - List topicStructArray = new ArrayList<>(); - for (Map.Entry> partitionsByTopicEntry : partitionsByTopic.entrySet()) { - Struct topicStruct = struct.instance(TOPICS_KEY_NAME); - topicStruct.set(TOPIC_NAME, partitionsByTopicEntry.getKey()); - topicStruct.set(PARTITIONS_KEY_NAME, partitionsByTopicEntry.getValue().toArray()); - topicStructArray.add(topicStruct); - } - struct.set(TOPICS_KEY_NAME, topicStructArray.toArray()); - - return struct; + return data.toStruct(version); } @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - return new DescribeLogDirsResponse(throttleTimeMs, new HashMap()); + return new DescribeLogDirsResponse(new DescribeLogDirsResponseData().setThrottleTimeMs(throttleTimeMs)); } public boolean isAllTopicPartitions() { - return topicPartitions == null; - } - - public Set topicPartitions() { - return topicPartitions; + return data.topics() == null; } public static DescribeLogDirsRequest parse(ByteBuffer buffer, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java index ee6d95cfff843..688c46b7f153f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java @@ -18,172 +18,80 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.DescribeLogDirsResponseData; +import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsPartition; +import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsResult; +import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.BOOLEAN; -import static org.apache.kafka.common.protocol.types.Type.INT64; -import static org.apache.kafka.common.protocol.types.Type.STRING; - public class DescribeLogDirsResponse extends AbstractResponse { public static final long INVALID_OFFSET_LAG = -1L; - // request level key names - private static final String LOG_DIRS_KEY_NAME = "log_dirs"; - - // dir level key names - private static final String LOG_DIR_KEY_NAME = "log_dir"; - private static final String TOPICS_KEY_NAME = "topics"; - - // topic level key names - private static final String PARTITIONS_KEY_NAME = "partitions"; - - // partition level key names - private static final String SIZE_KEY_NAME = "size"; - private static final String OFFSET_LAG_KEY_NAME = "offset_lag"; - private static final String IS_FUTURE_KEY_NAME = "is_future"; - - private static final Schema DESCRIBE_LOG_DIRS_RESPONSE_V0 = new Schema( - THROTTLE_TIME_MS, - new Field(LOG_DIRS_KEY_NAME, new ArrayOf(new Schema( - ERROR_CODE, - new Field(LOG_DIR_KEY_NAME, STRING, "The absolute log directory path."), - new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( - TOPIC_NAME, - new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema( - PARTITION_ID, - new Field(SIZE_KEY_NAME, INT64, "The size of the log segments of the partition in bytes."), - new Field(OFFSET_LAG_KEY_NAME, INT64, "The lag of the log's LEO w.r.t. partition's HW " + - "(if it is the current log for the partition) or current replica's LEO " + - "(if it is the future log for the partition)"), - new Field(IS_FUTURE_KEY_NAME, BOOLEAN, "True if this log is created by " + - "AlterReplicaLogDirsRequest and will replace the current log of the replica " + - "in the future."))))))))))); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema DESCRIBE_LOG_DIRS_RESPONSE_V1 = DESCRIBE_LOG_DIRS_RESPONSE_V0; - - public static Schema[] schemaVersions() { - return new Schema[]{DESCRIBE_LOG_DIRS_RESPONSE_V0, DESCRIBE_LOG_DIRS_RESPONSE_V1}; - } - - private final int throttleTimeMs; - private final Map logDirInfos; - - public DescribeLogDirsResponse(Struct struct) { - throttleTimeMs = struct.get(THROTTLE_TIME_MS); - logDirInfos = new HashMap<>(); - - for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) { - Struct logDirStruct = (Struct) logDirStructObj; - Errors error = Errors.forCode(logDirStruct.get(ERROR_CODE)); - String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME); - Map replicaInfos = new HashMap<>(); - - for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) { - Struct topicStruct = (Struct) topicStructObj; - String topic = topicStruct.get(TOPIC_NAME); - - for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) { - Struct partitionStruct = (Struct) partitionStructObj; - int partition = partitionStruct.get(PARTITION_ID); - long size = partitionStruct.getLong(SIZE_KEY_NAME); - long offsetLag = partitionStruct.getLong(OFFSET_LAG_KEY_NAME); - boolean isFuture = partitionStruct.getBoolean(IS_FUTURE_KEY_NAME); - ReplicaInfo replicaInfo = new ReplicaInfo(size, offsetLag, isFuture); - replicaInfos.put(new TopicPartition(topic, partition), replicaInfo); - } - } + private final DescribeLogDirsResponseData data; - logDirInfos.put(logDir, new LogDirInfo(error, replicaInfos)); - } + public DescribeLogDirsResponse(Struct struct, short version) { + this.data = new DescribeLogDirsResponseData(struct, version); } /** * Constructor for version 0. */ - public DescribeLogDirsResponse(int throttleTimeMs, Map logDirInfos) { - this.throttleTimeMs = throttleTimeMs; - this.logDirInfos = logDirInfos; + public DescribeLogDirsResponse(DescribeLogDirsResponseData data) { + this.data = data; + } + + public DescribeLogDirsResponseData data() { + return data; } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version)); - struct.set(THROTTLE_TIME_MS, throttleTimeMs); - List logDirStructArray = new ArrayList<>(); - for (Map.Entry logDirInfosEntry : logDirInfos.entrySet()) { - LogDirInfo logDirInfo = logDirInfosEntry.getValue(); - Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME); - logDirStruct.set(ERROR_CODE, logDirInfo.error.code()); - logDirStruct.set(LOG_DIR_KEY_NAME, logDirInfosEntry.getKey()); - - Map> replicaInfosByTopic = CollectionUtils.groupPartitionDataByTopic(logDirInfo.replicaInfos); - List topicStructArray = new ArrayList<>(); - for (Map.Entry> replicaInfosByTopicEntry : replicaInfosByTopic.entrySet()) { - Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME); - topicStruct.set(TOPIC_NAME, replicaInfosByTopicEntry.getKey()); - List partitionStructArray = new ArrayList<>(); - - for (Map.Entry replicaInfosByPartitionEntry : replicaInfosByTopicEntry.getValue().entrySet()) { - Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME); - ReplicaInfo replicaInfo = replicaInfosByPartitionEntry.getValue(); - partitionStruct.set(PARTITION_ID, replicaInfosByPartitionEntry.getKey()); - partitionStruct.set(SIZE_KEY_NAME, replicaInfo.size); - partitionStruct.set(OFFSET_LAG_KEY_NAME, replicaInfo.offsetLag); - partitionStruct.set(IS_FUTURE_KEY_NAME, replicaInfo.isFuture); - partitionStructArray.add(partitionStruct); - } - topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray()); - topicStructArray.add(topicStruct); - } - logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray()); - logDirStructArray.add(logDirStruct); - } - struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray()); - return struct; + return data.toStruct(version); } @Override public int throttleTimeMs() { - return throttleTimeMs; + return data.throttleTimeMs(); } @Override public Map errorCounts() { Map errorCounts = new HashMap<>(); - for (LogDirInfo logDirInfo : logDirInfos.values()) - updateErrorCounts(errorCounts, logDirInfo.error); + data.results().forEach(result -> { + updateErrorCounts(errorCounts, Errors.forCode(result.errorCode())); + }); return errorCounts; } public Map logDirInfos() { - return logDirInfos; + HashMap result = new HashMap<>(data.results().size()); + for (DescribeLogDirsResult logDirResult : data.results()) { + Map replicaInfoMap = new HashMap<>(); + for (DescribeLogDirsTopic t : logDirResult.topics()) { + for (DescribeLogDirsPartition p : t.partitions()) { + replicaInfoMap.put( + new TopicPartition(t.name(), p.partitionIndex()), + new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey())); + } + } + result.put(logDirResult.logDir(), new LogDirInfo(Errors.forCode(logDirResult.errorCode()), replicaInfoMap)); + } + return result; } public static DescribeLogDirsResponse parse(ByteBuffer buffer, short version) { - return new DescribeLogDirsResponse(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version).read(buffer)); + return new DescribeLogDirsResponse(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version).read(buffer), version); } + // Note this class is part of the public API, reachable from Admin.describeLogDirs() /** * Possible error code: * @@ -211,6 +119,7 @@ public String toString() { } } + // Note this class is part of the public API, reachable from Admin.describeLogDirs() static public class ReplicaInfo { public final long size; diff --git a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json index 333fa682b1dcb..8605931befc84 100644 --- a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json +++ b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json @@ -23,7 +23,7 @@ "fields": [ { "name": "Topics", "type": "[]DescribableLogDirTopic", "versions": "0+", "nullableVersions": "0+", "about": "Each topic that we want to describe log directories for, or null for all topics.", "fields": [ - { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true, "about": "The topic name" }, { "name": "PartitionIndex", "type": "[]int32", "versions": "0+", "about": "The partition indxes." } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1049c5c53a343..e235cf06fa56a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -50,7 +50,7 @@ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult -import org.apache.kafka.common.message.{AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData} +import org.apache.kafka.common.message.{AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData} import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection} import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection} import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse} @@ -66,7 +66,6 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record._ import org.apache.kafka.common.replica.ClientMetadata import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata -import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ @@ -2551,14 +2550,18 @@ class KafkaApis(val requestChannel: RequestChannel, if (describeLogDirsDirRequest.isAllTopicPartitions) replicaManager.logManager.allLogs.map(_.topicPartition).toSet else - describeLogDirsDirRequest.topicPartitions.asScala + describeLogDirsDirRequest.data.topics.asScala.flatMap( + logDirTopic => logDirTopic.partitionIndex.asScala.map(partitionIndex => + new TopicPartition(logDirTopic.topic, partitionIndex))).toSet replicaManager.describeLogDirs(partitions) } else { - Map.empty[String, LogDirInfo] + List.empty[DescribeLogDirsResponseData.DescribeLogDirsResult] } } - sendResponseMaybeThrottle(request, throttleTimeMs => new DescribeLogDirsResponse(throttleTimeMs, logDirInfos.asJava)) + sendResponseMaybeThrottle(request, throttleTimeMs => new DescribeLogDirsResponse(new DescribeLogDirsResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setResults(logDirInfos.asJava))) } def handleCreateTokenRequest(request: RequestChannel.Request): Unit = { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 008f422234f58..8fa2a32bb6742 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -41,6 +41,7 @@ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaderAndIsrResponseData import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult +import org.apache.kafka.common.message.{DescribeLogDirsResponseData, LeaderAndIsrResponseData} import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName @@ -50,7 +51,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.replica.PartitionView.DefaultPartitionView import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView import org.apache.kafka.common.replica.{ClientMetadata, _} -import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, ReplicaInfo} +import org.apache.kafka.common.message.DescribeLogDirsResponseData._ import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction @@ -661,7 +662,7 @@ class ReplicaManager(val config: KafkaConfig, * 2) size and lag of current and future logs for each partition in the given log directory. Only logs of the queried partitions * are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented. */ - def describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo] = { + def describeLogDirs(partitions: Set[TopicPartition]): List[DescribeLogDirsResponseData.DescribeLogDirsResult] = { val logsByDir = logManager.allLogs.groupBy(log => log.dir.getParent) config.logDirs.toSet.map { logDir: String => @@ -672,25 +673,36 @@ class ReplicaManager(val config: KafkaConfig, logsByDir.get(absolutePath) match { case Some(logs) => - val replicaInfos = logs.filter { log => - partitions.contains(log.topicPartition) - }.map { log => - log.topicPartition -> new ReplicaInfo(log.size, getLogEndOffsetLag(log.topicPartition, log.logEndOffset, log.isFuture), log.isFuture) - }.toMap - - (absolutePath, new LogDirInfo(Errors.NONE, replicaInfos.asJava)) + val topicInfos = logs.groupBy(_.topicPartition.topic).map{case (topic, logs) => + new DescribeLogDirsResponseData.DescribeLogDirsTopic().setName(topic).setPartitions( + logs.filter { log => + partitions.contains(log.topicPartition) + }.map { log => + new DescribeLogDirsResponseData.DescribeLogDirsPartition(). + setPartitionSize(log.size). + setPartitionIndex(log.topicPartition.partition). + setOffsetLag(getLogEndOffsetLag(log.topicPartition, log.logEndOffset, log.isFuture)) + .setIsFutureKey(log.isFuture) + }.toList.asJava) + }.toList.asJava + + new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath) + .setErrorCode(Errors.NONE.code).setTopics(topicInfos) case None => - (absolutePath, new LogDirInfo(Errors.NONE, Map.empty[TopicPartition, ReplicaInfo].asJava)) + new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath) + .setErrorCode(Errors.NONE.code) } } catch { case _: KafkaStorageException => - (absolutePath, new LogDirInfo(Errors.KAFKA_STORAGE_ERROR, Map.empty[TopicPartition, ReplicaInfo].asJava)) + new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath) + .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code) case t: Throwable => error(s"Error while describing replica in dir $absolutePath", t) - (absolutePath, new LogDirInfo(Errors.forException(t), Map.empty[TopicPartition, ReplicaInfo].asJava)) + new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath) + .setErrorCode(Errors.forException(t).code) } - }.toMap + }.toList } def getLogEndOffsetLag(topicPartition: TopicPartition, logEndOffset: Long, isFuture: Boolean): Long = { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 9c9eda6318124..b932f462b4320 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -42,7 +42,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} -import org.apache.kafka.common.message.{AlterPartitionReassignmentsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, OffsetCommitRequestData, SyncGroupRequestData} +import org.apache.kafka.common.message.{AlterPartitionReassignmentsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, OffsetCommitRequestData, SyncGroupRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, Records, SimpleRecord} @@ -528,7 +528,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build() - private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(Collections.singleton(tp)).build() + private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(new DescribeLogDirsRequestData.DescribableLogDirTopicCollection(Collections.singleton( + new DescribeLogDirsRequestData.DescribableLogDirTopic().setTopic(tp.topic).setPartitionIndex(Collections.singletonList(tp.partition))).iterator()))).build() private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build() diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala index 7062016e977d6..e8bad37a9c03d 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala @@ -21,6 +21,7 @@ import java.io.File import kafka.utils._ import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.DescribeLogDirsRequestData import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests._ import org.junit.Assert._ @@ -43,7 +44,7 @@ class DescribeLogDirsRequestTest extends BaseRequestTest { createTopic(topic, partitionNum, 1) TestUtils.generateAndProduceMessages(servers, topic, 10) - val request = new DescribeLogDirsRequest.Builder(null).build() + val request = new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(null)).build() val response = connectAndReceive[DescribeLogDirsResponse](request, destination = controllerSocketServer) val logDirInfos = response.logDirInfos() diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 913868f52b1a4..7cdf3e6aa2fc2 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -454,7 +454,11 @@ class RequestQuotaTest extends BaseRequestTest { new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)) case ApiKeys.DESCRIBE_LOG_DIRS => - new DescribeLogDirsRequest.Builder(Collections.singleton(tp)) + val data = new DescribeLogDirsRequestData() + data.topics().add(new DescribeLogDirsRequestData.DescribableLogDirTopic() + .setTopic(tp.topic()) + .setPartitionIndex(Collections.singletonList(tp.partition()))) + new DescribeLogDirsRequest.Builder(data) case ApiKeys.CREATE_PARTITIONS => val data = new CreatePartitionsRequestData() From 5214f4bde000e4f009f8f4f02d291ff1e967e91c Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Thu, 16 Jan 2020 12:10:03 +0000 Subject: [PATCH 2/7] Per KIP-482, use flexible versions for DescribeLogDirs --- .../resources/common/message/DescribeLogDirsRequest.json | 5 +++-- .../resources/common/message/DescribeLogDirsResponse.json | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json index 8605931befc84..577f2eb4cf6a8 100644 --- a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json +++ b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json @@ -18,8 +18,9 @@ "type": "request", "name": "DescribeLogDirsRequest", // Version 1 is the same as version 0. - "validVersions": "0-1", - "flexibleVersions": "none", + "validVersions": "0-2", + // Version 2 is the first flexible version. + "flexibleVersions": "2+", "fields": [ { "name": "Topics", "type": "[]DescribableLogDirTopic", "versions": "0+", "nullableVersions": "0+", "about": "Each topic that we want to describe log directories for, or null for all topics.", "fields": [ diff --git a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json index d75b348b298e2..4322f1c52ee2f 100644 --- a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json +++ b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json @@ -18,8 +18,9 @@ "type": "response", "name": "DescribeLogDirsResponse", // Starting in version 1, on quota violation, brokers send out responses before throttling. - "validVersions": "0-1", - "flexibleVersions": "none", + "validVersions": "0-2", + // Version 2 is the first flexible version. + "flexibleVersions": "2+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, From b81c6c62f63aac64c40cee1fc7af438c1763d2ce Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Tue, 25 Feb 2020 12:07:32 +0000 Subject: [PATCH 3/7] Review comments --- .../org/apache/kafka/clients/admin/KafkaAdminClient.java | 6 +++--- .../kafka/common/requests/DescribeLogDirsRequest.java | 5 +---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 2e4c7a2444124..4895417dbcd87 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2311,10 +2311,10 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection v = new ArrayList<>(); - v.add(replica.partition()); + List partitionIndex = new ArrayList<>(); + partitionIndex.add(replica.partition()); describableLogDirTopic = new DescribableLogDirTopic().setTopic(replica.topic()) - .setPartitionIndex(v); + .setPartitionIndex(partitionIndex); requestData.topics().add(describableLogDirTopic); } else { describableLogDirTopic.partitionIndex().add(replica.partition()); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java index 370703eb923ab..f421f28aa06e5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java @@ -44,7 +44,6 @@ public class DescribeLogDirsRequest extends AbstractRequest { new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32), "List of partition ids of the topic."))))); private final DescribeLogDirsRequestData data; - private final short version; public static class Builder extends AbstractRequest.Builder { private final DescribeLogDirsRequestData data; @@ -69,14 +68,12 @@ public String toString() { public DescribeLogDirsRequest(Struct struct, short version) { super(ApiKeys.DESCRIBE_LOG_DIRS, version); this.data = new DescribeLogDirsRequestData(struct, version); - this.version = version; } // topicPartitions == null indicates requesting all partitions, and an empty list indicates requesting no partitions. public DescribeLogDirsRequest(DescribeLogDirsRequestData data, short version) { super(ApiKeys.DESCRIBE_LOG_DIRS, version); this.data = data; - this.version = version; } public DescribeLogDirsRequestData data() { @@ -85,7 +82,7 @@ public DescribeLogDirsRequestData data() { @Override protected Struct toStruct() { - return data.toStruct(version); + return data.toStruct(version()); } @Override From 39fb997f0032ee425e92a8dba97a561b36427931 Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Mon, 16 Mar 2020 09:54:34 +0000 Subject: [PATCH 4/7] Review comments --- .../kafka/common/requests/DescribeLogDirsResponse.java | 3 --- core/src/main/scala/kafka/server/ReplicaManager.scala | 6 ++++-- .../src/test/scala/unit/kafka/server/RequestQuotaTest.scala | 6 +++--- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java index 688c46b7f153f..b744769b43d75 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java @@ -41,9 +41,6 @@ public DescribeLogDirsResponse(Struct struct, short version) { this.data = new DescribeLogDirsResponseData(struct, version); } - /** - * Constructor for version 0. - */ public DescribeLogDirsResponse(DescribeLogDirsResponseData data) { this.data = data; } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8fa2a32bb6742..fd16fb862081b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -695,11 +695,13 @@ class ReplicaManager(val config: KafkaConfig, } catch { case _: KafkaStorageException => - new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath) + new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setLogDir(absolutePath) .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code) case t: Throwable => error(s"Error while describing replica in dir $absolutePath", t) - new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath) + new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setLogDir(absolutePath) .setErrorCode(Errors.forException(t).code) } }.toList diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 7cdf3e6aa2fc2..6cc794567ad84 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -455,9 +455,9 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.DESCRIBE_LOG_DIRS => val data = new DescribeLogDirsRequestData() - data.topics().add(new DescribeLogDirsRequestData.DescribableLogDirTopic() - .setTopic(tp.topic()) - .setPartitionIndex(Collections.singletonList(tp.partition()))) + data.topics.add(new DescribeLogDirsRequestData.DescribableLogDirTopic() + .setTopic(tp.topic) + .setPartitionIndex(Collections.singletonList(tp.partition))) new DescribeLogDirsRequest.Builder(data) case ApiKeys.CREATE_PARTITIONS => From a7f72cba6def206c04a6ca5b1ba547faf85058cf Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Tue, 17 Mar 2020 16:16:01 +0000 Subject: [PATCH 5/7] More minor improvements --- .../org/apache/kafka/clients/admin/KafkaAdminClient.java | 7 ++----- core/src/main/scala/kafka/server/ReplicaManager.scala | 8 ++++---- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 4895417dbcd87..3730dab640098 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2304,11 +2304,8 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection partitionsByBroker = new HashMap<>(); for (TopicPartitionReplica replica: replicas) { - DescribeLogDirsRequestData requestData = partitionsByBroker.get(replica.brokerId()); - if (requestData == null) { - requestData = new DescribeLogDirsRequestData(); - partitionsByBroker.put(replica.brokerId(), requestData); - } + DescribeLogDirsRequestData requestData = partitionsByBroker.computeIfAbsent(replica.brokerId(), + brokerId -> new DescribeLogDirsRequestData()); DescribableLogDirTopic describableLogDirTopic = requestData.topics().find(replica.topic()); if (describableLogDirTopic == null) { List partitionIndex = new ArrayList<>(); diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index fd16fb862081b..36268b9bc3eed 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -678,10 +678,10 @@ class ReplicaManager(val config: KafkaConfig, logs.filter { log => partitions.contains(log.topicPartition) }.map { log => - new DescribeLogDirsResponseData.DescribeLogDirsPartition(). - setPartitionSize(log.size). - setPartitionIndex(log.topicPartition.partition). - setOffsetLag(getLogEndOffsetLag(log.topicPartition, log.logEndOffset, log.isFuture)) + new DescribeLogDirsResponseData.DescribeLogDirsPartition() + .setPartitionSize(log.size) + .setPartitionIndex(log.topicPartition.partition) + .setOffsetLag(getLogEndOffsetLag(log.topicPartition, log.logEndOffset, log.isFuture)) .setIsFutureKey(log.isFuture) }.toList.asJava) }.toList.asJava From f72902e6c772f5254004d408b747cbe57824668c Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Thu, 19 Mar 2020 14:39:14 +0000 Subject: [PATCH 6/7] Review comments --- .../apache/kafka/common/requests/DescribeLogDirsRequest.java | 1 - core/src/main/scala/kafka/server/ReplicaManager.scala | 2 -- 2 files changed, 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java index f421f28aa06e5..b90a4476a820e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java @@ -48,7 +48,6 @@ public class DescribeLogDirsRequest extends AbstractRequest { public static class Builder extends AbstractRequest.Builder { private final DescribeLogDirsRequestData data; - // topicPartitions == null indicates requesting all partitions, and an empty list indicates requesting no partitions. public Builder(DescribeLogDirsRequestData data) { super(ApiKeys.DESCRIBE_LOG_DIRS); this.data = data; diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 36268b9bc3eed..47d96839833ce 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -39,7 +39,6 @@ import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartiti import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState -import org.apache.kafka.common.message.LeaderAndIsrResponseData import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult import org.apache.kafka.common.message.{DescribeLogDirsResponseData, LeaderAndIsrResponseData} import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError @@ -51,7 +50,6 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.replica.PartitionView.DefaultPartitionView import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView import org.apache.kafka.common.replica.{ClientMetadata, _} -import org.apache.kafka.common.message.DescribeLogDirsResponseData._ import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction From e7564337cf56338735a5e4e3c18c3c0b7751e1db Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Thu, 19 Mar 2020 15:21:13 +0000 Subject: [PATCH 7/7] Review comments --- .../requests/DescribeLogDirsRequest.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java index b90a4476a820e..a58ef1fedb532 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java @@ -20,29 +20,12 @@ import org.apache.kafka.common.message.DescribeLogDirsRequestData; import org.apache.kafka.common.message.DescribeLogDirsResponseData; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.INT32; - public class DescribeLogDirsRequest extends AbstractRequest { - // request level key names - private static final String TOPICS_KEY_NAME = "topics"; - - // topic level key names - private static final String PARTITIONS_KEY_NAME = "partitions"; - - private static final Schema DESCRIBE_LOG_DIRS_REQUEST_V0 = new Schema( - new Field(TOPICS_KEY_NAME, ArrayOf.nullable(new Schema( - TOPIC_NAME, - new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32), "List of partition ids of the topic."))))); - private final DescribeLogDirsRequestData data; public static class Builder extends AbstractRequest.Builder { @@ -69,7 +52,6 @@ public DescribeLogDirsRequest(Struct struct, short version) { this.data = new DescribeLogDirsRequestData(struct, version); } - // topicPartitions == null indicates requesting all partitions, and an empty list indicates requesting no partitions. public DescribeLogDirsRequest(DescribeLogDirsRequestData data, short version) { super(ApiKeys.DESCRIBE_LOG_DIRS, version); this.data = data;