57 changes: 42 additions & 15 deletions ovs/update/volumedriver/updater.py
@@ -30,6 +30,7 @@
from ovs.extensions.storage.persistentfactory import PersistentFactory
from ovs.lib.mdsservice import MDSServiceController
from ovs.log.log_handler import LogHandler

# noinspection PyUnreachableCode
if False:
from typing import List, Dict, Tuple
@@ -49,6 +50,12 @@ class FailureDuringMigrateException(UpdateException):
exit_code = 22


class LocalMastersRemaining(RuntimeError):
"""
Raised when local MDS masters are still present on the machine
"""


class VolumeDriverUpdater(ComponentUpdater):

"""
@@ -68,20 +75,35 @@ def restart_services(cls):
"""
Override the service restart. The volumedrivers should be prepared for shutdown
"""
cls.logger.info("Restarting all related services")
# Get the migration plans for every volume on this host. If there are no plans for certain volumes, it will raise
balances_by_vpool = cls.get_vpool_balances_for_evacuating_storagerouter(cls.LOCAL_SR)
# Plan to execute migrate. Avoid the VPool from being an HA target
cls.mark_storagerouter_unreachable_for_ha(cls.LOCAL_SR)
cls.logger.info("Preparing to restart the related services")
initial_run_steps = True
try:
# @todo currency?
for vpool, balances in balances_by_vpool.iteritems():
cls.migrate_away(balances, cls.LOCAL_SR)
cls.migrate_master_mds(cls.LOCAL_SR)
all_prefixes = tuple(itertools.chain.from_iterable(b[3] for b in cls.BINARIES))
return cls.restart_services_by_prefixes(all_prefixes)
run_number = 0
while True:
cls.logger.info('Attempt {0} to prepare the restart'.format(run_number))
# Get the migration plans for every volume on this host. If no plan can be made for certain volumes, this raises an exception
balances_by_vpool = cls.get_vpool_balances_for_evacuating_storagerouter(cls.LOCAL_SR)
if initial_run_steps:
cls.logger.info('Offloading an MDS catchup to celery. This ensures all slaves are caught up, avoiding deadlocks')
MDSServiceController.mds_catchup.apply_async()
# Plan to execute migrate. Avoid the VPool from being an HA target
cls.mark_storagerouter_unreachable_for_ha(cls.LOCAL_SR)
initial_run_steps = False
try:
# @todo concurrency?
for vpool, balances in balances_by_vpool.iteritems():
cls.migrate_away(balances, cls.LOCAL_SR)
cls.migrate_master_mds(cls.LOCAL_SR)
all_prefixes = tuple(itertools.chain.from_iterable(b[3] for b in cls.BINARIES))
cls.logger.info("Restarting all related services")
return cls.restart_services_by_prefixes(all_prefixes)
except LocalMastersRemaining:
# Swallow and retry
cls.logger.warning('Local masters still found on the machine. Will try to migrate them away')
run_number += 1
finally:
cls.mark_storagerouter_reachable_for_ha(cls.LOCAL_SR)
if not initial_run_steps:
cls.mark_storagerouter_reachable_for_ha(cls.LOCAL_SR)

@staticmethod
def get_vpool_balances_for_evacuating_storagerouter(storagerouter):
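
In condensed form, the new restart_services flow above is a retry loop: HA is disabled for this host once, volume and MDS-master migrations are attempted until no local masters remain, and HA is re-enabled no matter how the loop ends. A simplified sketch of that shape, with the OVS calls replaced by hypothetical stand-ins (plan_evacuation, mark_unreachable, evacuate_volumes, migrate_masters, mark_reachable, restart_service_processes):

def restart_with_evacuation():
    # Simplified paraphrase of restart_services, not the actual implementation
    marked_unreachable = False
    try:
        while True:
            plans = plan_evacuation()        # raises if any volume cannot be planned for
            if not marked_unreachable:
                mark_unreachable()           # take this host out of the HA target pool once
                marked_unreachable = True
            try:
                evacuate_volumes(plans)
                migrate_masters()            # raises LocalMastersRemaining when masters are left
                return restart_service_processes()
            except LocalMastersRemaining:
                continue                     # re-plan and try again
    finally:
        if marked_unreachable:
            mark_reachable()                 # always rejoin HA, even on failure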
@@ -170,9 +192,9 @@ def migrate_away(balances, storagerouter):
evacuate_srs = [storagerouter.guid]
for balance in balances: # type: VDiskBalance
if balance.storagedriver.storagerouter_guid in evacuate_srs:
successfull_moves, failed_moves = balance.execute_balance_change_through_overflow(balances,
user_input=False,
abort_on_error=False)
successful_moves, failed_moves = balance.execute_balance_change_through_overflow(balances,
user_input=False,
abort_on_error=False)
if failed_moves:
raise FailureDuringMigrateException('Could not move volumes {} away'.format(', '.join(failed_moves)))

@@ -189,6 +211,7 @@ def migrate_master_mds(cls, storagerouter, max_chain_size=100):
"""
cls.logger.info("Starting MDS migrations")
while True:
hosted_vdisk_guids = storagerouter._vdisks_guids()
vpool_mds_master_vdisks = cls.get_vdisks_mds_masters_on_storagerouter(storagerouter)
all_masters_gone = sum(len(vds) for vds in vpool_mds_master_vdisks.values()) == 0
if all_masters_gone:
@@ -198,6 +221,8 @@
signatures = []
tasks = []
for vdisk_guid in vdisk_guids[0:max_chain_size]:
if vdisk_guid in hosted_vdisk_guids:
cls.logger.warning('Skipping vDisk {} as it is still hosted on Storagerouter {}'.format(vdisk_guid, storagerouter.name))
continue
cls.logger.info('Ensuring safety for {}'.format(vdisk_guid))
signature = MDSServiceController.ensure_safety.si(vdisk_guid)
# freeze() locks the signature into its final form, returning the AsyncResult we would normally get from delaying it
@@ -213,6 +238,8 @@
cls.logger.info('Waiting for all tasks of group {}'.format(async_result.id))
_ = async_result.get()
cls.logger.info("MDS migration finished")
if hosted_vdisk_guids:
raise LocalMastersRemaining('vDisks are still hosted on Storagerouter {} to migrate from: {}'.format(storagerouter.name, ', '.join(hosted_vdisk_guids)))
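
The signature/freeze/group pattern used above is standard Celery: an immutable signature (.si()) is frozen to obtain its AsyncResult up front, and the signatures are then dispatched together as one group. A minimal self-contained sketch, assuming a configured Celery app with a registered task named ensure_safety (hypothetical here):

from celery import group

def dispatch_batch(vdisk_guids):
    # Build immutable signatures; .si() ignores any parent task results
    signatures = []
    async_results = []
    for guid in vdisk_guids:
        sig = ensure_safety.si(guid)         # assumed registered Celery task
        async_results.append(sig.freeze())   # freeze() fixes the task id and returns its AsyncResult
        signatures.append(sig)
    group_result = group(signatures).apply_async()  # fire the whole batch as one group
    group_result.get()                       # block until every task in the group has completed
    return async_results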

@staticmethod
def get_vdisks_mds_masters_on_storagerouter(storagerouter):