Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 68 additions & 27 deletions ovs/lib/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@
import os
import copy
import time
from celery import group
from celery.utils import uuid
from celery.result import GroupResult
from datetime import datetime, timedelta
from threading import Thread
from time import mktime
from ovs.constants.vdisk import SCRUB_VDISK_EXCEPTION_MESSAGE
from ovs.dal.hybrids.servicetype import ServiceType
from ovs.dal.hybrids.storagedriver import StorageDriver
from ovs.dal.hybrids.vdisk import VDisk
from ovs.dal.lists.servicelist import ServiceList
from ovs.dal.lists.storagedriverlist import StorageDriverList
from ovs.dal.lists.storagerouterlist import StorageRouterList
from ovs.dal.lists.vdisklist import VDiskList
from ovs.extensions.db.arakooninstaller import ArakoonClusterConfig
Expand Down Expand Up @@ -74,21 +80,53 @@ def snapshot_all_vdisks():
@staticmethod
@ovs_task(name='ovs.generic.delete_snapshots', schedule=Schedule(minute='1', hour='2'), ensure_single_info={'mode': 'DEFAULT'})
def delete_snapshots(timestamp=None):
    # type: (Optional[float]) -> GroupResult
    """
    Delete snapshots based on the retention policy
    Offloads concurrency to celery: one 'delete_snapshots_storagedriver' task is queued per StorageDriver
    Returns a GroupResult. Waiting for the result can be done using result.get()
    :param timestamp: Timestamp to determine whether snapshots should be kept or not, if none provided, current time will be used
    :type timestamp: float or None
    :return: The GroupResult
    :rtype: GroupResult
    """
    # The result cannot be fetched in this task
    # The same id is handed to every subtask and reused as the task_id of the group itself
    group_id = uuid()
    return group(GenericController.delete_snapshots_storagedriver.s(storagedriver.guid, timestamp, group_id)
                 for storagedriver in StorageDriverList.get_storagedrivers()).apply_async(task_id=group_id)

@staticmethod
@ovs_task(name='ovs.generic.delete_snapshots_storagedriver', ensure_single_info={'mode': 'DEDUPED'})
def delete_snapshots_storagedriver(storagedriver_guid, timestamp=None, group_id=None):
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

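Typing: please add a `# type:` signature comment to this function as well, as was done for `delete_snapshots`.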

Delete snapshots per storagedriver & scrubbing policy

Implemented delete snapshot policy:
< 1d | 1d bucket | 1 | best of bucket | 1d
< 1w | 1d bucket | 6 | oldest of bucket | 7d = 1w
< 1m | 1w bucket | 3 | oldest of bucket | 4w = 1m
> 1m | delete

:param storagedriver_guid: Guid of the StorageDriver to remove snapshots on
:type storagedriver_guid: str
:param timestamp: Timestamp to determine whether snapshots should be kept or not, if none provided, current time will be used
:type timestamp: float

:param group_id: ID of the group task. Used to identify which snapshot deletes were called during the scheduled task
:type group_id: str
:return: None
"""
GenericController._logger.info('Delete snapshots started')
if group_id:
log_id = 'Group job {} - '.format(group_id)
else:
log_id = ''

def format_log(message):
return '{}{}'.format(log_id, message)

GenericController._logger.info(format_log('Delete snapshots started for StorageDriver {0}'.format(storagedriver_guid)))

storagedriver = StorageDriver(storagedriver_guid)
exceptions = []

day = timedelta(1)
week = day * 7
Expand Down Expand Up @@ -128,27 +166,31 @@ def make_timestamp(offset):

# Place all snapshots in bucket_chains
bucket_chains = []
for vdisk in VDiskList.get_vdisks():
vdisk.invalidate_dynamics('being_scrubbed')
if vdisk.being_scrubbed:
continue
for vdisk_guid in storagedriver.vdisks_guids:
try:
vdisk = VDisk(vdisk_guid)
vdisk.invalidate_dynamics('being_scrubbed')
if vdisk.being_scrubbed:
continue

if vdisk.info['object_type'] in ['BASE']:
bucket_chain = copy.deepcopy(buckets)
for snapshot in vdisk.snapshots:
if snapshot.get('is_sticky') is True:
continue
if snapshot['guid'] in parent_snapshots:
GenericController._logger.info('Not deleting snapshot {0} because it has clones'.format(snapshot['guid']))
continue
timestamp = int(snapshot['timestamp'])
for bucket in bucket_chain:
if bucket['start'] >= timestamp > bucket['end']:
bucket['snapshots'].append({'timestamp': timestamp,
'snapshot_id': snapshot['guid'],
'vdisk_guid': vdisk.guid,
'is_consistent': snapshot['is_consistent']})
bucket_chains.append(bucket_chain)
if vdisk.info['object_type'] in ['BASE']:
bucket_chain = copy.deepcopy(buckets)
for snapshot in vdisk.snapshots:
if snapshot.get('is_sticky') is True:
continue
if snapshot['guid'] in parent_snapshots:
GenericController._logger.info(format_log('Not deleting snapshot {0} because it has clones'.format(snapshot['guid'])))
continue
timestamp = int(snapshot['timestamp'])
for bucket in bucket_chain:
if bucket['start'] >= timestamp > bucket['end']:
bucket['snapshots'].append({'timestamp': timestamp,
'snapshot_id': snapshot['guid'],
'vdisk_guid': vdisk.guid,
'is_consistent': snapshot['is_consistent']})
bucket_chains.append(bucket_chain)
except Exception as ex:
exceptions.append(ex)

# Clean out the snapshot bucket_chains, we delete the snapshots we want to keep
# And we'll remove all snapshots that remain in the buckets
Expand Down Expand Up @@ -181,7 +223,6 @@ def make_timestamp(offset):
bucket['snapshots'] = [s for s in bucket['snapshots'] if
s['timestamp'] != oldest['timestamp']]

exceptions = []
# Delete obsolete snapshots
for bucket_chain in bucket_chains:
# Each bucket chain represents one vdisk's snapshots
Expand All @@ -196,13 +237,13 @@ def make_timestamp(offset):
if vdisk_guid:
vdisk_id_log = ' for VDisk with guid {}'.format(vdisk_guid)
if SCRUB_VDISK_EXCEPTION_MESSAGE in ex.message:
GenericController._logger.warning('Being scrubbed exception occurred while deleting snapshots{}'.format(vdisk_id_log))
GenericController._logger.warning(format_log('Being scrubbed exception occurred while deleting snapshots{}'.format(vdisk_id_log)))
else:
GenericController._logger.exception('Exception occurred while deleting snapshots{}'.format(vdisk_id_log))
GenericController._logger.exception(format_log('Exception occurred while deleting snapshots{}'.format(vdisk_id_log)))
exceptions.append(ex)
if exceptions:
raise RuntimeError('Exceptions occurred while deleting snapshots: \n- {}'.format('\n- '.join((str(ex) for ex in exceptions))))
GenericController._logger.info('Delete snapshots finished')
GenericController._logger.info(format_log('Delete snapshots finished for StorageDriver {0}'.format(storagedriver_guid)))

@staticmethod
@ovs_task(name='ovs.generic.execute_scrub', schedule=Schedule(minute='0', hour='3'), ensure_single_info={'mode': 'DEDUPED'})
Expand Down
2 changes: 1 addition & 1 deletion ovs/lib/helpers/mds/safety.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def validate_vdisk(self):

self.metadata_backend_config_start = self.vdisk.info['metadata_backend_config']
if self.vdisk.info['metadata_backend_config'] == {}:
raise RuntimeError('Configured MDS layout for vDisk {0} could not be retrieved}, cannot update MDS configuration'.format(self.vdisk.guid))
raise RuntimeError('Configured MDS layout for vDisk {0} could not be retrieved, cannot update MDS configuration'.format(self.vdisk.guid))

def map_mds_services_by_socket(self):
"""
Expand Down
37 changes: 28 additions & 9 deletions ovs/lib/tests/generic_tests/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def test_clone_snapshot(self):
'storagedrivers': [(1, 1, 1)]} # (<id>, <vpool_id>, <storagerouter_id>)
)
vdisk_1 = structure['vdisks'][1]
storagedriver_1 = structure['storagedrivers'][1]
[dynamic for dynamic in vdisk_1._dynamics if dynamic.name == 'snapshots'][0].timeout = 0

base = datetime.datetime.now().date()
Expand Down Expand Up @@ -124,7 +125,8 @@ def test_clone_snapshot(self):
'is_consistent': True,
'timestamp': str(timestamp)})
base_timestamp = self._make_timestamp(base, datetime.timedelta(1) * 2)
GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30))
GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid,
timestamp=base_timestamp + (minute * 30))
self.assertIn(base_snapshot_guid, vdisk_1.snapshot_ids, 'Snapshot was deleted while there are still clones of it')

def test_snapshot_automatic_consistent(self):
Expand All @@ -142,6 +144,7 @@ def test_snapshot_automatic_consistent(self):
)
base = datetime.datetime.now().date()
vdisk_1 = structure['vdisks'][1]
storagedriver_1 = structure['storagedrivers'][1]

label = 'c'
# Extra time to add to the hourly timestamps
Expand All @@ -161,7 +164,8 @@ def test_snapshot_automatic_consistent(self):
self._print_message('Day cycle: {0}: {1}'.format(day, datetime.datetime.fromtimestamp(base_timestamp).strftime('%Y-%m-%d')))

self._print_message('- Deleting snapshots')
GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30))
GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid,
timestamp=base_timestamp + (minute * 30))

self._validate(vdisk=vdisk_1,
current_day=day,
Expand Down Expand Up @@ -195,6 +199,7 @@ def test_snapshot_automatic_not_consistent(self):
)
base = datetime.datetime.now().date()
vdisk_1 = structure['vdisks'][1]
storagedriver_1 = structure['storagedrivers'][1]

label = 'i'
# Extra time to add to the hourly timestamps
Expand All @@ -214,7 +219,8 @@ def test_snapshot_automatic_not_consistent(self):
self._print_message('Day cycle: {0}: {1}'.format(day, datetime.datetime.fromtimestamp(base_timestamp).strftime('%Y-%m-%d')))

self._print_message('- Deleting snapshots')
GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30))
GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid,
timestamp=base_timestamp + (minute * 30))

self._validate(vdisk=vdisk_1,
current_day=day,
Expand Down Expand Up @@ -248,6 +254,7 @@ def test_snapshot_non_automatic_consistent(self):
)
base = datetime.datetime.now().date()
vdisk_1 = structure['vdisks'][1]
storagedriver_1 = structure['storagedrivers'][1]

label = 'c'
# Extra time to add to the hourly timestamps
Expand All @@ -267,7 +274,8 @@ def test_snapshot_non_automatic_consistent(self):
self._print_message('Day cycle: {0}: {1}'.format(day, datetime.datetime.fromtimestamp(base_timestamp).strftime('%Y-%m-%d')))

self._print_message('- Deleting snapshots')
GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30))
GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid,
timestamp=base_timestamp + (minute * 30))

self._validate(vdisk=vdisk_1,
current_day=day,
Expand Down Expand Up @@ -301,6 +309,7 @@ def test_snapshot_not_automatic_not_consistent(self):
)
base = datetime.datetime.now().date()
vdisk_1 = structure['vdisks'][1]
storagedriver_1 = structure['storagedrivers'][1]

label = 'i'
# Extra time to add to the hourly timestamps
Expand All @@ -320,7 +329,8 @@ def test_snapshot_not_automatic_not_consistent(self):
self._print_message('Day cycle: {0}: {1}'.format(day, datetime.datetime.fromtimestamp(base_timestamp).strftime('%Y-%m-%d')))

self._print_message('- Deleting snapshots')
GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30))
GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid,
timestamp=base_timestamp + (minute * 30))

self._validate(vdisk=vdisk_1,
current_day=day,
Expand Down Expand Up @@ -354,6 +364,7 @@ def test_snapshot_sticky(self):
)
base = datetime.datetime.now().date()
vdisk_1 = structure['vdisks'][1]
storagedriver_1 = structure['storagedrivers'][1]

label = 'c'
# Extra time to add to the hourly timestamps
Expand All @@ -373,7 +384,8 @@ def test_snapshot_sticky(self):
self._print_message('Day cycle: {0}: {1}'.format(day, datetime.datetime.fromtimestamp(base_timestamp).strftime('%Y-%m-%d')))

self._print_message('- Deleting snapshots')
GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30))
GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid,
timestamp=base_timestamp + (minute * 30))

self._validate(vdisk=vdisk_1,
current_day=day,
Expand Down Expand Up @@ -405,6 +417,7 @@ def test_happypath(self):
'storagedrivers': [(1, 1, 1)]} # (<id>, <vpool_id>, <storagerouter_id>)
)
vdisk_1 = structure['vdisks'][1]
storagedriver_1 = structure['storagedrivers'][1]
[dynamic for dynamic in vdisk_1._dynamics if dynamic.name == 'snapshots'][0].timeout = 0

# Run the testing scenario
Expand All @@ -424,7 +437,8 @@ def test_happypath(self):

# At the start of the day, delete snapshot policy runs at 00:30
self._print_message('- Deleting snapshots')
GenericController.delete_snapshots(timestamp=base_timestamp + (minute * 30))
GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid,
timestamp=base_timestamp + (minute * 30))

# Validate snapshots
self._print_message('- Validating snapshots')
Expand Down Expand Up @@ -468,6 +482,8 @@ def raise_an_exception(*args, **kwargs):
)

vdisk_1, vdisk_2 = structure['vdisks'].values()
storagedriver_1 = structure['storagedrivers'][1]

vdisks = [vdisk_1, vdisk_2]

for vdisk in vdisks:
Expand All @@ -481,7 +497,7 @@ def raise_an_exception(*args, **kwargs):
if vdisk == vdisk_1:
StorageRouterClient.delete_snapshot_callbacks[vdisk.volume_id] = {snapshot_id: raise_an_exception}
with self.assertRaises(RuntimeError):
GenericController.delete_snapshots()
GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid)
self.assertEqual(1, len(vdisk_2.snapshot_ids), 'One snapshot should be removed for vdisk 2')
self.assertEqual(2, len(vdisk_1.snapshot_ids), 'No snapshots should be removed for vdisk 1')

Expand All @@ -500,6 +516,8 @@ def raise_an_exception(*args, **kwargs):
'storagedrivers': [(1, 1, 1)]} # (<id>, <vpool_id>, <storagerouter_id>)
)
vdisk_1 = structure['vdisks'][1]
storagedriver_1 = structure['storagedrivers'][1]

[dynamic for dynamic in vdisk_1._dynamics if dynamic.name == 'snapshots'][0].timeout = 0

for i in xrange(0, 2):
Expand All @@ -510,7 +528,8 @@ def raise_an_exception(*args, **kwargs):
snapshot_id = VDiskController.create_snapshot(vdisk_1.guid, metadata)
StorageRouterClient.delete_snapshot_callbacks[vdisk_1.volume_id] = {snapshot_id: raise_an_exception}

GenericController.delete_snapshots()
GenericController.delete_snapshots_storagedriver(storagedriver_guid=storagedriver_1.guid)
self.assertEqual(2, len(vdisk_1.snapshot_ids), 'No snapshots should be removed for vdisk 1')

##################
# HELPER METHODS #
Expand Down