From 39af5068720d01568042d5ef89651541aec569df Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 14 Nov 2019 15:42:11 +0000 Subject: [PATCH 1/6] MINOR: Refactor controller partition reassignment logic into separate class This patch adds a ReassignmentsManager class which encapsulates most of the nitty-gritty details of reassigning a partition. Splitting the logic helps with testability and this patch leverages that to add unit tests for partition reassignments --- .../kafka/controller/KafkaController.scala | 486 +--------------- .../controller/ReassignmentsManager.scala | 537 ++++++++++++++++++ .../controller/TopicDeletionManager.scala | 1 - .../controller/ReassignmentsManagerTest.scala | 228 ++++++++ 4 files changed, 777 insertions(+), 475 deletions(-) create mode 100644 core/src/main/scala/kafka/controller/ReassignmentsManager.scala create mode 100644 core/src/test/scala/unit/kafka/controller/ReassignmentsManagerTest.scala diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 7b28202e6da81..c6effe7e566e5 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -95,6 +95,8 @@ class KafkaController(val config: KafkaConfig, new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger)) val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine, partitionStateMachine, new ControllerDeletionClient(this, zkClient)) + val reassignmentsManager = new ReassignmentsManager(controllerContext, zkClient, topicDeletionManager, + replicaStateMachine, partitionStateMachine, eventManager, brokerRequestBatch, stateChangeLogger) private val controllerChangeHandler = new ControllerChangeHandler(eventManager) private val brokerChangeHandler = new BrokerChangeHandler(eventManager) @@ -424,7 +426,7 @@ class
KafkaController(val config: KafkaConfig, // to see if these brokers can become leaders for some/all of those partitionStateMachine.triggerOnlinePartitionStateChange() // check if reassignment of some partitions need to be restarted - maybeResumeReassignments { (_, assignment) => + reassignmentsManager.maybeResumeReassignments { (_, assignment) => assignment.targetReplicas.exists(newBrokersSet.contains) } // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists @@ -439,14 +441,6 @@ class KafkaController(val config: KafkaConfig, registerBrokerModificationsHandler(newBrokers) } - private def maybeResumeReassignments(shouldResume: (TopicPartition, ReplicaAssignment) => Boolean): Unit = { - controllerContext.partitionsBeingReassigned.foreach { tp => - val currentAssignment = controllerContext.partitionFullReplicaAssignment(tp) - if (shouldResume(tp, currentAssignment)) - onPartitionReassignment(tp, currentAssignment) - } - } - private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = { debug(s"Register BrokerModifications handler for $brokerIds") brokerIds.foreach { brokerId => @@ -544,194 +538,6 @@ class KafkaController(val config: KafkaConfig, replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica) } - /** - * This callback is invoked: - * 1. By the AlterPartitionReassignments API - * 2. By the reassigned partitions listener which is triggered when the /admin/reassign/partitions znode is created - * 3. When an ongoing reassignment finishes - this is detected by a change in the partition's ISR znode - * 4. Whenever a new broker comes up which is part of an ongoing reassignment - * 5. On controller startup/failover - * - * Reassigning replicas for a partition goes through a few steps listed in the code. 
- * RS = current assigned replica set - * ORS = Original replica set for partition - * TRS = Reassigned (target) replica set - * AR = The replicas we are adding as part of this reassignment - * RR = The replicas we are removing as part of this reassignment - * - * A reassignment may have up to three phases, each with its own steps: - - * Phase U (Assignment update): Regardless of the trigger, the first step is in the reassignment process - * is to update the existing assignment state. We always update the state in Zookeeper before - * we update memory so that it can be resumed upon controller fail-over. - * - * U1. Update ZK with RS = ORS + TRS, AR = TRS - ORS, RR = ORS - TRS. - * U2. Update memory with RS = ORS + TRS, AR = TRS - ORS and RR = ORS - TRS - * U3. If we are cancelling or replacing an existing reassignment, send StopReplica to all members - * of AR in the original reassignment if they are not in TRS from the new assignment - * - * To complete the reassignment, we need to bring the new replicas into sync, so depending on the state - * of the ISR, we will execute one of the following steps. - * - * Phase A (when TRS != ISR): The reassignment is not yet complete - * - * A1. Bump the leader epoch for the partition and send LeaderAndIsr updates to RS. - * A2. Start new replicas AR by moving replicas in AR to NewReplica state. - * - * Phase B (when TRS = ISR): The reassignment is complete - * - * B1. Move all replicas in AR to OnlineReplica state. - * B2. Set RS = TRS, AR = [], RR = [] in memory. - * B3. Send a LeaderAndIsr request with RS = TRS. This will prevent the leader from adding any replica in TRS - ORS back in the isr. - * If the current leader is not in TRS or isn't alive, we move the leader to a new replica in TRS. - * We may send the LeaderAndIsr to more than the TRS replicas due to the - * way the partition state machine works (it reads replicas from ZK) - * B4. Move all replicas in RR to OfflineReplica state. 
As part of OfflineReplica state change, we shrink the - * isr to remove RR in ZooKeeper and send a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr. - * After that, we send a StopReplica (delete = false) to the replicas in RR. - * B5. Move all replicas in RR to NonExistentReplica state. This will send a StopReplica (delete = true) to - * the replicas in RR to physically delete the replicas on disk. - * B6. Update ZK with RS=TRS, AR=[], RR=[]. - * B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it if present. - * B8. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker. - * - * In general, there are two goals we want to aim for: - * 1. Every replica present in the replica set of a LeaderAndIsrRequest gets the request sent to it - * 2. Replicas that are removed from a partition's assignment get StopReplica sent to them - * - * For example, if ORS = {1,2,3} and TRS = {4,5,6}, the values in the topic and leader/isr paths in ZK - * may go through the following transitions. - * RS AR RR leader isr - * {1,2,3} {} {} 1 {1,2,3} (initial state) - * {4,5,6,1,2,3} {4,5,6} {1,2,3} 1 {1,2,3} (step A2) - * {4,5,6,1,2,3} {4,5,6} {1,2,3} 1 {1,2,3,4,5,6} (phase B) - * {4,5,6,1,2,3} {4,5,6} {1,2,3} 4 {1,2,3,4,5,6} (step B3) - * {4,5,6,1,2,3} {4,5,6} {1,2,3} 4 {4,5,6} (step B4) - * {4,5,6} {} {} 4 {4,5,6} (step B6) - * - * Note that we have to update RS in ZK with TRS last since it's the only place where we store ORS persistently. - * This way, if the controller crashes before that step, we can still recover. 
- */ - private def onPartitionReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = { - // While a reassignment is in progress, deletion is not allowed - topicDeletionManager.markTopicIneligibleForDeletion(Set(topicPartition.topic), reason = "topic reassignment in progress") - - updateCurrentReassignment(topicPartition, reassignment) - - val addingReplicas = reassignment.addingReplicas - val removingReplicas = reassignment.removingReplicas - - if (!isReassignmentComplete(topicPartition, reassignment)) { - // A1. Send LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and RR). - updateLeaderEpochAndSendRequest(topicPartition, reassignment) - // A2. replicas in AR -> NewReplica - startNewReplicasForReassignedPartition(topicPartition, addingReplicas) - } else { - // B1. replicas in AR -> OnlineReplica - replicaStateMachine.handleStateChanges(addingReplicas.map(PartitionAndReplica(topicPartition, _)), OnlineReplica) - // B2. Set RS = TRS, AR = [], RR = [] in memory. - val completedReassignment = ReplicaAssignment(reassignment.targetReplicas) - controllerContext.updatePartitionFullReplicaAssignment(topicPartition, completedReassignment) - // B3. Send LeaderAndIsr request with a potential new leader (if current leader not in TRS) and - // a new RS (using TRS) and same isr to every broker in ORS + TRS or TRS - moveReassignedPartitionLeaderIfRequired(topicPartition, completedReassignment) - // B4. replicas in RR -> Offline (force those replicas out of isr) - // B5. replicas in RR -> NonExistentReplica (force those replicas to be deleted) - stopRemovedReplicasOfReassignedPartition(topicPartition, removingReplicas) - // B6. Update ZK with RS = TRS, AR = [], RR = []. - updateReplicaAssignmentForPartition(topicPartition, completedReassignment) - // B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it. 
- removePartitionFromReassigningPartitions(topicPartition, completedReassignment) - // B8. After electing a leader in B3, the replicas and isr information changes, so resend the update metadata request to every broker - sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition)) - // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed - topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic)) - } - } - - /** - * Update the current assignment state in Zookeeper and in memory. If a reassignment is already in - * progress, then the new reassignment will supplant it and some replicas will be shutdown. - * - * Note that due to the way we compute the original replica set, we cannot guarantee that a - * cancellation will restore the original replica order. Target replicas are always listed - * first in the replica set in the desired order, which means we have no way to get to the - * original order if the reassignment overlaps with the current assignment. For example, - * with an initial assignment of [1, 2, 3] and a reassignment of [3, 4, 2], then the replicas - * will be encoded as [3, 4, 2, 1] while the reassignment is in progress. If the reassignment - * is cancelled, there is no way to restore the original order. - * - * @param topicPartition The reassigning partition - * @param reassignment The new reassignment - */ - private def updateCurrentReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = { - val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition) - - if (currentAssignment != reassignment) { - debug(s"Updating assignment of partition $topicPartition from $currentAssignment to $reassignment") - - // U1. Update assignment state in zookeeper - updateReplicaAssignmentForPartition(topicPartition, reassignment) - // U2. 
Update assignment state in memory - controllerContext.updatePartitionFullReplicaAssignment(topicPartition, reassignment) - - // If there is a reassignment already in progress, then some of the currently adding replicas - // may be eligible for immediate removal, in which case we need to stop the replicas. - val unneededReplicas = currentAssignment.replicas.diff(reassignment.replicas) - if (unneededReplicas.nonEmpty) - stopRemovedReplicasOfReassignedPartition(topicPartition, unneededReplicas) - } - - val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition) - zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler) - - controllerContext.partitionsBeingReassigned.add(topicPartition) - } - - /** - * Trigger a partition reassignment provided that the topic exists and is not being deleted. - * - * This is called when a reassignment is initially received either through Zookeeper or through the - * AlterPartitionReassignments API - * - * The `partitionsBeingReassigned` field in the controller context will be updated by this - * call after the reassignment completes validation and is successfully stored in the topic - * assignment zNode. - * - * @param reassignments The reassignments to begin processing - * @return A map of any errors in the reassignment. If the error is NONE for a given partition, - * then the reassignment was submitted successfully. 
- */ - private def maybeTriggerPartitionReassignment(reassignments: Map[TopicPartition, ReplicaAssignment]): Map[TopicPartition, ApiError] = { - reassignments.map { case (tp, reassignment) => - val topic = tp.topic - - val apiError = if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) { - info(s"Skipping reassignment of $tp since the topic is currently being deleted") - new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.") - } else { - val assignedReplicas = controllerContext.partitionReplicaAssignment(tp) - if (assignedReplicas.nonEmpty) { - try { - onPartitionReassignment(tp, reassignment) - ApiError.NONE - } catch { - case e: ControllerMovedException => - info(s"Failed completing reassignment of partition $tp because controller has moved to another broker") - throw e - case e: Throwable => - error(s"Error completing reassignment of partition $tp", e) - new ApiError(Errors.UNKNOWN_SERVER_ERROR) - } - } else { - new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.") - } - } - - tp -> apiError - } - } - /** * Attempt to elect a replica as leader for each of the given partitions. 
* @param partitions The partitions to have a new leader elected @@ -836,7 +642,7 @@ class KafkaController(val config: KafkaConfig, // New reassignments may have been submitted through Zookeeper while the controller was failing over val zkPartitionsResumed = processZkPartitionReassignment() // We may also have some API-based reassignments that need to be restarted - maybeResumeReassignments { (tp, _) => + reassignmentsManager.maybeResumeReassignments { (tp, _) => !zkPartitionsResumed.contains(tp) } } @@ -861,99 +667,6 @@ class KafkaController(val config: KafkaConfig, } } - private def isReassignmentComplete(partition: TopicPartition, assignment: ReplicaAssignment): Boolean = { - if (!assignment.isBeingReassigned) { - true - } else { - zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch => - val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr.toSet - val targetReplicas = assignment.targetReplicas.toSet - targetReplicas.subsetOf(isr) - } - } - } - - private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition, - newAssignment: ReplicaAssignment): Unit = { - val reassignedReplicas = newAssignment.replicas - val currentLeader = controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader - - if (!reassignedReplicas.contains(currentLeader)) { - info(s"Leader $currentLeader for partition $topicPartition being reassigned, " + - s"is not in the new list of replicas ${reassignedReplicas.mkString(",")}. 
Re-electing leader") - // move the leader to one of the alive and caught up new replicas - partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy)) - } else if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) { - info(s"Leader $currentLeader for partition $topicPartition being reassigned, " + - s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} and is alive") - // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest - updateLeaderEpochAndSendRequest(topicPartition, newAssignment) - } else { - info(s"Leader $currentLeader for partition $topicPartition being reassigned, " + - s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} but is dead") - partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy)) - } - } - - private def stopRemovedReplicasOfReassignedPartition(topicPartition: TopicPartition, - removedReplicas: Seq[Int]): Unit = { - // first move the replica to offline state (the controller removes it from the ISR) - val replicasToBeDeleted = removedReplicas.map(PartitionAndReplica(topicPartition, _)) - replicaStateMachine.handleStateChanges(replicasToBeDeleted, OfflineReplica) - // send stop replica command to the old replicas - replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionStarted) - // TODO: Eventually partition reassignment could use a callback that does retries if deletion failed - replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionSuccessful) - replicaStateMachine.handleStateChanges(replicasToBeDeleted, NonExistentReplica) - } - - private def updateReplicaAssignmentForPartition(topicPartition: TopicPartition, assignment: ReplicaAssignment): Unit = { - var topicAssignment = controllerContext.partitionFullReplicaAssignmentForTopic(topicPartition.topic) - 
topicAssignment += topicPartition -> assignment - - val setDataResponse = zkClient.setTopicAssignmentRaw(topicPartition.topic, topicAssignment, controllerContext.epochZkVersion) - setDataResponse.resultCode match { - case Code.OK => - info(s"Successfully updated assignment of partition $topicPartition to $assignment") - case Code.NONODE => - throw new IllegalStateException(s"Failed to update assignment for $topicPartition since the topic " + - "has no current assignment") - case _ => throw new KafkaException(setDataResponse.resultException.get) - } - } - - private def startNewReplicasForReassignedPartition(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit = { - // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned - // replicas list - newReplicas.foreach { replica => - replicaStateMachine.handleStateChanges(Seq(PartitionAndReplica(topicPartition, replica)), NewReplica) - } - } - - private def updateLeaderEpochAndSendRequest(topicPartition: TopicPartition, - assignment: ReplicaAssignment): Unit = { - val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch) - updateLeaderEpoch(topicPartition) match { - case Some(updatedLeaderIsrAndControllerEpoch) => - try { - brokerRequestBatch.newBatch() - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(assignment.replicas, topicPartition, - updatedLeaderIsrAndControllerEpoch, assignment, isNew = false) - brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch) - } catch { - case e: IllegalStateException => - handleIllegalState(e) - } - stateChangeLog.trace(s"Sent LeaderAndIsr request $updatedLeaderIsrAndControllerEpoch with " + - s"new replica assignment $assignment to leader ${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " + - s"for partition being reassigned $topicPartition") - - case None => // fail the reassignment - stateChangeLog.error(s"Failed to send LeaderAndIsr request with new replica assignment " + - 
s"$assignment to leader for partition being reassigned $topicPartition") - } - } - private def registerPartitionModificationsHandlers(topics: Seq[String]) = { topics.foreach { topic => val partitionModificationsHandler = new PartitionModificationsHandler(eventManager, topic) @@ -975,48 +688,6 @@ class KafkaController(val config: KafkaConfig, } } - private def removePartitionFromReassigningPartitions(topicPartition: TopicPartition, - assignment: ReplicaAssignment): Unit = { - if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) { - val path = TopicPartitionStateZNode.path(topicPartition) - zkClient.unregisterZNodeChangeHandler(path) - maybeRemoveFromZkReassignment((tp, replicas) => tp == topicPartition && replicas == assignment.replicas) - controllerContext.partitionsBeingReassigned.remove(topicPartition) - } else { - throw new IllegalStateException("Cannot remove a reassigning partition because it is not present in memory") - } - } - - /** - * Remove partitions from an active zk-based reassignment (if one exists). - * - * @param shouldRemoveReassignment Predicate indicating which partition reassignments should be removed - */ - private def maybeRemoveFromZkReassignment(shouldRemoveReassignment: (TopicPartition, Seq[Int]) => Boolean): Unit = { - if (!zkClient.reassignPartitionsInProgress()) - return - - val reassigningPartitions = zkClient.getPartitionReassignment() - val (removingPartitions, updatedPartitionsBeingReassigned) = reassigningPartitions.partition { case (tp, replicas) => - shouldRemoveReassignment(tp, replicas) - } - info(s"Removing partitions $removingPartitions from the list of reassigned partitions in zookeeper") - - // write the new list to zookeeper - if (updatedPartitionsBeingReassigned.isEmpty) { - info(s"No more partitions need to be reassigned. 
Deleting zk path ${ReassignPartitionsZNode.path}") - zkClient.deletePartitionReassignment(controllerContext.epochZkVersion) - // Ensure we detect future reassignments - eventManager.put(ZkPartitionReassignment) - } else { - try { - zkClient.setOrCreatePartitionReassignment(updatedPartitionsBeingReassigned, controllerContext.epochZkVersion) - } catch { - case e: KeeperException => throw new AdminOperationException(e) - } - } - } - private def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicPartition], isTriggeredByAutoRebalance : Boolean): Unit = { for (partition <- partitionsToBeRemoved) { @@ -1054,49 +725,6 @@ class KafkaController(val config: KafkaConfig, } } - /** - * Does not change leader or isr, but just increments the leader epoch - * - * @param partition partition - * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty. - */ - private def updateLeaderEpoch(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = { - debug(s"Updating leader epoch for partition $partition") - var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None - var zkWriteCompleteOrUnnecessary = false - while (!zkWriteCompleteOrUnnecessary) { - // refresh leader and isr from zookeeper again - zkWriteCompleteOrUnnecessary = zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match { - case Some(leaderIsrAndControllerEpoch) => - val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr - val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch - if (controllerEpoch > epoch) - throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " + - s"means the current controller with epoch $epoch went through a soft failure and another " + - s"controller was elected with epoch $controllerEpoch. 
Aborting state change by this controller") - // increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded - // assigned replica list - val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion - // update the new leadership decision in zookeeper or retry - val UpdateLeaderAndIsrResult(finishedUpdates, _) = - zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch, controllerContext.epochZkVersion) - - finishedUpdates.headOption.map { - case (partition, Right(leaderAndIsr)) => - finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)) - info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}") - true - case (_, Left(e)) => - throw e - }.getOrElse(false) - case None => - throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " + - "leaderAndIsr path is empty. This could mean we somehow tried to reassign a partition that doesn't exist") - } - } - finalLeaderIsrAndControllerEpoch - } - private def checkAndTriggerAutoLeaderRebalance(): Unit = { trace("Checking need to trigger auto leader balancing") val preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicPartition, Seq[Int]]] = @@ -1577,36 +1205,14 @@ class KafkaController(val config: KafkaConfig, // We need to register the watcher if the path doesn't exist in order to detect future // reassignments and we get the `path exists` check for free if (isActive && zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) { - val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] - val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] - - zkClient.getPartitionReassignment().foreach { case (tp, targetReplicas) => - maybeBuildReassignment(tp, Some(targetReplicas)) match { - case Some(context) => partitionsToReassign.put(tp, context) - case None => reassignmentResults.put(tp, new 
ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS)) - } - } - - reassignmentResults ++= maybeTriggerPartitionReassignment(partitionsToReassign) - val (partitionsReassigned, partitionsFailed) = reassignmentResults.partition(_._2.error == Errors.NONE) - if (partitionsFailed.nonEmpty) { - warn(s"Failed reassignment through zk with the following errors: $partitionsFailed") - maybeRemoveFromZkReassignment((tp, _) => partitionsFailed.contains(tp)) - } - partitionsReassigned.keySet + reassignmentsManager.triggerZkReassignment() } else { Set.empty } } /** - * Process a partition reassignment from the AlterPartitionReassignment API. If there is an - * existing reassignment through zookeeper for any of the requested partitions, they will be - * cancelled prior to beginning the new reassignment. Any zk-based reassignment for partitions - * which are NOT included in this call will not be affected. - * - * @param reassignments Map of reassignments passed through the AlterReassignments API. A null value - * means that we should cancel an in-progress reassignment. + * Process a partition reassignment from the AlterPartitionReassignment API. 
* @param callback Callback to send AlterReassignments response */ private def processApiPartitionReassignment(reassignments: Map[TopicPartition, Option[Seq[Int]]], @@ -1614,72 +1220,17 @@ class KafkaController(val config: KafkaConfig, if (!isActive) { callback(Right(new ApiError(Errors.NOT_CONTROLLER))) } else { - val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] - val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] - - reassignments.foreach { case (tp, targetReplicas) => - if (replicasAreValid(tp, targetReplicas)) { - maybeBuildReassignment(tp, targetReplicas) match { - case Some(context) => partitionsToReassign.put(tp, context) - case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS)) - } - } else { - reassignmentResults.put(tp, new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT)) - } - } - - // The latest reassignment (whether by API or through zk) always takes precedence, - // so remove from active zk reassignment (if one exists) - maybeRemoveFromZkReassignment((tp, _) => partitionsToReassign.contains(tp)) - - reassignmentResults ++= maybeTriggerPartitionReassignment(partitionsToReassign) - callback(Left(reassignmentResults)) - } - } - - private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]]): Boolean = { - replicasOpt match { - case Some(replicas) => - val replicaSet = replicas.toSet - if (replicas.isEmpty || replicas.size != replicaSet.size) - false - else if (replicas.exists(_ < 0)) - false - else { - // Ensure that any new replicas are among the live brokers - val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition) - val newAssignment = currentAssignment.reassignTo(replicas) - newAssignment.addingReplicas.toSet.subsetOf(controllerContext.liveBrokerIds) - } - - case None => true - } - } - - private def maybeBuildReassignment(topicPartition: TopicPartition, - targetReplicasOpt: Option[Seq[Int]]): 
Option[ReplicaAssignment] = { - val replicaAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition) - if (replicaAssignment.isBeingReassigned) { - val targetReplicas = targetReplicasOpt.getOrElse(replicaAssignment.originReplicas) - Some(replicaAssignment.reassignTo(targetReplicas)) - } else { - targetReplicasOpt.map { targetReplicas => - replicaAssignment.reassignTo(targetReplicas) - } + callback(Left(reassignmentsManager.triggerApiReassignment(reassignments))) } } private def processPartitionReassignmentIsrChange(topicPartition: TopicPartition): Unit = { if (!isActive) return - if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) { - val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition) - if (isReassignmentComplete(topicPartition, reassignment)) { - // resume the partition reassignment process - info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the leader for " + - s"reassigning partition $topicPartition") - onPartitionReassignment(topicPartition, reassignment) - } + try { + reassignmentsManager.maybeResumeReassignment(topicPartition) + } catch { + case e: IllegalStateException => handleIllegalState(e) } } @@ -1687,20 +1238,7 @@ class KafkaController(val config: KafkaConfig, if (!isActive) { callback(Right(new ApiError(Errors.NOT_CONTROLLER))) } else { - val results: mutable.Map[TopicPartition, ReplicaAssignment] = mutable.Map.empty - val partitionsToList = partitionsOpt match { - case Some(partitions) => partitions - case None => controllerContext.partitionsBeingReassigned - } - - partitionsToList.foreach { tp => - val assignment = controllerContext.partitionFullReplicaAssignment(tp) - if (assignment.isBeingReassigned) { - results += tp -> assignment - } - } - - callback(Left(results)) + callback(Left(reassignmentsManager.listPartitionsBeingReassigned(partitionsOpt))) } } diff --git a/core/src/main/scala/kafka/controller/ReassignmentsManager.scala 
b/core/src/main/scala/kafka/controller/ReassignmentsManager.scala new file mode 100644 index 0000000000000..6a6fda151cedf --- /dev/null +++ b/core/src/main/scala/kafka/controller/ReassignmentsManager.scala @@ -0,0 +1,537 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.controller + +import kafka.admin.AdminOperationException +import kafka.common.StateChangeFailedException +import kafka.utils.Logging +import kafka.zk.{KafkaZkClient, ReassignPartitionsZNode, TopicPartitionStateZNode} +import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult +import org.apache.kafka.common.errors.ControllerMovedException +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.ApiError +import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.zookeeper.KeeperException +import org.apache.zookeeper.KeeperException.Code + +import scala.collection.{Map, Seq, Set, immutable, mutable} + +/** + * A helper class which contains logic for driving partition reassignments. + * This class is not thread-safe. 
/**
 * A helper class encapsulating the logic for driving partition reassignments.
 *
 * It is owned by the controller and shares its context; like the controller
 * itself, this class is not thread-safe.
 */
class ReassignmentsManager(controllerContext: ControllerContext,
                           zkClient: KafkaZkClient,
                           topicDeletionManager: TopicDeletionManager,
                           replicaStateMachine: ReplicaStateMachine,
                           partitionStateMachine: PartitionStateMachine,
                           eventManager: ControllerEventManager,
                           brokerRequestBatch: ControllerBrokerRequestBatch,
                           stateChangeLogger: StateChangeLogger)
  extends Logging {

  /**
   * Trigger the reassignments requested through the AlterPartitionReassignments API.
   *
   * If an existing zookeeper-based reassignment covers any of the requested partitions,
   * it is cancelled before the new reassignment begins. Zk-based reassignments for
   * partitions NOT included in this call are left untouched.
   *
   * @param reassignments map of reassignments passed through the AlterReassignments API;
   *                      a None value means an in-progress reassignment should be cancelled
   * @return per-partition result of submitting each reassignment
   */
  def triggerApiReassignment(reassignments: Map[TopicPartition, Option[Seq[Int]]]): mutable.Map[TopicPartition, ApiError] = {
    val results = mutable.Map.empty[TopicPartition, ApiError]
    val toReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]

    reassignments.foreach { case (tp, targetReplicas) =>
      if (!replicasAreValid(tp, targetReplicas))
        results.put(tp, new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT))
      else
        maybeBuildReassignment(tp, targetReplicas) match {
          case Some(assignment) => toReassign.put(tp, assignment)
          case None => results.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
        }
    }

    // The latest reassignment (whether by API or through zk) always takes precedence,
    // so drop any active zk-based reassignment for the partitions reassigned here.
    maybeRemoveFromZkReassignment((tp, _) => toReassign.contains(tp))

    results ++= maybeTriggerPartitionReassignment(toReassign)
  }

  /**
   * Trigger the reassignments currently stored under the zk reassignment path.
   * Partitions whose reassignment fails are removed from the znode.
   *
   * @return the partitions for which a reassignment was successfully started
   */
  def triggerZkReassignment(): Set[TopicPartition] = {
    val results = mutable.Map.empty[TopicPartition, ApiError]
    val toReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]

    zkClient.getPartitionReassignment().foreach { case (tp, targetReplicas) =>
      maybeBuildReassignment(tp, Some(targetReplicas)) match {
        case Some(assignment) => toReassign.put(tp, assignment)
        case None => results.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
      }
    }

    results ++= maybeTriggerPartitionReassignment(toReassign)
    val (succeeded, failed) = results.partition(_._2.error == Errors.NONE)
    if (failed.nonEmpty) {
      warn(s"Failed reassignment through zk with the following errors: $failed")
      maybeRemoveFromZkReassignment((tp, _) => failed.contains(tp))
    }
    succeeded.keySet
  }

  /**
   * Resume every in-flight reassignment accepted by the given predicate.
   *
   * @param shouldResume predicate over (partition, current full assignment)
   */
  def maybeResumeReassignments(shouldResume: (TopicPartition, ReplicaAssignment) => Boolean): Unit = {
    controllerContext.partitionsBeingReassigned.foreach { tp =>
      val assignment = controllerContext.partitionFullReplicaAssignment(tp)
      if (shouldResume(tp, assignment))
        onPartitionReassignment(tp, assignment)
    }
  }

  /**
   * Resume the reassignment of the given partition if one is in flight and all
   * target replicas have caught up (i.e. they are in the ISR).
   *
   * @throws IllegalStateException
   */
  def maybeResumeReassignment(topicPartition: TopicPartition): Unit = {
    if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
      val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
      if (isReassignmentComplete(topicPartition, reassignment)) {
        // resume the partition reassignment process
        info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the leader for " +
          s"reassigning partition $topicPartition")
        onPartitionReassignment(topicPartition, reassignment)
      }
    }
  }

  /**
   * List the active reassignments among the requested partitions (or among all
   * partitions being reassigned when no explicit set is given).
   */
  def listPartitionsBeingReassigned(partitionsOpt: Option[Set[TopicPartition]]): Map[TopicPartition, ReplicaAssignment] = {
    val candidates = partitionsOpt.getOrElse(controllerContext.partitionsBeingReassigned)
    val active = mutable.Map.empty[TopicPartition, ReplicaAssignment]
    candidates.foreach { tp =>
      val assignment = controllerContext.partitionFullReplicaAssignment(tp)
      if (assignment.isBeingReassigned)
        active += tp -> assignment
    }
    active
  }

  /**
   * Trigger a partition reassignment provided that the topic exists and is not being deleted.
   *
   * This is called when a reassignment is initially received either through Zookeeper
   * or through the AlterPartitionReassignments API.
   *
   * The `partitionsBeingReassigned` field in the controller context is updated by this
   * call once the reassignment passes validation and is stored in the topic assignment zNode.
   *
   * @param reassignments the reassignments to begin processing
   * @return a map of any errors in the reassignment; NONE for a partition means
   *         its reassignment was submitted successfully
   */
  private def maybeTriggerPartitionReassignment(reassignments: Map[TopicPartition, ReplicaAssignment]): Map[TopicPartition, ApiError] = {
    reassignments.map { case (tp, reassignment) =>
      val apiError =
        if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
          info(s"Skipping reassignment of $tp since the topic is currently being deleted")
          new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.")
        } else if (controllerContext.partitionReplicaAssignment(tp).isEmpty) {
          new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.")
        } else {
          try {
            onPartitionReassignment(tp, reassignment)
            ApiError.NONE
          } catch {
            case e: ControllerMovedException =>
              info(s"Failed completing reassignment of partition $tp because controller has moved to another broker")
              throw e
            case e: Throwable =>
              error(s"Error completing reassignment of partition $tp", e)
              new ApiError(Errors.UNKNOWN_SERVER_ERROR)
          }
        }
      tp -> apiError
    }
  }

  /**
   * Validate a requested target replica list: non-empty, no duplicates, no negative
   * broker ids, and every replica being newly added must be a live broker.
   * A None target (cancellation) is always valid.
   */
  private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]]): Boolean = {
    replicasOpt match {
      case None => true
      case Some(replicas) =>
        val distinct = replicas.toSet
        if (replicas.isEmpty || distinct.size != replicas.size || replicas.exists(_ < 0))
          false
        else {
          // Ensure that any new replicas are among the live brokers
          val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
          currentAssignment.reassignTo(replicas).addingReplicas.toSet.subsetOf(controllerContext.liveBrokerIds)
        }
    }
  }

  /**
   * Build the reassignment to apply, if any. When a reassignment is already in
   * flight, a None target means "revert to the origin replicas" (cancellation);
   * otherwise a None target yields no reassignment at all.
   */
  private def maybeBuildReassignment(topicPartition: TopicPartition,
                                     targetReplicasOpt: Option[Seq[Int]]): Option[ReplicaAssignment] = {
    val current = controllerContext.partitionFullReplicaAssignment(topicPartition)
    if (current.isBeingReassigned)
      Some(current.reassignTo(targetReplicasOpt.getOrElse(current.originReplicas)))
    else
      targetReplicasOpt.map(current.reassignTo)
  }

  /**
   * This callback is invoked:
   * 1. By the AlterPartitionReassignments API
   * 2. By the reassigned partitions listener, triggered when the /admin/reassign/partitions znode is created
   * 3. When an ongoing reassignment finishes - detected by a change in the partition's ISR znode
   * 4. Whenever a new broker comes up which is part of an ongoing reassignment
   * 5. On controller startup/failover
   *
   * Terminology:
   *   RS  = current assigned replica set
   *   ORS = original replica set for the partition
   *   TRS = reassigned (target) replica set
   *   AR  = replicas added by this reassignment
   *   RR  = replicas removed by this reassignment
   *
   * A reassignment may have up to three phases:
   *
   * Phase U (assignment update) - always performed first, zookeeper before memory
   * so the state can be recovered on controller fail-over:
   *   U1. Update ZK with RS = ORS + TRS, AR = TRS - ORS, RR = ORS - TRS.
   *   U2. Update memory with the same state.
   *   U3. If cancelling/replacing an existing reassignment, send StopReplica to all
   *       members of the old AR that are not in the new TRS.
   *
   * Phase A (TRS != ISR) - the reassignment is not yet complete:
   *   A1. Bump the leader epoch and send LeaderAndIsr updates to RS.
   *   A2. Start the new replicas by moving AR to NewReplica state.
   *
   * Phase B (TRS == ISR) - the reassignment is complete:
   *   B1. Move AR to OnlineReplica state.
   *   B2. Set RS = TRS, AR = [], RR = [] in memory.
   *   B3. Send LeaderAndIsr with RS = TRS, preventing the leader from adding any
   *       replica in TRS - ORS back into the isr. If the current leader is not in
   *       TRS or is not alive, elect a new leader from TRS. Note the request may
   *       reach more than the TRS replicas because the partition state machine
   *       reads replicas from ZK.
   *   B4. Move RR to OfflineReplica state: shrink the isr in ZK, notify only the
   *       leader of the shrunk isr, then StopReplica (delete = false) to RR.
   *   B5. Move RR to NonExistentReplica state: StopReplica (delete = true) to RR
   *       to physically delete the replicas on disk.
   *   B6. Update ZK with RS = TRS, AR = [], RR = [].
   *   B7. Remove the ISR reassign listener and, if present, remove this partition
   *       from /admin/reassign_partitions in ZK.
   *   B8. Resend the update metadata request to every broker since leader/isr changed.
   *
   * Two invariants we aim for:
   *   1. Every replica in the replica set of a LeaderAndIsrRequest receives the request.
   *   2. Replicas removed from a partition's assignment receive StopReplica.
   *
   * Example with ORS = {1,2,3}, TRS = {4,5,6}; topic and leader/isr paths transition as:
   *   RS            AR      RR      leader  isr
   *   {1,2,3}       {}      {}      1       {1,2,3}        (initial state)
   *   {4,5,6,1,2,3} {4,5,6} {1,2,3} 1       {1,2,3}        (step A2)
   *   {4,5,6,1,2,3} {4,5,6} {1,2,3} 1       {1,2,3,4,5,6}  (phase B)
   *   {4,5,6,1,2,3} {4,5,6} {1,2,3} 4       {1,2,3,4,5,6}  (step B3)
   *   {4,5,6,1,2,3} {4,5,6} {1,2,3} 4       {4,5,6}        (step B4)
   *   {4,5,6}       {}      {}      4       {4,5,6}        (step B6)
   *
   * RS in ZK is updated to TRS last because it is the only place ORS is stored
   * persistently - if the controller crashes before that step we can still recover.
   *
   * @throws IllegalStateException
   */
  private def onPartitionReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = {
    // While a reassignment is in progress, deletion is not allowed
    topicDeletionManager.markTopicIneligibleForDeletion(Set(topicPartition.topic), reason = "topic reassignment in progress")

    updateCurrentReassignment(topicPartition, reassignment)

    val addingReplicas = reassignment.addingReplicas
    val removingReplicas = reassignment.removingReplicas

    if (!isReassignmentComplete(topicPartition, reassignment)) {
      // A1. Send LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and RR).
      updateLeaderEpochAndSendRequest(topicPartition, reassignment)
      // A2. replicas in AR -> NewReplica
      startNewReplicasForReassignedPartition(topicPartition, addingReplicas)
    } else {
      // B1. replicas in AR -> OnlineReplica
      replicaStateMachine.handleStateChanges(addingReplicas.map(PartitionAndReplica(topicPartition, _)), OnlineReplica)
      // B2. Set RS = TRS, AR = [], RR = [] in memory.
      val completedReassignment = ReplicaAssignment(reassignment.targetReplicas)
      controllerContext.updatePartitionFullReplicaAssignment(topicPartition, completedReassignment)
      // B3. Send LeaderAndIsr request with a potential new leader (if current leader not in TRS) and
      //     a new RS (using TRS) and same isr to every broker in ORS + TRS or TRS
      moveReassignedPartitionLeaderIfRequired(topicPartition, completedReassignment)
      // B4. replicas in RR -> Offline (force those replicas out of isr)
      // B5. replicas in RR -> NonExistentReplica (force those replicas to be deleted)
      stopRemovedReplicasOfReassignedPartition(topicPartition, removingReplicas)
      // B6. Update ZK with RS = TRS, AR = [], RR = [].
      updateReplicaAssignmentForPartition(topicPartition, completedReassignment)
      // B7. Remove the ISR reassign listener and maybe update /admin/reassign_partitions in ZK.
      removePartitionFromReassigningPartitions(topicPartition, completedReassignment)
      // B8. Leader/isr changed in B3, so resend the update metadata request to every broker
      brokerRequestBatch.newBatch()
      brokerRequestBatch.addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
      brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
      // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
      topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic))
    }
  }

  /**
   * Update the current assignment state in Zookeeper and in memory. If a reassignment
   * is already in progress, the new reassignment supplants it and some replicas are
   * shut down.
   *
   * Note that due to the way we compute the original replica set, we cannot guarantee
   * that a cancellation restores the original replica ORDER. Target replicas are always
   * listed first in the replica set in the desired order, so if the reassignment
   * overlaps with the current assignment there is no way to recover the original order.
   * For example, with an initial assignment of [1, 2, 3] and a reassignment of
   * [3, 4, 2], the replicas are encoded as [3, 4, 2, 1] while the reassignment is in
   * progress; cancelling cannot restore the original order.
   *
   * @param topicPartition the reassigning partition
   * @param reassignment   the new reassignment
   */
  private def updateCurrentReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = {
    val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)

    if (currentAssignment != reassignment) {
      debug(s"Updating assignment of partition $topicPartition from $currentAssignment to $reassignment")

      // U1. Persist the assignment state to zookeeper first so it survives fail-over
      updateReplicaAssignmentForPartition(topicPartition, reassignment)
      // U2. Mirror the assignment state in memory
      controllerContext.updatePartitionFullReplicaAssignment(topicPartition, reassignment)

      // U3. If a reassignment was already in flight, some replicas it was adding may no
      // longer be needed by the new one; stop those replicas immediately.
      val unneededReplicas = currentAssignment.replicas.diff(reassignment.replicas)
      if (unneededReplicas.nonEmpty)
        stopRemovedReplicasOfReassignedPartition(topicPartition, unneededReplicas)
    }

    // Watch the partition's ISR so we notice when the target replicas catch up
    val isrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
    zkClient.registerZNodeChangeHandler(isrChangeHandler)

    controllerContext.partitionsBeingReassigned.add(topicPartition)
  }

  /**
   * A reassignment is complete once every target replica is in the partition's ISR
   * (read fresh from zookeeper). An assignment that is not being reassigned is
   * trivially complete.
   */
  private def isReassignmentComplete(partition: TopicPartition, assignment: ReplicaAssignment): Boolean = {
    if (!assignment.isBeingReassigned)
      true
    else {
      zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
        val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr.toSet
        assignment.targetReplicas.toSet.subsetOf(isr)
      }
    }
  }

  /**
   * Bump the partition's leader epoch and send a LeaderAndIsr request carrying the
   * given assignment to all of its replicas. Logs an error (failing the reassignment)
   * if the epoch could not be bumped.
   *
   * @throws IllegalStateException
   */
  private def updateLeaderEpochAndSendRequest(topicPartition: TopicPartition,
                                              assignment: ReplicaAssignment): Unit = {
    val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
    updateLeaderEpoch(topicPartition) match {
      case Some(updatedLeaderIsrAndControllerEpoch) =>
        brokerRequestBatch.newBatch()
        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(assignment.replicas, topicPartition,
          updatedLeaderIsrAndControllerEpoch, assignment, isNew = false)
        brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
        stateChangeLog.trace(s"Sent LeaderAndIsr request $updatedLeaderIsrAndControllerEpoch with " +
          s"new replica assignment $assignment to leader ${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " +
          s"for partition being reassigned $topicPartition")

      case None => // fail the reassignment
        stateChangeLog.error(s"Failed to send LeaderAndIsr request with new replica assignment " +
          s"$assignment to leader for partition being reassigned $topicPartition")
    }
  }

  /**
   * Does not change leader or isr, but just increments the leader epoch.
   * Retries until the zookeeper write succeeds or is found to be unnecessary.
   *
   * @param partition the partition whose epoch to bump
   * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty
   */
  private def updateLeaderEpoch(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
    debug(s"Updating leader epoch for partition $partition")
    var result: Option[LeaderIsrAndControllerEpoch] = None
    var done = false
    while (!done) {
      // refresh leader and isr from zookeeper on every attempt
      done = zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
        case Some(leaderIsrAndControllerEpoch) =>
          val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
          val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
          if (controllerEpoch > controllerContext.epoch)
            throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
              s"means the current controller with epoch ${controllerContext.epoch} went through a soft failure and another " +
              s"controller was elected with epoch $controllerEpoch. Aborting state change by this controller")
          // increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded
          // assigned replica list
          val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
          // update the new leadership decision in zookeeper or retry
          val UpdateLeaderAndIsrResult(finishedUpdates, _) =
            zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), controllerContext.epoch, controllerContext.epochZkVersion)

          finishedUpdates.headOption.exists {
            case (updatedPartition, Right(updatedLeaderAndIsr)) =>
              result = Some(LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerContext.epoch))
              info(s"Updated leader epoch for partition $updatedPartition to ${updatedLeaderAndIsr.leaderEpoch}")
              true
            case (_, Left(e)) =>
              throw e
          }
        case None =>
          throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " +
            "leaderAndIsr path is empty. This could mean we somehow tried to reassign a partition that doesn't exist")
      }
    }
    result
  }

  /**
   * Persist the given partition assignment into the topic's assignment znode.
   *
   * @throws IllegalStateException if the topic has no current assignment in zk
   */
  private def updateReplicaAssignmentForPartition(topicPartition: TopicPartition, assignment: ReplicaAssignment): Unit = {
    val topicAssignment =
      controllerContext.partitionFullReplicaAssignmentForTopic(topicPartition.topic) + (topicPartition -> assignment)

    val setDataResponse = zkClient.setTopicAssignmentRaw(topicPartition.topic, topicAssignment, controllerContext.epochZkVersion)
    setDataResponse.resultCode match {
      case Code.OK =>
        info(s"Successfully updated assignment of partition $topicPartition to $assignment")
      case Code.NONODE =>
        throw new IllegalStateException(s"Failed to update assignment for $topicPartition since the topic " +
          "has no current assignment")
      case _ => throw new KafkaException(setDataResponse.resultException.get)
    }
  }

  /** Move the newly added replicas (AR) into the NewReplica state, one at a time. */
  private def startNewReplicasForReassignedPartition(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit = {
    // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
    // replicas list
    newReplicas.foreach { replica =>
      replicaStateMachine.handleStateChanges(Seq(PartitionAndReplica(topicPartition, replica)), NewReplica)
    }
  }

  /**
   * Ensure the partition leader is valid under the new assignment: re-elect if the
   * current leader is not in the new replica set or is dead, otherwise just bump
   * the leader epoch and resend LeaderAndIsr with the shrunk replica set.
   */
  private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition,
                                                      newAssignment: ReplicaAssignment): Unit = {
    val reassignedReplicas = newAssignment.replicas
    val currentLeader = controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader

    if (!reassignedReplicas.contains(currentLeader)) {
      info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
        s"is not in the new list of replicas ${reassignedReplicas.mkString(",")}. Re-electing leader")
      // move the leader to one of the alive and caught up new replicas
      partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
    } else if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) {
      info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
        s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} and is alive")
      // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
      updateLeaderEpochAndSendRequest(topicPartition, newAssignment)
    } else {
      info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
        s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} but is dead")
      partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
    }
  }

  /**
   * Walk the removed replicas (RR) through Offline -> DeletionStarted ->
   * DeletionSuccessful -> NonExistent, which shrinks the isr and physically
   * deletes the replicas.
   */
  private def stopRemovedReplicasOfReassignedPartition(topicPartition: TopicPartition,
                                                       removedReplicas: Seq[Int]): Unit = {
    // first move the replica to offline state (the controller removes it from the ISR)
    val replicasToBeDeleted = removedReplicas.map(PartitionAndReplica(topicPartition, _))
    replicaStateMachine.handleStateChanges(replicasToBeDeleted, OfflineReplica)
    // send stop replica command to the old replicas
    replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionStarted)
    // TODO: Eventually partition reassignment could use a callback that does retries if deletion failed
    replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionSuccessful)
    replicaStateMachine.handleStateChanges(replicasToBeDeleted, NonExistentReplica)
  }

  /**
   * Unregister the partition's ISR watch, remove it from an active zk reassignment
   * (if present there with the completed replica set) and from the in-memory set
   * of reassigning partitions.
   *
   * @throws IllegalStateException if the partition is not known to be reassigning
   */
  private def removePartitionFromReassigningPartitions(topicPartition: TopicPartition,
                                                       assignment: ReplicaAssignment): Unit = {
    if (!controllerContext.partitionsBeingReassigned.contains(topicPartition))
      throw new IllegalStateException("Cannot remove a reassigning partition because it is not present in memory")

    zkClient.unregisterZNodeChangeHandler(TopicPartitionStateZNode.path(topicPartition))
    maybeRemoveFromZkReassignment((tp, replicas) => tp == topicPartition && replicas == assignment.replicas)
    controllerContext.partitionsBeingReassigned.remove(topicPartition)
  }

  /**
   * Remove partitions from an active zk-based reassignment (if one exists).
   *
   * @param shouldRemoveReassignment predicate indicating which partition reassignments should be removed
   */
  private def maybeRemoveFromZkReassignment(shouldRemoveReassignment: (TopicPartition, Seq[Int]) => Boolean): Unit = {
    if (!zkClient.reassignPartitionsInProgress())
      return

    val reassigningPartitions = zkClient.getPartitionReassignment()
    val (removingPartitions, updatedPartitionsBeingReassigned) = reassigningPartitions.partition { case (tp, replicas) =>
      shouldRemoveReassignment(tp, replicas)
    }
    info(s"Removing partitions $removingPartitions from the list of reassigned partitions in zookeeper")

    // write the new list to zookeeper
    if (updatedPartitionsBeingReassigned.isEmpty) {
      info(s"No more partitions need to be reassigned. Deleting zk path ${ReassignPartitionsZNode.path}")
      zkClient.deletePartitionReassignment(controllerContext.epochZkVersion)
      // Ensure we detect future reassignments
      eventManager.put(ZkPartitionReassignment)
    } else {
      try {
        zkClient.setOrCreatePartitionReassignment(updatedPartitionsBeingReassigned, controllerContext.epochZkVersion)
      } catch {
        case e: KeeperException => throw new AdminOperationException(e)
      }
    }
  }
}
- * @param controller */ class TopicDeletionManager(config: KafkaConfig, controllerContext: ControllerContext, diff --git a/core/src/test/scala/unit/kafka/controller/ReassignmentsManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ReassignmentsManagerTest.scala new file mode 100644 index 0000000000000..cbdae52ae922d --- /dev/null +++ b/core/src/test/scala/unit/kafka/controller/ReassignmentsManagerTest.scala @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.controller + +import kafka.api.LeaderAndIsr +import kafka.cluster.{Broker, EndPoint} +import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode} +import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult +import kafka.zookeeper.SetDataResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.zookeeper.KeeperException.Code +import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.{Before, Test} +import org.mockito.Mockito +import org.mockito.Mockito.doReturn +import org.mockito.Mockito.verify + +import scala.collection.{Map, Set, mutable} + +class ReassignmentsManagerTest { + private var controllerContext: ControllerContext = null + private var mockZkClient: KafkaZkClient = null + private var mockTopicDeletionManager: TopicDeletionManager = null + private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null + private var mockReplicaStateMachine: ReplicaStateMachine = null + private var mockPartitionStateMachine: PartitionStateMachine = null + private var mockBrokerRequestBatch: ControllerBrokerRequestBatch = null + + private final val controllerEpoch = 10 + private final val zkEpoch = 105 + private final val topic = "topic" + private final val tp = new TopicPartition(topic, 0) + private final val mockPartitionReassignmentHandler = new PartitionReassignmentHandler(null) + + private var partitionReassignmentManager: ReassignmentsManager = null + + @Before + def setUp(): Unit = { + controllerContext = new ControllerContext + controllerContext.epoch = controllerEpoch + controllerContext.epochZkVersion = zkEpoch + mockZkClient = Mockito.mock(classOf[KafkaZkClient]) + mockTopicDeletionManager = Mockito.mock(classOf[TopicDeletionManager]) + mockControllerBrokerRequestBatch = Mockito.mock(classOf[ControllerBrokerRequestBatch]) + 
mockReplicaStateMachine = Mockito.mock(classOf[ReplicaStateMachine]) + mockPartitionStateMachine = Mockito.mock(classOf[PartitionStateMachine]) + mockBrokerRequestBatch = Mockito.mock(classOf[ControllerBrokerRequestBatch]) + partitionReassignmentManager = new ReassignmentsManager(controllerContext, mockZkClient, mockTopicDeletionManager, + mockReplicaStateMachine, mockPartitionStateMachine, null, mockBrokerRequestBatch, new StateChangeLogger(0, inControllerContext = true, None)) + } + + @Test + def testTopicsQueuedUpForDeletionDoNotGetReassigned(): Unit = { + val initialAssignment = ReplicaAssignment(Seq(0, 1, 2), Seq(), Seq()) + controllerContext.updatePartitionFullReplicaAssignment(tp, initialAssignment) + setLiveBrokers(Seq(0, 1, 2, 3)) + doReturn(true, Nil: _*).when(mockTopicDeletionManager).isTopicQueuedUpForDeletion(tp.topic()) + val results = partitionReassignmentManager.triggerApiReassignment(Map(tp -> Some(Seq(1,2,3)))) + assertTrue(results(tp).is(Errors.UNKNOWN_TOPIC_OR_PARTITION)) + } + + /** + * Phase A of a partition reassignment denotes the initial trigger of a reassignment. + * + * A1. Bump the leader epoch for the partition and send LeaderAndIsr updates to RS. + * A2. Start new replicas AR by moving replicas in AR to NewReplica state. 
+ */ + @Test + def testPhaseAOfPartitionReassignment(): Unit = { + /* + * Existing assignment is [1,2,3] + * We issue a reassignment to [3, 4, 5] + */ + val expectedAddingReplicas = Seq(4, 5) + val expectedFullReplicaSet = Seq(3, 4, 5, 1, 2) + val initialAssignment = ReplicaAssignment(Seq(1, 2, 3), Seq(), Seq()) + val initialLeaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2, 3), zkEpoch) + controllerContext.partitionAssignments.put(topic, mutable.Map(tp.partition() -> initialAssignment)) + controllerContext.updatePartitionFullReplicaAssignment(tp, initialAssignment) + controllerContext.partitionsBeingReassigned.add(tp) + setLiveBrokers(Seq(1,2,3,4,5)) + mockAreReplicasInIsr(tp, List(1, 2, 3), initialLeaderAndIsr) + val expectedNewAssignment = ReplicaAssignment.fromOldAndNewReplicas(Seq(1, 2, 3), Seq(3, 4, 5)) + assertEquals(expectedFullReplicaSet, expectedNewAssignment.replicas) + // U1. Should update ZK + doReturn(mockSetDataResponseOK, Nil: _*).when(mockZkClient).setTopicAssignmentRaw(topic, mutable.Map(tp -> expectedNewAssignment), zkEpoch) + // U2. Should update memory + // A1. Should update partition leader epoch in ZK + val expectedLeaderAndIsr = initialLeaderAndIsr.newEpochAndZkVersion + doReturn(UpdateLeaderAndIsrResult(Map(tp -> Right(expectedLeaderAndIsr)), Seq()), Nil: _*) + .when(mockZkClient).updateLeaderAndIsr(Map(tp -> expectedLeaderAndIsr), controllerEpoch, zkEpoch) + + // act + val results = partitionReassignmentManager.triggerApiReassignment(Map(tp -> Some(Seq(3, 4, 5)))) + assertTrue(s"reassignment failed - $results", results(tp).isSuccess) + + // U2. Should have updated memory + assertEquals(expectedNewAssignment, controllerContext.partitionFullReplicaAssignment(tp)) + // A1. Should send a LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and RR). 
+ verify(mockBrokerRequestBatch).addLeaderAndIsrRequestForBrokers( + expectedFullReplicaSet, tp, + LeaderIsrAndControllerEpoch(expectedLeaderAndIsr, controllerEpoch), expectedNewAssignment, isNew = false + ) + // A2. replicas in AR -> NewReplica + expectedAddingReplicas.foreach { newReplica => + verify(mockReplicaStateMachine).handleStateChanges( + Seq(PartitionAndReplica(tp, newReplica)), NewReplica + ) + } + verify(mockBrokerRequestBatch).sendRequestsToBrokers(controllerEpoch) + } + + /** + * Phase B of a partition reassignment is the part where all the new replicas are in ISR + * and the controller finishes the reassignment + * B1. Move all replicas in AR to OnlineReplica state. + * B2. Set RS = TRS, AR = [], RR = [] in memory. + * B3. Send a LeaderAndIsr request with RS = TRS. This will prevent the leader from adding any replica in TRS - ORS back in the isr. + * If the current leader is not in TRS or isn't alive, we move the leader to a new replica in TRS. + * We may send the LeaderAndIsr to more than the TRS replicas due to the + * way the partition state machine works (it reads replicas from ZK) + * B4. Move all replicas in RR to OfflineReplica state. As part of OfflineReplica state change, we shrink the + * isr to remove RR in ZooKeeper and send a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr. + * After that, we send a StopReplica (delete = false) to the replicas in RR. + * B5. Move all replicas in RR to NonExistentReplica state. This will send a StopReplica (delete = true) to + * the replicas in RR to physically delete the replicas on disk. + * B6. Update ZK with RS=TRS, AR=[], RR=[]. + * B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it if present. + * B8. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker. 
+ * + */ + @Test + def testPhaseBOfPartitionReassignment(): Unit = { + /* + * Existing assignment is [1,2,3] + * We had issued a reassignment to [3, 4, 5] and now all replicas are in ISR + */ + val expectedRemovingReplicas = Seq(PartitionAndReplica(tp, 1), PartitionAndReplica(tp, 2)) + val initialAssignment = ReplicaAssignment.fromOldAndNewReplicas(Seq(1, 2, 3), Seq(3, 4, 5)) + val expectedNewAssignment = ReplicaAssignment(Seq(3, 4, 5), Seq(), Seq()) + val initialLeaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2, 3, 4, 5), zkEpoch) + setLiveBrokers(Seq(1, 2, 3, 4, 5)) + controllerContext.partitionAssignments.put(topic, mutable.Map(tp.partition() -> initialAssignment)) + controllerContext.updatePartitionFullReplicaAssignment(tp, initialAssignment) + controllerContext.partitionsBeingReassigned.add(tp) + controllerContext.partitionLeadershipInfo.put(tp, + LeaderIsrAndControllerEpoch(initialLeaderAndIsr, controllerEpoch)) + mockAreReplicasInIsr(tp, List(1, 2, 3, 4, 5), initialLeaderAndIsr) + + // B2. Set RS = TRS, AR = [], RR = [] in memory. + // B3. Send a LeaderAndIsr request with RS = TRS. + // If the current leader is not in TRS or isn't alive, we move the leader to a new replica in TRS. + // We may send the LeaderAndIsr to more than the TRS replicas due to the way the partition state machine works (it reads replicas from ZK) + doReturn(Map.empty[TopicPartition, Either[Throwable, LeaderAndIsr]], Nil: _*) + .when(mockPartitionStateMachine).handleStateChanges(Seq(tp), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy)) + // B6. Update ZK with RS = TRS, AR = [], RR = []. + doReturn(mockSetDataResponseOK, Nil: _*).when(mockZkClient) + .setTopicAssignmentRaw(tp.topic(), mutable.Map(tp -> expectedNewAssignment), zkEpoch) + // B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it if present. 
+ doReturn(true, Nil: _*).when(mockZkClient).reassignPartitionsInProgress() + doReturn(Map(tp -> Seq(1, 2, 3)), Nil: _*).when(mockZkClient).getPartitionReassignment() + doReturn(false, Nil: _*).when(mockZkClient).registerZNodeChangeHandlerAndCheckExistence(mockPartitionReassignmentHandler) + + // act + partitionReassignmentManager.maybeResumeReassignment(tp) + + // B1. All adding replicas moved to OnlineReplica state. + verify(mockReplicaStateMachine).handleStateChanges( + initialAssignment.addingReplicas.map(PartitionAndReplica(tp, _)), OnlineReplica + ) + // B2. Should have updated memory + assertEquals(expectedNewAssignment, controllerContext.partitionFullReplicaAssignment(tp)) + // B4. replicas in RR -> Offline (force those replicas out of isr) + // B5. replicas in RR -> NonExistentReplica (force those replicas to be deleted) + verify(mockReplicaStateMachine).handleStateChanges(expectedRemovingReplicas, OfflineReplica) + verify(mockReplicaStateMachine).handleStateChanges(expectedRemovingReplicas, ReplicaDeletionStarted) + verify(mockReplicaStateMachine).handleStateChanges(expectedRemovingReplicas, ReplicaDeletionSuccessful) + verify(mockReplicaStateMachine).handleStateChanges(expectedRemovingReplicas, NonExistentReplica) + // B7. Should have cleared in-memory partitionsBeingReassigned and unregistered the znode handler + assertEquals(Set(), controllerContext.partitionsBeingReassigned) + verify(mockZkClient).unregisterZNodeChangeHandler(TopicPartitionStateZNode.path(tp)) + // B8. 
Resend the update metadata request to every broker + verify(mockBrokerRequestBatch).addUpdateMetadataRequestForBrokers(controllerContext.liveBrokerIds.toSeq, Set(tp)) + verify(mockBrokerRequestBatch).sendRequestsToBrokers(controllerEpoch) + verify(mockTopicDeletionManager).resumeDeletionForTopics(Set(tp.topic())) + } + + def setLiveBrokers(brokerIds: Seq[Int]): Unit = { + val endpoint1 = new EndPoint("localhost", 9997, new ListenerName("blah"), + SecurityProtocol.PLAINTEXT) + val brokerEpochs = brokerIds.map { + id => (Broker(id, Seq(endpoint1), rack = None), 1L) + }.toMap + controllerContext.setLiveBrokerAndEpochs(brokerEpochs) + } + + /** + * To determine what phase of the reassignment we are in, we check whether the target replicas are in the ISR set + * If they aren't, we enter phase A. If they are - phase B + */ + def mockAreReplicasInIsr(tp: TopicPartition, isr: List[Int], leaderAndIsr: LeaderAndIsr): Unit = { + val tpStateMap: Map[TopicPartition, LeaderIsrAndControllerEpoch] = Map( + tp -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) + ) + doReturn(tpStateMap, Nil: _*).when(mockZkClient).getTopicPartitionStates(Seq(tp)) + } + + def mockSetDataResponseOK: SetDataResponse = + SetDataResponse(Code.OK, "", None, null, null) +} \ No newline at end of file From b661bc59adb8fadeeeb9e70dfa2648de65df5e12 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 14 Nov 2019 16:57:58 +0000 Subject: [PATCH 2/6] Handle illegal state exceptions in all cases --- .../kafka/controller/KafkaController.scala | 30 +++++++++++++++---- .../controller/ReassignmentsManager.scala | 10 +++++-- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index c6effe7e566e5..8eef752f64296 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -426,9 +426,14 @@ 
class KafkaController(val config: KafkaConfig, // to see if these brokers can become leaders for some/all of those partitionStateMachine.triggerOnlinePartitionStateChange() // check if reassignment of some partitions need to be restarted - reassignmentsManager.maybeResumeReassignments { (_, assignment) => - assignment.targetReplicas.exists(newBrokersSet.contains) + try { + reassignmentsManager.maybeResumeReassignments { (_, assignment) => + assignment.targetReplicas.exists(newBrokersSet.contains) + } + } catch { + case e: IllegalStateException => handleIllegalState(e) } + // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists // on the newly restarted brokers, there is a chance that topic deletion can resume val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic)) @@ -642,8 +647,12 @@ class KafkaController(val config: KafkaConfig, // New reassignments may have been submitted through Zookeeper while the controller was failing over val zkPartitionsResumed = processZkPartitionReassignment() // We may also have some API-based reassignments that need to be restarted - reassignmentsManager.maybeResumeReassignments { (tp, _) => - !zkPartitionsResumed.contains(tp) + try { + reassignmentsManager.maybeResumeReassignments { (tp, _) => + !zkPartitionsResumed.contains(tp) + } + } catch { + case e: IllegalStateException => handleIllegalState(e) } } @@ -1205,7 +1214,11 @@ class KafkaController(val config: KafkaConfig, // We need to register the watcher if the path doesn't exist in order to detect future // reassignments and we get the `path exists` check for free if (isActive && zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) { - reassignmentsManager.triggerZkReassignment() + try { + reassignmentsManager.triggerZkReassignment() + } catch { + case e: IllegalStateException => handleIllegalState(e) + } } else { 
Set.empty } @@ -1220,7 +1233,12 @@ class KafkaController(val config: KafkaConfig, if (!isActive) { callback(Right(new ApiError(Errors.NOT_CONTROLLER))) } else { - callback(Left(reassignmentsManager.triggerApiReassignment(reassignments))) + try { + val results = reassignmentsManager.triggerApiReassignment(reassignments) + callback(Left(results)) + } catch { + case e: IllegalStateException => handleIllegalState(e) + } } } diff --git a/core/src/main/scala/kafka/controller/ReassignmentsManager.scala b/core/src/main/scala/kafka/controller/ReassignmentsManager.scala index 6a6fda151cedf..2bfd69ae72de7 100644 --- a/core/src/main/scala/kafka/controller/ReassignmentsManager.scala +++ b/core/src/main/scala/kafka/controller/ReassignmentsManager.scala @@ -42,8 +42,7 @@ class ReassignmentsManager(controllerContext: ControllerContext, partitionStateMachine: PartitionStateMachine, eventManager: ControllerEventManager, brokerRequestBatch: ControllerBrokerRequestBatch, - stateChangeLogger: StateChangeLogger) - extends Logging { + stateChangeLogger: StateChangeLogger) extends Logging { /** * If there is an existing reassignment through zookeeper for any of the requested partitions, they will be @@ -75,6 +74,9 @@ class ReassignmentsManager(controllerContext: ControllerContext, reassignmentResults ++= maybeTriggerPartitionReassignment(partitionsToReassign) } + /** + * @throws IllegalStateException + */ def triggerZkReassignment(): Set[TopicPartition] = { val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] @@ -95,6 +97,9 @@ class ReassignmentsManager(controllerContext: ControllerContext, partitionsReassigned.keySet } + /** + * @throws IllegalStateException + */ def maybeResumeReassignments(shouldResume: (TopicPartition, ReplicaAssignment) => Boolean): Unit = { controllerContext.partitionsBeingReassigned.foreach { tp => val currentAssignment = 
controllerContext.partitionFullReplicaAssignment(tp) @@ -358,7 +363,6 @@ class ReassignmentsManager(controllerContext: ControllerContext, controllerContext.partitionsBeingReassigned.add(topicPartition) } - private def isReassignmentComplete(partition: TopicPartition, assignment: ReplicaAssignment): Boolean = { if (!assignment.isBeingReassigned) { true From 1e81f1e622db6ba933fbb96904c98026bf76c2b9 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 21 Nov 2019 14:42:25 +0000 Subject: [PATCH 3/6] Rename class and use MockPartitionStateMachine in test --- .../scala/kafka/controller/KafkaController.scala | 2 +- ...tsManager.scala => ReassignmentManager.scala} | 16 ++++++++-------- ...rTest.scala => ReassignmentManagerTest.scala} | 11 +++++------ 3 files changed, 14 insertions(+), 15 deletions(-) rename core/src/main/scala/kafka/controller/{ReassignmentsManager.scala => ReassignmentManager.scala} (98%) rename core/src/test/scala/unit/kafka/controller/{ReassignmentsManagerTest.scala => ReassignmentManagerTest.scala} (95%) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 8eef752f64296..e738f22af430a 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -95,7 +95,7 @@ class KafkaController(val config: KafkaConfig, new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger)) val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine, partitionStateMachine, new ControllerDeletionClient(this, zkClient)) - val reassignmentsManager = new ReassignmentsManager(controllerContext, zkClient, topicDeletionManager, + val reassignmentsManager = new ReassignmentManager(controllerContext, zkClient, topicDeletionManager, replicaStateMachine, partitionStateMachine, eventManager, brokerRequestBatch, 
stateChangeLogger) private val controllerChangeHandler = new ControllerChangeHandler(eventManager) diff --git a/core/src/main/scala/kafka/controller/ReassignmentsManager.scala b/core/src/main/scala/kafka/controller/ReassignmentManager.scala similarity index 98% rename from core/src/main/scala/kafka/controller/ReassignmentsManager.scala rename to core/src/main/scala/kafka/controller/ReassignmentManager.scala index 2bfd69ae72de7..5b76244b37379 100644 --- a/core/src/main/scala/kafka/controller/ReassignmentsManager.scala +++ b/core/src/main/scala/kafka/controller/ReassignmentManager.scala @@ -35,14 +35,14 @@ import scala.collection.{Map, Seq, Set, immutable, mutable} * A helper class which contains logic for driving partition reassignments. * This class is not thread-safe. */ -class ReassignmentsManager(controllerContext: ControllerContext, - zkClient: KafkaZkClient, - topicDeletionManager: TopicDeletionManager, - replicaStateMachine: ReplicaStateMachine, - partitionStateMachine: PartitionStateMachine, - eventManager: ControllerEventManager, - brokerRequestBatch: ControllerBrokerRequestBatch, - stateChangeLogger: StateChangeLogger) extends Logging { +class ReassignmentManager(controllerContext: ControllerContext, + zkClient: KafkaZkClient, + topicDeletionManager: TopicDeletionManager, + replicaStateMachine: ReplicaStateMachine, + partitionStateMachine: PartitionStateMachine, + eventManager: ControllerEventManager, + brokerRequestBatch: ControllerBrokerRequestBatch, + stateChangeLogger: StateChangeLogger) extends Logging { /** * If there is an existing reassignment through zookeeper for any of the requested partitions, they will be diff --git a/core/src/test/scala/unit/kafka/controller/ReassignmentsManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala similarity index 95% rename from core/src/test/scala/unit/kafka/controller/ReassignmentsManagerTest.scala rename to 
core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala index cbdae52ae922d..9171162404845 100644 --- a/core/src/test/scala/unit/kafka/controller/ReassignmentsManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala @@ -35,7 +35,7 @@ import org.mockito.Mockito.verify import scala.collection.{Map, Set, mutable} -class ReassignmentsManagerTest { +class ReassignmentManagerTest { private var controllerContext: ControllerContext = null private var mockZkClient: KafkaZkClient = null private var mockTopicDeletionManager: TopicDeletionManager = null @@ -50,7 +50,7 @@ class ReassignmentsManagerTest { private final val tp = new TopicPartition(topic, 0) private final val mockPartitionReassignmentHandler = new PartitionReassignmentHandler(null) - private var partitionReassignmentManager: ReassignmentsManager = null + private var partitionReassignmentManager: ReassignmentManager = null @Before def setUp(): Unit = { @@ -61,9 +61,9 @@ class ReassignmentsManagerTest { mockTopicDeletionManager = Mockito.mock(classOf[TopicDeletionManager]) mockControllerBrokerRequestBatch = Mockito.mock(classOf[ControllerBrokerRequestBatch]) mockReplicaStateMachine = Mockito.mock(classOf[ReplicaStateMachine]) - mockPartitionStateMachine = Mockito.mock(classOf[PartitionStateMachine]) + mockPartitionStateMachine = new MockPartitionStateMachine(controllerContext, false) mockBrokerRequestBatch = Mockito.mock(classOf[ControllerBrokerRequestBatch]) - partitionReassignmentManager = new ReassignmentsManager(controllerContext, mockZkClient, mockTopicDeletionManager, + partitionReassignmentManager = new ReassignmentManager(controllerContext, mockZkClient, mockTopicDeletionManager, mockReplicaStateMachine, mockPartitionStateMachine, null, mockBrokerRequestBatch, new StateChangeLogger(0, inControllerContext = true, None)) } @@ -169,8 +169,7 @@ class ReassignmentsManagerTest { // B3. Send a LeaderAndIsr request with RS = TRS. 
// If the current leader is not in TRS or isn't alive, we move the leader to a new replica in TRS. // We may send the LeaderAndIsr to more than the TRS replicas due to the way the partition state machine works (it reads replicas from ZK) - doReturn(Map.empty[TopicPartition, Either[Throwable, LeaderAndIsr]], Nil: _*) - .when(mockPartitionStateMachine).handleStateChanges(Seq(tp), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy)) + controllerContext.partitionStates.put(tp, NewPartition) // B6. Update ZK with RS = TRS, AR = [], RR = []. doReturn(mockSetDataResponseOK, Nil: _*).when(mockZkClient) .setTopicAssignmentRaw(tp.topic(), mutable.Map(tp -> expectedNewAssignment), zkEpoch) From 67bac34793583422e42321cafefe6c31ec726bfa Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 21 Nov 2019 15:44:04 +0000 Subject: [PATCH 4/6] Remove dependency on TopicDeletionManager and EventManager by introducing a ReassignmentListener --- .../kafka/controller/KafkaController.scala | 35 +++++++++- .../controller/ReassignmentManager.scala | 66 ++++++++++++------- .../controller/ReassignmentManagerTest.scala | 49 ++++++++++---- 3 files changed, 113 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index e738f22af430a..1e8b691a3d14f 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -95,8 +95,39 @@ class KafkaController(val config: KafkaConfig, new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger)) val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine, partitionStateMachine, new ControllerDeletionClient(this, zkClient)) - val reassignmentsManager = new ReassignmentManager(controllerContext, zkClient, topicDeletionManager, - replicaStateMachine, 
partitionStateMachine, eventManager, brokerRequestBatch, stateChangeLogger) + val reassignmentListener = new ReassignmentListener { + + // While a reassignment is in progress, deletion is not allowed + override def preReassignmentStartOrResume(tp: TopicPartition): Unit = + topicDeletionManager.markTopicIneligibleForDeletion(Set(tp.topic), reason = "topic reassignment in progress") + + override def postReassignmentStartedOrResumed(tp: TopicPartition): Unit = + zkClient.registerZNodeChangeHandler(new PartitionReassignmentIsrChangeHandler(eventManager, tp)) + + override def preReassignmentFinish(tp: TopicPartition, deletedZNode: Boolean): Unit = { + val path = TopicPartitionStateZNode.path(tp) + zkClient.unregisterZNodeChangeHandler(path) + if (deletedZNode) { + // Ensure we detect future reassignments + eventManager.put(ZkPartitionReassignment) + } + } + + // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed + override def postReassignmentFinished(tp: TopicPartition): Unit = + topicDeletionManager.resumeDeletionForTopics(Set(tp.topic)) + } + val reassignmentsManager = new ReassignmentManager(controllerContext, zkClient, reassignmentListener, + replicaStateMachine, partitionStateMachine, brokerRequestBatch, stateChangeLogger, + shouldSkipReassignment = tp => { + if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic())) { + info(s"Skipping reassignment of $tp since the topic is currently being deleted") + Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.")) + } else { + None + } + } + ) private val controllerChangeHandler = new ControllerChangeHandler(eventManager) private val brokerChangeHandler = new BrokerChangeHandler(eventManager) diff --git a/core/src/main/scala/kafka/controller/ReassignmentManager.scala b/core/src/main/scala/kafka/controller/ReassignmentManager.scala index 5b76244b37379..cd789cba5eb3a 100644 --- 
a/core/src/main/scala/kafka/controller/ReassignmentManager.scala +++ b/core/src/main/scala/kafka/controller/ReassignmentManager.scala @@ -31,18 +31,43 @@ import org.apache.zookeeper.KeeperException.Code import scala.collection.{Map, Seq, Set, immutable, mutable} +trait ReassignmentListener { + /** + * Called right before the reassignment is starting + */ + def preReassignmentStartOrResume(tp: TopicPartition) + + /** + * Called right after the reassignment is persisted in ZK + */ + def postReassignmentStartedOrResumed(tp: TopicPartition) + + /** + * Called right before we clear a reassignment from memory + * @param deletedZNode - boolean indicating whether the reassign_partitions znode was + * deleted as part of the completion of the reassignment + * (e.g if it was the last reassignment in that znode) + */ + def preReassignmentFinish(tp: TopicPartition, deletedZNode: Boolean) + + /** + * Called at the very end of the reassignment process + */ + def postReassignmentFinished(tp: TopicPartition) +} + /** * A helper class which contains logic for driving partition reassignments. * This class is not thread-safe. 
*/ class ReassignmentManager(controllerContext: ControllerContext, zkClient: KafkaZkClient, - topicDeletionManager: TopicDeletionManager, + reassignmentListener: ReassignmentListener, replicaStateMachine: ReplicaStateMachine, partitionStateMachine: PartitionStateMachine, - eventManager: ControllerEventManager, brokerRequestBatch: ControllerBrokerRequestBatch, - stateChangeLogger: StateChangeLogger) extends Logging { + stateChangeLogger: StateChangeLogger, + shouldSkipReassignment: TopicPartition => Option[ApiError]) extends Logging { /** * If there is an existing reassignment through zookeeper for any of the requested partitions, they will be @@ -155,12 +180,7 @@ class ReassignmentManager(controllerContext: ControllerContext, */ private def maybeTriggerPartitionReassignment(reassignments: Map[TopicPartition, ReplicaAssignment]): Map[TopicPartition, ApiError] = { reassignments.map { case (tp, reassignment) => - val topic = tp.topic - - val apiError = if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) { - info(s"Skipping reassignment of $tp since the topic is currently being deleted") - new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.") - } else { + val apiError = shouldSkipReassignment(tp).getOrElse { val assignedReplicas = controllerContext.partitionReplicaAssignment(tp) if (assignedReplicas.nonEmpty) { try { @@ -218,7 +238,7 @@ class ReassignmentManager(controllerContext: ControllerContext, /** * This callback is invoked: * 1. By the AlterPartitionReassignments API - * 2. By the reassigned partitions listener which is triggered when the /admin/reassign/partitions znode is created + * 2. By the reassigned partitions listener which is triggered when the /admin/reassign_partitions znode is created * 3. When an ongoing reassignment finishes - this is detected by a change in the partition's ISR znode * 4. Whenever a new broker comes up which is part of an ongoing reassignment * 5. 
On controller startup/failover @@ -286,8 +306,7 @@ class ReassignmentManager(controllerContext: ControllerContext, * @throws IllegalStateException */ private def onPartitionReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = { - // While a reassignment is in progress, deletion is not allowed - topicDeletionManager.markTopicIneligibleForDeletion(Set(topicPartition.topic), reason = "topic reassignment in progress") + reassignmentListener.preReassignmentStartOrResume(topicPartition) updateCurrentReassignment(topicPartition, reassignment) @@ -319,8 +338,7 @@ class ReassignmentManager(controllerContext: ControllerContext, brokerRequestBatch.newBatch() brokerRequestBatch.addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition)) brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch) - // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed - topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic)) + reassignmentListener.postReassignmentFinished(topicPartition) } } @@ -357,8 +375,7 @@ class ReassignmentManager(controllerContext: ControllerContext, stopRemovedReplicasOfReassignedPartition(topicPartition, unneededReplicas) } - val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition) - zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler) + reassignmentListener.postReassignmentStartedOrResumed(topicPartition) controllerContext.partitionsBeingReassigned.add(topicPartition) } @@ -500,9 +517,11 @@ class ReassignmentManager(controllerContext: ControllerContext, private def removePartitionFromReassigningPartitions(topicPartition: TopicPartition, assignment: ReplicaAssignment): Unit = { if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) { - val path = TopicPartitionStateZNode.path(topicPartition) - 
zkClient.unregisterZNodeChangeHandler(path) - maybeRemoveFromZkReassignment((tp, replicas) => tp == topicPartition && replicas == assignment.replicas) + val deletedZNode = maybeRemoveFromZkReassignment( + (tp, replicas) => tp == topicPartition && replicas == assignment.replicas + ) + + reassignmentListener.preReassignmentFinish(topicPartition, deletedZNode) controllerContext.partitionsBeingReassigned.remove(topicPartition) } else { throw new IllegalStateException("Cannot remove a reassigning partition because it is not present in memory") @@ -513,10 +532,11 @@ class ReassignmentManager(controllerContext: ControllerContext, * Remove partitions from an active zk-based reassignment (if one exists). * * @param shouldRemoveReassignment Predicate indicating which partition reassignments should be removed + * @return a boolean indicating whether the /admin/reassign_partitions znode was deleted */ - private def maybeRemoveFromZkReassignment(shouldRemoveReassignment: (TopicPartition, Seq[Int]) => Boolean): Unit = { + private def maybeRemoveFromZkReassignment(shouldRemoveReassignment: (TopicPartition, Seq[Int]) => Boolean): Boolean = { if (!zkClient.reassignPartitionsInProgress()) - return + return false val reassigningPartitions = zkClient.getPartitionReassignment() val (removingPartitions, updatedPartitionsBeingReassigned) = reassigningPartitions.partition { case (tp, replicas) => @@ -528,11 +548,11 @@ class ReassignmentManager(controllerContext: ControllerContext, if (updatedPartitionsBeingReassigned.isEmpty) { info(s"No more partitions need to be reassigned. 
Deleting zk path ${ReassignPartitionsZNode.path}") zkClient.deletePartitionReassignment(controllerContext.epochZkVersion) - // Ensure we detect future reassignments - eventManager.put(ZkPartitionReassignment) + true } else { try { zkClient.setOrCreatePartitionReassignment(updatedPartitionsBeingReassigned, controllerContext.epochZkVersion) + false } catch { case e: KeeperException => throw new AdminOperationException(e) } diff --git a/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala index 9171162404845..e012276fb15ee 100644 --- a/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala @@ -25,10 +25,11 @@ import kafka.zookeeper.SetDataResponse import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.ApiError import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.zookeeper.KeeperException.Code -import org.junit.Assert.{assertEquals, assertTrue} -import org.junit.{Before, Test} +import org.junit.Assert.{assertEquals, assertFalse, assertTrue} +import org.junit.{After, Before, Test} import org.mockito.Mockito import org.mockito.Mockito.doReturn import org.mockito.Mockito.verify @@ -38,12 +39,12 @@ import scala.collection.{Map, Set, mutable} class ReassignmentManagerTest { private var controllerContext: ControllerContext = null private var mockZkClient: KafkaZkClient = null - private var mockTopicDeletionManager: TopicDeletionManager = null private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null private var mockReplicaStateMachine: ReplicaStateMachine = null private var mockPartitionStateMachine: PartitionStateMachine = null - private var mockBrokerRequestBatch: ControllerBrokerRequestBatch = null + private 
var testReassignmentListener: ReassignmentListener = null + private var mockBrokerRequestBatch: ControllerBrokerRequestBatch = null private final val controllerEpoch = 10 private final val zkEpoch = 105 private final val topic = "topic" @@ -54,25 +55,42 @@ class ReassignmentManagerTest { @Before def setUp(): Unit = { + testReassignmentListener = new ReassignmentListener() { + var preReassignmentStartOrResume = false + var postReassignmentStartedOrResumed = false + var preReassignmentFinishCalled = false + var postReassignmentFinishedCalled = false + + override def preReassignmentStartOrResume(tp: TopicPartition): Unit = preReassignmentStartOrResume = true + + override def postReassignmentStartedOrResumed(tp: TopicPartition): Unit = postReassignmentStartedOrResumed = true + + override def preReassignmentFinish(tp: TopicPartition, deletedZNode: Boolean): Unit = preReassignmentFinishCalled = true + + override def postReassignmentFinished(tp: TopicPartition): Unit = postReassignmentFinishedCalled = true + } controllerContext = new ControllerContext controllerContext.epoch = controllerEpoch controllerContext.epochZkVersion = zkEpoch mockZkClient = Mockito.mock(classOf[KafkaZkClient]) - mockTopicDeletionManager = Mockito.mock(classOf[TopicDeletionManager]) mockControllerBrokerRequestBatch = Mockito.mock(classOf[ControllerBrokerRequestBatch]) mockReplicaStateMachine = Mockito.mock(classOf[ReplicaStateMachine]) mockPartitionStateMachine = new MockPartitionStateMachine(controllerContext, false) mockBrokerRequestBatch = Mockito.mock(classOf[ControllerBrokerRequestBatch]) - partitionReassignmentManager = new ReassignmentManager(controllerContext, mockZkClient, mockTopicDeletionManager, - mockReplicaStateMachine, mockPartitionStateMachine, null, mockBrokerRequestBatch, new StateChangeLogger(0, inControllerContext = true, None)) + partitionReassignmentManager = new ReassignmentManager(controllerContext, mockZkClient, testReassignmentListener, + mockReplicaStateMachine, 
mockPartitionStateMachine, mockBrokerRequestBatch, new StateChangeLogger(0, inControllerContext = true, None), + shouldSkipReassignment = _ => None) } @Test - def testTopicsQueuedUpForDeletionDoNotGetReassigned(): Unit = { + def testShouldSkipReassignment(): Unit = { + partitionReassignmentManager = new ReassignmentManager(controllerContext, mockZkClient, testReassignmentListener, + mockReplicaStateMachine, mockPartitionStateMachine, mockBrokerRequestBatch, new StateChangeLogger(0, inControllerContext = true, None), + shouldSkipReassignment = _ => Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))) + val initialAssignment = ReplicaAssignment(Seq(0, 1, 2), Seq(), Seq()) controllerContext.updatePartitionFullReplicaAssignment(tp, initialAssignment) setLiveBrokers(Seq(0, 1, 2, 3)) - doReturn(true, Nil: _*).when(mockTopicDeletionManager).isTopicQueuedUpForDeletion(tp.topic()) val results = partitionReassignmentManager.triggerApiReassignment(Map(tp -> Some(Seq(1,2,3)))) assertTrue(results(tp).is(Errors.UNKNOWN_TOPIC_OR_PARTITION)) } @@ -111,6 +129,8 @@ class ReassignmentManagerTest { // act val results = partitionReassignmentManager.triggerApiReassignment(Map(tp -> Some(Seq(3, 4, 5)))) assertTrue(s"reassignment failed - $results", results(tp).isSuccess) + assertTrue("Listener was not called pre reassignment start", testReassignmentListener.preReassignmentStartOrResume) + assertTrue("Listener was not called post reassignment start", testReassignmentListener.postReassignmentStartedOrResumed) // U2. 
Should have updated memory assertEquals(expectedNewAssignment, controllerContext.partitionFullReplicaAssignment(tp)) @@ -126,6 +146,9 @@ class ReassignmentManagerTest { ) } verify(mockBrokerRequestBatch).sendRequestsToBrokers(controllerEpoch) + + assertFalse("Listener was wrongly called pre reassignment finish", testReassignmentListener.preReassignmentFinishCalled) + assertFalse("Listener was wrongly called post reassignment finish", testReassignmentListener.postReassignmentFinishedCalled) } /** @@ -180,6 +203,8 @@ class ReassignmentManagerTest { // act partitionReassignmentManager.maybeResumeReassignment(tp) + assertTrue("Listener was not called pre reassignment resumption", testReassignmentListener.preReassignmentStartOrResume) + assertTrue("Listener was not called post reassignment resumption", testReassignmentListener.postReassignmentStartedOrResumed) // B1. All adding replicas moved to OnlineReplica state. verify(mockReplicaStateMachine).handleStateChanges( @@ -193,13 +218,13 @@ class ReassignmentManagerTest { verify(mockReplicaStateMachine).handleStateChanges(expectedRemovingReplicas, ReplicaDeletionStarted) verify(mockReplicaStateMachine).handleStateChanges(expectedRemovingReplicas, ReplicaDeletionSuccessful) verify(mockReplicaStateMachine).handleStateChanges(expectedRemovingReplicas, NonExistentReplica) - // B7. Should have cleared in-memory partitionsBeingReassigned and unregistered the znode handler + // B7. Should have cleared in-memory partitionsBeingReassigned and called + assertTrue("Listener was not called pre reassignment finish", testReassignmentListener.preReassignmentFinishCalled) assertEquals(Set(), controllerContext.partitionsBeingReassigned) - verify(mockZkClient).unregisterZNodeChangeHandler(TopicPartitionStateZNode.path(tp)) // B8. 
Resend the update metadata request to every broker verify(mockBrokerRequestBatch).addUpdateMetadataRequestForBrokers(controllerContext.liveBrokerIds.toSeq, Set(tp)) verify(mockBrokerRequestBatch).sendRequestsToBrokers(controllerEpoch) - verify(mockTopicDeletionManager).resumeDeletionForTopics(Set(tp.topic())) + assertTrue("Listener was not called post reassignment finish", testReassignmentListener.postReassignmentFinishedCalled) } def setLiveBrokers(brokerIds: Seq[Int]): Unit = { From 3726f15073a2f2c3f935ad2e79357220d03507de Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 21 Nov 2019 16:42:25 +0000 Subject: [PATCH 5/6] Fix test --- .../controller/ReassignmentManagerTest.scala | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala index e012276fb15ee..6e4873236a89c 100644 --- a/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala @@ -19,7 +19,7 @@ package kafka.controller import kafka.api.LeaderAndIsr import kafka.cluster.{Broker, EndPoint} -import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode} +import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zookeeper.SetDataResponse import org.apache.kafka.common.TopicPartition @@ -29,7 +29,7 @@ import org.apache.kafka.common.requests.ApiError import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.zookeeper.KeeperException.Code import org.junit.Assert.{assertEquals, assertFalse, assertTrue} -import org.junit.{After, Before, Test} +import org.junit.{Before, Test} import org.mockito.Mockito import org.mockito.Mockito.doReturn import org.mockito.Mockito.verify @@ -37,12 +37,27 @@ import org.mockito.Mockito.verify import scala.collection.{Map, Set, mutable} class 
ReassignmentManagerTest { + + class TestReassignmentListener(var preReassignmentStartOrResumeCalled: Boolean = false, + var postReassignmentStartedOrResumedCalled: Boolean = false, + var preReassignmentFinishCalled: Boolean = false, + var postReassignmentFinishedCalled: Boolean = false, + ) extends ReassignmentListener { + override def preReassignmentStartOrResume(tp: TopicPartition): Unit = preReassignmentStartOrResumeCalled = true + + override def postReassignmentStartedOrResumed(tp: TopicPartition): Unit = postReassignmentStartedOrResumedCalled = true + + override def preReassignmentFinish(tp: TopicPartition, deletedZNode: Boolean): Unit = preReassignmentFinishCalled = true + + override def postReassignmentFinished(tp: TopicPartition): Unit = postReassignmentFinishedCalled = true + } + private var controllerContext: ControllerContext = null private var mockZkClient: KafkaZkClient = null private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null private var mockReplicaStateMachine: ReplicaStateMachine = null private var mockPartitionStateMachine: PartitionStateMachine = null - private var testReassignmentListener: ReassignmentListener = null + private var testReassignmentListener: TestReassignmentListener = null private var mockBrokerRequestBatch: ControllerBrokerRequestBatch = null private final val controllerEpoch = 10 @@ -55,20 +70,7 @@ class ReassignmentManagerTest { @Before def setUp(): Unit = { - testReassignmentListener = new ReassignmentListener() { - var preReassignmentStartOrResume = false - var postReassignmentStartedOrResumed = false - var preReassignmentFinishCalled = false - var postReassignmentFinishedCalled = false - - override def preReassignmentStartOrResume(tp: TopicPartition): Unit = preReassignmentStartOrResume = true - - override def postReassignmentStartedOrResumed(tp: TopicPartition): Unit = postReassignmentStartedOrResumed = true - - override def preReassignmentFinish(tp: TopicPartition, deletedZNode: Boolean): Unit 
= preReassignmentFinishCalled = true - - override def postReassignmentFinished(tp: TopicPartition): Unit = postReassignmentFinishedCalled = true - } + testReassignmentListener = new TestReassignmentListener() controllerContext = new ControllerContext controllerContext.epoch = controllerEpoch controllerContext.epochZkVersion = zkEpoch @@ -129,8 +131,8 @@ class ReassignmentManagerTest { // act val results = partitionReassignmentManager.triggerApiReassignment(Map(tp -> Some(Seq(3, 4, 5)))) assertTrue(s"reassignment failed - $results", results(tp).isSuccess) - assertTrue("Listener was not called pre reassignment start", testReassignmentListener.preReassignmentStartOrResume) - assertTrue("Listener was not called post reassignment start", testReassignmentListener.postReassignmentStartedOrResumed) + assertTrue("Listener was not called pre reassignment start", testReassignmentListener.preReassignmentStartOrResumeCalled) + assertTrue("Listener was not called post reassignment start", testReassignmentListener.postReassignmentStartedOrResumedCalled) // U2. Should have updated memory assertEquals(expectedNewAssignment, controllerContext.partitionFullReplicaAssignment(tp)) @@ -203,8 +205,8 @@ class ReassignmentManagerTest { // act partitionReassignmentManager.maybeResumeReassignment(tp) - assertTrue("Listener was not called pre reassignment resumption", testReassignmentListener.preReassignmentStartOrResume) - assertTrue("Listener was not called post reassignment resumption", testReassignmentListener.postReassignmentStartedOrResumed) + assertTrue("Listener was not called pre reassignment resumption", testReassignmentListener.preReassignmentStartOrResumeCalled) + assertTrue("Listener was not called post reassignment resumption", testReassignmentListener.postReassignmentStartedOrResumedCalled) // B1. All adding replicas moved to OnlineReplica state. 
verify(mockReplicaStateMachine).handleStateChanges( From 45fd0c6b0c3e4c61d9816e029893902220dcabec Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Fri, 22 Nov 2019 14:06:00 +0000 Subject: [PATCH 6/6] Use MockReplicaStateMachine and refactor tests --- .../controller/ReassignmentManagerTest.scala | 65 ++++++------------- .../controller/TopicDeletionManagerTest.scala | 39 ++--------- .../scala/unit/kafka/utils/TestUtils.scala | 31 ++++++++- 3 files changed, 55 insertions(+), 80 deletions(-) diff --git a/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala index 6e4873236a89c..f2639757588ca 100644 --- a/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala @@ -19,6 +19,7 @@ package kafka.controller import kafka.api.LeaderAndIsr import kafka.cluster.{Broker, EndPoint} +import kafka.utils.TestUtils import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zookeeper.SetDataResponse @@ -41,8 +42,7 @@ class ReassignmentManagerTest { class TestReassignmentListener(var preReassignmentStartOrResumeCalled: Boolean = false, var postReassignmentStartedOrResumedCalled: Boolean = false, var preReassignmentFinishCalled: Boolean = false, - var postReassignmentFinishedCalled: Boolean = false, - ) extends ReassignmentListener { + var postReassignmentFinishedCalled: Boolean = false) extends ReassignmentListener { override def preReassignmentStartOrResume(tp: TopicPartition): Unit = preReassignmentStartOrResumeCalled = true override def postReassignmentStartedOrResumed(tp: TopicPartition): Unit = postReassignmentStartedOrResumedCalled = true @@ -60,8 +60,6 @@ class ReassignmentManagerTest { private var testReassignmentListener: TestReassignmentListener = null private var mockBrokerRequestBatch: ControllerBrokerRequestBatch = null - private final val 
controllerEpoch = 10 - private final val zkEpoch = 105 private final val topic = "topic" private final val tp = new TopicPartition(topic, 0) private final val mockPartitionReassignmentHandler = new PartitionReassignmentHandler(null) @@ -71,13 +69,16 @@ class ReassignmentManagerTest { @Before def setUp(): Unit = { testReassignmentListener = new TestReassignmentListener() - controllerContext = new ControllerContext - controllerContext.epoch = controllerEpoch - controllerContext.epochZkVersion = zkEpoch + controllerContext = TestUtils.initContext(brokers = Seq(1, 2, 3, 4, 5), + topics = Set(tp.topic), + numPartitions = 1, + replicationFactor = 3) mockZkClient = Mockito.mock(classOf[KafkaZkClient]) mockControllerBrokerRequestBatch = Mockito.mock(classOf[ControllerBrokerRequestBatch]) - mockReplicaStateMachine = Mockito.mock(classOf[ReplicaStateMachine]) + mockReplicaStateMachine = new MockReplicaStateMachine(controllerContext) + mockReplicaStateMachine.startup() mockPartitionStateMachine = new MockPartitionStateMachine(controllerContext, false) + mockPartitionStateMachine.startup() mockBrokerRequestBatch = Mockito.mock(classOf[ControllerBrokerRequestBatch]) partitionReassignmentManager = new ReassignmentManager(controllerContext, mockZkClient, testReassignmentListener, mockReplicaStateMachine, mockPartitionStateMachine, mockBrokerRequestBatch, new StateChangeLogger(0, inControllerContext = true, None), @@ -90,9 +91,6 @@ class ReassignmentManagerTest { mockReplicaStateMachine, mockPartitionStateMachine, mockBrokerRequestBatch, new StateChangeLogger(0, inControllerContext = true, None), shouldSkipReassignment = _ => Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))) - val initialAssignment = ReplicaAssignment(Seq(0, 1, 2), Seq(), Seq()) - controllerContext.updatePartitionFullReplicaAssignment(tp, initialAssignment) - setLiveBrokers(Seq(0, 1, 2, 3)) val results = partitionReassignmentManager.triggerApiReassignment(Map(tp -> Some(Seq(1,2,3)))) 
assertTrue(results(tp).is(Errors.UNKNOWN_TOPIC_OR_PARTITION)) } @@ -109,24 +107,19 @@ class ReassignmentManagerTest { * Existing assignment is [1,2,3] * We issue a reassignment to [3, 4, 5] */ - val expectedAddingReplicas = Seq(4, 5) val expectedFullReplicaSet = Seq(3, 4, 5, 1, 2) - val initialAssignment = ReplicaAssignment(Seq(1, 2, 3), Seq(), Seq()) - val initialLeaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2, 3), zkEpoch) - controllerContext.partitionAssignments.put(topic, mutable.Map(tp.partition() -> initialAssignment)) - controllerContext.updatePartitionFullReplicaAssignment(tp, initialAssignment) + val initialLeaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2, 3), controllerContext.epochZkVersion) controllerContext.partitionsBeingReassigned.add(tp) - setLiveBrokers(Seq(1,2,3,4,5)) mockAreReplicasInIsr(tp, List(1, 2, 3), initialLeaderAndIsr) val expectedNewAssignment = ReplicaAssignment.fromOldAndNewReplicas(Seq(1, 2, 3), Seq(3, 4, 5)) assertEquals(expectedFullReplicaSet, expectedNewAssignment.replicas) // U1. Should update ZK - doReturn(mockSetDataResponseOK, Nil: _*).when(mockZkClient).setTopicAssignmentRaw(topic, mutable.Map(tp -> expectedNewAssignment), zkEpoch) + doReturn(mockSetDataResponseOK, Nil: _*).when(mockZkClient).setTopicAssignmentRaw(topic, mutable.Map(tp -> expectedNewAssignment), controllerContext.epochZkVersion) // U2. Should update memory // A1. Should update partition leader epoch in ZK val expectedLeaderAndIsr = initialLeaderAndIsr.newEpochAndZkVersion doReturn(UpdateLeaderAndIsrResult(Map(tp -> Right(expectedLeaderAndIsr)), Seq()), Nil: _*) - .when(mockZkClient).updateLeaderAndIsr(Map(tp -> expectedLeaderAndIsr), controllerEpoch, zkEpoch) + .when(mockZkClient).updateLeaderAndIsr(Map(tp -> expectedLeaderAndIsr), controllerContext.epoch, controllerContext.epochZkVersion) // act val results = partitionReassignmentManager.triggerApiReassignment(Map(tp -> Some(Seq(3, 4, 5)))) @@ -139,15 +132,9 @@ class ReassignmentManagerTest { // A1. 
Should send a LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and RR). verify(mockBrokerRequestBatch).addLeaderAndIsrRequestForBrokers( expectedFullReplicaSet, tp, - LeaderIsrAndControllerEpoch(expectedLeaderAndIsr, controllerEpoch), expectedNewAssignment, isNew = false + LeaderIsrAndControllerEpoch(expectedLeaderAndIsr, controllerContext.epoch), expectedNewAssignment, isNew = false ) - // A2. replicas in AR -> NewReplica - expectedAddingReplicas.foreach { newReplica => - verify(mockReplicaStateMachine).handleStateChanges( - Seq(PartitionAndReplica(tp, newReplica)), NewReplica - ) - } - verify(mockBrokerRequestBatch).sendRequestsToBrokers(controllerEpoch) + verify(mockBrokerRequestBatch).sendRequestsToBrokers(controllerContext.epoch) assertFalse("Listener was wrongly called pre reassignment finish", testReassignmentListener.preReassignmentFinishCalled) assertFalse("Listener was wrongly called post reassignment finish", testReassignmentListener.postReassignmentFinishedCalled) @@ -178,17 +165,17 @@ class ReassignmentManagerTest { * Existing assignment is [1,2,3] * We had issued a reassignment to [3, 4, 5] and now all replicas are in ISR */ - val expectedRemovingReplicas = Seq(PartitionAndReplica(tp, 1), PartitionAndReplica(tp, 2)) val initialAssignment = ReplicaAssignment.fromOldAndNewReplicas(Seq(1, 2, 3), Seq(3, 4, 5)) val expectedNewAssignment = ReplicaAssignment(Seq(3, 4, 5), Seq(), Seq()) - val initialLeaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2, 3, 4, 5), zkEpoch) - setLiveBrokers(Seq(1, 2, 3, 4, 5)) + val initialLeaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2, 3, 4, 5), controllerContext.epochZkVersion) controllerContext.partitionAssignments.put(topic, mutable.Map(tp.partition() -> initialAssignment)) controllerContext.updatePartitionFullReplicaAssignment(tp, initialAssignment) controllerContext.partitionsBeingReassigned.add(tp) controllerContext.partitionLeadershipInfo.put(tp, - LeaderIsrAndControllerEpoch(initialLeaderAndIsr, 
controllerEpoch)) + LeaderIsrAndControllerEpoch(initialLeaderAndIsr, controllerContext.epoch)) mockAreReplicasInIsr(tp, List(1, 2, 3, 4, 5), initialLeaderAndIsr) + // A2. replicas in AR -> NewReplica + mockReplicaStateMachine.handleStateChanges(Seq(4, 5).map(PartitionAndReplica(tp, _)), NewReplica) // B2. Set RS = TRS, AR = [], RR = [] in memory. // B3. Send a LeaderAndIsr request with RS = TRS. @@ -197,7 +184,7 @@ class ReassignmentManagerTest { controllerContext.partitionStates.put(tp, NewPartition) // B6. Update ZK with RS = TRS, AR = [], RR = []. doReturn(mockSetDataResponseOK, Nil: _*).when(mockZkClient) - .setTopicAssignmentRaw(tp.topic(), mutable.Map(tp -> expectedNewAssignment), zkEpoch) + .setTopicAssignmentRaw(tp.topic(), mutable.Map(tp -> expectedNewAssignment), controllerContext.epochZkVersion) // B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it if present. doReturn(true, Nil: _*).when(mockZkClient).reassignPartitionsInProgress() doReturn(Map(tp -> Seq(1, 2, 3)), Nil: _*).when(mockZkClient).getPartitionReassignment() @@ -208,24 +195,14 @@ class ReassignmentManagerTest { assertTrue("Listener was not called pre reassignment resumption", testReassignmentListener.preReassignmentStartOrResumeCalled) assertTrue("Listener was not called post reassignment resumption", testReassignmentListener.postReassignmentStartedOrResumedCalled) - // B1. All adding replicas moved to OnlineReplica state. - verify(mockReplicaStateMachine).handleStateChanges( - initialAssignment.addingReplicas.map(PartitionAndReplica(tp, _)), OnlineReplica - ) // B2. Should have updated memory assertEquals(expectedNewAssignment, controllerContext.partitionFullReplicaAssignment(tp)) - // B4. replicas in RR -> Offline (force those replicas out of isr) - // B5. 
replicas in RR -> NonExistentReplica (force those replicas to be deleted) - verify(mockReplicaStateMachine).handleStateChanges(expectedRemovingReplicas, OfflineReplica) - verify(mockReplicaStateMachine).handleStateChanges(expectedRemovingReplicas, ReplicaDeletionStarted) - verify(mockReplicaStateMachine).handleStateChanges(expectedRemovingReplicas, ReplicaDeletionSuccessful) - verify(mockReplicaStateMachine).handleStateChanges(expectedRemovingReplicas, NonExistentReplica) // B7. Should have cleared in-memory partitionsBeingReassigned and called assertTrue("Listener was not called pre reassignment finish", testReassignmentListener.preReassignmentFinishCalled) assertEquals(Set(), controllerContext.partitionsBeingReassigned) // B8. Resend the update metadata request to every broker verify(mockBrokerRequestBatch).addUpdateMetadataRequestForBrokers(controllerContext.liveBrokerIds.toSeq, Set(tp)) - verify(mockBrokerRequestBatch).sendRequestsToBrokers(controllerEpoch) + verify(mockBrokerRequestBatch).sendRequestsToBrokers(controllerContext.epoch) assertTrue("Listener was not called post reassignment finish", testReassignmentListener.postReassignmentFinishedCalled) } @@ -244,7 +221,7 @@ class ReassignmentManagerTest { */ def mockAreReplicasInIsr(tp: TopicPartition, isr: List[Int], leaderAndIsr: LeaderAndIsr): Unit = { val tpStateMap: Map[TopicPartition, LeaderIsrAndControllerEpoch] = Map( - tp -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) + tp -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch) ) doReturn(tpStateMap, Nil: _*).when(mockZkClient).getTopicPartitionStates(Seq(tp)) } diff --git a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala index e3e1996cb646c..5b2a0b588dcdc 100644 --- a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala @@ 
-16,12 +16,8 @@ */ package kafka.controller -import kafka.cluster.{Broker, EndPoint} import kafka.server.KafkaConfig import kafka.utils.TestUtils -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.Assert._ import org.junit.Test import org.mockito.Mockito._ @@ -34,7 +30,7 @@ class TopicDeletionManagerTest { @Test def testInitialization(): Unit = { - val controllerContext = initContext( + val controllerContext = TestUtils.initContext( brokers = Seq(1, 2, 3), topics = Set("foo", "bar", "baz"), numPartitions = 2, @@ -58,7 +54,7 @@ class TopicDeletionManagerTest { @Test def testBasicDeletion(): Unit = { - val controllerContext = initContext( + val controllerContext = TestUtils.initContext( brokers = Seq(1, 2, 3), topics = Set("foo", "bar"), numPartitions = 2, @@ -99,7 +95,7 @@ class TopicDeletionManagerTest { @Test def testDeletionWithBrokerOffline(): Unit = { - val controllerContext = initContext( + val controllerContext = TestUtils.initContext( brokers = Seq(1, 2, 3), topics = Set("foo", "bar"), numPartitions = 2, @@ -167,7 +163,7 @@ class TopicDeletionManagerTest { @Test def testBrokerFailureAfterDeletionStarted(): Unit = { - val controllerContext = initContext( + val controllerContext = TestUtils.initContext( brokers = Seq(1, 2, 3), topics = Set("foo", "bar"), numPartitions = 2, @@ -226,31 +222,4 @@ class TopicDeletionManagerTest { assertEquals(offlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionStarted)) } - - def initContext(brokers: Seq[Int], - topics: Set[String], - numPartitions: Int, - replicationFactor: Int): ControllerContext = { - val context = new ControllerContext - val brokerEpochs = brokers.map { brokerId => - val endpoint = new EndPoint("localhost", 9900 + brokerId, new ListenerName("blah"), - SecurityProtocol.PLAINTEXT) - Broker(brokerId, Seq(endpoint), rack = None) -> 1L - }.toMap - 
context.setLiveBrokerAndEpochs(brokerEpochs) - - // Simple round-robin replica assignment - var leaderIndex = 0 - for (topic <- topics; partitionId <- 0 until numPartitions) { - val partition = new TopicPartition(topic, partitionId) - val replicas = (0 until replicationFactor).map { i => - val replica = brokers((i + leaderIndex) % brokers.size) - replica - } - context.updatePartitionReplicaAssignment(partition, replicas) - leaderIndex += 1 - } - context - } - } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 0f34268b31b79..3cd6382f9e9bf 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -39,7 +39,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile import Implicits._ import com.yammer.metrics.Metrics import com.yammer.metrics.core.Meter -import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.controller.{ControllerContext, LeaderIsrAndControllerEpoch} import kafka.zk._ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.AlterConfigOp.OpType @@ -1684,4 +1684,33 @@ object TestUtils extends Logging { waitUntilTrue(() => adminClient.listPartitionReassignments().reassignments().get().isEmpty, s"There still are ongoing reassignments", pause = pause) } + + /** + * Initializes a new ControllerContext + */ + def initContext(brokers: Seq[Int], + topics: scala.collection.Set[String], + numPartitions: Int, + replicationFactor: Int): ControllerContext = { + val context = new ControllerContext + val brokerEpochs = brokers.map { brokerId => + val endpoint = new EndPoint("localhost", 9900 + brokerId, new ListenerName("blah"), + SecurityProtocol.PLAINTEXT) + Broker(brokerId, Seq(endpoint), rack = None) -> 1L + }.toMap + context.setLiveBrokerAndEpochs(brokerEpochs) + + // Simple round-robin replica assignment + var leaderIndex = 0 + for (topic <- topics; partitionId <- 0 until 
numPartitions) { + val partition = new TopicPartition(topic, partitionId) + val replicas = (0 until replicationFactor).map { i => + val replica = brokers((i + leaderIndex) % brokers.size) + replica + } + context.updatePartitionReplicaAssignment(partition, replicas) + leaderIndex += 1 + } + context + } }