From c0423531f81aa2a438c7da3edd9e4e8013e7a5da Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Mon, 24 Feb 2020 18:37:55 -0800 Subject: [PATCH 1/5] Optimize metadata building --- .../main/scala/kafka/server/KafkaApis.scala | 19 +++---- .../scala/kafka/server/MetadataCache.scala | 49 +++++++++++++------ 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 8a14cd79c9f1b..2fa657760fb31 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -47,7 +47,7 @@ import org.apache.kafka.common.acl.AclOperation._ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.FatalExitError -import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} +import org.apache.kafka.common.internals.Topic.{isInternal, GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME} import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult import org.apache.kafka.common.message.{AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData} @@ -76,12 +76,13 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time, Utils} import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.message.MetadataResponseData import org.apache.kafka.server.authorizer._ import scala.compat.java8.OptionConverters._ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import scala.collection.{Map, Seq, Set, immutable, mutable} +import scala.collection.{immutable, mutable, Map, Seq, Set} import scala.util.{Failure, Success, Try} @@ -1050,7 +1051,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } - private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponse.TopicMetadata = { + private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = { val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName) topicMetadata.headOption.getOrElse(createInternalTopic(topic)) } @@ -1063,7 +1064,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (topics.isEmpty || topicResponses.size == topics.size) { topicResponses } else { - val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet + val nonExistentTopics = topics -- topicResponses.map(_.name).toSet val responsesForNonExistentTopics = nonExistentTopics.map { topic => if (isInternal(topic)) { val topicMetadata = createInternalTopic(topic) @@ -1285,13 +1286,13 @@ class KafkaApis(val requestChannel: RequestChannel, .setPort(node.port) .setThrottleTimeMs(requestThrottleMs)) } - val responseBody = if (topicMetadata.error != Errors.NONE) { + val responseBody = if (topicMetadata.errorCode() != Errors.NONE.code()) { createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) } else { - val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala - .find(_.partition == partition) - .filter(_.leaderId.isPresent) - .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId.get)) + val coordinatorEndpoint = topicMetadata.partitions().asScala + .find(_.partitionIndex() == partition) + .filter(_.leaderId() != MetadataResponse.NO_LEADER_ID) + .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId)) .flatMap(_.getNode(request.context.listenerName)) .filterNot(_.isEmpty) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 10c25b594984a..ca59cca09354c 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -21,7 +21,7 @@ import java.util import java.util.{Collections, Optional} import java.util.concurrent.locks.ReentrantReadWriteLock -import scala.collection.{Seq, Set, mutable} +import scala.collection.{mutable, Seq, Set} import scala.collection.JavaConverters._ import kafka.cluster.{Broker, EndPoint} import kafka.api._ @@ -31,8 +31,11 @@ import kafka.utils.Logging import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition} +import org.apache.kafka.common.message.MetadataResponseData +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol @@ -78,9 +81,11 @@ class MetadataCache(brokerId: Int) extends Logging { // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker. // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below). private def getPartitionMetadata(snapshot: MetadataSnapshot, topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean, - errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = { + errorUnavailableListeners: Boolean): Option[MetadataResponseData.MetadataResponseTopic] = { snapshot.partitionStates.get(topic).map { partitions => - partitions.map { case (partitionId, partitionState) => + val metadataResponseTopic = new MetadataResponseData.MetadataResponseTopic + metadataResponseTopic.setName(topic) + partitions.map { case (partitionId, partitionState) => val topicPartition = new TopicPartition(topic, partitionId.toInt) val leaderBrokerId = partitionState.leader val leaderEpoch = partitionState.leaderEpoch @@ -104,26 +109,44 @@ class MetadataCache(brokerId: Int) extends Logging { s"not found on leader $leaderBrokerId") if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE } + /* new MetadataResponse.PartitionMetadata(error, topicPartition, Optional.empty(), Optional.of(leaderEpoch), filteredReplicas, filteredIsr, offlineReplicas) + */ + + metadataResponseTopic.partitions.add(new MetadataResponsePartition() + .setErrorCode(error.code()) + .setPartitionIndex(partitionId.toInt) + .setLeaderId(MetadataResponse.NO_LEADER_ID) + .setLeaderEpoch(leaderEpoch) + .setReplicaNodes(filteredReplicas) + .setIsrNodes(filteredIsr) + .setOfflineReplicas(offlineReplicas)); case Some(leader) => - if (filteredReplicas.size < replicas.size) { + val error = if (filteredReplicas.size < replicas.size) { debug(s"Error while fetching metadata for $topicPartition: replica information not available for " + s"following brokers ${replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}") - new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, topicPartition, - Optional.of(leader.id), Optional.of(leaderEpoch), filteredReplicas, filteredIsr, offlineReplicas) + Errors.REPLICA_NOT_AVAILABLE } else if (filteredIsr.size < isr.size) { debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " + s"following brokers ${isr.asScala.filterNot(filteredIsr.contains).mkString(",")}") - new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, topicPartition, - Optional.of(leader.id), Optional.of(leaderEpoch), filteredReplicas, filteredIsr, offlineReplicas) + Errors.REPLICA_NOT_AVAILABLE } else { - new MetadataResponse.PartitionMetadata(Errors.NONE, topicPartition, - Optional.of(leader.id), Optional.of(leaderEpoch), filteredReplicas, filteredIsr, offlineReplicas) + Errors.NONE } + + metadataResponseTopic.partitions.add(new MetadataResponsePartition() + .setErrorCode(error.code()) + .setPartitionIndex(partitionId.toInt) + .setLeaderId(maybeLeader.map(_.id()).getOrElse(MetadataResponse.NO_LEADER_ID)) + .setLeaderEpoch(leaderEpoch) + .setReplicaNodes(filteredReplicas) + .setIsrNodes(filteredIsr) + .setOfflineReplicas(offlineReplicas)); } } + metadataResponseTopic } } @@ -150,12 +173,10 @@ class MetadataCache(brokerId: Int) extends Logging { def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false, - errorUnavailableListeners: Boolean = false): Seq[MetadataResponse.TopicMetadata] = { + errorUnavailableListeners: Boolean = false): Seq[MetadataResponseData.MetadataResponseTopic] = { val snapshot = metadataSnapshot topics.toSeq.flatMap { topic => - getPartitionMetadata(snapshot, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata => - new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava) - } + getPartitionMetadata(snapshot, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners) } } From 408f8160f202e3a19bf1d0f3d27879ddc0ec0e78 Mon Sep 17 00:00:00 2001 From: Manikumar Reddy Date: Mon, 2 Mar 2020 15:43:06 +0530 Subject: [PATCH 2/5] Cleanups --- .../common/requests/MetadataResponse.java | 21 +++++ .../main/scala/kafka/server/KafkaApis.scala | 53 +++++++----- .../scala/kafka/server/MetadataCache.scala | 32 ++++--- .../TopicCommandWithAdminClientTest.scala | 4 +- .../unit/kafka/server/MetadataCacheTest.scala | 83 ++++++++++--------- 5 files changed, 110 insertions(+), 83 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 4fee5636f522f..0e57e78711fa0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -478,6 +478,27 @@ public static MetadataResponse prepareResponse(Collection brokers, String return prepareResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId, topicMetadata); } + public static MetadataResponse prepareResponse(int throttleTimeMs, List topicMetadataList, + Collection brokers, String clusterId, int controllerId, + int clusterAuthorizedOperations) { + MetadataResponseData responseData = new MetadataResponseData(); + responseData.setThrottleTimeMs(throttleTimeMs); + brokers.forEach(broker -> + responseData.brokers().add(new MetadataResponseBroker() + .setNodeId(broker.id()) + .setHost(broker.host()) + .setPort(broker.port()) + .setRack(broker.rack())) + ); + + responseData.setClusterId(clusterId); + responseData.setControllerId(controllerId); + responseData.setClusterAuthorizedOperations(clusterAuthorizedOperations); + + topicMetadataList.forEach(topicMetadata -> responseData.topics().add(topicMetadata)); + return new MetadataResponse(responseData); + } + @Override public boolean shouldClientThrottle(short version) { return version >= 6; diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2fa657760fb31..95fdceae3692e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -47,7 +47,7 @@ import org.apache.kafka.common.acl.AclOperation._ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.FatalExitError -import org.apache.kafka.common.internals.Topic.{isInternal, GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME} +import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult import org.apache.kafka.common.message.{AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData} @@ -77,12 +77,13 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time, Utils} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.message.MetadataResponseData +import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} import org.apache.kafka.server.authorizer._ import scala.compat.java8.OptionConverters._ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import scala.collection.{immutable, mutable, Map, Seq, Set} +import scala.collection.{Map, Seq, Set, immutable, mutable} import scala.util.{Failure, Success, Try} @@ -1001,24 +1002,30 @@ class KafkaApis(val requestChannel: RequestChannel, private def createTopic(topic: String, numPartitions: Int, replicationFactor: Int, - properties: util.Properties = new util.Properties()): MetadataResponse.TopicMetadata = { + properties: util.Properties = new util.Properties()): MetadataResponseTopic = { try { adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe) info("Auto creation of topic %s with %d partitions and replication factor %d is successful" .format(topic, numPartitions, replicationFactor)) - new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), - util.Collections.emptyList()) + metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE.code(), topic, isInternal(topic), util.Collections.emptyList()) } catch { case _: TopicExistsException => // let it go, possibly another broker created this topic - new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), - util.Collections.emptyList()) + metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE.code(), topic, isInternal(topic), util.Collections.emptyList()) case ex: Throwable => // Catch all to prevent unhandled errors - new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, isInternal(topic), - util.Collections.emptyList()) + metadataResponseTopic(Errors.forException(ex).code(), topic, isInternal(topic), util.Collections.emptyList()) } } - private def createInternalTopic(topic: String): MetadataResponse.TopicMetadata = { + private def metadataResponseTopic(errorCode: Short, topic: String, isInternal: Boolean, + partitionData: util.List[MetadataResponsePartition]): MetadataResponseTopic = { + new MetadataResponseTopic() + .setErrorCode(errorCode) + .setName(topic) + .setIsInternal(isInternal) + .setPartitions(partitionData) + } + + private def createInternalTopic(topic: String): MetadataResponseTopic = { if (topic == null) throw new IllegalArgumentException("topic must not be null") @@ -1031,7 +1038,7 @@ class KafkaApis(val requestChannel: RequestChannel, s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " + s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " + s"and not all brokers are up yet.") - new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList()) + metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE.code(), topic, true, util.Collections.emptyList()) } else { createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt, groupCoordinator.offsetsTopicConfigs) @@ -1042,7 +1049,7 @@ class KafkaApis(val requestChannel: RequestChannel, s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " + s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " + s"and not all brokers are up yet.") - new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList()) + metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE.code(), topic, true, util.Collections.emptyList()) } else { createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt, txnCoordinator.transactionTopicConfigs) @@ -1058,7 +1065,7 @@ class KafkaApis(val requestChannel: RequestChannel, private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean, - errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata] = { + errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = { val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints, errorUnavailableListeners) if (topics.isEmpty || topicResponses.size == topics.size) { @@ -1068,14 +1075,14 @@ class KafkaApis(val requestChannel: RequestChannel, val responsesForNonExistentTopics = nonExistentTopics.map { topic => if (isInternal(topic)) { val topicMetadata = createInternalTopic(topic) - if (topicMetadata.error == Errors.COORDINATOR_NOT_AVAILABLE) - new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList()) + if (topicMetadata.errorCode() == Errors.COORDINATOR_NOT_AVAILABLE.code()) + metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR.code(), topic, true, util.Collections.emptyList()) else topicMetadata } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) { createTopic(topic, config.numPartitions, config.defaultReplicationFactor) } else { - new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList()) + metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), topic, false, util.Collections.emptyList()) } } topicResponses ++ responsesForNonExistentTopics @@ -1110,17 +1117,17 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic => - new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, isInternal(topic), - util.Collections.emptyList())) + metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED.code(), topic, isInternal(topic), util.Collections.emptyList())) + // do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not val unauthorizedForDescribeTopicMetadata = // In case of all topics, don't include topics unauthorized for Describe if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics) - Set.empty[MetadataResponse.TopicMetadata] + Set.empty[MetadataResponseTopic] else unauthorizedForDescribeTopics.map(topic => - new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList())) + metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED.code(), topic, false, util.Collections.emptyList())) // In version 0, we returned an error when brokers with replicas were unavailable, // while in higher versions we simply don't include the broker in the returned broker list @@ -1130,7 +1137,7 @@ class KafkaApis(val requestChannel: RequestChannel, val errorUnavailableListeners = requestVersion >= 6 val topicMetadata = if (authorizedTopics.isEmpty) - Seq.empty[MetadataResponse.TopicMetadata] + Seq.empty[MetadataResponseTopic] else getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners) @@ -1148,7 +1155,7 @@ class KafkaApis(val requestChannel: RequestChannel, // get topic authorized operations if (metadataRequest.data.includeTopicAuthorizedOperations) { topicMetadata.foreach { topicData => - topicData.authorizedOperations(authorizedOperations(request, new Resource(ResourceType.TOPIC, topicData.topic))) + topicData.setTopicAuthorizedOperations(authorizedOperations(request, new Resource(ResourceType.TOPIC, topicData.name()))) } } } @@ -1163,10 +1170,10 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseMaybeThrottle(request, requestThrottleMs => MetadataResponse.prepareResponse( requestThrottleMs, + completeTopicMetadata.asJava, brokers.flatMap(_.getNode(request.context.listenerName)).asJava, clusterId, metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID), - completeTopicMetadata.asJava, clusterAuthorizedOperations )) } diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index ca59cca09354c..40f5b3791a052 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -18,7 +18,7 @@ package kafka.server import java.util -import java.util.{Collections, Optional} +import java.util.{Collections} import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.{mutable, Seq, Set} @@ -31,11 +31,10 @@ import kafka.utils.Logging import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition} -import org.apache.kafka.common.message.MetadataResponseData +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol @@ -81,10 +80,8 @@ class MetadataCache(brokerId: Int) extends Logging { // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker. // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below). private def getPartitionMetadata(snapshot: MetadataSnapshot, topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean, - errorUnavailableListeners: Boolean): Option[MetadataResponseData.MetadataResponseTopic] = { + errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponsePartition]] = { snapshot.partitionStates.get(topic).map { partitions => - val metadataResponseTopic = new MetadataResponseData.MetadataResponseTopic - metadataResponseTopic.setName(topic) partitions.map { case (partitionId, partitionState) => val topicPartition = new TopicPartition(topic, partitionId.toInt) val leaderBrokerId = partitionState.leader @@ -109,19 +106,15 @@ class MetadataCache(brokerId: Int) extends Logging { s"not found on leader $leaderBrokerId") if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE } - /* - new MetadataResponse.PartitionMetadata(error, topicPartition, Optional.empty(), - Optional.of(leaderEpoch), filteredReplicas, filteredIsr, offlineReplicas) - */ - metadataResponseTopic.partitions.add(new MetadataResponsePartition() + new MetadataResponsePartition() .setErrorCode(error.code()) .setPartitionIndex(partitionId.toInt) .setLeaderId(MetadataResponse.NO_LEADER_ID) .setLeaderEpoch(leaderEpoch) .setReplicaNodes(filteredReplicas) .setIsrNodes(filteredIsr) - .setOfflineReplicas(offlineReplicas)); + .setOfflineReplicas(offlineReplicas) case Some(leader) => val error = if (filteredReplicas.size < replicas.size) { @@ -136,17 +129,16 @@ class MetadataCache(brokerId: Int) extends Logging { Errors.NONE } - metadataResponseTopic.partitions.add(new MetadataResponsePartition() + new MetadataResponsePartition() .setErrorCode(error.code()) .setPartitionIndex(partitionId.toInt) .setLeaderId(maybeLeader.map(_.id()).getOrElse(MetadataResponse.NO_LEADER_ID)) .setLeaderEpoch(leaderEpoch) .setReplicaNodes(filteredReplicas) .setIsrNodes(filteredIsr) - .setOfflineReplicas(offlineReplicas)); + .setOfflineReplicas(offlineReplicas) } } - metadataResponseTopic } } @@ -173,10 +165,16 @@ class MetadataCache(brokerId: Int) extends Logging { def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false, - errorUnavailableListeners: Boolean = false): Seq[MetadataResponseData.MetadataResponseTopic] = { + errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = { val snapshot = metadataSnapshot topics.toSeq.flatMap { topic => - getPartitionMetadata(snapshot, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners) + getPartitionMetadata(snapshot, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata => + new MetadataResponseTopic() + .setErrorCode(Errors.NONE.code()) + .setName(topic) + .setIsInternal(Topic.isInternal(topic)) + .setPartitions(partitionMetadata.toBuffer.asJava) + } } } diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala index 5281d80d4ffad..fd55891ae00e0 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala @@ -580,11 +580,11 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin (result, server) => { val topicMetadatas = server.dataPlaneRequestProcessor.metadataCache .getTopicMetadata(Set(testTopicName), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) - val testPartitionMetadata = topicMetadatas.find(_.topic().equals(testTopicName)).get.partitionMetadata().asScala.find(_.partition() == partitionOnBroker0) + val testPartitionMetadata = topicMetadatas.find(_.name().equals(testTopicName)).get.partitions().asScala.find(_.partitionIndex() == partitionOnBroker0) testPartitionMetadata match { case None => fail(s"Partition metadata is not found in metadata cache") case Some(metadata) => { - result && metadata.error == Errors.LEADER_NOT_AVAILABLE + result && metadata.errorCode() == Errors.LEADER_NOT_AVAILABLE.code() } } } diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 60cff13898bf8..eeae292617881 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -23,6 +23,7 @@ import util.Arrays.asList import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.UpdateMetadataRequest import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.Test @@ -118,22 +119,22 @@ class MetadataCacheTest { assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE, topicMetadata.error) - assertEquals(topic, topicMetadata.topic) + assertEquals(Errors.NONE.code(), topicMetadata.errorCode()) + assertEquals(topic, topicMetadata.name()) val topicPartitionStates = partitionStates.filter { ps => ps.topicName == topic } - val partitionMetadatas = topicMetadata.partitionMetadata.asScala.sortBy(_.partition) + val partitionMetadatas = topicMetadata.partitions().asScala.sortBy(_.partitionIndex()) assertEquals(s"Unexpected partition count for topic $topic", topicPartitionStates.size, partitionMetadatas.size) partitionMetadatas.zipWithIndex.foreach { case (partitionMetadata, partitionId) => - assertEquals(Errors.NONE, partitionMetadata.error) - assertEquals(partitionId, partitionMetadata.partition) + assertEquals(Errors.NONE.code(), partitionMetadata.errorCode()) + assertEquals(partitionId, partitionMetadata.partitionIndex()) val partitionState = topicPartitionStates.find(_.partitionIndex == partitionId).getOrElse( Assertions.fail(s"Unable to find partition state for partition $partitionId")) - assertEquals(Optional.of(partitionState.leader), partitionMetadata.leaderId) - assertEquals(Optional.of(partitionState.leaderEpoch), partitionMetadata.leaderEpoch) - assertEquals(partitionState.isr, partitionMetadata.inSyncReplicaIds) - assertEquals(partitionState.replicas, partitionMetadata.replicaIds) + assertEquals(partitionState.leader, partitionMetadata.leaderId) + assertEquals(partitionState.leaderEpoch, partitionMetadata.leaderEpoch) + assertEquals(partitionState.isr, partitionMetadata.isrNodes()) + assertEquals(partitionState.replicas, partitionMetadata.replicaNodes()) } } @@ -255,16 +256,16 @@ class MetadataCacheTest { assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE, topicMetadata.error) + assertEquals(Errors.NONE.code(), topicMetadata.errorCode()) - val partitionMetadatas = topicMetadata.partitionMetadata + val partitionMetadatas = topicMetadata.partitions() assertEquals(1, partitionMetadatas.size) val partitionMetadata = partitionMetadatas.get(0) - assertEquals(0, partitionMetadata.partition) - assertEquals(expectedError, partitionMetadata.error) - assertFalse(partitionMetadata.inSyncReplicaIds.isEmpty) - assertEquals(List(0), partitionMetadata.replicaIds.asScala) + assertEquals(0, partitionMetadata.partitionIndex()) + assertEquals(expectedError.code(), partitionMetadata.errorCode()) + assertFalse(partitionMetadata.isrNodes().isEmpty) + assertEquals(List(0), partitionMetadata.replicaNodes().asScala) } @Test @@ -313,32 +314,32 @@ class MetadataCacheTest { assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE, topicMetadata.error) + assertEquals(Errors.NONE.code(), topicMetadata.errorCode()) - val partitionMetadatas = topicMetadata.partitionMetadata + val partitionMetadatas = topicMetadata.partitions() assertEquals(1, partitionMetadatas.size) val partitionMetadata = partitionMetadatas.get(0) - assertEquals(0, partitionMetadata.partition) - assertEquals(Errors.NONE, partitionMetadata.error) - assertEquals(Set(0, 1), partitionMetadata.replicaIds.asScala.toSet) - assertEquals(Set(0), partitionMetadata.inSyncReplicaIds.asScala.toSet) + assertEquals(0, partitionMetadata.partitionIndex()) + assertEquals(Errors.NONE.code(), partitionMetadata.errorCode()) + assertEquals(Set(0, 1), partitionMetadata.replicaNodes().asScala.toSet) + assertEquals(Set(0), partitionMetadata.isrNodes().asScala.toSet) // Validate errorUnavailableEndpoints = true val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = true) assertEquals(1, topicMetadatasWithError.size) val topicMetadataWithError = topicMetadatasWithError.head - assertEquals(Errors.NONE, topicMetadataWithError.error) + assertEquals(Errors.NONE.code(), topicMetadataWithError.errorCode()) - val partitionMetadatasWithError = topicMetadataWithError.partitionMetadata + val partitionMetadatasWithError = topicMetadataWithError.partitions() assertEquals(1, partitionMetadatasWithError.size) val partitionMetadataWithError = partitionMetadatasWithError.get(0) - assertEquals(0, partitionMetadataWithError.partition) - assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error) - assertEquals(Set(0), partitionMetadataWithError.replicaIds.asScala.toSet) - assertEquals(Set(0), partitionMetadataWithError.inSyncReplicaIds.asScala.toSet) + assertEquals(0, partitionMetadataWithError.partitionIndex()) + assertEquals(Errors.REPLICA_NOT_AVAILABLE.code(), partitionMetadataWithError.errorCode()) + assertEquals(Set(0), partitionMetadataWithError.replicaNodes().asScala.toSet) + assertEquals(Set(0), partitionMetadataWithError.isrNodes().asScala.toSet) } @Test @@ -387,32 +388,32 @@ class MetadataCacheTest { assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE, topicMetadata.error) + assertEquals(Errors.NONE.code(), topicMetadata.errorCode()) - val partitionMetadatas = topicMetadata.partitionMetadata + val partitionMetadatas = topicMetadata.partitions() assertEquals(1, partitionMetadatas.size) val partitionMetadata = partitionMetadatas.get(0) - assertEquals(0, partitionMetadata.partition) - assertEquals(Errors.NONE, partitionMetadata.error) - assertEquals(Set(0), partitionMetadata.replicaIds.asScala.toSet) - assertEquals(Set(0, 1), partitionMetadata.inSyncReplicaIds.asScala.toSet) + assertEquals(0, partitionMetadata.partitionIndex()) + assertEquals(Errors.NONE.code(), partitionMetadata.errorCode()) + assertEquals(Set(0), partitionMetadata.replicaNodes().asScala.toSet) + assertEquals(Set(0, 1), partitionMetadata.isrNodes().asScala.toSet) // Validate errorUnavailableEndpoints = true val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = true) assertEquals(1, topicMetadatasWithError.size) val topicMetadataWithError = topicMetadatasWithError.head - assertEquals(Errors.NONE, topicMetadataWithError.error) + assertEquals(Errors.NONE.code(), topicMetadataWithError.errorCode()) - val partitionMetadatasWithError = topicMetadataWithError.partitionMetadata + val partitionMetadatasWithError = topicMetadataWithError.partitions() assertEquals(1, partitionMetadatasWithError.size) val partitionMetadataWithError = partitionMetadatasWithError.get(0) - assertEquals(0, partitionMetadataWithError.partition) - assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error) - assertEquals(Set(0), partitionMetadataWithError.replicaIds.asScala.toSet) - assertEquals(Set(0), partitionMetadataWithError.inSyncReplicaIds.asScala.toSet) + assertEquals(0, partitionMetadataWithError.partitionIndex()) + assertEquals(Errors.REPLICA_NOT_AVAILABLE.code(), partitionMetadataWithError.errorCode()) + assertEquals(Set(0), partitionMetadataWithError.replicaNodes().asScala.toSet) + assertEquals(Set(0), partitionMetadataWithError.isrNodes().asScala.toSet) } @Test @@ -449,8 +450,8 @@ class MetadataCacheTest { val topicMetadata = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL)) assertEquals(1, topicMetadata.size) - assertEquals(1, topicMetadata.head.partitionMetadata.size) - assertEquals(Optional.empty, topicMetadata.head.partitionMetadata.get(0).leaderId) + assertEquals(1, topicMetadata.head.partitions().size) + assertEquals(RecordBatch.NO_PARTITION_LEADER_EPOCH, topicMetadata.head.partitions().get(0).leaderId) } @Test From 745a99309bc7d0a4b949927b58f996286f075495 Mon Sep 17 00:00:00 2001 From: Manikumar Reddy Date: Mon, 2 Mar 2020 17:15:37 +0530 Subject: [PATCH 3/5] Add MetadataRequestBenchmark --- build.gradle | 2 +- checkstyle/import-control-jmh-benchmarks.xml | 4 + .../unit/kafka/server/MetadataCacheTest.scala | 1 - gradle/spotbugs-exclude.xml | 6 + .../metadata/MetadataRequestBenchmark.java | 206 ++++++++++++++++++ 5 files changed, 217 insertions(+), 2 deletions(-) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java diff --git a/build.gradle b/build.gradle index 975d56b4881ed..e372ad1d48152 100644 --- a/build.gradle +++ b/build.gradle @@ -1553,7 +1553,7 @@ project(':jmh-benchmarks') { doFirst { if (System.getProperty("jmhArgs")) { - args System.getProperty("jmhArgs").split(',') + args System.getProperty("jmhArgs").split(' ') } args = [shadowJar.archivePath, *args] } diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml index 5ada90af26329..d2c60e9e0f1e7 100644 --- a/checkstyle/import-control-jmh-benchmarks.xml +++ b/checkstyle/import-control-jmh-benchmarks.xml @@ -36,6 +36,10 @@ + + + + diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index eeae292617881..c75dbde26eb78 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -17,7 +17,6 @@ package kafka.server import java.util -import java.util.Optional import util.Arrays.asList import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 175ad6a264353..eb23fee0efd47 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -431,4 +431,10 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read + + + + + + diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java new file mode 100644 index 0000000000000..378dbd48d0e97 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.metadata; + +import kafka.controller.KafkaController; +import kafka.coordinator.group.GroupCoordinator; +import kafka.coordinator.transaction.TransactionCoordinator; +import kafka.network.RequestChannel; +import kafka.server.AdminManager; +import kafka.server.BrokerTopicStats; +import kafka.server.ClientQuotaManager; +import kafka.server.ClientRequestQuotaManager; +import kafka.server.FetchManager; +import kafka.server.KafkaApis; +import kafka.server.KafkaConfig; +import kafka.server.KafkaConfig$; +import kafka.server.MetadataCache; +import kafka.server.QuotaFactory; +import kafka.server.ReplicaManager; +import kafka.server.ReplicationQuotaManager; +import kafka.zk.KafkaZkClient; +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker; +import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint; +import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.UpdateMetadataRequest; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.SystemTime; +import org.mockito.Mockito; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import scala.Option; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 15) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) + +public class MetadataRequestBenchmark { + @Param({"500", "1000", "5000"}) + private int topicCount; + @Param({"10", "20", "50"}) + private int partitionCount; + + private RequestChannel requestChannel = Mockito.mock(RequestChannel.class, Mockito.withSettings().stubOnly()); + private RequestChannel.Metrics requestChannelMetrics = Mockito.mock(RequestChannel.Metrics.class); + private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); + private GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class); + private AdminManager adminManager = Mockito.mock(AdminManager.class); + private TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class); + private KafkaController kafkaController = Mockito.mock(KafkaController.class); + private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class); + private Metrics metrics = new Metrics(); + private int brokerId = 1; + private MetadataCache metadataCache = new MetadataCache(brokerId); + private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class); + private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class); + private ReplicationQuotaManager replicaQuotaManager = Mockito.mock(ReplicationQuotaManager.class); + private QuotaFactory.QuotaManagers quotaManagers = new QuotaFactory.QuotaManagers(clientQuotaManager, + clientQuotaManager, clientRequestQuotaManager, replicaQuotaManager, replicaQuotaManager, + replicaQuotaManager, Option.empty()); + private FetchManager fetchManager = Mockito.mock(FetchManager.class); + private BrokerTopicStats brokerTopicStats = new BrokerTopicStats(); + private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user"); + private KafkaApis kafkaApis; + private RequestChannel.Request allTopicMetadataRequest; + + @Setup(Level.Trial) + public void setup() { + initializeMetadataCache(); + kafkaApis = createKafkaApis(); + allTopicMetadataRequest = buildAllTopicMetadataRequest(); + } + + private void initializeMetadataCache() { + List liveBrokers = new LinkedList<>(); + List partitionStates = new LinkedList<>(); + + IntStream.range(0, 5).forEach(brokerId -> liveBrokers.add( + new UpdateMetadataBroker().setId(brokerId) + .setEndpoints(endpoints(brokerId)) + .setRack("rack1"))); + + IntStream.range(0, topicCount).forEach(topicId -> { + String topicName = "topic-" + topicId; + + IntStream.range(0, partitionCount).forEach(partitionId -> { + partitionStates.add( + new UpdateMetadataPartitionState().setTopicName(topicName) + .setPartitionIndex(partitionId) + .setControllerEpoch(1) + .setLeader(partitionCount % 5) + .setLeaderEpoch(0) + .setIsr(Arrays.asList(0, 1, 3)) + .setZkVersion(1) + .setReplicas(Arrays.asList(0, 1, 3))); + }); + }); + + UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest.Builder( + ApiKeys.UPDATE_METADATA.latestVersion(), + 1, 1, 1, + partitionStates, liveBrokers).build(); + metadataCache.updateMetadata(100, updateMetadataRequest); + } + + private List endpoints(final int brokerId) { + return Collections.singletonList( + new UpdateMetadataEndpoint() + .setHost("host_" + brokerId) + .setPort(9092) + .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) + .setListener(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value())); + } + + private KafkaApis createKafkaApis() { + Properties kafkaProps = new Properties(); + kafkaProps.put(KafkaConfig$.MODULE$.ZkConnectProp(), "zk"); + kafkaProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId + ""); + return new KafkaApis(requestChannel, + replicaManager, + adminManager, + groupCoordinator, + transactionCoordinator, + kafkaController, + kafkaZkClient, + brokerId, + new KafkaConfig(kafkaProps), + metadataCache, + metrics, + Option.empty(), + quotaManagers, + fetchManager, + brokerTopicStats, + "clusterId", + new SystemTime(), + null); + } + + @TearDown(Level.Trial) + public void tearDown() { + kafkaApis.close(); + metrics.close(); + } + + private RequestChannel.Request buildAllTopicMetadataRequest() { + MetadataRequest metadataRequest = MetadataRequest.Builder.allTopics().build(); + ByteBuffer buffer = metadataRequest.serialize(new RequestHeader(metadataRequest.api, + metadataRequest.version(), "", 0)); + RequestHeader header = RequestHeader.parse(buffer); + + RequestContext context = new RequestContext(header, "1", null, principal, + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY); + return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, buffer, requestChannelMetrics); + } + + @Benchmark + public void testMetadataRequestForAllTopics() { + kafkaApis.handleTopicMetadataRequest(allTopicMetadataRequest); + } +} From 3a763daddaab1ec60cbdc3e603371199d506845b Mon Sep 17 00:00:00 2001 From: Manikumar Reddy Date: Sun, 8 Mar 2020 06:06:58 +0530 Subject: [PATCH 4/5] Update MetadataResponse.prepareResponse() --- .../clients/admin/KafkaAdminClientTest.java | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 28b46401e0b96..4f7173965bbb3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -82,6 +82,8 @@ import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition; import org.apache.kafka.common.message.OffsetDeleteResponseData; import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition; import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection; @@ -342,28 +344,33 @@ private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors err } private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) { - List metadata = new ArrayList<>(); + List metadata = new ArrayList<>(); for (String topic : cluster.topics()) { - List pms = new ArrayList<>(); + List pms = new ArrayList<>(); for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) { - PartitionMetadata pm = new PartitionMetadata(error, - new TopicPartition(topic, pInfo.partition()), - Optional.of(pInfo.leader().id()), - Optional.of(234), - Arrays.stream(pInfo.replicas()).map(Node::id).collect(Collectors.toList()), - Arrays.stream(pInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toList()), - Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList())); + MetadataResponsePartition pm = new MetadataResponsePartition() + .setErrorCode(error.code()) + .setPartitionIndex(pInfo.partition()) + .setLeaderId(pInfo.leader().id()) + .setLeaderEpoch(234) + .setReplicaNodes(Arrays.stream(pInfo.replicas()).map(Node::id).collect(Collectors.toList())) + .setIsrNodes(Arrays.stream(pInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toList())) + .setOfflineReplicas(Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList())); pms.add(pm); } - TopicMetadata tm = new TopicMetadata(error, topic, false, pms); + MetadataResponseTopic tm = new MetadataResponseTopic() + .setErrorCode(error.code()) + .setName(topic) + .setIsInternal(false) + .setPartitions(pms); metadata.add(tm); } return MetadataResponse.prepareResponse(0, - cluster.nodes(), - cluster.clusterResource().clusterId(), - cluster.controller().id(), - metadata, - MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED); + metadata, + cluster.nodes(), + cluster.clusterResource().clusterId(), + cluster.controller().id(), + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED); } /** @@ -1107,19 +1114,19 @@ public void testDescribeCluster() throws Exception { // Prepare the metadata response used for the first describe cluster MetadataResponse response = MetadataResponse.prepareResponse(0, + Collections.emptyList(), env.cluster().nodes(), env.cluster().clusterResource().clusterId(), 2, - Collections.emptyList(), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED); env.kafkaClient().prepareResponse(response); // Prepare the metadata response used for the second describe cluster MetadataResponse response2 = MetadataResponse.prepareResponse(0, + Collections.emptyList(), env.cluster().nodes(), env.cluster().clusterResource().clusterId(), 3, - Collections.emptyList(), 1 << AclOperation.DESCRIBE.code() | 1 << AclOperation.ALTER.code()); env.kafkaClient().prepareResponse(response2); From ddd4825b6cd4da85a5724ecabb3585b066cd4b4f Mon Sep 17 00:00:00 2001 From: Manikumar Reddy Date: Fri, 13 Mar 2020 13:01:53 +0530 Subject: [PATCH 5/5] Address review comments --- .../clients/admin/KafkaAdminClientTest.java | 2 - .../main/scala/kafka/server/KafkaApis.scala | 34 ++++----- .../scala/kafka/server/MetadataCache.scala | 8 +- .../TopicCommandWithAdminClientTest.scala | 4 +- .../unit/kafka/server/MetadataCacheTest.scala | 76 +++++++++---------- 5 files changed, 61 insertions(+), 63 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 4f7173965bbb3..9731476a3626b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -114,8 +114,6 @@ import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; -import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; -import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata; import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.OffsetDeleteResponse; import org.apache.kafka.common.requests.OffsetFetchResponse; diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 95fdceae3692e..2f275024e7983 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1007,19 +1007,19 @@ class KafkaApis(val requestChannel: RequestChannel, adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe) info("Auto creation of topic %s with %d partitions and replication factor %d is successful" .format(topic, numPartitions, replicationFactor)) - metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE.code(), topic, isInternal(topic), util.Collections.emptyList()) + metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList()) } catch { case _: TopicExistsException => // let it go, possibly another broker created this topic - metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE.code(), topic, isInternal(topic), util.Collections.emptyList()) + metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList()) case ex: Throwable => // Catch all to prevent unhandled errors - metadataResponseTopic(Errors.forException(ex).code(), topic, isInternal(topic), util.Collections.emptyList()) + metadataResponseTopic(Errors.forException(ex), topic, isInternal(topic), util.Collections.emptyList()) } } - private def metadataResponseTopic(errorCode: Short, topic: String, isInternal: Boolean, + private def metadataResponseTopic(error: Errors, topic: String, isInternal: Boolean, partitionData: util.List[MetadataResponsePartition]): MetadataResponseTopic = { new MetadataResponseTopic() - .setErrorCode(errorCode) + .setErrorCode(error.code) .setName(topic) .setIsInternal(isInternal) .setPartitions(partitionData) @@ -1038,7 +1038,7 @@ class KafkaApis(val requestChannel: RequestChannel, s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " + s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " + s"and not all brokers are up yet.") - metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE.code(), topic, true, util.Collections.emptyList()) + metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList()) } else { createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt, groupCoordinator.offsetsTopicConfigs) @@ -1049,7 +1049,7 @@ class KafkaApis(val requestChannel: RequestChannel, s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " + s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " + s"and not all brokers are up yet.") - metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE.code(), topic, true, util.Collections.emptyList()) + metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList()) } else { createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt, txnCoordinator.transactionTopicConfigs) @@ -1075,14 +1075,14 @@ class KafkaApis(val requestChannel: RequestChannel, val responsesForNonExistentTopics = nonExistentTopics.map { topic => if (isInternal(topic)) { val topicMetadata = createInternalTopic(topic) - if (topicMetadata.errorCode() == Errors.COORDINATOR_NOT_AVAILABLE.code()) - metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR.code(), topic, true, util.Collections.emptyList()) + if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code) + metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList()) else topicMetadata } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) { createTopic(topic, config.numPartitions, config.defaultReplicationFactor) } else { - metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), topic, false, util.Collections.emptyList()) + metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList()) } } topicResponses ++ responsesForNonExistentTopics @@ -1117,7 +1117,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic => - metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED.code(), topic, isInternal(topic), util.Collections.emptyList())) + metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, isInternal(topic), util.Collections.emptyList())) // do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not @@ -1127,7 +1127,7 @@ class KafkaApis(val requestChannel: RequestChannel, Set.empty[MetadataResponseTopic] else unauthorizedForDescribeTopics.map(topic => - metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED.code(), topic, false, util.Collections.emptyList())) + metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList())) // In version 0, we returned an error when brokers with replicas were unavailable, // while in higher versions we simply don't include the broker in the returned broker list @@ -1155,7 +1155,7 @@ class KafkaApis(val requestChannel: RequestChannel, // get topic authorized operations if (metadataRequest.data.includeTopicAuthorizedOperations) { topicMetadata.foreach { topicData => - topicData.setTopicAuthorizedOperations(authorizedOperations(request, new Resource(ResourceType.TOPIC, topicData.name()))) + topicData.setTopicAuthorizedOperations(authorizedOperations(request, new Resource(ResourceType.TOPIC, topicData.name))) } } } @@ -1293,12 +1293,12 @@ class KafkaApis(val requestChannel: RequestChannel, .setPort(node.port) .setThrottleTimeMs(requestThrottleMs)) } - val responseBody = if (topicMetadata.errorCode() != Errors.NONE.code()) { + val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) { createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) } else { - val coordinatorEndpoint = topicMetadata.partitions().asScala - .find(_.partitionIndex() == partition) - .filter(_.leaderId() != MetadataResponse.NO_LEADER_ID) + val coordinatorEndpoint = topicMetadata.partitions.asScala + .find(_.partitionIndex == partition) + .filter(_.leaderId != MetadataResponse.NO_LEADER_ID) .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId)) .flatMap(_.getNode(request.context.listenerName)) .filterNot(_.isEmpty) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 40f5b3791a052..ee06dcd8cfa32 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -82,7 +82,7 @@ class MetadataCache(brokerId: Int) extends Logging { private def getPartitionMetadata(snapshot: MetadataSnapshot, topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean, errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponsePartition]] = { snapshot.partitionStates.get(topic).map { partitions => - partitions.map { case (partitionId, partitionState) => + partitions.map { case (partitionId, partitionState) => val topicPartition = new TopicPartition(topic, partitionId.toInt) val leaderBrokerId = partitionState.leader val leaderEpoch = partitionState.leaderEpoch @@ -108,7 +108,7 @@ class MetadataCache(brokerId: Int) extends Logging { } new MetadataResponsePartition() - .setErrorCode(error.code()) + .setErrorCode(error.code) .setPartitionIndex(partitionId.toInt) .setLeaderId(MetadataResponse.NO_LEADER_ID) .setLeaderEpoch(leaderEpoch) @@ -130,7 +130,7 @@ class MetadataCache(brokerId: Int) extends Logging { } new MetadataResponsePartition() - .setErrorCode(error.code()) + .setErrorCode(error.code) .setPartitionIndex(partitionId.toInt) .setLeaderId(maybeLeader.map(_.id()).getOrElse(MetadataResponse.NO_LEADER_ID)) .setLeaderEpoch(leaderEpoch) @@ -170,7 +170,7 @@ class MetadataCache(brokerId: Int) extends Logging { topics.toSeq.flatMap { topic => getPartitionMetadata(snapshot, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata => new MetadataResponseTopic() - .setErrorCode(Errors.NONE.code()) + .setErrorCode(Errors.NONE.code) .setName(topic) .setIsInternal(Topic.isInternal(topic)) .setPartitions(partitionMetadata.toBuffer.asJava) diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala index fd55891ae00e0..cdeb49a120846 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala @@ -580,11 +580,11 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin (result, server) => { val topicMetadatas = server.dataPlaneRequestProcessor.metadataCache .getTopicMetadata(Set(testTopicName), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) - val testPartitionMetadata = topicMetadatas.find(_.name().equals(testTopicName)).get.partitions().asScala.find(_.partitionIndex() == partitionOnBroker0) + val testPartitionMetadata = topicMetadatas.find(_.name.equals(testTopicName)).get.partitions.asScala.find(_.partitionIndex == partitionOnBroker0) testPartitionMetadata match { case None => fail(s"Partition metadata is not found in metadata cache") case Some(metadata) => { - result && metadata.errorCode() == Errors.LEADER_NOT_AVAILABLE.code() + result && metadata.errorCode == Errors.LEADER_NOT_AVAILABLE.code } } } diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index c75dbde26eb78..ce804e81ed244 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -118,22 +118,22 @@ class MetadataCacheTest { assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE.code(), topicMetadata.errorCode()) - assertEquals(topic, topicMetadata.name()) + assertEquals(Errors.NONE.code, topicMetadata.errorCode) + assertEquals(topic, topicMetadata.name) val topicPartitionStates = partitionStates.filter { ps => ps.topicName == topic } - val partitionMetadatas = topicMetadata.partitions().asScala.sortBy(_.partitionIndex()) + val partitionMetadatas = topicMetadata.partitions.asScala.sortBy(_.partitionIndex) assertEquals(s"Unexpected partition count for topic $topic", topicPartitionStates.size, partitionMetadatas.size) partitionMetadatas.zipWithIndex.foreach { case (partitionMetadata, partitionId) => - assertEquals(Errors.NONE.code(), partitionMetadata.errorCode()) - assertEquals(partitionId, partitionMetadata.partitionIndex()) + assertEquals(Errors.NONE.code, partitionMetadata.errorCode) + assertEquals(partitionId, partitionMetadata.partitionIndex) val partitionState = topicPartitionStates.find(_.partitionIndex == partitionId).getOrElse( Assertions.fail(s"Unable to find partition state for partition $partitionId")) assertEquals(partitionState.leader, partitionMetadata.leaderId) assertEquals(partitionState.leaderEpoch, partitionMetadata.leaderEpoch) - assertEquals(partitionState.isr, partitionMetadata.isrNodes()) - assertEquals(partitionState.replicas, partitionMetadata.replicaNodes()) + assertEquals(partitionState.isr, partitionMetadata.isrNodes) + assertEquals(partitionState.replicas, partitionMetadata.replicaNodes) } } @@ -255,16 +255,16 @@ class MetadataCacheTest { assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE.code(), topicMetadata.errorCode()) + assertEquals(Errors.NONE.code, topicMetadata.errorCode) - val partitionMetadatas = topicMetadata.partitions() + val partitionMetadatas = topicMetadata.partitions assertEquals(1, partitionMetadatas.size) val partitionMetadata = partitionMetadatas.get(0) - assertEquals(0, partitionMetadata.partitionIndex()) - assertEquals(expectedError.code(), partitionMetadata.errorCode()) - assertFalse(partitionMetadata.isrNodes().isEmpty) - assertEquals(List(0), partitionMetadata.replicaNodes().asScala) + assertEquals(0, partitionMetadata.partitionIndex) + assertEquals(expectedError.code, partitionMetadata.errorCode) + assertFalse(partitionMetadata.isrNodes.isEmpty) + assertEquals(List(0), partitionMetadata.replicaNodes.asScala) } @Test @@ -313,32 +313,32 @@ class MetadataCacheTest { assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE.code(), topicMetadata.errorCode()) + assertEquals(Errors.NONE.code(), topicMetadata.errorCode) - val partitionMetadatas = topicMetadata.partitions() + val partitionMetadatas = topicMetadata.partitions assertEquals(1, partitionMetadatas.size) val partitionMetadata = partitionMetadatas.get(0) - assertEquals(0, partitionMetadata.partitionIndex()) - assertEquals(Errors.NONE.code(), partitionMetadata.errorCode()) - assertEquals(Set(0, 1), partitionMetadata.replicaNodes().asScala.toSet) - assertEquals(Set(0), partitionMetadata.isrNodes().asScala.toSet) + assertEquals(0, partitionMetadata.partitionIndex) + assertEquals(Errors.NONE.code, partitionMetadata.errorCode) + assertEquals(Set(0, 1), partitionMetadata.replicaNodes.asScala.toSet) + assertEquals(Set(0), partitionMetadata.isrNodes.asScala.toSet) // Validate errorUnavailableEndpoints = true val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = true) assertEquals(1, topicMetadatasWithError.size) val topicMetadataWithError = topicMetadatasWithError.head - assertEquals(Errors.NONE.code(), topicMetadataWithError.errorCode()) + assertEquals(Errors.NONE.code, topicMetadataWithError.errorCode) val partitionMetadatasWithError = topicMetadataWithError.partitions() assertEquals(1, partitionMetadatasWithError.size) val partitionMetadataWithError = partitionMetadatasWithError.get(0) - assertEquals(0, partitionMetadataWithError.partitionIndex()) - assertEquals(Errors.REPLICA_NOT_AVAILABLE.code(), partitionMetadataWithError.errorCode()) - assertEquals(Set(0), partitionMetadataWithError.replicaNodes().asScala.toSet) - assertEquals(Set(0), partitionMetadataWithError.isrNodes().asScala.toSet) + assertEquals(0, partitionMetadataWithError.partitionIndex) + assertEquals(Errors.REPLICA_NOT_AVAILABLE.code, partitionMetadataWithError.errorCode) + assertEquals(Set(0), partitionMetadataWithError.replicaNodes.asScala.toSet) + assertEquals(Set(0), partitionMetadataWithError.isrNodes.asScala.toSet) } @Test @@ -387,32 +387,32 @@ class MetadataCacheTest { assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE.code(), topicMetadata.errorCode()) + assertEquals(Errors.NONE.code(), topicMetadata.errorCode) - val partitionMetadatas = topicMetadata.partitions() + val partitionMetadatas = topicMetadata.partitions assertEquals(1, partitionMetadatas.size) val partitionMetadata = partitionMetadatas.get(0) - assertEquals(0, partitionMetadata.partitionIndex()) - assertEquals(Errors.NONE.code(), partitionMetadata.errorCode()) - assertEquals(Set(0), partitionMetadata.replicaNodes().asScala.toSet) - assertEquals(Set(0, 1), partitionMetadata.isrNodes().asScala.toSet) + assertEquals(0, partitionMetadata.partitionIndex) + assertEquals(Errors.NONE.code, partitionMetadata.errorCode) + assertEquals(Set(0), partitionMetadata.replicaNodes.asScala.toSet) + assertEquals(Set(0, 1), partitionMetadata.isrNodes.asScala.toSet) // Validate errorUnavailableEndpoints = true val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = true) assertEquals(1, topicMetadatasWithError.size) val topicMetadataWithError = topicMetadatasWithError.head - assertEquals(Errors.NONE.code(), topicMetadataWithError.errorCode()) + assertEquals(Errors.NONE.code, topicMetadataWithError.errorCode) - val partitionMetadatasWithError = topicMetadataWithError.partitions() + val partitionMetadatasWithError = topicMetadataWithError.partitions assertEquals(1, partitionMetadatasWithError.size) val partitionMetadataWithError = partitionMetadatasWithError.get(0) - assertEquals(0, partitionMetadataWithError.partitionIndex()) - assertEquals(Errors.REPLICA_NOT_AVAILABLE.code(), partitionMetadataWithError.errorCode()) - assertEquals(Set(0), partitionMetadataWithError.replicaNodes().asScala.toSet) - assertEquals(Set(0), partitionMetadataWithError.isrNodes().asScala.toSet) + assertEquals(0, partitionMetadataWithError.partitionIndex) + assertEquals(Errors.REPLICA_NOT_AVAILABLE.code, partitionMetadataWithError.errorCode) + assertEquals(Set(0), partitionMetadataWithError.replicaNodes.asScala.toSet) + assertEquals(Set(0), partitionMetadataWithError.isrNodes.asScala.toSet) } @Test @@ -449,8 +449,8 @@ class MetadataCacheTest { val topicMetadata = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL)) assertEquals(1, topicMetadata.size) - assertEquals(1, topicMetadata.head.partitions().size) - assertEquals(RecordBatch.NO_PARTITION_LEADER_EPOCH, topicMetadata.head.partitions().get(0).leaderId) + assertEquals(1, topicMetadata.head.partitions.size) + assertEquals(RecordBatch.NO_PARTITION_LEADER_EPOCH, topicMetadata.head.partitions.get(0).leaderId) } @Test