diff --git a/ovs/update/volumedriver/updater.py b/ovs/update/volumedriver/updater.py
index b9910b6c5..be0c5ade5 100644
--- a/ovs/update/volumedriver/updater.py
+++ b/ovs/update/volumedriver/updater.py
@@ -30,6 +30,7 @@
 from ovs.extensions.storage.persistentfactory import PersistentFactory
 from ovs.lib.mdsservice import MDSServiceController
 from ovs.log.log_handler import LogHandler
 
+# noinspection PyUnreachableCode
 if False:
     from typing import List, Dict, Tuple
@@ -49,6 +50,12 @@ class FailureDuringMigrateException(UpdateException):
     exit_code = 22
 
 
+class LocalMastersRemaining(RuntimeError):
+    """
+    Raised when local masters are still present on the machine
+    """
+
+
 class VolumeDriverUpdater(ComponentUpdater):
 
     """
@@ -68,20 +75,35 @@ def restart_services(cls):
         """
         Override the service restart. The volumedrivers should be prepared for shutdown
         """
-        cls.logger.info("Restarting all related services")
-        # Get the migration plans for every volume on this host. If there are no plans for certain volumes, it will raise
-        balances_by_vpool = cls.get_vpool_balances_for_evacuating_storagerouter(cls.LOCAL_SR)
-        # Plan to execute migrate. Avoid the VPool from being an HA target
-        cls.mark_storagerouter_unreachable_for_ha(cls.LOCAL_SR)
+        cls.logger.info("Preparing to restart the related services")
+        initial_run_steps = True
         try:
-            # @todo currency?
-            for vpool, balances in balances_by_vpool.iteritems():
-                cls.migrate_away(balances, cls.LOCAL_SR)
-            cls.migrate_master_mds(cls.LOCAL_SR)
-            all_prefixes = tuple(itertools.chain.from_iterable(b[3] for b in cls.BINARIES))
-            return cls.restart_services_by_prefixes(all_prefixes)
+            run_number = 0
+            while True:
+                cls.logger.info('Attempt {0} to prepare the restart'.format(run_number))
+                # Get the migration plans for every volume on this host. If there are no plans for certain volumes, it will raise
+                balances_by_vpool = cls.get_vpool_balances_for_evacuating_storagerouter(cls.LOCAL_SR)
+                if initial_run_steps:
+                    cls.logger.info('Offloading an MDS catchup to celery. This will ensure all slaves are caught up to avoid deadlocking')
+                    MDSServiceController.mds_catchup.apply_async()
+                    # Plan to execute migrate. Avoid the VPool from being an HA target
+                    cls.mark_storagerouter_unreachable_for_ha(cls.LOCAL_SR)
+                    initial_run_steps = False
+                try:
+                    # @todo concurrency?
+                    for vpool, balances in balances_by_vpool.iteritems():
+                        cls.migrate_away(balances, cls.LOCAL_SR)
+                    cls.migrate_master_mds(cls.LOCAL_SR)
+                    all_prefixes = tuple(itertools.chain.from_iterable(b[3] for b in cls.BINARIES))
+                    cls.logger.info("Restarting all related services")
+                    return cls.restart_services_by_prefixes(all_prefixes)
+                except LocalMastersRemaining:
+                    # Swallow and retry
+                    cls.logger.warning('Local masters still found on the machine. Will try to migrate them away')
+                    run_number += 1
         finally:
-            cls.mark_storagerouter_reachable_for_ha(cls.LOCAL_SR)
+            if not initial_run_steps:
+                cls.mark_storagerouter_reachable_for_ha(cls.LOCAL_SR)
 
     @staticmethod
     def get_vpool_balances_for_evacuating_storagerouter(storagerouter):
@@ -170,9 +192,9 @@ def migrate_away(balances, storagerouter):
         evacuate_srs = [storagerouter.guid]
         for balance in balances:  # type: VDiskBalance
             if balance.storagedriver.storagerouter_guid in evacuate_srs:
-                successfull_moves, failed_moves = balance.execute_balance_change_through_overflow(balances,
-                                                                                                  user_input=False,
-                                                                                                  abort_on_error=False)
+                successful_moves, failed_moves = balance.execute_balance_change_through_overflow(balances,
+                                                                                                 user_input=False,
+                                                                                                 abort_on_error=False)
                 if failed_moves:
                     raise FailureDuringMigrateException('Could not move volumes {} away'.format(', '.join(failed_moves)))
 
@@ -189,6 +211,7 @@ def migrate_master_mds(cls, storagerouter, max_chain_size=100):
         """
         cls.logger.info("Starting MDS migrations")
         while True:
+            hosted_vdisk_guids = storagerouter._vdisks_guids()
             vpool_mds_master_vdisks = cls.get_vdisks_mds_masters_on_storagerouter(storagerouter)
             all_masters_gone = sum(len(vds) for vds in vpool_mds_master_vdisks.values()) == 0
             if all_masters_gone:
@@ -198,6 +221,9 @@
             signatures = []
             tasks = []
             for vdisk_guid in vdisk_guids[0:max_chain_size]:
+                if vdisk_guid in hosted_vdisk_guids:
+                    cls.logger.warning('Skipping vDisk {} as it is still hosted on Storagerouter {}'.format(vdisk_guid, storagerouter.name))
+                    continue
                 cls.logger.info('Ensuring safety for {}'.format(vdisk_guid))
                 signature = MDSServiceController.ensure_safety.si(vdisk_guid)
                 # Freeze freezes the task into its final form. This will net the async result object we'd normally get from delaying it
@@ -213,6 +239,8 @@
             cls.logger.info('Waiting for all tasks of group {}'.format(async_result.id))
             _ = async_result.get()
         cls.logger.info("MDS migration finished")
+        if len(hosted_vdisk_guids) > 0:
+            raise LocalMastersRemaining('vDisks are still hosted on Storagerouter {0} to migrate from: {1}'.format(storagerouter.name, ', '.join(hosted_vdisk_guids)))
 
     @staticmethod
     def get_vdisks_mds_masters_on_storagerouter(storagerouter):
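
Note on the retry flow in restart_services: the loop keeps re-planning and re-executing migrations until migrate_master_mds no longer raises LocalMastersRemaining, and the finally block only re-registers the node as an HA target if it was actually deregistered. Below is a minimal sketch of that pattern; plan_migrations, migrate, mark_unreachable and mark_reachable are hypothetical stand-ins for the OVS calls, not the real API.

    import logging

    logger = logging.getLogger(__name__)


    class LocalMastersRemaining(RuntimeError):
        """Raised when local masters are still present on the machine."""


    def prepare_restart(plan_migrations, migrate, mark_unreachable, mark_reachable):
        # Hypothetical driver mirroring restart_services; the callables are
        # stand-ins for the real OVS methods
        marked = False
        try:
            attempt = 0
            while True:
                logger.info('Attempt %s to prepare the restart', attempt)
                plans = plan_migrations()  # raises if a volume cannot be planned
                if not marked:
                    mark_unreachable()  # keep HA from targeting this node mid-update
                    marked = True
                try:
                    migrate(plans)  # raises LocalMastersRemaining while work remains
                    return
                except LocalMastersRemaining:
                    logger.warning('Local masters still found, retrying')
                    attempt += 1
        finally:
            if marked:
                mark_reachable()  # always restore the node as an HA target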
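
The migrate_master_mds hunks dispatch ensure_safety per vDisk as a group of immutable Celery signatures and then wait on the group result. A minimal sketch of that fan-out, assuming a configured Celery app; ensure_safety_stub is a hypothetical stand-in for MDSServiceController.ensure_safety, the rest is plain Celery API.

    from celery import Celery, group

    app = Celery('sketch', broker='memory://', backend='cache+memory://')


    @app.task
    def ensure_safety_stub(vdisk_guid):
        # Stand-in for the real per-vDisk safety task
        return vdisk_guid


    def ensure_safety_batch(vdisk_guids, max_chain_size=100):
        # .si() builds an immutable signature: each task ignores upstream
        # results, so the vDisks are processed independently within the group
        signatures = [ensure_safety_stub.si(guid) for guid in vdisk_guids[:max_chain_size]]
        async_result = group(signatures).apply_async()
        # Block until every task in the group has finished; raises if any failed
        return async_result.get()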