Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 60 additions & 42 deletions core/src/main/scala/kafka/controller/ControllerChannelManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import kafka.api.RequestOrResponse
import collection.Set

class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
private val brokerLock = new Object
this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "

Expand Down Expand Up @@ -100,7 +100,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
}
}

private def startRequestSendThread(brokerId: Int) {
protected def startRequestSendThread(brokerId: Int) {
val requestThread = brokerStateInfo(brokerId).requestSendThread
if(requestThread.getState == Thread.State.NEW)
requestThread.start()
Expand Down Expand Up @@ -280,49 +280,67 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
}

def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) {
leaderAndIsrRequestMap.foreach { m =>
val broker = m._1
val partitionStateInfos = m._2.toMap
val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b => b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol))
val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId)
for (p <- partitionStateInfos) {
val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " +
"for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
p._2.leaderIsrAndControllerEpoch, correlationId, broker,
p._1._1, p._1._2))
try {
leaderAndIsrRequestMap.foreach { m =>
val broker = m._1
val partitionStateInfos = m._2.toMap
val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b => b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol))
val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId)
for (p <- partitionStateInfos) {
val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " +
"for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
p._2.leaderIsrAndControllerEpoch, correlationId, broker,
p._1._1, p._1._2))
}
controller.sendRequest(broker, leaderAndIsrRequest, null)
}
controller.sendRequest(broker, leaderAndIsrRequest, null)
}
leaderAndIsrRequestMap.clear()
updateMetadataRequestMap.foreach { m =>
val broker = m._1
val partitionStateInfos = m._2.toMap

val versionId = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0
val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort, controllerId = controllerId, controllerEpoch = controllerEpoch,
correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos, aliveBrokers = controllerContext.liveOrShuttingDownBrokers)
partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
"correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
correlationId, broker, p._1)))
controller.sendRequest(broker, updateMetadataRequest, null)
}
updateMetadataRequestMap.clear()
stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
debug("The stop replica request (delete = true) sent to broker %d is %s"
.format(broker, stopReplicaWithDelete.mkString(",")))
debug("The stop replica request (delete = false) sent to broker %d is %s"
.format(broker, stopReplicaWithoutDelete.mkString(",")))
replicaInfoList.foreach { r =>
val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
controller.sendRequest(broker, stopReplicaRequest, r.callback)
leaderAndIsrRequestMap.clear()
updateMetadataRequestMap.foreach { m =>
val broker = m._1
val partitionStateInfos = m._2.toMap

val versionId = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0
val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort, controllerId = controllerId, controllerEpoch = controllerEpoch,
correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos, aliveBrokers = controllerContext.liveOrShuttingDownBrokers)
partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
"correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
correlationId, broker, p._1)))
controller.sendRequest(broker, updateMetadataRequest, null)
}
updateMetadataRequestMap.clear()
stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
debug("The stop replica request (delete = true) sent to broker %d is %s"
.format(broker, stopReplicaWithDelete.mkString(",")))
debug("The stop replica request (delete = false) sent to broker %d is %s"
.format(broker, stopReplicaWithoutDelete.mkString(",")))
replicaInfoList.foreach { r =>
val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
controller.sendRequest(broker, stopReplicaRequest, r.callback)
}
}
stopReplicaRequestMap.clear()
} catch {
case e : Throwable => {
if(leaderAndIsrRequestMap.size > 0) {
error("Haven't been able to send leader and isr requests, current state of " +
s"the map is $leaderAndIsrRequestMap")
}
if(updateMetadataRequestMap.size > 0) {
error("Haven't been able to send metadata update requests, current state of " +
s"the map is $updateMetadataRequestMap")
}
if(stopReplicaRequestMap.size > 0) {
error("Haven't been able to send stop replica requests, current state of " +
s"the map is $stopReplicaRequestMap")
}
throw new IllegalStateException(e)
}
}
stopReplicaRequestMap.clear()
}
}

Expand Down
52 changes: 41 additions & 11 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,20 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
} else {
// Stop the replica first. The state change below initiates ZK changes which should take some time
// before which the stop replica request should be completed (in most cases)
brokerRequestBatch.newBatch()
brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
topicAndPartition.partition, deletePartition = false)
brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)

