Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
try {
leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
partitionStateInfos.foreach { case (topicPartition, state) =>
val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
val typeOfRequest =
if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader"
else "become-follower"
stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
"for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
state.leaderIsrAndControllerEpoch, broker,
Expand Down
113 changes: 50 additions & 63 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,20 @@ object KafkaController extends Logging {
}
}

class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Controller " + config.brokerId + "]: "
private val stateChangeLogger = KafkaController.stateChangeLogger
val controllerContext = new ControllerContext(zkUtils)
val partitionStateMachine = new PartitionStateMachine(this)
val replicaStateMachine = new ReplicaStateMachine(this)

// have a separate scheduler for the controller to be able to start and stop independently of the
// kafka server
private val kafkaScheduler = new KafkaScheduler(1)
// have a separate scheduler for the controller to be able to start and stop independently of the kafka server
// visible for testing
private[controller] val kafkaScheduler = new KafkaScheduler(1)

private val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics())
// visible for testing
private[controller] val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics,
_ => updateMetrics())

val topicDeletionManager = new TopicDeletionManager(this, eventManager)
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
Expand Down Expand Up @@ -290,7 +292,6 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
/**
* This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
* required to clean up internal controller data structures
* Note:We need to resign as a controller out of the controller lock to avoid potential deadlock issue
*/
def onControllerResignation() {
debug("Controller resigning, broker id %d".format(config.brokerId))
Expand Down Expand Up @@ -318,9 +319,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
replicaStateMachine.shutdown()
deregisterBrokerChangeListener()

// reset controller context
resetControllerContext()
brokerState.newState(RunningAsBroker)

info("Broker %d resigned as the controller".format(config.brokerId))
}
Expand Down Expand Up @@ -746,18 +745,15 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
}

def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) {
val leaderAndIsrInfo = zkUtils.getPartitionLeaderAndIsrForTopics(zkUtils.zkClient, topicAndPartitions)
for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
val leaderAndIsrInfo = zkUtils.getPartitionLeaderAndIsrForTopics(topicAndPartitions)
for ((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
}

private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
zkUtils.getLeaderAndIsrForPartition(topic, partition) match {
case Some(leaderAndIsr) =>
val replicasNotInIsr = replicas.filterNot(r => leaderAndIsr.isr.contains(r))
replicasNotInIsr.isEmpty
case None => false
}
zkUtils.getLeaderAndIsrForPartition(topic, partition).map { leaderAndIsr =>
replicas.forall(leaderAndIsr.isr.contains)
}.getOrElse(false)
}

private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
Expand Down Expand Up @@ -824,22 +820,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
}

private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
brokerRequestBatch.newBatch()
updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
try {
brokerRequestBatch.newBatch()
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this should be in the try/catch since it can throw an IllegalStateException and it seems like updateLeaderEpoch doesn't do anything with this field. But please double-check.

brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
case e : IllegalStateException => {
// Resign if the controller is in an illegal state
error("Forcing the controller to resign")
brokerRequestBatch.clear()
triggerControllerMove()

throw e
}
case e: IllegalStateException =>
handleIllegalState(e)
}
stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " +
"to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch,
Expand Down Expand Up @@ -986,14 +976,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
brokerRequestBatch.sendRequestsToBrokers(epoch)
} catch {
case e : IllegalStateException => {
// Resign if the controller is in an illegal state
error("Forcing the controller to resign")
brokerRequestBatch.clear()
triggerControllerMove()

throw e
}
case e: IllegalStateException =>
handleIllegalState(e)
}
}

Expand Down Expand Up @@ -1425,39 +1409,32 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
controllerContext.partitionsOnBroker(id)
.map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))

