diff --git a/ovs/lib/helpers/decorators.py b/ovs/lib/helpers/decorators.py index b1e6e0707..19c26efb6 100644 --- a/ovs/lib/helpers/decorators.py +++ b/ovs/lib/helpers/decorators.py @@ -729,7 +729,7 @@ def lock_chained_mode(self, kwargs, timeout): # Update kwargs with args kwargs_dict = self._filter_ignorable_arguments(kwargs) - params_info = 'with params {0}'.format(kwargs_dict) if kwargs_dict else 'with default params' + params_info = 'with params {0} (all passed arguments: {1}'.format(kwargs_dict, kwargs) if kwargs_dict else 'with default params' task_log_format = 'task {0} {1}' task_log_name = task_log_format.format(self.ensure_single_container.task_name, params_info) task_log_id = task_log_format.format(self.task_id, params_info) @@ -801,7 +801,7 @@ def lock_deduped_mode(self, kwargs, timeout): self.validate_no_extra_names() kwargs_dict = self._filter_ignorable_arguments(kwargs) - params_info = 'with params {0}'.format(kwargs_dict) if kwargs_dict else 'with default params' + params_info = 'with params {0} (all passed arguments: {1}'.format(kwargs_dict, kwargs) if kwargs_dict else 'with default params' task_log_format = 'task {0} {1}' task_log_name = task_log_format.format(self.ensure_single_container.task_name, params_info) task_log_id = task_log_format.format(self.task_id, params_info) diff --git a/ovs/lib/mdsservice.py b/ovs/lib/mdsservice.py index 23dd13f7c..0e9583a20 100755 --- a/ovs/lib/mdsservice.py +++ b/ovs/lib/mdsservice.py @@ -29,6 +29,7 @@ from ovs.dal.hybrids.diskpartition import DiskPartition from ovs.dal.hybrids.storagerouter import StorageRouter from ovs.dal.hybrids.j_storagedriverpartition import StorageDriverPartition +from ovs.dal.hybrids.vdisk import VDisk from ovs.dal.hybrids.vpool import VPool from ovs.dal.lists.storagerouterlist import StorageRouterList from ovs.dal.lists.vdisklist import VDiskList @@ -367,8 +368,32 @@ def mds_checkup(): # noinspection PyUnresolvedReferences @staticmethod - @ovs_task(name='ovs.mds.ensure_safety', ensure_single_info={'mode': 'CHAINED'}) - def ensure_safety(vdisk_guid, excluded_storagerouter_guids=None): + @ovs_task(name='ovs.mds.ensure_safety_vpool', ensure_single_info={'mode': 'DEDUPED', 'ignore_arguments': ['vdisk_guid', 'excluded_storagerouter_guids']}) + def _ensure_safety_vpool(vpool_guid, vdisk_guid, excluded_storagerouter_guids=None): + """ + Ensures safety for a single vdisk of a vpool + Allows multiple ensure safeties to run at the same time for different vpool + Used internally + :param vpool_guid: Guid of the VPool associated with the vDisk + :type vpool_guid: str + :param vdisk_guid: Guid of the vDisk to the safety off + :type vdisk_guid: str + :param excluded_storagerouter_guids: GUIDs of StorageRouters to leave out of calculation (Eg: When 1 is down or unavailable) + :type excluded_storagerouter_guids: list[str] + :return: None + :rtype: NoneType + """ + _ = vpool_guid + + if excluded_storagerouter_guids is None: + excluded_storagerouter_guids = [] + + safety_ensurer = SafetyEnsurer(vdisk_guid, excluded_storagerouter_guids) + safety_ensurer.ensure_safety() + + @staticmethod + @ovs_task(name='ovs.mds.ensure_safety') + def ensure_safety(vdisk_guid, excluded_storagerouter_guids=None, **kwargs): """ Ensures (or tries to ensure) the safety of a given vDisk. Assumptions: @@ -398,11 +423,8 @@ def ensure_safety(vdisk_guid, excluded_storagerouter_guids=None): :return: None :rtype: NoneType """ - if excluded_storagerouter_guids is None: - excluded_storagerouter_guids = [] - - safety_ensurer = SafetyEnsurer(vdisk_guid, excluded_storagerouter_guids) - safety_ensurer.ensure_safety() + vdisk = VDisk(vdisk_guid) + return MDSServiceController._ensure_safety_vpool(vdisk.vpool_guid, vdisk.guid, excluded_storagerouter_guids, **kwargs) @staticmethod def get_preferred_mds(storagerouter, vpool): diff --git a/ovs/lib/tests/mdsservice_tests/test_mdsservice.py b/ovs/lib/tests/mdsservice_tests/test_mdsservice.py index eb8e7f203..8b961a2f6 100644 --- a/ovs/lib/tests/mdsservice_tests/test_mdsservice.py +++ b/ovs/lib/tests/mdsservice_tests/test_mdsservice.py @@ -29,6 +29,7 @@ from ovs.extensions.generic.configuration import Configuration from ovs.extensions.storageserver.storagedriver import MetadataServerClient, StorageDriverConfiguration from ovs.extensions.storageserver.tests.mockups import MDSClient, StorageRouterClient, LocalStorageRouterClient +from ovs.lib.helpers.exceptions import EnsureSingleTimeoutReached from ovs.lib.mdsservice import MDSServiceController from ovs.log.log_handler import LogHandler @@ -1523,7 +1524,7 @@ def test_checkup_exclusivity_single_checkup_running(self): vpool_1, vpool_2 = vpools runtime_hooks = {'before_execution': lambda: execution_event.wait(), - 'after_validation': lambda : validation_event.set()} + 'after_validation': lambda: validation_event.set()} # Running in a thread to simulate a direct invocation single_vpool_task = Thread(target=MDSServiceController.mds_checkup_single, args=(vpool_1.guid,), kwargs={'ensure_single_runtime_hooks': runtime_hooks}) @@ -1644,3 +1645,108 @@ def test_ensure_safety_excluded_storagerouters(self): else: MDSServiceController.ensure_safety(vdisk_guid=vdisks[vdisk_id].guid, excluded_storagerouter_guids=[storagerouters[5].guid]) self._check_reality(configs=configs, loads=loads, vdisks=vdisks, mds_services=mds_services) + + def test_ensure_safety_concurrency_different_vpool(self): + """ + Test if the concurrency works + """ + validation_event = Event() + execution_events = [Event(), Event()] + + def wait_for_execute(event_to_set): + # type: (Event) -> None + event_to_set.set() + validation_event.wait() + + structure = DalHelper.build_dal_structure( + {'vpools': [1, 2], + 'storagerouters': [1, 2], + 'storagedrivers': [(1, 1, 1), (2, 2, 1)], # (, , ) + 'mds_services': [(1, 1), (2, 2)]} # (, ) + ) + mds_services = structure['mds_services'] + + vdisks = {} + for mds_service in mds_services.itervalues(): + vdisks.update(DalHelper.create_vdisks_for_mds_service(amount=1, start_id=len(vdisks) + 1, mds_service=mds_service)) + + self.assertTrue(len(vdisks) == 2, '2 VDisks should be created') + vdisk_1, vdisk_2 = vdisks.values() + event_1, event_2 = execution_events + + kwargs_runtime_hooks_1 = {'ensure_single_runtime_hooks': {'before_execution': lambda: wait_for_execute(event_1)}, + 'ensure_single_timeout': 5} + kwargs_runtime_hooks_2 = {'ensure_single_runtime_hooks': {'before_execution': lambda: wait_for_execute(event_2)}, + 'ensure_single_timeout': 5} + + # Both vdisks should be able to be processed + thread_1 = Thread(target=MDSServiceController.ensure_safety, args=(vdisk_1.guid,), kwargs=kwargs_runtime_hooks_1) + thread_1.start() + threads = [thread_1] + + event_1.wait(5) + + thread_2 = Thread(target=MDSServiceController.ensure_safety, args=(vdisk_2.guid,), kwargs=kwargs_runtime_hooks_2) + thread_2.start() + + event_2.wait(5) + + for event in execution_events: + self.assertTrue(event.is_set(), 'Both events should be set. No locking/discarding should occur') + + validation_event.set() + for thread in threads: + thread.join() + + def test_ensure_safety_concurrency_same_vpool(self): + """ + Test if concurrency doesn't happen + """ + validation_event = Event() + execution_events = [Event(), Event()] + + def wait_for_execute(event_to_set): + # type: (Event) -> None + event_to_set.set() + validation_event.wait() + + structure = DalHelper.build_dal_structure( + {'vpools': [1], + 'storagerouters': [1], + 'storagedrivers': [(1, 1, 1)], # (, , ) + 'mds_services': [(1, 1)]} # (, ) + ) + mds_services = structure['mds_services'] + + vdisks = {} + for mds_service in mds_services.itervalues(): + vdisks.update(DalHelper.create_vdisks_for_mds_service(amount=2, start_id=len(vdisks) + 1, mds_service=mds_service)) + + self.assertTrue(len(vdisks) == 2, '2 VDisks should be created') + vdisk_1, vdisk_2 = vdisks.values() + event_1, event_2 = execution_events + + kwargs_runtime_hooks_1 = {'ensure_single_runtime_hooks': {'before_execution': lambda: wait_for_execute(event_1)}, + 'ensure_single_timeout': 5} + kwargs_runtime_hooks_2 = {'ensure_single_runtime_hooks': {'before_execution': lambda: wait_for_execute(event_2)}, + 'ensure_single_timeout': 0.5} + + # Both vdisks should be able to be processed + thread_1 = Thread(target=MDSServiceController.ensure_safety, args=(vdisk_1.guid,), kwargs=kwargs_runtime_hooks_1) + thread_1.start() + threads = [thread_1] + + event_1.wait(5) + + with self.assertRaises(EnsureSingleTimeoutReached) as context: + MDSServiceController.ensure_safety(vdisk_2.guid, **kwargs_runtime_hooks_2) + + validation_event.set() + for index, event in enumerate(execution_events): + if index == 0: + self.assertTrue(event.is_set(), 'Only one event should be set. Locking/discarding should occur') + else: + self.assertFalse(event.is_set(), 'Only one event should be set. Locking/discarding should occur') + + for thread in threads: + thread.join()