From 13862cb02ad795a3d0c4a13ec4f1ab45a08aa629 Mon Sep 17 00:00:00 2001 From: Jeffrey Devloo Date: Fri, 10 May 2019 08:34:12 +0200 Subject: [PATCH 1/3] Add concurrency to snapshot delete. Concurrency offloaded to celery --- ovs/lib/generic.py | 95 ++++++++++++++------ ovs/lib/tests/generic_tests/test_snapshot.py | 37 ++++++-- 2 files changed, 96 insertions(+), 36 deletions(-) diff --git a/ovs/lib/generic.py b/ovs/lib/generic.py index d91c1f5f7..66a09d1a7 100644 --- a/ovs/lib/generic.py +++ b/ovs/lib/generic.py @@ -20,12 +20,18 @@ import os import copy import time +from celery import group +from celery.utils import uuid +from celery.result import GroupResult from datetime import datetime, timedelta from threading import Thread from time import mktime from ovs.constants.vdisk import SCRUB_VDISK_EXCEPTION_MESSAGE from ovs.dal.hybrids.servicetype import ServiceType +from ovs.dal.hybrids.storagedriver import StorageDriver +from ovs.dal.hybrids.vdisk import VDisk from ovs.dal.lists.servicelist import ServiceList +from ovs.dal.lists.storagedriverlist import StorageDriverList from ovs.dal.lists.storagerouterlist import StorageRouterList from ovs.dal.lists.vdisklist import VDiskList from ovs.extensions.db.arakooninstaller import ArakoonClusterConfig @@ -74,8 +80,26 @@ def snapshot_all_vdisks(): @staticmethod @ovs_task(name='ovs.generic.delete_snapshots', schedule=Schedule(minute='1', hour='2'), ensure_single_info={'mode': 'DEFAULT'}) def delete_snapshots(timestamp=None): + # type: (float) -> GroupResult """ - Delete snapshots & scrubbing policy + Delete snapshots based on the retention policy + Offloads concurrency to celery + Returns a GroupResult. 
Waiting for the result can be done using result.get() + :param timestamp: Timestamp to determine whether snapshots should be kept or not, if none provided, current time will be used + :type timestamp: float + :return: The GroupResult + :rtype: GroupResult + """ + # The result cannot be fetched in this task + group_id = uuid() + return group(GenericController.delete_snapshots_storagedriver.s(storagedriver.guid, timestamp, group_id) + for storagedriver in StorageDriverList.get_storagedrivers()).apply_async(task_id=group_id) + + @staticmethod + @ovs_task(name='ovs.generic.delete_snapshots_storagedriver', ensure_single_info={'mode': 'DEFAULT'}) + def delete_snapshots_storagedriver(storagedriver_guid, timestamp=None, group_id=None): + """ + Delete snapshots per storagedriver & scrubbing policy Implemented delete snapshot policy: < 1d | 1d bucket | 1 | best of bucket | 1d @@ -83,12 +107,26 @@ def delete_snapshots(timestamp=None): < 1m | 1w bucket | 3 | oldest of bucket | 4w = 1m > 1m | delete + :param storagedriver_guid: Guid of the StorageDriver to remove snapshots on + :type storagedriver_guid: str :param timestamp: Timestamp to determine whether snapshots should be kept or not, if none provided, current time will be used :type timestamp: float - + :param group_id: ID of the group task. 
Used to identify which snapshot deletes were called during the scheduled task + :type group_id: str :return: None """ - GenericController._logger.info('Delete snapshots started') + if group_id: + log_id = 'Group job {} - '.format(group_id) + else: + log_id = '' + + def format_log(message): + return '{}{}'.format(log_id, message) + + GenericController._logger.info(format_log('Delete snapshots started for StorageDriver {0}'.format(storagedriver_guid))) + + storagedriver = StorageDriver(storagedriver_guid) + exceptions = [] day = timedelta(1) week = day * 7 @@ -128,27 +166,31 @@ def make_timestamp(offset): # Place all snapshots in bucket_chains bucket_chains = [] - for vdisk in VDiskList.get_vdisks(): - vdisk.invalidate_dynamics('being_scrubbed') - if vdisk.being_scrubbed: - continue + for vdisk_guid in storagedriver.vdisks_guids: + try: + vdisk = VDisk(vdisk_guid) + vdisk.invalidate_dynamics('being_scrubbed') + if vdisk.being_scrubbed: + continue - if vdisk.info['object_type'] in ['BASE']: - bucket_chain = copy.deepcopy(buckets) - for snapshot in vdisk.snapshots: - if snapshot.get('is_sticky') is True: - continue - if snapshot['guid'] in parent_snapshots: - GenericController._logger.info('Not deleting snapshot {0} because it has clones'.format(snapshot['guid'])) - continue - timestamp = int(snapshot['timestamp']) - for bucket in bucket_chain: - if bucket['start'] >= timestamp > bucket['end']: - bucket['snapshots'].append({'timestamp': timestamp, - 'snapshot_id': snapshot['guid'], - 'vdisk_guid': vdisk.guid, - 'is_consistent': snapshot['is_consistent']}) - bucket_chains.append(bucket_chain) + if vdisk.info['object_type'] in ['BASE']: + bucket_chain = copy.deepcopy(buckets) + for snapshot in vdisk.snapshots: + if snapshot.get('is_sticky') is True: + continue + if snapshot['guid'] in parent_snapshots: + GenericController._logger.info(format_log('Not deleting snapshot {0} because it has clones'.format(snapshot['guid']))) + continue + timestamp = 
int(snapshot['timestamp']) + for bucket in bucket_chain: + if bucket['start'] >= timestamp > bucket['end']: + bucket['snapshots'].append({'timestamp': timestamp, + 'snapshot_id': snapshot['guid'], + 'vdisk_guid': vdisk.guid, + 'is_consistent': snapshot['is_consistent']}) + bucket_chains.append(bucket_chain) + except Exception as ex: + exceptions.append(ex) # Clean out the snapshot bucket_chains, we delete the snapshots we want to keep # And we'll remove all snapshots that remain in the buckets @@ -181,7 +223,6 @@ def make_timestamp(offset): bucket['snapshots'] = [s for s in bucket['snapshots'] if s['timestamp'] != oldest['timestamp']] - exceptions = [] # Delete obsolete snapshots for bucket_chain in bucket_chains: # Each bucket chain represents one vdisk's snapshots @@ -196,13 +237,13 @@ def make_timestamp(offset): if vdisk_guid: vdisk_id_log = ' for VDisk with guid {}'.format(vdisk_guid) if SCRUB_VDISK_EXCEPTION_MESSAGE in ex.message: - GenericController._logger.warning('Being scrubbed exception occurred while deleting snapshots{}'.format(vdisk_id_log)) + GenericController._logger.warning(format_log('Being scrubbed exception occurred while deleting snapshots{}'.format(vdisk_id_log))) else: - GenericController._logger.exception('Exception occurred while deleting snapshots{}'.format(vdisk_id_log)) + GenericController._logger.exception(format_log('Exception occurred while deleting snapshots{}'.format(vdisk_id_log))) exceptions.append(ex) if exceptions: raise RuntimeError('Exceptions occurred while deleting snapshots: \n- {}'.format('\n- '.join((str(ex) for ex in exceptions)))) - GenericController._logger.info('Delete snapshots finished') + GenericController._logger.info(format_log('Delete snapshots finished for StorageDriver {0}'.format(storagedriver_guid))) @staticmethod @ovs_task(name='ovs.generic.execute_scrub', schedule=Schedule(minute='0', hour='3'), ensure_single_info={'mode': 'DEDUPED'}) diff --git a/ovs/lib/tests/generic_tests/test_snapshot.py 
b/ovs/lib/tests/generic_tests/test_snapshot.py index 97d795b3d..c5da3d186 100644 --- a/ovs/lib/tests/generic_tests/test_snapshot.py +++ b/ovs/lib/tests/generic_tests/test_snapshot.py @@ -95,6 +95,7 @@ def test_clone_snapshot(self): 'storagedrivers': [(1, 1, 1)]} # (, , ) ) vdisk_1 = structure['vdisks'][1] + storagedriver_1 = structure['storagedrivers'][1] [dynamic for dynamic in vdisk_1._dynamics if dynamic.name == 'snapshots'][0].timeout = 0 base = datetime.datetime.now().date() @@ -124,7 +125,8 @@ def test_clone_snapshot(self): 'is_consistent': True, 'timestamp': str(timestamp)}) base_timestamp = self._make_timestamp(base, datetime.timedelta(1) * 2) - GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30)) + GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid, + timestamp=base_timestamp + (minute * 30)) self.assertIn(base_snapshot_guid, vdisk_1.snapshot_ids, 'Snapshot was deleted while there are still clones of it') def test_snapshot_automatic_consistent(self): @@ -142,6 +144,7 @@ def test_snapshot_automatic_consistent(self): ) base = datetime.datetime.now().date() vdisk_1 = structure['vdisks'][1] + storagedriver_1 = structure['storagedrivers'][1] label = 'c' # Extra time to add to the hourly timestamps @@ -161,7 +164,8 @@ def test_snapshot_automatic_consistent(self): self._print_message('Day cycle: {0}: {1}'.format(day, datetime.datetime.fromtimestamp(base_timestamp).strftime('%Y-%m-%d'))) self._print_message('- Deleting snapshots') - GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30)) + GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid, + timestamp=base_timestamp + (minute * 30)) self._validate(vdisk=vdisk_1, current_day=day, @@ -195,6 +199,7 @@ def test_snapshot_automatic_not_consistent(self): ) base = datetime.datetime.now().date() vdisk_1 = structure['vdisks'][1] + storagedriver_1 = structure['storagedrivers'][1] label = 'i' # Extra time 
to add to the hourly timestamps @@ -214,7 +219,8 @@ def test_snapshot_automatic_not_consistent(self): self._print_message('Day cycle: {0}: {1}'.format(day, datetime.datetime.fromtimestamp(base_timestamp).strftime('%Y-%m-%d'))) self._print_message('- Deleting snapshots') - GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30)) + GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid, + timestamp=base_timestamp + (minute * 30)) self._validate(vdisk=vdisk_1, current_day=day, @@ -248,6 +254,7 @@ def test_snapshot_non_automatic_consistent(self): ) base = datetime.datetime.now().date() vdisk_1 = structure['vdisks'][1] + storagedriver_1 = structure['storagedrivers'][1] label = 'c' # Extra time to add to the hourly timestamps @@ -267,7 +274,8 @@ def test_snapshot_non_automatic_consistent(self): self._print_message('Day cycle: {0}: {1}'.format(day, datetime.datetime.fromtimestamp(base_timestamp).strftime('%Y-%m-%d'))) self._print_message('- Deleting snapshots') - GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30)) + GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid, + timestamp=base_timestamp + (minute * 30)) self._validate(vdisk=vdisk_1, current_day=day, @@ -301,6 +309,7 @@ def test_snapshot_not_automatic_not_consistent(self): ) base = datetime.datetime.now().date() vdisk_1 = structure['vdisks'][1] + storagedriver_1 = structure['storagedrivers'][1] label = 'i' # Extra time to add to the hourly timestamps @@ -320,7 +329,8 @@ def test_snapshot_not_automatic_not_consistent(self): self._print_message('Day cycle: {0}: {1}'.format(day, datetime.datetime.fromtimestamp(base_timestamp).strftime('%Y-%m-%d'))) self._print_message('- Deleting snapshots') - GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30)) + GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid, + timestamp=base_timestamp + (minute * 30)) 
self._validate(vdisk=vdisk_1, current_day=day, @@ -354,6 +364,7 @@ def test_snapshot_sticky(self): ) base = datetime.datetime.now().date() vdisk_1 = structure['vdisks'][1] + storagedriver_1 = structure['storagedrivers'][1] label = 'c' # Extra time to add to the hourly timestamps @@ -373,7 +384,8 @@ def test_snapshot_sticky(self): self._print_message('Day cycle: {0}: {1}'.format(day, datetime.datetime.fromtimestamp(base_timestamp).strftime('%Y-%m-%d'))) self._print_message('- Deleting snapshots') - GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30)) + GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid, + timestamp=base_timestamp + (minute * 30)) self._validate(vdisk=vdisk_1, current_day=day, @@ -405,6 +417,7 @@ def test_happypath(self): 'storagedrivers': [(1, 1, 1)]} # (, , ) ) vdisk_1 = structure['vdisks'][1] + storagedriver_1 = structure['storagedrivers'][1] [dynamic for dynamic in vdisk_1._dynamics if dynamic.name == 'snapshots'][0].timeout = 0 # Run the testing scenario @@ -424,7 +437,8 @@ def test_happypath(self): # At the start of the day, delete snapshot policy runs at 00:30 self._print_message('- Deleting snapshots') - GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30)) + GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid, + timestamp=base_timestamp + (minute * 30)) # Validate snapshots self._print_message('- Validating snapshots') @@ -468,6 +482,8 @@ def raise_an_exception(*args, **kwargs): ) vdisk_1, vdisk_2 = structure['vdisks'].values() + storagedriver_1 = structure['storagedrivers'][1] + vdisks = [vdisk_1, vdisk_2] for vdisk in vdisks: @@ -481,7 +497,7 @@ def raise_an_exception(*args, **kwargs): if vdisk == vdisk_1: StorageRouterClient.delete_snapshot_callbacks[vdisk.volume_id] = {snapshot_id: raise_an_exception} with self.assertRaises(RuntimeError): - GenericController.delete_snapshots() + 
GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid) self.assertEqual(1, len(vdisk_2.snapshot_ids), 'One snapshot should be removed for vdisk 2') self.assertEqual(2, len(vdisk_1.snapshot_ids), 'No snapshots should be removed for vdisk 1') @@ -500,6 +516,8 @@ def raise_an_exception(*args, **kwargs): 'storagedrivers': [(1, 1, 1)]} # (, , ) ) vdisk_1 = structure['vdisks'][1] + storagedriver_1 = structure['storagedrivers'][1] + [dynamic for dynamic in vdisk_1._dynamics if dynamic.name == 'snapshots'][0].timeout = 0 for i in xrange(0, 2): @@ -510,7 +528,8 @@ def raise_an_exception(*args, **kwargs): snapshot_id = VDiskController.create_snapshot(vdisk_1.guid, metadata) StorageRouterClient.delete_snapshot_callbacks[vdisk_1.volume_id] = {snapshot_id: raise_an_exception} - GenericController.delete_snapshots() + GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid) + self.assertEqual(2, len(vdisk_1.snapshot_ids), 'No snapshots should be removed for vdisk 1') ################## # HELPER METHODS # From 27b6e85614294f147836a8577b9f28640a1cf6ba Mon Sep 17 00:00:00 2001 From: Jeffrey Devloo Date: Fri, 10 May 2019 16:30:56 +0200 Subject: [PATCH 2/3] Set the task to deduped to avoid discarding --- ovs/lib/generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ovs/lib/generic.py b/ovs/lib/generic.py index 66a09d1a7..f86f42831 100644 --- a/ovs/lib/generic.py +++ b/ovs/lib/generic.py @@ -96,7 +96,7 @@ def delete_snapshots(timestamp=None): for storagedriver in StorageDriverList.get_storagedrivers()).apply_async(task_id=group_id) @staticmethod - @ovs_task(name='ovs.generic.delete_snapshots_storagedriver', ensure_single_info={'mode': 'DEFAULT'}) + @ovs_task(name='ovs.generic.delete_snapshots_storagedriver', ensure_single_info={'mode': 'DEDUPED'}) def delete_snapshots_storagedriver(storagedriver_guid, timestamp=None, group_id=None): """ Delete snapshots per storagedriver & scrubbing policy From 
85885a646c716e150f2efa4375f9dbd077cd465d Mon Sep 17 00:00:00 2001 From: Jeffrey Devloo Date: Tue, 14 May 2019 11:39:31 +0200 Subject: [PATCH 3/3] Fix format string in validate_vdisk in safety ensurer --- ovs/lib/helpers/mds/safety.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ovs/lib/helpers/mds/safety.py b/ovs/lib/helpers/mds/safety.py index 31263cd95..5190aa648 100644 --- a/ovs/lib/helpers/mds/safety.py +++ b/ovs/lib/helpers/mds/safety.py @@ -96,7 +96,7 @@ def validate_vdisk(self): self.metadata_backend_config_start = self.vdisk.info['metadata_backend_config'] if self.vdisk.info['metadata_backend_config'] == {}: - raise RuntimeError('Configured MDS layout for vDisk {0} could not be retrieved}, cannot update MDS configuration'.format(self.vdisk.guid)) + raise RuntimeError('Configured MDS layout for vDisk {0} could not be retrieved, cannot update MDS configuration'.format(self.vdisk.guid)) def map_mds_services_by_socket(self): """