diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 7b28202e6da81..1e8b691a3d14f 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -95,6 +95,39 @@ class KafkaController(val config: KafkaConfig, new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger)) val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine, partitionStateMachine, new ControllerDeletionClient(this, zkClient)) + val reassignmentListener = new ReassignmentListener { + + // While a reassignment is in progress, deletion is not allowed + override def preReassignmentStartOrResume(tp: TopicPartition): Unit = + topicDeletionManager.markTopicIneligibleForDeletion(Set(tp.topic), reason = "topic reassignment in progress") + + override def postReassignmentStartedOrResumed(tp: TopicPartition): Unit = + zkClient.registerZNodeChangeHandler(new PartitionReassignmentIsrChangeHandler(eventManager, tp)) + + override def preReassignmentFinish(tp: TopicPartition, deletedZNode: Boolean): Unit = { + val path = TopicPartitionStateZNode.path(tp) + zkClient.unregisterZNodeChangeHandler(path) + if (deletedZNode) { + // Ensure we detect future reassignments + eventManager.put(ZkPartitionReassignment) + } + } + + // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed + override def postReassignmentFinished(tp: TopicPartition): Unit = + topicDeletionManager.resumeDeletionForTopics(Set(tp.topic)) + } + val reassignmentsManager = new ReassignmentManager(controllerContext, zkClient, reassignmentListener, + replicaStateMachine, partitionStateMachine, brokerRequestBatch, stateChangeLogger, + shouldSkipReassignment = tp => { + if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic())) { + info(s"Skipping reassignment of $tp since the topic is currently being deleted") + Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.")) + } else { + None + } + } + ) private val controllerChangeHandler = new ControllerChangeHandler(eventManager) private val brokerChangeHandler = new BrokerChangeHandler(eventManager) @@ -424,9 +457,14 @@ class KafkaController(val config: KafkaConfig, // to see if these brokers can become leaders for some/all of those partitionStateMachine.triggerOnlinePartitionStateChange() // check if reassignment of some partitions need to be restarted - maybeResumeReassignments { (_, assignment) => - assignment.targetReplicas.exists(newBrokersSet.contains) + try { + reassignmentsManager.maybeResumeReassignments { (_, assignment) => + assignment.targetReplicas.exists(newBrokersSet.contains) + } + } catch { + case e: IllegalStateException => handleIllegalState(e) } + // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists // on the newly restarted brokers, there is a chance that topic deletion can resume val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic)) @@ -439,14 +477,6 @@ class KafkaController(val config: KafkaConfig, registerBrokerModificationsHandler(newBrokers) } - private def maybeResumeReassignments(shouldResume: (TopicPartition, ReplicaAssignment) => Boolean): Unit = { - controllerContext.partitionsBeingReassigned.foreach { tp => - val currentAssignment = controllerContext.partitionFullReplicaAssignment(tp) - if (shouldResume(tp, currentAssignment)) - onPartitionReassignment(tp, currentAssignment) - } - } - private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = { debug(s"Register BrokerModifications handler for $brokerIds") brokerIds.foreach { brokerId => @@ -544,194 +574,6 @@ class KafkaController(val config: KafkaConfig, replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica) } - /** - * This callback is invoked: - * 1. By the AlterPartitionReassignments API - * 2. By the reassigned partitions listener which is triggered when the /admin/reassign/partitions znode is created - * 3. When an ongoing reassignment finishes - this is detected by a change in the partition's ISR znode - * 4. Whenever a new broker comes up which is part of an ongoing reassignment - * 5. On controller startup/failover - * - * Reassigning replicas for a partition goes through a few steps listed in the code. - * RS = current assigned replica set - * ORS = Original replica set for partition - * TRS = Reassigned (target) replica set - * AR = The replicas we are adding as part of this reassignment - * RR = The replicas we are removing as part of this reassignment - * - * A reassignment may have up to three phases, each with its own steps: - - * Phase U (Assignment update): Regardless of the trigger, the first step is in the reassignment process - * is to update the existing assignment state. We always update the state in Zookeeper before - * we update memory so that it can be resumed upon controller fail-over. - * - * U1. Update ZK with RS = ORS + TRS, AR = TRS - ORS, RR = ORS - TRS. - * U2. Update memory with RS = ORS + TRS, AR = TRS - ORS and RR = ORS - TRS - * U3. If we are cancelling or replacing an existing reassignment, send StopReplica to all members - * of AR in the original reassignment if they are not in TRS from the new assignment - * - * To complete the reassignment, we need to bring the new replicas into sync, so depending on the state - * of the ISR, we will execute one of the following steps. - * - * Phase A (when TRS != ISR): The reassignment is not yet complete - * - * A1. Bump the leader epoch for the partition and send LeaderAndIsr updates to RS. - * A2. Start new replicas AR by moving replicas in AR to NewReplica state. - * - * Phase B (when TRS = ISR): The reassignment is complete - * - * B1. Move all replicas in AR to OnlineReplica state. - * B2. Set RS = TRS, AR = [], RR = [] in memory. - * B3. Send a LeaderAndIsr request with RS = TRS. This will prevent the leader from adding any replica in TRS - ORS back in the isr. - * If the current leader is not in TRS or isn't alive, we move the leader to a new replica in TRS. - * We may send the LeaderAndIsr to more than the TRS replicas due to the - * way the partition state machine works (it reads replicas from ZK) - * B4. Move all replicas in RR to OfflineReplica state. As part of OfflineReplica state change, we shrink the - * isr to remove RR in ZooKeeper and send a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr. - * After that, we send a StopReplica (delete = false) to the replicas in RR. - * B5. Move all replicas in RR to NonExistentReplica state. This will send a StopReplica (delete = true) to - * the replicas in RR to physically delete the replicas on disk. - * B6. Update ZK with RS=TRS, AR=[], RR=[]. - * B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it if present. - * B8. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker. - * - * In general, there are two goals we want to aim for: - * 1. Every replica present in the replica set of a LeaderAndIsrRequest gets the request sent to it - * 2. Replicas that are removed from a partition's assignment get StopReplica sent to them - * - * For example, if ORS = {1,2,3} and TRS = {4,5,6}, the values in the topic and leader/isr paths in ZK - * may go through the following transitions. - * RS AR RR leader isr - * {1,2,3} {} {} 1 {1,2,3} (initial state) - * {4,5,6,1,2,3} {4,5,6} {1,2,3} 1 {1,2,3} (step A2) - * {4,5,6,1,2,3} {4,5,6} {1,2,3} 1 {1,2,3,4,5,6} (phase B) - * {4,5,6,1,2,3} {4,5,6} {1,2,3} 4 {1,2,3,4,5,6} (step B3) - * {4,5,6,1,2,3} {4,5,6} {1,2,3} 4 {4,5,6} (step B4) - * {4,5,6} {} {} 4 {4,5,6} (step B6) - * - * Note that we have to update RS in ZK with TRS last since it's the only place where we store ORS persistently. - * This way, if the controller crashes before that step, we can still recover. - */ - private def onPartitionReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = { - // While a reassignment is in progress, deletion is not allowed - topicDeletionManager.markTopicIneligibleForDeletion(Set(topicPartition.topic), reason = "topic reassignment in progress") - - updateCurrentReassignment(topicPartition, reassignment) - - val addingReplicas = reassignment.addingReplicas - val removingReplicas = reassignment.removingReplicas - - if (!isReassignmentComplete(topicPartition, reassignment)) { - // A1. Send LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and RR). - updateLeaderEpochAndSendRequest(topicPartition, reassignment) - // A2. replicas in AR -> NewReplica - startNewReplicasForReassignedPartition(topicPartition, addingReplicas) - } else { - // B1. replicas in AR -> OnlineReplica - replicaStateMachine.handleStateChanges(addingReplicas.map(PartitionAndReplica(topicPartition, _)), OnlineReplica) - // B2. Set RS = TRS, AR = [], RR = [] in memory. - val completedReassignment = ReplicaAssignment(reassignment.targetReplicas) - controllerContext.updatePartitionFullReplicaAssignment(topicPartition, completedReassignment) - // B3. Send LeaderAndIsr request with a potential new leader (if current leader not in TRS) and - // a new RS (using TRS) and same isr to every broker in ORS + TRS or TRS - moveReassignedPartitionLeaderIfRequired(topicPartition, completedReassignment) - // B4. replicas in RR -> Offline (force those replicas out of isr) - // B5. replicas in RR -> NonExistentReplica (force those replicas to be deleted) - stopRemovedReplicasOfReassignedPartition(topicPartition, removingReplicas) - // B6. Update ZK with RS = TRS, AR = [], RR = []. - updateReplicaAssignmentForPartition(topicPartition, completedReassignment) - // B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it. - removePartitionFromReassigningPartitions(topicPartition, completedReassignment) - // B8. After electing a leader in B3, the replicas and isr information changes, so resend the update metadata request to every broker - sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition)) - // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed - topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic)) - } - } - - /** - * Update the current assignment state in Zookeeper and in memory. If a reassignment is already in - * progress, then the new reassignment will supplant it and some replicas will be shutdown. - * - * Note that due to the way we compute the original replica set, we cannot guarantee that a - * cancellation will restore the original replica order. Target replicas are always listed - * first in the replica set in the desired order, which means we have no way to get to the - * original order if the reassignment overlaps with the current assignment. For example, - * with an initial assignment of [1, 2, 3] and a reassignment of [3, 4, 2], then the replicas - * will be encoded as [3, 4, 2, 1] while the reassignment is in progress. If the reassignment - * is cancelled, there is no way to restore the original order. - * - * @param topicPartition The reassigning partition - * @param reassignment The new reassignment - */ - private def updateCurrentReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = { - val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition) - - if (currentAssignment != reassignment) { - debug(s"Updating assignment of partition $topicPartition from $currentAssignment to $reassignment") - - // U1. Update assignment state in zookeeper - updateReplicaAssignmentForPartition(topicPartition, reassignment) - // U2. Update assignment state in memory - controllerContext.updatePartitionFullReplicaAssignment(topicPartition, reassignment) - - // If there is a reassignment already in progress, then some of the currently adding replicas - // may be eligible for immediate removal, in which case we need to stop the replicas. - val unneededReplicas = currentAssignment.replicas.diff(reassignment.replicas) - if (unneededReplicas.nonEmpty) - stopRemovedReplicasOfReassignedPartition(topicPartition, unneededReplicas) - } - - val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition) - zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler) - - controllerContext.partitionsBeingReassigned.add(topicPartition) - } - - /** - * Trigger a partition reassignment provided that the topic exists and is not being deleted. - * - * This is called when a reassignment is initially received either through Zookeeper or through the - * AlterPartitionReassignments API - * - * The `partitionsBeingReassigned` field in the controller context will be updated by this - * call after the reassignment completes validation and is successfully stored in the topic - * assignment zNode. - * - * @param reassignments The reassignments to begin processing - * @return A map of any errors in the reassignment. If the error is NONE for a given partition, - * then the reassignment was submitted successfully. - */ - private def maybeTriggerPartitionReassignment(reassignments: Map[TopicPartition, ReplicaAssignment]): Map[TopicPartition, ApiError] = { - reassignments.map { case (tp, reassignment) => - val topic = tp.topic - - val apiError = if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) { - info(s"Skipping reassignment of $tp since the topic is currently being deleted") - new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.") - } else { - val assignedReplicas = controllerContext.partitionReplicaAssignment(tp) - if (assignedReplicas.nonEmpty) { - try { - onPartitionReassignment(tp, reassignment) - ApiError.NONE - } catch { - case e: ControllerMovedException => - info(s"Failed completing reassignment of partition $tp because controller has moved to another broker") - throw e - case e: Throwable => - error(s"Error completing reassignment of partition $tp", e) - new ApiError(Errors.UNKNOWN_SERVER_ERROR) - } - } else { - new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.") - } - } - - tp -> apiError - } - } - /** * Attempt to elect a replica as leader for each of the given partitions. * @param partitions The partitions to have a new leader elected @@ -836,8 +678,12 @@ class KafkaController(val config: KafkaConfig, // New reassignments may have been submitted through Zookeeper while the controller was failing over val zkPartitionsResumed = processZkPartitionReassignment() // We may also have some API-based reassignments that need to be restarted - maybeResumeReassignments { (tp, _) => - !zkPartitionsResumed.contains(tp) + try { + reassignmentsManager.maybeResumeReassignments { (tp, _) => + !zkPartitionsResumed.contains(tp) + } + } catch { + case e: IllegalStateException => handleIllegalState(e) } } @@ -861,99 +707,6 @@ class KafkaController(val config: KafkaConfig, } } - private def isReassignmentComplete(partition: TopicPartition, assignment: ReplicaAssignment): Boolean = { - if (!assignment.isBeingReassigned) { - true - } else { - zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch => - val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr.toSet - val targetReplicas = assignment.targetReplicas.toSet - targetReplicas.subsetOf(isr) - } - } - } - - private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition, - newAssignment: ReplicaAssignment): Unit = { - val reassignedReplicas = newAssignment.replicas - val currentLeader = controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader - - if (!reassignedReplicas.contains(currentLeader)) { - info(s"Leader $currentLeader for partition $topicPartition being reassigned, " + - s"is not in the new list of replicas ${reassignedReplicas.mkString(",")}. Re-electing leader") - // move the leader to one of the alive and caught up new replicas - partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy)) - } else if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) { - info(s"Leader $currentLeader for partition $topicPartition being reassigned, " + - s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} and is alive") - // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest - updateLeaderEpochAndSendRequest(topicPartition, newAssignment) - } else { - info(s"Leader $currentLeader for partition $topicPartition being reassigned, " + - s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} but is dead") - partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy)) - } - } - - private def stopRemovedReplicasOfReassignedPartition(topicPartition: TopicPartition, - removedReplicas: Seq[Int]): Unit = { - // first move the replica to offline state (the controller removes it from the ISR) - val replicasToBeDeleted = removedReplicas.map(PartitionAndReplica(topicPartition, _)) - replicaStateMachine.handleStateChanges(replicasToBeDeleted, OfflineReplica) - // send stop replica command to the old replicas - replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionStarted) - // TODO: Eventually partition reassignment could use a callback that does retries if deletion failed - replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionSuccessful) - replicaStateMachine.handleStateChanges(replicasToBeDeleted, NonExistentReplica) - } - - private def updateReplicaAssignmentForPartition(topicPartition: TopicPartition, assignment: ReplicaAssignment): Unit = { - var topicAssignment = controllerContext.partitionFullReplicaAssignmentForTopic(topicPartition.topic) - topicAssignment += topicPartition -> assignment - - val setDataResponse = zkClient.setTopicAssignmentRaw(topicPartition.topic, topicAssignment, controllerContext.epochZkVersion) - setDataResponse.resultCode match { - case Code.OK => - info(s"Successfully updated assignment of partition $topicPartition to $assignment") - case Code.NONODE => - throw new IllegalStateException(s"Failed to update assignment for $topicPartition since the topic " + - "has no current assignment") - case _ => throw new KafkaException(setDataResponse.resultException.get) - } - } - - private def startNewReplicasForReassignedPartition(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit = { - // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned - // replicas list - newReplicas.foreach { replica => - replicaStateMachine.handleStateChanges(Seq(PartitionAndReplica(topicPartition, replica)), NewReplica) - } - } - - private def updateLeaderEpochAndSendRequest(topicPartition: TopicPartition, - assignment: ReplicaAssignment): Unit = { - val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch) - updateLeaderEpoch(topicPartition) match { - case Some(updatedLeaderIsrAndControllerEpoch) => - try { - brokerRequestBatch.newBatch() - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(assignment.replicas, topicPartition, - updatedLeaderIsrAndControllerEpoch, assignment, isNew = false) - brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch) - } catch { - case e: IllegalStateException => - handleIllegalState(e) - } - stateChangeLog.trace(s"Sent LeaderAndIsr request $updatedLeaderIsrAndControllerEpoch with " + - s"new replica assignment $assignment to leader ${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " + - s"for partition being reassigned $topicPartition") - - case None => // fail the reassignment - stateChangeLog.error(s"Failed to send LeaderAndIsr request with new replica assignment " + - s"$assignment to leader for partition being reassigned $topicPartition") - } - } - private def registerPartitionModificationsHandlers(topics: Seq[String]) = { topics.foreach { topic => val partitionModificationsHandler = new PartitionModificationsHandler(eventManager, topic) @@ -975,48 +728,6 @@ class KafkaController(val config: KafkaConfig, } } - private def removePartitionFromReassigningPartitions(topicPartition: TopicPartition, - assignment: ReplicaAssignment): Unit = { - if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) { - val path = TopicPartitionStateZNode.path(topicPartition) - zkClient.unregisterZNodeChangeHandler(path) - maybeRemoveFromZkReassignment((tp, replicas) => tp == topicPartition && replicas == assignment.replicas) - controllerContext.partitionsBeingReassigned.remove(topicPartition) - } else { - throw new IllegalStateException("Cannot remove a reassigning partition because it is not present in memory") - } - } - - /** - * Remove partitions from an active zk-based reassignment (if one exists). - * - * @param shouldRemoveReassignment Predicate indicating which partition reassignments should be removed - */ - private def maybeRemoveFromZkReassignment(shouldRemoveReassignment: (TopicPartition, Seq[Int]) => Boolean): Unit = { - if (!zkClient.reassignPartitionsInProgress()) - return - - val reassigningPartitions = zkClient.getPartitionReassignment() - val (removingPartitions, updatedPartitionsBeingReassigned) = reassigningPartitions.partition { case (tp, replicas) => - shouldRemoveReassignment(tp, replicas) - } - info(s"Removing partitions $removingPartitions from the list of reassigned partitions in zookeeper") - - // write the new list to zookeeper - if (updatedPartitionsBeingReassigned.isEmpty) { - info(s"No more partitions need to be reassigned. Deleting zk path ${ReassignPartitionsZNode.path}") - zkClient.deletePartitionReassignment(controllerContext.epochZkVersion) - // Ensure we detect future reassignments - eventManager.put(ZkPartitionReassignment) - } else { - try { - zkClient.setOrCreatePartitionReassignment(updatedPartitionsBeingReassigned, controllerContext.epochZkVersion) - } catch { - case e: KeeperException => throw new AdminOperationException(e) - } - } - } - private def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicPartition], isTriggeredByAutoRebalance : Boolean): Unit = { for (partition <- partitionsToBeRemoved) { @@ -1054,49 +765,6 @@ class KafkaController(val config: KafkaConfig, } } - /** - * Does not change leader or isr, but just increments the leader epoch - * - * @param partition partition - * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty. - */ - private def updateLeaderEpoch(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = { - debug(s"Updating leader epoch for partition $partition") - var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None - var zkWriteCompleteOrUnnecessary = false - while (!zkWriteCompleteOrUnnecessary) { - // refresh leader and isr from zookeeper again - zkWriteCompleteOrUnnecessary = zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match { - case Some(leaderIsrAndControllerEpoch) => - val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr - val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch - if (controllerEpoch > epoch) - throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " + - s"means the current controller with epoch $epoch went through a soft failure and another " + - s"controller was elected with epoch $controllerEpoch. Aborting state change by this controller") - // increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded - // assigned replica list - val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion - // update the new leadership decision in zookeeper or retry - val UpdateLeaderAndIsrResult(finishedUpdates, _) = - zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch, controllerContext.epochZkVersion) - - finishedUpdates.headOption.map { - case (partition, Right(leaderAndIsr)) => - finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)) - info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}") - true - case (_, Left(e)) => - throw e - }.getOrElse(false) - case None => - throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " + - "leaderAndIsr path is empty. This could mean we somehow tried to reassign a partition that doesn't exist") - } - } - finalLeaderIsrAndControllerEpoch - } - private def checkAndTriggerAutoLeaderRebalance(): Unit = { trace("Checking need to trigger auto leader balancing") val preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicPartition, Seq[Int]]] = @@ -1577,36 +1245,18 @@ class KafkaController(val config: KafkaConfig, // We need to register the watcher if the path doesn't exist in order to detect future // reassignments and we get the `path exists` check for free if (isActive && zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) { - val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] - val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] - - zkClient.getPartitionReassignment().foreach { case (tp, targetReplicas) => - maybeBuildReassignment(tp, Some(targetReplicas)) match { - case Some(context) => partitionsToReassign.put(tp, context) - case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS)) - } - } - - reassignmentResults ++= maybeTriggerPartitionReassignment(partitionsToReassign) - val (partitionsReassigned, partitionsFailed) = reassignmentResults.partition(_._2.error == Errors.NONE) - if (partitionsFailed.nonEmpty) { - warn(s"Failed reassignment through zk with the following errors: $partitionsFailed") - maybeRemoveFromZkReassignment((tp, _) => partitionsFailed.contains(tp)) + try { + reassignmentsManager.triggerZkReassignment() + } catch { + case e: IllegalStateException => handleIllegalState(e) } - partitionsReassigned.keySet } else { Set.empty } } /** - * Process a partition reassignment from the AlterPartitionReassignment API. If there is an - * existing reassignment through zookeeper for any of the requested partitions, they will be - * cancelled prior to beginning the new reassignment. Any zk-based reassignment for partitions - * which are NOT included in this call will not be affected. - * - * @param reassignments Map of reassignments passed through the AlterReassignments API. A null value - * means that we should cancel an in-progress reassignment. + * Process a partition reassignment from the AlterPartitionReassignment API. * @param callback Callback to send AlterReassignments response */ private def processApiPartitionReassignment(reassignments: Map[TopicPartition, Option[Seq[Int]]], @@ -1614,57 +1264,11 @@ class KafkaController(val config: KafkaConfig, if (!isActive) { callback(Right(new ApiError(Errors.NOT_CONTROLLER))) } else { - val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] - val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] - - reassignments.foreach { case (tp, targetReplicas) => - if (replicasAreValid(tp, targetReplicas)) { - maybeBuildReassignment(tp, targetReplicas) match { - case Some(context) => partitionsToReassign.put(tp, context) - case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS)) - } - } else { - reassignmentResults.put(tp, new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT)) - } - } - - // The latest reassignment (whether by API or through zk) always takes precedence, - // so remove from active zk reassignment (if one exists) - maybeRemoveFromZkReassignment((tp, _) => partitionsToReassign.contains(tp)) - - reassignmentResults ++= maybeTriggerPartitionReassignment(partitionsToReassign) - callback(Left(reassignmentResults)) - } - } - - private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]]): Boolean = { - replicasOpt match { - case Some(replicas) => - val replicaSet = replicas.toSet - if (replicas.isEmpty || replicas.size != replicaSet.size) - false - else if (replicas.exists(_ < 0)) - false - else { - // Ensure that any new replicas are among the live brokers - val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition) - val newAssignment = currentAssignment.reassignTo(replicas) - newAssignment.addingReplicas.toSet.subsetOf(controllerContext.liveBrokerIds) - } - - case None => true - } - } - - private def maybeBuildReassignment(topicPartition: TopicPartition, - targetReplicasOpt: Option[Seq[Int]]): Option[ReplicaAssignment] = { - val replicaAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition) - if (replicaAssignment.isBeingReassigned) { - val targetReplicas = targetReplicasOpt.getOrElse(replicaAssignment.originReplicas) - Some(replicaAssignment.reassignTo(targetReplicas)) - } else { - targetReplicasOpt.map { targetReplicas => - replicaAssignment.reassignTo(targetReplicas) + try { + val results = reassignmentsManager.triggerApiReassignment(reassignments) + callback(Left(results)) + } catch { + case e: IllegalStateException => handleIllegalState(e) } } } @@ -1672,14 +1276,10 @@ class KafkaController(val config: KafkaConfig, private def processPartitionReassignmentIsrChange(topicPartition: TopicPartition): Unit = { if (!isActive) return - if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) { - val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition) - if (isReassignmentComplete(topicPartition, reassignment)) { - // resume the partition reassignment process - info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the leader for " + - s"reassigning partition $topicPartition") - onPartitionReassignment(topicPartition, reassignment) - } + try { + reassignmentsManager.maybeResumeReassignment(topicPartition) + } catch { + case e: IllegalStateException => handleIllegalState(e) } } @@ -1687,20 +1287,7 @@ class KafkaController(val config: KafkaConfig, if (!isActive) { callback(Right(new ApiError(Errors.NOT_CONTROLLER))) } else { - val results: mutable.Map[TopicPartition, ReplicaAssignment] = mutable.Map.empty - val partitionsToList = partitionsOpt match { - case Some(partitions) => partitions - case None => controllerContext.partitionsBeingReassigned - } - - partitionsToList.foreach { tp => - val assignment = controllerContext.partitionFullReplicaAssignment(tp) - if (assignment.isBeingReassigned) { - results += tp -> assignment - } - } - - callback(Left(results)) + callback(Left(reassignmentsManager.listPartitionsBeingReassigned(partitionsOpt))) } } diff --git a/core/src/main/scala/kafka/controller/ReassignmentManager.scala b/core/src/main/scala/kafka/controller/ReassignmentManager.scala new file mode 100644 index 0000000000000..cd789cba5eb3a --- /dev/null +++ b/core/src/main/scala/kafka/controller/ReassignmentManager.scala @@ -0,0 +1,561 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.controller + +import kafka.admin.AdminOperationException +import kafka.common.StateChangeFailedException +import kafka.utils.Logging +import kafka.zk.{KafkaZkClient, ReassignPartitionsZNode, TopicPartitionStateZNode} +import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult +import org.apache.kafka.common.errors.ControllerMovedException +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.ApiError +import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.zookeeper.KeeperException +import org.apache.zookeeper.KeeperException.Code + +import scala.collection.{Map, Seq, Set, immutable, mutable} + +trait ReassignmentListener { + /** + * Called right before the reassignment is starting + */ + def preReassignmentStartOrResume(tp: TopicPartition) + + /** + * Called right after the reassignment is persisted in ZK + */ + def postReassignmentStartedOrResumed(tp: TopicPartition) + + /** + * Called right before we clear a reassignment from memory + * @param deletedZNode - boolean indicating whether the reassign_partitions znode was + * deleted as part of the completion of the reassignment + * (e.g if it was the last reassignment in that znode) + */ + def preReassignmentFinish(tp: TopicPartition, deletedZNode: Boolean) + + /** + * Called at the very end of the reassignment process + */ + def postReassignmentFinished(tp: TopicPartition) +} + +/** + * A helper class which contains logic for driving partition reassignments. + * This class is not thread-safe. + */ +class ReassignmentManager(controllerContext: ControllerContext, + zkClient: KafkaZkClient, + reassignmentListener: ReassignmentListener, + replicaStateMachine: ReplicaStateMachine, + partitionStateMachine: PartitionStateMachine, + brokerRequestBatch: ControllerBrokerRequestBatch, + stateChangeLogger: StateChangeLogger, + shouldSkipReassignment: TopicPartition => Option[ApiError]) extends Logging { + + /** + * If there is an existing reassignment through zookeeper for any of the requested partitions, they will be + * cancelled prior to beginning the new reassignment. Any zk-based reassignment for partitions + * which are NOT included in this call will not be affected. + * + * @param reassignments Map of reassignments passed through the AlterReassignments API. A null value + * means that we should cancel an in-progress reassignment. + */ + def triggerApiReassignment(reassignments: Map[TopicPartition, Option[Seq[Int]]]): mutable.Map[TopicPartition, ApiError] = { + val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] + val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] + + reassignments.foreach { case (tp, targetReplicas) => + if (replicasAreValid(tp, targetReplicas)) { + maybeBuildReassignment(tp, targetReplicas) match { + case Some(context) => partitionsToReassign.put(tp, context) + case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS)) + } + } else { + reassignmentResults.put(tp, new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT)) + } + } + + // The latest reassignment (whether by API or through zk) always takes precedence, + // so remove from active zk reassignment (if one exists) + maybeRemoveFromZkReassignment((tp, _) => partitionsToReassign.contains(tp)) + + reassignmentResults ++= maybeTriggerPartitionReassignment(partitionsToReassign) + } + + /** + * @throws IllegalStateException + */ + def triggerZkReassignment(): Set[TopicPartition] = { + val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] + val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] + + zkClient.getPartitionReassignment().foreach { case (tp, targetReplicas) => + maybeBuildReassignment(tp, Some(targetReplicas)) match { + case Some(context) => partitionsToReassign.put(tp, context) + case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS)) + } + } + + reassignmentResults ++= maybeTriggerPartitionReassignment(partitionsToReassign) + val (partitionsReassigned, partitionsFailed) = reassignmentResults.partition(_._2.error == Errors.NONE) + if (partitionsFailed.nonEmpty) { + warn(s"Failed reassignment through zk with the following errors: $partitionsFailed") + maybeRemoveFromZkReassignment((tp, _) => partitionsFailed.contains(tp)) + } + partitionsReassigned.keySet + } + + /** + * @throws IllegalStateException + */ + def maybeResumeReassignments(shouldResume: (TopicPartition, ReplicaAssignment) => Boolean): Unit = { + controllerContext.partitionsBeingReassigned.foreach { tp => + val currentAssignment = controllerContext.partitionFullReplicaAssignment(tp) + if (shouldResume(tp, currentAssignment)) + onPartitionReassignment(tp, currentAssignment) + } + } + + /** + * @throws IllegalStateException + */ + def maybeResumeReassignment(topicPartition: TopicPartition): Unit = { + if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) { + val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition) + if (isReassignmentComplete(topicPartition, reassignment)) { + // resume the partition reassignment process + info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the leader for " + + s"reassigning partition $topicPartition") + onPartitionReassignment(topicPartition, reassignment) + } + } + } + + def listPartitionsBeingReassigned(partitionsOpt: Option[Set[TopicPartition]]): Map[TopicPartition, ReplicaAssignment] = { + val results: mutable.Map[TopicPartition, ReplicaAssignment] = mutable.Map.empty + val partitionsToList = partitionsOpt match { + case Some(partitions) => partitions + case None => controllerContext.partitionsBeingReassigned + } + + partitionsToList.foreach { tp => + val assignment = controllerContext.partitionFullReplicaAssignment(tp) + if (assignment.isBeingReassigned) { + results += tp -> assignment + } + } + results + } + + /** + * Trigger a partition reassignment provided that the topic exists and is not being deleted. + * + * This is called when a reassignment is initially received either through Zookeeper or through the + * AlterPartitionReassignments API + * + * The `partitionsBeingReassigned` field in the controller context will be updated by this + * call after the reassignment completes validation and is successfully stored in the topic + * assignment zNode. + * + * @param reassignments The reassignments to begin processing + * @return A map of any errors in the reassignment. If the error is NONE for a given partition, + * then the reassignment was submitted successfully. + */ + private def maybeTriggerPartitionReassignment(reassignments: Map[TopicPartition, ReplicaAssignment]): Map[TopicPartition, ApiError] = { + reassignments.map { case (tp, reassignment) => + val apiError = shouldSkipReassignment(tp).getOrElse { + val assignedReplicas = controllerContext.partitionReplicaAssignment(tp) + if (assignedReplicas.nonEmpty) { + try { + onPartitionReassignment(tp, reassignment) + ApiError.NONE + } catch { + case e: ControllerMovedException => + info(s"Failed completing reassignment of partition $tp because controller has moved to another broker") + throw e + case e: Throwable => + error(s"Error completing reassignment of partition $tp", e) + new ApiError(Errors.UNKNOWN_SERVER_ERROR) + } + } else { + new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.") + } + } + + tp -> apiError + } + } + + private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]]): Boolean = { + replicasOpt match { + case Some(replicas) => + val replicaSet = replicas.toSet + if (replicas.isEmpty || replicas.size != replicaSet.size) + false + else if (replicas.exists(_ < 0)) + false + else { + // Ensure that any new replicas are among the live brokers + val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition) + val newAssignment = currentAssignment.reassignTo(replicas) + newAssignment.addingReplicas.toSet.subsetOf(controllerContext.liveBrokerIds) + } + + case None => true + } + } + + private def maybeBuildReassignment(topicPartition: TopicPartition, + targetReplicasOpt: Option[Seq[Int]]): Option[ReplicaAssignment] = { + val replicaAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition) + if (replicaAssignment.isBeingReassigned) { + val targetReplicas = targetReplicasOpt.getOrElse(replicaAssignment.originReplicas) + Some(replicaAssignment.reassignTo(targetReplicas)) + } else { + targetReplicasOpt.map { targetReplicas => + replicaAssignment.reassignTo(targetReplicas) + } + } + } + + /** + * This callback is invoked: + * 1. By the AlterPartitionReassignments API + * 2. By the reassigned partitions listener which is triggered when the /admin/reassign_partitions znode is created + * 3. When an ongoing reassignment finishes - this is detected by a change in the partition's ISR znode + * 4. Whenever a new broker comes up which is part of an ongoing reassignment + * 5. On controller startup/failover + * + * Reassigning replicas for a partition goes through a few steps listed in the code. + * RS = current assigned replica set + * ORS = Original replica set for partition + * TRS = Reassigned (target) replica set + * AR = The replicas we are adding as part of this reassignment + * RR = The replicas we are removing as part of this reassignment + * + * A reassignment may have up to three phases, each with its own steps: + * + * Phase U (Assignment update): Regardless of the trigger, the first step is in the reassignment process + * is to update the existing assignment state. We always update the state in Zookeeper before + * we update memory so that it can be resumed upon controller fail-over. + * + * U1. Update ZK with RS = ORS + TRS, AR = TRS - ORS, RR = ORS - TRS. + * U2. Update memory with RS = ORS + TRS, AR = TRS - ORS and RR = ORS - TRS + * U3. If we are cancelling or replacing an existing reassignment, send StopReplica to all members + * of AR in the original reassignment if they are not in TRS from the new assignment + * + * To complete the reassignment, we need to bring the new replicas into sync, so depending on the state + * of the ISR, we will execute one of the following steps. + * + * Phase A (when TRS != ISR): The reassignment is not yet complete + * + * A1. Bump the leader epoch for the partition and send LeaderAndIsr updates to RS. + * A2. Start new replicas AR by moving replicas in AR to NewReplica state. + * + * Phase B (when TRS = ISR): The reassignment is complete + * + * B1. Move all replicas in AR to OnlineReplica state. + * B2. Set RS = TRS, AR = [], RR = [] in memory. + * B3. Send a LeaderAndIsr request with RS = TRS. This will prevent the leader from adding any replica in TRS - ORS back in the isr. + * If the current leader is not in TRS or isn't alive, we move the leader to a new replica in TRS. + * We may send the LeaderAndIsr to more than the TRS replicas due to the + * way the partition state machine works (it reads replicas from ZK) + * B4. Move all replicas in RR to OfflineReplica state. As part of OfflineReplica state change, we shrink the + * isr to remove RR in ZooKeeper and send a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr. + * After that, we send a StopReplica (delete = false) to the replicas in RR. + * B5. Move all replicas in RR to NonExistentReplica state. This will send a StopReplica (delete = true) to + * the replicas in RR to physically delete the replicas on disk. + * B6. Update ZK with RS=TRS, AR=[], RR=[]. + * B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it if present. + * B8. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker. + * + * In general, there are two goals we want to aim for: + * 1. Every replica present in the replica set of a LeaderAndIsrRequest gets the request sent to it + * 2. Replicas that are removed from a partition's assignment get StopReplica sent to them + * + * For example, if ORS = {1,2,3} and TRS = {4,5,6}, the values in the topic and leader/isr paths in ZK + * may go through the following transitions. + * RS AR RR leader isr + * {1,2,3} {} {} 1 {1,2,3} (initial state) + * {4,5,6,1,2,3} {4,5,6} {1,2,3} 1 {1,2,3} (step A2) + * {4,5,6,1,2,3} {4,5,6} {1,2,3} 1 {1,2,3,4,5,6} (phase B) + * {4,5,6,1,2,3} {4,5,6} {1,2,3} 4 {1,2,3,4,5,6} (step B3) + * {4,5,6,1,2,3} {4,5,6} {1,2,3} 4 {4,5,6} (step B4) + * {4,5,6} {} {} 4 {4,5,6} (step B6) + * + * Note that we have to update RS in ZK with TRS last since it's the only place where we store ORS persistently. + * This way, if the controller crashes before that step, we can still recover. + * + * @throws IllegalStateException + */ + private def onPartitionReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = { + reassignmentListener.preReassignmentStartOrResume(topicPartition) + + updateCurrentReassignment(topicPartition, reassignment) + + val addingReplicas = reassignment.addingReplicas + val removingReplicas = reassignment.removingReplicas + + if (!isReassignmentComplete(topicPartition, reassignment)) { + // A1. Send LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and RR). + updateLeaderEpochAndSendRequest(topicPartition, reassignment) + // A2. replicas in AR -> NewReplica + startNewReplicasForReassignedPartition(topicPartition, addingReplicas) + } else { + // B1. replicas in AR -> OnlineReplica + replicaStateMachine.handleStateChanges(addingReplicas.map(PartitionAndReplica(topicPartition, _)), OnlineReplica) + // B2. Set RS = TRS, AR = [], RR = [] in memory. + val completedReassignment = ReplicaAssignment(reassignment.targetReplicas) + controllerContext.updatePartitionFullReplicaAssignment(topicPartition, completedReassignment) + // B3. Send LeaderAndIsr request with a potential new leader (if current leader not in TRS) and + // a new RS (using TRS) and same isr to every broker in ORS + TRS or TRS + moveReassignedPartitionLeaderIfRequired(topicPartition, completedReassignment) + // B4. replicas in RR -> Offline (force those replicas out of isr) + // B5. replicas in RR -> NonExistentReplica (force those replicas to be deleted) + stopRemovedReplicasOfReassignedPartition(topicPartition, removingReplicas) + // B6. Update ZK with RS = TRS, AR = [], RR = []. + updateReplicaAssignmentForPartition(topicPartition, completedReassignment) + // B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it. + removePartitionFromReassigningPartitions(topicPartition, completedReassignment) + // B8. After electing a leader in B3, the replicas and isr information changes, so resend the update metadata request to every broker + brokerRequestBatch.newBatch() + brokerRequestBatch.addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition)) + brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch) + reassignmentListener.postReassignmentFinished(topicPartition) + } + } + + /** + * Update the current assignment state in Zookeeper and in memory. If a reassignment is already in + * progress, then the new reassignment will supplant it and some replicas will be shutdown. + * + * Note that due to the way we compute the original replica set, we cannot guarantee that a + * cancellation will restore the original replica order. Target replicas are always listed + * first in the replica set in the desired order, which means we have no way to get to the + * original order if the reassignment overlaps with the current assignment. For example, + * with an initial assignment of [1, 2, 3] and a reassignment of [3, 4, 2], then the replicas + * will be encoded as [3, 4, 2, 1] while the reassignment is in progress. If the reassignment + * is cancelled, there is no way to restore the original order. + * + * @param topicPartition The reassigning partition + * @param reassignment The new reassignment + */ + private def updateCurrentReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = { + val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition) + + if (currentAssignment != reassignment) { + debug(s"Updating assignment of partition $topicPartition from $currentAssignment to $reassignment") + + // U1. Update assignment state in zookeeper + updateReplicaAssignmentForPartition(topicPartition, reassignment) + // U2. Update assignment state in memory + controllerContext.updatePartitionFullReplicaAssignment(topicPartition, reassignment) + + // If there is a reassignment already in progress, then some of the currently adding replicas + // may be eligible for immediate removal, in which case we need to stop the replicas. + val unneededReplicas = currentAssignment.replicas.diff(reassignment.replicas) + if (unneededReplicas.nonEmpty) + stopRemovedReplicasOfReassignedPartition(topicPartition, unneededReplicas) + } + + reassignmentListener.postReassignmentStartedOrResumed(topicPartition) + + controllerContext.partitionsBeingReassigned.add(topicPartition) + } + + private def isReassignmentComplete(partition: TopicPartition, assignment: ReplicaAssignment): Boolean = { + if (!assignment.isBeingReassigned) { + true + } else { + zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch => + val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr.toSet + val targetReplicas = assignment.targetReplicas.toSet + targetReplicas.subsetOf(isr) + } + } + } + + /** + * @throws IllegalStateException + */ + private def updateLeaderEpochAndSendRequest(topicPartition: TopicPartition, + assignment: ReplicaAssignment): Unit = { + val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch) + updateLeaderEpoch(topicPartition) match { + case Some(updatedLeaderIsrAndControllerEpoch) => + brokerRequestBatch.newBatch() + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(assignment.replicas, topicPartition, + updatedLeaderIsrAndControllerEpoch, assignment, isNew = false) + brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch) + stateChangeLog.trace(s"Sent LeaderAndIsr request $updatedLeaderIsrAndControllerEpoch with " + + s"new replica assignment $assignment to leader ${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " + + s"for partition being reassigned $topicPartition") + + case None => // fail the reassignment + stateChangeLog.error(s"Failed to send LeaderAndIsr request with new replica assignment " + + s"$assignment to leader for partition being reassigned $topicPartition") + } + } + + /** + * Does not change leader or isr, but just increments the leader epoch + * + * @param partition partition + * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty. + */ + private def updateLeaderEpoch(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = { + debug(s"Updating leader epoch for partition $partition") + var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None + var zkWriteCompleteOrUnnecessary = false + while (!zkWriteCompleteOrUnnecessary) { + // refresh leader and isr from zookeeper again + zkWriteCompleteOrUnnecessary = zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match { + case Some(leaderIsrAndControllerEpoch) => + val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr + val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch + if (controllerEpoch > controllerContext.epoch) + throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " + + s"means the current controller with epoch ${controllerContext.epoch} went through a soft failure and another " + + s"controller was elected with epoch $controllerEpoch. Aborting state change by this controller") + // increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded + // assigned replica list + val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion + // update the new leadership decision in zookeeper or retry + val UpdateLeaderAndIsrResult(finishedUpdates, _) = + zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), controllerContext.epoch, controllerContext.epochZkVersion) + + finishedUpdates.headOption.exists { + case (partition, Right(leaderAndIsr)) => + finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)) + info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}") + true + case (_, Left(e)) => + throw e + } + case None => + throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " + + "leaderAndIsr path is empty. This could mean we somehow tried to reassign a partition that doesn't exist") + } + } + finalLeaderIsrAndControllerEpoch + } + + private def updateReplicaAssignmentForPartition(topicPartition: TopicPartition, assignment: ReplicaAssignment): Unit = { + var topicAssignment = controllerContext.partitionFullReplicaAssignmentForTopic(topicPartition.topic) + topicAssignment += topicPartition -> assignment + + val setDataResponse = zkClient.setTopicAssignmentRaw(topicPartition.topic, topicAssignment, controllerContext.epochZkVersion) + setDataResponse.resultCode match { + case Code.OK => + info(s"Successfully updated assignment of partition $topicPartition to $assignment") + case Code.NONODE => + throw new IllegalStateException(s"Failed to update assignment for $topicPartition since the topic " + + "has no current assignment") + case _ => throw new KafkaException(setDataResponse.resultException.get) + } + } + + private def startNewReplicasForReassignedPartition(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit = { + // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned + // replicas list + newReplicas.foreach { replica => + replicaStateMachine.handleStateChanges(Seq(PartitionAndReplica(topicPartition, replica)), NewReplica) + } + } + + private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition, + newAssignment: ReplicaAssignment): Unit = { + val reassignedReplicas = newAssignment.replicas + val currentLeader = controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader + + if (!reassignedReplicas.contains(currentLeader)) { + info(s"Leader $currentLeader for partition $topicPartition being reassigned, " + + s"is not in the new list of replicas ${reassignedReplicas.mkString(",")}. Re-electing leader") + // move the leader to one of the alive and caught up new replicas + partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy)) + } else if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) { + info(s"Leader $currentLeader for partition $topicPartition being reassigned, " + + s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} and is alive") + // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest + updateLeaderEpochAndSendRequest(topicPartition, newAssignment) + } else { + info(s"Leader $currentLeader for partition $topicPartition being reassigned, " + + s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} but is dead") + partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy)) + } + } + + private def stopRemovedReplicasOfReassignedPartition(topicPartition: TopicPartition, + removedReplicas: Seq[Int]): Unit = { + // first move the replica to offline state (the controller removes it from the ISR) + val replicasToBeDeleted = removedReplicas.map(PartitionAndReplica(topicPartition, _)) + replicaStateMachine.handleStateChanges(replicasToBeDeleted, OfflineReplica) + // send stop replica command to the old replicas + replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionStarted) + // TODO: Eventually partition reassignment could use a callback that does retries if deletion failed + replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionSuccessful) + replicaStateMachine.handleStateChanges(replicasToBeDeleted, NonExistentReplica) + } + + private def removePartitionFromReassigningPartitions(topicPartition: TopicPartition, + assignment: ReplicaAssignment): Unit = { + if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) { + val deletedZNode = maybeRemoveFromZkReassignment( + (tp, replicas) => tp == topicPartition && replicas == assignment.replicas + ) + + reassignmentListener.preReassignmentFinish(topicPartition, deletedZNode) + controllerContext.partitionsBeingReassigned.remove(topicPartition) + } else { + throw new IllegalStateException("Cannot remove a reassigning partition because it is not present in memory") + } + } + + /** + * Remove partitions from an active zk-based reassignment (if one exists). + * + * @param shouldRemoveReassignment Predicate indicating which partition reassignments should be removed + * @return a boolean indicating whether the /admin/reassign_partitions znode was deleted + */ + private def maybeRemoveFromZkReassignment(shouldRemoveReassignment: (TopicPartition, Seq[Int]) => Boolean): Boolean = { + if (!zkClient.reassignPartitionsInProgress()) + return false + + val reassigningPartitions = zkClient.getPartitionReassignment() + val (removingPartitions, updatedPartitionsBeingReassigned) = reassigningPartitions.partition { case (tp, replicas) => + shouldRemoveReassignment(tp, replicas) + } + info(s"Removing partitions $removingPartitions from the list of reassigned partitions in zookeeper") + + // write the new list to zookeeper + if (updatedPartitionsBeingReassigned.isEmpty) { + info(s"No more partitions need to be reassigned. Deleting zk path ${ReassignPartitionsZNode.path}") + zkClient.deletePartitionReassignment(controllerContext.epochZkVersion) + true + } else { + try { + zkClient.setOrCreatePartitionReassignment(updatedPartitionsBeingReassigned, controllerContext.epochZkVersion) + false + } catch { + case e: KeeperException => throw new AdminOperationException(e) + } + } + } +} diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index d032b3b499dbf..b68a213143e86 100755 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -81,7 +81,6 @@ class ControllerDeletionClient(controller: KafkaController, zkClient: KafkaZkCli * as well as from zookeeper. This is the only time the /brokers/topics/ path gets deleted. On the other hand, * if no replica is in TopicDeletionStarted state and at least one replica is in TopicDeletionFailed state, then * it marks the topic for deletion retry. - * @param controller */ class TopicDeletionManager(config: KafkaConfig, controllerContext: ControllerContext, diff --git a/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala new file mode 100644 index 0000000000000..f2639757588ca --- /dev/null +++ b/core/src/test/scala/unit/kafka/controller/ReassignmentManagerTest.scala @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.controller + +import kafka.api.LeaderAndIsr +import kafka.cluster.{Broker, EndPoint} +import kafka.utils.TestUtils +import kafka.zk.KafkaZkClient +import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult +import kafka.zookeeper.SetDataResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.ApiError +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.zookeeper.KeeperException.Code +import org.junit.Assert.{assertEquals, assertFalse, assertTrue} +import org.junit.{Before, Test} +import org.mockito.Mockito +import org.mockito.Mockito.doReturn +import org.mockito.Mockito.verify + +import scala.collection.{Map, Set, mutable} + +class ReassignmentManagerTest { + + class TestReassignmentListener(var preReassignmentStartOrResumeCalled: Boolean = false, + var postReassignmentStartedOrResumedCalled: Boolean = false, + var preReassignmentFinishCalled: Boolean = false, + var postReassignmentFinishedCalled: Boolean = false) extends ReassignmentListener { + override def preReassignmentStartOrResume(tp: TopicPartition): Unit = preReassignmentStartOrResumeCalled = true + + override def postReassignmentStartedOrResumed(tp: TopicPartition): Unit = postReassignmentStartedOrResumedCalled = true + + override def preReassignmentFinish(tp: TopicPartition, deletedZNode: Boolean): Unit = preReassignmentFinishCalled = true + + override def postReassignmentFinished(tp: TopicPartition): Unit = postReassignmentFinishedCalled = true + } + + private var controllerContext: ControllerContext = null + private var mockZkClient: KafkaZkClient = null + private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null + private var mockReplicaStateMachine: ReplicaStateMachine = null + private var mockPartitionStateMachine: PartitionStateMachine = null + private var testReassignmentListener: TestReassignmentListener = null + + private var mockBrokerRequestBatch: ControllerBrokerRequestBatch = null + private final val topic = "topic" + private final val tp = new TopicPartition(topic, 0) + private final val mockPartitionReassignmentHandler = new PartitionReassignmentHandler(null) + + private var partitionReassignmentManager: ReassignmentManager = null + + @Before + def setUp(): Unit = { + testReassignmentListener = new TestReassignmentListener() + controllerContext = TestUtils.initContext(brokers = Seq(1, 2, 3, 4, 5), + topics = Set(tp.topic), + numPartitions = 1, + replicationFactor = 3) + mockZkClient = Mockito.mock(classOf[KafkaZkClient]) + mockControllerBrokerRequestBatch = Mockito.mock(classOf[ControllerBrokerRequestBatch]) + mockReplicaStateMachine = new MockReplicaStateMachine(controllerContext) + mockReplicaStateMachine.startup() + mockPartitionStateMachine = new MockPartitionStateMachine(controllerContext, false) + mockPartitionStateMachine.startup() + mockBrokerRequestBatch = Mockito.mock(classOf[ControllerBrokerRequestBatch]) + partitionReassignmentManager = new ReassignmentManager(controllerContext, mockZkClient, testReassignmentListener, + mockReplicaStateMachine, mockPartitionStateMachine, mockBrokerRequestBatch, new StateChangeLogger(0, inControllerContext = true, None), + shouldSkipReassignment = _ => None) + } + + @Test + def testShouldSkipReassignment(): Unit = { + partitionReassignmentManager = new ReassignmentManager(controllerContext, mockZkClient, testReassignmentListener, + mockReplicaStateMachine, mockPartitionStateMachine, mockBrokerRequestBatch, new StateChangeLogger(0, inControllerContext = true, None), + shouldSkipReassignment = _ => Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))) + + val results = partitionReassignmentManager.triggerApiReassignment(Map(tp -> Some(Seq(1,2,3)))) + assertTrue(results(tp).is(Errors.UNKNOWN_TOPIC_OR_PARTITION)) + } + + /** + * Phase A of a partition reassignment denotes the initial trigger of a reassignment. + * + * A1. Bump the leader epoch for the partition and send LeaderAndIsr updates to RS. + * A2. Start new replicas AR by moving replicas in AR to NewReplica state. + */ + @Test + def testPhaseAOfPartitionReassignment(): Unit = { + /* + * Existing assignment is [1,2,3] + * We issue a reassignment to [3, 4, 5] + */ + val expectedFullReplicaSet = Seq(3, 4, 5, 1, 2) + val initialLeaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2, 3), controllerContext.epochZkVersion) + controllerContext.partitionsBeingReassigned.add(tp) + mockAreReplicasInIsr(tp, List(1, 2, 3), initialLeaderAndIsr) + val expectedNewAssignment = ReplicaAssignment.fromOldAndNewReplicas(Seq(1, 2, 3), Seq(3, 4, 5)) + assertEquals(expectedFullReplicaSet, expectedNewAssignment.replicas) + // U1. Should update ZK + doReturn(mockSetDataResponseOK, Nil: _*).when(mockZkClient).setTopicAssignmentRaw(topic, mutable.Map(tp -> expectedNewAssignment), controllerContext.epochZkVersion) + // U2. Should update memory + // A1. Should update partition leader epoch in ZK + val expectedLeaderAndIsr = initialLeaderAndIsr.newEpochAndZkVersion + doReturn(UpdateLeaderAndIsrResult(Map(tp -> Right(expectedLeaderAndIsr)), Seq()), Nil: _*) + .when(mockZkClient).updateLeaderAndIsr(Map(tp -> expectedLeaderAndIsr), controllerContext.epoch, controllerContext.epochZkVersion) + + // act + val results = partitionReassignmentManager.triggerApiReassignment(Map(tp -> Some(Seq(3, 4, 5)))) + assertTrue(s"reassignment failed - $results", results(tp).isSuccess) + assertTrue("Listener was not called pre reassignment start", testReassignmentListener.preReassignmentStartOrResumeCalled) + assertTrue("Listener was not called post reassignment start", testReassignmentListener.postReassignmentStartedOrResumedCalled) + + // U2. Should have updated memory + assertEquals(expectedNewAssignment, controllerContext.partitionFullReplicaAssignment(tp)) + // A1. Should send a LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and RR). + verify(mockBrokerRequestBatch).addLeaderAndIsrRequestForBrokers( + expectedFullReplicaSet, tp, + LeaderIsrAndControllerEpoch(expectedLeaderAndIsr, controllerContext.epoch), expectedNewAssignment, isNew = false + ) + verify(mockBrokerRequestBatch).sendRequestsToBrokers(controllerContext.epoch) + + assertFalse("Listener was wrongly called pre reassignment finish", testReassignmentListener.preReassignmentFinishCalled) + assertFalse("Listener was wrongly called post reassignment finish", testReassignmentListener.postReassignmentFinishedCalled) + } + + /** + * Phase B of a partition reassignment is the part where all the new replicas are in ISR + * and the controller finishes the reassignment + * B1. Move all replicas in AR to OnlineReplica state. + * B2. Set RS = TRS, AR = [], RR = [] in memory. + * B3. Send a LeaderAndIsr request with RS = TRS. This will prevent the leader from adding any replica in TRS - ORS back in the isr. + * If the current leader is not in TRS or isn't alive, we move the leader to a new replica in TRS. + * We may send the LeaderAndIsr to more than the TRS replicas due to the + * way the partition state machine works (it reads replicas from ZK) + * B4. Move all replicas in RR to OfflineReplica state. As part of OfflineReplica state change, we shrink the + * isr to remove RR in ZooKeeper and send a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr. + * After that, we send a StopReplica (delete = false) to the replicas in RR. + * B5. Move all replicas in RR to NonExistentReplica state. This will send a StopReplica (delete = true) to + * the replicas in RR to physically delete the replicas on disk. + * B6. Update ZK with RS=TRS, AR=[], RR=[]. + * B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it if present. + * B8. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker. + * + */ + @Test + def testPhaseBOfPartitionReassignment(): Unit = { + /* + * Existing assignment is [1,2,3] + * We had issued a reassignment to [3, 4, 5] and now all replicas are in ISR + */ + val initialAssignment = ReplicaAssignment.fromOldAndNewReplicas(Seq(1, 2, 3), Seq(3, 4, 5)) + val expectedNewAssignment = ReplicaAssignment(Seq(3, 4, 5), Seq(), Seq()) + val initialLeaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2, 3, 4, 5), controllerContext.epochZkVersion) + controllerContext.partitionAssignments.put(topic, mutable.Map(tp.partition() -> initialAssignment)) + controllerContext.updatePartitionFullReplicaAssignment(tp, initialAssignment) + controllerContext.partitionsBeingReassigned.add(tp) + controllerContext.partitionLeadershipInfo.put(tp, + LeaderIsrAndControllerEpoch(initialLeaderAndIsr, controllerContext.epoch)) + mockAreReplicasInIsr(tp, List(1, 2, 3, 4, 5), initialLeaderAndIsr) + // A2. replicas in AR -> NewReplica + mockReplicaStateMachine.handleStateChanges(Seq(4, 5).map(PartitionAndReplica(tp, _)), NewReplica) + + // B2. Set RS = TRS, AR = [], RR = [] in memory. + // B3. Send a LeaderAndIsr request with RS = TRS. + // If the current leader is not in TRS or isn't alive, we move the leader to a new replica in TRS. + // We may send the LeaderAndIsr to more than the TRS replicas due to the way the partition state machine works (it reads replicas from ZK) + controllerContext.partitionStates.put(tp, NewPartition) + // B6. Update ZK with RS = TRS, AR = [], RR = []. + doReturn(mockSetDataResponseOK, Nil: _*).when(mockZkClient) + .setTopicAssignmentRaw(tp.topic(), mutable.Map(tp -> expectedNewAssignment), controllerContext.epochZkVersion) + // B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it if present. + doReturn(true, Nil: _*).when(mockZkClient).reassignPartitionsInProgress() + doReturn(Map(tp -> Seq(1, 2, 3)), Nil: _*).when(mockZkClient).getPartitionReassignment() + doReturn(false, Nil: _*).when(mockZkClient).registerZNodeChangeHandlerAndCheckExistence(mockPartitionReassignmentHandler) + + // act + partitionReassignmentManager.maybeResumeReassignment(tp) + assertTrue("Listener was not called pre reassignment resumption", testReassignmentListener.preReassignmentStartOrResumeCalled) + assertTrue("Listener was not called post reassignment resumption", testReassignmentListener.postReassignmentStartedOrResumedCalled) + + // B2. Should have updated memory + assertEquals(expectedNewAssignment, controllerContext.partitionFullReplicaAssignment(tp)) + // B7. Should have cleared in-memory partitionsBeingReassigned and called + assertTrue("Listener was not called pre reassignment finish", testReassignmentListener.preReassignmentFinishCalled) + assertEquals(Set(), controllerContext.partitionsBeingReassigned) + // B8. Resend the update metadata request to every broker + verify(mockBrokerRequestBatch).addUpdateMetadataRequestForBrokers(controllerContext.liveBrokerIds.toSeq, Set(tp)) + verify(mockBrokerRequestBatch).sendRequestsToBrokers(controllerContext.epoch) + assertTrue("Listener was not called post reassignment finish", testReassignmentListener.postReassignmentFinishedCalled) + } + + def setLiveBrokers(brokerIds: Seq[Int]): Unit = { + val endpoint1 = new EndPoint("localhost", 9997, new ListenerName("blah"), + SecurityProtocol.PLAINTEXT) + val brokerEpochs = brokerIds.map { + id => (Broker(id, Seq(endpoint1), rack = None), 1L) + }.toMap + controllerContext.setLiveBrokerAndEpochs(brokerEpochs) + } + + /** + * To determine what phase of the reassignment we are in, we check whether the target replicas are in the ISR set + * If they aren't, we enter phase A. If they are - phase B + */ + def mockAreReplicasInIsr(tp: TopicPartition, isr: List[Int], leaderAndIsr: LeaderAndIsr): Unit = { + val tpStateMap: Map[TopicPartition, LeaderIsrAndControllerEpoch] = Map( + tp -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch) + ) + doReturn(tpStateMap, Nil: _*).when(mockZkClient).getTopicPartitionStates(Seq(tp)) + } + + def mockSetDataResponseOK: SetDataResponse = + SetDataResponse(Code.OK, "", None, null, null) +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala index e3e1996cb646c..5b2a0b588dcdc 100644 --- a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala @@ -16,12 +16,8 @@ */ package kafka.controller -import kafka.cluster.{Broker, EndPoint} import kafka.server.KafkaConfig import kafka.utils.TestUtils -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.Assert._ import org.junit.Test import org.mockito.Mockito._ @@ -34,7 +30,7 @@ class TopicDeletionManagerTest { @Test def testInitialization(): Unit = { - val controllerContext = initContext( + val controllerContext = TestUtils.initContext( brokers = Seq(1, 2, 3), topics = Set("foo", "bar", "baz"), numPartitions = 2, @@ -58,7 +54,7 @@ class TopicDeletionManagerTest { @Test def testBasicDeletion(): Unit = { - val controllerContext = initContext( + val controllerContext = TestUtils.initContext( brokers = Seq(1, 2, 3), topics = Set("foo", "bar"), numPartitions = 2, @@ -99,7 +95,7 @@ class TopicDeletionManagerTest { @Test def testDeletionWithBrokerOffline(): Unit = { - val controllerContext = initContext( + val controllerContext = TestUtils.initContext( brokers = Seq(1, 2, 3), topics = Set("foo", "bar"), numPartitions = 2, @@ -167,7 +163,7 @@ class TopicDeletionManagerTest { @Test def testBrokerFailureAfterDeletionStarted(): Unit = { - val controllerContext = initContext( + val controllerContext = TestUtils.initContext( brokers = Seq(1, 2, 3), topics = Set("foo", "bar"), numPartitions = 2, @@ -226,31 +222,4 @@ class TopicDeletionManagerTest { assertEquals(offlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionStarted)) } - - def initContext(brokers: Seq[Int], - topics: Set[String], - numPartitions: Int, - replicationFactor: Int): ControllerContext = { - val context = new ControllerContext - val brokerEpochs = brokers.map { brokerId => - val endpoint = new EndPoint("localhost", 9900 + brokerId, new ListenerName("blah"), - SecurityProtocol.PLAINTEXT) - Broker(brokerId, Seq(endpoint), rack = None) -> 1L - }.toMap - context.setLiveBrokerAndEpochs(brokerEpochs) - - // Simple round-robin replica assignment - var leaderIndex = 0 - for (topic <- topics; partitionId <- 0 until numPartitions) { - val partition = new TopicPartition(topic, partitionId) - val replicas = (0 until replicationFactor).map { i => - val replica = brokers((i + leaderIndex) % brokers.size) - replica - } - context.updatePartitionReplicaAssignment(partition, replicas) - leaderIndex += 1 - } - context - } - } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 0f34268b31b79..3cd6382f9e9bf 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -39,7 +39,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile import Implicits._ import com.yammer.metrics.Metrics import com.yammer.metrics.core.Meter -import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.controller.{ControllerContext, LeaderIsrAndControllerEpoch} import kafka.zk._ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.AlterConfigOp.OpType @@ -1684,4 +1684,33 @@ object TestUtils extends Logging { waitUntilTrue(() => adminClient.listPartitionReassignments().reassignments().get().isEmpty, s"There still are ongoing reassignments", pause = pause) } + + /** + * Initializes a new ControllerContext + */ + def initContext(brokers: Seq[Int], + topics: scala.collection.Set[String], + numPartitions: Int, + replicationFactor: Int): ControllerContext = { + val context = new ControllerContext + val brokerEpochs = brokers.map { brokerId => + val endpoint = new EndPoint("localhost", 9900 + brokerId, new ListenerName("blah"), + SecurityProtocol.PLAINTEXT) + Broker(brokerId, Seq(endpoint), rack = None) -> 1L + }.toMap + context.setLiveBrokerAndEpochs(brokerEpochs) + + // Simple round-robin replica assignment + var leaderIndex = 0 + for (topic <- topics; partitionId <- 0 until numPartitions) { + val partition = new TopicPartition(topic, partitionId) + val replicas = (0 until replicationFactor).map { i => + val replica = brokers((i + leaderIndex) % brokers.size) + replica + } + context.updatePartitionReplicaAssignment(partition, replicas) + leaderIndex += 1 + } + context + } }