Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.MetadataRequest;
Expand All @@ -35,6 +36,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

Expand Down Expand Up @@ -147,6 +149,9 @@ private NetworkClient(MetadataUpdater metadataUpdater,
*/
@Override
public boolean ready(Node node, long now) {
if (node.isEmpty())
throw new IllegalArgumentException("Cannot connect to empty node " + node);

if (isReady(node, now))
return true;

Expand Down Expand Up @@ -578,9 +583,10 @@ private void handleResponse(RequestHeader header, Struct body, long now) {
MetadataResponse response = new MetadataResponse(body);
Cluster cluster = response.cluster();
// check if any topics metadata failed to get updated
if (response.errors().size() > 0) {
log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
}
Map<String, Errors> errors = response.errors();
if (!errors.isEmpty())
log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);

// don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
// created which means we will get errors and no nodes until it exists
if (cluster.nodes().size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,14 @@ public Map<String, List<PartitionInfo>> getTopicMetadata(List<String> topics, lo
throw new TopicAuthorizationException(unauthorizedTopics);

boolean shouldRetry = false;
if (!response.errors().isEmpty()) {
Map<String, Errors> errors = response.errors();
if (!errors.isEmpty()) {
// if there were errors, we need to check whether they were fatal or whether
// we should just retry

log.debug("Topic metadata fetch included errors: {}", response.errors());
log.debug("Topic metadata fetch included errors: {}", errors);

for (Map.Entry<String, Errors> errorEntry : response.errors().entrySet()) {
for (Map.Entry<String, Errors> errorEntry : errors.entrySet()) {
String topic = errorEntry.getKey();
Errors error = errorEntry.getValue();

Expand Down
13 changes: 12 additions & 1 deletion clients/src/main/java/org/apache/kafka/common/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
public class Node {

private static final Node NO_NODE = new Node(-1, "", -1);

private final int id;
private final String idString;
private final String host;
Expand All @@ -31,7 +33,16 @@ public Node(int id, String host, int port) {
}

public static Node noNode() {
return new Node(-1, "", -1);
return NO_NODE;
}

/**
* Check whether this node is empty, which may be the case if noNode() is used as a placeholder
* in a response payload with an error.
* @return true if it is, false otherwise
*/
public boolean isEmpty() {
return host == null || host.isEmpty() || port < 0;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
Expand All @@ -24,9 +22,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetadataRequest extends AbstractRequest {

Expand All @@ -44,24 +40,24 @@ public MetadataRequest(List<String> topics) {
public MetadataRequest(Struct struct) {
super(struct);
Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
topics = new ArrayList<String>();
topics = new ArrayList<>();
for (Object topicObj: topicArray) {
topics.add((String) topicObj);
}
}

@Override
public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
Map<String, Errors> topicErrors = new HashMap<String, Errors>();
for (String topic : topics) {
topicErrors.put(topic, Errors.forException(e));
}
List<MetadataResponse.TopicMetadata> topicMetadatas = new ArrayList<>();
Errors error = Errors.forException(e);
List<MetadataResponse.PartitionMetadata> partitions = Collections.emptyList();

for (String topic : topics)
topicMetadatas.add(new MetadataResponse.TopicMetadata(error, topic, partitions));

Cluster cluster = new Cluster(Collections.<Node>emptyList(), Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet());
switch (versionId) {
case 0:
return new MetadataResponse(cluster, topicErrors);
return new MetadataResponse(Collections.<Node>emptyList(), topicMetadatas);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id)));
Expand Down
Loading