Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
Expand Down Expand Up @@ -2267,7 +2269,7 @@ public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, Descri
@Override
public DescribeLogDirsRequest.Builder createRequest(int timeoutMs) {
// Query selected partitions in all log directories
return new DescribeLogDirsRequest.Builder(null);
return new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(null));
}

@Override
Expand Down Expand Up @@ -2299,20 +2301,33 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicParti
futures.put(replica, new KafkaFutureImpl<>());
}

Map<Integer, Set<TopicPartition>> partitionsByBroker = new HashMap<>();
Map<Integer, DescribeLogDirsRequestData> partitionsByBroker = new HashMap<>();

for (TopicPartitionReplica replica : replicas) {
partitionsByBroker.computeIfAbsent(replica.brokerId(), key -> new HashSet<>())
.add(new TopicPartition(replica.topic(), replica.partition()));
for (TopicPartitionReplica replica: replicas) {
DescribeLogDirsRequestData requestData = partitionsByBroker.computeIfAbsent(replica.brokerId(),
brokerId -> new DescribeLogDirsRequestData());
DescribableLogDirTopic describableLogDirTopic = requestData.topics().find(replica.topic());
if (describableLogDirTopic == null) {
List<Integer> partitionIndex = new ArrayList<>();
partitionIndex.add(replica.partition());
describableLogDirTopic = new DescribableLogDirTopic().setTopic(replica.topic())
.setPartitionIndex(partitionIndex);
requestData.topics().add(describableLogDirTopic);
} else {
describableLogDirTopic.partitionIndex().add(replica.partition());
}
}

final long now = time.milliseconds();
for (Map.Entry<Integer, Set<TopicPartition>> entry: partitionsByBroker.entrySet()) {
for (Map.Entry<Integer, DescribeLogDirsRequestData> entry: partitionsByBroker.entrySet()) {
final int brokerId = entry.getKey();
final Set<TopicPartition> topicPartitions = entry.getValue();
final DescribeLogDirsRequestData topicPartitions = entry.getValue();
final Map<TopicPartition, ReplicaLogDirInfo> replicaDirInfoByPartition = new HashMap<>();
for (TopicPartition topicPartition: topicPartitions)
replicaDirInfoByPartition.put(topicPartition, new ReplicaLogDirInfo());
for (DescribableLogDirTopic topicPartition: topicPartitions.topics()) {
for (Integer partitionId : topicPartition.partitionIndex()) {
replicaDirInfoByPartition.put(new TopicPartition(topicPartition.topic(), partitionId), new ReplicaLogDirInfo());
}
}

runnable.call(new Call("describeReplicaLogDirs", calcDeadlineMs(now, options.timeoutMs()),
new ConstantNodeIdProvider(brokerId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.protocol;

import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
Expand Down Expand Up @@ -47,6 +49,8 @@
import org.apache.kafka.common.message.DescribeDelegationTokenResponseData;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.message.DescribeLogDirsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.EndTxnRequestData;
Expand All @@ -69,8 +73,6 @@
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
Expand Down Expand Up @@ -110,8 +112,6 @@
import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeLogDirsRequest;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
Expand Down Expand Up @@ -190,8 +190,8 @@ public Struct parseResponse(short version, ByteBuffer buffer) {
AlterConfigsResponse.schemaVersions()),
ALTER_REPLICA_LOG_DIRS(34, "AlterReplicaLogDirs", AlterReplicaLogDirsRequest.schemaVersions(),
AlterReplicaLogDirsResponse.schemaVersions()),
DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequest.schemaVersions(),
DescribeLogDirsResponse.schemaVersions()),
DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequestData.SCHEMAS,
DescribeLogDirsResponseData.SCHEMAS),
SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequestData.SCHEMAS,
SaslAuthenticateResponseData.SCHEMAS),
CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequestData.SCHEMAS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct, shor
case ALTER_REPLICA_LOG_DIRS:
return new AlterReplicaLogDirsResponse(struct);
case DESCRIBE_LOG_DIRS:
return new DescribeLogDirsResponse(struct);
return new DescribeLogDirsResponse(struct, version);
case SASL_AUTHENTICATE:
return new SaslAuthenticateResponse(struct, version);
case CREATE_PARTITIONS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,137 +17,62 @@

package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.message.DescribeLogDirsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.types.Type.INT32;

public class DescribeLogDirsRequest extends AbstractRequest {

// request level key names
private static final String TOPICS_KEY_NAME = "topics";

// topic level key names
private static final String PARTITIONS_KEY_NAME = "partitions";

private static final Schema DESCRIBE_LOG_DIRS_REQUEST_V0 = new Schema(
new Field(TOPICS_KEY_NAME, ArrayOf.nullable(new Schema(
TOPIC_NAME,
new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32), "List of partition ids of the topic.")))));

/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema DESCRIBE_LOG_DIRS_REQUEST_V1 = DESCRIBE_LOG_DIRS_REQUEST_V0;

public static Schema[] schemaVersions() {
return new Schema[]{DESCRIBE_LOG_DIRS_REQUEST_V0, DESCRIBE_LOG_DIRS_REQUEST_V1};
}

private final Set<TopicPartition> topicPartitions;
private final DescribeLogDirsRequestData data;

public static class Builder extends AbstractRequest.Builder<DescribeLogDirsRequest> {
private final Set<TopicPartition> topicPartitions;
private final DescribeLogDirsRequestData data;

// topicPartitions == null indicates requesting all partitions, and an empty list indicates requesting no partitions.
public Builder(Set<TopicPartition> partitions) {
public Builder(DescribeLogDirsRequestData data) {
super(ApiKeys.DESCRIBE_LOG_DIRS);
this.topicPartitions = partitions;
this.data = data;
}

@Override
public DescribeLogDirsRequest build(short version) {
return new DescribeLogDirsRequest(topicPartitions, version);
return new DescribeLogDirsRequest(data, version);
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("(type=DescribeLogDirsRequest")
.append(", topicPartitions=")
.append(topicPartitions)
.append(")");
return builder.toString();
return data.toString();
}
}

public DescribeLogDirsRequest(Struct struct, short version) {
super(ApiKeys.DESCRIBE_LOG_DIRS, version);

if (struct.getArray(TOPICS_KEY_NAME) == null) {
topicPartitions = null;
} else {
topicPartitions = new HashSet<>();
for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
Struct topicStruct = (Struct) topicStructObj;
String topic = topicStruct.get(TOPIC_NAME);
for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
int partition = (Integer) partitionObj;
topicPartitions.add(new TopicPartition(topic, partition));
}
}
}
this.data = new DescribeLogDirsRequestData(struct, version);
}

// topicPartitions == null indicates requesting all partitions, and an empty list indicates requesting no partitions.
public DescribeLogDirsRequest(Set<TopicPartition> topicPartitions, short version) {
public DescribeLogDirsRequest(DescribeLogDirsRequestData data, short version) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment above is out of date

super(ApiKeys.DESCRIBE_LOG_DIRS, version);
this.topicPartitions = topicPartitions;
this.data = data;
}

public DescribeLogDirsRequestData data() {
return data;
}

@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.DESCRIBE_LOG_DIRS.requestSchema(version()));
if (topicPartitions == null) {
struct.set(TOPICS_KEY_NAME, null);
return struct;
}

Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
for (TopicPartition tp : topicPartitions) {
if (!partitionsByTopic.containsKey(tp.topic())) {
partitionsByTopic.put(tp.topic(), new ArrayList<Integer>());
}
partitionsByTopic.get(tp.topic()).add(tp.partition());
}

List<Struct> topicStructArray = new ArrayList<>();
for (Map.Entry<String, List<Integer>> partitionsByTopicEntry : partitionsByTopic.entrySet()) {
Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
topicStruct.set(TOPIC_NAME, partitionsByTopicEntry.getKey());
topicStruct.set(PARTITIONS_KEY_NAME, partitionsByTopicEntry.getValue().toArray());
topicStructArray.add(topicStruct);
}
struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());

return struct;
return data.toStruct(version());
}

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new DescribeLogDirsResponse(throttleTimeMs, new HashMap<String, LogDirInfo>());
return new DescribeLogDirsResponse(new DescribeLogDirsResponseData().setThrottleTimeMs(throttleTimeMs));
}

public boolean isAllTopicPartitions() {
return topicPartitions == null;
}

public Set<TopicPartition> topicPartitions() {
return topicPartitions;
return data.topics() == null;
}

public static DescribeLogDirsRequest parse(ByteBuffer buffer, short version) {
Expand Down
Loading