diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index ea8d13b8f8116..8f98a8c26c246 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -381,7 +381,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging try { leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) => partitionStateInfos.foreach { case (topicPartition, state) => - val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" + val typeOfRequest = + if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" + else "become-follower" stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " + "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, state.leaderIsrAndControllerEpoch, broker, diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index dbce485feb0a6..956d1caba1264 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -146,18 +146,20 @@ object KafkaController extends Logging { } } -class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { +class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { this.logIdent = "[Controller " + config.brokerId + "]: " private val stateChangeLogger = KafkaController.stateChangeLogger val controllerContext = new ControllerContext(zkUtils) val partitionStateMachine = new PartitionStateMachine(this) val replicaStateMachine = new ReplicaStateMachine(this) - // have a separate scheduler for the controller to be able to start and stop independently of the - // kafka server - private val kafkaScheduler = new KafkaScheduler(1) + // have a separate scheduler for the controller to be able to start and stop independently of the kafka server + // visible for testing + private[controller] val kafkaScheduler = new KafkaScheduler(1) - private val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics()) + // visible for testing + private[controller] val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics, + _ => updateMetrics()) val topicDeletionManager = new TopicDeletionManager(this, eventManager) val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config) @@ -290,7 +292,6 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState /** * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is * required to clean up internal controller data structures - * Note:We need to resign as a controller out of the controller lock to avoid potential deadlock issue */ def onControllerResignation() { debug("Controller resigning, broker id %d".format(config.brokerId)) @@ -318,9 +319,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState replicaStateMachine.shutdown() deregisterBrokerChangeListener() - // reset controller context resetControllerContext() - brokerState.newState(RunningAsBroker) info("Broker %d resigned as the controller".format(config.brokerId)) } @@ -746,18 +745,15 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) { - val leaderAndIsrInfo = zkUtils.getPartitionLeaderAndIsrForTopics(zkUtils.zkClient, topicAndPartitions) - for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo) + val leaderAndIsrInfo = zkUtils.getPartitionLeaderAndIsrForTopics(topicAndPartitions) + for ((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo) controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch) } private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = { - zkUtils.getLeaderAndIsrForPartition(topic, partition) match { - case Some(leaderAndIsr) => - val replicasNotInIsr = replicas.filterNot(r => leaderAndIsr.isr.contains(r)) - replicasNotInIsr.isEmpty - case None => false - } + zkUtils.getLeaderAndIsrForPartition(topic, partition).map { leaderAndIsr => + replicas.forall(leaderAndIsr.isr.contains) + }.getOrElse(false) } private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition, @@ -824,22 +820,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) { - brokerRequestBatch.newBatch() updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match { case Some(updatedLeaderIsrAndControllerEpoch) => try { + brokerRequestBatch.newBatch() brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic, topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas) brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch) } catch { - case e : IllegalStateException => { - // Resign if the controller is in an illegal state - error("Forcing the controller to resign") - brokerRequestBatch.clear() - triggerControllerMove() - - throw e - } + case e: IllegalStateException => + handleIllegalState(e) } stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " + "to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch, @@ -986,14 +976,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions) brokerRequestBatch.sendRequestsToBrokers(epoch) } catch { - case e : IllegalStateException => { - // Resign if the controller is in an illegal state - error("Forcing the controller to resign") - brokerRequestBatch.clear() - triggerControllerMove() - - throw e - } + case e: IllegalStateException => + handleIllegalState(e) } } @@ -1425,39 +1409,32 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState controllerContext.partitionsOnBroker(id) .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size)) - allPartitionsAndReplicationFactorOnBroker.foreach { - case(topicAndPartition, replicationFactor) => - controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch => - if (replicationFactor > 1) { - if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) { - // If the broker leads the topic partition, transition the leader and update isr. Updates zk and - // notifies all affected brokers - partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, - controlledShutdownPartitionLeaderSelector) - } else { - // Stop the replica first. The state change below initiates ZK changes which should take some time - // before which the stop replica request should be completed (in most cases) - try { - brokerRequestBatch.newBatch() - brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, - topicAndPartition.partition, deletePartition = false) - brokerRequestBatch.sendRequestsToBrokers(epoch) - } catch { - case e : IllegalStateException => { - // Resign if the controller is in an illegal state - error("Forcing the controller to resign") - brokerRequestBatch.clear() - triggerControllerMove() - - throw e - } - } - // If the broker is a follower, updates the isr in ZK and notifies the current leader - replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, - topicAndPartition.partition, id)), OfflineReplica) + allPartitionsAndReplicationFactorOnBroker.foreach { case (topicAndPartition, replicationFactor) => + controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch => + if (replicationFactor > 1) { + if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) { + // If the broker leads the topic partition, transition the leader and update isr. Updates zk and + // notifies all affected brokers + partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, + controlledShutdownPartitionLeaderSelector) + } else { + // Stop the replica first. The state change below initiates ZK changes which should take some time + // before which the stop replica request should be completed (in most cases) + try { + brokerRequestBatch.newBatch() + brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, + topicAndPartition.partition, deletePartition = false) + brokerRequestBatch.sendRequestsToBrokers(epoch) + } catch { + case e: IllegalStateException => + handleIllegalState(e) } + // If the broker is a follower, updates the isr in ZK and notifies the current leader + replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, + topicAndPartition.partition, id)), OfflineReplica) } } + } } def replicatedPartitionsBrokerLeads() = { trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(",")) @@ -1557,7 +1534,17 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } } + // visible for testing + private[controller] def handleIllegalState(e: IllegalStateException): Nothing = { + // Resign if the controller is in an illegal state + error("Forcing the controller to resign") + brokerRequestBatch.clear() + triggerControllerMove() + throw e + } + private def triggerControllerMove(): Unit = { + onControllerResignation() activeControllerId = -1 controllerContext.zkUtils.deletePath(ZkUtils.ControllerPath) } diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 4cffc13260609..5751e17de0c05 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -79,9 +79,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { brokerRequestBatch.newBatch() // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions // that belong to topics to be deleted - for((topicAndPartition, partitionState) <- partitionState + for ((topicAndPartition, partitionState) <- partitionState if !controller.topicDeletionManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) { - if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition)) + if (partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition)) handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector, (new CallbackBuilder).build) } @@ -111,7 +111,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks) } brokerRequestBatch.sendRequestsToBrokers(controller.epoch) - }catch { + } catch { case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e) // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions } diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 0759ed4b109ce..60b99908d5635 100755 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -81,13 +81,13 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { */ def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState, callbacks: Callbacks = (new CallbackBuilder).build) { - if(replicas.nonEmpty) { + if (replicas.nonEmpty) { info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(","))) try { brokerRequestBatch.newBatch() replicas.foreach(r => handleStateChange(r, targetState, callbacks)) brokerRequestBatch.sendRequestsToBrokers(controller.epoch) - }catch { + } catch { case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e) } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c7dac0d28028b..0a87750d9495b 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -226,7 +226,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP replicaManager.startup() /* start kafka controller */ - kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix) + kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix) kafkaController.startup() adminManager = new AdminManager(config, metrics, metadataCache, zkUtils) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index fc78501b4918a..ac497c4eb9139 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -701,8 +701,7 @@ class ZkUtils(val zkClient: ZkClient, cluster } - def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: Set[TopicAndPartition]) - : mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = { + def getPartitionLeaderAndIsrForTopics(topicAndPartitions: Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = { val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch] for(topicAndPartition <- topicAndPartitions) { ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topicAndPartition.topic, topicAndPartition.partition) match { diff --git a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala index 727c4f34c6aea..ec9343e042560 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger import com.yammer.metrics.Metrics import com.yammer.metrics.core.Timer import kafka.utils.TestUtils -import org.easymock.{EasyMock, IAnswer} import org.junit.{After, Test} import org.junit.Assert.{assertEquals, fail} @@ -60,21 +59,13 @@ class ControllerEventManagerTest { val initialTimerCount = timer(metricName).count - // `ControllerEvent` is sealed so we use EasyMock to create a subclass - val eventMock = EasyMock.createMock(classOf[ControllerEvent]) - EasyMock.expect(eventMock.state).andReturn(controllerState) - // Only return from `process()` once we have checked `controllerEventManager.state` val latch = new CountDownLatch(1) - EasyMock.expect(eventMock.process()).andAnswer(new IAnswer[Unit]() { - def answer(): Unit = { - latch.await() - process() - } + val eventMock = ControllerTestUtils.createMockControllerEvent(controllerState, { () => + latch.await() + process() }) - EasyMock.replay(eventMock) - controllerEventManager.put(eventMock) TestUtils.waitUntilTrue(() => controllerEventManager.state == controllerState, s"Controller state is not $controllerState") diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala index 83a315f3b1532..7a91bef570d78 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala @@ -18,18 +18,17 @@ package kafka.controller import java.util.Properties -import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.CountDownLatch +import kafka.admin.AdminUtils import kafka.common.TopicAndPartition import kafka.integration.KafkaServerTestHarness -import kafka.server.{KafkaConfig, KafkaServer} +import kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.utils.Time -import org.apache.log4j.{Level, Logger} -import org.junit.{After, Ignore, Test} - -import scala.collection.mutable +import org.apache.log4j.Logger +import org.junit.{After, Test} +import org.junit.Assert._ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { val log = Logger.getLogger(classOf[ControllerFailoverTest]) @@ -54,119 +53,44 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { * See @link{https://issues.apache.org/jira/browse/KAFKA-2300} * for the background of this test case */ - @Ignore // This needs to be reworked as described here: https://github.com/apache/kafka/pull/2935#discussion_r114374412 @Test - def testMetadataUpdate() { - log.setLevel(Level.INFO) - var controller: KafkaServer = this.servers.head - // Find the current controller - val epochMap: mutable.Map[Int, Int] = mutable.Map.empty - for (server <- this.servers) { - epochMap += (server.config.brokerId -> server.kafkaController.epoch) - if(server.kafkaController.isActive) { - controller = server - } + def testHandleIllegalStateException() { + val initialController = servers.find(_.kafkaController.isActive).map(_.kafkaController).getOrElse { + fail("Could not find controller") } + val initialEpoch = initialController.epoch // Create topic with one partition - kafka.admin.AdminUtils.createTopic(controller.zkUtils, topic, 1, 1) + AdminUtils.createTopic(servers.head.zkUtils, topic, 1, 1) val topicPartition = TopicAndPartition("topic1", 0) - var partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition) - while (!partitions.contains(topicPartition)) { - partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition) - Thread.sleep(100) - } - // Replace channel manager with our mock manager - controller.kafkaController.controllerContext.controllerChannelManager.shutdown() - val channelManager = new MockChannelManager(controller.kafkaController.controllerContext, - controller.kafkaController.config, metrics) - channelManager.startup() - controller.kafkaController.controllerContext.controllerChannelManager = channelManager - channelManager.shrinkBlockingQueue(0) - channelManager.stopSendThread(0) - // Spawn a new thread to block on the outgoing channel - // queue - val thread = new Thread(new Runnable { - def run() { - try { - controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition)) - log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0))) - controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition)) - log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0))) - } catch { - case _: Exception => log.info("Thread interrupted") - } + TestUtils.waitUntilTrue(() => + initialController.partitionStateMachine.partitionsInState(OnlinePartition).contains(topicPartition), + s"Partition $topicPartition did not transition to online state") + + // Wait until we have verified that we have resigned + val latch = new CountDownLatch(1) + @volatile var expectedExceptionThrown = false + @volatile var unexpectedExceptionThrown: Option[Throwable] = None + val illegalStateEvent = ControllerTestUtils.createMockControllerEvent(ControllerState.BrokerChange, { () => + try initialController.handleIllegalState(new IllegalStateException("Thrown for test purposes")) + catch { + case _: IllegalStateException => expectedExceptionThrown = true + case t: Throwable => unexpectedExceptionThrown = Some(t) } + latch.await() }) - thread.setName("mythread") - thread.start() - while (thread.getState() != Thread.State.WAITING) { - Thread.sleep(100) - } - // Assume that the thread is WAITING because it is - // blocked on the queue, so interrupt and move forward - thread.interrupt() - thread.join() - channelManager.resumeSendThread(0) - // Wait and find current controller - var found = false - var counter = 0 - while (!found && counter < 10) { - for (server <- this.servers) { - val previousEpoch = epochMap get server.config.brokerId match { - case Some(epoch) => - epoch - case None => - val msg = String.format("Missing element in epoch map %s", epochMap.mkString(", ")) - throw new IllegalStateException(msg) - } + initialController.eventManager.put(illegalStateEvent) + // Check that we have shutdown the scheduler (via onControllerResigned) + TestUtils.waitUntilTrue(() => !initialController.kafkaScheduler.isStarted, "Scheduler was not shutdown") + TestUtils.waitUntilTrue(() => !initialController.isActive, "Controller did not become inactive") + latch.countDown() + assertTrue("IllegalStateException was not thrown", expectedExceptionThrown) + assertEquals("Unexpected exception thrown", None, unexpectedExceptionThrown) - if (server.kafkaController.isActive - && previousEpoch < server.kafkaController.epoch) { - controller = server - found = true - } + TestUtils.waitUntilTrue(() => { + servers.exists { server => + server.kafkaController.isActive && server.kafkaController.epoch > initialController.epoch } - if (!found) { - Thread.sleep(100) - counter += 1 - } - } - // Give it a shot to make sure that sending isn't blocking - try { - controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition)) - } catch { - case e : Throwable => { - fail(e) - } - } - } -} - -class MockChannelManager(private val controllerContext: ControllerContext, config: KafkaConfig, metrics: Metrics) - extends ControllerChannelManager(controllerContext, config, Time.SYSTEM, metrics) { - - def stopSendThread(brokerId: Int) { - val requestThread = brokerStateInfo(brokerId).requestSendThread - requestThread.isRunning.set(false) - requestThread.interrupt - requestThread.join - } - - def shrinkBlockingQueue(brokerId: Int) { - val messageQueue = new LinkedBlockingQueue[QueueItem](1) - val brokerInfo = this.brokerStateInfo(brokerId) - this.brokerStateInfo.put(brokerId, brokerInfo.copy(messageQueue = messageQueue)) - } - - def resumeSendThread (brokerId: Int) { - this.startRequestSendThread(0) - } - - def queueCapacity(brokerId: Int): Int = { - this.brokerStateInfo(brokerId).messageQueue.remainingCapacity - } + }, "Failed to find controller") - def queueSize(brokerId: Int): Int = { - this.brokerStateInfo(brokerId).messageQueue.size } } diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index 2df93c7de8281..d5f2fe0a2d612 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -264,7 +264,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { servers(otherBrokerId).shutdown() servers(otherBrokerId).awaitShutdown() TestUtils.waitUntilTrue(() => { - val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp)) + val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp)) leaderIsrAndControllerEpochMap.contains(tp) && isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) && leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId) @@ -284,7 +284,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { servers(1).shutdown() servers(1).awaitShutdown() TestUtils.waitUntilTrue(() => { - val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp)) + val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp)) leaderIsrAndControllerEpochMap.contains(tp) && isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) && leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List.empty @@ -301,7 +301,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { leaderEpoch: Int, message: String): Unit = { TestUtils.waitUntilTrue(() => { - val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp)) + val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp)) leaderIsrAndControllerEpochMap.contains(tp) && isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), controllerEpoch, leader, leaderEpoch) }, message) diff --git a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala new file mode 100644 index 0000000000000..407297a871314 --- /dev/null +++ b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.controller + +import org.easymock.{EasyMock, IAnswer} + +object ControllerTestUtils { + + /** Since ControllerEvent is sealed, return a subclass of ControllerEvent created with EasyMock */ + def createMockControllerEvent(controllerState: ControllerState, process: () => Unit): ControllerEvent = { + val mockEvent = EasyMock.createMock(classOf[ControllerEvent]) + EasyMock.expect(mockEvent.state).andReturn(controllerState) + EasyMock.expect(mockEvent.process()).andAnswer(new IAnswer[Unit]() { + def answer(): Unit = { + process() + } + }) + EasyMock.replay(mockEvent) + mockEvent + } +}