From 8724c6df49689d0a6089486c53d23700514ff8ae Mon Sep 17 00:00:00 2001
From: Simon Van den Bossche
Date: Thu, 1 Aug 2019 17:00:25 +0200
Subject: [PATCH] [wip] discard celery related changes in the volumedriver
 update

---
 ovs/update/volumedriver/updater.py | 55 ++++++------------------------
 1 file changed, 10 insertions(+), 45 deletions(-)

diff --git a/ovs/update/volumedriver/updater.py b/ovs/update/volumedriver/updater.py
index ff724fcd9..fc44637d1 100644
--- a/ovs/update/volumedriver/updater.py
+++ b/ovs/update/volumedriver/updater.py
@@ -87,9 +87,6 @@ def restart_services(cls):
         # Get the migration plans for every volume on this host. If there are no plans for certain volumes, it will raise
         balances_by_vpool = cls.get_vpool_balances_for_evacuating_storagerouter(cls.LOCAL_SR)
         if initial_run_steps:
-            cls.logger.info('Offloading a MDS catchup to celery. This will ensure all slaves will be caught up to avoid deadlocking')
-            MDSServiceController.mds_catchup.apply_async()
-
             # Plan to execute migrate. Avoid the VPool from being an HA target
             cls.mark_storagerouter_unreachable_for_ha(cls.LOCAL_SR)
             initial_run_steps = False
         try:
@@ -190,25 +187,15 @@ def migrate_away(cls, balances_by_vpool, storagerouter):
         :return: None
         :raises: FailureDuringMigrateException if any volumes failed to move
         """
-        tasks = []
-        signatures = []
+        evacuate_srs = [storagerouter.guid]
         for vpool, balances in balances_by_vpool.iteritems():
-            # Serialize to offload to celery. DataObjects can't be serialized yet
-            serialized_balances = [b.to_dict() for b in balances]
-            signature = VPoolController.execute_balance_change.si(vpool.guid, serialized_balances, [storagerouter.guid])
-            # Freeze freezes the task into its final form. This will net the async result object we'd normally get from delaying it
-            tasks.append(signature.freeze())
-            signatures.append(signature)
-        if signatures:
-            cls.logger.info('Adding migration group with tasks {}'.format(', '.join(t.id for t in tasks)))
-            # Add all chain signatures to a group for parallel execution
-            task_group = group(signatures)
-            # Wait for the group result
-            async_result = task_group.apply_async()
-            cls.logger.info('Waiting for all tasks of group {}'.format(async_result.id))
-            # Timeout similar to migrate_master_mds does not make a lot of sense. All tasks are executed in parallel
-            _ = async_result.get()
-            cls.logger.info("MDS migration finished")
+            for balance in balances:  # type: VDiskBalance
+                if balance.storagedriver.storagerouter_guid in evacuate_srs:
+                    successful_moves, failed_moves = balance.execute_balance_change_through_overflow(balances,
+                                                                                                     user_input=False,
+                                                                                                     abort_on_error=False)
+                    if failed_moves:
+                        raise FailureDuringMigrateException('Could not move volumes {} away'.format(', '.join(failed_moves)))
 
     @classmethod
     def migrate_master_mds(cls, storagerouter, max_chain_size=100, group_timeout=10 * 60):
@@ -226,26 +213,20 @@ def migrate_master_mds(cls, storagerouter, max_chain_size=100, group_timeout=10
         """
         cls.logger.info("Starting MDS migrations")
         while True:
-            hosted_vdisk_guids = storagerouter._vdisks_guids()
             vpool_mds_master_vdisks = cls.get_vdisks_mds_masters_on_storagerouter(storagerouter)
             all_masters_gone = sum(len(vds) for vds in vpool_mds_master_vdisks.values()) == 0
             if all_masters_gone:
                 break
-            all_tasks = []
             chains = []
             for vpool_guid, vdisk_guids in vpool_mds_master_vdisks.iteritems():
                 signatures = []
                 tasks = []
                 for vdisk_guid in vdisk_guids[0:max_chain_size]:
-                    if vdisk_guid in hosted_vdisk_guids:
-                        cls.logger.warning('Skipping vDisk {} as it is still hosted on Storagerouter {}'.format(vdisk_guid, storagerouter.name))
                     cls.logger.info('Ensuring safety for {}'.format(vdisk_guid))
-                    # Ensure safety is a common task. Let's timeout on the ensure single quickly to avoid worker lockups
-                    signature = MDSServiceController.ensure_safety.si(vdisk_guid, ensure_single_timeout=5)
+                    signature = MDSServiceController.ensure_safety.si(vdisk_guid)
                     # Freeze freezes the task into its final form. This will net the async result object we'd normally get from delaying it
                     tasks.append(signature.freeze())
                     signatures.append(signature)
-                all_tasks.extend(tasks)
                 if signatures:
                     cls.logger.info('Adding chain for VPool {} with tasks {}'.format(vpool_guid, ', '.join(t.id for t in tasks)))
                     chains.append(chain(signatures))
@@ -254,24 +235,8 @@ def migrate_master_mds(cls, storagerouter, max_chain_size=100, group_timeout=10
             # Wait for the group result
             async_result = task_group.apply_async()
             cls.logger.info('Waiting for all tasks of group {}'.format(async_result.id))
-            try:
-                _ = async_result.get(timeout=group_timeout)
-            except TimeoutError:
-                cls.logger.warning('Migration took longer than expected. Revoking all non-started tasks')
-                revoked_tasks = []
-                for task in all_tasks:
-                    if task.state == 'PENDING':
-                        # Certain PENDING tasks cannot be revoked. It appears they're non-existent. Not even the workers know about them
-                        # @todo build a new result chain and wait for that
-                        task.revoke()
-                        revoked_tasks.append(task)
-                if revoked_tasks:
-                    cls.logger.warning('Revoked migration tasks: {}'.format(', '.join(revoked_tasks)))
-                cls.logger.warning('Waiting for the execution on the running migrations')
-                _ = async_result.get()
+            _ = async_result.get()
         cls.logger.info("MDS migration finished")
-        if len(hosted_vdisk_guids) > 0:
-            raise LocalMastersRemaining('vDisks are still hosted on Storagerouter to migrate from: {}'.format(', '.join(hosted_vdisk_guids), storagerouter.name))
 
     @staticmethod
     def get_vdisks_mds_masters_on_storagerouter(storagerouter):
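
Reviewer note (not part of the patch): migrate_master_mds still drives
MDSServiceController.ensure_safety through celery's signature / freeze /
chain / group primitives, so below is a minimal sketch of that pattern for
context. The app, the broker and backend URLs, and the stand-in
ensure_safety task are hypothetical placeholders, not the OVS
implementations.

    from celery import Celery, chain, group

    # Hypothetical app; any reachable broker and result backend will do.
    app = Celery('sketch', broker='redis://localhost:6379/0',
                 backend='redis://localhost:6379/0')

    @app.task
    def ensure_safety(vdisk_guid):
        # Stand-in for MDSServiceController.ensure_safety
        return vdisk_guid

    # .si() builds an immutable signature: a serializable description of the
    # call that ignores the result of any preceding task in a chain.
    signatures = [ensure_safety.si(guid) for guid in ['guid-1', 'guid-2']]
    # freeze() pins each signature's task id before submission, which is how
    # the updater can log the task ids before the group is applied.
    tasks = [signature.freeze() for signature in signatures]
    print('submitting tasks {}'.format(', '.join(t.id for t in tasks)))
    # A chain executes its signatures one after the other; a group of chains
    # runs the chains in parallel. get() blocks until everything finished.
    async_result = group([chain(signatures)]).apply_async()
    async_result.get()

Building one chain per vPool serializes the ensure_safety calls within a
vPool, while the surrounding group still lets different vPools migrate in
parallel; that is the behaviour the retained code preserves.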