diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 103f6cf575d43..6c503a57a23f0 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -23,15 +23,16 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse} import scala.collection._ -import com.yammer.metrics.core.Gauge +import com.yammer.metrics.core.{Gauge, Meter} import java.util.concurrent.TimeUnit + import kafka.admin.AdminUtils import kafka.admin.PreferredReplicaLeaderElectionCommand import kafka.api._ import kafka.cluster.Broker import kafka.common._ import kafka.log.LogConfig -import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} +import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.utils.ZkUtils._ import kafka.utils._ import kafka.utils.CoreUtils._ @@ -39,8 +40,9 @@ import org.apache.zookeeper.Watcher.Event.KeeperState import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.utils.Time import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection} -import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} +import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException} import java.util.concurrent.locks.ReentrantLock + import kafka.server._ import kafka.common.TopicAndPartition @@ -1468,6 +1470,13 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpo } object ControllerStats extends KafkaMetricsGroup { - val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) - val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + + private val _uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) + private val _leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + + // KafkaServer needs to initialize controller metrics during startup. We perform initialization + // through method calls to avoid Scala compiler warnings. + def uncleanLeaderElectionRate: Meter = _uncleanLeaderElectionRate + + def leaderElectionTimer: KafkaTimer = _leaderElectionTimer }