From 8bea626691dfd827d50b822cc53661fcd279a342 Mon Sep 17 00:00:00 2001 From: Radai Rosenblatt Date: Thu, 7 Mar 2019 15:01:06 -0800 Subject: [PATCH 1/4] MINOR: avoid unnecessary collection copies between java and scala the metadata cache used map() to convert java.util.Lists into Iterable[Int]. the map() calls removed by this patch represent 11% of total CPU time measured under load for us. we also expect a positive impact on GC. Signed-off-by: radai-rosenblatt --- core/src/main/scala/kafka/server/MetadataCache.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index ec5a2b9ed38b1..6c53dc21aa051 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -53,7 +53,7 @@ class MetadataCache(brokerId: Int) extends Logging { // This method is the main hotspot when it comes to the performance of metadata requests, // we should be careful about adding additional logic here. // filterUnavailableEndpoints exists to support v0 MetadataResponses - private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[Int], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = { + private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[java.lang.Integer], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = { val result = new mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size)) brokers.foreach { brokerId => val endpoint = getAliveEndpoint(snapshot, brokerId, listenerName) match { @@ -76,9 +76,9 @@ class MetadataCache(brokerId: Int) extends Logging { val leaderBrokerId = partitionState.basePartitionState.leader val leaderEpoch = partitionState.basePartitionState.leaderEpoch val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId, listenerName) - val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt) + val replicas = partitionState.basePartitionState.replicas.asScala val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints) - val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints) + val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala, listenerName, errorUnavailableEndpoints) maybeLeader match { case None => @@ -94,7 +94,7 @@ class MetadataCache(brokerId: Int) extends Logging { offlineReplicaInfo.asJava) case Some(leader) => - val isr = partitionState.basePartitionState.isr.asScala.map(_.toInt) + val isr = partitionState.basePartitionState.isr.asScala val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints) if (replicaInfo.size < replicas.size) { From ca42c6a14fe7b1cc48ca797d2afa8f2d85c517bc Mon Sep 17 00:00:00 2001 From: radai-rosenblatt Date: Thu, 14 Mar 2019 19:25:05 -0700 Subject: [PATCH 2/4] address review comment Signed-off-by: radai-rosenblatt --- core/src/main/scala/kafka/server/MetadataCache.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 6c53dc21aa051..3dc3bc45a3d73 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -53,6 +53,8 @@ class MetadataCache(brokerId: Int) extends Logging { // This method is the main hotspot when it comes to the performance of metadata requests, // we should be careful about adding additional logic here. // filterUnavailableEndpoints exists to support v0 MetadataResponses + // brokers is Iterable[j.l.Integer] and not a native scala type because the data is stored as a java collection + // and converting it to a scala collection for this invocation causes performance degradation private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[java.lang.Integer], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = { val result = new mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size)) brokers.foreach { brokerId => From d4dd4146b49a3daa644bfc6c09006ebb946ff2de Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 22 Mar 2019 01:27:28 -0700 Subject: [PATCH 3/4] Tweak comment --- core/src/main/scala/kafka/server/MetadataCache.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 3dc3bc45a3d73..fa2a30030fabd 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -51,10 +51,9 @@ class MetadataCache(brokerId: Int) extends Logging { private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None) // This method is the main hotspot when it comes to the performance of metadata requests, - // we should be careful about adding additional logic here. + // we should be careful about adding additional logic here. Relatedly, `brokers` is + // `Iterable[Integer]` instead of `Seq[Int]` to avoid a collection copy. // filterUnavailableEndpoints exists to support v0 MetadataResponses - // brokers is Iterable[j.l.Integer] and not a native scala type because the data is stored as a java collection - // and converting it to a scala collection for this invocation causes performance degradation private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[java.lang.Integer], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = { val result = new mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size)) brokers.foreach { brokerId => From aabb6735a0168c4b04ec92cd1a9d4ac50173df8a Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 22 Mar 2019 01:30:38 -0700 Subject: [PATCH 4/4] Fix comment --- core/src/main/scala/kafka/server/MetadataCache.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index fa2a30030fabd..647ccb13b04a4 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -52,7 +52,7 @@ class MetadataCache(brokerId: Int) extends Logging { // This method is the main hotspot when it comes to the performance of metadata requests, // we should be careful about adding additional logic here. Relatedly, `brokers` is - // `Iterable[Integer]` instead of `Seq[Int]` to avoid a collection copy. + // `Iterable[Integer]` instead of `Iterable[Int]` to avoid a collection copy. // filterUnavailableEndpoints exists to support v0 MetadataResponses private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[java.lang.Integer], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = { val result = new mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size))