Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ovs/lib/helpers/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ def lock_chained_mode(self, kwargs, timeout):

# Update kwargs with args
kwargs_dict = self._filter_ignorable_arguments(kwargs)
params_info = 'with params {0}'.format(kwargs_dict) if kwargs_dict else 'with default params'
params_info = 'with params {0} (all passed arguments: {1}'.format(kwargs_dict, kwargs) if kwargs_dict else 'with default params'
task_log_format = 'task {0} {1}'
task_log_name = task_log_format.format(self.ensure_single_container.task_name, params_info)
task_log_id = task_log_format.format(self.task_id, params_info)
Expand Down Expand Up @@ -801,7 +801,7 @@ def lock_deduped_mode(self, kwargs, timeout):
self.validate_no_extra_names()

kwargs_dict = self._filter_ignorable_arguments(kwargs)
params_info = 'with params {0}'.format(kwargs_dict) if kwargs_dict else 'with default params'
params_info = 'with params {0} (all passed arguments: {1}'.format(kwargs_dict, kwargs) if kwargs_dict else 'with default params'
task_log_format = 'task {0} {1}'
task_log_name = task_log_format.format(self.ensure_single_container.task_name, params_info)
task_log_id = task_log_format.format(self.task_id, params_info)
Expand Down
36 changes: 29 additions & 7 deletions ovs/lib/mdsservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ovs.dal.hybrids.diskpartition import DiskPartition
from ovs.dal.hybrids.storagerouter import StorageRouter
from ovs.dal.hybrids.j_storagedriverpartition import StorageDriverPartition
from ovs.dal.hybrids.vdisk import VDisk
from ovs.dal.hybrids.vpool import VPool
from ovs.dal.lists.storagerouterlist import StorageRouterList
from ovs.dal.lists.vdisklist import VDiskList
Expand Down Expand Up @@ -367,8 +368,32 @@ def mds_checkup():

# noinspection PyUnresolvedReferences
@staticmethod
@ovs_task(name='ovs.mds.ensure_safety', ensure_single_info={'mode': 'CHAINED'})
def ensure_safety(vdisk_guid, excluded_storagerouter_guids=None):
@ovs_task(name='ovs.mds.ensure_safety_vpool', ensure_single_info={'mode': 'DEDUPED', 'ignore_arguments': ['vdisk_guid', 'excluded_storagerouter_guids']})
def _ensure_safety_vpool(vpool_guid, vdisk_guid, excluded_storagerouter_guids=None):
"""
Ensures safety for a single vdisk of a vpool
Allows multiple ensure safeties to run at the same time for different vpool
Used internally
:param vpool_guid: Guid of the VPool associated with the vDisk
:type vpool_guid: str
:param vdisk_guid: Guid of the vDisk to the safety off
:type vdisk_guid: str
:param excluded_storagerouter_guids: GUIDs of StorageRouters to leave out of calculation (Eg: When 1 is down or unavailable)
:type excluded_storagerouter_guids: list[str]
:return: None
:rtype: NoneType
"""
_ = vpool_guid

if excluded_storagerouter_guids is None:
excluded_storagerouter_guids = []

safety_ensurer = SafetyEnsurer(vdisk_guid, excluded_storagerouter_guids)
safety_ensurer.ensure_safety()

@staticmethod
@ovs_task(name='ovs.mds.ensure_safety')
def ensure_safety(vdisk_guid, excluded_storagerouter_guids=None, **kwargs):
"""
Ensures (or tries to ensure) the safety of a given vDisk.
Assumptions:
Expand Down Expand Up @@ -398,11 +423,8 @@ def ensure_safety(vdisk_guid, excluded_storagerouter_guids=None):
:return: None
:rtype: NoneType
"""
if excluded_storagerouter_guids is None:
excluded_storagerouter_guids = []

safety_ensurer = SafetyEnsurer(vdisk_guid, excluded_storagerouter_guids)
safety_ensurer.ensure_safety()
vdisk = VDisk(vdisk_guid)
return MDSServiceController._ensure_safety_vpool(vdisk.vpool_guid, vdisk.guid, excluded_storagerouter_guids, **kwargs)

@staticmethod
def get_preferred_mds(storagerouter, vpool):
Expand Down
108 changes: 107 additions & 1 deletion ovs/lib/tests/mdsservice_tests/test_mdsservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ovs.extensions.generic.configuration import Configuration
from ovs.extensions.storageserver.storagedriver import MetadataServerClient, StorageDriverConfiguration
from ovs.extensions.storageserver.tests.mockups import MDSClient, StorageRouterClient, LocalStorageRouterClient
from ovs.lib.helpers.exceptions import EnsureSingleTimeoutReached
from ovs.lib.mdsservice import MDSServiceController
from ovs.log.log_handler import LogHandler

Expand Down Expand Up @@ -1523,7 +1524,7 @@ def test_checkup_exclusivity_single_checkup_running(self):