try {
brokerRequestBatch.newBatch()
brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
topicAndPartition.partition, deletePartition = false)
brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
} catch {
case e : IllegalStateException => {
// Resign if the controller is in an illegal state
error("Forcing the controller to resign")
controllerElector.resign()

throw e
}
}
// If the broker is a follower, updates the isr in ZK and notifies the current leader
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
topicAndPartition.partition, id)), OfflineReplica)
Expand Down Expand Up @@ -341,6 +350,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
* required to clean up internal controller data structures
*/
def onControllerResignation() {
debug("Controller resigning, broker id %d".format(config.brokerId))
// de-register listeners
deregisterIsrChangeNotificationListener()
deregisterReassignedPartitionsListener()
Expand Down Expand Up @@ -888,9 +898,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
brokerRequestBatch.newBatch()
updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement)
try {
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement)
} catch {
case e : IllegalStateException => {
// Resign if the controller is in an illegal state
error("Forcing the controller to resign")
controllerElector.resign()

throw e
}
}
stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " +
"to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch,
newAssignedReplicas.mkString(","), updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader, topicAndPartition))
Expand Down Expand Up @@ -998,9 +1018,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
* @param brokers The brokers that the update metadata request should be sent to
*/
def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
brokerRequestBatch.newBatch()
brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
try {
brokerRequestBatch.newBatch()
brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
} catch {
case e : IllegalStateException => {
// Resign if the controller is in an illegal state
error("Forcing the controller to resign")
controllerElector.resign()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that resigning on a thrown exception may be overkill, but until we figure out which exceptions could possibly be thrown here, we may want to be as conservative as resigning the controller.


throw e
}
}
}

/**
Expand Down
187 changes: 187 additions & 0 deletions core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.controller

import java.util.concurrent.LinkedBlockingQueue
import java.util.Properties

import junit.framework.Assert._
import org.scalatest.junit.JUnit3Suite

import org.junit.{Test, After, Before}
import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
import org.I0Itec.zkclient.serialize.ZkSerializer
import org.apache.log4j.{Logger, Level}

import kafka.api.RequestOrResponse
import kafka.common.TopicAndPartition
import kafka.integration.KafkaServerTestHarness
import kafka.server.BrokerState
import kafka.server.KafkaConfig
import kafka.server.KafkaServer
import kafka.server.RunningAsController
import kafka.utils._
import kafka.utils.TestUtils._

import scala.collection.Map
import scala.collection.mutable


class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
  val log = Logger.getLogger(classOf[ControllerFailoverTest])
  val numNodes = 2
  val numParts = 1
  val msgQueueSize = 1
  val topic = "topic1"
  val overridingProps = new Properties()
  overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)

  override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect)
    .map(KafkaConfig.fromProps(_, overridingProps))

  override def setUp() {
    super.setUp()
  }

  override def tearDown() {
    super.tearDown()
  }

  /**
   * Verifies that the controller resigns and a new controller is elected when
   * the send to a broker's outgoing request queue blocks (and is interrupted),
   * and that the newly elected controller can send metadata updates again.
   *
   * See @link{https://issues.apache.org/jira/browse/KAFKA-2300}
   * for the background of this test case
   */
  def testMetadataUpdate() {
    log.setLevel(Level.INFO)
    // Find the current controller and record every broker's controller epoch,
    // so a later epoch increase signals that a re-election took place.
    var controller: KafkaServer = this.servers.head
    val epochMap: mutable.Map[Int, Int] = mutable.Map.empty
    for (server <- this.servers) {
      epochMap += (server.config.brokerId -> server.kafkaController.epoch)
      if (server.kafkaController.isActive()) {
        controller = server
      }
    }
    // Create topic with one partition and wait until it transitions to OnlinePartition
    kafka.admin.AdminUtils.createTopic(controller.zkClient, topic, 1, 1)
    // Reuse the `topic` val rather than repeating the "topic1" literal
    val topicPartition = TopicAndPartition(topic, 0)
    var partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition)
    while (!partitions.contains(topicPartition)) {
      partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition)
      Thread.sleep(100)
    }
    // Replace the channel manager with a mock whose queue for broker 0 holds a
    // single request and whose send thread is stopped, so a second enqueue blocks.
    controller.kafkaController.controllerContext.controllerChannelManager.shutdown()
    val channelManager = new MockChannelManager(controller.kafkaController.controllerContext,
                                                controller.kafkaController.config)
    channelManager.startup()
    controller.kafkaController.controllerContext.controllerChannelManager = channelManager
    channelManager.shrinkBlockingQueue(0)
    channelManager.stopSendThread(0)
    // Spawn a new thread that fills the queue with the first request and then
    // blocks on the second; we detect the block via Thread.State.WAITING.
    val thread = new Thread(new Runnable {
      def run() {
        try {
          controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition))
          log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0)))
          controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition))
          log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0)))
        } catch {
          case e : Exception => {
            log.info("Thread interrupted")
          }
        }
      }
    })
    thread.setName("mythread")
    thread.start()
    while (thread.getState() != Thread.State.WAITING) {
      Thread.sleep(100)
    }
    // Assume that the thread is WAITING because it is
    // blocked on the queue, so interrupt and move forward
    thread.interrupt()
    thread.join()
    channelManager.resumeSendThread(0)
    // Wait for a controller whose epoch is greater than the one recorded
    // earlier — that broker is the newly elected controller.
    var found = false
    var counter = 0
    while (!found && counter < 10) {
      for (server <- this.servers) {
        val previousEpoch = (epochMap get server.config.brokerId) match {
          case Some(epoch) =>
            epoch
          case None =>
            val msg = String.format("Missing element in epoch map %s", epochMap.mkString(", "))
            throw new IllegalStateException(msg)
        }

        if (server.kafkaController.isActive
            && (previousEpoch) < server.kafkaController.epoch) {
          controller = server
          found = true
        }
      }
      if (!found) {
        Thread.sleep(100)
        counter += 1
      }
    }
    // Give it a shot to make sure that sending isn't blocking
    try {
      controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition))
    } catch {
      case e : Throwable => {
        fail(e)
      }
    }
  }
}

