diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 9f521fa7a54a8..4396b6e12d988 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -30,7 +30,7 @@ import kafka.api.RequestOrResponse import collection.Set class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging { - private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] + protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] private val brokerLock = new Object this.logIdent = "[Channel manager on controller " + config.brokerId + "]: " @@ -100,7 +100,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext } } - private def startRequestSendThread(brokerId: Int) { + protected def startRequestSendThread(brokerId: Int) { val requestThread = brokerStateInfo(brokerId).requestSendThread if(requestThread.getState == Thread.State.NEW) requestThread.start() @@ -280,49 +280,67 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging } def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) { - leaderAndIsrRequestMap.foreach { m => - val broker = m._1 - val partitionStateInfos = m._2.toMap - val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet - val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b => b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)) - val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) - for (p <- partitionStateInfos) { - val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" - stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " + - "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, - p._2.leaderIsrAndControllerEpoch, correlationId, broker, - p._1._1, p._1._2)) + try { + leaderAndIsrRequestMap.foreach { m => + val broker = m._1 + val partitionStateInfos = m._2.toMap + val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet + val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b => b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)) + val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) + for (p <- partitionStateInfos) { + val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" + stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " + + "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, + p._2.leaderIsrAndControllerEpoch, correlationId, broker, + p._1._1, p._1._2)) + } + controller.sendRequest(broker, leaderAndIsrRequest, null) } - controller.sendRequest(broker, leaderAndIsrRequest, null) - } - leaderAndIsrRequestMap.clear() - updateMetadataRequestMap.foreach { m => - val broker = m._1 - val partitionStateInfos = m._2.toMap - - val versionId = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0 - val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort, controllerId = controllerId, controllerEpoch = controllerEpoch, - correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos, aliveBrokers = controllerContext.liveOrShuttingDownBrokers) - partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " + - "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, - correlationId, broker, p._1))) - controller.sendRequest(broker, updateMetadataRequest, null) - } - updateMetadataRequestMap.clear() - stopReplicaRequestMap foreach { case(broker, replicaInfoList) => - val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet - val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet - debug("The stop replica request (delete = true) sent to broker %d is %s" - .format(broker, stopReplicaWithDelete.mkString(","))) - debug("The stop replica request (delete = false) sent to broker %d is %s" - .format(broker, stopReplicaWithoutDelete.mkString(","))) - replicaInfoList.foreach { r => - val stopReplicaRequest = new StopReplicaRequest(r.deletePartition, - Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId) - controller.sendRequest(broker, stopReplicaRequest, r.callback) + leaderAndIsrRequestMap.clear() + updateMetadataRequestMap.foreach { m => + val broker = m._1 + val partitionStateInfos = m._2.toMap + + val versionId = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0 + val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort, controllerId = controllerId, controllerEpoch = controllerEpoch, + correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos, aliveBrokers = controllerContext.liveOrShuttingDownBrokers) + partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " + + "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, + correlationId, broker, p._1))) + controller.sendRequest(broker, updateMetadataRequest, null) + } + updateMetadataRequestMap.clear() + stopReplicaRequestMap foreach { case(broker, replicaInfoList) => + val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet + val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet + debug("The stop replica request (delete = true) sent to broker %d is %s" + .format(broker, stopReplicaWithDelete.mkString(","))) + debug("The stop replica request (delete = false) sent to broker %d is %s" + .format(broker, stopReplicaWithoutDelete.mkString(","))) + replicaInfoList.foreach { r => + val stopReplicaRequest = new StopReplicaRequest(r.deletePartition, + Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId) + controller.sendRequest(broker, stopReplicaRequest, r.callback) + } + } + stopReplicaRequestMap.clear() + } catch { + case e : Throwable => { + if(leaderAndIsrRequestMap.size > 0) { + error("Haven't been able to send leader and isr requests, current state of " + + s"the map is $leaderAndIsrRequestMap") + } + if(updateMetadataRequestMap.size > 0) { + error("Haven't been able to send metadata update requests, current state of " + + s"the map is $updateMetadataRequestMap") + } + if(stopReplicaRequestMap.size > 0) { + error("Haven't been able to send stop replica requests, current state of " + + s"the map is $stopReplicaRequestMap") + } + throw new IllegalStateException(e) } } - stopReplicaRequestMap.clear() } } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index b4fc755641b9b..96216537d0733 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -263,11 +263,20 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt } else { // Stop the replica first. The state change below initiates ZK changes which should take some time // before which the stop replica request should be completed (in most cases) - brokerRequestBatch.newBatch() - brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, - topicAndPartition.partition, deletePartition = false) - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) - + try { + brokerRequestBatch.newBatch() + brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, + topicAndPartition.partition, deletePartition = false) + brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + } catch { + case e : IllegalStateException => { + // Resign if the controller is in an illegal state + error("Forcing the controller to resign") + controllerElector.resign() + + throw e + } + } // If the broker is a follower, updates the isr in ZK and notifies the current leader replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, id)), OfflineReplica) @@ -341,6 +350,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt * required to clean up internal controller data structures */ def onControllerResignation() { + debug("Controller resigning, broker id %d".format(config.brokerId)) // de-register listeners deregisterIsrChangeNotificationListener() deregisterReassignedPartitionsListener() @@ -888,9 +898,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt brokerRequestBatch.newBatch() updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match { case Some(updatedLeaderIsrAndControllerEpoch) => - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic, - topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas) - brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement) + try { + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic, + topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas) + brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement) + } catch { + case e : IllegalStateException => { + // Resign if the controller is in an illegal state + error("Forcing the controller to resign") + controllerElector.resign() + + throw e + } + } stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " + "to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas.mkString(","), updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader, topicAndPartition)) @@ -998,9 +1018,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt * @param brokers The brokers that the update metadata request should be sent to */ def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) { - brokerRequestBatch.newBatch() - brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions) - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + try { + brokerRequestBatch.newBatch() + brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions) + brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + } catch { + case e : IllegalStateException => { + // Resign if the controller is in an illegal state + error("Forcing the controller to resign") + controllerElector.resign() + + throw e + } + } } /** diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala new file mode 100644 index 0000000000000..206a7c30db1c0 --- /dev/null +++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.controller + +import java.util.concurrent.LinkedBlockingQueue +import java.util.Properties + +import junit.framework.Assert._ +import org.scalatest.junit.JUnit3Suite + +import org.junit.{Test, After, Before} +import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} +import org.I0Itec.zkclient.serialize.ZkSerializer +import org.apache.log4j.{Logger, Level} + +import kafka.api.RequestOrResponse +import kafka.common.TopicAndPartition +import kafka.integration.KafkaServerTestHarness +import kafka.server.BrokerState +import kafka.server.KafkaConfig +import kafka.server.KafkaServer +import kafka.server.RunningAsController +import kafka.utils._ +import kafka.utils.TestUtils._ + +import scala.collection.Map +import scala.collection.mutable + + +class ControllerFailoverTest extends KafkaServerTestHarness with Logging { + val log = Logger.getLogger(classOf[ControllerFailoverTest]) + val numNodes = 2 + val numParts = 1 + val msgQueueSize = 1 + val topic = "topic1" + val overridingProps = new Properties() + overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString) + + override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect) + .map(KafkaConfig.fromProps(_, overridingProps)) + + override def setUp() { + super.setUp() + } + + override def tearDown() { + super.tearDown() + } + + /** + * See @link{https://issues.apache.org/jira/browse/KAFKA-2300} + * for the background of this test case + */ + def testMetadataUpdate() { + log.setLevel(Level.INFO) + var controller: KafkaServer = this.servers.head; + // Find the current controller + val epochMap: mutable.Map[Int, Int] = mutable.Map.empty + for (server <- this.servers) { + epochMap += (server.config.brokerId -> server.kafkaController.epoch) + if(server.kafkaController.isActive()) { + controller = server + } + } + // Create topic with one partition + kafka.admin.AdminUtils.createTopic(controller.zkClient, topic, 1, 1) + val topicPartition = TopicAndPartition("topic1", 0) + var partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition) + while (!partitions.contains(topicPartition)) { + partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition) + Thread.sleep(100) + } + // Replace channel manager with our mock manager + controller.kafkaController.controllerContext.controllerChannelManager.shutdown() + val channelManager = new MockChannelManager(controller.kafkaController.controllerContext, + controller.kafkaController.config) + channelManager.startup() + controller.kafkaController.controllerContext.controllerChannelManager = channelManager + channelManager.shrinkBlockingQueue(0) + channelManager.stopSendThread(0) + // Spawn a new thread to block on the outgoing channel + // queue + val thread = new Thread(new Runnable { + def run() { + try { + controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition)) + log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0))) + controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition)) + log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0))) + } catch { + case e : Exception => { + log.info("Thread interrupted") + } + } + } + }) + thread.setName("mythread") + thread.start() + while (thread.getState() != Thread.State.WAITING) { + Thread.sleep(100) + } + // Assume that the thread is WAITING because it is + // blocked on the queue, so interrupt and move forward + thread.interrupt() + thread.join() + channelManager.resumeSendThread(0) + // Wait and find current controller + var found = false + var counter = 0 + while (!found && counter < 10) { + for (server <- this.servers) { + val previousEpoch = (epochMap get server.config.brokerId) match { + case Some(epoch) => + epoch + case None => + val msg = String.format("Missing element in epoch map %s", epochMap.mkString(", ")) + throw new IllegalStateException(msg) + } + + if (server.kafkaController.isActive + && (previousEpoch) < server.kafkaController.epoch) { + controller = server + found = true + } + } + if (!found) { + Thread.sleep(100) + counter += 1 + } + } + // Give it a shot to make sure that sending isn't blocking + try { + controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition)) + } catch { + case e : Throwable => { + fail(e) + } + } + } +} + +class MockChannelManager(private val controllerContext: ControllerContext, + config: KafkaConfig) + extends ControllerChannelManager(controllerContext, config) { + def stopSendThread(brokerId: Int) { + val requestThread = brokerStateInfo(brokerId).requestSendThread + requestThread.isRunning.set(false) + requestThread.interrupt + requestThread.join + } + + def shrinkBlockingQueue(brokerId: Int) { + val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, RequestOrResponse => Unit)](1) + val brokerInfo = this.brokerStateInfo(brokerId) + this.brokerStateInfo.put(brokerId, new ControllerBrokerStateInfo(brokerInfo.channel, + brokerInfo.broker, + messageQueue, + brokerInfo.requestSendThread)) + } + + def resumeSendThread (brokerId: Int) { + this.startRequestSendThread(0) + } + + def queueCapacity(brokerId: Int): Int = { + this.brokerStateInfo(brokerId).messageQueue.remainingCapacity + } + + def queueSize(brokerId: Int): Int = { + this.brokerStateInfo(brokerId).messageQueue.size + } +} \ No newline at end of file