vpool_1, vpool_2 = vpools
runtime_hooks = {'before_execution': lambda: execution_event.wait(),
'after_validation': lambda : validation_event.set()}
'after_validation': lambda: validation_event.set()}

# Running in a thread to simulate a direct invocation
single_vpool_task = Thread(target=MDSServiceController.mds_checkup_single, args=(vpool_1.guid,), kwargs={'ensure_single_runtime_hooks': runtime_hooks})
Expand Down Expand Up @@ -1644,3 +1645,108 @@ def test_ensure_safety_excluded_storagerouters(self):
else:
MDSServiceController.ensure_safety(vdisk_guid=vdisks[vdisk_id].guid, excluded_storagerouter_guids=[storagerouters[5].guid])
self._check_reality(configs=configs, loads=loads, vdisks=vdisks, mds_services=mds_services)

def test_ensure_safety_concurrency_different_vpool(self):
"""
Test if the concurrency works
"""
validation_event = Event()
execution_events = [Event(), Event()]

def wait_for_execute(event_to_set):
# type: (Event) -> None
event_to_set.set()
validation_event.wait()

structure = DalHelper.build_dal_structure(
{'vpools': [1, 2],
'storagerouters': [1, 2],
'storagedrivers': [(1, 1, 1), (2, 2, 1)], # (<id>, <vpool_id>, <storagerouter_id>)
'mds_services': [(1, 1), (2, 2)]} # (<id>, <storagedriver_id>)
)
mds_services = structure['mds_services']

vdisks = {}
for mds_service in mds_services.itervalues():
vdisks.update(DalHelper.create_vdisks_for_mds_service(amount=1, start_id=len(vdisks) + 1, mds_service=mds_service))

self.assertTrue(len(vdisks) == 2, '2 VDisks should be created')
vdisk_1, vdisk_2 = vdisks.values()
event_1, event_2 = execution_events

kwargs_runtime_hooks_1 = {'ensure_single_runtime_hooks': {'before_execution': lambda: wait_for_execute(event_1)},
'ensure_single_timeout': 5}
kwargs_runtime_hooks_2 = {'ensure_single_runtime_hooks': {'before_execution': lambda: wait_for_execute(event_2)},
'ensure_single_timeout': 5}

# Both vdisks should be able to be processed
thread_1 = Thread(target=MDSServiceController.ensure_safety, args=(vdisk_1.guid,), kwargs=kwargs_runtime_hooks_1)
thread_1.start()
threads = [thread_1]

event_1.wait(5)

thread_2 = Thread(target=MDSServiceController.ensure_safety, args=(vdisk_2.guid,), kwargs=kwargs_runtime_hooks_2)
thread_2.start()

event_2.wait(5)

for event in execution_events:
self.assertTrue(event.is_set(), 'Both events should be set. No locking/discarding should occur')

validation_event.set()
for thread in threads:
thread.join()

def test_ensure_safety_concurrency_same_vpool(self):
"""
Test if concurrency doesn't happen
"""
validation_event = Event()
execution_events = [Event(), Event()]

def wait_for_execute(event_to_set):
# type: (Event) -> None
event_to_set.set()
validation_event.wait()

structure = DalHelper.build_dal_structure(
{'vpools': [1],
'storagerouters': [1],
'storagedrivers': [(1, 1, 1)], # (<id>, <vpool_id>, <storagerouter_id>)
'mds_services': [(1, 1)]} # (<id>, <storagedriver_id>)
)
mds_services = structure['mds_services']

vdisks = {}
for mds_service in mds_services.itervalues():
vdisks.update(DalHelper.create_vdisks_for_mds_service(amount=2, start_id=len(vdisks) + 1, mds_service=mds_service))

self.assertTrue(len(vdisks) == 2, '2 VDisks should be created')
vdisk_1, vdisk_2 = vdisks.values()
event_1, event_2 = execution_events

kwargs_runtime_hooks_1 = {'ensure_single_runtime_hooks': {'before_execution': lambda: wait_for_execute(event_1)},
'ensure_single_timeout': 5}
kwargs_runtime_hooks_2 = {'ensure_single_runtime_hooks': {'before_execution': lambda: wait_for_execute(event_2)},
'ensure_single_timeout': 0.5}

# Both vdisks should be able to be processed
thread_1 = Thread(target=MDSServiceController.ensure_safety, args=(vdisk_1.guid,), kwargs=kwargs_runtime_hooks_1)
thread_1.start()
threads = [thread_1]

event_1.wait(5)

with self.assertRaises(EnsureSingleTimeoutReached) as context:
MDSServiceController.ensure_safety(vdisk_2.guid, **kwargs_runtime_hooks_2)

validation_event.set()
for index, event in enumerate(execution_events):
if index == 0:
self.assertTrue(event.is_set(), 'Only one event should be set. Locking/discarding should occur')
else:
self.assertFalse(event.is_set(), 'Only one event should be set. Locking/discarding should occur')

for thread in threads:
thread.join()