From 4ebf6b2337c31066b6c99355d7ba5fd38a7add74 Mon Sep 17 00:00:00 2001 From: Jeffrey Devloo Date: Thu, 20 Jun 2019 17:54:23 +0200 Subject: [PATCH 1/7] !WIP Standalone single node volumedriver update --- ovs/lib/helpers/storagerouter/__init__.py | 16 + ovs/lib/helpers/storagerouter/evacuate.py | 30 ++ ovs/lib/helpers/vdisk/__init__.py | 15 + ovs/lib/helpers/vdisk/rebalancer.py | 574 ++++++++++++++++++++++ ovs/update/__init__.py | 15 + ovs/update/volumedriver.py | 306 ++++++++++++ ovs/update/volumedriver/__init__.py | 15 + ovs/update/volumedriver/updater.py | 96 ++++ 8 files changed, 1067 insertions(+) create mode 100644 ovs/lib/helpers/storagerouter/__init__.py create mode 100644 ovs/lib/helpers/storagerouter/evacuate.py create mode 100644 ovs/lib/helpers/vdisk/__init__.py create mode 100644 ovs/lib/helpers/vdisk/rebalancer.py create mode 100644 ovs/update/__init__.py create mode 100644 ovs/update/volumedriver.py create mode 100644 ovs/update/volumedriver/__init__.py create mode 100644 ovs/update/volumedriver/updater.py diff --git a/ovs/lib/helpers/storagerouter/__init__.py b/ovs/lib/helpers/storagerouter/__init__.py new file mode 100644 index 000000000..301eed1cb --- /dev/null +++ b/ovs/lib/helpers/storagerouter/__init__.py @@ -0,0 +1,16 @@ +# Copyright (C) 2019 iNuron NV +# +# This file is part of Open vStorage Open Source Edition (OSE), +# as available from +# +# http://www.openvstorage.org and +# http://www.openvstorage.com. +# +# This file is free software; you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3) +# as published by the Free Software Foundation, in version 3 as it comes +# in the LICENSE.txt file of the Open vStorage OSE distribution. +# +# Open vStorage is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY of any kind. 
+ diff --git a/ovs/lib/helpers/storagerouter/evacuate.py b/ovs/lib/helpers/storagerouter/evacuate.py new file mode 100644 index 000000000..e442c85a1 --- /dev/null +++ b/ovs/lib/helpers/storagerouter/evacuate.py @@ -0,0 +1,30 @@ +# Copyright (C) 2019 iNuron NV +# +# This file is part of Open vStorage Open Source Edition (OSE), +# as available from +# +# http://www.openvstorage.org and +# http://www.openvstorage.com. +# +# This file is free software; you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3) +# as published by the Free Software Foundation, in version 3 as it comes +# in the LICENSE.txt file of the Open vStorage OSE distribution. +# +# Open vStorage is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY of any kind. + +""" +Evacuate Storagerouter module +""" + + +class StorageRouterEvacuator(object): + + """ + Evacuates all volumes on the given storagerouter + - Check for volume potential to see if evacuation is possible + - Moves volumes away as evenly as possible + - Can wait for MDS masters to have moved + """ + diff --git a/ovs/lib/helpers/vdisk/__init__.py b/ovs/lib/helpers/vdisk/__init__.py new file mode 100644 index 000000000..f3ecd2bfd --- /dev/null +++ b/ovs/lib/helpers/vdisk/__init__.py @@ -0,0 +1,15 @@ +# Copyright (C) 2019 iNuron NV +# +# This file is part of Open vStorage Open Source Edition (OSE), +# as available from +# +# http://www.openvstorage.org and +# http://www.openvstorage.com. +# +# This file is free software; you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3) +# as published by the Free Software Foundation, in version 3 as it comes +# in the LICENSE.txt file of the Open vStorage OSE distribution. +# +# Open vStorage is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY of any kind. 
diff --git a/ovs/lib/helpers/vdisk/rebalancer.py b/ovs/lib/helpers/vdisk/rebalancer.py new file mode 100644 index 000000000..fa1da875c --- /dev/null +++ b/ovs/lib/helpers/vdisk/rebalancer.py @@ -0,0 +1,574 @@ +# Copyright (C) 2018 iNuron NV +# +# This file is part of Open vStorage Open Source Edition (OSE), +# as available from +# +# http://www.openvstorage.org and +# http://www.openvstorage.com. +# +# This file is free software; you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3) +# as published by the Free Software Foundation, in version 3 as it comes +# in the LICENSE.txt file of the Open vStorage OSE distribution. +# +# Open vStorage is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY of any kind. + +""" +Rebalances volumes across nodes +Taken from the support-tools +""" + +from __future__ import division + +import pprint +import itertools +from math import ceil +from ovs.dal.hybrids.storagerouter import StorageRouter +from ovs.dal.hybrids.vpool import VPool +from ovs.dal.hybrids.storagedriver import StorageDriver +from ovs.dal.hybrids.vdisk import VDisk +from ovs_extensions.log.logger import Logger + +# noinspection PyUnreachableCode +if False: + from typing import List, Dict, Tuple, Optional + + +class VDiskRebalancer(object): + + _volume_potentials = {} + logger = Logger('vdisk_rebalance') + + @classmethod + def print_balances(cls, balances): + # type: (List[VDiskBalance]) -> None + """ + Prints out balances + :return: None + :rtype: NoneType + """ + balances_by_vpool = {} + for balance in balances: # type: VDiskBalance + vpool = balance.storagedriver.vpool + if vpool not in balances_by_vpool: + balances_by_vpool[vpool] = [] + balances_by_vpool[vpool].append(balance) + for vpool, vpool_balances in balances_by_vpool.viewitems(): + print('Balance for VPool {0}'.format(vpool.name)) + for balance in vpool_balances: # type: VDiskBalance + storagerouter = 
balance.storagedriver.storagerouter + print(' Storagerouter {0}, vdisks now: {1}, vdisks afterwards {2}, added {3}'.format(storagerouter.name, len(balance.hosted_guids), len(balance.balance), len(balance.added))) + if balance.added: + added_source_overview = {} + for vdisk_guid in balance.added: + current_storagerouter = StorageRouter(VDisk(vdisk_guid).storagerouter_guid) + if current_storagerouter not in added_source_overview: + added_source_overview[current_storagerouter] = [] + added_source_overview[current_storagerouter].append(vdisk_guid) + print(' Vdisks added from:') + for current_storagerouter, moved_vdisk_guids in added_source_overview.iteritems(): + print(' StorageRouter {0}: {1}'.format(current_storagerouter.name, len(moved_vdisk_guids))) + + @classmethod + def get_rebalanced_layout(cls, vpool_guid, excluded_storagerouters=None, ignore_domains=False, evacuate_storagerouters=None, base_on_volume_potential=True): + # type: (str, Optional[List[str]], Optional[bool], Optional[List[str]], Optional[bool]) -> List[VDiskBalance] + """ + Retrieve the layout of how to optimal spread would look like + :param evacuate_storagerouters: Migrate all vdisks from this hosts + :type evacuate_storagerouters: List[str] + :param vpool_guid: Guid of the VPool to rebalance + :type vpool_guid: str + :param excluded_storagerouters: Guids of StorageRouters to avoid + :type excluded_storagerouters: List[str] + :param ignore_domains: Ignore the domains (rebalance across everything) + :type ignore_domains: bool + :param base_on_volume_potential: Base the movement of the volume potential instead of a linear distribution + :type base_on_volume_potential: bool + :return: List of balances + :rtype: List[VDiskBalance] + """ + if evacuate_storagerouters is None: + evacuate_storagerouters = [] + if excluded_storagerouters is None: + excluded_storagerouters = [] + + vpool = VPool(vpool_guid) + if ignore_domains: + return cls._get_rebalances_layout(vpool, excluded_storagerouters, 
evacuate_storagerouters, base_on_volume_potential) + return cls._get_rebalanced_layout_by_domain(vpool, excluded_storagerouters, evacuate_storagerouters, base_on_volume_potential) + + @classmethod + def get_volume_potentials(cls, storagedrivers, cache=True): + potentials = {} + for storagedriver in storagedrivers: + if cache: + potential = cls._volume_potentials.get(storagedriver, -1) + if potential == -1: + potential = storagedriver.vpool.storagedriver_client.volume_potential(str(storagedriver.storagedriver_id)) + cls._volume_potentials[storagedriver] = potential + else: + potential = storagedriver.vpool.storagedriver_client.volume_potential(str(storagedriver.storagedriver_id)) + potentials[storagedriver] = potential + return potentials + + @classmethod + def _get_rebalances_layout(cls, vpool, excluded_storagerouters, evacuate_storagerouters, base_on_volume_potential): + # type: (VPool, List[str], List[str], bool) -> List[VDiskBalance] + """ + Rebalance volumes and stay without domains + :param vpool: VPool to rebalance + :type vpool: VPool + :param excluded_storagerouters: Guids of StorageRouters to avoid + :type excluded_storagerouters: List[str] + :param evacuate_storagerouters: Migrate all vdisks from this hosts + :type evacuate_storagerouters: List[str] + :param base_on_volume_potential: Base the limit calculation of the volume potential ratio + :type base_on_volume_potential: bool + :return: List of balances + :rtype: List[VDiskBalance] + """ + storagerouters_to_avoid = set(itertools.chain(excluded_storagerouters, evacuate_storagerouters)) + destination_storagedrivers = [std for std in vpool.storagedrivers if std.storagerouter_guid not in storagerouters_to_avoid] + destination_storagedrivers_by_ip = dict((storagedriver.storagerouter.ip, storagedriver) for storagedriver in destination_storagedrivers) + + volume_potentials = {} + if base_on_volume_potential: + volume_potentials = cls.get_volume_potentials(destination_storagedrivers) + total_potential = sum(p 
for p in volume_potentials.itervalues()) + vdisks_within_destination_storagedrivers = list( + itertools.chain(*(sd.vdisks_guids for sd in destination_storagedrivers))) + volume_total_capacity = total_potential + len(vdisks_within_destination_storagedrivers) + + # Default limit. Simple distribution + storagedriver_vdisk_limit = int(ceil(len(vpool.vdisks_guids) / len(destination_storagedrivers))) + balances = {} + overflow = [] + for storagedriver in vpool.storagedrivers: + if base_on_volume_potential: + # Use the ratio between volume potential max and current to distribute + volume_potential = volume_potentials[storagedriver] + storagedriver_vdisk_limit = int(ceil(len(vpool.vdisks_guids) * (volume_potential + len(storagedriver.vdisks_guids)) / volume_total_capacity)) + + limit = 0 if storagedriver.storagerouter_guid in evacuate_storagerouters else storagedriver_vdisk_limit + balance = VDiskBalance(storagedriver, limit) + overflow.extend(balance.overflow) + balances[storagedriver] = balance + # Attempt to move to current mds hosts + for vdisk_guid in overflow: + vdisk = VDisk(vdisk_guid) + # If only set was ordered :D + preferred_destinations = [destination_storagedrivers_by_ip[mds_entry['ip']] for mds_entry in vdisk.info['metadata_backend_config'] if mds_entry['ip'] in destination_storagedrivers_by_ip] + # Try to fill in these storagedriver first + destinations = preferred_destinations + [storagedriver for storagedriver in destination_storagedrivers if storagedriver not in preferred_destinations] + added = False + for storagedriver in destinations: + balance = balances[storagedriver] + added = cls.add_to_balance(vdisk_guid, balance) + if added: + try: + index = preferred_destinations.index(storagedriver) + mds_type = 'master' if index == 0 else 'slave' + cls.logger.info('Appointing {0} to {1} (index {2})'.format(vdisk_guid, mds_type, index)) + except ValueError: + # Index query didn't find the storagedriver + cls.logger.info('Appointing to non-mds host') + break + 
if not added: + raise NotImplementedError('Vdisk couldnt be added to any destination. Might be faulty implementation here') + return balances.values() + + @classmethod + def _get_rebalanced_layout_by_domain(cls, vpool, excluded_storagerouters, evacuate_storagerouters, base_on_volume_potential): + # type: (VPool, List[str], List[str], bool) -> List[VDiskBalance] + """ + Rebalance volumes and stay within the primary domain + :param vpool: VPool to rebalance + :type vpool: VPool + :param excluded_storagerouters: Guids of StorageRouters to avoid + :type excluded_storagerouters: List[str] + :param evacuate_storagerouters: Migrate all vdisks from this hosts + :type evacuate_storagerouters: List[str] + :param base_on_volume_potential: Base the limit calculation of the volume potential ratio + :type base_on_volume_potential: bool + :return: List of balances + :rtype: List[VDiskBalance] + """ + # Calculate balance cap for every storagedriver + # Every storagedriver can share disks between other storagedriver within the same primary domain + # Certain storagedrivers add their disks to the pool but can't take disks themselves + balances = {} + storagedriver_limits = {} + storagedriver_domain_relation = {} + for storagedriver in vpool.storagedrivers: + cls.logger.info('Calculating the limit for {} in VPool {}'.format(storagedriver.storagerouter.name, vpool.name)) + # Create the disk pool for the current storagedriver in the domain + storagedrivers_in_domain = cls.get_storagedrivers_in_same_primary_domain_as_storagedriver(storagedriver, excluded_storagerouters) + cls.logger.info('{} shares primary domains with {}'.format(storagedriver.storagerouter.name, ', '.join(d.storagerouter.name for d in storagedrivers_in_domain))) + storagedriver_domain_relation[storagedriver] = storagedrivers_in_domain + vdisks_within_domain = [] + for storagedriver_in_domain in storagedrivers_in_domain: + vdisks_within_domain.extend(storagedriver_in_domain.vdisks_guids) + cls.logger.info('VDisks within 
the primary domain of {}: {}'.format(storagedriver.storagerouter.name, len(vdisks_within_domain))) + # Think about the disk distribution + if storagedriver.storagerouter_guid in evacuate_storagerouters: + limit = 0 + else: + # Remove the evacuations from the limit + usable_storagedrivers_in_domain = [std for std in storagedrivers_in_domain if std.storagerouter_guid not in evacuate_storagerouters] + cls.logger.info('Can move volumes to {} within the primary domain storagedrivers'.format(', '.join(d.storagerouter.name for d in usable_storagedrivers_in_domain))) + if base_on_volume_potential: + volume_potentials = cls.get_volume_potentials(usable_storagedrivers_in_domain) + total_potential = sum(p for p in volume_potentials.itervalues()) + volume_potentials_sr = dict((storagedriver.storagerouter.name, potential) for storagedriver, potential in volume_potentials.iteritems()) + cls.logger.info('Volume potential overview: {}. Total potential: {}'.format(pprint.pformat(volume_potentials_sr), total_potential)) + # len should be adjusted with evacuates + vdisks_within_domain_usable = list(itertools.chain(*(sd.vdisks_guids for sd in usable_storagedrivers_in_domain))) + volume_total_capacity = total_potential + len(vdisks_within_domain_usable) + if len(vdisks_within_domain) > volume_total_capacity: + cls.logger.error('The total capacity with the usuable storagedrivers in the domain is not large enough. vdisks_within_domain {0} > volume_total_capacity {1}' + .format(len(vdisks_within_domain), volume_total_capacity)) + raise RuntimeError('Migration with given params is not possible. 
Too many vdisks for the usuable storagedrivers within the domain .') + cls.logger.info('Total capacity within this domain subset is {}'.format(volume_total_capacity)) + # Use the ratio between volume potential max and current to distribute + volume_potential = volume_potentials[storagedriver] + volume_ratio = (volume_potential + len(storagedriver.vdisks_guids)) / volume_total_capacity + cls.logger.info('{} can take {}% of the volumes'.format(storagedriver.storagerouter.name, volume_ratio * 100)) + limit = int(ceil(len(vdisks_within_domain) * volume_ratio)) + else: + limit = int(ceil(len(vdisks_within_domain) / len(usable_storagedrivers_in_domain))) + cls.logger.info('Limit imposed for {}: {}'.format(storagedriver.storagerouter.name, limit)) + storagedriver_limits[storagedriver] = limit + + for storagedriver in vpool.storagedrivers: + balance = VDiskBalance(storagedriver, storagedriver_limits[storagedriver]) + balances[storagedriver] = balance + cls.logger.info('Balance overview {}'.format(balance)) + + for storagedriver in vpool.storagedrivers: + storagedrivers_in_domain = [std for std in storagedriver_domain_relation[storagedriver] if std != storagedriver] + storagedrivers_in_domain_by_ip = dict((storagedriver.storagerouter.ip, storagedriver) for storagedriver in storagedrivers_in_domain) + balance = balances[storagedriver] + cls.logger.info('Migrating {} vdisks from {} of VPool {}. 
Limit: {}, hosting {}'.format(len(balance.overflow), storagedriver.storagerouter.name, vpool.name, + balance.limit, len(balance.hosted_guids))) + for vdisk_guid in balance.overflow: + vdisk = VDisk(vdisk_guid) + preferred_destinations = [storagedrivers_in_domain_by_ip[mds_entry['ip']] for mds_entry in vdisk.info['metadata_backend_config'] + if mds_entry['ip'] in storagedrivers_in_domain_by_ip] + # Try to fill in these storagedriver first + destinations = preferred_destinations + [storagedriver for storagedriver in storagedrivers_in_domain if storagedriver not in preferred_destinations] + cls.logger.info('Destination overview for migrations: {}'.format(', '.join(d.storagerouter.name for d in destinations))) + added = False + while not added and destinations: + destination = destinations.pop() + balance = balances[destination] + added = cls.add_to_balance(vdisk_guid, balance) + if added: + cls.logger.info('Added vdisk {} to {}'.format(vdisk_guid, destination.storagerouter.name)) + if destination.storagedriver_id == vdisk.storagedriver_id: + raise RuntimeError('Moving to current host ERROR') + try: + index = preferred_destinations.index(destination) + mds_type = 'master' if index == 0 else 'slave' + cls.logger.info('Appointing {0} to {1} (index {2})'.format(vdisk_guid, mds_type, index)) + except ValueError: + # Index query didn't find the storagedriver + cls.logger.info('Appointing to non-mds host') + else: + cls.logger.info('Did not add vdisks to {}. Its limit: {}, currently hosting {}'.format(destination.storagerouter.name, balance.limit, len(balance.balance))) + if not added: + raise NotImplementedError('Vdisk couldnt be added to any destination. 
Might be faulty implementation here') + return balances.values() + + @classmethod + def get_storagedrivers_in_same_primary_domain_as_storagedriver(cls, storagedriver, excluded_storagerouters=None): + # type: (StorageDriver, Optional[List[str]]) -> List[StorageDriver] + """ + Retrieve all storagedrivers within the same primary domain as the given storagedriver + :param storagedriver: StorageDriver to check other domain relations for + :param excluded_storagerouters: Storagerouters that are excluded for the search + :type excluded_storagerouters: Optional[List[str]] + :return: List of storagedrivers + :rtype: List[StorageDriver] + """ + if excluded_storagerouters is None: + excluded_storagerouters = [] + primary_domains = cls.get_primary_domain_guids_storagedriver(storagedriver) + if not primary_domains: + return list(storagedriver.vpool.storagedrivers) + return [std for std in storagedriver.vpool.storagedrivers + if std.storagerouter_guid not in excluded_storagerouters + and any(domain_guid in primary_domains for domain_guid in cls.get_primary_domain_guids_storagedriver(std))] + + @staticmethod + def get_primary_domain_guids_storagedriver(storagedriver): + # type: (StorageDriver) -> List[str] + """ + Retrieve all primary domains of the StorageDriver + :param storagedriver: Storagedriver to get domains from + :type storagedriver: StorageDriver + :return: List of primary domain guids + :rtype: List[str] + """ + primary_domains = [] + storagerouter = storagedriver.storagerouter + for junction in storagerouter.domains: + if not junction.backup: + primary_domains.append(junction.domain_guid) + return primary_domains + + @classmethod + def add_to_balance(cls, vdisk_guid, balance): + # type: (str, VDiskBalance) -> bool + """ + Try to add a vdisk to a balance + :param vdisk_guid: Guid to add + :param balance: Balance to add guid to + :return: True if vdisk was added, else False + :rtype: bool + """ + added, overflowed = balance.fill([vdisk_guid]) + return vdisk_guid in 
added + + +class VDiskBalance(object): + + logger = Logger('vdisk_balance') + + def __init__(self, storagedriver, vdisk_limit): + # type: (StorageDriver, int) -> None + """ + Represents the vdisk balance of a storagedriver + :param storagedriver: StorageDriver to balance for + :type storagedriver: StorageDriver + :param vdisk_limit: Maximum amount of vdisks to host. -1 means no limit + :type vdisk_limit: int + """ + self.storagedriver = storagedriver + self.hosted_guids = storagedriver.vdisks_guids + self.limit = vdisk_limit + + self.balance, self.overflow = self.impose_limit() + self.added = [] + + def __add__(self, other): + if not isinstance(other, VDiskBalance) or self.storagedriver != other.storagedriver: + raise ValueError('Different objects cannot be added') + limit = self.limit + other.limit + self.set_limit(limit) + self.added += other.added + + def set_limit(self, limit): + """ + Set a new limit + :param limit: Limit to set + :return: The guids of vdisks that can fit and the guids that cannot fit in on the current host + :rtype: Tuple(List[str], List[str]) + """ + self.limit = limit + self.balance, self.overflow = self.impose_limit() + return self.balance, self.overflow + + def impose_limit(self): + # type: () -> Tuple[List[str], List[str]] + """ + Impose the set limit. 
Returns the max amount of vdisks that can be hosted and the vdisks that need to go + :return: The guids of vdisks that can fit and the guids that cannot fit in on the current host + :rtype: Tuple(List[str], List[str]) + """ + if self.limit == -1: + return self.hosted_guids, [] + overflow = self.hosted_guids[self.limit:] + balance = self.hosted_guids[:self.limit] + return balance, overflow + + def fill(self, vdisk_guids): + # type: (List[str]) -> Tuple[List[str], List[str]] + """ + Fill this balance until the limit is reached + :param vdisk_guids: Guids to add + :type vdisk_guids: List[str] + :return: The guids that could be added to this balanced and the guids that couldn't be added + :rtype: Tuple[List[str], List[str]] + """ + amount_to_add = self.limit - len(self.balance) + added = [] + overflow = vdisk_guids + if amount_to_add: + added = vdisk_guids[:amount_to_add] + overflow = vdisk_guids[amount_to_add:] + self.balance.extend(added) + self.added.extend(added) + return added, overflow + + def generate_overview(self): + # type: () -> dict + """ + Generate the move overview depending on the current state + :return: The overview from where the disks are coming from + :rtype: dict + """ + added_source_overview = {} + for vdisk_guid in self.added: + storagedriver_id = VDisk(vdisk_guid).storagedriver_id + if storagedriver_id not in added_source_overview: + added_source_overview[storagedriver_id] = [] + added_source_overview[storagedriver_id].append(vdisk_guid) + overview = {'added': self.added, + 'balance': self.balance, + 'overflow': self.overflow, + 'add_source_overview': added_source_overview} + return overview + + def execute_balance_change(self, force=False, user_input=False, abort_on_error=False): + # type: (Optional[bool], Optional[bool], Optional[bool]) -> Tuple[List[str], List[str]] + """ + Execute the necessary steps to balance out + :param force: Indicates whether to force the migration or not (forcing can lead to data loss) + :type force: bool + :param 
user_input: require user input to proceed to next vDisk + :type user_input: bool + :param abort_on_error: Abort script when error occurs during migration + :type abort_on_error: bool + :return: List with all successful moves, list with all failed moves + :rtype: NoneType + """ + failed_moves = [] + successful_moves = [] + vdisk_guid = None + try: + for vdisk_guid in self.added: + try: + self._execute_move(vdisk_guid, self.storagedriver, force, user_input) + successful_moves.append(vdisk_guid) + except: + self.logger.exception('Unable to move VDisk {0} to {1}'.format(vdisk_guid, self.storagedriver.storagerouter_guid)) + if abort_on_error: + raise RuntimeError("Something went wrong during moving VDisk {0} to {1}".format(vdisk_guid, self.storagedriver.storagerouter_guid)) + failed_moves.append(vdisk_guid) + except KeyboardInterrupt: + interrupt_msg = 'You have interrupted while moving vdisks. The last move (vDisk {0}) might be in an inconsistent state.'.format(vdisk_guid) + self.logger.warning(interrupt_msg) + if user_input: + if successful_moves: + print('Succesfully moved vDisks: \n {0}'.format(', '.join(successful_moves))) + if failed_moves: + print('\nFailed to move vDisks:\n {0}'.format(', '.join(failed_moves))) + raise + + return successful_moves, failed_moves + + def execute_balance_change_through_overflow(self, balances, force=False, user_input=False, abort_on_error=False): + # type: (List[VDiskBalance], bool, bool, bool) -> Tuple[List[str], List[str]] + """ + Execute the necessary steps to balance out. Starts from the overflow to move all vdisks from the container away first + Other balances must be passed on to see where they'd have to move to + :param balances: Other balances to work with. 
Used to find the owner of this balance its overflow + :type balances: List[VDiskBalance] + :param force: Indicates whether to force the migration or not (forcing can lead to data loss) + :type force: bool + :param user_input: require user input to proceed to next vDisk + :type user_input: bool + :param abort_on_error: Abort script when error occurs during migration + :type abort_on_error: bool + :return: List with all successful moves, list with all failed moves + :rtype: NoneType + """ + failed_moves = [] + successful_moves = [] + vdisk_guid = None + try: + vdisk_balance_map = self.map_vdisk_to_destination(balances) + for vdisk_guid in self.overflow: + add_balance = vdisk_balance_map[vdisk_guid] + destination_std = add_balance.storagedriver + try: + self._execute_move(vdisk_guid, destination_std, force, user_input) + successful_moves.append(vdisk_guid) + except: + self.logger.exception('Unable to move VDisk {0} to {1}'.format(vdisk_guid, destination_std.storagerouter_guid)) + if abort_on_error: + raise RuntimeError("Something went wrong during moving VDisk {0} to {1}".format(vdisk_guid, self.storagedriver.storagerouter_guid)) + failed_moves.append(vdisk_guid) + except KeyboardInterrupt: + interrupt_msg = 'You have interrupted while moving vdisks. 
The last move (vDisk {0}) might be in an inconsistent state.'.format(vdisk_guid) + self.logger.warning(interrupt_msg) + if user_input: + if successful_moves: + print('Succesfully moved vDisks: \n {0}'.format(', '.join(successful_moves))) + if failed_moves: + print('\nFailed to move vDisks:\n {0}'.format(', '.join(failed_moves))) + raise + + return successful_moves, failed_moves + + def _execute_move(self, vdisk_guid, destination_std, force, interactive, minimum_potential=1): + """ + Perform a move + :param vdisk_guid: VDisk to move + :param destination_std: Destination to move to + :param force: Use force when moving + :param interactive: Prompt for user input before moving + :return: None + """ + vd = VDisk(vdisk_guid) + current_sr = StorageRouter(vd.storagerouter_guid).name + next_sr = destination_std.storagerouter.name + if vd.storagerouter_guid == destination_std.storagerouter_guid: + # Ownership changed in meantime + self.logger.info('No longer need to move VDisk {0} to {1}'.format(vdisk_guid, destination_std.storagerouter.name)) + return + rebalance_message = 'Rebalancing vPool by moving vDisk {0} from {1} to {2}'.format(vdisk_guid, current_sr, next_sr) + if interactive: + retry = True + while retry: + proceed = raw_input('{0}. Continue? (press Enter)'.format(rebalance_message)) + if proceed == '': # Mock 'Enter' key + retry = False + try: + volume_potential = destination_std.vpool.storagedriver_client.volume_potential(str(destination_std.storagedriver_id)) + except: + self.logger.exception('Unable to retrieve volume potential. Aborting') + raise + if volume_potential > minimum_potential: + self.logger.info(rebalance_message) + try: + vd.storagedriver_client.migrate(str(vd.volume_id), str(destination_std.name), False) + except RuntimeError: + # When a RunTimeError occurs. Try restarting the volume locally for safety measures. + self.logger.warning('Encountered RunTimeError. 
Checking if vdisk({0}) is not running and restarting it.'.format(vd.guid)) + vd.invalidate_dynamics('info') + if vd.info['live_status'] != vd.STATUSES.RUNNING: + vd.storagedriver_client.restart_object(str(vd.volume_id), False) + # Now check if the migration succeeded and if the volume is running on the correct storagedriver. + if vd.storagedriver_id == destination_std.name: + self.logger.info('Vdisk({0}) got restarted and runs on destination storagedriver. Previous error can be ignored.'.format(vd.guid)) + else: + self.logger.warning('Vdisk({0}) got restarted but doesn\'t run on destination storagedriver.'.format(vd.guid)) + + else: + raise ValueError('Volume potential is lower than {0}. Not moving anymore!'.format(minimum_potential)) + + @staticmethod + def map_vdisk_to_destination(balances): + # type: (List[VDiskBalance]) -> Dict[str, VDiskBalance] + """ + Map all vdisks to destinations of balances + :param balances: Balances to map for + :return: guid - balance map + """ + vdisk_balance_map = {} + for balance in balances: # type: VDiskBalance + for vdisk_guid in balance.added: + if vdisk_guid in vdisk_balance_map: + raise RuntimeError('Vdisk {} has multiple destinations'.format(vdisk_guid)) + vdisk_balance_map[vdisk_guid] = balance + return vdisk_balance_map + + def __str__(self): + return 'StorageRouter {} of VPool {}: hosting prior to changes: {}, imposed limit {}, hosting after changes: {}'\ + .format(self.storagedriver.storagerouter.name, + self.storagedriver.vpool.name, + len(self.hosted_guids), + self.limit, + len(self.balance)) diff --git a/ovs/update/__init__.py b/ovs/update/__init__.py new file mode 100644 index 000000000..f3ecd2bfd --- /dev/null +++ b/ovs/update/__init__.py @@ -0,0 +1,15 @@ +# Copyright (C) 2019 iNuron NV +# +# This file is part of Open vStorage Open Source Edition (OSE), +# as available from +# +# http://www.openvstorage.org and +# http://www.openvstorage.com. 
+# +# This file is free software; you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3) +# as published by the Free Software Foundation, in version 3 as it comes +# in the LICENSE.txt file of the Open vStorage OSE distribution. +# +# Open vStorage is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY of any kind. diff --git a/ovs/update/volumedriver.py b/ovs/update/volumedriver.py new file mode 100644 index 000000000..bf6446f2e --- /dev/null +++ b/ovs/update/volumedriver.py @@ -0,0 +1,306 @@ +# Copyright (C) 2019 iNuron NV +# +# This file is part of Open vStorage Open Source Edition (OSE), +# as available from +# +# http://www.openvstorage.org and +# http://www.openvstorage.com. +# +# This file is free software; you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3) +# as published by the Free Software Foundation, in version 3 as it comes +# in the LICENSE.txt file of the Open vStorage OSE distribution. +# +# Open vStorage is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY of any kind. + +from ovs.dal.hybrids.vdisk import VDisk +from ovs.dal.hybrids.storagerouter import StorageRouter +from ovs.dal.hybrids.storagedriver import StorageDriver +from ovs.extensions.generic.system import System +from ovs_extensions.update.base import ComponentUpdater +from ovs_extensions.log.logger import Logger + +# noinspection PyUnreachableCode +if False: + from typing import List + + +class VolumeDriverUpdater(ComponentUpdater): + + """ + Responsible for updating the volumedriver of a single node + """ + + def restart_services(cls): + """ + Override the service restart. 
The volumedrivers should be prepared for shutdown + :return: + """ + + @classmethod + def can_migrate_storagerouter(cls, storagerouter): + """ + Determine if all volumedrivers on the storagerouter can be migrated away + :param storagerouter: + :return: + """ + + @classmethod + def can_migrate_away(cls, storagedriver): + """ + Determine if all volumes of the storagedriver can be migrated away + :param storagedriver: + :return: + """ + + @classmethod + def migrate_awy(cls, storagedriver): + """ + Migrate all volumes away + :param storagedriver: + :return: + """ + + +class VDiskBalance(object): + + logger = Logger('vdisk_balance') + + def __init__(self, storagedriver, vdisk_limit): + # type: (StorageDriver, int) -> None + """ + Represents the vdisk balance of a storagedriver + :param storagedriver: StorageDriver to balance for + :type storagedriver: StorageDriver + :param vdisk_limit: Maximum amount of vdisks to host. -1 means no limit + :type vdisk_limit: int + """ + self.storagedriver = storagedriver + self.hosted_guids = storagedriver.vdisks_guids + self.limit = vdisk_limit + + self.balance, self.overflow = self.impose_limit() + self.added = [] + + def __add__(self, other): + if not isinstance(other, VDiskBalance) or self.storagedriver != other.storagedriver: + raise ValueError('Different objects cannot be added') + limit = self.limit + other.limit + self.set_limit(limit) + self.added += other.added + + def set_limit(self, limit): + """ + Set a new limit + :param limit: Limit to set + :return: The guids of vdisks that can fit and the guids that cannot fit in on the current host + :rtype: Tuple(List[str], List[str]) + """ + self.limit = limit + self.balance, self.overflow = self.impose_limit() + return self.balance, self.overflow + + def impose_limit(self): + # type: () -> Tuple[List[str], List[str]] + """ + Impose the set limit. 
Returns the max amount of vdisks that can be hosted and the vdisks that need to go + :return: The guids of vdisks that can fit and the guids that cannot fit in on the current host + :rtype: Tuple(List[str], List[str]) + """ + if self.limit == -1: + return self.hosted_guids, [] + overflow = self.hosted_guids[self.limit:] + balance = self.hosted_guids[:self.limit] + return balance, overflow + + def fill(self, vdisk_guids): + # type: (List[str]) -> Tuple[List[str], List[str]] + """ + Fill this balance until the limit is reached + :param vdisk_guids: Guids to add + :type vdisk_guids: List[str] + :return: The guids that could be added to this balanced and the guids that couldn't be added + :rtype: Tuple[List[str], List[str]] + """ + amount_to_add = self.limit - len(self.balance) + added = [] + overflow = vdisk_guids + if amount_to_add: + added = vdisk_guids[:amount_to_add] + overflow = vdisk_guids[amount_to_add:] + self.balance.extend(added) + self.added.extend(added) + return added, overflow + + def generate_overview(self): + # type: () -> dict + """ + Generate the move overview depending on the current state + :return: The overview from where the disks are coming from + :rtype: dict + """ + added_source_overview = {} + for vdisk_guid in self.added: + storagedriver_id = VDisk(vdisk_guid).storagedriver_id + if storagedriver_id not in added_source_overview: + added_source_overview[storagedriver_id] = [] + added_source_overview[storagedriver_id].append(vdisk_guid) + overview = {'added': self.added, + 'balance': self.balance, + 'overflow': self.overflow, + 'add_source_overview': added_source_overview} + return overview + + def execute_balance_change(self, force=False, user_input=False, abort_on_error=False): + # type: (Optional[bool], Optional[bool], Optional[bool]) -> Tuple[List[str], List[str]] + """ + Execute the necessary steps to balance out + :param force: Indicates whether to force the migration or not (forcing can lead to data loss) + :type force: bool + :param 
user_input: require user input to proceed to next vDisk + :type user_input: bool + :param abort_on_error: Abort script when error occurs during migration + :type abort_on_error: bool + :return: List with all successful moves, list with all failed moves + :rtype: NoneType + """ + failed_moves = [] + successful_moves = [] + vdisk_guid = None + try: + for vdisk_guid in self.added: + try: + self._execute_move(vdisk_guid, self.storagedriver, force, user_input) + successful_moves.append(vdisk_guid) + except: + self.logger.exception('Unable to move VDisk {0} to {1}'.format(vdisk_guid, self.storagedriver.storagerouter_guid)) + if abort_on_error: + raise RuntimeError("Something went wrong during moving VDisk {0} to {1}".format(vdisk_guid, self.storagedriver.storagerouter_guid)) + failed_moves.append(vdisk_guid) + except KeyboardInterrupt: + interrupt_msg = 'You have interrupted while moving vdisks. The last move (vDisk {0}) might be in an inconsistent state.'.format(vdisk_guid) + self.logger.warning(interrupt_msg) + if user_input: + if successful_moves: + print('Succesfully moved vDisks: \n {0}'.format(', '.join(successful_moves))) + if failed_moves: + print('\nFailed to move vDisks:\n {0}'.format(', '.join(failed_moves))) + raise + + return successful_moves, failed_moves + + def execute_balance_change_through_overflow(self, balances, force=False, user_input=False, abort_on_error=False): + # type: (List[VDiskBalance], bool, bool, bool) -> Tuple[List[str], List[str]] + """ + Execute the necessary steps to balance out. Starts from the overflow to move all vdisks from the container away first + Other balances must be passed on to see where they'd have to move to + :param balances: Other balances to work with. 
Used to find the owner of this balance its overflow + :type balances: List[VDiskBalance] + :param force: Indicates whether to force the migration or not (forcing can lead to data loss) + :type force: bool + :param user_input: require user input to proceed to next vDisk + :type user_input: bool + :param abort_on_error: Abort script when error occurs during migration + :type abort_on_error: bool + :return: List with all successful moves, list with all failed moves + :rtype: NoneType + """ + failed_moves = [] + successful_moves = [] + vdisk_guid = None + try: + vdisk_balance_map = self.map_vdisk_to_destination(balances) + for vdisk_guid in self.overflow: + add_balance = vdisk_balance_map[vdisk_guid] + destination_std = add_balance.storagedriver + try: + self._execute_move(vdisk_guid, destination_std, force, user_input) + successful_moves.append(vdisk_guid) + except: + self.logger.exception('Unable to move VDisk {0} to {1}'.format(vdisk_guid, destination_std.storagerouter_guid)) + if abort_on_error: + raise RuntimeError("Something went wrong during moving VDisk {0} to {1}".format(vdisk_guid, self.storagedriver.storagerouter_guid)) + failed_moves.append(vdisk_guid) + except KeyboardInterrupt: + interrupt_msg = 'You have interrupted while moving vdisks. 
The last move (vDisk {0}) might be in an inconsistent state.'.format(vdisk_guid) + self.logger.warning(interrupt_msg) + if user_input: + if successful_moves: + print('Succesfully moved vDisks: \n {0}'.format(', '.join(successful_moves))) + if failed_moves: + print('\nFailed to move vDisks:\n {0}'.format(', '.join(failed_moves))) + raise + + return successful_moves, failed_moves + + def _execute_move(self, vdisk_guid, destination_std, force, interactive, minimum_potential=1): + """ + Perform a move + :param vdisk_guid: VDisk to move + :param destination_std: Destination to move to + :param force: Use force when moving + :param interactive: Prompt for user input before moving + :return: None + """ + vd = VDisk(vdisk_guid) + current_sr = StorageRouter(vd.storagerouter_guid).name + next_sr = destination_std.storagerouter.name + if vd.storagerouter_guid == destination_std.storagerouter_guid: + # Ownership changed in meantime + self.logger.info('No longer need to move VDisk {0} to {1}'.format(vdisk_guid, destination_std.storagerouter.name)) + return + rebalance_message = 'Rebalancing vPool by moving vDisk {0} from {1} to {2}'.format(vdisk_guid, current_sr, next_sr) + if interactive: + retry = True + while retry: + proceed = raw_input('{0}. Continue? (press Enter)'.format(rebalance_message)) + if proceed == '': # Mock 'Enter' key + retry = False + try: + volume_potential = destination_std.vpool.storagedriver_client.volume_potential(str(destination_std.storagedriver_id)) + except: + self.logger.exception('Unable to retrieve volume potential. Aborting') + raise + if volume_potential > minimum_potential: + self.logger.info(rebalance_message) + try: + vd.storagedriver_client.migrate(str(vd.volume_id), str(destination_std.name), False) + except RuntimeError: + # When a RunTimeError occurs. Try restarting the volume locally for safety measures. + self.logger.warning('Encountered RunTimeError. 
Checking if vdisk({0}) is not running and restarting it.'.format(vd.guid)) + vd.discard() + if vd.info['live_status'] != vd.STATUSES.RUNNING: + vd.storagedriver_client.restart_object(str(vd.volume_id), False) + # Now check if the migration succeeded and if the volume is running on the correct storagedriver. + if vd.storagedriver_id == destination_std.name: + self.logger.info('Vdisk({0}) got restarted and runs on destination storagedriver. Previous error can be ignored.'.format(vd.guid)) + else: + self.logger.warning('Vdisk({0}) got restarted but doesn\'t run on destination storagedriver.'.format(vd.guid)) + + else: + raise ValueError('Volume potential is lower than {0}. Not moving anymore!'.format(minimum_potential)) + + @staticmethod + def map_vdisk_to_destination(balances): + # type: (List[VDiskBalance]) -> Dict[str, VDiskBalance] + """ + Map all vdisks to destinations of balances + :param balances: Balances to map for + :return: guid - balance map + """ + vdisk_balance_map = {} + for balance in balances: # type: VDiskBalance + for vdisk_guid in balance.added: + if vdisk_guid in vdisk_balance_map: + raise RuntimeError('Vdisk {} has multiple destinations'.format(vdisk_guid)) + vdisk_balance_map[vdisk_guid] = balance + return vdisk_balance_map + + def __str__(self): + return 'StorageRouter {} of VPool {}: hosting prior to changes: {}, imposed limit {}, hosting after changes: {}'\ + .format(self.storagedriver.storagerouter.name, + self.storagedriver.vpool.name, + len(self.hosted_guids), + self.limit, + len(self.balance)) diff --git a/ovs/update/volumedriver/__init__.py b/ovs/update/volumedriver/__init__.py new file mode 100644 index 000000000..f3ecd2bfd --- /dev/null +++ b/ovs/update/volumedriver/__init__.py @@ -0,0 +1,15 @@ +# Copyright (C) 2019 iNuron NV +# +# This file is part of Open vStorage Open Source Edition (OSE), +# as available from +# +# http://www.openvstorage.org and +# http://www.openvstorage.com. 
+# +# This file is free software; you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3) +# as published by the Free Software Foundation, in version 3 as it comes +# in the LICENSE.txt file of the Open vStorage OSE distribution. +# +# Open vStorage is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY of any kind. diff --git a/ovs/update/volumedriver/updater.py b/ovs/update/volumedriver/updater.py new file mode 100644 index 000000000..39c9d6ac2 --- /dev/null +++ b/ovs/update/volumedriver/updater.py @@ -0,0 +1,96 @@ +# Copyright (C) 2019 iNuron NV +# +# This file is part of Open vStorage Open Source Edition (OSE), +# as available from +# +# http://www.openvstorage.org and +# http://www.openvstorage.com. +# +# This file is free software; you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3) +# as published by the Free Software Foundation, in version 3 as it comes +# in the LICENSE.txt file of the Open vStorage OSE distribution. +# +# Open vStorage is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY of any kind. + +from ovs.dal.hybrids.vpool import VPool +from ovs.dal.hybrids.vdisk import VDisk +from ovs.dal.hybrids.storagerouter import StorageRouter +from ovs.dal.hybrids.storagedriver import StorageDriver +from ovs.extensions.generic.system import System +from ovs_extensions.update.base import ComponentUpdater +from ovs.lib.helpers.vdisk.rebalancer import VDiskRebalancer, VDiskBalance +from ovs_extensions.log.logger import Logger + +# noinspection PyUnreachableCode +if False: + from typing import List, Dict + + +class VolumeDriverUpdater(ComponentUpdater): + + """ + Responsible for updating the volumedriver of a single node + """ + + LOCAL_SR = System.get_my_storagerouter() + + @classmethod + def restart_services(cls): + """ + Override the service restart. 
The volumedrivers should be prepared for shutdown + :return: + """ + # Get the migration plans for every volume on this host. If there are no plans for certain volumes, it will raise + balances_by_vpool = cls.get_vpool_balances_for_evacuating_storagerouter(cls.LOCAL_SR) + # Plan to execute migrate. Avoid the VPool from being an HA target + cls.make_sr_unreachable_for_ha(cls.LOCAL_SR) + + @staticmethod + def get_vpool_balances_for_evacuating_storagerouter(storagerouter): + # type: (StorageRouter) -> Dict[VPool, List[VDiskBalance]] + """ + Retrieve the balances for every vpool on the local machine + :param storagerouter: Storagerouter to migrate away from + :type storagerouter: StorageRouter + :return: Dict with vpool and balances + :rtype: Dict[VPool, VDiskBalance] + :raises RuntimeError if not all vdisks would be able to move out + """ + errors = [] + evacuate_srs = [storagerouter.guid] + balances_by_vpool = {} + for storagedriver in storagerouter.storagedrivers: + vpool = storagedriver.vpool + try: + balances = VDiskRebalancer.get_rebalanced_layout(storagedriver.vpool_guid, + ignore_domains=False, + excluded_storagerouters=None, + evacuate_storagerouters=evacuate_srs, + base_on_volume_potential=True) + balances_sorted = sorted(balances, key=lambda b: b.storagedriver.storagerouter_guid in evacuate_srs, + reverse=True) + balances_by_vpool[vpool] = balances_sorted + except Exception as ex: + errors.append((vpool, ex)) + if errors: + formatted_errors = '\n - {0}'.format('\n - '.join('VPool {0}: {1}'.format(vpool.name, error) for vpool, error in errors)) + raise RuntimeError('Unable to migrate all volumes away from this machine: {}'.format(formatted_errors)) + return balances_by_vpool + + @classmethod + def make_sr_unreachable_for_ha(cls, storagerouter): + """ + Update the node distance maps to + :return: None + :rtype: NoneType + """ + + @classmethod + def migrate_awy(cls, storagedriver): + """ + Migrate all volumes away + :param storagedriver: + :return: + """ From 
e5e6ce8cce6ed90c49b89857c527f74bffccda47 Mon Sep 17 00:00:00 2001 From: Jeffrey Devloo Date: Thu, 20 Jun 2019 17:58:37 +0200 Subject: [PATCH 2/7] Remove old volumedriver update file --- ovs/update/volumedriver.py | 306 ------------------------------------- 1 file changed, 306 deletions(-) delete mode 100644 ovs/update/volumedriver.py diff --git a/ovs/update/volumedriver.py b/ovs/update/volumedriver.py deleted file mode 100644 index bf6446f2e..000000000 --- a/ovs/update/volumedriver.py +++ /dev/null @@ -1,306 +0,0 @@ -# Copyright (C) 2019 iNuron NV -# -# This file is part of Open vStorage Open Source Edition (OSE), -# as available from -# -# http://www.openvstorage.org and -# http://www.openvstorage.com. -# -# This file is free software; you can redistribute it and/or modify it -# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3) -# as published by the Free Software Foundation, in version 3 as it comes -# in the LICENSE.txt file of the Open vStorage OSE distribution. -# -# Open vStorage is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY of any kind. - -from ovs.dal.hybrids.vdisk import VDisk -from ovs.dal.hybrids.storagerouter import StorageRouter -from ovs.dal.hybrids.storagedriver import StorageDriver -from ovs.extensions.generic.system import System -from ovs_extensions.update.base import ComponentUpdater -from ovs_extensions.log.logger import Logger - -# noinspection PyUnreachableCode -if False: - from typing import List - - -class VolumeDriverUpdater(ComponentUpdater): - - """ - Responsible for updating the volumedriver of a single node - """ - - def restart_services(cls): - """ - Override the service restart. 
The volumedrivers should be prepared for shutdown - :return: - """ - - @classmethod - def can_migrate_storagerouter(cls, storagerouter): - """ - Determine if all volumedrivers on the storagerouter can be migrated away - :param storagerouter: - :return: - """ - - @classmethod - def can_migrate_away(cls, storagedriver): - """ - Determine if all volumes of the storagedriver can be migrated away - :param storagedriver: - :return: - """ - - @classmethod - def migrate_awy(cls, storagedriver): - """ - Migrate all volumes away - :param storagedriver: - :return: - """ - - -class VDiskBalance(object): - - logger = Logger('vdisk_balance') - - def __init__(self, storagedriver, vdisk_limit): - # type: (StorageDriver, int) -> None - """ - Represents the vdisk balance of a storagedriver - :param storagedriver: StorageDriver to balance for - :type storagedriver: StorageDriver - :param vdisk_limit: Maximum amount of vdisks to host. -1 means no limit - :type vdisk_limit: int - """ - self.storagedriver = storagedriver - self.hosted_guids = storagedriver.vdisks_guids - self.limit = vdisk_limit - - self.balance, self.overflow = self.impose_limit() - self.added = [] - - def __add__(self, other): - if not isinstance(other, VDiskBalance) or self.storagedriver != other.storagedriver: - raise ValueError('Different objects cannot be added') - limit = self.limit + other.limit - self.set_limit(limit) - self.added += other.added - - def set_limit(self, limit): - """ - Set a new limit - :param limit: Limit to set - :return: The guids of vdisks that can fit and the guids that cannot fit in on the current host - :rtype: Tuple(List[str], List[str]) - """ - self.limit = limit - self.balance, self.overflow = self.impose_limit() - return self.balance, self.overflow - - def impose_limit(self): - # type: () -> Tuple[List[str], List[str]] - """ - Impose the set limit. 
Returns the max amount of vdisks that can be hosted and the vdisks that need to go - :return: The guids of vdisks that can fit and the guids that cannot fit in on the current host - :rtype: Tuple(List[str], List[str]) - """ - if self.limit == -1: - return self.hosted_guids, [] - overflow = self.hosted_guids[self.limit:] - balance = self.hosted_guids[:self.limit] - return balance, overflow - - def fill(self, vdisk_guids): - # type: (List[str]) -> Tuple[List[str], List[str]] - """ - Fill this balance until the limit is reached - :param vdisk_guids: Guids to add - :type vdisk_guids: List[str] - :return: The guids that could be added to this balanced and the guids that couldn't be added - :rtype: Tuple[List[str], List[str]] - """ - amount_to_add = self.limit - len(self.balance) - added = [] - overflow = vdisk_guids - if amount_to_add: - added = vdisk_guids[:amount_to_add] - overflow = vdisk_guids[amount_to_add:] - self.balance.extend(added) - self.added.extend(added) - return added, overflow - - def generate_overview(self): - # type: () -> dict - """ - Generate the move overview depending on the current state - :return: The overview from where the disks are coming from - :rtype: dict - """ - added_source_overview = {} - for vdisk_guid in self.added: - storagedriver_id = VDisk(vdisk_guid).storagedriver_id - if storagedriver_id not in added_source_overview: - added_source_overview[storagedriver_id] = [] - added_source_overview[storagedriver_id].append(vdisk_guid) - overview = {'added': self.added, - 'balance': self.balance, - 'overflow': self.overflow, - 'add_source_overview': added_source_overview} - return overview - - def execute_balance_change(self, force=False, user_input=False, abort_on_error=False): - # type: (Optional[bool], Optional[bool], Optional[bool]) -> Tuple[List[str], List[str]] - """ - Execute the necessary steps to balance out - :param force: Indicates whether to force the migration or not (forcing can lead to data loss) - :type force: bool - :param 
user_input: require user input to proceed to next vDisk - :type user_input: bool - :param abort_on_error: Abort script when error occurs during migration - :type abort_on_error: bool - :return: List with all successful moves, list with all failed moves - :rtype: NoneType - """ - failed_moves = [] - successful_moves = [] - vdisk_guid = None - try: - for vdisk_guid in self.added: - try: - self._execute_move(vdisk_guid, self.storagedriver, force, user_input) - successful_moves.append(vdisk_guid) - except: - self.logger.exception('Unable to move VDisk {0} to {1}'.format(vdisk_guid, self.storagedriver.storagerouter_guid)) - if abort_on_error: - raise RuntimeError("Something went wrong during moving VDisk {0} to {1}".format(vdisk_guid, self.storagedriver.storagerouter_guid)) - failed_moves.append(vdisk_guid) - except KeyboardInterrupt: - interrupt_msg = 'You have interrupted while moving vdisks. The last move (vDisk {0}) might be in an inconsistent state.'.format(vdisk_guid) - self.logger.warning(interrupt_msg) - if user_input: - if successful_moves: - print('Succesfully moved vDisks: \n {0}'.format(', '.join(successful_moves))) - if failed_moves: - print('\nFailed to move vDisks:\n {0}'.format(', '.join(failed_moves))) - raise - - return successful_moves, failed_moves - - def execute_balance_change_through_overflow(self, balances, force=False, user_input=False, abort_on_error=False): - # type: (List[VDiskBalance], bool, bool, bool) -> Tuple[List[str], List[str]] - """ - Execute the necessary steps to balance out. Starts from the overflow to move all vdisks from the container away first - Other balances must be passed on to see where they'd have to move to - :param balances: Other balances to work with. 
Used to find the owner of this balance its overflow - :type balances: List[VDiskBalance] - :param force: Indicates whether to force the migration or not (forcing can lead to data loss) - :type force: bool - :param user_input: require user input to proceed to next vDisk - :type user_input: bool - :param abort_on_error: Abort script when error occurs during migration - :type abort_on_error: bool - :return: List with all successful moves, list with all failed moves - :rtype: NoneType - """ - failed_moves = [] - successful_moves = [] - vdisk_guid = None - try: - vdisk_balance_map = self.map_vdisk_to_destination(balances) - for vdisk_guid in self.overflow: - add_balance = vdisk_balance_map[vdisk_guid] - destination_std = add_balance.storagedriver - try: - self._execute_move(vdisk_guid, destination_std, force, user_input) - successful_moves.append(vdisk_guid) - except: - self.logger.exception('Unable to move VDisk {0} to {1}'.format(vdisk_guid, destination_std.storagerouter_guid)) - if abort_on_error: - raise RuntimeError("Something went wrong during moving VDisk {0} to {1}".format(vdisk_guid, self.storagedriver.storagerouter_guid)) - failed_moves.append(vdisk_guid) - except KeyboardInterrupt: - interrupt_msg = 'You have interrupted while moving vdisks. 
The last move (vDisk {0}) might be in an inconsistent state.'.format(vdisk_guid) - self.logger.warning(interrupt_msg) - if user_input: - if successful_moves: - print('Succesfully moved vDisks: \n {0}'.format(', '.join(successful_moves))) - if failed_moves: - print('\nFailed to move vDisks:\n {0}'.format(', '.join(failed_moves))) - raise - - return successful_moves, failed_moves - - def _execute_move(self, vdisk_guid, destination_std, force, interactive, minimum_potential=1): - """ - Perform a move - :param vdisk_guid: VDisk to move - :param destination_std: Destination to move to - :param force: Use force when moving - :param interactive: Prompt for user input before moving - :return: None - """ - vd = VDisk(vdisk_guid) - current_sr = StorageRouter(vd.storagerouter_guid).name - next_sr = destination_std.storagerouter.name - if vd.storagerouter_guid == destination_std.storagerouter_guid: - # Ownership changed in meantime - self.logger.info('No longer need to move VDisk {0} to {1}'.format(vdisk_guid, destination_std.storagerouter.name)) - return - rebalance_message = 'Rebalancing vPool by moving vDisk {0} from {1} to {2}'.format(vdisk_guid, current_sr, next_sr) - if interactive: - retry = True - while retry: - proceed = raw_input('{0}. Continue? (press Enter)'.format(rebalance_message)) - if proceed == '': # Mock 'Enter' key - retry = False - try: - volume_potential = destination_std.vpool.storagedriver_client.volume_potential(str(destination_std.storagedriver_id)) - except: - self.logger.exception('Unable to retrieve volume potential. Aborting') - raise - if volume_potential > minimum_potential: - self.logger.info(rebalance_message) - try: - vd.storagedriver_client.migrate(str(vd.volume_id), str(destination_std.name), False) - except RuntimeError: - # When a RunTimeError occurs. Try restarting the volume locally for safety measures. - self.logger.warning('Encountered RunTimeError. 
Checking if vdisk({0}) is not running and restarting it.'.format(vd.guid)) - vd.discard() - if vd.info['live_status'] != vd.STATUSES.RUNNING: - vd.storagedriver_client.restart_object(str(vd.volume_id), False) - # Now check if the migration succeeded and if the volume is running on the correct storagedriver. - if vd.storagedriver_id == destination_std.name: - self.logger.info('Vdisk({0}) got restarted and runs on destination storagedriver. Previous error can be ignored.'.format(vd.guid)) - else: - self.logger.warning('Vdisk({0}) got restarted but doesn\'t run on destination storagedriver.'.format(vd.guid)) - - else: - raise ValueError('Volume potential is lower than {0}. Not moving anymore!'.format(minimum_potential)) - - @staticmethod - def map_vdisk_to_destination(balances): - # type: (List[VDiskBalance]) -> Dict[str, VDiskBalance] - """ - Map all vdisks to destinations of balances - :param balances: Balances to map for - :return: guid - balance map - """ - vdisk_balance_map = {} - for balance in balances: # type: VDiskBalance - for vdisk_guid in balance.added: - if vdisk_guid in vdisk_balance_map: - raise RuntimeError('Vdisk {} has multiple destinations'.format(vdisk_guid)) - vdisk_balance_map[vdisk_guid] = balance - return vdisk_balance_map - - def __str__(self): - return 'StorageRouter {} of VPool {}: hosting prior to changes: {}, imposed limit {}, hosting after changes: {}'\ - .format(self.storagedriver.storagerouter.name, - self.storagedriver.vpool.name, - len(self.hosted_guids), - self.limit, - len(self.balance)) From 22cd407a34d26538463d5126fe28532045afc226 Mon Sep 17 00:00:00 2001 From: Jeffrey Devloo Date: Fri, 21 Jun 2019 17:05:51 +0200 Subject: [PATCH 3/7] !WIP Move MDS masters away through celery --- ovs/lib/helpers/storagerouter/__init__.py | 16 ---- ovs/lib/helpers/storagerouter/evacuate.py | 30 ------- ovs/update/volumedriver/updater.py | 105 ++++++++++++++++++++-- 3 files changed, 97 insertions(+), 54 deletions(-) delete mode 100644 
ovs/lib/helpers/storagerouter/__init__.py delete mode 100644 ovs/lib/helpers/storagerouter/evacuate.py diff --git a/ovs/lib/helpers/storagerouter/__init__.py b/ovs/lib/helpers/storagerouter/__init__.py deleted file mode 100644 index 301eed1cb..000000000 --- a/ovs/lib/helpers/storagerouter/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Copyright (C) 2019 iNuron NV -# -# This file is part of Open vStorage Open Source Edition (OSE), -# as available from -# -# http://www.openvstorage.org and -# http://www.openvstorage.com. -# -# This file is free software; you can redistribute it and/or modify it -# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3) -# as published by the Free Software Foundation, in version 3 as it comes -# in the LICENSE.txt file of the Open vStorage OSE distribution. -# -# Open vStorage is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY of any kind. - diff --git a/ovs/lib/helpers/storagerouter/evacuate.py b/ovs/lib/helpers/storagerouter/evacuate.py deleted file mode 100644 index e442c85a1..000000000 --- a/ovs/lib/helpers/storagerouter/evacuate.py +++ /dev/null @@ -1,30 +0,0 @@ -# Copyright (C) 2019 iNuron NV -# -# This file is part of Open vStorage Open Source Edition (OSE), -# as available from -# -# http://www.openvstorage.org and -# http://www.openvstorage.com. -# -# This file is free software; you can redistribute it and/or modify it -# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3) -# as published by the Free Software Foundation, in version 3 as it comes -# in the LICENSE.txt file of the Open vStorage OSE distribution. -# -# Open vStorage is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY of any kind. 
- -""" -Evacuate Storagerouter module -""" - - -class StorageRouterEvacuator(object): - - """ - Evacuates all volumes on the given storagerouter - - Check for volume potential to see if evacuation is possible - - Moves volumes away as evenly as possible - - Can wait for MDS masters to have moved - """ - diff --git a/ovs/update/volumedriver/updater.py b/ovs/update/volumedriver/updater.py index 39c9d6ac2..8b4133036 100644 --- a/ovs/update/volumedriver/updater.py +++ b/ovs/update/volumedriver/updater.py @@ -13,7 +13,7 @@ # # Open vStorage is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY of any kind. - +from celery import chain, group from ovs.dal.hybrids.vpool import VPool from ovs.dal.hybrids.vdisk import VDisk from ovs.dal.hybrids.storagerouter import StorageRouter @@ -22,7 +22,8 @@ from ovs_extensions.update.base import ComponentUpdater from ovs.lib.helpers.vdisk.rebalancer import VDiskRebalancer, VDiskBalance from ovs_extensions.log.logger import Logger - +from ovs.extensions.storage.persistentfactory import PersistentFactory +from ovs.lib.mdsservice import MDSServiceController # noinspection PyUnreachableCode if False: from typing import List, Dict @@ -35,6 +36,7 @@ class VolumeDriverUpdater(ComponentUpdater): """ LOCAL_SR = System.get_my_storagerouter() + EDGE_SYNC_TIME = 5 * 60 @classmethod def restart_services(cls): @@ -45,7 +47,14 @@ def restart_services(cls): # Get the migration plans for every volume on this host. If there are no plans for certain volumes, it will raise balances_by_vpool = cls.get_vpool_balances_for_evacuating_storagerouter(cls.LOCAL_SR) # Plan to execute migrate. Avoid the VPool from being an HA target - cls.make_sr_unreachable_for_ha(cls.LOCAL_SR) + cls.mark_storagerouter_unreachable_for_ha(cls.LOCAL_SR) + try: + # @todo Go concurrently? 
+ for vpool, balances in balances_by_vpool.iteritems(): + cls.migrate_away(balances, cls.LOCAL_SR) + cls.migrate_master_mds(cls.LOCAL_SR) + finally: + cls.mark_storagerouter_reachable_for_ha(cls.LOCAL_SR) @staticmethod def get_vpool_balances_for_evacuating_storagerouter(storagerouter): @@ -80,17 +89,97 @@ def get_vpool_balances_for_evacuating_storagerouter(storagerouter): return balances_by_vpool @classmethod - def make_sr_unreachable_for_ha(cls, storagerouter): + def mark_storagerouter_unreachable_for_ha(cls, storagerouter): """ Update the node distance maps to + Current code paths that update the node distance map on the volumedriver side are: + - Update of domains + - Update of vpool layout (extend/shrink) + - cluster registry checkup (ran periodically) :return: None :rtype: NoneType """ + # Mark the storagerouter as excluded for any checkups + # @todo implement + pass + # Checkup to adjust the node distance map + # todo + pass + # Wait for a period of time to let all clients sync up + # @todo - @classmethod - def migrate_awy(cls, storagedriver): + @staticmethod + def mark_storagerouter_reachable_for_ha(storagerouter): + # type: (StorageRouter) -> None + """ + Update the node distance map to add the storagerouter back into the HA pool + :param storagerouter: Storagerouter to put back into the distance map + :type storagerouter: StorageRouter + :return: None + """ + + @staticmethod + def migrate_away(balances, storagerouter): + # type: (List[VDiskBalance], StorageRouter) -> None """ Migrate all volumes away - :param storagedriver: - :return: + :param balances: List of vdisk balances to execute + :type balances: List[VDiskBalance] + :param storagerouter: Storagerouter to move away from + :type storagerouter: StorageRouter + :return: None """ + evacuate_srs = [storagerouter.guid] + for balance in balances: # type: VDiskBalance + if balance.storagedriver.storagerouter_guid in evacuate_srs: + successfull_moves, failed_moves = 
balance.execute_balance_change_through_overflow(balances, + user_input=False, + abort_on_error=False) + + @classmethod + def migrate_master_mds(cls, storagerouter): + """ + Migrate away all master mds from the given storagerouter + :param storagerouter: Storagerouter to migrate away from + :type storagerouter: StorageRouter + :return: None + :rtype: NoneType + """ + all_masters_gone = False + while not all_masters_gone: + vpool_mds_master_vdisks = cls.get_vdisks_mds_masters_on_storagerouter(storagerouter) + all_masters_gone = sum(len(vds) for vds in vpool_mds_master_vdisks.values()) == 0 + chains = [] + for vpool, vdisks in vpool_mds_master_vdisks.iteritems(): + chains.append(chain(MDSServiceController.ensure_safety.si(vdisk.guid) for vdisk in vdisks)) + # Add all chain signatures to a group for parallel execution + task_group = group(c.s() for c in chains) + # Wait for the group result + result = task_group().get() + print result + + @staticmethod + def get_vdisks_mds_masters_on_storagerouter(storagerouter): + # type: (StorageRouter) -> Dict[VPool, List[VDisk]] + """ + Retrieve all vdisks with the MDS master on the given storagerouter + :param storagerouter: Storagerouter to list MDS masters on + :type storagerouter: StorageRouter + :return: Dict with VPool as key and vdisks with the MDS master on the storagerouter as value + :rtype: Dict[VPool, List[VDisk] + """ + mds_masters = {} + vpools = set(sd.vpool for sd in storagerouter.storagedrivers) + for vpool in sorted(vpools, key=lambda k: k.name): + masters = [] + for mds_service in sorted(vpool.mds_services, key=lambda k: k.number): + if mds_service.service.storagerouter_guid == storagerouter.guid: + for junction in mds_service.vdisks: + if junction.is_master: + masters.append(junction.vdisk_guid) + mds_masters[vpool.name] = masters + return mds_masters + + @staticmethod + def get_persistent_client(): + return PersistentFactory.get_client() From 678a61645988078c358d4183b440c9a0b2c497a3 Mon Sep 17 00:00:00 2001 
From: Jeffrey Devloo Date: Mon, 24 Jun 2019 16:48:21 +0200 Subject: [PATCH 4/7] !WIP Add storagerouter exclusion into the node distance map --- ovs/constants/vpool.py | 22 ++++++++++++++++++ ovs/dal/hybrids/storagedriver.py | 9 +++++++- ovs/update/volumedriver/updater.py | 37 +++++++++++++++++++----------- 3 files changed, 54 insertions(+), 14 deletions(-) create mode 100644 ovs/constants/vpool.py diff --git a/ovs/constants/vpool.py b/ovs/constants/vpool.py new file mode 100644 index 000000000..9bab90459 --- /dev/null +++ b/ovs/constants/vpool.py @@ -0,0 +1,22 @@ +# Copyright (C) 2019 iNuron NV +# +# This file is part of Open vStorage Open Source Edition (OSE), +# as available from +# +# http://www.openvstorage.org and +# http://www.openvstorage.com. +# +# This file is free software; you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3) +# as published by the Free Software Foundation, in version 3 as it comes +# in the LICENSE.txt file of the Open vStorage OSE distribution. +# +# Open vStorage is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY of any kind. 
+ +import os + +VPOOL_UPDATE_KEY = os.path.join(os.path.sep, 'ovs', 'volumedriver', 'update', 'storagerouter') + +PACKAGES_OSE = ['volumedriver-no-dedup-base', 'volumedriver-no-dedup-server'] +PACKAGES_EE = ['volumedriver-ee-base', 'volumedriver-ee-server'] diff --git a/ovs/dal/hybrids/storagedriver.py b/ovs/dal/hybrids/storagedriver.py index 9f623eca0..bb9373970 100644 --- a/ovs/dal/hybrids/storagedriver.py +++ b/ovs/dal/hybrids/storagedriver.py @@ -19,11 +19,13 @@ """ import time +from ovs.constants.vpool import VPOOL_UPDATE_KEY from ovs.dal.dataobject import DataObject from ovs.dal.structures import Property, Relation, Dynamic from ovs.dal.hybrids.vdisk import VDisk from ovs.dal.hybrids.vpool import VPool from ovs.dal.hybrids.storagerouter import StorageRouter +from ovs.extensions.generic.configuration import Configuration from ovs.extensions.storageserver.storagedriver import StorageDriverClient from ovs.log.log_handler import LogHandler @@ -187,10 +189,15 @@ def _cluster_node_config(self): primary_domains.append(junction.domain_guid) else: secondary_domains.append(junction.domain_guid) + # @todo implement more race-conditions guarantees. 
Current guarantee is the single update invalidating the value + # through cluster_registry_checkup + storagerouters_marked_for_update = Configuration.list(VPOOL_UPDATE_KEY) for sd in self.vpool.storagedrivers: if sd.guid == self.guid: continue - if len(primary_domains) == 0: + if sd.storagerouter_guid in storagerouters_marked_for_update: + distance_map[str(sd.storagedriver_id)] = StorageDriver.DISTANCES.FAR + elif len(primary_domains) == 0: distance_map[str(sd.storagedriver_id)] = StorageDriver.DISTANCES.NEAR else: distance = StorageDriver.DISTANCES.INFINITE diff --git a/ovs/update/volumedriver/updater.py b/ovs/update/volumedriver/updater.py index 8b4133036..08ec5440c 100644 --- a/ovs/update/volumedriver/updater.py +++ b/ovs/update/volumedriver/updater.py @@ -13,15 +13,19 @@ # # Open vStorage is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY of any kind. + +import os +import time from celery import chain, group +from ovs.constants.vpool import VPOOL_UPDATE_KEY +from ovs.extensions.generic.configuration import Configuration from ovs.dal.hybrids.vpool import VPool from ovs.dal.hybrids.vdisk import VDisk from ovs.dal.hybrids.storagerouter import StorageRouter -from ovs.dal.hybrids.storagedriver import StorageDriver from ovs.extensions.generic.system import System from ovs_extensions.update.base import ComponentUpdater from ovs.lib.helpers.vdisk.rebalancer import VDiskRebalancer, VDiskBalance -from ovs_extensions.log.logger import Logger +from ovs.lib.storagedriver import StorageDriverController from ovs.extensions.storage.persistentfactory import PersistentFactory from ovs.lib.mdsservice import MDSServiceController # noinspection PyUnreachableCode @@ -35,6 +39,8 @@ class VolumeDriverUpdater(ComponentUpdater): Responsible for updating the volumedriver of a single node """ + COMPONENT = None + BINARIES = [] # List with tuples. 
[(package_name, binary_name, binary_location, [service_prefix_0]] LOCAL_SR = System.get_my_storagerouter() EDGE_SYNC_TIME = 5 * 60 @@ -49,7 +55,7 @@ def restart_services(cls): # Plan to execute migrate. Avoid the VPool from being an HA target cls.mark_storagerouter_unreachable_for_ha(cls.LOCAL_SR) try: - # @todo Go concurrently? + # @todo currency? for vpool, balances in balances_by_vpool.iteritems(): cls.migrate_away(balances, cls.LOCAL_SR) cls.migrate_master_mds(cls.LOCAL_SR) @@ -99,17 +105,16 @@ def mark_storagerouter_unreachable_for_ha(cls, storagerouter): :return: None :rtype: NoneType """ - # Mark the storagerouter as excluded for any checkups - # @todo implement - pass - # Checkup to adjust the node distance map - # todo - pass - # Wait for a period of time to let all clients sync up - # @todo + # Set the value used in the storagedriver cluster node config path + # This holds for all mentioned paths in the docstrings + Configuration.set(os.path.join(VPOOL_UPDATE_KEY, storagerouter.guid), 0) + # Trigger a complete reload of node distance maps + StorageDriverController.cluster_registry_checkup() + # Wait a few moment for the edge to catch up all the configs + time.sleep(2 * cls.EDGE_SYNC_TIME) - @staticmethod - def mark_storagerouter_reachable_for_ha(storagerouter): + @classmethod + def mark_storagerouter_reachable_for_ha(cls, storagerouter): # type: (StorageRouter) -> None """ Update the node distance map to add the storagerouter back into the HA pool @@ -117,6 +122,11 @@ def mark_storagerouter_reachable_for_ha(storagerouter): :type storagerouter: StorageRouter :return: None """ + Configuration.delete(os.path.join(VPOOL_UPDATE_KEY, storagerouter.guid)) + # Trigger a complete reload of node distance maps + StorageDriverController.cluster_registry_checkup() + # Wait a few moment for the edge to catch up all the configs + time.sleep(2 * cls.EDGE_SYNC_TIME) @staticmethod def migrate_away(balances, storagerouter): @@ -132,6 +142,7 @@ def migrate_away(balances, 
storagerouter): evacuate_srs = [storagerouter.guid] for balance in balances: # type: VDiskBalance if balance.storagedriver.storagerouter_guid in evacuate_srs: + # @todo abort on failed moves? The user should know but when...? successfull_moves, failed_moves = balance.execute_balance_change_through_overflow(balances, user_input=False, abort_on_error=False) From 3192e055065e10f48e1fa4b18322b28d1833ca32 Mon Sep 17 00:00:00 2001 From: Jeffrey Devloo Date: Fri, 28 Jun 2019 09:51:52 +0200 Subject: [PATCH 5/7] Limit celery chain size to avoid recursionexception Improve logging Add custom exceptions Add documentation for volumedriver update --- ovs/constants/vpool.py | 5 ++ ovs/update/__init__.py | 2 + ovs/update/single_update.md | 21 +++++++ ovs/update/volumedriver/updater.py | 95 ++++++++++++++++++++++++------ 4 files changed, 104 insertions(+), 19 deletions(-) create mode 100644 ovs/update/single_update.md diff --git a/ovs/constants/vpool.py b/ovs/constants/vpool.py index 9bab90459..4459dd014 100644 --- a/ovs/constants/vpool.py +++ b/ovs/constants/vpool.py @@ -18,5 +18,10 @@ VPOOL_UPDATE_KEY = os.path.join(os.path.sep, 'ovs', 'volumedriver', 'update', 'storagerouter') +STORAGEDRIVER_SERVICE_BASE = 'ovs-volumedriver' + PACKAGES_OSE = ['volumedriver-no-dedup-base', 'volumedriver-no-dedup-server'] PACKAGES_EE = ['volumedriver-ee-base', 'volumedriver-ee-server'] + +VOLUMEDRIVER_BIN_PATH = os.path.join(os.path.sep, 'usr', 'bin', 'volumedriver_fs.sh') +VOLUMEDRIVER_CMD_NAME = 'volumedriver_fs' diff --git a/ovs/update/__init__.py b/ovs/update/__init__.py index f3ecd2bfd..cc1f36adb 100644 --- a/ovs/update/__init__.py +++ b/ovs/update/__init__.py @@ -13,3 +13,5 @@ # # Open vStorage is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY of any kind. 
+ +from .volumedriver.updater import VolumeDriverUpdater diff --git a/ovs/update/single_update.md b/ovs/update/single_update.md new file mode 100644 index 000000000..9f040f435 --- /dev/null +++ b/ovs/update/single_update.md @@ -0,0 +1,21 @@ +# Single update +The Framework now supports updating certain components on a single host at a time. +It provides simple CLI commands (`ovs local_update <component>`) to update Arakoon, Alba and the volumedriver + +## Volumedriver update +The Volumedriver update can be started by running `ovs local_update volumedriver`. +The local update will: +- Update the volumedriver binaries +- Update the node distance map to avoid HA-ing to the node being upgraded +- Move away all volumes from the node to other nodes +- Migrate away all MDS master instances running on the node +- Restart the volumedriver services +- Update the node distance map to accept HA back onto the node + +### Exceptions +Certain exceptions can be thrown during the update. + +| Exit code | Exception name | What happened | +| --------- | ---------------| ------------- | +| 21 | FailedToMigrateException | Not all volumes can be migrated away to different nodes.
No moves have started yet | +| 22 | FailureDuringMigrateException | Some volumes could not be moved away during the migration process | diff --git a/ovs/update/volumedriver/updater.py b/ovs/update/volumedriver/updater.py index 08ec5440c..cc93d0f51 100644 --- a/ovs/update/volumedriver/updater.py +++ b/ovs/update/volumedriver/updater.py @@ -16,8 +16,9 @@ import os import time +import itertools from celery import chain, group -from ovs.constants.vpool import VPOOL_UPDATE_KEY +from ovs.constants.vpool import VPOOL_UPDATE_KEY, STORAGEDRIVER_SERVICE_BASE, VOLUMEDRIVER_BIN_PATH, VOLUMEDRIVER_CMD_NAME, PACKAGES_EE from ovs.extensions.generic.configuration import Configuration from ovs.dal.hybrids.vpool import VPool from ovs.dal.hybrids.vdisk import VDisk @@ -28,9 +29,24 @@ from ovs.lib.storagedriver import StorageDriverController from ovs.extensions.storage.persistentfactory import PersistentFactory from ovs.lib.mdsservice import MDSServiceController +from ovs.log.log_handler import LogHandler # noinspection PyUnreachableCode if False: - from typing import List, Dict + from typing import List, Dict, Tuple + + +class FailedToMigrateException(EnvironmentError): + """ + Thrown when not all volumes would be able to move away + """ + exit_code = 21 + + +class FailureDuringMigrateException(EnvironmentError): + """ + Thrown when certain volumes failed to move away + """ + exit_code = 22 class VolumeDriverUpdater(ComponentUpdater): @@ -39,17 +55,22 @@ class VolumeDriverUpdater(ComponentUpdater): Responsible for updating the volumedriver of a single node """ - COMPONENT = None - BINARIES = [] # List with tuples. [(package_name, binary_name, binary_location, [service_prefix_0]] + logger = LogHandler.get('update', 'volumedriver') + + COMPONENT = 'volumedriver' + # List with tuples. 
[(package_name, binary_name, binary_location, [service_prefix_0]] + BINARIES = [(PACKAGES_EE, VOLUMEDRIVER_CMD_NAME, VOLUMEDRIVER_BIN_PATH, [STORAGEDRIVER_SERVICE_BASE])] # type: List[Tuple[List[str], str, str, List[str]]] LOCAL_SR = System.get_my_storagerouter() - EDGE_SYNC_TIME = 5 * 60 + # @todo revert + # EDGE_SYNC_TIME = 5 * 60 + EDGE_SYNC_TIME = 1 @classmethod def restart_services(cls): """ Override the service restart. The volumedrivers should be prepared for shutdown - :return: """ + cls.logger.info("Restarting all related services") # Get the migration plans for every volume on this host. If there are no plans for certain volumes, it will raise balances_by_vpool = cls.get_vpool_balances_for_evacuating_storagerouter(cls.LOCAL_SR) # Plan to execute migrate. Avoid the VPool from being an HA target @@ -59,6 +80,8 @@ def restart_services(cls): for vpool, balances in balances_by_vpool.iteritems(): cls.migrate_away(balances, cls.LOCAL_SR) cls.migrate_master_mds(cls.LOCAL_SR) + all_prefixes = tuple(itertools.chain.from_iterable(b[3] for b in cls.BINARIES)) + return cls.restart_services_by_prefixes(all_prefixes) finally: cls.mark_storagerouter_reachable_for_ha(cls.LOCAL_SR) @@ -71,7 +94,7 @@ def get_vpool_balances_for_evacuating_storagerouter(storagerouter): :type storagerouter: StorageRouter :return: Dict with vpool and balances :rtype: Dict[VPool, VDiskBalance] - :raises RuntimeError if not all vdisks would be able to move out + :raises FailedToMigrateException if not all vdisks would be able to move out """ errors = [] evacuate_srs = [storagerouter.guid] @@ -91,7 +114,7 @@ def get_vpool_balances_for_evacuating_storagerouter(storagerouter): errors.append((vpool, ex)) if errors: formatted_errors = '\n - {0}'.format('\n - '.join('VPool {0}: {1}'.format(vpool.name, error) for vpool, error in errors)) - raise RuntimeError('Unable to migrate all volumes away from this machine: {}'.format(formatted_errors)) + raise FailedToMigrateException('Unable to migrate all 
volumes away from this machine: {}'.format(formatted_errors)) return balances_by_vpool @classmethod @@ -105,13 +128,16 @@ def mark_storagerouter_unreachable_for_ha(cls, storagerouter): :return: None :rtype: NoneType """ + cls.logger.info("Marking Storagerouter {} as unavailable for HA".format(storagerouter.name)) # Set the value used in the storagedriver cluster node config path # This holds for all mentioned paths in the docstrings Configuration.set(os.path.join(VPOOL_UPDATE_KEY, storagerouter.guid), 0) # Trigger a complete reload of node distance maps StorageDriverController.cluster_registry_checkup() # Wait a few moment for the edge to catch up all the configs - time.sleep(2 * cls.EDGE_SYNC_TIME) + sleep_time = cls.get_edge_sync_time() + cls.logger.info("Waiting {} to sync up all edge clients".format(sleep_time)) + time.sleep(sleep_time) @classmethod def mark_storagerouter_reachable_for_ha(cls, storagerouter): @@ -122,11 +148,14 @@ def mark_storagerouter_reachable_for_ha(cls, storagerouter): :type storagerouter: StorageRouter :return: None """ + cls.logger.info("Marking Storagerouter {} as available for HA".format(storagerouter.name)) Configuration.delete(os.path.join(VPOOL_UPDATE_KEY, storagerouter.guid)) # Trigger a complete reload of node distance maps StorageDriverController.cluster_registry_checkup() # Wait a few moment for the edge to catch up all the configs - time.sleep(2 * cls.EDGE_SYNC_TIME) + sleep_time = cls.get_edge_sync_time() + cls.logger.info("Waiting {} to sync up all edge clients".format(sleep_time)) + time.sleep(sleep_time) @staticmethod def migrate_away(balances, storagerouter): @@ -138,36 +167,54 @@ def migrate_away(balances, storagerouter): :param storagerouter: Storagerouter to move away from :type storagerouter: StorageRouter :return: None + :raises: FailureDuringMigrateException if any volumes failed to move """ evacuate_srs = [storagerouter.guid] for balance in balances: # type: VDiskBalance if balance.storagedriver.storagerouter_guid 
in evacuate_srs: - # @todo abort on failed moves? The user should know but when...? successfull_moves, failed_moves = balance.execute_balance_change_through_overflow(balances, user_input=False, abort_on_error=False) + if failed_moves: + raise FailureDuringMigrateException('Could not move volumes {} away'.format(', '.join(failed_moves))) @classmethod - def migrate_master_mds(cls, storagerouter): + def migrate_master_mds(cls, storagerouter, max_chain_size=100): """ Migrate away all master mds from the given storagerouter :param storagerouter: Storagerouter to migrate away from :type storagerouter: StorageRouter + :param max_chain_size: Maximum number of tasks within a chain. Set because https://github.com/celery/celery/issues/1078 + :type max_chain_size: int :return: None :rtype: NoneType """ - all_masters_gone = False - while not all_masters_gone: + cls.logger.info("Starting MDS migrations") + while True: vpool_mds_master_vdisks = cls.get_vdisks_mds_masters_on_storagerouter(storagerouter) all_masters_gone = sum(len(vds) for vds in vpool_mds_master_vdisks.values()) == 0 + if all_masters_gone: + break chains = [] - for vpool, vdisks in vpool_mds_master_vdisks.iteritems(): - chains.append(chain(MDSServiceController.ensure_safety.si(vdisk.guid) for vdisk in vdisks)) + for vpool_guid, vdisk_guids in vpool_mds_master_vdisks.iteritems(): + signatures = [] + tasks = [] + for vdisk_guid in vdisk_guids[0:max_chain_size]: + cls.logger.info('Ensuring safety for {}'.format(vdisk_guid)) + signature = MDSServiceController.ensure_safety.si(vdisk_guid) + # Freeze freezes the task into its final form. 
This will net the async result object we'd normally get from delaying it + tasks.append(signature.freeze()) + signatures.append(signature) + if signatures: + cls.logger.info('Adding chain for VPool {} with tasks {}'.format(vpool_guid, ', '.join(t.id for t in tasks))) + chains.append(chain(signatures)) # Add all chain signatures to a group for parallel execution - task_group = group(c.s() for c in chains) + task_group = group(chains) # Wait for the group result - result = task_group().get() - print result + async_result = task_group.apply_async() + cls.logger.info('Waiting for all tasks of group {}'.format(async_result.id)) + _ = async_result.get() + cls.logger.info("MDS migration finished") @staticmethod def get_vdisks_mds_masters_on_storagerouter(storagerouter): @@ -194,3 +241,13 @@ def get_vdisks_mds_masters_on_storagerouter(storagerouter): @staticmethod def get_persistent_client(): return PersistentFactory.get_client() + + @classmethod + def get_edge_sync_time(cls): + # type: () -> int + """ + Get the time required for all edge clients to do a complete sync + :return: Time for a complete edge sync + :rtype: int + """ + return 2 * cls.EDGE_SYNC_TIME From 108633fa2283a317160ae40626dc805c92f5b635 Mon Sep 17 00:00:00 2001 From: Jeffrey Devloo Date: Tue, 2 Jul 2019 17:55:48 +0200 Subject: [PATCH 6/7] Move ovs.lib.update to ovs.update to avoid conflicts with UpdateController Add alba to the volumedriver doc Update the ovs binary --- ovs/lib/update/__init__.py | 0 ovs/lib/update/single_update.md | 23 ------------------- ovs/update/__init__.py | 1 + ovs/update/alba/__init__.py | 15 ++++++++++++ .../alba/updater.py} | 6 ++--- ovs/update/single_update.md | 20 ++++++++++++++++ ovs/update/volumedriver/updater.py | 6 ++--- scripts/system/ovs | 11 +++++++-- 8 files changed, 50 insertions(+), 32 deletions(-) delete mode 100644 ovs/lib/update/__init__.py delete mode 100644 ovs/lib/update/single_update.md create mode 100644 ovs/update/alba/__init__.py rename 
ovs/{lib/update/albacomponent_update.py => update/alba/updater.py} (92%) diff --git a/ovs/lib/update/__init__.py b/ovs/lib/update/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/ovs/lib/update/single_update.md b/ovs/lib/update/single_update.md deleted file mode 100644 index 7cf3d07dd..000000000 --- a/ovs/lib/update/single_update.md +++ /dev/null @@ -1,23 +0,0 @@ -# Single update -The Framework now supports updating certain components on a single host at the time. -It provides simple CLI commands (`ovs local_update `) to update Arakoon, Alba and the volumedriver - - ## Alba update -The Volumedriver update can be started by running `ovs local_update alba`. -The local update will: -- Update the volumedriver binaries. Currently, only andes-updates-3 towards andes-updates-4 is supported. Further updates towards bighorn will need further testing, as alba will bump from 1.5.x towards 1.6.x. - -- Update the alba alternatives that were introduced. -`/usr/bin/alba` will from now on be a symlink towards a binary. -`/opt/alba/bin/alba`. Some plugin information, needed for correct functioning of service files is changed too. -- Update the arakoon alternatives that were introduced. `/usr/bin/arakoon` will now point to -`/opt/alba/bin/arakoon` -Note that given the nature of arakoon and its polling, stacktraces of nomaster exceptions may occure. These errors can be negated. This logging could not be suppressed, as essential other output could be suppressed with it. - - ### Exceptions -Certain exception can be thrown during the update. 
- - | Exit code | Exception name | What happened | -| --------- | ---------------| ------------- | -| 61 | NoMasterFoundException | Raise this error when no arakoon master can be found after a couple of ComponentUpdaterattempts | -| 62 | InvalidAlbaVersionException | Will be called if no valid alba version has been found and the update-alternatives call has failed with this alba version| \ No newline at end of file diff --git a/ovs/update/__init__.py b/ovs/update/__init__.py index cc1f36adb..26a40bab7 100644 --- a/ovs/update/__init__.py +++ b/ovs/update/__init__.py @@ -15,3 +15,4 @@ # but WITHOUT ANY WARRANTY of any kind. from .volumedriver.updater import VolumeDriverUpdater +from .alba.updater import AlbaComponentUpdater diff --git a/ovs/update/alba/__init__.py b/ovs/update/alba/__init__.py new file mode 100644 index 000000000..f3ecd2bfd --- /dev/null +++ b/ovs/update/alba/__init__.py @@ -0,0 +1,15 @@ +# Copyright (C) 2019 iNuron NV +# +# This file is part of Open vStorage Open Source Edition (OSE), +# as available from +# +# http://www.openvstorage.org and +# http://www.openvstorage.com. +# +# This file is free software; you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3) +# as published by the Free Software Foundation, in version 3 as it comes +# in the LICENSE.txt file of the Open vStorage OSE distribution. +# +# Open vStorage is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY of any kind. diff --git a/ovs/lib/update/albacomponent_update.py b/ovs/update/alba/updater.py similarity index 92% rename from ovs/lib/update/albacomponent_update.py rename to ovs/update/alba/updater.py index 8a63ea84a..8dbdef060 100644 --- a/ovs/lib/update/albacomponent_update.py +++ b/ovs/update/alba/updater.py @@ -14,21 +14,20 @@ # Open vStorage is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY of any kind. 
-from abc import abstractmethod from ovs.extensions.generic.system import System from ovs.extensions.storage.persistentfactory import PersistentFactory from ovs_extensions.update.alba_component_update import AlbaComponentUpdater as _AlbacomponentUpdater +from ovs.log.log_handler import LogHandler class AlbaComponentUpdater(_AlbacomponentUpdater): """ Implementation of abstract class to update alba """ + logger = LogHandler.get('update', 'volumedriver') @staticmethod - @abstractmethod def get_persistent_client(): - # type: () -> PyrakoonStore """ Retrieve a persistent client which needs Needs to be implemented by the callee @@ -37,7 +36,6 @@ def get_persistent_client(): @classmethod def get_node_id(cls): - # type: () -> str """ use a factory to provide the machine id :return: diff --git a/ovs/update/single_update.md b/ovs/update/single_update.md index 9f040f435..1cf323f61 100644 --- a/ovs/update/single_update.md +++ b/ovs/update/single_update.md @@ -19,3 +19,23 @@ Certain exception can be thrown during the update. | --------- | ---------------| ------------- | | 21 | FailedToMigrateException | Not all volumes can be migrated away to different nodes. No moves have started yet | | 22 | FailureDuringMigrateException | Some volumes could not be moved away during the migration process | + +## Alba update +The Volumedriver update can be started by running `ovs local_update alba`. +The local update will: +- Update the volumedriver binaries. Currently, only andes-updates-3 towards andes-updates-4 is supported. Further updates towards bighorn will need further testing, as alba will bump from 1.5.x towards 1.6.x. + +- Update the alba alternatives that were introduced. +`/usr/bin/alba` will from now on be a symlink towards a binary. +`/opt/alba/bin/alba`. Some plugin information, needed for correct functioning of service files is changed too. +- Update the arakoon alternatives that were introduced. 
`/usr/bin/arakoon` will now point to +`/opt/alba/bin/arakoon` +Note that given the nature of arakoon and its polling, stacktraces of nomaster exceptions may occur. These errors can be ignored. This logging could not be suppressed, as essential other output could be suppressed with it. + +### Exceptions +Certain exceptions can be thrown during the update. + + | Exit code | Exception name | What happened | +| --------- | ---------------| ------------- | +| 61 | NoMasterFoundException | Raise this error when no arakoon master can be found after a couple of ComponentUpdater attempts | +| 62 | InvalidAlbaVersionException | Will be called if no valid alba version has been found and the update-alternatives call has failed with this alba version| \ No newline at end of file diff --git a/ovs/update/volumedriver/updater.py b/ovs/update/volumedriver/updater.py index cc93d0f51..5e2ed44f9 100644 --- a/ovs/update/volumedriver/updater.py +++ b/ovs/update/volumedriver/updater.py @@ -24,7 +24,7 @@ from ovs.dal.hybrids.vdisk import VDisk from ovs.dal.hybrids.storagerouter import StorageRouter from ovs.extensions.generic.system import System -from ovs_extensions.update.base import ComponentUpdater +from ovs_extensions.update.base import ComponentUpdater, UpdateException from ovs.lib.helpers.vdisk.rebalancer import VDiskRebalancer, VDiskBalance from ovs.lib.storagedriver import StorageDriverController from ovs.extensions.storage.persistentfactory import PersistentFactory @@ -35,14 +35,14 @@ from typing import List, Dict, Tuple -class FailedToMigrateException(EnvironmentError): +class FailedToMigrateException(UpdateException): """ Thrown when not all volumes would be able to move away """ exit_code = 21 -class FailureDuringMigrateException(EnvironmentError): +class FailureDuringMigrateException(UpdateException): """ Thrown when certain volumes failed to move away """ exit_code = 22 diff --git a/scripts/system/ovs b/scripts/system/ovs index 46cb4727a..c6db72e1e 100644 --- a/scripts/system/ovs +++ 
b/scripts/system/ovs @@ -69,6 +69,7 @@ function show_help { echo "" echo " * Run automated local update:" echo " - ovs local_update alba Execute a local update of alba and its dependencies" + echo " - ovs local_update volumedriver Execute a local update of the volumedriver" echo "" } @@ -277,10 +278,16 @@ if Configuration.exists('$3', raw=True): fi elif [ "$1" = "local_update" ] ; then if [ "$2" = "alba" ] ; then - python -c """ -from ovs.lib.update.albacomponent_update import AlbaComponentUpdater + python -c """ +from ovs.update import AlbaComponentUpdater from ovs.extensions.generic.system import System AlbaComponentUpdater.do_update(System.get_my_machine_id()) +""" + elif [ "$2" = "volumedriver" ] ; then + python -c """ +from ovs.update import VolumeDriverUpdater +from ovs.extensions.generic.system import System +VolumeDriverUpdater.do_update(System.get_my_machine_id()) """ else show_help From aced74208d51fe779b530b15968fc90f7b9ff918 Mon Sep 17 00:00:00 2001 From: Jeffrey Devloo Date: Wed, 3 Jul 2019 10:21:41 +0200 Subject: [PATCH 7/7] Add try except to catch the config key not existing Revert the edge sync time --- ovs/dal/hybrids/storagedriver.py | 8 +++++--- ovs/update/volumedriver/updater.py | 4 +--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ovs/dal/hybrids/storagedriver.py b/ovs/dal/hybrids/storagedriver.py index bb9373970..5c12a1e4a 100644 --- a/ovs/dal/hybrids/storagedriver.py +++ b/ovs/dal/hybrids/storagedriver.py @@ -25,7 +25,6 @@ from ovs.dal.hybrids.vdisk import VDisk from ovs.dal.hybrids.vpool import VPool from ovs.dal.hybrids.storagerouter import StorageRouter -from ovs.extensions.generic.configuration import Configuration from ovs.extensions.storageserver.storagedriver import StorageDriverClient from ovs.log.log_handler import LogHandler @@ -179,7 +178,7 @@ def _cluster_node_config(self): """ Prepares a ClusterNodeConfig dict for the StorageDriver process """ - from ovs.extensions.generic.configuration import Configuration + 
from ovs.extensions.generic.configuration import Configuration, NotFoundException rdma = Configuration.get('/ovs/framework/rdma') distance_map = {} primary_domains = [] @@ -191,7 +190,10 @@ def _cluster_node_config(self): secondary_domains.append(junction.domain_guid) # @todo implement more race-conditions guarantees. Current guarantee is the single update invalidating the value # through cluster_registry_checkup - storagerouters_marked_for_update = Configuration.list(VPOOL_UPDATE_KEY) + try: + storagerouters_marked_for_update = list(Configuration.list(VPOOL_UPDATE_KEY)) + except NotFoundException: + storagerouters_marked_for_update = [] for sd in self.vpool.storagedrivers: if sd.guid == self.guid: continue diff --git a/ovs/update/volumedriver/updater.py b/ovs/update/volumedriver/updater.py index 5e2ed44f9..b9910b6c5 100644 --- a/ovs/update/volumedriver/updater.py +++ b/ovs/update/volumedriver/updater.py @@ -61,9 +61,7 @@ class VolumeDriverUpdater(ComponentUpdater): # List with tuples. [(package_name, binary_name, binary_location, [service_prefix_0]] BINARIES = [(PACKAGES_EE, VOLUMEDRIVER_CMD_NAME, VOLUMEDRIVER_BIN_PATH, [STORAGEDRIVER_SERVICE_BASE])] # type: List[Tuple[List[str], str, str, List[str]]] LOCAL_SR = System.get_my_storagerouter() - # @todo revert - # EDGE_SYNC_TIME = 5 * 60 - EDGE_SYNC_TIME = 1 + EDGE_SYNC_TIME = 5 * 60 @classmethod def restart_services(cls):