diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index d4c406902111d..4d01cdeb2e27d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.MetadataRequest; @@ -35,6 +36,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; @@ -147,6 +149,9 @@ private NetworkClient(MetadataUpdater metadataUpdater, */ @Override public boolean ready(Node node, long now) { + if (node.isEmpty()) + throw new IllegalArgumentException("Cannot connect to empty node " + node); + if (isReady(node, now)) return true; @@ -578,9 +583,10 @@ private void handleResponse(RequestHeader header, Struct body, long now) { MetadataResponse response = new MetadataResponse(body); Cluster cluster = response.cluster(); // check if any topics metadata failed to get updated - if (response.errors().size() > 0) { - log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors()); - } + Map errors = response.errors(); + if (!errors.isEmpty()) + log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors); + // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being // created which means we will get errors and no nodes until it exists if (cluster.nodes().size() > 0) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 7a1a72090a3a3..1cb1933cc3f9b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -215,13 +215,14 @@ public Map> getTopicMetadata(List topics, lo throw new TopicAuthorizationException(unauthorizedTopics); boolean shouldRetry = false; - if (!response.errors().isEmpty()) { + Map errors = response.errors(); + if (!errors.isEmpty()) { // if there were errors, we need to check whether they were fatal or whether // we should just retry - log.debug("Topic metadata fetch included errors: {}", response.errors()); + log.debug("Topic metadata fetch included errors: {}", errors); - for (Map.Entry errorEntry : response.errors().entrySet()) { + for (Map.Entry errorEntry : errors.entrySet()) { String topic = errorEntry.getKey(); Errors error = errorEntry.getValue(); diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java index 644cd71c8cb4f..5a5b8a0f31a39 100644 --- a/clients/src/main/java/org/apache/kafka/common/Node.java +++ b/clients/src/main/java/org/apache/kafka/common/Node.java @@ -17,6 +17,8 @@ */ public class Node { + private static final Node NO_NODE = new Node(-1, "", -1); + private final int id; private final String idString; private final String host; @@ -31,7 +33,16 @@ public Node(int id, String host, int port) { } public static Node noNode() { - return new Node(-1, "", -1); + return NO_NODE; + } + + /** + * Check whether this node is empty, which may be the case if noNode() is used as a placeholder + * in a response payload with an error. + * @return true if it is, false otherwise + */ + public boolean isEmpty() { + return host == null || host.isEmpty() || port < 0; } /** diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index a6c249fd0bacc..92d8c6dc785a9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -12,9 +12,7 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; @@ -24,9 +22,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; public class MetadataRequest extends AbstractRequest { @@ -44,7 +40,7 @@ public MetadataRequest(List topics) { public MetadataRequest(Struct struct) { super(struct); Object[] topicArray = struct.getArray(TOPICS_KEY_NAME); - topics = new ArrayList(); + topics = new ArrayList<>(); for (Object topicObj: topicArray) { topics.add((String) topicObj); } @@ -52,16 +48,16 @@ public MetadataRequest(Struct struct) { @Override public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { - Map topicErrors = new HashMap(); - for (String topic : topics) { - topicErrors.put(topic, Errors.forException(e)); - } + List topicMetadatas = new ArrayList<>(); + Errors error = Errors.forException(e); + List partitions = Collections.emptyList(); + + for (String topic : topics) + topicMetadatas.add(new MetadataResponse.TopicMetadata(error, topic, partitions)); - Cluster cluster = new Cluster(Collections.emptyList(), Collections.emptyList(), - Collections.emptySet()); switch (versionId) { case 0: - return new MetadataResponse(cluster, topicErrors); + return new MetadataResponse(Collections.emptyList(), topicMetadatas); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id))); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 805b9e713b9da..13e0d8f7c5648 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -12,15 +12,6 @@ */ package org.apache.kafka.common.requests; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -30,11 +21,20 @@ import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + public class MetadataResponse extends AbstractRequestResponse { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id); private static final String BROKERS_KEY_NAME = "brokers"; - private static final String TOPIC_METATDATA_KEY_NAME = "topic_metadata"; + private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata"; // broker level field names private static final String NODE_ID_KEY_NAME = "node_id"; @@ -71,18 +71,18 @@ public class MetadataResponse extends AbstractRequestResponse { private static final String REPLICAS_KEY_NAME = "replicas"; private static final String ISR_KEY_NAME = "isr"; - private final Cluster cluster; - private final Map errors; + private final Collection brokers; + private final List topicMetadata; - /** - * Constructor for MetadataResponse where there are errors for some of the topics, - * error data take precedence over cluster information for particular topic - */ - public MetadataResponse(Cluster cluster, Map errors) { + + public MetadataResponse(List brokers, List topicMetadata) { super(new Struct(CURRENT_SCHEMA)); - List brokerArray = new ArrayList(); - for (Node node : cluster.nodes()) { + this.brokers = brokers; + this.topicMetadata = topicMetadata; + + List brokerArray = new ArrayList<>(); + for (Node node : brokers) { Struct broker = struct.instance(BROKERS_KEY_NAME); broker.set(NODE_ID_KEY_NAME, node.id()); broker.set(HOST_KEY_NAME, node.host()); @@ -91,51 +91,39 @@ public MetadataResponse(Cluster cluster, Map errors) { } struct.set(BROKERS_KEY_NAME, brokerArray.toArray()); + List topicMetadataArray = new ArrayList<>(topicMetadata.size()); + for (TopicMetadata metadata : topicMetadata) { + Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, metadata.topic); + topicData.set(TOPIC_ERROR_CODE_KEY_NAME, metadata.error.code()); + + List partitionMetadataArray = new ArrayList<>(metadata.partitionMetadata.size()); + for (PartitionMetadata partitionMetadata : metadata.partitionMetadata()) { + Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME); + partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, partitionMetadata.error.code()); + partitionData.set(PARTITION_KEY_NAME, partitionMetadata.partition); + partitionData.set(LEADER_KEY_NAME, partitionMetadata.leader.id()); + ArrayList replicas = new ArrayList<>(partitionMetadata.replicas.size()); + for (Node node : partitionMetadata.replicas) + replicas.add(node.id()); + partitionData.set(REPLICAS_KEY_NAME, replicas.toArray()); + ArrayList isr = new ArrayList<>(partitionMetadata.isr.size()); + for (Node node : partitionMetadata.isr) + isr.add(node.id()); + partitionData.set(ISR_KEY_NAME, isr.toArray()); + partitionMetadataArray.add(partitionData); - List topicArray = new ArrayList(); - for (Map.Entry errorEntry : errors.entrySet()) { - Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, errorEntry.getKey()); - topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errorEntry.getValue().code()); - topicData.set(PARTITION_METADATA_KEY_NAME, new Struct[0]); - topicArray.add(topicData); - } - - for (String topic : cluster.topics()) { - if (!errors.containsKey(topic)) { - Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, topic); - topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code()); - List partitionArray = new ArrayList(); - for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) { - Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME); - partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code()); - partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition()); - partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id()); - ArrayList replicas = new ArrayList(); - for (Node node : fetchPartitionData.replicas()) - replicas.add(node.id()); - partitionData.set(REPLICAS_KEY_NAME, replicas.toArray()); - ArrayList isr = new ArrayList(); - for (Node node : fetchPartitionData.inSyncReplicas()) - isr.add(node.id()); - partitionData.set(ISR_KEY_NAME, isr.toArray()); - partitionArray.add(partitionData); - } - topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray()); - topicArray.add(topicData); } + topicData.set(PARTITION_METADATA_KEY_NAME, partitionMetadataArray.toArray()); + topicMetadataArray.add(topicData); } - struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray()); - - this.cluster = cluster; - this.errors = errors; + struct.set(TOPIC_METADATA_KEY_NAME, topicMetadataArray.toArray()); } public MetadataResponse(Struct struct) { super(struct); - Map errors = new HashMap(); - Map brokers = new HashMap(); + + Map brokers = new HashMap<>(); Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME); for (int i = 0; i < brokerStructs.length; i++) { Struct broker = (Struct) brokerStructs[i]; @@ -144,63 +132,155 @@ public MetadataResponse(Struct struct) { int port = broker.getInt(PORT_KEY_NAME); brokers.put(nodeId, new Node(nodeId, host, port)); } - List partitions = new ArrayList(); - Object[] topicInfos = (Object[]) struct.get(TOPIC_METATDATA_KEY_NAME); + + List topicMetadata = new ArrayList<>(); + Object[] topicInfos = (Object[]) struct.get(TOPIC_METADATA_KEY_NAME); for (int i = 0; i < topicInfos.length; i++) { Struct topicInfo = (Struct) topicInfos[i]; - short topicError = topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME); + Errors topicError = Errors.forCode(topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME)); String topic = topicInfo.getString(TOPIC_KEY_NAME); - if (topicError == Errors.NONE.code()) { - Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME); - for (int j = 0; j < partitionInfos.length; j++) { - Struct partitionInfo = (Struct) partitionInfos[j]; - int partition = partitionInfo.getInt(PARTITION_KEY_NAME); - int leader = partitionInfo.getInt(LEADER_KEY_NAME); - Node leaderNode = leader == -1 ? null : brokers.get(leader); - Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME); - Node[] replicaNodes = new Node[replicas.length]; - for (int k = 0; k < replicas.length; k++) - replicaNodes[k] = brokers.get(replicas[k]); - Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME); - Node[] isrNodes = new Node[isr.length]; - for (int k = 0; k < isr.length; k++) - isrNodes[k] = brokers.get(isr[k]); - partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes)); - } - } else { - errors.put(topic, Errors.forCode(topicError)); + List partitionMetadata = new ArrayList<>(); + + Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME); + for (int j = 0; j < partitionInfos.length; j++) { + Struct partitionInfo = (Struct) partitionInfos[j]; + Errors partitionError = Errors.forCode(partitionInfo.getShort(PARTITION_ERROR_CODE_KEY_NAME)); + int partition = partitionInfo.getInt(PARTITION_KEY_NAME); + int leader = partitionInfo.getInt(LEADER_KEY_NAME); + Node leaderNode = leader == -1 ? null : brokers.get(leader); + Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME); + List replicaNodes = new ArrayList<>(replicas.length); + for (Object replicaNodeId : replicas) + replicaNodes.add(brokers.get(replicaNodeId)); + Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME); + List isrNodes = new ArrayList<>(isr.length); + for (Object isrNode : isr) + isrNodes.add(brokers.get(isrNode)); + partitionMetadata.add(new PartitionMetadata(partitionError, partition, leaderNode, replicaNodes, isrNodes)); } + + topicMetadata.add(new TopicMetadata(topicError, topic, partitionMetadata)); } - this.errors = errors; - this.cluster = new Cluster(brokers.values(), partitions, unauthorizedTopics(errors)); + this.brokers = brokers.values(); + this.topicMetadata = topicMetadata; } - private Set unauthorizedTopics(Map topicErrors) { - if (topicErrors.isEmpty()) - return Collections.emptySet(); + /** + * Get a map of the topics which had metadata errors + * @return the map + */ + public Map errors() { + Map errors = new HashMap<>(); + for (TopicMetadata metadata : topicMetadata) { + if (metadata.error != Errors.NONE) + errors.put(metadata.topic(), metadata.error); + } + return errors; + } + /** + * Get a snapshot of the cluster metadata from this response + * @return the cluster snapshot + */ + public Cluster cluster() { Set unauthorizedTopics = new HashSet<>(); - for (Map.Entry topicErrorEntry : topicErrors.entrySet()) { - if (topicErrorEntry.getValue() == Errors.TOPIC_AUTHORIZATION_FAILED) - unauthorizedTopics.add(topicErrorEntry.getKey()); + List partitions = new ArrayList<>(); + for (TopicMetadata metadata : topicMetadata) { + if (metadata.error == Errors.NONE) { + for (PartitionMetadata partitionMetadata : metadata.partitionMetadata) + partitions.add(new PartitionInfo( + metadata.topic, + partitionMetadata.partition, + partitionMetadata.leader, + partitionMetadata.replicas.toArray(new Node[0]), + partitionMetadata.isr.toArray(new Node[0]))); + } else if (metadata.error == Errors.TOPIC_AUTHORIZATION_FAILED) { + unauthorizedTopics.add(metadata.topic); + } } - return unauthorizedTopics; + + return new Cluster(this.brokers, partitions, unauthorizedTopics); } - public Map errors() { - return this.errors; + /** + * Get all brokers returned in metadata response + * @return the brokers + */ + public Collection brokers() { + return brokers; } - public boolean hasError(String topic) { - return this.errors.containsKey(topic); + public static MetadataResponse parse(ByteBuffer buffer) { + return new MetadataResponse(CURRENT_SCHEMA.read(buffer)); } - public Cluster cluster() { - return this.cluster; + public static class TopicMetadata { + private final Errors error; + private final String topic; + private final List partitionMetadata; + + public TopicMetadata(Errors error, + String topic, + List partitionMetadata) { + this.error = error; + this.topic = topic; + this.partitionMetadata = partitionMetadata; + } + + public Errors error() { + return error; + } + + public String topic() { + return topic; + } + + public List partitionMetadata() { + return partitionMetadata; + } + } - public static MetadataResponse parse(ByteBuffer buffer) { - return new MetadataResponse(CURRENT_SCHEMA.read(buffer)); + public static class PartitionMetadata { + private final Errors error; + private final int partition; + private final Node leader; + private final List replicas; + private final List isr; + + public PartitionMetadata(Errors error, + int partition, + Node leader, + List replicas, + List isr) { + this.error = error; + this.partition = partition; + this.leader = leader; + this.replicas = replicas; + this.isr = isr; + } + + public Errors error() { + return error; + } + + public int partition() { + return partition; + } + + public Node leader() { + return leader; + } + + public List replicas() { + return replicas; + } + + public List isr() { + return isr; + } + } + } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 823d04ee6d553..58c38412588ae 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -53,6 +53,7 @@ import org.junit.Test; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -441,8 +442,7 @@ public void testUpdateFetchPositionDisconnect() { @Test public void testGetAllTopics() { // sending response before request, as getTopicMetadata is a blocking call - client.prepareResponse( - new MetadataResponse(cluster, Collections.emptyMap()).toStruct()); + client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct()); Map> allTopics = fetcher.getAllTopicMetadata(5000L); @@ -453,7 +453,7 @@ public void testGetAllTopics() { public void testGetAllTopicsDisconnect() { // first try gets a disconnect, next succeeds client.prepareResponse(null, true); - client.prepareResponse(new MetadataResponse(cluster, Collections.emptyMap()).toStruct()); + client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct()); Map> allTopics = fetcher.getAllTopicMetadata(5000L); assertEquals(cluster.topics().size(), allTopics.size()); } @@ -466,8 +466,7 @@ public void testGetAllTopicsTimeout() { @Test public void testGetAllTopicsUnauthorized() { - client.prepareResponse(new MetadataResponse(cluster, - Collections.singletonMap(topicName, Errors.TOPIC_AUTHORIZATION_FAILED)).toStruct()); + client.prepareResponse(newMetadataResponse(topicName, Errors.TOPIC_AUTHORIZATION_FAILED).toStruct()); try { fetcher.getAllTopicMetadata(10L); fail(); @@ -478,17 +477,13 @@ public void testGetAllTopicsUnauthorized() { @Test(expected = InvalidTopicException.class) public void testGetTopicMetadataInvalidTopic() { - client.prepareResponse(new MetadataResponse(cluster, - Collections.singletonMap(topicName, Errors.INVALID_TOPIC_EXCEPTION)).toStruct()); + client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION).toStruct()); fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L); } @Test public void testGetTopicMetadataUnknownTopic() { - Cluster emptyCluster = new Cluster(this.cluster.nodes(), Collections.emptyList(), - Collections.emptySet()); - client.prepareResponse(new MetadataResponse(emptyCluster, - Collections.singletonMap(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toStruct()); + client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION).toStruct()); Map> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L); assertNull(topicMetadata.get(topicName)); @@ -496,12 +491,9 @@ public void testGetTopicMetadataUnknownTopic() { @Test public void testGetTopicMetadataLeaderNotAvailable() { - Cluster emptyCluster = new Cluster(this.cluster.nodes(), Collections.emptyList(), - Collections.emptySet()); - client.prepareResponse(new MetadataResponse(emptyCluster, - Collections.singletonMap(topicName, Errors.LEADER_NOT_AVAILABLE)).toStruct()); - client.prepareResponse(new MetadataResponse(this.cluster, - Collections.emptyMap()).toStruct()); + client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE).toStruct()); + client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct()); + Map> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L); assertTrue(topicMetadata.containsKey(topicName)); } @@ -565,6 +557,23 @@ private Struct fetchResponse(ByteBuffer buffer, short error, long hw, int thrott return response.toStruct(); } + private MetadataResponse newMetadataResponse(String topic, Errors error) { + List partitionsMetadata = new ArrayList<>(); + if (error == Errors.NONE) { + for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) { + partitionsMetadata.add(new MetadataResponse.PartitionMetadata( + Errors.NONE, + partitionInfo.partition(), + partitionInfo.leader(), + Arrays.asList(partitionInfo.replicas()), + Arrays.asList(partitionInfo.inSyncReplicas()))); + } + } + + MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, partitionsMetadata); + return new MetadataResponse(cluster.nodes(), Arrays.asList(topicMetadata)); + } + private Fetcher createFetcher(int maxPollRecords, SubscriptionState subscriptions, Metrics metrics) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java index 7a5cef6886a28..fd8a5bc3b0d4c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java @@ -12,19 +12,19 @@ */ package org.apache.kafka.clients.producer.internals; -import static java.util.Arrays.asList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.Collections; -import java.util.List; - import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.junit.Test; +import java.util.Collections; +import java.util.List; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class DefaultPartitionerTest { private byte[] keyBytes = "key".getBytes(); private Partitioner partitioner = new DefaultPartitioner(); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index edeaf639016a7..30238378f7b3f 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -14,9 +14,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.BrokerEndPoint; -import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.protocol.ApiKeys; @@ -278,16 +276,16 @@ private AbstractRequest createMetadataRequest() { private AbstractRequestResponse createMetadataResponse() { Node node = new Node(1, "host1", 1001); - Node[] replicas = new Node[1]; - replicas[0] = node; - Node[] isr = new Node[1]; - isr[0] = node; - Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr)), - Collections.emptySet()); - - Map errors = new HashMap(); - errors.put("topic2", Errors.LEADER_NOT_AVAILABLE); - return new MetadataResponse(cluster, errors); + List replicas = Arrays.asList(node); + List isr = Arrays.asList(node); + + List allTopicMetadata = new ArrayList<>(); + allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic1", + Arrays.asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr)))); + allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", + Collections.emptyList())); + + return new MetadataResponse(Arrays.asList(node), allTopicMetadata); } private AbstractRequest createOffsetCommitRequest() { diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index a8d9964c8dede..b8573153e5b8d 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -92,8 +92,9 @@ class AdminClient(val time: Time, val request = new MetadataRequest(List[String]()) val responseBody = sendAnyNode(ApiKeys.METADATA, request) val response = new MetadataResponse(responseBody) - if (!response.errors().isEmpty) - debug(s"Metadata request contained errors: ${response.errors()}") + val errors = response.errors() + if (!errors.isEmpty) + debug(s"Metadata request contained errors: ${errors}") response.cluster().nodes().asScala.toList } diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index a868400efec64..3fb44d320afa3 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -18,23 +18,25 @@ package kafka.admin import kafka.common._ -import kafka.cluster.{BrokerEndPoint, Broker} +import kafka.cluster.Broker import kafka.log.LogConfig import kafka.server.ConfigType import kafka.utils._ import kafka.utils.ZkUtils._ -import kafka.api.{TopicMetadata, PartitionMetadata} import java.util.Random import java.util.Properties +import org.apache.kafka.common.Node import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopicException, LeaderNotAvailableException} import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} +import org.apache.kafka.common.requests.MetadataResponse import scala.Predef._ import scala.collection._ -import mutable.ListBuffer +import scala.collection.JavaConverters._ import scala.collection.mutable +import mutable.ListBuffer import collection.Map import collection.Set @@ -390,15 +392,18 @@ object AdminUtils extends Logging { def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] = zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap - def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): TopicMetadata = + def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): MetadataResponse.TopicMetadata = fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker]) - def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[TopicMetadata] = { + def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[MetadataResponse.TopicMetadata] = { val cachedBrokerInfo = new mutable.HashMap[Int, Broker]() topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo)) } - private def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils, cachedBrokerInfo: mutable.HashMap[Int, Broker], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): TopicMetadata = { + private def fetchTopicMetadataFromZk(topic: String, + zkUtils: ZkUtils, + cachedBrokerInfo: mutable.HashMap[Int, Broker], + protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): MetadataResponse.TopicMetadata = { if(zkUtils.pathExists(getTopicPath(topic))) { val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic).get val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) @@ -409,22 +414,22 @@ object AdminUtils extends Logging { val leader = zkUtils.getLeaderForPartition(topic, partition) debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader) - var leaderInfo: Option[BrokerEndPoint] = None - var replicaInfo: Seq[BrokerEndPoint] = Nil - var isrInfo: Seq[BrokerEndPoint] = Nil + var leaderInfo: Node = Node.noNode() + var replicaInfo: Seq[Node] = Nil + var isrInfo: Seq[Node] = Nil try { leaderInfo = leader match { case Some(l) => try { - Some(getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getBrokerEndPoint(protocol)) + getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getNode(protocol) } catch { case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e) } case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) } try { - replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getBrokerEndPoint(protocol)) - isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getBrokerEndPoint(protocol)) + replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getNode(protocol)) + isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getNode(protocol)) } catch { case e: Throwable => throw new ReplicaNotAvailableException(e) } @@ -434,18 +439,17 @@ object AdminUtils extends Logging { if(isrInfo.size < inSyncReplicas.size) throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " + inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(",")) - new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, Errors.NONE.code) + new MetadataResponse.PartitionMetadata(Errors.NONE, partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava) } catch { case e: Throwable => debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e) - new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, - Errors.forException(e).code) + new MetadataResponse.PartitionMetadata(Errors.forException(e), partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava) } } - new TopicMetadata(topic, partitionMetadata) + new MetadataResponse.TopicMetadata(Errors.NONE, topic, partitionMetadata.toList.asJava) } else { // topic doesn't exist, send appropriate error code - new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.UNKNOWN_TOPIC_OR_PARTITION.code) + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, java.util.Collections.emptyList()) } } diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala index b0e41ec575c49..ae5ea582d063e 100644 --- a/core/src/main/scala/kafka/api/TopicMetadata.scala +++ b/core/src/main/scala/kafka/api/TopicMetadata.scala @@ -21,7 +21,6 @@ import kafka.cluster.BrokerEndPoint import java.nio.ByteBuffer import kafka.api.ApiUtils._ import kafka.utils.Logging -import kafka.common._ import org.apache.kafka.common.protocol.Errors object TopicMetadata { diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index be135869d86f9..0654e3de22640 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -30,22 +30,6 @@ import scala.collection.mutable.ListBuffer object TopicMetadataRequest extends Logging { val CurrentVersion = 0.shortValue val DefaultClientId = "" - - /** - * TopicMetadataRequest has the following format - - * number of topics (4 bytes) list of topics (2 bytes + topic.length per topic) detailedMetadata (2 bytes) timestamp (8 bytes) count (4 bytes) - */ - - def readFrom(buffer: ByteBuffer): TopicMetadataRequest = { - val versionId = buffer.getShort - val correlationId = buffer.getInt - val clientId = readShortString(buffer) - val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue)) - val topics = new ListBuffer[String]() - for(i <- 0 until numTopics) - topics += readShortString(buffer) - new TopicMetadataRequest(versionId, correlationId, clientId, topics.toList) - } } case class TopicMetadataRequest(versionId: Short, diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index b56cae97b2379..7340f148af1ec 100755 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -21,6 +21,7 @@ import java.nio.ByteBuffer import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException} import kafka.utils.Json +import org.apache.kafka.common.Node import org.apache.kafka.common.protocol.SecurityProtocol /** @@ -103,7 +104,7 @@ object Broker { } } -case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) { +case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint]) { override def toString: String = id + " : " + endPoints.values.mkString("(",",",")") @@ -133,13 +134,14 @@ case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) { endPoints.contains(protocolType) } + def getNode(protocolType: SecurityProtocol): Node = { + val endpoint = endPoints.getOrElse(protocolType, throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id))) + new Node(id, endpoint.host, endpoint.port) + } + def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndPoint = { - val endpoint = endPoints.get(protocolType) - endpoint match { - case Some(endpoint) => new BrokerEndPoint(id, endpoint.host, endpoint.port) - case None => - throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id)) - } + val endpoint = endPoints.getOrElse(protocolType, throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id))) + new BrokerEndPoint(id, endpoint.host, endpoint.port) } } diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 916c438496b00..1105802e0890d 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -63,7 +63,6 @@ object RequestChannel extends Logging { // o.a.k.common.requests.AbstractRequest.getRequest() private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]= Map(ApiKeys.FETCH.id -> FetchRequest.readFrom, - ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom, ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom ) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 8f3a2ad458dc2..5f9ec8ba4cee4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -19,6 +19,7 @@ package kafka.server import java.nio.ByteBuffer import java.lang.{Long => JLong, Short => JShort} +import java.util.Properties import kafka.admin.AdminUtils import kafka.api._ @@ -40,7 +41,7 @@ import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse, UpdateMetadataRequest, UpdateMetadataResponse, -OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse} +MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{TopicPartition, Node} @@ -258,7 +259,7 @@ class KafkaApis(val requestChannel: RequestChannel, case (topicPartition, partitionData) => val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic) try { - if (metadataCache.getTopicMetadata(Set(topicPartition.topic), request.securityProtocol).size <= 0) + if (!metadataCache.hasTopicMetadata(topicPartition.topic)) (topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) else if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize) (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code) @@ -618,84 +619,105 @@ class KafkaApis(val requestChannel: RequestChannel, ret.toSeq.sortBy(- _) } - private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol): Seq[TopicMetadata] = { + private def createTopic(topic: String, + numPartitions: Int, + replicationFactor: Int, + properties: Properties = new Properties()): MetadataResponse.TopicMetadata = { + try { + AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties) + info("Auto creation of topic %s with %d partitions and replication factor %d is successful" + .format(topic, numPartitions, replicationFactor)) + new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList()) + } catch { + case e: TopicExistsException => // let it go, possibly another broker created this topic + new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList()) + case itex: InvalidTopicException => + new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, java.util.Collections.emptyList()) + } + } + + private def createGroupMetadataTopic(): MetadataResponse.TopicMetadata = { + val aliveBrokers = metadataCache.getAliveBrokers + val offsetsTopicReplicationFactor = + if (aliveBrokers.nonEmpty) + Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length) + else + config.offsetsTopicReplicationFactor.toInt + createTopic(GroupCoordinator.GroupMetadataTopicName, config.offsetsTopicPartitions, + offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs) + } + + private def getOrCreateGroupMetadataTopic(securityProtocol: SecurityProtocol): MetadataResponse.TopicMetadata = { + val topicMetadata = metadataCache.getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), securityProtocol) + topicMetadata.headOption.getOrElse(createGroupMetadataTopic()) + } + + private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol): Seq[MetadataResponse.TopicMetadata] = { val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol) - if (topics.size > 0 && topicResponses.size != topics.size) { + if (topics.isEmpty || topicResponses.size == topics.size) { + topicResponses + } else { val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet val responsesForNonExistentTopics = nonExistentTopics.map { topic => - if (topic == GroupCoordinator.GroupMetadataTopicName || config.autoCreateTopicsEnable) { - try { - if (topic == GroupCoordinator.GroupMetadataTopicName) { - val aliveBrokers = metadataCache.getAliveBrokers - val offsetsTopicReplicationFactor = - if (aliveBrokers.length > 0) - Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length) - else - config.offsetsTopicReplicationFactor.toInt - AdminUtils.createTopic(zkUtils, topic, config.offsetsTopicPartitions, - offsetsTopicReplicationFactor, - coordinator.offsetsTopicConfigs) - info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" - .format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor)) - } - else { - AdminUtils.createTopic(zkUtils, topic, config.numPartitions, config.defaultReplicationFactor) - info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" - .format(topic, config.numPartitions, config.defaultReplicationFactor)) - } - new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.LEADER_NOT_AVAILABLE.code) - } catch { - case e: TopicExistsException => // let it go, possibly another broker created this topic - new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.LEADER_NOT_AVAILABLE.code) - case itex: InvalidTopicException => - new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.INVALID_TOPIC_EXCEPTION.code) - } + if (topic == GroupCoordinator.GroupMetadataTopicName) { + createGroupMetadataTopic() + } else if (config.autoCreateTopicsEnable) { + createTopic(topic, config.numPartitions, config.defaultReplicationFactor) } else { - new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.UNKNOWN_TOPIC_OR_PARTITION.code) + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, java.util.Collections.emptyList()) } } - topicResponses.appendAll(responsesForNonExistentTopics) + topicResponses ++ responsesForNonExistentTopics } - topicResponses } /** * Handle a topic metadata request */ def handleTopicMetadataRequest(request: RequestChannel.Request) { - val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] - - //if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized - val topics = if (metadataRequest.topics.isEmpty) { - val topicResponses = metadataCache.getTopicMetadata(metadataRequest.topics.toSet, request.securityProtocol) - topicResponses.map(_.topic).filter(topic => authorize(request.session, Describe, new Resource(Topic, topic))).toSet + val metadataRequest = request.body.asInstanceOf[MetadataRequest] + + val topics = metadataRequest.topics.asScala.toSet + var (authorizedTopics, unauthorizedTopics) = if (metadataRequest.topics.isEmpty) { + //if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized + val authorized = metadataCache.getAllTopics() + .filter(topic => authorize(request.session, Describe, new Resource(Topic, topic))) + (authorized, mutable.Set[String]()) } else { - metadataRequest.topics.toSet + topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic))) } - //when topics is empty this will be a duplicate authorization check but given this should just be a cache lookup, it should not matter. - var (authorizedTopics, unauthorizedTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic))) - - if (!authorizedTopics.isEmpty) { - val topicResponses = metadataCache.getTopicMetadata(authorizedTopics, request.securityProtocol) - if (config.autoCreateTopicsEnable && topicResponses.size != authorizedTopics.size) { - val nonExistentTopics: Set[String] = topics -- topicResponses.map(_.topic).toSet - authorizer.foreach { - az => if (!az.authorize(request.session, Create, Resource.ClusterResource)) { - authorizedTopics --= nonExistentTopics - unauthorizedTopics ++= nonExistentTopics + if (authorizedTopics.nonEmpty) { + val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) + if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { + authorizer.foreach { az => + if (!az.authorize(request.session, Create, Resource.ClusterResource)) { + authorizedTopics --= nonExistingTopics + unauthorizedTopics ++= nonExistingTopics } } } } - val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.TOPIC_AUTHORIZATION_FAILED.code)) + val unauthorizedTopicMetadata = unauthorizedTopics.map(topic => + new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, java.util.Collections.emptyList())) + + val topicMetadata = if (authorizedTopics.isEmpty) + Seq.empty[MetadataResponse.TopicMetadata] + else + getTopicMetadata(authorizedTopics, request.securityProtocol) - val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol) val brokers = metadataCache.getAliveBrokers - trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId)) - val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata ++ unauthorizedTopicMetaData, metadataRequest.correlationId) - requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) + + trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), + brokers.mkString(","), request.header.correlationId, request.header.clientId)) + + val responseHeader = new ResponseHeader(request.header.correlationId) + val responseBody = new MetadataResponse( + brokers.map(_.getNode(request.securityProtocol)).asJava, + (topicMetadata ++ unauthorizedTopicMetadata).asJava + ) + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } /* @@ -725,7 +747,7 @@ class KafkaApis(val requestChannel: RequestChannel, val responseInfo = authorizedTopicPartitions.map { topicPartition => val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic) try { - if (metadataCache.getTopicMetadata(Set(topicPartition.topic), request.securityProtocol).isEmpty) + if (!metadataCache.hasTopicMetadata(topicPartition.topic)) (topicPartition, unknownTopicPartitionResponse) else { val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1 @@ -769,16 +791,21 @@ class KafkaApis(val requestChannel: RequestChannel, val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId) // get metadata (and create the topic if necessary) - val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head - val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).flatMap { - partitionMetadata => partitionMetadata.leader - } + val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol) - val responseBody = coordinatorEndpoint match { - case None => - new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode()) - case Some(endpoint) => - new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port)) + val responseBody = if (offsetsTopicMetadata.error != Errors.NONE) { + new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode) + } else { + val coordinatorEndpoint = offsetsTopicMetadata.partitionMetadata().asScala + .find(_.partition == partition) + .map(_.leader()) + + coordinatorEndpoint match { + case Some(endpoint) if !endpoint.isEmpty => + new GroupCoordinatorResponse(Errors.NONE.code, endpoint) + case _ => + new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode) + } } trace("Sending consumer metadata %s for correlation id %d to client %s." @@ -788,8 +815,6 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleDescribeGroupRequest(request: RequestChannel.Request) { - import JavaConverters._ - val describeRequest = request.body.asInstanceOf[DescribeGroupsRequest] val responseHeader = new ResponseHeader(request.header.correlationId) @@ -814,8 +839,6 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleListGroupsRequest(request: RequestChannel.Request) { - import JavaConverters._ - val responseHeader = new ResponseHeader(request.header.correlationId) val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) { ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 4be795dbbbc7a..1fdd717044c2f 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -17,124 +17,151 @@ package kafka.server -import kafka.cluster.{EndPoint, BrokerEndPoint, Broker} -import kafka.common.TopicAndPartition +import java.util.EnumMap +import java.util.concurrent.locks.ReentrantReadWriteLock -import kafka.api._ -import kafka.controller.KafkaController.StateChangeLogger -import kafka.controller.LeaderIsrAndControllerEpoch -import org.apache.kafka.common.errors.{ReplicaNotAvailableException, LeaderNotAvailableException} -import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} -import org.apache.kafka.common.requests.UpdateMetadataRequest -import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState import scala.collection.{Seq, Set, mutable} import scala.collection.JavaConverters._ -import kafka.utils.Logging +import kafka.cluster.{Broker, EndPoint} +import kafka.api._ +import kafka.common.TopicAndPartition +import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch} import kafka.utils.CoreUtils._ - -import java.util.concurrent.locks.ReentrantReadWriteLock - +import kafka.utils.Logging +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} +import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState +import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} /** * A cache for the state (e.g., current leader) of each partition. This cache is updated through * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. */ private[server] class MetadataCache(brokerId: Int) extends Logging { - private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] = - new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]() - private var aliveBrokers: Map[Int, Broker] = Map() + private val stateChangeLogger = KafkaController.stateChangeLogger + private val cache = mutable.Map[String, mutable.Map[Int, PartitionStateInfo]]() + private val aliveBrokers = mutable.Map[Int, Broker]() + private val aliveNodes = mutable.Map[Int, collection.Map[SecurityProtocol, Node]]() private val partitionMetadataLock = new ReentrantReadWriteLock() this.logIdent = "[Kafka Metadata Cache on broker %d] ".format(brokerId) - def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol) = { + private def getAliveEndpoints(brokers: Iterable[Int], protocol: SecurityProtocol): Seq[Node] = { + val result = new mutable.ArrayBuffer[Node](math.min(aliveBrokers.size, brokers.size)) + brokers.foreach { brokerId => + getAliveEndpoint(brokerId, protocol).foreach(result +=) + } + result + } - val isAllTopics = topics.isEmpty - val topicsRequested = if(isAllTopics) cache.keySet else topics - val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata] + private def getAliveEndpoint(brokerId: Int, protocol: SecurityProtocol): Option[Node] = + aliveNodes.get(brokerId).flatMap(_.get(protocol)) + + private def getPartitionMetadata(topic: String, protocol: SecurityProtocol): Option[Iterable[MetadataResponse.PartitionMetadata]] = { + cache.get(topic).map { partitions => + partitions.map { case (partitionId, partitionState) => + val topicPartition = TopicAndPartition(topic, partitionId) + + val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr + val maybeLeader = getAliveEndpoint(leaderAndIsr.leader, protocol) + + val replicas = partitionState.allReplicas + val replicaInfo = getAliveEndpoints(replicas, protocol) + + maybeLeader match { + case None => + debug("Error while fetching metadata for %s: leader not available".format(topicPartition)) + new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, Node.noNode(), + replicaInfo.asJava, java.util.Collections.emptyList()) + + case Some(leader) => + val isr = leaderAndIsr.isr + val isrInfo = getAliveEndpoints(isr, protocol) + + if (replicaInfo.size < replicas.size) { + debug("Error while fetching metadata for %s: replica information not available for following brokers %s" + .format(topicPartition, replicas.filterNot(replicaInfo.map(_.id).contains).mkString(","))) + + new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader, + replicaInfo.asJava, isrInfo.asJava) + } else if (isrInfo.size < isr.size) { + debug("Error while fetching metadata for %s: in sync replica information not available for following brokers %s" + .format(topicPartition, isr.filterNot(isrInfo.map(_.id).contains).mkString(","))) + new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader, + replicaInfo.asJava, isrInfo.asJava) + } else { + new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId, leader, replicaInfo.asJava, + isrInfo.asJava) + } + } + } + } + } + + def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol): Seq[MetadataResponse.TopicMetadata] = { inReadLock(partitionMetadataLock) { - for (topic <- topicsRequested) { - if (isAllTopics || cache.contains(topic)) { - val partitionStateInfos = cache(topic) - val partitionMetadata = partitionStateInfos.map { - case (partitionId, partitionState) => - val replicas = partitionState.allReplicas - val replicaInfo: Seq[BrokerEndPoint] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq.map(_.getBrokerEndPoint(protocol)) - var leaderInfo: Option[BrokerEndPoint] = None - var leaderBrokerInfo: Option[Broker] = None - var isrInfo: Seq[BrokerEndPoint] = Nil - val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch - val leader = leaderIsrAndEpoch.leaderAndIsr.leader - val isr = leaderIsrAndEpoch.leaderAndIsr.isr - val topicPartition = TopicAndPartition(topic, partitionId) - try { - leaderBrokerInfo = aliveBrokers.get(leader) - if (!leaderBrokerInfo.isDefined) - throw new LeaderNotAvailableException("Leader not available for %s".format(topicPartition)) - else - leaderInfo = Some(leaderBrokerInfo.get.getBrokerEndPoint(protocol)) - isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).map(_.getBrokerEndPoint(protocol)) - if (replicaInfo.size < replicas.size) - throw new ReplicaNotAvailableException("Replica information not available for following brokers: " + - replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(",")) - if (isrInfo.size < isr.size) - throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " + - isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(",")) - new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, Errors.NONE.code) - } catch { - case e: Throwable => - debug("Error while fetching metadata for %s: %s".format(topicPartition, e.toString)) - new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, - Errors.forException(e).code) - } - } - topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq) + val topicsRequested = if (topics.isEmpty) cache.keySet else topics + topicsRequested.toSeq.flatMap { topic => + getPartitionMetadata(topic, protocol).map { partitionMetadata => + new MetadataResponse.TopicMetadata(Errors.NONE, topic, partitionMetadata.toBuffer.asJava) } } } - topicResponses } - def getAliveBrokers = { + def hasTopicMetadata(topic: String): Boolean = { + inReadLock(partitionMetadataLock) { + cache.contains(topic) + } + } + + def getAllTopics(): Set[String] = { + inReadLock(partitionMetadataLock) { + cache.keySet.toSet + } + } + + def getNonExistingTopics(topics: Set[String]): Set[String] = { + inReadLock(partitionMetadataLock) { + topics -- cache.keySet + } + } + + def getAliveBrokers: Seq[Broker] = { inReadLock(partitionMetadataLock) { aliveBrokers.values.toSeq } } - def addOrUpdatePartitionInfo(topic: String, - partitionId: Int, - stateInfo: PartitionStateInfo) { + private def addOrUpdatePartitionInfo(topic: String, + partitionId: Int, + stateInfo: PartitionStateInfo) { inWriteLock(partitionMetadataLock) { - cache.get(topic) match { - case Some(infos) => infos.put(partitionId, stateInfo) - case None => { - val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo] - cache.put(topic, newInfos) - newInfos.put(partitionId, stateInfo) - } - } + val infos = cache.getOrElseUpdate(topic, mutable.Map()) + infos(partitionId) = stateInfo } } def getPartitionInfo(topic: String, partitionId: Int): Option[PartitionStateInfo] = { inReadLock(partitionMetadataLock) { - cache.get(topic) match { - case Some(partitionInfos) => partitionInfos.get(partitionId) - case None => None - } + cache.get(topic).flatMap(_.get(partitionId)) } } - def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, - brokerId: Int, - stateChangeLogger: StateChangeLogger) { + def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) { inWriteLock(partitionMetadataLock) { - aliveBrokers = updateMetadataRequest.liveBrokers.asScala.map { broker => - val endPoints = broker.endPoints.asScala.map { case (protocol, ep) => - (protocol, EndPoint(ep.host, ep.port, protocol)) - }.toMap - (broker.id, Broker(broker.id, endPoints)) - }.toMap + aliveNodes.clear() + aliveBrokers.clear() + updateMetadataRequest.liveBrokers.asScala.foreach { broker => + val nodes = new EnumMap[SecurityProtocol, Node](classOf[SecurityProtocol]) + val endPoints = new EnumMap[SecurityProtocol, EndPoint](classOf[SecurityProtocol]) + broker.endPoints.asScala.foreach { case (protocol, ep) => + endPoints.put(protocol, EndPoint(ep.host, ep.port, protocol)) + nodes.put(protocol, new Node(broker.id, ep.host, ep.port)) + } + aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala) + aliveNodes(broker.id) = nodes.asScala + } updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) => if (info.leader == LeaderAndIsr.LeaderDuringDelete) { @@ -167,16 +194,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { } } - private def removePartitionInfo(topic: String, partitionId: Int) = { - cache.get(topic) match { - case Some(infos) => { - infos.remove(partitionId) - if(infos.isEmpty) { - cache.remove(topic) - } - true - } - case None => false - } + private def removePartitionInfo(topic: String, partitionId: Int): Boolean = { + cache.get(topic).map { infos => + infos.remove(partitionId) + if (infos.isEmpty) cache.remove(topic) + true + }.getOrElse(false) } + } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index e388d98046c15..56553134f247f 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -582,7 +582,7 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.warn(stateControllerEpochErrorMessage) throw new ControllerMovedException(stateControllerEpochErrorMessage) } else { - metadataCache.updateCache(correlationId, updateMetadataRequest, localBrokerId, stateChangeLogger) + metadataCache.updateCache(correlationId, updateMetadataRequest) controllerEpoch = updateMetadataRequest.controllerEpoch } } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index ccc86dfec5e1e..f39ed014cec9b 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -264,7 +264,7 @@ class ZkUtils(val zkClient: ZkClient, * @param advertisedEndpoints * @param jmxPort */ - def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], jmxPort: Int) { + def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint], jmxPort: Int) { val brokerIdPath = BrokerIdsPath + "/" + id val timestamp = SystemTime.milliseconds.toString diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index fafc4b0aafc26..ca9dac424db21 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -145,14 +145,6 @@ object SerializationTestUtils { ) } - def createTestTopicMetadataRequest: TopicMetadataRequest = { - new TopicMetadataRequest(1, 1, "client 1", Seq(topic1, topic2)) - } - - def createTestTopicMetadataResponse: TopicMetadataResponse = { - new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)).toSeq, Seq(topicmetaData1, topicmetaData2), 1) - } - def createTestOffsetCommitRequestV2: OffsetCommitRequest = { new OffsetCommitRequest( groupId = "group 1", @@ -217,8 +209,6 @@ class RequestResponseSerializationTest extends JUnitSuite { private val fetchRequest = SerializationTestUtils.createTestFetchRequest private val offsetRequest = SerializationTestUtils.createTestOffsetRequest private val offsetResponse = SerializationTestUtils.createTestOffsetResponse - private val topicMetadataRequest = SerializationTestUtils.createTestTopicMetadataRequest - private val topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse private val offsetCommitRequestV0 = SerializationTestUtils.createTestOffsetCommitRequestV0 private val offsetCommitRequestV1 = SerializationTestUtils.createTestOffsetCommitRequestV1 private val offsetCommitRequestV2 = SerializationTestUtils.createTestOffsetCommitRequestV2 @@ -234,8 +224,7 @@ class RequestResponseSerializationTest extends JUnitSuite { val requestsAndResponses = collection.immutable.Seq(producerRequest, producerResponse, - fetchRequest, offsetRequest, offsetResponse, topicMetadataRequest, - topicMetadataResponse, + fetchRequest, offsetRequest, offsetResponse, offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2, offsetCommitResponse, offsetFetchRequest, offsetFetchResponse, consumerMetadataRequest, consumerMetadataResponse, diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala index 2400cfbb672cb..7c9f3aee22bf8 100644 --- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala @@ -18,10 +18,9 @@ package kafka.integration import java.io.File -import java.nio.ByteBuffer import kafka.admin.AdminUtils -import kafka.api.{TopicMetadataRequest, TopicMetadataResponse} +import kafka.api.TopicMetadataResponse import kafka.client.ClientUtils import kafka.cluster.{Broker, BrokerEndPoint} import kafka.server.{KafkaConfig, KafkaServer, NotRunning} @@ -63,23 +62,6 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { super.tearDown() } - @Test - def testTopicMetadataRequest { - // create topic - val topic = "test" - AdminUtils.createTopic(zkUtils, topic, 1, 1) - - // create a topic metadata request - val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0) - - val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2) - topicMetadataRequest.writeTo(serializedMetadataRequest) - serializedMetadataRequest.rewind() - val deserializedMetadataRequest = TopicMetadataRequest.readFrom(serializedMetadataRequest) - - assertEquals(topicMetadataRequest, deserializedMetadataRequest) - } - @Test def testBasicTopicMetadata { // create topic diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 12b3583d01e5a..de19f6f1b87e7 100755 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -353,7 +353,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{ // create topic AdminUtils.createTopic(zkUtils, "new-topic", 2, 1) TestUtils.waitUntilTrue(() => - AdminUtils.fetchTopicMetadataFromZk("new-topic", zkUtils).errorCode != Errors.UNKNOWN_TOPIC_OR_PARTITION.code, + AdminUtils.fetchTopicMetadataFromZk("new-topic", zkUtils).error != Errors.UNKNOWN_TOPIC_OR_PARTITION, "Topic new-topic not created after timeout", waitTime = zookeeper.tickTime) TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "new-topic", 0) diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala new file mode 100644 index 0000000000000..f3f0c8709f143 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util +import util.Arrays.asList + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} +import org.apache.kafka.common.requests.UpdateMetadataRequest +import org.apache.kafka.common.requests.UpdateMetadataRequest.{PartitionState, Broker, EndPoint} +import org.junit.Test +import org.junit.Assert._ + +import scala.collection.JavaConverters._ + +class MetadataCacheTest { + + private def asSet[T](elems: T*): util.Set[T] = new util.HashSet(elems.asJava) + + @Test + def getTopicMetadataNonExistingTopics() { + val topic = "topic" + val cache = new MetadataCache(1) + val topicMetadata = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT) + assertTrue(topicMetadata.isEmpty) + } + + @Test + def getTopicMetadata() { + val topic = "topic" + + val cache = new MetadataCache(1) + + val zkVersion = 3 + val controllerId = 2 + val controllerEpoch = 1 + val brokers = Set( + new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava), + new Broker(1, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava), + new Broker(2, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava)) + + val partitionStates = Map( + new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 0, 0, asList(0), zkVersion, asSet(0)), + new TopicPartition(topic, 1) -> new PartitionState(controllerEpoch, 1, 1, asList(1), zkVersion, asSet(1)), + new TopicPartition(topic, 2) -> new PartitionState(controllerEpoch, 2, 2, asList(2), zkVersion, asSet(2))) + + val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava) + cache.updateCache(15, updateMetadataRequest) + + val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT) + assertEquals(1, topicMetadatas.size) + + val topicMetadata = topicMetadatas.head + assertEquals(Errors.NONE, topicMetadata.error) + assertEquals(topic, topicMetadata.topic) + + val partitionMetadatas = topicMetadata.partitionMetadata.asScala.sortBy(_.partition) + assertEquals(3, partitionMetadatas.size) + + for (i <- 0 to 2) { + val partitionMetadata = partitionMetadatas(i) + assertEquals(Errors.NONE, partitionMetadata.error) + assertEquals(i, partitionMetadata.partition) + assertEquals(i, partitionMetadata.leader.id) + assertEquals(List(i), partitionMetadata.isr.asScala.map(_.id)) + assertEquals(List(i), partitionMetadata.replicas.asScala.map(_.id)) + } + } + + @Test + def getTopicMetadataPartitionLeaderNotAvailable() { + val topic = "topic" + + val cache = new MetadataCache(1) + + val zkVersion = 3 + val controllerId = 2 + val controllerEpoch = 1 + val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava)) + + val leader = 1 + val leaderEpoch = 1 + val partitionStates = Map( + new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asSet(0))) + + val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava) + cache.updateCache(15, updateMetadataRequest) + + val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT) + assertEquals(1, topicMetadatas.size) + + val topicMetadata = topicMetadatas.head + assertEquals(Errors.NONE, topicMetadata.error) + + val partitionMetadatas = topicMetadata.partitionMetadata + assertEquals(1, partitionMetadatas.size) + + val partitionMetadata = partitionMetadatas.get(0) + assertEquals(0, partitionMetadata.partition) + assertEquals(Errors.LEADER_NOT_AVAILABLE, partitionMetadata.error) + assertTrue(partitionMetadata.isr.isEmpty) + assertEquals(1, partitionMetadata.replicas.size) + assertEquals(0, partitionMetadata.replicas.get(0).id) + } + + @Test + def getTopicMetadataReplicaNotAvailable() { + val topic = "topic" + + val cache = new MetadataCache(1) + + val zkVersion = 3 + val controllerId = 2 + val controllerEpoch = 1 + val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava)) + + // replica 1 is not available + val leader = 0 + val leaderEpoch = 0 + val replicas = asSet[Integer](0, 1) + val isr = asList[Integer](0) + + val partitionStates = Map( + new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas)) + + val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava) + cache.updateCache(15, updateMetadataRequest) + + val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT) + assertEquals(1, topicMetadatas.size) + + val topicMetadata = topicMetadatas.head + assertEquals(Errors.NONE, topicMetadata.error) + + val partitionMetadatas = topicMetadata.partitionMetadata + assertEquals(1, partitionMetadatas.size) + + val partitionMetadata = partitionMetadatas.get(0) + assertEquals(0, partitionMetadata.partition) + assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadata.error) + assertEquals(Set(0), partitionMetadata.replicas.asScala.map(_.id).toSet) + assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet) + } + + @Test + def getTopicMetadataIsrNotAvailable() { + val topic = "topic" + + val cache = new MetadataCache(1) + + val zkVersion = 3 + val controllerId = 2 + val controllerEpoch = 1 + val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava)) + + // replica 1 is not available + val leader = 0 + val leaderEpoch = 0 + val replicas = asSet[Integer](0) + val isr = asList[Integer](0, 1) + + val partitionStates = Map( + new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas)) + + val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava) + cache.updateCache(15, updateMetadataRequest) + + val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT) + assertEquals(1, topicMetadatas.size) + + val topicMetadata = topicMetadatas.head + assertEquals(Errors.NONE, topicMetadata.error) + + val partitionMetadatas = topicMetadata.partitionMetadata + assertEquals(1, partitionMetadatas.size) + + val partitionMetadata = partitionMetadatas.get(0) + assertEquals(0, partitionMetadata.partition) + assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadata.error) + assertEquals(Set(0), partitionMetadata.replicas.asScala.map(_.id).toSet) + assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet) + } + +}