55 changes: 10 additions & 45 deletions ovs/update/volumedriver/updater.py
@@ -87,9 +87,6 @@ def restart_services(cls):
# Get the migration plans for every volume on this host. If there are no plans for certain volumes, it will raise an exception
balances_by_vpool = cls.get_vpool_balances_for_evacuating_storagerouter(cls.LOCAL_SR)
if initial_run_steps:
cls.logger.info('Offloading a MDS catchup to celery. This will ensure all slaves will be caught up to avoid deadlocking')
MDSServiceController.mds_catchup.apply_async()
Contributor: Has to be kept. We don't care about the result; it just runs in the background.

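A minimal sketch of the fire-and-forget dispatch the comment describes, assuming a standalone Celery app with a placeholder broker URL and an illustrative mds_catchup body (not the OVS implementation):

from celery import Celery

app = Celery('ovs_sketch', broker='redis://localhost:6379/0')  # assumed broker URL

@app.task
def mds_catchup():
    # Placeholder body: catch every MDS slave up with its master.
    pass

# apply_async() only queues the task and returns immediately; the AsyncResult
# is deliberately discarded because the caller never needs the outcome.
mds_catchup.apply_async()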
# Plan to execute the migration. Prevent the VPool from being an HA target
cls.mark_storagerouter_unreachable_for_ha(cls.LOCAL_SR)
initial_run_steps = False
try:
@@ -190,25 +187,15 @@ def migrate_away(cls, balances_by_vpool, storagerouter):
:return: None
:raises: FailureDuringMigrateException if any volumes fail to move
"""
tasks = []
signatures = []
evacuate_srs = [storagerouter.guid]
for vpool, balances in balances_by_vpool.iteritems():
# Serialize to offload to celery. DataObjects can't be serialized yet
serialized_balances = [b.to_dict() for b in balances]
signature = VPoolController.execute_balance_change.si(vpool.guid, serialized_balances, [storagerouter.guid])
# Freeze freezes the task into its final form. This will net the async result object we'd normally get from delaying it
tasks.append(signature.freeze())
signatures.append(signature)
if signatures:
cls.logger.info('Adding migration group with tasks {}'.format(', '.join(t.id for t in tasks)))
# Add all chain signatures to a group for parallel execution
task_group = group(signatures)
# Wait for the group result
async_result = task_group.apply_async()
cls.logger.info('Waiting for all tasks of group {}'.format(async_result.id))
# Timeout similar to migrate_master_mds does not make a lot of sense. All tasks are executed in parallel
_ = async_result.get()
cls.logger.info("MDS migration finished")
for balance in balances: # type: VDiskBalance
if balance.storagedriver.storagerouter_guid in evacuate_srs:
successful_moves, failed_moves = balance.execute_balance_change_through_overflow(balances,
user_input=False,
abort_on_error=False)
if failed_moves:
raise FailureDuringMigrateException('Could not move volumes {} away'.format(', '.join(failed_moves)))

@classmethod
def migrate_master_mds(cls, storagerouter, max_chain_size=100, group_timeout=10 * 60):
@@ -226,26 +213,20 @@ def migrate_master_mds(cls, storagerouter, max_chain_size=100, group_timeout=10
"""
cls.logger.info("Starting MDS migrations")
while True:
hosted_vdisk_guids = storagerouter._vdisks_guids()
Contributor: Don't use signatures here, as it will lock up; discarded tasks are still being waited for. The reason for the catchup (see the comment above) is to ensure all slaves are caught up with their master.

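A minimal sketch of the chain-per-vPool pattern built below, using an illustrative ensure_safety task and made-up guids (task names and broker URL are assumptions, not the OVS code): each vPool gets a chain whose signatures run sequentially, the chains run in parallel as a group, and freeze() pins a task id on every signature before dispatch.

from celery import Celery, chain, group

app = Celery('ovs_sketch', broker='redis://localhost:6379/0')  # assumed broker URL

@app.task
def ensure_safety(vdisk_guid):
    # Placeholder body: rebuild the MDS safety for a single vDisk.
    return vdisk_guid

vdisks_by_vpool = {'vpool-a': ['vdisk-1', 'vdisk-2'], 'vpool-b': ['vdisk-3']}

chains = []
for vpool_guid, vdisk_guids in vdisks_by_vpool.items():
    signatures = [ensure_safety.si(guid) for guid in vdisk_guids]
    # freeze() fixes each signature's task id up front, so the ids can be logged
    # before any work is dispatched.
    task_ids = [signature.freeze().id for signature in signatures]
    chains.append(chain(signatures))

# The group runs the chains in parallel; get() blocks until every chain finishes.
async_result = group(chains).apply_async()
async_result.get()

If a signature in such a group is revoked before any worker has received it, its result can stay PENDING and a later get() on the group keeps waiting for it, which is the lock-up the comment above warns about.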
vpool_mds_master_vdisks = cls.get_vdisks_mds_masters_on_storagerouter(storagerouter)
all_masters_gone = sum(len(vds) for vds in vpool_mds_master_vdisks.values()) == 0
if all_masters_gone:
break
all_tasks = []
chains = []
for vpool_guid, vdisk_guids in vpool_mds_master_vdisks.iteritems():
signatures = []
tasks = []
for vdisk_guid in vdisk_guids[0:max_chain_size]:
if vdisk_guid in hosted_vdisk_guids:
cls.logger.warning('Skipping vDisk {} as it is still hosted on Storagerouter {}'.format(vdisk_guid, storagerouter.name))
cls.logger.info('Ensuring safety for {}'.format(vdisk_guid))
# Ensure safety is a common task. Let's timeout on the ensure single quickly to avoid worker lockups
signature = MDSServiceController.ensure_safety.si(vdisk_guid, ensure_single_timeout=5)
signature = MDSServiceController.ensure_safety.si(vdisk_guid)
# Freeze freezes the task into its final form. This will net the async result object we'd normally get from delaying it
tasks.append(signature.freeze())
signatures.append(signature)
all_tasks.extend(tasks)
if signatures:
cls.logger.info('Adding chain for VPool {} with tasks {}'.format(vpool_guid, ', '.join(t.id for t in tasks)))
chains.append(chain(signatures))
@@ -254,24 +235,8 @@ def migrate_master_mds(cls, storagerouter, max_chain_size=100, group_timeout=10
# Wait for the group result
async_result = task_group.apply_async()
cls.logger.info('Waiting for all tasks of group {}'.format(async_result.id))
try:
_ = async_result.get(timeout=group_timeout)
except TimeoutError:
cls.logger.warning('Migration took longer than expected. Revoking all non-started tasks')
revoked_tasks = []
for task in all_tasks:
if task.state == 'PENDING':
# Certain PENDING tasks cannot be revoked. It appears they're non-existent. Not even the workers know about them
# @todo build a new result chain and wait for that
task.revoke()
revoked_tasks.append(task)
if revoked_tasks:
cls.logger.warning('Revoked migration tasks: {}'.format(', '.join(revoked_tasks)))
cls.logger.warning('Waiting for the execution on the running migrations')
_ = async_result.get()
_ = async_result.get()
cls.logger.info("MDS migration finished")
if len(hosted_vdisk_guids) > 0:
raise LocalMastersRemaining('vDisks are still hosted on Storagerouter {} to migrate from: {}'.format(storagerouter.name, ', '.join(hosted_vdisk_guids)))

@staticmethod
def get_vdisks_mds_masters_on_storagerouter(storagerouter):