allPartitionsAndReplicationFactorOnBroker.foreach {
case(topicAndPartition, replicationFactor) =>
controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
if (replicationFactor > 1) {
if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
// If the broker leads the topic partition, transition the leader and update isr. Updates zk and
// notifies all affected brokers
partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
controlledShutdownPartitionLeaderSelector)
} else {
// Stop the replica first. The state change below initiates ZK changes which should take some time
// before which the stop replica request should be completed (in most cases)
try {
brokerRequestBatch.newBatch()
brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
topicAndPartition.partition, deletePartition = false)
brokerRequestBatch.sendRequestsToBrokers(epoch)
} catch {
case e : IllegalStateException => {
// Resign if the controller is in an illegal state
error("Forcing the controller to resign")
brokerRequestBatch.clear()
triggerControllerMove()

throw e
}
}
// If the broker is a follower, updates the isr in ZK and notifies the current leader
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
topicAndPartition.partition, id)), OfflineReplica)
allPartitionsAndReplicationFactorOnBroker.foreach { case (topicAndPartition, replicationFactor) =>
controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
if (replicationFactor > 1) {
if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
// If the broker leads the topic partition, transition the leader and update isr. Updates zk and
// notifies all affected brokers
partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
controlledShutdownPartitionLeaderSelector)
} else {
// Stop the replica first. The state change below initiates ZK changes which should take some time
// before which the stop replica request should be completed (in most cases)
try {
brokerRequestBatch.newBatch()
brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
topicAndPartition.partition, deletePartition = false)
brokerRequestBatch.sendRequestsToBrokers(epoch)
} catch {
case e: IllegalStateException =>
handleIllegalState(e)
}
// If the broker is a follower, updates the isr in ZK and notifies the current leader
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
topicAndPartition.partition, id)), OfflineReplica)
}
}
}
}
def replicatedPartitionsBrokerLeads() = {
trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
Expand Down Expand Up @@ -1557,7 +1534,17 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
}
}

// visible for testing
private[controller] def handleIllegalState(e: IllegalStateException): Nothing = {
// Resign if the controller is in an illegal state
error("Forcing the controller to resign")
brokerRequestBatch.clear()
triggerControllerMove()
throw e
}

private def triggerControllerMove(): Unit = {
onControllerResignation()
activeControllerId = -1
controllerContext.zkUtils.deletePath(ZkUtils.ControllerPath)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
brokerRequestBatch.newBatch()
// try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
// that belong to topics to be deleted
for((topicAndPartition, partitionState) <- partitionState
for ((topicAndPartition, partitionState) <- partitionState
if !controller.topicDeletionManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) {
if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
if (partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
(new CallbackBuilder).build)
}
Expand Down Expand Up @@ -111,7 +111,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
}catch {
} catch {
case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
// TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
*/
def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
callbacks: Callbacks = (new CallbackBuilder).build) {
if(replicas.nonEmpty) {
if (replicas.nonEmpty) {
info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
try {
brokerRequestBatch.newBatch()
replicas.foreach(r => handleStateChange(r, targetState, callbacks))
brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
}catch {
} catch {
case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
replicaManager.startup()

/* start kafka controller */
kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix)
kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
kafkaController.startup()

adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -701,8 +701,7 @@ class ZkUtils(val zkClient: ZkClient,
cluster
}

def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: Set[TopicAndPartition])
: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
def getPartitionLeaderAndIsrForTopics(topicAndPartitions: Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
for(topicAndPartition <- topicAndPartitions) {
ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topicAndPartition.topic, topicAndPartition.partition) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Timer
import kafka.utils.TestUtils
import org.easymock.{EasyMock, IAnswer}
import org.junit.{After, Test}
import org.junit.Assert.{assertEquals, fail}

Expand Down Expand Up @@ -60,21 +59,13 @@ class ControllerEventManagerTest {

val initialTimerCount = timer(metricName).count

// `ControllerEvent` is sealed so we use EasyMock to create a subclass
val eventMock = EasyMock.createMock(classOf[ControllerEvent])
EasyMock.expect(eventMock.state).andReturn(controllerState)

// Only return from `process()` once we have checked `controllerEventManager.state`
val latch = new CountDownLatch(1)
EasyMock.expect(eventMock.process()).andAnswer(new IAnswer[Unit]() {
def answer(): Unit = {
latch.await()
process()
}
val eventMock = ControllerTestUtils.createMockControllerEvent(controllerState, { () =>
latch.await()
process()
})

EasyMock.replay(eventMock)

controllerEventManager.put(eventMock)
TestUtils.waitUntilTrue(() => controllerEventManager.state == controllerState,
s"Controller state is not $controllerState")
Expand Down
Loading