From dbd1bf3a91c3e15ed2d14bf941c41c87b8116608 Mon Sep 17 00:00:00 2001 From: flavio junqueira Date: Wed, 29 Jul 2015 18:07:51 +0100 Subject: [PATCH 1/5] KAFKA-2300: Error in controller log when broker tries to rejoin cluster --- .../controller/ControllerChannelManager.scala | 102 +++++---- .../kafka/controller/KafkaController.scala | 17 +- .../controller/ControllerFailoverTest.scala | 193 ++++++++++++++++++ 3 files changed, 267 insertions(+), 45 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 9f521fa7a54a8..0f9eebe6ab6d7 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -30,7 +30,7 @@ import kafka.api.RequestOrResponse import collection.Set class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging { - private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] + protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] private val brokerLock = new Object this.logIdent = "[Channel manager on controller " + config.brokerId + "]: " @@ -100,7 +100,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext } } - private def startRequestSendThread(brokerId: Int) { + protected def startRequestSendThread(brokerId: Int) { val requestThread = brokerStateInfo(brokerId).requestSendThread if(requestThread.getState == Thread.State.NEW) requestThread.start() @@ -280,49 +280,67 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging } def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) { - leaderAndIsrRequestMap.foreach { m => - val broker = m._1 - val partitionStateInfos = m._2.toMap - val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet - val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b => b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)) - val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) - for (p <- partitionStateInfos) { - val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" - stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " + - "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, - p._2.leaderIsrAndControllerEpoch, correlationId, broker, - p._1._1, p._1._2)) + try { + leaderAndIsrRequestMap.foreach { m => + val broker = m._1 + val partitionStateInfos = m._2.toMap + val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet + val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b => b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)) + val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) + for (p <- partitionStateInfos) { + val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" + stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " + + "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, + p._2.leaderIsrAndControllerEpoch, correlationId, broker, + p._1._1, p._1._2)) + } + controller.sendRequest(broker, leaderAndIsrRequest, null) } - controller.sendRequest(broker, leaderAndIsrRequest, null) - } - leaderAndIsrRequestMap.clear() - updateMetadataRequestMap.foreach { m => - val broker = m._1 - val partitionStateInfos = m._2.toMap - - val versionId = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0 - val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort, controllerId = controllerId, controllerEpoch = controllerEpoch, - correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos, aliveBrokers = controllerContext.liveOrShuttingDownBrokers) - partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " + - "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, - correlationId, broker, p._1))) - controller.sendRequest(broker, updateMetadataRequest, null) - } - updateMetadataRequestMap.clear() - stopReplicaRequestMap foreach { case(broker, replicaInfoList) => - val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet - val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet - debug("The stop replica request (delete = true) sent to broker %d is %s" - .format(broker, stopReplicaWithDelete.mkString(","))) - debug("The stop replica request (delete = false) sent to broker %d is %s" - .format(broker, stopReplicaWithoutDelete.mkString(","))) - replicaInfoList.foreach { r => - val stopReplicaRequest = new StopReplicaRequest(r.deletePartition, - Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId) - controller.sendRequest(broker, stopReplicaRequest, r.callback) + leaderAndIsrRequestMap.clear() + updateMetadataRequestMap.foreach { m => + val broker = m._1 + val partitionStateInfos = m._2.toMap + + val versionId = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0 + val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort, controllerId = controllerId, controllerEpoch = controllerEpoch, + correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos, aliveBrokers = controllerContext.liveOrShuttingDownBrokers) + partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " + + "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, + correlationId, broker, p._1))) + controller.sendRequest(broker, updateMetadataRequest, null) + } + updateMetadataRequestMap.clear() + stopReplicaRequestMap foreach { case(broker, replicaInfoList) => + val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet + val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet + debug("The stop replica request (delete = true) sent to broker %d is %s" + .format(broker, stopReplicaWithDelete.mkString(","))) + debug("The stop replica request (delete = false) sent to broker %d is %s" + .format(broker, stopReplicaWithoutDelete.mkString(","))) + replicaInfoList.foreach { r => + val stopReplicaRequest = new StopReplicaRequest(r.deletePartition, + Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId) + controller.sendRequest(broker, stopReplicaRequest, r.callback) + } + } + stopReplicaRequestMap.clear() + } catch { + case e : Throwable => { + if(leaderAndIsrRequestMap.size > 0) { + error("Haven't been able to send leader and isr requests, current state of " + + "the map is %s".format(leaderAndIsrRequestMap.toString())) + } + if(updateMetadataRequestMap.size > 0) { + error("Haven't been able to send metadata update requests, current state of " + + "the map is %s".format(updateMetadataRequestMap.toString())) + } + if(stopReplicaRequestMap.size > 0) { + error("Haven't been able to send stop replica requests, current state of " + + "the map is %s".format(stopReplicaRequestMap.toString())) + } + throw new IllegalStateException(e) } } - stopReplicaRequestMap.clear() } } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index b4fc755641b9b..365736bf0ac4f 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -341,6 +341,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt * required to clean up internal controller data structures */ def onControllerResignation() { + debug("Controller resigning, broker id %d".format(config.brokerId)) // de-register listeners deregisterIsrChangeNotificationListener() deregisterReassignedPartitionsListener() @@ -998,9 +999,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt * @param brokers The brokers that the update metadata request should be sent to */ def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) { - brokerRequestBatch.newBatch() - brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions) - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + try { + brokerRequestBatch.newBatch() + brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions) + brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + } catch { + case e : IllegalStateException => { + // Resign if the controller is in an illegal state + error("Forcing the controller to resign") + controllerElector.resign() + + throw e + } + } } /** diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala new file mode 100644 index 0000000000000..0fd07e3de2f69 --- /dev/null +++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.controller + +import java.util.concurrent.LinkedBlockingQueue +import java.util.Properties + +import junit.framework.Assert._ +import org.scalatest.junit.JUnit3Suite + +import org.junit.{Test, After, Before} +import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} +import org.I0Itec.zkclient.serialize.ZkSerializer +import org.apache.log4j.{Logger, Level} + +import kafka.api.RequestOrResponse +import kafka.common.TopicAndPartition +import kafka.controller.ControllerBrokerStateInfo +import kafka.controller.ControllerContext +import kafka.controller.ControllerChannelManager +import kafka.controller.KafkaController +import kafka.controller.OnlinePartition +import kafka.integration.KafkaServerTestHarness +import kafka.server.BrokerState +import kafka.server.KafkaConfig +import kafka.server.KafkaServer +import kafka.server.RunningAsController +import kafka.utils._ +import kafka.utils.TestUtils._ + +import scala.collection.Map +import scala.collection.mutable + + +class ControllerFailoverTest extends KafkaServerTestHarness with Logging { + val log = Logger.getLogger(classOf[ControllerFailoverTest]) + //val zkConnect = TestZKUtils.zookeeperConnect + val numNodes = 2 + val numParts = 1 + val msgQueueSize = 1 + val topic = "topic1" + val overridingProps = new Properties() + overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString) + + override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect) + .map(KafkaConfig.fromProps(_, overridingProps)) + + override def setUp() { + super.setUp() + } + + override def tearDown() { + super.tearDown() + } + + /** + * See @link{https://issues.apache.org/jira/browse/KAFKA-2300} + * for the background of this test case + */ + def testMetadataUpdate() { + log.setLevel(Level.INFO) + var controller : KafkaServer = this.servers.head; + // Find the current controller + val epochMap : mutable.Map[Int, Int] = mutable.Map.empty + for(server <- this.servers) { + epochMap += (server.config.brokerId -> server.kafkaController.epoch) + if(server.kafkaController.isActive()) { + controller = server + } + } + // Create topic with one partition + kafka.admin.AdminUtils.createTopic(controller.zkClient, topic, 1, 1) + val topicPartition = TopicAndPartition("topic1", 0) + var partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition) + while(!partitions.contains(topicPartition)) { + partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition) + Thread.sleep(100) + } + // Replace channel manager with our mock manager + controller.kafkaController.controllerContext.controllerChannelManager.shutdown() + val channelManager = new MockChannelManager(controller.kafkaController.controllerContext, + controller.kafkaController.config) + channelManager.startup() + controller.kafkaController.controllerContext.controllerChannelManager = channelManager + channelManager.shrinkBlockingQueue(0) + channelManager.stopSendThread(0) + // Spawn a new thread to block on the outgoing channel + // queue + val thread = new Thread(new Runnable { + def run() { + try { + controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition)) + log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0))) + controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition)) + log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0))) + } catch { + case e : Exception => { + log.info("Thread interrupted") + } + } + } + }) + thread.setName("mythread") + thread.start() + while(thread.getState() != Thread.State.WAITING){ + Thread.sleep(100) + } + // Assume that the thread is WAITING because it is + // blocked on the queue, so interrupt and move forward + thread.interrupt() + thread.join() + channelManager.resumeSendThread(0) + // Wait and find current controller + var found = false + var counter = 0 + while(!found && counter < 10) { + for(server <- this.servers) { + val previousEpoch = (epochMap get server.config.brokerId) match { + case Some(epoch) => + epoch + case None => + val msg = String.format("Missing element in epoch map %s", epochMap.mkString(", ")) + throw new IllegalStateException(msg) + } + + if(server.kafkaController.isActive() + && (previousEpoch) < server.kafkaController.epoch) { + controller = server + found = true + } + } + if(!found){ + Thread.sleep(100) + counter += 1 + } + } + // Give it a shot to make sure that sending isn't blocking + try { + controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition)) + } catch { + case e : Throwable => { + fail(e) + } + } + } +} + +class MockChannelManager(private val controllerContext: ControllerContext, + config: KafkaConfig) + extends ControllerChannelManager(controllerContext, config) { + def stopSendThread(brokerId: Int) { + val requestThread = brokerStateInfo(brokerId).requestSendThread + requestThread.isRunning.set(false) + requestThread.interrupt() + requestThread.join() + } + + def shrinkBlockingQueue(brokerId: Int) { + val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](1) + val brokerInfo = this.brokerStateInfo(brokerId) + this.brokerStateInfo.put(brokerId, new ControllerBrokerStateInfo(brokerInfo.channel, + brokerInfo.broker, + messageQueue, + brokerInfo.requestSendThread)) + } + + def resumeSendThread (brokerId: Int) { + this.startRequestSendThread(0) + } + + def queueCapacity(broker : Int): Int = { + this.brokerStateInfo(broker).messageQueue.remainingCapacity() + } + + def queueSize(broker : Int): Int = { + this.brokerStateInfo(broker).messageQueue.size() + } +} \ No newline at end of file From 9b6390ae1c474b90689ff53036120b4be44a3f8f Mon Sep 17 00:00:00 2001 From: flavio junqueira Date: Wed, 29 Jul 2015 23:36:16 +0100 Subject: [PATCH 2/5] Updated package name and removed unnecessary imports. --- .../unit/kafka/controller/ControllerFailoverTest.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala index 0fd07e3de2f69..b6dda975f938e 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package unit.kafka.controller +package kafka.controller import java.util.concurrent.LinkedBlockingQueue import java.util.Properties @@ -30,11 +30,6 @@ import org.apache.log4j.{Logger, Level} import kafka.api.RequestOrResponse import kafka.common.TopicAndPartition -import kafka.controller.ControllerBrokerStateInfo -import kafka.controller.ControllerContext -import kafka.controller.ControllerChannelManager -import kafka.controller.KafkaController -import kafka.controller.OnlinePartition import kafka.integration.KafkaServerTestHarness import kafka.server.BrokerState import kafka.server.KafkaConfig From f1261b15b007d08e87d0ed56f7ec3fecbeddc276 Mon Sep 17 00:00:00 2001 From: flavio junqueira Date: Thu, 30 Jul 2015 10:57:34 +0100 Subject: [PATCH 3/5] Fixed some style issues. --- .../controller/ControllerChannelManager.scala | 12 +++---- .../controller/ControllerFailoverTest.scala | 33 +++++++++---------- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 0f9eebe6ab6d7..2e1eacc617562 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -327,16 +327,16 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging } catch { case e : Throwable => { if(leaderAndIsrRequestMap.size > 0) { - error("Haven't been able to send leader and isr requests, current state of " + - "the map is %s".format(leaderAndIsrRequestMap.toString())) + error(s"Haven't been able to send leader and isr requests, current state of " + + "the map is %leaderAndIsrRequestMap") } if(updateMetadataRequestMap.size > 0) { - error("Haven't been able to send metadata update requests, current state of " + - "the map is %s".format(updateMetadataRequestMap.toString())) + error(s"Haven't been able to send metadata update requests, current state of " + + "the map is %updateMetadataRequestMap") } if(stopReplicaRequestMap.size > 0) { - error("Haven't been able to send stop replica requests, current state of " + - "the map is %s".format(stopReplicaRequestMap.toString())) + error(s"Haven't been able to send stop replica requests, current state of " + + "the map is %stopReplicaRequestMap") } throw new IllegalStateException(e) } diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala index b6dda975f938e..206a7c30db1c0 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala @@ -44,7 +44,6 @@ import scala.collection.mutable class ControllerFailoverTest extends KafkaServerTestHarness with Logging { val log = Logger.getLogger(classOf[ControllerFailoverTest]) - //val zkConnect = TestZKUtils.zookeeperConnect val numNodes = 2 val numParts = 1 val msgQueueSize = 1 @@ -69,10 +68,10 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { */ def testMetadataUpdate() { log.setLevel(Level.INFO) - var controller : KafkaServer = this.servers.head; + var controller: KafkaServer = this.servers.head; // Find the current controller - val epochMap : mutable.Map[Int, Int] = mutable.Map.empty - for(server <- this.servers) { + val epochMap: mutable.Map[Int, Int] = mutable.Map.empty + for (server <- this.servers) { epochMap += (server.config.brokerId -> server.kafkaController.epoch) if(server.kafkaController.isActive()) { controller = server @@ -82,7 +81,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { kafka.admin.AdminUtils.createTopic(controller.zkClient, topic, 1, 1) val topicPartition = TopicAndPartition("topic1", 0) var partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition) - while(!partitions.contains(topicPartition)) { + while (!partitions.contains(topicPartition)) { partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition) Thread.sleep(100) } @@ -112,7 +111,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { }) thread.setName("mythread") thread.start() - while(thread.getState() != Thread.State.WAITING){ + while (thread.getState() != Thread.State.WAITING) { Thread.sleep(100) } // Assume that the thread is WAITING because it is @@ -123,8 +122,8 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { // Wait and find current controller var found = false var counter = 0 - while(!found && counter < 10) { - for(server <- this.servers) { + while (!found && counter < 10) { + for (server <- this.servers) { val previousEpoch = (epochMap get server.config.brokerId) match { case Some(epoch) => epoch @@ -133,13 +132,13 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { throw new IllegalStateException(msg) } - if(server.kafkaController.isActive() + if (server.kafkaController.isActive && (previousEpoch) < server.kafkaController.epoch) { controller = server found = true } } - if(!found){ + if (!found) { Thread.sleep(100) counter += 1 } @@ -161,12 +160,12 @@ class MockChannelManager(private val controllerContext: ControllerContext, def stopSendThread(brokerId: Int) { val requestThread = brokerStateInfo(brokerId).requestSendThread requestThread.isRunning.set(false) - requestThread.interrupt() - requestThread.join() + requestThread.interrupt + requestThread.join } def shrinkBlockingQueue(brokerId: Int) { - val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](1) + val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, RequestOrResponse => Unit)](1) val brokerInfo = this.brokerStateInfo(brokerId) this.brokerStateInfo.put(brokerId, new ControllerBrokerStateInfo(brokerInfo.channel, brokerInfo.broker, @@ -178,11 +177,11 @@ class MockChannelManager(private val controllerContext: ControllerContext, this.startRequestSendThread(0) } - def queueCapacity(broker : Int): Int = { - this.brokerStateInfo(broker).messageQueue.remainingCapacity() + def queueCapacity(brokerId: Int): Int = { + this.brokerStateInfo(brokerId).messageQueue.remainingCapacity } - def queueSize(broker : Int): Int = { - this.brokerStateInfo(broker).messageQueue.size() + def queueSize(brokerId: Int): Int = { + this.brokerStateInfo(brokerId).messageQueue.size } } \ No newline at end of file From aa6ec90b15ac6d0e0f9e5a58d4fed7b1909d50c2 Mon Sep 17 00:00:00 2001 From: flavio junqueira Date: Wed, 12 Aug 2015 17:37:07 +0100 Subject: [PATCH 4/5] KAFKA-2300: Wrapped all occurences of sendRequestToBrokers with try/catch and fixed string typo. --- .../controller/ControllerChannelManager.scala | 6 ++-- .../kafka/controller/KafkaController.scala | 35 ++++++++++++++----- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 2e1eacc617562..ef2892b7128c5 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -328,15 +328,15 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging case e : Throwable => { if(leaderAndIsrRequestMap.size > 0) { error(s"Haven't been able to send leader and isr requests, current state of " + - "the map is %leaderAndIsrRequestMap") + s"the map is $leaderAndIsrRequestMap") } if(updateMetadataRequestMap.size > 0) { error(s"Haven't been able to send metadata update requests, current state of " + - "the map is %updateMetadataRequestMap") + s"the map is $updateMetadataRequestMap") } if(stopReplicaRequestMap.size > 0) { error(s"Haven't been able to send stop replica requests, current state of " + - "the map is %stopReplicaRequestMap") + s"the map is $stopReplicaRequestMap") } throw new IllegalStateException(e) } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 365736bf0ac4f..96216537d0733 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -263,11 +263,20 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt } else { // Stop the replica first. The state change below initiates ZK changes which should take some time // before which the stop replica request should be completed (in most cases) - brokerRequestBatch.newBatch() - brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, - topicAndPartition.partition, deletePartition = false) - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) - + try { + brokerRequestBatch.newBatch() + brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, + topicAndPartition.partition, deletePartition = false) + brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + } catch { + case e : IllegalStateException => { + // Resign if the controller is in an illegal state + error("Forcing the controller to resign") + controllerElector.resign() + + throw e + } + } // If the broker is a follower, updates the isr in ZK and notifies the current leader replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, id)), OfflineReplica) @@ -889,9 +898,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt brokerRequestBatch.newBatch() updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match { case Some(updatedLeaderIsrAndControllerEpoch) => - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic, - topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas) - brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement) + try { + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic, + topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas) + brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement) + } catch { + case e : IllegalStateException => { + // Resign if the controller is in an illegal state + error("Forcing the controller to resign") + controllerElector.resign() + + throw e + } + } stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " + "to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas.mkString(","), updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader, topicAndPartition)) From 7bd2edb83054a9be72dda3425930a68ea3ad494b Mon Sep 17 00:00:00 2001 From: flavio junqueira Date: Wed, 12 Aug 2015 17:40:13 +0100 Subject: [PATCH 5/5] KAFKA-2300: Removed unnecessary s" occurrences. --- .../scala/kafka/controller/ControllerChannelManager.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index ef2892b7128c5..4396b6e12d988 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -327,15 +327,15 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging } catch { case e : Throwable => { if(leaderAndIsrRequestMap.size > 0) { - error(s"Haven't been able to send leader and isr requests, current state of " + + error("Haven't been able to send leader and isr requests, current state of " + s"the map is $leaderAndIsrRequestMap") } if(updateMetadataRequestMap.size > 0) { - error(s"Haven't been able to send metadata update requests, current state of " + + error("Haven't been able to send metadata update requests, current state of " + s"the map is $updateMetadataRequestMap") } if(stopReplicaRequestMap.size > 0) { - error(s"Haven't been able to send stop replica requests, current state of " + + error("Haven't been able to send stop replica requests, current state of " + s"the map is $stopReplicaRequestMap") } throw new IllegalStateException(e)