diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index ec5a2b9ed38b1..647ccb13b04a4 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -51,9 +51,10 @@ class MetadataCache(brokerId: Int) extends Logging { private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None) // This method is the main hotspot when it comes to the performance of metadata requests, - // we should be careful about adding additional logic here. + // we should be careful about adding additional logic here. Relatedly, `brokers` is + // `Iterable[Integer]` instead of `Iterable[Int]` to avoid a collection copy. // filterUnavailableEndpoints exists to support v0 MetadataResponses - private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[Int], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = { + private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[java.lang.Integer], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = { val result = new mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size)) brokers.foreach { brokerId => val endpoint = getAliveEndpoint(snapshot, brokerId, listenerName) match { @@ -76,9 +77,9 @@ class MetadataCache(brokerId: Int) extends Logging { val leaderBrokerId = partitionState.basePartitionState.leader val leaderEpoch = partitionState.basePartitionState.leaderEpoch val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId, listenerName) - val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt) + val replicas = partitionState.basePartitionState.replicas.asScala val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints) - val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints) + val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala, listenerName, errorUnavailableEndpoints) maybeLeader match { case None => @@ -94,7 +95,7 @@ class MetadataCache(brokerId: Int) extends Logging { offlineReplicaInfo.asJava) case Some(leader) => - val isr = partitionState.basePartitionState.isr.asScala.map(_.toInt) + val isr = partitionState.basePartitionState.isr.asScala val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints) if (replicaInfo.size < replicas.size) {