/**
 * Test double for ControllerChannelManager that exposes hooks to stop/resume a
 * broker's request send thread and to shrink its outgoing message queue, so a
 * test can force sendRequest to block on a full queue.
 */
class MockChannelManager(private val controllerContext: ControllerContext,
                         config: KafkaConfig)
    extends ControllerChannelManager(controllerContext, config) {

  /** Stops the request send thread for the given broker so queued requests are not drained. */
  def stopSendThread(brokerId: Int) {
    val requestThread = brokerStateInfo(brokerId).requestSendThread
    requestThread.isRunning.set(false)
    // side-effecting zero-arity calls: use explicit ()
    requestThread.interrupt()
    requestThread.join()
  }

  /** Replaces the broker's message queue with a bounded queue of capacity 1,
   *  so that a second enqueue blocks until the first entry is drained. */
  def shrinkBlockingQueue(brokerId: Int) {
    val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, RequestOrResponse => Unit)](1)
    val brokerInfo = this.brokerStateInfo(brokerId)
    this.brokerStateInfo.put(brokerId, new ControllerBrokerStateInfo(brokerInfo.channel,
                                                                     brokerInfo.broker,
                                                                     messageQueue,
                                                                     brokerInfo.requestSendThread))
  }

  /** Restarts the request send thread for the given broker.
   *  Bug fix: the original ignored brokerId and always restarted broker 0. */
  def resumeSendThread(brokerId: Int) {
    this.startRequestSendThread(brokerId)
  }

  /** Remaining capacity of the broker's outgoing message queue. */
  def queueCapacity(brokerId: Int): Int = {
    this.brokerStateInfo(brokerId).messageQueue.remainingCapacity
  }

  /** Current number of queued requests for the broker. */
  def queueSize(brokerId: Int): Int = {
    this.brokerStateInfo(brokerId).messageQueue.size
  }
}