Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/server/MetadataCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ class MetadataCache(brokerId: Int) extends Logging {
private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)

// This method is the main hotspot when it comes to the performance of metadata requests,
// we should be careful about adding additional logic here.
// we should be careful about adding additional logic here. Relatedly, `brokers` is
// `Iterable[Integer]` instead of `Iterable[Int]` to avoid a collection copy.
// filterUnavailableEndpoints exists to support v0 MetadataResponses
private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[Int], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[java.lang.Integer], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, do you think we can switch this to be a plain j.u.List? See next comment as well.

val result = new mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size))
brokers.foreach { brokerId =>
val endpoint = getAliveEndpoint(snapshot, brokerId, listenerName) match {
Expand All @@ -76,9 +77,9 @@ class MetadataCache(brokerId: Int) extends Logging {
val leaderBrokerId = partitionState.basePartitionState.leader
val leaderEpoch = partitionState.basePartitionState.leaderEpoch
val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId, listenerName)
val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt)
val replicas = partitionState.basePartitionState.replicas.asScala
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically, these fields are all originally j.u.Lists. There seems to be an unnecessary conversion to Scala collections and then back to Java (https://github.com/apache/kafka/pull/6397/files#diff-bfeebf48d90e86c1ffb850fbbe019dd6R112). I think we can avoid these altogether. Although there are Scala-collection-specific filters applied for debug logging (https://github.com/apache/kafka/pull/6397/files#diff-bfeebf48d90e86c1ffb850fbbe019dd6R107 for example), you can just use JavaConverters.collectionAsScalaIterable which IIUC does not do any copying.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, nm - these aren't copying conversions except for the type conversion which your patch eliminates. So I think we are good.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, none of the converters do copying. It's just unfortunate that there's no way to have Int as a collection parameter without copying. We can live with Integer here.

val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints)
val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints)
val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala, listenerName, errorUnavailableEndpoints)

maybeLeader match {
case None =>
Expand All @@ -94,7 +95,7 @@ class MetadataCache(brokerId: Int) extends Logging {
offlineReplicaInfo.asJava)

case Some(leader) =>
val isr = partitionState.basePartitionState.isr.asScala.map(_.toInt)
val isr = partitionState.basePartitionState.isr.asScala
val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)

if (replicaInfo.size < replicas.size) {
Expand Down