From 66a06e9be7e1cd7fb0c6ea83ccade3b3e03bcdad Mon Sep 17 00:00:00 2001 From: Sri Mandaleeka Date: Mon, 27 Apr 2020 14:33:17 -0400 Subject: [PATCH 1/4] changes to support new purge task executions and st2 purge workflow --- st2common/setup.py | 2 ++ st2common/st2common/models/db/__init__.py | 14 ++++++++++++++ st2common/st2common/persistence/workflow.py | 8 ++++++++ 3 files changed, 24 insertions(+) diff --git a/st2common/setup.py b/st2common/setup.py index 3e7bd4fb5c..fe975c3929 100644 --- a/st2common/setup.py +++ b/st2common/setup.py @@ -49,6 +49,8 @@ 'bin/st2-cleanup-db', 'bin/st2-register-content', 'bin/st2-purge-executions', + 'bin/st2-purge-workflows', + 'bin/st2-purge-task-executions', 'bin/st2-purge-trigger-instances', 'bin/st2-run-pack-tests', 'bin/st2ctl', diff --git a/st2common/st2common/models/db/__init__.py b/st2common/st2common/models/db/__init__.py index 098a8e45da..06392cccdf 100644 --- a/st2common/st2common/models/db/__init__.py +++ b/st2common/st2common/models/db/__init__.py @@ -475,6 +475,7 @@ def delete_by_query(self, *args, **query): """ Delete objects by query and return number of deleted objects. """ + super(self) qs = self.model.objects.filter(*args, **query) count = qs.delete() log_query_and_profile_data_for_queryset(queryset=qs) @@ -596,6 +597,19 @@ def save(self, instance, validate=True): return self._undo_dict_field_escape(instance) + def delete(self, instance): + return instance.delete() + + def delete_by_query(self, *args, **query): + """ + Delete objects by query and return number of deleted objects. 
+ """ + qs = self.model.objects.filter(*args, **query) + count = qs.delete() + log_query_and_profile_data_for_queryset(queryset=qs) + + return count + def get_host_names_for_uri_dict(uri_dict): hosts = [] diff --git a/st2common/st2common/persistence/workflow.py b/st2common/st2common/persistence/workflow.py index aade4b648d..aa3f365475 100644 --- a/st2common/st2common/persistence/workflow.py +++ b/st2common/st2common/persistence/workflow.py @@ -41,6 +41,10 @@ def _get_publisher(cls): return cls.publisher + @classmethod + def delete_by_query(cls, *args, **query): + return cls._get_impl().delete_by_query(*args, **query) + class TaskExecution(persistence.StatusBasedResource): impl = db.ChangeRevisionMongoDBAccess(wf_db_models.TaskExecutionDB) @@ -49,3 +53,7 @@ class TaskExecution(persistence.StatusBasedResource): @classmethod def _get_impl(cls): return cls.impl + + @classmethod + def delete_by_query(cls, *args, **query): + return cls._get_impl().delete_by_query(*args, **query) From 5f1dab8bc63e437668196e335d852057fe04f9bc Mon Sep 17 00:00:00 2001 From: Sri Mandaleeka Date: Tue, 28 Apr 2020 09:29:21 -0400 Subject: [PATCH 2/4] Support new purge task executions and st2 purge workflow --- st2common/bin/st2-purge-task-executions | 22 +++ st2common/bin/st2-purge-workflows | 22 +++ .../st2common/cmd/purge_task_executions.py | 88 ++++++++++ st2common/st2common/cmd/purge_workflows.py | 88 ++++++++++ .../st2common/garbage_collection/workflows.py | 160 ++++++++++++++++++ .../tests/unit/test_purge_task_executions.py | 72 ++++++++ st2common/tests/unit/test_purge_worklows.py | 72 ++++++++ 7 files changed, 524 insertions(+) create mode 100644 st2common/bin/st2-purge-task-executions create mode 100644 st2common/bin/st2-purge-workflows create mode 100644 st2common/st2common/cmd/purge_task_executions.py create mode 100644 st2common/st2common/cmd/purge_workflows.py create mode 100644 st2common/st2common/garbage_collection/workflows.py create mode 100644 
st2common/tests/unit/test_purge_task_executions.py create mode 100644 st2common/tests/unit/test_purge_worklows.py diff --git a/st2common/bin/st2-purge-task-executions b/st2common/bin/st2-purge-task-executions new file mode 100644 index 0000000000..eb052676d7 --- /dev/null +++ b/st2common/bin/st2-purge-task-executions @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from st2common.cmd.purge_task_executions import main + +if __name__ == '__main__': + sys.exit(main()) \ No newline at end of file diff --git a/st2common/bin/st2-purge-workflows b/st2common/bin/st2-purge-workflows new file mode 100644 index 0000000000..0480c22131 --- /dev/null +++ b/st2common/bin/st2-purge-workflows @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from st2common.cmd.purge_workflows import main + +if __name__ == '__main__': + sys.exit(main()) \ No newline at end of file diff --git a/st2common/st2common/cmd/purge_task_executions.py b/st2common/st2common/cmd/purge_task_executions.py new file mode 100644 index 0000000000..2d6155c25b --- /dev/null +++ b/st2common/st2common/cmd/purge_task_executions.py @@ -0,0 +1,88 @@ +# Copyright 2019 Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +A utility script that purges st2 executions older than certain +timestamp. + +*** RISK RISK RISK. You will lose data. Run at your own risk. 
*** +""" + +from __future__ import absolute_import + +from datetime import datetime + +import six +import pytz +from oslo_config import cfg + +from st2common import config +from st2common import log as logging +from st2common.config import do_register_cli_opts +from st2common.script_setup import setup as common_setup +from st2common.script_setup import teardown as common_teardown +from st2common.constants.exit_codes import SUCCESS_EXIT_CODE +from st2common.constants.exit_codes import FAILURE_EXIT_CODE +from st2common.garbage_collection.workflows import purge_task_execution + +__all__ = [ + 'main' +] + +LOG = logging.getLogger(__name__) + + +def _register_cli_opts(): + cli_opts = [ + cfg.StrOpt('timestamp', default=None, + help='Will delete execution and liveaction models older than ' + + 'this UTC timestamp. ' + + 'Example value: 2015-03-13T19:01:27.255542Z.'), + cfg.StrOpt('action-ref', default='', + help='action-ref to delete executions for.'), + cfg.BoolOpt('purge-incomplete', default=False, + help='Purge all models irrespective of their ``status``.' + + 'By default, only executions in completed states such as "succeeeded" ' + + ', "failed", "canceled" and "timed_out" are deleted.'), + ] + do_register_cli_opts(cli_opts) + + +def main(): + _register_cli_opts() + common_setup(config=config, setup_db=True, register_mq_exchanges=False) + + # Get config values + timestamp = cfg.CONF.timestamp + action_ref = cfg.CONF.action_ref + purge_incomplete = cfg.CONF.purge_incomplete + + if not timestamp: + LOG.error('Please supply a timestamp for purging models. 
Aborting.') + return 1 + else: + timestamp = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ') + timestamp = timestamp.replace(tzinfo=pytz.UTC) + + try: + purge_task_execution(logger=LOG, timestamp=timestamp, action_ref=action_ref, + purge_incomplete=purge_incomplete) + except Exception as e: + LOG.exception(six.text_type(e)) + return FAILURE_EXIT_CODE + finally: + common_teardown() + + return SUCCESS_EXIT_CODE diff --git a/st2common/st2common/cmd/purge_workflows.py b/st2common/st2common/cmd/purge_workflows.py new file mode 100644 index 0000000000..58479526c1 --- /dev/null +++ b/st2common/st2common/cmd/purge_workflows.py @@ -0,0 +1,88 @@ +# Copyright 2019 Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +A utility script that purges st2 executions older than certain +timestamp. + +*** RISK RISK RISK. You will lose data. Run at your own risk. 
*** +""" + +from __future__ import absolute_import + +from datetime import datetime + +import six +import pytz +from oslo_config import cfg + +from st2common import config +from st2common import log as logging +from st2common.config import do_register_cli_opts +from st2common.script_setup import setup as common_setup +from st2common.script_setup import teardown as common_teardown +from st2common.constants.exit_codes import SUCCESS_EXIT_CODE +from st2common.constants.exit_codes import FAILURE_EXIT_CODE +from st2common.garbage_collection.workflows import purge_workflow_execution + +__all__ = [ + 'main' +] + +LOG = logging.getLogger(__name__) + + +def _register_cli_opts(): + cli_opts = [ + cfg.StrOpt('timestamp', default=None, + help='Will delete execution and liveaction models older than ' + + 'this UTC timestamp. ' + + 'Example value: 2015-03-13T19:01:27.255542Z.'), + cfg.StrOpt('action-ref', default='', + help='action-ref to delete executions for.'), + cfg.BoolOpt('purge-incomplete', default=False, + help='Purge all models irrespective of their ``status``.' + + 'By default, only executions in completed states such as "succeeeded" ' + + ', "failed", "canceled" and "timed_out" are deleted.'), + ] + do_register_cli_opts(cli_opts) + + +def main(): + _register_cli_opts() + common_setup(config=config, setup_db=True, register_mq_exchanges=False) + + # Get config values + timestamp = cfg.CONF.timestamp + action_ref = cfg.CONF.action_ref + purge_incomplete = cfg.CONF.purge_incomplete + + if not timestamp: + LOG.error('Please supply a timestamp for purging models. 
Aborting.') + return 1 + else: + timestamp = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ') + timestamp = timestamp.replace(tzinfo=pytz.UTC) + + try: + purge_workflow_execution(logger=LOG, timestamp=timestamp, action_ref=action_ref, + purge_incomplete=purge_incomplete) + except Exception as e: + LOG.exception(six.text_type(e)) + return FAILURE_EXIT_CODE + finally: + common_teardown() + + return SUCCESS_EXIT_CODE diff --git a/st2common/st2common/garbage_collection/workflows.py b/st2common/st2common/garbage_collection/workflows.py new file mode 100644 index 0000000000..8a2325d3e8 --- /dev/null +++ b/st2common/st2common/garbage_collection/workflows.py @@ -0,0 +1,160 @@ +# Copyright 2020 Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Module with utility functions for purging old action workflows. +""" +from __future__ import absolute_import + +import copy + +import six +from mongoengine.errors import InvalidQueryError + +from st2common.constants import action as action_constants +from st2common.persistence.workflow import WorkflowExecution +from st2common.persistence.workflow import TaskExecution + + +__all__ = [ + 'purge_workflow_execution', + 'purge_task_execution' +] + +#TODO: Are these valid too.. 
+DONE_STATES = [action_constants.LIVEACTION_STATUS_SUCCEEDED, + action_constants.LIVEACTION_STATUS_FAILED, + action_constants.LIVEACTION_STATUS_TIMED_OUT, + action_constants.LIVEACTION_STATUS_CANCELED] + + +def purge_workflow_execution(logger, timestamp, action_ref=None, purge_incomplete=False): + """ + Purge workflow execution objects which match the provided criteria. + + :param timestamp: Executions older than this timestamp will be deleted. + :type timestamp: ``datetime.datetime`` + + :param action_ref: Currently ignored; the action filter is not implemented (see TODO below). + :type action_ref: ``str`` + + :param purge_incomplete: True to also delete executions which are not in a done state. + :type purge_incomplete: ``bool`` + """ + if not timestamp: + raise ValueError('Specify a valid timestamp to purge.') + + logger.info('Purging executions older than timestamp: %s' % + timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ')) + + filters = {} + + if purge_incomplete: + filters['start_timestamp__lt'] = timestamp + else: + filters['end_timestamp__lt'] = timestamp + filters['start_timestamp__lt'] = timestamp + filters['status'] = {'$in': DONE_STATES} + + exec_filters = copy.copy(filters) + # TODO: Are these parameters valid. + # if action_ref: + # exec_filters['action__ref'] = action_ref + # + # liveaction_filters = copy.deepcopy(filters) + # if action_ref: + # liveaction_filters['action'] = action_ref + + # 1. Delete WorkflowExecutionDB objects + try: + # Note: We call list() on the query set object because it's lazily evaluated otherwise + # to_delete_execution_dbs = list(WorkflowExecution.query(only_fields=['id'], + # no_dereference=True, + # **exec_filters)) + deleted_count = WorkflowExecution.delete_by_query(**exec_filters) + except InvalidQueryError as e: + msg = ('Bad query (%s) used to delete execution instances: %s' + ' Please contact support.'
% (exec_filters, six.text_type(e))) + raise InvalidQueryError(msg) + except Exception: + logger.exception('Deletion of execution models failed for query with filters: %s.', + exec_filters) + else: + logger.info('Deleted %s action execution objects' % deleted_count) + + zombie_execution_instances = len(WorkflowExecution.query(only_fields=['id'], + no_dereference=True, + **exec_filters)) + + if zombie_execution_instances > 0: + logger.error('Zombie execution instances left: %d.', zombie_execution_instances) + + # Print stats + logger.info('All execution models older than timestamp %s were deleted.', timestamp) + + +def purge_task_execution(logger, timestamp, action_ref=None, purge_incomplete=False): + """ + Purge task execution objects which match the provided criteria. + + :param timestamp: Executions older than this timestamp will be deleted. + :type timestamp: ``datetime.datetime`` + + :param action_ref: Currently ignored; no action filter is applied to the query. + :type action_ref: ``str`` + + :param purge_incomplete: True to also delete executions which are not in a done state. + :type purge_incomplete: ``bool`` + """ + if not timestamp: + raise ValueError('Specify a valid timestamp to purge.') + + logger.info('Purging executions older than timestamp: %s' % + timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ')) + + filters = {} + + if purge_incomplete: + filters['start_timestamp__lt'] = timestamp + else: + filters['end_timestamp__lt'] = timestamp + filters['start_timestamp__lt'] = timestamp + filters['status'] = {'$in': DONE_STATES} + + exec_filters = copy.copy(filters) + try: + # Note: We call list() on the query set object because it's lazily evaluated otherwise + # to_delete_execution_dbs = list(WorkflowExecution.query(only_fields=['id'], + # no_dereference=True, + # **exec_filters)) + deleted_count = TaskExecution.delete_by_query(**exec_filters) + except InvalidQueryError as e: + msg = ('Bad query (%s) used to delete execution instances: %s' + ' Please contact support.'
% (exec_filters, six.text_type(e))) + raise InvalidQueryError(msg) + except Exception: + logger.exception('Deletion of execution models failed for query with filters: %s.', + exec_filters) + else: + logger.info('Deleted %s action execution objects' % deleted_count) + + zombie_execution_instances = len(TaskExecution.query(only_fields=['id'], + no_dereference=True, + **exec_filters)) + + if zombie_execution_instances > 0: + logger.error('Zombie execution instances left: %d.', zombie_execution_instances) + + # Print stats + logger.info('All execution models older than timestamp %s were deleted.', timestamp) \ No newline at end of file diff --git a/st2common/tests/unit/test_purge_task_executions.py b/st2common/tests/unit/test_purge_task_executions.py new file mode 100644 index 0000000000..e67934d660 --- /dev/null +++ b/st2common/tests/unit/test_purge_task_executions.py @@ -0,0 +1,72 @@ +# Copyright 2019 Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +from __future__ import absolute_import +from datetime import timedelta + +from st2common import log as logging +from st2common.constants.triggers import TRIGGER_INSTANCE_PROCESSED +from st2common.garbage_collection.workflows import purge_task_execution +from st2common.models.db.workflow import TaskExecutionDB +from st2common.persistence.workflow import TaskExecution +from st2common.util import date as date_utils +from st2tests.base import CleanDbTestCase + +LOG = logging.getLogger(__name__) + + +class TestPurgeTaskExecutionInstances(CleanDbTestCase): + + @classmethod + def setUpClass(cls): + CleanDbTestCase.setUpClass() + super(TestPurgeTaskExecutionInstances, cls).setUpClass() + + def setUp(self): + super(TestPurgeTaskExecutionInstances, self).setUp() + + def test_no_timestamp_doesnt_delete(self): + now = date_utils.get_datetime_utc_now() + + instance_db = TaskExecutionDB(trigger='purge_tool.dummy.trigger', + payload={'hola': 'hi', 'kuraci': 'chicken'}, + occurrence_time=now - timedelta(days=20), + status='succeeded') + TaskExecution.add_or_update(instance_db) + + self.assertEqual(len(TaskExecution.get_all()), 1) + expected_msg = 'Specify a valid timestamp' + self.assertRaisesRegexp(ValueError, expected_msg, + purge_task_execution, + logger=LOG, timestamp=None) + self.assertEqual(len(TaskExecution.get_all()), 1) + + def test_purge(self): + now = date_utils.get_datetime_utc_now() + + instance_db = TaskExecutionDB(trigger='purge_tool.dummy.trigger', + payload={'hola': 'hi', 'kuraci': 'chicken'}, + occurrence_time=now - timedelta(days=20), + status='failed') + TaskExecution.add_or_update(instance_db) + + instance_db = TaskExecutionDB(trigger='purge_tool.dummy.trigger', + payload={'hola': 'hi', 'kuraci': 'chicken'}, + occurrence_time=now - timedelta(days=5), + status='canceled') + TaskExecution.add_or_update(instance_db) + + self.assertEqual(len(TaskExecution.get_all()), 2) + purge_task_execution(logger=LOG, timestamp=now - timedelta(days=10)) + 
self.assertEqual(len(TaskExecution.get_all()), 1) diff --git a/st2common/tests/unit/test_purge_worklows.py b/st2common/tests/unit/test_purge_worklows.py new file mode 100644 index 0000000000..5f6620c7ff --- /dev/null +++ b/st2common/tests/unit/test_purge_worklows.py @@ -0,0 +1,72 @@ +# Copyright 2019 Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import +from datetime import timedelta + +from st2common import log as logging +from st2common.constants.triggers import TRIGGER_INSTANCE_PROCESSED +from st2common.garbage_collection.workflows import purge_workflow_execution +from st2common.models.db.workflow import WorkflowExecutionDB +from st2common.persistence.workflow import WorkflowExecution +from st2common.util import date as date_utils +from st2tests.base import CleanDbTestCase + +LOG = logging.getLogger(__name__) + + +class TestPurgeWorkflowExecutionInstances(CleanDbTestCase): + + @classmethod + def setUpClass(cls): + CleanDbTestCase.setUpClass() + super(TestPurgeWorkflowExecutionInstances, cls).setUpClass() + + def setUp(self): + super(TestPurgeWorkflowExecutionInstances, self).setUp() + + def test_no_timestamp_doesnt_delete(self): + now = date_utils.get_datetime_utc_now() + + instance_db = WorkflowExecutionDB(trigger='purge_tool.dummy.trigger', + payload={'hola': 'hi', 'kuraci': 'chicken'}, + occurrence_time=now - timedelta(days=20), + status='running') + WorkflowExecution.add_or_update(instance_db) + + 
self.assertEqual(len(WorkflowExecution.get_all()), 1) + expected_msg = 'Specify a valid timestamp' + self.assertRaisesRegexp(ValueError, expected_msg, + purge_workflow_execution, + logger=LOG, timestamp=None) + self.assertEqual(len(WorkflowExecution.get_all()), 1) + + def test_purge(self): + now = date_utils.get_datetime_utc_now() + + instance_db = WorkflowExecutionDB(trigger='purge_tool.dummy.trigger', + payload={'hola': 'hi', 'kuraci': 'chicken'}, + occurrence_time=now - timedelta(days=20), + status='running') + WorkflowExecution.add_or_update(instance_db) + + instance_db = WorkflowExecutionDB(trigger='purge_tool.dummy.trigger', + payload={'hola': 'hi', 'kuraci': 'chicken'}, + occurrence_time=now - timedelta(days=5), + status='succeeded') + WorkflowExecution.add_or_update(instance_db) + + self.assertEqual(len(WorkflowExecution.get_all()), 2) + purge_workflow_execution(logger=LOG, timestamp=now - timedelta(days=10)) + self.assertEqual(len(WorkflowExecution.get_all()), 1) From 36c7fdf50b6675df4c1fd030cc08661e196ef551 Mon Sep 17 00:00:00 2001 From: Sri Mandaleeka Date: Wed, 24 Feb 2021 14:33:56 -0500 Subject: [PATCH 3/4] Call the purge workflow and task execution methods in the garbage collector. --- conf/st2.conf.sample | 5 ++ .../st2reactor/garbage_collector/base.py | 67 +++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 758b743e75..c9956b5016 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -165,6 +165,11 @@ purge_inquiries = False sleep_delay = 2 # Trigger instances older than this value (days) will be automatically deleted. trigger_instances_ttl = None +# Workflow execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. +workflow_execution_ttl = 7 +# Task execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted. 
+task_execution_ttl = 7 + [keyvalue] # Allow encryption of values in key value stored qualified as "secret". diff --git a/st2reactor/st2reactor/garbage_collector/base.py b/st2reactor/st2reactor/garbage_collector/base.py index 3261458677..0e2f529f4c 100644 --- a/st2reactor/st2reactor/garbage_collector/base.py +++ b/st2reactor/st2reactor/garbage_collector/base.py @@ -40,6 +40,7 @@ from st2common.garbage_collection.executions import purge_execution_output_objects from st2common.garbage_collection.executions import purge_orphaned_workflow_executions from st2common.garbage_collection.inquiries import purge_inquiries +from st2common.garbage_collection.workflows import purge_workflow_execution, purge_task_execution from st2common.garbage_collection.trigger_instances import purge_trigger_instances __all__ = [ @@ -68,6 +69,8 @@ def __init__(self, collection_interval=DEFAULT_COLLECTION_INTERVAL, self._trigger_instances_ttl = cfg.CONF.garbagecollector.trigger_instances_ttl self._purge_inquiries = cfg.CONF.garbagecollector.purge_inquiries self._workflow_execution_max_idle = cfg.CONF.workflow_engine.gc_max_idle_sec + self._workflow_execution_ttl = cfg.CONF.garbagecollector.workflow_execution_ttl + self._task_execution_ttl = cfg.CONF.garbagecollector.task_execution_ttl self._validate_ttl_values() @@ -181,6 +184,22 @@ def _perform_garbage_collection(self): else: LOG.debug(skip_message, obj_type) + obj_type = 'task executions' + if self._task_execution_ttl and self._task_execution_ttl >= MINIMUM_TTL_DAYS: + LOG.info(proc_message, obj_type) + self._purge_task_executions() + concurrency.sleep(self._sleep_delay) + else: + LOG.debug(skip_message, obj_type) + + obj_type = 'workflow executions' + if self._workflow_execution_ttl and self._workflow_execution_ttl >= MINIMUM_TTL_DAYS: + LOG.info(proc_message, obj_type) + self._purge_workflow_executions() + concurrency.sleep(self._sleep_delay) + else: + LOG.debug(skip_message, obj_type) + def _purge_action_executions(self): """ Purge action 
executions and corresponding live action, stdout and stderr object which match @@ -205,6 +224,54 @@ def _purge_action_executions(self): return True + def _purge_workflow_executions(self): + """ + Purge workflow executions older than ``workflow_execution_ttl`` days, i.e. ones matching + the criteria defined in the config. + """ + utc_now = get_datetime_utc_now() + timestamp = (utc_now - datetime.timedelta(days=self._workflow_execution_ttl)) + + # Another sanity check to make sure we don't delete new executions + if timestamp > (utc_now - datetime.timedelta(days=MINIMUM_TTL_DAYS)): + raise ValueError('Calculated timestamp would violate the minimum TTL constraint') + + timestamp_str = isotime.format(dt=timestamp) + LOG.info('Deleting workflow executions older than: %s' % (timestamp_str)) + + assert timestamp < utc_now + + try: + purge_workflow_execution(logger=LOG, timestamp=timestamp) + except Exception as e: + LOG.exception('Failed to delete workflow executions: %s' % (six.text_type(e))) + + return True + + def _purge_task_executions(self): + """ + Purge task executions older than ``task_execution_ttl`` days, i.e. ones matching + the criteria defined in the config.
+ """ + utc_now = get_datetime_utc_now() + timestamp = (utc_now - datetime.timedelta(days=self._task_execution_ttl)) + + # Another sanity check to make sure we don't delete new executions + if timestamp > (utc_now - datetime.timedelta(days=MINIMUM_TTL_DAYS)): + raise ValueError('Calculated timestamp would violate the minimum TTL constraint') + + timestamp_str = isotime.format(dt=timestamp) + LOG.info('Deleting task executions older than: %s' % (timestamp_str)) + + assert timestamp < utc_now + + try: + purge_task_execution(logger=LOG, timestamp=timestamp) + except Exception as e: + LOG.exception('Failed to delete task executions: %s' % (six.text_type(e))) + + return True + def _purge_action_executions_output(self): utc_now = get_datetime_utc_now() timestamp = (utc_now - datetime.timedelta(days=self._action_executions_output_ttl)) From e9d9cbbd2945c4348fa0182535db523eb838407b Mon Sep 17 00:00:00 2001 From: Sri Mandaleeka Date: Wed, 24 Feb 2021 15:38:09 -0500 Subject: [PATCH 4/4] Fix unit tests ====================================================================== 1) ERROR: test_run (test_action_unload.UnloadActionTestCase) ---------------------------------------------------------------------- Traceback (most recent call last): tests/test_action_unload.py line 100 in test_run action.run(packs=[pack]) actions/pack_mgmt/unload.py line 68 in run self._unregister_rules(pack=pack) actions/pack_mgmt/unload.py line 99 in _unregister_rules cleanup_trigger_db_for_rule(rule_db=rule_db) /home/runner/work/st2/st2/st2common/st2common/services/triggers.py line 332 in cleanup_trigger_db_for_rule Trigger.delete_if_unreferenced(existing_trigger_db) /home/runner/work/st2/st2/st2common/st2common/persistence/trigger.py line 61 in delete_if_unreferenced cls._get_impl().delete_by_query(**delete_query) /home/runner/work/st2/st2/st2common/st2common/models/db/__init__.py line 479 in delete_by_query super(self) TypeError: super() argument 1 must be type, not MongoDBAccess ---
st2common/st2common/models/db/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/st2common/st2common/models/db/__init__.py b/st2common/st2common/models/db/__init__.py index 93704ef542..d8572eb271 100644 --- a/st2common/st2common/models/db/__init__.py +++ b/st2common/st2common/models/db/__init__.py @@ -476,7 +476,6 @@ def delete_by_query(self, *args, **query): """ Delete objects by query and return number of deleted objects. """ - super(self) qs = self.model.objects.filter(*args, **query) count = qs.delete() log_query_and_profile_data_for_queryset(queryset=qs)