diff --git a/ovs/constants/vpool.py b/ovs/constants/vpool.py
new file mode 100644
index 000000000..4459dd014
--- /dev/null
+++ b/ovs/constants/vpool.py
@@ -0,0 +1,27 @@
+# Copyright (C) 2019 iNuron NV
+#
+# This file is part of Open vStorage Open Source Edition (OSE),
+# as available from
+#
+#      http://www.openvstorage.org and
+#      http://www.openvstorage.com.
+#
+# This file is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3)
+# as published by the Free Software Foundation, in version 3 as it comes
+# in the LICENSE.txt file of the Open vStorage OSE distribution.
+#
+# Open vStorage is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY of any kind.
+
+import os
+
+VPOOL_UPDATE_KEY = os.path.join(os.path.sep, 'ovs', 'volumedriver', 'update', 'storagerouter')
+
+STORAGEDRIVER_SERVICE_BASE = 'ovs-volumedriver'
+
+PACKAGES_OSE = ['volumedriver-no-dedup-base', 'volumedriver-no-dedup-server']
+PACKAGES_EE = ['volumedriver-ee-base', 'volumedriver-ee-server']
+
+VOLUMEDRIVER_BIN_PATH = os.path.join(os.path.sep, 'usr', 'bin', 'volumedriver_fs.sh')
+VOLUMEDRIVER_CMD_NAME = 'volumedriver_fs'
diff --git a/ovs/dal/hybrids/storagedriver.py b/ovs/dal/hybrids/storagedriver.py
index 9f623eca0..5c12a1e4a 100644
--- a/ovs/dal/hybrids/storagedriver.py
+++ b/ovs/dal/hybrids/storagedriver.py
@@ -19,6 +19,7 @@
 """
 import time
+from ovs.constants.vpool import VPOOL_UPDATE_KEY
 from ovs.dal.dataobject import DataObject
 from ovs.dal.structures import Property, Relation, Dynamic
 from ovs.dal.hybrids.vdisk import VDisk
@@ -177,7 +178,7 @@ def _cluster_node_config(self):
         """
         Prepares a ClusterNodeConfig dict for the StorageDriver process
         """
-        from ovs.extensions.generic.configuration import Configuration
+        from ovs.extensions.generic.configuration import Configuration, NotFoundException
         rdma = Configuration.get('/ovs/framework/rdma')
         distance_map = {}
         primary_domains = []
@@ -187,10 +188,18 @@ def _cluster_node_config(self):
                 primary_domains.append(junction.domain_guid)
             else:
                 secondary_domains.append(junction.domain_guid)
+        # @todo Implement stronger race-condition guarantees. The current guarantee is that a single update
+        # invalidates the value through cluster_registry_checkup
+        try:
+            storagerouters_marked_for_update = list(Configuration.list(VPOOL_UPDATE_KEY))
+        except NotFoundException:
+            storagerouters_marked_for_update = []
         for sd in self.vpool.storagedrivers:
             if sd.guid == self.guid:
                 continue
-            if len(primary_domains) == 0:
+            if sd.storagerouter_guid in storagerouters_marked_for_update:
+                distance_map[str(sd.storagedriver_id)] = StorageDriver.DISTANCES.FAR
+            elif len(primary_domains) == 0:
                 distance_map[str(sd.storagedriver_id)] = StorageDriver.DISTANCES.NEAR
            else:
                 distance = StorageDriver.DISTANCES.INFINITE
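The `VPOOL_UPDATE_KEY` branch above lets a node be pulled out of the HA rotation through configuration alone: any StorageRouter guid listed under the key is reported as `DISTANCES.FAR` to its peers once the node distance maps are rebuilt. A minimal sketch of the round trip, mirroring what `VolumeDriverUpdater` does further down (hedged; assumes a reachable Configuration store and a DAL `StorageRouter` instance):

```python
import os
from ovs.constants.vpool import VPOOL_UPDATE_KEY
from ovs.extensions.generic.configuration import Configuration
from ovs.lib.storagedriver import StorageDriverController

def set_ha_update_flag(storagerouter, marked):
    """Raise or lower the 'being updated' flag and rebuild the distance maps."""
    key = os.path.join(VPOOL_UPDATE_KEY, storagerouter.guid)
    if marked:
        Configuration.set(key, 0)  # Only the key's presence matters, not its value
    else:
        Configuration.delete(key)
    # _cluster_node_config only picks the flag up once the registry is rewritten
    StorageDriverController.cluster_registry_checkup()
```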
diff --git a/ovs/lib/helpers/vdisk/__init__.py b/ovs/lib/helpers/vdisk/__init__.py
new file mode 100644
index 000000000..f3ecd2bfd
--- /dev/null
+++ b/ovs/lib/helpers/vdisk/__init__.py
@@ -0,0 +1,15 @@
+# Copyright (C) 2019 iNuron NV
+#
+# This file is part of Open vStorage Open Source Edition (OSE),
+# as available from
+#
+#      http://www.openvstorage.org and
+#      http://www.openvstorage.com.
+#
+# This file is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3)
+# as published by the Free Software Foundation, in version 3 as it comes
+# in the LICENSE.txt file of the Open vStorage OSE distribution.
+#
+# Open vStorage is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY of any kind.
diff --git a/ovs/lib/helpers/vdisk/rebalancer.py b/ovs/lib/helpers/vdisk/rebalancer.py
new file mode 100644
index 000000000..fa1da875c
--- /dev/null
+++ b/ovs/lib/helpers/vdisk/rebalancer.py
@@ -0,0 +1,574 @@
+# Copyright (C) 2018 iNuron NV
+#
+# This file is part of Open vStorage Open Source Edition (OSE),
+# as available from
+#
+#      http://www.openvstorage.org and
+#      http://www.openvstorage.com.
+#
+# This file is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3)
+# as published by the Free Software Foundation, in version 3 as it comes
+# in the LICENSE.txt file of the Open vStorage OSE distribution.
+#
+# Open vStorage is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY of any kind.
+
+"""
+Rebalances volumes across nodes
+Taken from the support-tools
+"""
+
+from __future__ import division
+
+import pprint
+import itertools
+from math import ceil
+from ovs.dal.hybrids.storagerouter import StorageRouter
+from ovs.dal.hybrids.vpool import VPool
+from ovs.dal.hybrids.storagedriver import StorageDriver
+from ovs.dal.hybrids.vdisk import VDisk
+from ovs_extensions.log.logger import Logger
+
+# noinspection PyUnreachableCode
+if False:
+    from typing import List, Dict, Tuple, Optional
+
+
+class VDiskRebalancer(object):
+
+    _volume_potentials = {}
+    logger = Logger('vdisk_rebalance')
+
+    @classmethod
+    def print_balances(cls, balances):
+        # type: (List[VDiskBalance]) -> None
+        """
+        Print an overview of the given balances per VPool
+        :return: None
+        :rtype: NoneType
+        """
+        balances_by_vpool = {}
+        for balance in balances:  # type: VDiskBalance
+            vpool = balance.storagedriver.vpool
+            if vpool not in balances_by_vpool:
+                balances_by_vpool[vpool] = []
+            balances_by_vpool[vpool].append(balance)
+        for vpool, vpool_balances in balances_by_vpool.viewitems():
+            print('Balance for VPool {0}'.format(vpool.name))
+            for balance in vpool_balances:  # type: VDiskBalance
+                storagerouter = balance.storagedriver.storagerouter
+                print(' Storagerouter {0}, vdisks now: {1}, vdisks afterwards: {2}, added: {3}'.format(storagerouter.name, len(balance.hosted_guids), len(balance.balance), len(balance.added)))
+                if balance.added:
+                    added_source_overview = {}
+                    for vdisk_guid in balance.added:
+                        current_storagerouter = StorageRouter(VDisk(vdisk_guid).storagerouter_guid)
+                        if current_storagerouter not in added_source_overview:
+                            added_source_overview[current_storagerouter] = []
+                        added_source_overview[current_storagerouter].append(vdisk_guid)
+                    print(' VDisks added from:')
+                    for current_storagerouter, moved_vdisk_guids in added_source_overview.iteritems():
+                        print('  StorageRouter {0}: {1}'.format(current_storagerouter.name, len(moved_vdisk_guids)))
+
+    @classmethod
+    def get_rebalanced_layout(cls, vpool_guid, excluded_storagerouters=None, ignore_domains=False, evacuate_storagerouters=None, base_on_volume_potential=True):
+        # type: (str, Optional[List[str]], Optional[bool], Optional[List[str]], Optional[bool]) -> List[VDiskBalance]
+        """
+        Retrieve the layout of what the optimal spread would look like
+        :param evacuate_storagerouters: Guids of StorageRouters to evacuate all vdisks from
+        :type evacuate_storagerouters: List[str]
+        :param vpool_guid: Guid of the VPool to rebalance
+        :type vpool_guid: str
+        :param excluded_storagerouters: Guids of StorageRouters to avoid
+        :type excluded_storagerouters: List[str]
+        :param ignore_domains: Ignore the domains (rebalance across everything)
+        :type ignore_domains: bool
+        :param base_on_volume_potential: Base the spread on the volume potential of every node instead of on a linear distribution
+        :type base_on_volume_potential: bool
+        :return: List of balances
+        :rtype: List[VDiskBalance]
+        """
+        if evacuate_storagerouters is None:
+            evacuate_storagerouters = []
+        if excluded_storagerouters is None:
+            excluded_storagerouters = []
+
+        vpool = VPool(vpool_guid)
+        if ignore_domains:
+            return cls._get_rebalances_layout(vpool, excluded_storagerouters, evacuate_storagerouters, base_on_volume_potential)
+        return cls._get_rebalanced_layout_by_domain(vpool, excluded_storagerouters, evacuate_storagerouters, base_on_volume_potential)
+
+    @classmethod
+    def get_volume_potentials(cls, storagedrivers, cache=True):
+        # type: (List[StorageDriver], bool) -> Dict[StorageDriver, int]
+        """
+        Retrieve the volume potential of every given storagedriver
+        :param storagedrivers: StorageDrivers to query the potential for
+        :param cache: Serve and store results from/in the class-level cache
+        :return: The volume potential per storagedriver
+        """
+        potentials = {}
+        for storagedriver in storagedrivers:
+            if cache:
+                potential = cls._volume_potentials.get(storagedriver, -1)
+                if potential == -1:
+                    potential = storagedriver.vpool.storagedriver_client.volume_potential(str(storagedriver.storagedriver_id))
+                    cls._volume_potentials[storagedriver] = potential
+            else:
+                potential = storagedriver.vpool.storagedriver_client.volume_potential(str(storagedriver.storagedriver_id))
+            potentials[storagedriver] = potential
+        return potentials
+
+    @classmethod
+    def _get_rebalances_layout(cls, vpool, excluded_storagerouters, evacuate_storagerouters, base_on_volume_potential):
+        # type: (VPool, List[str], List[str], bool) -> List[VDiskBalance]
+        """
+        Rebalance volumes without taking domains into account
+        :param vpool: VPool to rebalance
+        :type vpool: VPool
+        :param excluded_storagerouters: Guids of StorageRouters to avoid
+        :type excluded_storagerouters: List[str]
+        :param evacuate_storagerouters: Guids of StorageRouters to evacuate all vdisks from
+        :type evacuate_storagerouters: List[str]
+        :param base_on_volume_potential: Base the limit calculation on the volume potential ratio
+        :type base_on_volume_potential: bool
+        :return: List of balances
+        :rtype: List[VDiskBalance]
+        """
+        storagerouters_to_avoid = set(itertools.chain(excluded_storagerouters, evacuate_storagerouters))
+        destination_storagedrivers = [std for std in vpool.storagedrivers if std.storagerouter_guid not in storagerouters_to_avoid]
+        destination_storagedrivers_by_ip = dict((storagedriver.storagerouter.ip, storagedriver) for storagedriver in destination_storagedrivers)
+
+        volume_potentials = {}
+        if base_on_volume_potential:
+            volume_potentials = cls.get_volume_potentials(destination_storagedrivers)
+            total_potential = sum(p for p in volume_potentials.itervalues())
+            vdisks_within_destination_storagedrivers = list(itertools.chain(*(sd.vdisks_guids for sd in destination_storagedrivers)))
+            volume_total_capacity = total_potential + len(vdisks_within_destination_storagedrivers)
+        # Default limit: a simple linear distribution
+        storagedriver_vdisk_limit = int(ceil(len(vpool.vdisks_guids) / len(destination_storagedrivers)))
+        balances = {}
+        overflow = []
+        for storagedriver in vpool.storagedrivers:
+            if base_on_volume_potential and storagedriver in volume_potentials:
+                # Use the ratio between the volume potential and the current load to distribute
+                # Potentials are only known for the destination storagedrivers
+                volume_potential = volume_potentials[storagedriver]
+                storagedriver_vdisk_limit = int(ceil(len(vpool.vdisks_guids) * (volume_potential + len(storagedriver.vdisks_guids)) / volume_total_capacity))
+            limit = 0 if storagedriver.storagerouter_guid in evacuate_storagerouters else storagedriver_vdisk_limit
+            balance = VDiskBalance(storagedriver, limit)
+            overflow.extend(balance.overflow)
+            balances[storagedriver] = balance
+        # Attempt to move every overflowed vdisk to one of its current MDS hosts first
+        for vdisk_guid in overflow:
+            vdisk = VDisk(vdisk_guid)
+            # A set would read nicer here, but the order must be preserved
+            preferred_destinations = [destination_storagedrivers_by_ip[mds_entry['ip']] for mds_entry in vdisk.info['metadata_backend_config'] if mds_entry['ip'] in destination_storagedrivers_by_ip]
+            # Try to fill up these storagedrivers first
+            destinations = preferred_destinations + [storagedriver for storagedriver in destination_storagedrivers if storagedriver not in preferred_destinations]
+            added = False
+            for storagedriver in destinations:
+                balance = balances[storagedriver]
+                added = cls.add_to_balance(vdisk_guid, balance)
+                if added:
+                    try:
+                        index = preferred_destinations.index(storagedriver)
+                        mds_type = 'master' if index == 0 else 'slave'
+                        cls.logger.info('Appointing {0} to {1} (index {2})'.format(vdisk_guid, mds_type, index))
+                    except ValueError:
+                        # The storagedriver is not one of the MDS hosts
+                        cls.logger.info('Appointing to a non-MDS host')
+                    break
+            if not added:
+                raise NotImplementedError('VDisk could not be added to any destination. This likely indicates a flaw in the balancing implementation')
+        return balances.values()
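The potential-based limit is easiest to see with concrete numbers. A worked sketch of the formula above (all figures hypothetical):

```python
from math import ceil

# Two destination storagedrivers on a vpool hosting 100 vdisks in total:
# std_a hosts 70 vdisks and has potential for 50 more,
# std_b hosts 30 vdisks and has potential for 150 more.
total_vdisks = 100
volume_total_capacity = (50 + 70) + (150 + 30)  # potential + hosted, summed: 300

limit_a = int(ceil(total_vdisks * (50 + 70) / float(volume_total_capacity)))   # 40
limit_b = int(ceil(total_vdisks * (150 + 30) / float(volume_total_capacity)))  # 60
# std_a may keep at most 40 of its 70 vdisks; the other 30 overflow towards std_b.
```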
+    @classmethod
+    def _get_rebalanced_layout_by_domain(cls, vpool, excluded_storagerouters, evacuate_storagerouters, base_on_volume_potential):
+        # type: (VPool, List[str], List[str], bool) -> List[VDiskBalance]
+        """
+        Rebalance volumes while staying within the primary domain
+        :param vpool: VPool to rebalance
+        :type vpool: VPool
+        :param excluded_storagerouters: Guids of StorageRouters to avoid
+        :type excluded_storagerouters: List[str]
+        :param evacuate_storagerouters: Guids of StorageRouters to evacuate all vdisks from
+        :type evacuate_storagerouters: List[str]
+        :param base_on_volume_potential: Base the limit calculation on the volume potential ratio
+        :type base_on_volume_potential: bool
+        :return: List of balances
+        :rtype: List[VDiskBalance]
+        """
+        # Calculate the balance cap for every storagedriver
+        # Every storagedriver can share disks with other storagedrivers within the same primary domain
+        # Certain storagedrivers add their disks to the pool but can't take disks themselves
+        balances = {}
+        storagedriver_limits = {}
+        storagedriver_domain_relation = {}
+        for storagedriver in vpool.storagedrivers:
+            cls.logger.info('Calculating the limit for {} in VPool {}'.format(storagedriver.storagerouter.name, vpool.name))
+            # Create the disk pool for the current storagedriver in the domain
+            storagedrivers_in_domain = cls.get_storagedrivers_in_same_primary_domain_as_storagedriver(storagedriver, excluded_storagerouters)
+            cls.logger.info('{} shares primary domains with {}'.format(storagedriver.storagerouter.name, ', '.join(d.storagerouter.name for d in storagedrivers_in_domain)))
+            storagedriver_domain_relation[storagedriver] = storagedrivers_in_domain
+            vdisks_within_domain = []
+            for storagedriver_in_domain in storagedrivers_in_domain:
+                vdisks_within_domain.extend(storagedriver_in_domain.vdisks_guids)
+            cls.logger.info('VDisks within the primary domain of {}: {}'.format(storagedriver.storagerouter.name, len(vdisks_within_domain)))
+            # Work out the disk distribution
+            if storagedriver.storagerouter_guid in evacuate_storagerouters:
+                limit = 0
+            else:
+                # Remove the evacuated storagedrivers from the pool of candidates
+                usable_storagedrivers_in_domain = [std for std in storagedrivers_in_domain if std.storagerouter_guid not in evacuate_storagerouters]
+                cls.logger.info('Can move volumes to {} within the primary domain storagedrivers'.format(', '.join(d.storagerouter.name for d in usable_storagedrivers_in_domain)))
+                if base_on_volume_potential:
+                    volume_potentials = cls.get_volume_potentials(usable_storagedrivers_in_domain)
+                    total_potential = sum(p for p in volume_potentials.itervalues())
+                    volume_potentials_sr = dict((storagedriver.storagerouter.name, potential) for storagedriver, potential in volume_potentials.iteritems())
+                    cls.logger.info('Volume potential overview: {}. Total potential: {}'.format(pprint.pformat(volume_potentials_sr), total_potential))
+                    # The count must exclude the evacuated storagedrivers
+                    vdisks_within_domain_usable = list(itertools.chain(*(sd.vdisks_guids for sd in usable_storagedrivers_in_domain)))
+                    volume_total_capacity = total_potential + len(vdisks_within_domain_usable)
+                    if len(vdisks_within_domain) > volume_total_capacity:
+                        cls.logger.error('The total capacity of the usable storagedrivers in the domain is not large enough. vdisks_within_domain {0} > volume_total_capacity {1}'
+                                         .format(len(vdisks_within_domain), volume_total_capacity))
+                        raise RuntimeError('Migration with the given parameters is not possible. Too many vdisks for the usable storagedrivers within the domain.')
+                    cls.logger.info('Total capacity within this domain subset is {}'.format(volume_total_capacity))
+                    # Use the ratio between the volume potential and the current load to distribute
+                    volume_potential = volume_potentials[storagedriver]
+                    volume_ratio = (volume_potential + len(storagedriver.vdisks_guids)) / volume_total_capacity
+                    cls.logger.info('{} can take {}% of the volumes'.format(storagedriver.storagerouter.name, volume_ratio * 100))
+                    limit = int(ceil(len(vdisks_within_domain) * volume_ratio))
+                else:
+                    limit = int(ceil(len(vdisks_within_domain) / len(usable_storagedrivers_in_domain)))
+            cls.logger.info('Limit imposed for {}: {}'.format(storagedriver.storagerouter.name, limit))
+            storagedriver_limits[storagedriver] = limit
+
+        for storagedriver in vpool.storagedrivers:
+            balance = VDiskBalance(storagedriver, storagedriver_limits[storagedriver])
+            balances[storagedriver] = balance
+            cls.logger.info('Balance overview: {}'.format(balance))
+
+        for storagedriver in vpool.storagedrivers:
+            storagedrivers_in_domain = [std for std in storagedriver_domain_relation[storagedriver] if std != storagedriver]
+            storagedrivers_in_domain_by_ip = dict((storagedriver.storagerouter.ip, storagedriver) for storagedriver in storagedrivers_in_domain)
+            balance = balances[storagedriver]
+            cls.logger.info('Migrating {} vdisks from {} of VPool {}. Limit: {}, hosting {}'.format(len(balance.overflow), storagedriver.storagerouter.name, vpool.name,
+                                                                                                    balance.limit, len(balance.hosted_guids)))
+            for vdisk_guid in balance.overflow:
+                vdisk = VDisk(vdisk_guid)
+                preferred_destinations = [storagedrivers_in_domain_by_ip[mds_entry['ip']] for mds_entry in vdisk.info['metadata_backend_config']
+                                          if mds_entry['ip'] in storagedrivers_in_domain_by_ip]
+                # Try to fill up these storagedrivers first
+                destinations = preferred_destinations + [storagedriver for storagedriver in storagedrivers_in_domain if storagedriver not in preferred_destinations]
+                cls.logger.info('Destination overview for migrations: {}'.format(', '.join(d.storagerouter.name for d in destinations)))
+                added = False
+                while not added and destinations:
+                    # Pop from the front so the preferred (MDS) destinations are tried first
+                    destination = destinations.pop(0)
+                    balance = balances[destination]
+                    added = cls.add_to_balance(vdisk_guid, balance)
+                    if added:
+                        cls.logger.info('Added vdisk {} to {}'.format(vdisk_guid, destination.storagerouter.name))
+                        if destination.storagedriver_id == vdisk.storagedriver_id:
+                            raise RuntimeError('Trying to move VDisk {0} to the host it is already running on'.format(vdisk_guid))
+                        try:
+                            index = preferred_destinations.index(destination)
+                            mds_type = 'master' if index == 0 else 'slave'
+                            cls.logger.info('Appointing {0} to {1} (index {2})'.format(vdisk_guid, mds_type, index))
+                        except ValueError:
+                            # The destination is not one of the MDS hosts
+                            cls.logger.info('Appointing to a non-MDS host')
+                    else:
+                        cls.logger.info('Did not add vdisk to {}. Its limit: {}, currently hosting {}'.format(destination.storagerouter.name, balance.limit, len(balance.balance)))
+                if not added:
+                    raise NotImplementedError('VDisk could not be added to any destination. This likely indicates a flaw in the balancing implementation')
+        return balances.values()
+
+    @classmethod
+    def get_storagedrivers_in_same_primary_domain_as_storagedriver(cls, storagedriver, excluded_storagerouters=None):
+        # type: (StorageDriver, Optional[List[str]]) -> List[StorageDriver]
+        """
+        Retrieve all storagedrivers within the same primary domain as the given storagedriver
+        :param storagedriver: StorageDriver to check the domain relations for
+        :param excluded_storagerouters: Guids of StorageRouters to exclude from the search
+        :type excluded_storagerouters: Optional[List[str]]
+        :return: List of storagedrivers
+        :rtype: List[StorageDriver]
+        """
+        if excluded_storagerouters is None:
+            excluded_storagerouters = []
+        primary_domains = cls.get_primary_domain_guids_storagedriver(storagedriver)
+        if not primary_domains:
+            return list(storagedriver.vpool.storagedrivers)
+        return [std for std in storagedriver.vpool.storagedrivers
+                if std.storagerouter_guid not in excluded_storagerouters
+                and any(domain_guid in primary_domains for domain_guid in cls.get_primary_domain_guids_storagedriver(std))]
+
+    @staticmethod
+    def get_primary_domain_guids_storagedriver(storagedriver):
+        # type: (StorageDriver) -> List[str]
+        """
+        Retrieve all primary domain guids of the StorageDriver
+        :param storagedriver: StorageDriver to get the domains from
+        :type storagedriver: StorageDriver
+        :return: List of primary domain guids
+        :rtype: List[str]
+        """
+        primary_domains = []
+        storagerouter = storagedriver.storagerouter
+        for junction in storagerouter.domains:
+            if not junction.backup:
+                primary_domains.append(junction.domain_guid)
+        return primary_domains
+
+    @classmethod
+    def add_to_balance(cls, vdisk_guid, balance):
+        # type: (str, VDiskBalance) -> bool
+        """
+        Try to add a vdisk to a balance
+        :param vdisk_guid: Guid to add
+        :param balance: Balance to add the guid to
+        :return: True if the vdisk was added, else False
+        :rtype: bool
+        """
+        added, _ = balance.fill([vdisk_guid])
+        return vdisk_guid in added
+
+
+class VDiskBalance(object):
+
+    logger = Logger('vdisk_balance')
+
+    def __init__(self, storagedriver, vdisk_limit):
+        # type: (StorageDriver, int) -> None
+        """
+        Represents the vdisk balance of a storagedriver
+        :param storagedriver: StorageDriver to balance for
+        :type storagedriver: StorageDriver
+        :param vdisk_limit: Maximum number of vdisks to host. -1 means no limit
+        :type vdisk_limit: int
+        """
+        self.storagedriver = storagedriver
+        self.hosted_guids = storagedriver.vdisks_guids
+        self.limit = vdisk_limit
+
+        self.balance, self.overflow = self.impose_limit()
+        self.added = []
+
+    def __add__(self, other):
+        if not isinstance(other, VDiskBalance) or self.storagedriver != other.storagedriver:
+            raise ValueError('Can only add VDiskBalance instances that balance the same StorageDriver')
+        limit = self.limit + other.limit
+        self.set_limit(limit)
+        self.added += other.added
+        return self
+
+    def set_limit(self, limit):
+        # type: (int) -> Tuple[List[str], List[str]]
+        """
+        Set a new limit
+        :param limit: Limit to set
+        :return: The guids of vdisks that can stay and the guids that can no longer fit on the current host
+        :rtype: Tuple[List[str], List[str]]
+        """
+        self.limit = limit
+        self.balance, self.overflow = self.impose_limit()
+        return self.balance, self.overflow
+
+    def impose_limit(self):
+        # type: () -> Tuple[List[str], List[str]]
+        """
+        Impose the set limit. Returns the vdisks that can stay
+        and the vdisks that need to move away
+        :return: The guids of vdisks that can stay and the guids that cannot fit on the current host
+        :rtype: Tuple[List[str], List[str]]
+        """
+        if self.limit == -1:
+            return self.hosted_guids, []
+        overflow = self.hosted_guids[self.limit:]
+        balance = self.hosted_guids[:self.limit]
+        return balance, overflow
+
+    def fill(self, vdisk_guids):
+        # type: (List[str]) -> Tuple[List[str], List[str]]
+        """
+        Fill this balance until its limit is reached
+        :param vdisk_guids: Guids to add
+        :type vdisk_guids: List[str]
+        :return: The guids that could be added to this balance and the guids that could not be added
+        :rtype: Tuple[List[str], List[str]]
+        """
+        amount_to_add = self.limit - len(self.balance)
+        added = []
+        overflow = vdisk_guids
+        if amount_to_add > 0:  # Guard against negative slicing when the balance is already over its limit
+            added = vdisk_guids[:amount_to_add]
+            overflow = vdisk_guids[amount_to_add:]
+            self.balance.extend(added)
+            self.added.extend(added)
+        return added, overflow
+
+    def generate_overview(self):
+        # type: () -> dict
+        """
+        Generate the move overview based on the current state
+        :return: An overview of where the added disks came from
+        :rtype: dict
+        """
+        added_source_overview = {}
+        for vdisk_guid in self.added:
+            storagedriver_id = VDisk(vdisk_guid).storagedriver_id
+            if storagedriver_id not in added_source_overview:
+                added_source_overview[storagedriver_id] = []
+            added_source_overview[storagedriver_id].append(vdisk_guid)
+        overview = {'added': self.added,
+                    'balance': self.balance,
+                    'overflow': self.overflow,
+                    'add_source_overview': added_source_overview}
+        return overview
+
+    def execute_balance_change(self, force=False, user_input=False, abort_on_error=False):
+        # type: (Optional[bool], Optional[bool], Optional[bool]) -> Tuple[List[str], List[str]]
+        """
+        Execute the necessary steps to balance out
+        :param force: Indicates whether to force the migration or not (forcing can lead to data loss)
+        :type force: bool
+        :param user_input: Require user input to proceed to the next vDisk
+        :type user_input: bool
+        :param abort_on_error: Abort the script when an error occurs during migration
+        :type abort_on_error: bool
+        :return: List with all successful moves, list with all failed moves
+        :rtype: Tuple[List[str], List[str]]
+        """
+        failed_moves = []
+        successful_moves = []
+        vdisk_guid = None
+        try:
+            for vdisk_guid in self.added:
+                try:
+                    self._execute_move(vdisk_guid, self.storagedriver, force, user_input)
+                    successful_moves.append(vdisk_guid)
+                except Exception:
+                    self.logger.exception('Unable to move VDisk {0} to {1}'.format(vdisk_guid, self.storagedriver.storagerouter_guid))
+                    if abort_on_error:
+                        raise RuntimeError("Something went wrong while moving VDisk {0} to {1}".format(vdisk_guid, self.storagedriver.storagerouter_guid))
+                    failed_moves.append(vdisk_guid)
+        except KeyboardInterrupt:
+            interrupt_msg = 'Interrupted while moving vdisks. The last move (vDisk {0}) might be in an inconsistent state.'.format(vdisk_guid)
+            self.logger.warning(interrupt_msg)
+            if user_input:
+                if successful_moves:
+                    print('Successfully moved vDisks:\n {0}'.format(', '.join(successful_moves)))
+                if failed_moves:
+                    print('\nFailed to move vDisks:\n {0}'.format(', '.join(failed_moves)))
+            raise
+
+        return successful_moves, failed_moves
+
+    def execute_balance_change_through_overflow(self, balances, force=False, user_input=False, abort_on_error=False):
+        # type: (List[VDiskBalance], bool, bool, bool) -> Tuple[List[str], List[str]]
+        """
+        Execute the necessary steps to balance out.
+        Starts from the overflow to move all vdisks away from this container first.
+        The other balances must be passed in to determine where the overflow has to move to
+        :param balances: The other balances to work with. Used to find the destination for the overflow of this balance
+        :type balances: List[VDiskBalance]
+        :param force: Indicates whether to force the migration or not (forcing can lead to data loss)
+        :type force: bool
+        :param user_input: Require user input to proceed to the next vDisk
+        :type user_input: bool
+        :param abort_on_error: Abort the script when an error occurs during migration
+        :type abort_on_error: bool
+        :return: List with all successful moves, list with all failed moves
+        :rtype: Tuple[List[str], List[str]]
+        """
+        failed_moves = []
+        successful_moves = []
+        vdisk_guid = None
+        try:
+            vdisk_balance_map = self.map_vdisk_to_destination(balances)
+            for vdisk_guid in self.overflow:
+                add_balance = vdisk_balance_map[vdisk_guid]
+                destination_std = add_balance.storagedriver
+                try:
+                    self._execute_move(vdisk_guid, destination_std, force, user_input)
+                    successful_moves.append(vdisk_guid)
+                except Exception:
+                    self.logger.exception('Unable to move VDisk {0} to {1}'.format(vdisk_guid, destination_std.storagerouter_guid))
+                    if abort_on_error:
+                        raise RuntimeError("Something went wrong while moving VDisk {0} to {1}".format(vdisk_guid, destination_std.storagerouter_guid))
+                    failed_moves.append(vdisk_guid)
+        except KeyboardInterrupt:
+            interrupt_msg = 'Interrupted while moving vdisks. The last move (vDisk {0}) might be in an inconsistent state.'.format(vdisk_guid)
+            self.logger.warning(interrupt_msg)
+            if user_input:
+                if successful_moves:
+                    print('Successfully moved vDisks:\n {0}'.format(', '.join(successful_moves)))
+                if failed_moves:
+                    print('\nFailed to move vDisks:\n {0}'.format(', '.join(failed_moves)))
+            raise
+
+        return successful_moves, failed_moves
+
+    def _execute_move(self, vdisk_guid, destination_std, force, interactive, minimum_potential=1):
+        """
+        Perform a move
+        :param vdisk_guid: VDisk to move
+        :param destination_std: Destination to move to
+        :param force: Use force when moving
+        :param interactive: Prompt for user input before moving
+        :param minimum_potential: Minimum volume potential the destination must have left for the move to proceed
+        :return: None
+        """
+        vd = VDisk(vdisk_guid)
+        current_sr = StorageRouter(vd.storagerouter_guid).name
+        next_sr = destination_std.storagerouter.name
+        if vd.storagerouter_guid == destination_std.storagerouter_guid:
+            # Ownership changed in the meantime
+            self.logger.info('No longer need to move VDisk {0} to {1}'.format(vdisk_guid, destination_std.storagerouter.name))
+            return
+        rebalance_message = 'Rebalancing vPool by moving vDisk {0} from {1} to {2}'.format(vdisk_guid, current_sr, next_sr)
+        if interactive:
+            retry = True
+            while retry:
+                proceed = raw_input('{0}. Continue? (press Enter)'.format(rebalance_message))
+                if proceed == '':  # Mock 'Enter' key
+                    retry = False
+        try:
+            volume_potential = destination_std.vpool.storagedriver_client.volume_potential(str(destination_std.storagedriver_id))
+        except Exception:
+            self.logger.exception('Unable to retrieve the volume potential. Aborting')
+            raise
+        if volume_potential > minimum_potential:
+            self.logger.info(rebalance_message)
+            try:
+                vd.storagedriver_client.migrate(str(vd.volume_id), str(destination_std.name), False)
+            except RuntimeError:
+                # When a RuntimeError occurs, try restarting the volume locally as a safety measure
+                self.logger.warning('Encountered a RuntimeError. Checking whether vdisk {0} is still running and restarting it if needed.'.format(vd.guid))
+                vd.invalidate_dynamics('info')
+                if vd.info['live_status'] != vd.STATUSES.RUNNING:
+                    vd.storagedriver_client.restart_object(str(vd.volume_id), False)
+                    # Now check whether the migration succeeded and the volume is running on the correct storagedriver
+                    if vd.storagedriver_id == destination_std.name:
+                        self.logger.info('VDisk {0} was restarted and now runs on the destination storagedriver. The previous error can be ignored.'.format(vd.guid))
+                    else:
+                        self.logger.warning('VDisk {0} was restarted but does not run on the destination storagedriver.'.format(vd.guid))
+        else:
+            raise ValueError('Volume potential of the destination is at or below {0}. Not moving any further!'.format(minimum_potential))
+
+    @staticmethod
+    def map_vdisk_to_destination(balances):
+        # type: (List[VDiskBalance]) -> Dict[str, VDiskBalance]
+        """
+        Map all vdisks to the destinations of the given balances
+        :param balances: Balances to map for
+        :return: guid - balance map
+        """
+        vdisk_balance_map = {}
+        for balance in balances:  # type: VDiskBalance
+            for vdisk_guid in balance.added:
+                if vdisk_guid in vdisk_balance_map:
+                    raise RuntimeError('VDisk {} has multiple destinations'.format(vdisk_guid))
+                vdisk_balance_map[vdisk_guid] = balance
+        return vdisk_balance_map
+
+    def __str__(self):
+        return 'StorageRouter {} of VPool {}: hosting prior to changes: {}, imposed limit: {}, hosting after changes: {}'\
+            .format(self.storagedriver.storagerouter.name,
+                    self.storagedriver.vpool.name,
+                    len(self.hosted_guids),
+                    self.limit,
+                    len(self.balance))
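Taken together, a rebalance run consists of computing the balances, inspecting them and executing the changes. A hedged usage sketch of the module (the `vpool_guid` value is assumed to exist):

```python
from ovs.lib.helpers.vdisk.rebalancer import VDiskRebalancer, VDiskBalance

# Compute the target layout for one vpool, spreading by volume potential
balances = VDiskRebalancer.get_rebalanced_layout(vpool_guid,
                                                 ignore_domains=False,
                                                 base_on_volume_potential=True)
VDiskRebalancer.print_balances(balances)
# Execute only the balances that actually gained vdisks
for balance in balances:  # type: VDiskBalance
    if balance.added:
        successful_moves, failed_moves = balance.execute_balance_change(abort_on_error=False)
```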
diff --git a/ovs/lib/update/__init__.py b/ovs/lib/update/__init__.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/ovs/update/__init__.py b/ovs/update/__init__.py
new file mode 100644
index 000000000..26a40bab7
--- /dev/null
+++ b/ovs/update/__init__.py
@@ -0,0 +1,18 @@
+# Copyright (C) 2019 iNuron NV
+#
+# This file is part of Open vStorage Open Source Edition (OSE),
+# as available from
+#
+#      http://www.openvstorage.org and
+#      http://www.openvstorage.com.
+#
+# This file is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3)
+# as published by the Free Software Foundation, in version 3 as it comes
+# in the LICENSE.txt file of the Open vStorage OSE distribution.
+#
+# Open vStorage is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY of any kind.
+
+from .volumedriver.updater import VolumeDriverUpdater
+from .alba.updater import AlbaComponentUpdater
diff --git a/ovs/update/alba/__init__.py b/ovs/update/alba/__init__.py
new file mode 100644
index 000000000..f3ecd2bfd
--- /dev/null
+++ b/ovs/update/alba/__init__.py
@@ -0,0 +1,15 @@
+# Copyright (C) 2019 iNuron NV
+#
+# This file is part of Open vStorage Open Source Edition (OSE),
+# as available from
+#
+#      http://www.openvstorage.org and
+#      http://www.openvstorage.com.
+#
+# This file is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3)
+# as published by the Free Software Foundation, in version 3 as it comes
+# in the LICENSE.txt file of the Open vStorage OSE distribution.
+#
+# Open vStorage is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY of any kind.
diff --git a/ovs/lib/update/albacomponent_update.py b/ovs/update/alba/updater.py
similarity index 92%
rename from ovs/lib/update/albacomponent_update.py
rename to ovs/update/alba/updater.py
index 8a63ea84a..8dbdef060 100644
--- a/ovs/lib/update/albacomponent_update.py
+++ b/ovs/update/alba/updater.py
@@ -14,21 +14,20 @@
 # Open vStorage is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY of any kind.
 
-from abc import abstractmethod
 from ovs.extensions.generic.system import System
 from ovs.extensions.storage.persistentfactory import PersistentFactory
 from ovs_extensions.update.alba_component_update import AlbaComponentUpdater as _AlbacomponentUpdater
+from ovs.log.log_handler import LogHandler
 
 
 class AlbaComponentUpdater(_AlbacomponentUpdater):
     """
     Implementation of abstract class to update alba
     """
+    logger = LogHandler.get('update', 'alba')
 
     @staticmethod
-    @abstractmethod
     def get_persistent_client():
-        # type: () -> PyrakoonStore
         """
         Retrieve a persistent client.
         Needs to be implemented by the subclass
@@ -37,7 +36,6 @@ def get_persistent_client():
 
     @classmethod
     def get_node_id(cls):
-        # type: () -> str
         """
         Use a factory to provide the machine id
         :return:
diff --git a/ovs/lib/update/single_update.md b/ovs/update/single_update.md
similarity index 63%
rename from ovs/lib/update/single_update.md
rename to ovs/update/single_update.md
index 7cf3d07dd..1cf323f61 100644
--- a/ovs/lib/update/single_update.md
+++ b/ovs/update/single_update.md
@@ -2,7 +2,25 @@
 The Framework now supports updating certain components on a single host at a time.
 It provides simple CLI commands (`ovs local_update <component>`) to update Arakoon, Alba and the volumedriver
 
- ## Alba update
+## Volumedriver update
+The Volumedriver update can be started by running `ovs local_update volumedriver`.
+The local update will:
+- Update the volumedriver binaries
+- Update the node distance map to avoid HA-ing to the node being upgraded
+- Move all volumes away from the node to other nodes
+- Migrate away all MDS master instances running on the node
+- Restart the volumedriver services
+- Update the node distance map to accept HA back onto the node
+
+### Exceptions
+Certain exceptions can be thrown during the update.
+
+| Exit code | Exception name | What happened |
+| --------- | ---------------| ------------- |
+| 21 | FailedToMigrateException | Not all volumes could be migrated away to other nodes. No moves had been started yet |
+| 22 | FailureDuringMigrateException | Some volumes could not be moved away during the migration process |
+
+## Alba update
 The Alba update can be started by running `ovs local_update alba`.
 The local update will:
 - Update the alba binaries. Currently, only andes-updates-3 towards andes-updates-4 is supported. Further updates towards bighorn will need further testing, as alba will bump from 1.5.x towards 1.6.x.
@@ -14,7 +32,7 @@ The local update will:
 `/opt/alba/bin/arakoon`
 Note that, given the nature of arakoon and its polling, stacktraces of no-master exceptions may occur. These errors can be safely ignored. This logging could not be suppressed, as essential other output would be suppressed with it.
 
- ### Exceptions
+### Exceptions
 Certain exceptions can be thrown during the update.
 
 | Exit code | Exception name | What happened |
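The exit codes in these tables come from the `exit_code` attribute on the `UpdateException` subclasses. A hedged sketch of how a wrapper could surface them (assuming `do_update` raises rather than calling `sys.exit` itself):

```python
import sys
from ovs.update import VolumeDriverUpdater
from ovs.extensions.generic.system import System
from ovs_extensions.update.base import UpdateException

try:
    VolumeDriverUpdater.do_update(System.get_my_machine_id())
except UpdateException as ex:
    # e.g. 21 = FailedToMigrateException, 22 = FailureDuringMigrateException
    sys.exit(ex.exit_code)
```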
diff --git a/ovs/update/volumedriver/__init__.py b/ovs/update/volumedriver/__init__.py
new file mode 100644
index 000000000..f3ecd2bfd
--- /dev/null
+++ b/ovs/update/volumedriver/__init__.py
@@ -0,0 +1,15 @@
+# Copyright (C) 2019 iNuron NV
+#
+# This file is part of Open vStorage Open Source Edition (OSE),
+# as available from
+#
+#      http://www.openvstorage.org and
+#      http://www.openvstorage.com.
+#
+# This file is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3)
+# as published by the Free Software Foundation, in version 3 as it comes
+# in the LICENSE.txt file of the Open vStorage OSE distribution.
+#
+# Open vStorage is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY of any kind.
diff --git a/ovs/update/volumedriver/updater.py b/ovs/update/volumedriver/updater.py
new file mode 100644
index 000000000..b9910b6c5
--- /dev/null
+++ b/ovs/update/volumedriver/updater.py
@@ -0,0 +1,251 @@
+# Copyright (C) 2019 iNuron NV
+#
+# This file is part of Open vStorage Open Source Edition (OSE),
+# as available from
+#
+#      http://www.openvstorage.org and
+#      http://www.openvstorage.com.
+#
+# This file is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License v3 (GNU AGPLv3)
+# as published by the Free Software Foundation, in version 3 as it comes
+# in the LICENSE.txt file of the Open vStorage OSE distribution.
+#
+# Open vStorage is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY of any kind.
+
+import os
+import time
+import itertools
+from celery import chain, group
+from ovs.constants.vpool import VPOOL_UPDATE_KEY, STORAGEDRIVER_SERVICE_BASE, VOLUMEDRIVER_BIN_PATH, VOLUMEDRIVER_CMD_NAME, PACKAGES_EE
+from ovs.extensions.generic.configuration import Configuration
+from ovs.dal.hybrids.vpool import VPool
+from ovs.dal.hybrids.vdisk import VDisk
+from ovs.dal.hybrids.storagerouter import StorageRouter
+from ovs.extensions.generic.system import System
+from ovs_extensions.update.base import ComponentUpdater, UpdateException
+from ovs.lib.helpers.vdisk.rebalancer import VDiskRebalancer, VDiskBalance
+from ovs.lib.storagedriver import StorageDriverController
+from ovs.extensions.storage.persistentfactory import PersistentFactory
+from ovs.lib.mdsservice import MDSServiceController
+from ovs.log.log_handler import LogHandler
+
+# noinspection PyUnreachableCode
+if False:
+    from typing import List, Dict, Tuple
+
+
+class FailedToMigrateException(UpdateException):
+    """
+    Thrown when not all volumes would be able to move away
+    """
+    exit_code = 21
+
+
+class FailureDuringMigrateException(UpdateException):
+    """
+    Thrown when certain volumes failed to move away
+    """
+    exit_code = 22
+
+
+class VolumeDriverUpdater(ComponentUpdater):
+
+    """
+    Responsible for updating the volumedriver of a single node
+    """
+
+    logger = LogHandler.get('update', 'volumedriver')
+
+    COMPONENT = 'volumedriver'
+    # List of tuples: [(package_names, binary_name, binary_location, [service_prefix_0])]
+    BINARIES = [(PACKAGES_EE, VOLUMEDRIVER_CMD_NAME, VOLUMEDRIVER_BIN_PATH, [STORAGEDRIVER_SERVICE_BASE])]  # type: List[Tuple[List[str], str, str, List[str]]]
+    LOCAL_SR = System.get_my_storagerouter()
+    EDGE_SYNC_TIME = 5 * 60
+
+    @classmethod
+    def restart_services(cls):
+        """
+        Override the service restart. The volumedrivers should be prepared for shutdown first
+        """
+        cls.logger.info("Restarting all related services")
+        # Get the migration plans for every volume on this host. If no plan can be made for certain volumes, this raises a FailedToMigrateException
+        balances_by_vpool = cls.get_vpool_balances_for_evacuating_storagerouter(cls.LOCAL_SR)
+        # Before executing the migrations, prevent this StorageRouter from being an HA target
+        cls.mark_storagerouter_unreachable_for_ha(cls.LOCAL_SR)
+        try:
+            # @todo concurrency?
+            for vpool, balances in balances_by_vpool.iteritems():
+                cls.migrate_away(balances, cls.LOCAL_SR)
+            cls.migrate_master_mds(cls.LOCAL_SR)
+            all_prefixes = tuple(itertools.chain.from_iterable(b[3] for b in cls.BINARIES))
+            return cls.restart_services_by_prefixes(all_prefixes)
+        finally:
+            cls.mark_storagerouter_reachable_for_ha(cls.LOCAL_SR)
+
+    @staticmethod
+    def get_vpool_balances_for_evacuating_storagerouter(storagerouter):
+        # type: (StorageRouter) -> Dict[VPool, List[VDiskBalance]]
+        """
+        Retrieve the balances for every vpool on the local machine
+        :param storagerouter: StorageRouter to migrate away from
+        :type storagerouter: StorageRouter
+        :return: Dict with the VPool as key and its balances as value
+        :rtype: Dict[VPool, List[VDiskBalance]]
+        :raises: FailedToMigrateException if not all vdisks would be able to move out
+        """
+        errors = []
+        evacuate_srs = [storagerouter.guid]
+        balances_by_vpool = {}
+        for storagedriver in storagerouter.storagedrivers:
+            vpool = storagedriver.vpool
+            try:
+                balances = VDiskRebalancer.get_rebalanced_layout(storagedriver.vpool_guid,
+                                                                 ignore_domains=False,
+                                                                 excluded_storagerouters=None,
+                                                                 evacuate_storagerouters=evacuate_srs,
+                                                                 base_on_volume_potential=True)
+                # Handle the balances of the evacuated StorageRouter first
+                balances_sorted = sorted(balances, key=lambda b: b.storagedriver.storagerouter_guid in evacuate_srs,
+                                         reverse=True)
+                balances_by_vpool[vpool] = balances_sorted
+            except Exception as ex:
+                errors.append((vpool, ex))
+        if errors:
+            formatted_errors = '\n - {0}'.format('\n - '.join('VPool {0}: {1}'.format(vpool.name, error) for vpool, error in errors))
+            raise FailedToMigrateException('Unable to migrate all volumes away from this machine: {}'.format(formatted_errors))
+        return balances_by_vpool
+
+    @classmethod
+    def mark_storagerouter_unreachable_for_ha(cls, storagerouter):
+        # type: (StorageRouter) -> None
+        """
+        Update the node distance maps to stop the StorageRouter from being used as an HA target.
+        Current code paths that update the node distance map on the volumedriver side are:
+        - Update of domains
+        - Update of the vpool layout (extend/shrink)
+        - Cluster registry checkup (run periodically)
+        :param storagerouter: StorageRouter to mark as unreachable for HA
+        :type storagerouter: StorageRouter
+        :return: None
+        :rtype: NoneType
+        """
+        cls.logger.info("Marking StorageRouter {} as unavailable for HA".format(storagerouter.name))
+        # Set the value read by the storagedriver cluster node config code
+        # This holds for all code paths mentioned in the docstring
+        Configuration.set(os.path.join(VPOOL_UPDATE_KEY, storagerouter.guid), 0)
+        # Trigger a complete reload of the node distance maps
+        StorageDriverController.cluster_registry_checkup()
+        # Give the edge clients a few moments to catch up on all the configs
+        sleep_time = cls.get_edge_sync_time()
+        cls.logger.info("Waiting {}s for all edge clients to sync up".format(sleep_time))
+        time.sleep(sleep_time)
+
+    @classmethod
+    def mark_storagerouter_reachable_for_ha(cls, storagerouter):
+        # type: (StorageRouter) -> None
+        """
+        Update the node distance map to add the StorageRouter back into the HA pool
+        :param storagerouter: StorageRouter to put back into the distance map
+        :type storagerouter: StorageRouter
+        :return: None
+        """
+        cls.logger.info("Marking StorageRouter {} as available for HA".format(storagerouter.name))
+        Configuration.delete(os.path.join(VPOOL_UPDATE_KEY, storagerouter.guid))
+        # Trigger a complete reload of the node distance maps
+        StorageDriverController.cluster_registry_checkup()
+        # Give the edge clients a few moments to catch up on all the configs
+        sleep_time = cls.get_edge_sync_time()
+        cls.logger.info("Waiting {}s for all edge clients to sync up".format(sleep_time))
+        time.sleep(sleep_time)
+
+    @staticmethod
+    def migrate_away(balances, storagerouter):
+        # type: (List[VDiskBalance], StorageRouter) -> None
+        """
+        Migrate all volumes away
+        :param balances: List of vdisk balances to execute
+        :type balances: List[VDiskBalance]
+        :param storagerouter: StorageRouter to move away from
+        :type storagerouter: StorageRouter
+        :return: None
+        :raises: FailureDuringMigrateException if any volumes failed to move
+        """
+        evacuate_srs = [storagerouter.guid]
+        for balance in balances:  # type: VDiskBalance
+            if balance.storagedriver.storagerouter_guid in evacuate_srs:
+                successful_moves, failed_moves = balance.execute_balance_change_through_overflow(balances,
+                                                                                                 user_input=False,
+                                                                                                 abort_on_error=False)
+                if failed_moves:
+                    raise FailureDuringMigrateException('Could not move volumes {} away'.format(', '.join(failed_moves)))
+
+    @classmethod
+    def migrate_master_mds(cls, storagerouter, max_chain_size=100):
+        # type: (StorageRouter, int) -> None
+        """
+        Migrate away all MDS masters from the given storagerouter
+        :param storagerouter: StorageRouter to migrate away from
+        :type storagerouter: StorageRouter
+        :param max_chain_size: Maximum number of tasks within a chain. Set because of https://github.com/celery/celery/issues/1078
+        :type max_chain_size: int
+        :return: None
+        :rtype: NoneType
+        """
+        cls.logger.info("Starting MDS migrations")
+        while True:
+            vpool_mds_master_vdisks = cls.get_vdisks_mds_masters_on_storagerouter(storagerouter)
+            all_masters_gone = sum(len(vds) for vds in vpool_mds_master_vdisks.values()) == 0
+            if all_masters_gone:
+                break
+            chains = []
+            for vpool_name, vdisk_guids in vpool_mds_master_vdisks.iteritems():
+                signatures = []
+                tasks = []
+                for vdisk_guid in vdisk_guids[0:max_chain_size]:
+                    cls.logger.info('Ensuring safety for {}'.format(vdisk_guid))
+                    signature = MDSServiceController.ensure_safety.si(vdisk_guid)
+                    # freeze() fixes the signature into its final form and yields the AsyncResult we would normally get from delaying it
+                    tasks.append(signature.freeze())
+                    signatures.append(signature)
+                if signatures:
+                    cls.logger.info('Adding chain for VPool {} with tasks {}'.format(vpool_name, ', '.join(t.id for t in tasks)))
+                    chains.append(chain(signatures))
+            # Add all chain signatures to a group for parallel execution
+            task_group = group(chains)
+            # Wait for the group result
+            async_result = task_group.apply_async()
+            cls.logger.info('Waiting for all tasks of group {}'.format(async_result.id))
+            _ = async_result.get()
+        cls.logger.info("MDS migration finished")
+
+    @staticmethod
+    def get_vdisks_mds_masters_on_storagerouter(storagerouter):
+        # type: (StorageRouter) -> Dict[str, List[str]]
+        """
+        Retrieve all vdisks with their MDS master on the given storagerouter
+        :param storagerouter: StorageRouter to list the MDS masters on
+        :type storagerouter: StorageRouter
+        :return: Dict with the VPool name as key and the guids of the vdisks with an MDS master on the storagerouter as value
+        :rtype: Dict[str, List[str]]
+        """
+        mds_masters = {}
+        vpools = set(sd.vpool for sd in storagerouter.storagedrivers)
+        for vpool in sorted(vpools, key=lambda k: k.name):
+            masters = []
+            for mds_service in sorted(vpool.mds_services, key=lambda k: k.number):
+                if mds_service.service.storagerouter_guid == storagerouter.guid:
+                    for junction in mds_service.vdisks:
+                        if junction.is_master:
+                            masters.append(junction.vdisk_guid)
+            mds_masters[vpool.name] = masters
+        return mds_masters
+
+    @staticmethod
+    def get_persistent_client():
+        """
+        Retrieve a client for the persistent cluster store
+        """
+        return PersistentFactory.get_client()
+
+    @classmethod
+    def get_edge_sync_time(cls):
+        # type: () -> int
+        """
+        Get the time required for all edge clients to do a complete sync
+        :return: Time for a complete edge sync, in seconds
+        :rtype: int
+        """
+        return 2 * cls.EDGE_SYNC_TIME
diff --git a/scripts/system/ovs b/scripts/system/ovs
index 46cb4727a..c6db72e1e 100644
--- a/scripts/system/ovs
+++ b/scripts/system/ovs
@@ -69,6 +69,7 @@ function show_help {
     echo ""
     echo " * Run automated local update:"
     echo "  - ovs local_update alba             Execute a local update of alba and its dependencies"
+    echo "  - ovs local_update volumedriver     Execute a local update of the volumedriver"
     echo ""
 }
 
@@ -277,10 +278,16 @@ if Configuration.exists('$3', raw=True):
     fi
 elif [ "$1" = "local_update" ] ; then
     if [ "$2" = "alba" ] ; then
-        python -c """
-from ovs.lib.update.albacomponent_update import AlbaComponentUpdater
+        python -c """
+from ovs.update import AlbaComponentUpdater
 from ovs.extensions.generic.system import System
 AlbaComponentUpdater.do_update(System.get_my_machine_id())
+"""
+    elif [ "$2" = "volumedriver" ] ; then
+        python -c """
+from ovs.update import VolumeDriverUpdater
+from ovs.extensions.generic.system import System
+VolumeDriverUpdater.do_update(System.get_my_machine_id())
 """
     else
         show_help
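For reference, the chain-per-vpool pattern that `migrate_master_mds` relies on looks like this in isolation (a hedged sketch; `task` stands in for any Celery task such as `MDSServiceController.ensure_safety`):

```python
from celery import chain, group

def run_chains_in_parallel(task, guids_per_bucket):
    """Run one serial chain per bucket; the chains themselves run in parallel.

    Chains are kept short (see celery/celery#1078) and serialize the work
    inside a bucket, while the group fans the chains out over the workers.
    """
    chains = []
    for guids in guids_per_bucket.itervalues():
        signatures = [task.si(guid) for guid in guids]  # immutable signatures
        if signatures:
            chains.append(chain(signatures))
    async_result = group(chains).apply_async()
    return async_result.get()  # Block until every chain has finished
```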