From 507325f8b477114aeb9c002961fe68756b88a322 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Wed, 27 Apr 2016 23:16:54 -0700 Subject: [PATCH 1/4] Remove some warning --- core/src/main/scala/kafka/controller/KafkaController.scala | 4 ++-- core/src/main/scala/kafka/server/KafkaServer.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 103f6cf575d43..4b02ac2eb5404 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1468,6 +1468,6 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpo } object ControllerStats extends KafkaMetricsGroup { - val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) - val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + def uncleanLeaderElectionRate() = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) + def leaderElectionTimer() = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index f998d82104d36..6a2214f9b6cd0 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -304,8 +304,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr */ private def registerStats() { BrokerTopicStats.getBrokerAllTopicsStats() - ControllerStats.uncleanLeaderElectionRate - ControllerStats.leaderElectionTimer + ControllerStats.uncleanLeaderElectionRate() + ControllerStats.leaderElectionTimer() } /** From 196573888c8547a3c3f12a0eda3842bbdcb49f02 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 28 Apr 2016 11:44:06 -0700 Subject: [PATCH 2/4] Address the review comments --- .../kafka/controller/KafkaController.scala | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 4b02ac2eb5404..4cbd699b28130 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -23,15 +23,16 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse} import scala.collection._ -import com.yammer.metrics.core.Gauge +import com.yammer.metrics.core.{Gauge, Meter} import java.util.concurrent.TimeUnit + import kafka.admin.AdminUtils import kafka.admin.PreferredReplicaLeaderElectionCommand import kafka.api._ import kafka.cluster.Broker import kafka.common._ import kafka.log.LogConfig -import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} +import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.utils.ZkUtils._ import kafka.utils._ import kafka.utils.CoreUtils._ @@ -39,8 +40,9 @@ import org.apache.zookeeper.Watcher.Event.KeeperState import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.utils.Time import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection} -import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} +import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException} import java.util.concurrent.locks.ReentrantLock + import kafka.server._ import kafka.common.TopicAndPartition @@ -1468,6 +1470,15 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpo } object ControllerStats extends KafkaMetricsGroup { - def uncleanLeaderElectionRate() = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) - def leaderElectionTimer() = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + + private val _uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) + private val _leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + + def uncleanLeaderElectionRate() = { + _uncleanLeaderElectionRate + } + + def leaderElectionTimer() = { + _leaderElectionTimer + } } From ec25df006a4cfa1305aeaaeb07b97cb104b0f5fc Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 28 Apr 2016 14:33:53 -0700 Subject: [PATCH 3/4] Style fixes --- .../src/main/scala/kafka/controller/KafkaController.scala | 8 ++------ core/src/main/scala/kafka/server/KafkaServer.scala | 4 ++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 4cbd699b28130..494611fc52232 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1474,11 +1474,7 @@ object ControllerStats extends KafkaMetricsGroup { private val _uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) private val _leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) - def uncleanLeaderElectionRate() = { - _uncleanLeaderElectionRate - } + def uncleanLeaderElectionRate: Meter = _uncleanLeaderElectionRate - def leaderElectionTimer() = { - _leaderElectionTimer - } + def leaderElectionTimer: KafkaTimer = _leaderElectionTimer } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 6a2214f9b6cd0..f998d82104d36 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -304,8 +304,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr */ private def registerStats() { BrokerTopicStats.getBrokerAllTopicsStats() - ControllerStats.uncleanLeaderElectionRate() - ControllerStats.leaderElectionTimer() + ControllerStats.uncleanLeaderElectionRate + ControllerStats.leaderElectionTimer } /** From 65d759749ae4f553eeb38f9d0b8f4cfc27ed5626 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 28 Apr 2016 14:42:53 -0700 Subject: [PATCH 4/4] Add comments --- core/src/main/scala/kafka/controller/KafkaController.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 494611fc52232..6c503a57a23f0 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1474,6 +1474,8 @@ object ControllerStats extends KafkaMetricsGroup { private val _uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) private val _leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + // KafkaServer needs to initialize controller metrics during startup. We perform initialization + // through method calls to avoid Scala compiler warnings. def uncleanLeaderElectionRate: Meter = _uncleanLeaderElectionRate def leaderElectionTimer: KafkaTimer = _leaderElectionTimer